1
0
mirror of https://github.com/djohnlewis/stackdump synced 2025-12-07 00:13:33 +00:00

Initial commit. Still building up the env and some parsing code.

This commit is contained in:
Samuel Lai
2011-09-11 14:29:39 +10:00
commit af2eafeccd
301 changed files with 82327 additions and 0 deletions

View File

@@ -0,0 +1,45 @@
"""CherryPy Library"""
# Deprecated in CherryPy 3.2 -- remove in CherryPy 3.3
from cherrypy.lib.reprconf import _Builder, unrepr, modules, attributes
class file_generator(object):
"""Yield the given input (a file object) in chunks (default 64k). (Core)"""
def __init__(self, input, chunkSize=65536):
self.input = input
self.chunkSize = chunkSize
def __iter__(self):
return self
def __next__(self):
chunk = self.input.read(self.chunkSize)
if chunk:
return chunk
else:
if hasattr(self.input, 'close'):
self.input.close()
raise StopIteration()
next = __next__
def file_generator_limited(fileobj, count, chunk_size=65536):
"""Yield the given file object in chunks, stopping after `count`
bytes has been emitted. Default chunk size is 64kB. (Core)
"""
remaining = count
while remaining > 0:
chunk = fileobj.read(min(chunk_size, remaining))
chunklen = len(chunk)
if chunklen == 0:
return
remaining -= chunklen
yield chunk
def set_vary_header(response, header_name):
"Add a Vary header to a response"
varies = response.headers.get("Vary", "")
varies = [x.strip() for x in varies.split(",") if x.strip()]
if header_name not in varies:
varies.append(header_name)
response.headers['Vary'] = ", ".join(varies)

View File

@@ -0,0 +1,87 @@
import cherrypy
from cherrypy.lib import httpauth
def check_auth(users, encrypt=None, realm=None):
"""If an authorization header contains credentials, return True, else False."""
request = cherrypy.serving.request
if 'authorization' in request.headers:
# make sure the provided credentials are correctly set
ah = httpauth.parseAuthorization(request.headers['authorization'])
if ah is None:
raise cherrypy.HTTPError(400, 'Bad Request')
if not encrypt:
encrypt = httpauth.DIGEST_AUTH_ENCODERS[httpauth.MD5]
if hasattr(users, '__call__'):
try:
# backward compatibility
users = users() # expect it to return a dictionary
if not isinstance(users, dict):
raise ValueError("Authentication users must be a dictionary")
# fetch the user password
password = users.get(ah["username"], None)
except TypeError:
# returns a password (encrypted or clear text)
password = users(ah["username"])
else:
if not isinstance(users, dict):
raise ValueError("Authentication users must be a dictionary")
# fetch the user password
password = users.get(ah["username"], None)
# validate the authorization by re-computing it here
# and compare it with what the user-agent provided
if httpauth.checkResponse(ah, password, method=request.method,
encrypt=encrypt, realm=realm):
request.login = ah["username"]
return True
request.login = False
return False
def basic_auth(realm, users, encrypt=None, debug=False):
"""If auth fails, raise 401 with a basic authentication header.
realm
A string containing the authentication realm.
users
A dict of the form: {username: password} or a callable returning a dict.
encrypt
callable used to encrypt the password returned from the user-agent.
if None it defaults to a md5 encryption.
"""
if check_auth(users, encrypt):
if debug:
cherrypy.log('Auth successful', 'TOOLS.BASIC_AUTH')
return
# inform the user-agent this path is protected
cherrypy.serving.response.headers['www-authenticate'] = httpauth.basicAuth(realm)
raise cherrypy.HTTPError(401, "You are not authorized to access that resource")
def digest_auth(realm, users, debug=False):
"""If auth fails, raise 401 with a digest authentication header.
realm
A string containing the authentication realm.
users
A dict of the form: {username: password} or a callable returning a dict.
"""
if check_auth(users, realm=realm):
if debug:
cherrypy.log('Auth successful', 'TOOLS.DIGEST_AUTH')
return
# inform the user-agent this path is protected
cherrypy.serving.response.headers['www-authenticate'] = httpauth.digestAuth(realm)
raise cherrypy.HTTPError(401, "You are not authorized to access that resource")

View File

@@ -0,0 +1,87 @@
# This file is part of CherryPy <http://www.cherrypy.org/>
# -*- coding: utf-8 -*-
# vim:ts=4:sw=4:expandtab:fileencoding=utf-8
__doc__ = """This module provides a CherryPy 3.x tool which implements
the server-side of HTTP Basic Access Authentication, as described in :rfc:`2617`.
Example usage, using the built-in checkpassword_dict function which uses a dict
as the credentials store::
userpassdict = {'bird' : 'bebop', 'ornette' : 'wayout'}
checkpassword = cherrypy.lib.auth_basic.checkpassword_dict(userpassdict)
basic_auth = {'tools.auth_basic.on': True,
'tools.auth_basic.realm': 'earth',
'tools.auth_basic.checkpassword': checkpassword,
}
app_config = { '/' : basic_auth }
"""
__author__ = 'visteya'
__date__ = 'April 2009'
import binascii
from cherrypy._cpcompat import base64_decode
import cherrypy
def checkpassword_dict(user_password_dict):
"""Returns a checkpassword function which checks credentials
against a dictionary of the form: {username : password}.
If you want a simple dictionary-based authentication scheme, use
checkpassword_dict(my_credentials_dict) as the value for the
checkpassword argument to basic_auth().
"""
def checkpassword(realm, user, password):
p = user_password_dict.get(user)
return p and p == password or False
return checkpassword
def basic_auth(realm, checkpassword, debug=False):
"""A CherryPy tool which hooks at before_handler to perform
HTTP Basic Access Authentication, as specified in :rfc:`2617`.
If the request has an 'authorization' header with a 'Basic' scheme, this
tool attempts to authenticate the credentials supplied in that header. If
the request has no 'authorization' header, or if it does but the scheme is
not 'Basic', or if authentication fails, the tool sends a 401 response with
a 'WWW-Authenticate' Basic header.
realm
A string containing the authentication realm.
checkpassword
A callable which checks the authentication credentials.
Its signature is checkpassword(realm, username, password). where
username and password are the values obtained from the request's
'authorization' header. If authentication succeeds, checkpassword
returns True, else it returns False.
"""
if '"' in realm:
raise ValueError('Realm cannot contain the " (quote) character.')
request = cherrypy.serving.request
auth_header = request.headers.get('authorization')
if auth_header is not None:
try:
scheme, params = auth_header.split(' ', 1)
if scheme.lower() == 'basic':
username, password = base64_decode(params).split(':', 1)
if checkpassword(realm, username, password):
if debug:
cherrypy.log('Auth succeeded', 'TOOLS.AUTH_BASIC')
request.login = username
return # successful authentication
except (ValueError, binascii.Error): # split() error, base64.decodestring() error
raise cherrypy.HTTPError(400, 'Bad Request')
# Respond with 401 status and a WWW-Authenticate header
cherrypy.serving.response.headers['www-authenticate'] = 'Basic realm="%s"' % realm
raise cherrypy.HTTPError(401, "You are not authorized to access that resource")

View File

@@ -0,0 +1,365 @@
# This file is part of CherryPy <http://www.cherrypy.org/>
# -*- coding: utf-8 -*-
# vim:ts=4:sw=4:expandtab:fileencoding=utf-8
__doc__ = """An implementation of the server-side of HTTP Digest Access
Authentication, which is described in :rfc:`2617`.
Example usage, using the built-in get_ha1_dict_plain function which uses a dict
of plaintext passwords as the credentials store::
userpassdict = {'alice' : '4x5istwelve'}
get_ha1 = cherrypy.lib.auth_digest.get_ha1_dict_plain(userpassdict)
digest_auth = {'tools.auth_digest.on': True,
'tools.auth_digest.realm': 'wonderland',
'tools.auth_digest.get_ha1': get_ha1,
'tools.auth_digest.key': 'a565c27146791cfb',
}
app_config = { '/' : digest_auth }
"""
__author__ = 'visteya'
__date__ = 'April 2009'
import time
from cherrypy._cpcompat import parse_http_list, parse_keqv_list
import cherrypy
from cherrypy._cpcompat import md5, ntob
md5_hex = lambda s: md5(ntob(s)).hexdigest()
qop_auth = 'auth'
qop_auth_int = 'auth-int'
valid_qops = (qop_auth, qop_auth_int)
valid_algorithms = ('MD5', 'MD5-sess')
def TRACE(msg):
cherrypy.log(msg, context='TOOLS.AUTH_DIGEST')
# Three helper functions for users of the tool, providing three variants
# of get_ha1() functions for three different kinds of credential stores.
def get_ha1_dict_plain(user_password_dict):
"""Returns a get_ha1 function which obtains a plaintext password from a
dictionary of the form: {username : password}.
If you want a simple dictionary-based authentication scheme, with plaintext
passwords, use get_ha1_dict_plain(my_userpass_dict) as the value for the
get_ha1 argument to digest_auth().
"""
def get_ha1(realm, username):
password = user_password_dict.get(username)
if password:
return md5_hex('%s:%s:%s' % (username, realm, password))
return None
return get_ha1
def get_ha1_dict(user_ha1_dict):
"""Returns a get_ha1 function which obtains a HA1 password hash from a
dictionary of the form: {username : HA1}.
If you want a dictionary-based authentication scheme, but with
pre-computed HA1 hashes instead of plain-text passwords, use
get_ha1_dict(my_userha1_dict) as the value for the get_ha1
argument to digest_auth().
"""
def get_ha1(realm, username):
return user_ha1_dict.get(user)
return get_ha1
def get_ha1_file_htdigest(filename):
"""Returns a get_ha1 function which obtains a HA1 password hash from a
flat file with lines of the same format as that produced by the Apache
htdigest utility. For example, for realm 'wonderland', username 'alice',
and password '4x5istwelve', the htdigest line would be::
alice:wonderland:3238cdfe91a8b2ed8e39646921a02d4c
If you want to use an Apache htdigest file as the credentials store,
then use get_ha1_file_htdigest(my_htdigest_file) as the value for the
get_ha1 argument to digest_auth(). It is recommended that the filename
argument be an absolute path, to avoid problems.
"""
def get_ha1(realm, username):
result = None
f = open(filename, 'r')
for line in f:
u, r, ha1 = line.rstrip().split(':')
if u == username and r == realm:
result = ha1
break
f.close()
return result
return get_ha1
def synthesize_nonce(s, key, timestamp=None):
"""Synthesize a nonce value which resists spoofing and can be checked for staleness.
Returns a string suitable as the value for 'nonce' in the www-authenticate header.
s
A string related to the resource, such as the hostname of the server.
key
A secret string known only to the server.
timestamp
An integer seconds-since-the-epoch timestamp
"""
if timestamp is None:
timestamp = int(time.time())
h = md5_hex('%s:%s:%s' % (timestamp, s, key))
nonce = '%s:%s' % (timestamp, h)
return nonce
def H(s):
"""The hash function H"""
return md5_hex(s)
class HttpDigestAuthorization (object):
"""Class to parse a Digest Authorization header and perform re-calculation
of the digest.
"""
def errmsg(self, s):
return 'Digest Authorization header: %s' % s
def __init__(self, auth_header, http_method, debug=False):
self.http_method = http_method
self.debug = debug
scheme, params = auth_header.split(" ", 1)
self.scheme = scheme.lower()
if self.scheme != 'digest':
raise ValueError('Authorization scheme is not "Digest"')
self.auth_header = auth_header
# make a dict of the params
items = parse_http_list(params)
paramsd = parse_keqv_list(items)
self.realm = paramsd.get('realm')
self.username = paramsd.get('username')
self.nonce = paramsd.get('nonce')
self.uri = paramsd.get('uri')
self.method = paramsd.get('method')
self.response = paramsd.get('response') # the response digest
self.algorithm = paramsd.get('algorithm', 'MD5')
self.cnonce = paramsd.get('cnonce')
self.opaque = paramsd.get('opaque')
self.qop = paramsd.get('qop') # qop
self.nc = paramsd.get('nc') # nonce count
# perform some correctness checks
if self.algorithm not in valid_algorithms:
raise ValueError(self.errmsg("Unsupported value for algorithm: '%s'" % self.algorithm))
has_reqd = self.username and \
self.realm and \
self.nonce and \
self.uri and \
self.response
if not has_reqd:
raise ValueError(self.errmsg("Not all required parameters are present."))
if self.qop:
if self.qop not in valid_qops:
raise ValueError(self.errmsg("Unsupported value for qop: '%s'" % self.qop))
if not (self.cnonce and self.nc):
raise ValueError(self.errmsg("If qop is sent then cnonce and nc MUST be present"))
else:
if self.cnonce or self.nc:
raise ValueError(self.errmsg("If qop is not sent, neither cnonce nor nc can be present"))
def __str__(self):
return 'authorization : %s' % self.auth_header
def validate_nonce(self, s, key):
"""Validate the nonce.
Returns True if nonce was generated by synthesize_nonce() and the timestamp
is not spoofed, else returns False.
s
A string related to the resource, such as the hostname of the server.
key
A secret string known only to the server.
Both s and key must be the same values which were used to synthesize the nonce
we are trying to validate.
"""
try:
timestamp, hashpart = self.nonce.split(':', 1)
s_timestamp, s_hashpart = synthesize_nonce(s, key, timestamp).split(':', 1)
is_valid = s_hashpart == hashpart
if self.debug:
TRACE('validate_nonce: %s' % is_valid)
return is_valid
except ValueError: # split() error
pass
return False
def is_nonce_stale(self, max_age_seconds=600):
"""Returns True if a validated nonce is stale. The nonce contains a
timestamp in plaintext and also a secure hash of the timestamp. You should
first validate the nonce to ensure the plaintext timestamp is not spoofed.
"""
try:
timestamp, hashpart = self.nonce.split(':', 1)
if int(timestamp) + max_age_seconds > int(time.time()):
return False
except ValueError: # int() error
pass
if self.debug:
TRACE("nonce is stale")
return True
def HA2(self, entity_body=''):
"""Returns the H(A2) string. See :rfc:`2617` section 3.2.2.3."""
# RFC 2617 3.2.2.3
# If the "qop" directive's value is "auth" or is unspecified, then A2 is:
# A2 = method ":" digest-uri-value
#
# If the "qop" value is "auth-int", then A2 is:
# A2 = method ":" digest-uri-value ":" H(entity-body)
if self.qop is None or self.qop == "auth":
a2 = '%s:%s' % (self.http_method, self.uri)
elif self.qop == "auth-int":
a2 = "%s:%s:%s" % (self.http_method, self.uri, H(entity_body))
else:
# in theory, this should never happen, since I validate qop in __init__()
raise ValueError(self.errmsg("Unrecognized value for qop!"))
return H(a2)
def request_digest(self, ha1, entity_body=''):
"""Calculates the Request-Digest. See :rfc:`2617` section 3.2.2.1.
ha1
The HA1 string obtained from the credentials store.
entity_body
If 'qop' is set to 'auth-int', then A2 includes a hash
of the "entity body". The entity body is the part of the
message which follows the HTTP headers. See :rfc:`2617` section
4.3. This refers to the entity the user agent sent in the request which
has the Authorization header. Typically GET requests don't have an entity,
and POST requests do.
"""
ha2 = self.HA2(entity_body)
# Request-Digest -- RFC 2617 3.2.2.1
if self.qop:
req = "%s:%s:%s:%s:%s" % (self.nonce, self.nc, self.cnonce, self.qop, ha2)
else:
req = "%s:%s" % (self.nonce, ha2)
# RFC 2617 3.2.2.2
#
# If the "algorithm" directive's value is "MD5" or is unspecified, then A1 is:
# A1 = unq(username-value) ":" unq(realm-value) ":" passwd
#
# If the "algorithm" directive's value is "MD5-sess", then A1 is
# calculated only once - on the first request by the client following
# receipt of a WWW-Authenticate challenge from the server.
# A1 = H( unq(username-value) ":" unq(realm-value) ":" passwd )
# ":" unq(nonce-value) ":" unq(cnonce-value)
if self.algorithm == 'MD5-sess':
ha1 = H('%s:%s:%s' % (ha1, self.nonce, self.cnonce))
digest = H('%s:%s' % (ha1, req))
return digest
def www_authenticate(realm, key, algorithm='MD5', nonce=None, qop=qop_auth, stale=False):
"""Constructs a WWW-Authenticate header for Digest authentication."""
if qop not in valid_qops:
raise ValueError("Unsupported value for qop: '%s'" % qop)
if algorithm not in valid_algorithms:
raise ValueError("Unsupported value for algorithm: '%s'" % algorithm)
if nonce is None:
nonce = synthesize_nonce(realm, key)
s = 'Digest realm="%s", nonce="%s", algorithm="%s", qop="%s"' % (
realm, nonce, algorithm, qop)
if stale:
s += ', stale="true"'
return s
def digest_auth(realm, get_ha1, key, debug=False):
"""A CherryPy tool which hooks at before_handler to perform
HTTP Digest Access Authentication, as specified in :rfc:`2617`.
If the request has an 'authorization' header with a 'Digest' scheme, this
tool authenticates the credentials supplied in that header. If
the request has no 'authorization' header, or if it does but the scheme is
not "Digest", or if authentication fails, the tool sends a 401 response with
a 'WWW-Authenticate' Digest header.
realm
A string containing the authentication realm.
get_ha1
A callable which looks up a username in a credentials store
and returns the HA1 string, which is defined in the RFC to be
MD5(username : realm : password). The function's signature is:
``get_ha1(realm, username)``
where username is obtained from the request's 'authorization' header.
If username is not found in the credentials store, get_ha1() returns
None.
key
A secret string known only to the server, used in the synthesis of nonces.
"""
request = cherrypy.serving.request
auth_header = request.headers.get('authorization')
nonce_is_stale = False
if auth_header is not None:
try:
auth = HttpDigestAuthorization(auth_header, request.method, debug=debug)
except ValueError:
raise cherrypy.HTTPError(400, "The Authorization header could not be parsed.")
if debug:
TRACE(str(auth))
if auth.validate_nonce(realm, key):
ha1 = get_ha1(realm, auth.username)
if ha1 is not None:
# note that for request.body to be available we need to hook in at
# before_handler, not on_start_resource like 3.1.x digest_auth does.
digest = auth.request_digest(ha1, entity_body=request.body)
if digest == auth.response: # authenticated
if debug:
TRACE("digest matches auth.response")
# Now check if nonce is stale.
# The choice of ten minutes' lifetime for nonce is somewhat arbitrary
nonce_is_stale = auth.is_nonce_stale(max_age_seconds=600)
if not nonce_is_stale:
request.login = auth.username
if debug:
TRACE("authentication of %s successful" % auth.username)
return
# Respond with 401 status and a WWW-Authenticate header
header = www_authenticate(realm, key, stale=nonce_is_stale)
if debug:
TRACE(header)
cherrypy.serving.response.headers['WWW-Authenticate'] = header
raise cherrypy.HTTPError(401, "You are not authorized to access that resource")

View File

@@ -0,0 +1,465 @@
"""
CherryPy implements a simple caching system as a pluggable Tool. This tool tries
to be an (in-process) HTTP/1.1-compliant cache. It's not quite there yet, but
it's probably good enough for most sites.
In general, GET responses are cached (along with selecting headers) and, if
another request arrives for the same resource, the caching Tool will return 304
Not Modified if possible, or serve the cached response otherwise. It also sets
request.cached to True if serving a cached representation, and sets
request.cacheable to False (so it doesn't get cached again).
If POST, PUT, or DELETE requests are made for a cached resource, they invalidate
(delete) any cached response.
Usage
=====
Configuration file example::
[/]
tools.caching.on = True
tools.caching.delay = 3600
You may use a class other than the default
:class:`MemoryCache<cherrypy.lib.caching.MemoryCache>` by supplying the config
entry ``cache_class``; supply the full dotted name of the replacement class
as the config value. It must implement the basic methods ``get``, ``put``,
``delete``, and ``clear``.
You may set any attribute, including overriding methods, on the cache
instance by providing them in config. The above sets the
:attr:`delay<cherrypy.lib.caching.MemoryCache.delay>` attribute, for example.
"""
import datetime
import sys
import threading
import time
import cherrypy
from cherrypy.lib import cptools, httputil
from cherrypy._cpcompat import copyitems, ntob, set_daemon, sorted
class Cache(object):
"""Base class for Cache implementations."""
def get(self):
"""Return the current variant if in the cache, else None."""
raise NotImplemented
def put(self, obj, size):
"""Store the current variant in the cache."""
raise NotImplemented
def delete(self):
"""Remove ALL cached variants of the current resource."""
raise NotImplemented
def clear(self):
"""Reset the cache to its initial, empty state."""
raise NotImplemented
# ------------------------------- Memory Cache ------------------------------- #
class AntiStampedeCache(dict):
"""A storage system for cached items which reduces stampede collisions."""
def wait(self, key, timeout=5, debug=False):
"""Return the cached value for the given key, or None.
If timeout is not None, and the value is already
being calculated by another thread, wait until the given timeout has
elapsed. If the value is available before the timeout expires, it is
returned. If not, None is returned, and a sentinel placed in the cache
to signal other threads to wait.
If timeout is None, no waiting is performed nor sentinels used.
"""
value = self.get(key)
if isinstance(value, threading._Event):
if timeout is None:
# Ignore the other thread and recalc it ourselves.
if debug:
cherrypy.log('No timeout', 'TOOLS.CACHING')
return None
# Wait until it's done or times out.
if debug:
cherrypy.log('Waiting up to %s seconds' % timeout, 'TOOLS.CACHING')
value.wait(timeout)
if value.result is not None:
# The other thread finished its calculation. Use it.
if debug:
cherrypy.log('Result!', 'TOOLS.CACHING')
return value.result
# Timed out. Stick an Event in the slot so other threads wait
# on this one to finish calculating the value.
if debug:
cherrypy.log('Timed out', 'TOOLS.CACHING')
e = threading.Event()
e.result = None
dict.__setitem__(self, key, e)
return None
elif value is None:
# Stick an Event in the slot so other threads wait
# on this one to finish calculating the value.
if debug:
cherrypy.log('Timed out', 'TOOLS.CACHING')
e = threading.Event()
e.result = None
dict.__setitem__(self, key, e)
return value
def __setitem__(self, key, value):
"""Set the cached value for the given key."""
existing = self.get(key)
dict.__setitem__(self, key, value)
if isinstance(existing, threading._Event):
# Set Event.result so other threads waiting on it have
# immediate access without needing to poll the cache again.
existing.result = value
existing.set()
class MemoryCache(Cache):
"""An in-memory cache for varying response content.
Each key in self.store is a URI, and each value is an AntiStampedeCache.
The response for any given URI may vary based on the values of
"selecting request headers"; that is, those named in the Vary
response header. We assume the list of header names to be constant
for each URI throughout the lifetime of the application, and store
that list in ``self.store[uri].selecting_headers``.
The items contained in ``self.store[uri]`` have keys which are tuples of
request header values (in the same order as the names in its
selecting_headers), and values which are the actual responses.
"""
maxobjects = 1000
"""The maximum number of cached objects; defaults to 1000."""
maxobj_size = 100000
"""The maximum size of each cached object in bytes; defaults to 100 KB."""
maxsize = 10000000
"""The maximum size of the entire cache in bytes; defaults to 10 MB."""
delay = 600
"""Seconds until the cached content expires; defaults to 600 (10 minutes)."""
antistampede_timeout = 5
"""Seconds to wait for other threads to release a cache lock."""
expire_freq = 0.1
"""Seconds to sleep between cache expiration sweeps."""
debug = False
def __init__(self):
self.clear()
# Run self.expire_cache in a separate daemon thread.
t = threading.Thread(target=self.expire_cache, name='expire_cache')
self.expiration_thread = t
set_daemon(t, True)
t.start()
def clear(self):
"""Reset the cache to its initial, empty state."""
self.store = {}
self.expirations = {}
self.tot_puts = 0
self.tot_gets = 0
self.tot_hist = 0
self.tot_expires = 0
self.tot_non_modified = 0
self.cursize = 0
def expire_cache(self):
"""Continuously examine cached objects, expiring stale ones.
This function is designed to be run in its own daemon thread,
referenced at ``self.expiration_thread``.
"""
# It's possible that "time" will be set to None
# arbitrarily, so we check "while time" to avoid exceptions.
# See tickets #99 and #180 for more information.
while time:
now = time.time()
# Must make a copy of expirations so it doesn't change size
# during iteration
for expiration_time, objects in copyitems(self.expirations):
if expiration_time <= now:
for obj_size, uri, sel_header_values in objects:
try:
del self.store[uri][tuple(sel_header_values)]
self.tot_expires += 1
self.cursize -= obj_size
except KeyError:
# the key may have been deleted elsewhere
pass
del self.expirations[expiration_time]
time.sleep(self.expire_freq)
def get(self):
"""Return the current variant if in the cache, else None."""
request = cherrypy.serving.request
self.tot_gets += 1
uri = cherrypy.url(qs=request.query_string)
uricache = self.store.get(uri)
if uricache is None:
return None
header_values = [request.headers.get(h, '')
for h in uricache.selecting_headers]
variant = uricache.wait(key=tuple(sorted(header_values)),
timeout=self.antistampede_timeout,
debug=self.debug)
if variant is not None:
self.tot_hist += 1
return variant
def put(self, variant, size):
"""Store the current variant in the cache."""
request = cherrypy.serving.request
response = cherrypy.serving.response
uri = cherrypy.url(qs=request.query_string)
uricache = self.store.get(uri)
if uricache is None:
uricache = AntiStampedeCache()
uricache.selecting_headers = [
e.value for e in response.headers.elements('Vary')]
self.store[uri] = uricache
if len(self.store) < self.maxobjects:
total_size = self.cursize + size
# checks if there's space for the object
if (size < self.maxobj_size and total_size < self.maxsize):
# add to the expirations list
expiration_time = response.time + self.delay
bucket = self.expirations.setdefault(expiration_time, [])
bucket.append((size, uri, uricache.selecting_headers))
# add to the cache
header_values = [request.headers.get(h, '')
for h in uricache.selecting_headers]
uricache[tuple(sorted(header_values))] = variant
self.tot_puts += 1
self.cursize = total_size
def delete(self):
"""Remove ALL cached variants of the current resource."""
uri = cherrypy.url(qs=cherrypy.serving.request.query_string)
self.store.pop(uri, None)
def get(invalid_methods=("POST", "PUT", "DELETE"), debug=False, **kwargs):
"""Try to obtain cached output. If fresh enough, raise HTTPError(304).
If POST, PUT, or DELETE:
* invalidates (deletes) any cached response for this resource
* sets request.cached = False
* sets request.cacheable = False
else if a cached copy exists:
* sets request.cached = True
* sets request.cacheable = False
* sets response.headers to the cached values
* checks the cached Last-Modified response header against the
current If-(Un)Modified-Since request headers; raises 304
if necessary.
* sets response.status and response.body to the cached values
* returns True
otherwise:
* sets request.cached = False
* sets request.cacheable = True
* returns False
"""
request = cherrypy.serving.request
response = cherrypy.serving.response
if not hasattr(cherrypy, "_cache"):
# Make a process-wide Cache object.
cherrypy._cache = kwargs.pop("cache_class", MemoryCache)()
# Take all remaining kwargs and set them on the Cache object.
for k, v in kwargs.items():
setattr(cherrypy._cache, k, v)
cherrypy._cache.debug = debug
# POST, PUT, DELETE should invalidate (delete) the cached copy.
# See http://www.w3.org/Protocols/rfc2616/rfc2616-sec13.html#sec13.10.
if request.method in invalid_methods:
if debug:
cherrypy.log('request.method %r in invalid_methods %r' %
(request.method, invalid_methods), 'TOOLS.CACHING')
cherrypy._cache.delete()
request.cached = False
request.cacheable = False
return False
if 'no-cache' in [e.value for e in request.headers.elements('Pragma')]:
request.cached = False
request.cacheable = True
return False
cache_data = cherrypy._cache.get()
request.cached = bool(cache_data)
request.cacheable = not request.cached
if request.cached:
# Serve the cached copy.
max_age = cherrypy._cache.delay
for v in [e.value for e in request.headers.elements('Cache-Control')]:
atoms = v.split('=', 1)
directive = atoms.pop(0)
if directive == 'max-age':
if len(atoms) != 1 or not atoms[0].isdigit():
raise cherrypy.HTTPError(400, "Invalid Cache-Control header")
max_age = int(atoms[0])
break
elif directive == 'no-cache':
if debug:
cherrypy.log('Ignoring cache due to Cache-Control: no-cache',
'TOOLS.CACHING')
request.cached = False
request.cacheable = True
return False
if debug:
cherrypy.log('Reading response from cache', 'TOOLS.CACHING')
s, h, b, create_time = cache_data
age = int(response.time - create_time)
if (age > max_age):
if debug:
cherrypy.log('Ignoring cache due to age > %d' % max_age,
'TOOLS.CACHING')
request.cached = False
request.cacheable = True
return False
# Copy the response headers. See http://www.cherrypy.org/ticket/721.
response.headers = rh = httputil.HeaderMap()
for k in h:
dict.__setitem__(rh, k, dict.__getitem__(h, k))
# Add the required Age header
response.headers["Age"] = str(age)
try:
# Note that validate_since depends on a Last-Modified header;
# this was put into the cached copy, and should have been
# resurrected just above (response.headers = cache_data[1]).
cptools.validate_since()
except cherrypy.HTTPRedirect:
x = sys.exc_info()[1]
if x.status == 304:
cherrypy._cache.tot_non_modified += 1
raise
# serve it & get out from the request
response.status = s
response.body = b
else:
if debug:
cherrypy.log('request is not cached', 'TOOLS.CACHING')
return request.cached
def tee_output():
"""Tee response output to cache storage. Internal."""
# Used by CachingTool by attaching to request.hooks
request = cherrypy.serving.request
if 'no-store' in request.headers.values('Cache-Control'):
return
def tee(body):
"""Tee response.body into a list."""
if ('no-cache' in response.headers.values('Pragma') or
'no-store' in response.headers.values('Cache-Control')):
for chunk in body:
yield chunk
return
output = []
for chunk in body:
output.append(chunk)
yield chunk
# save the cache data
body = ntob('').join(output)
cherrypy._cache.put((response.status, response.headers or {},
body, response.time), len(body))
response = cherrypy.serving.response
response.body = tee(response.body)
def expires(secs=0, force=False, debug=False):
"""Tool for influencing cache mechanisms using the 'Expires' header.
secs
Must be either an int or a datetime.timedelta, and indicates the
number of seconds between response.time and when the response should
expire. The 'Expires' header will be set to response.time + secs.
If secs is zero, the 'Expires' header is set one year in the past, and
the following "cache prevention" headers are also set:
* Pragma: no-cache
* Cache-Control': no-cache, must-revalidate
force
If False, the following headers are checked:
* Etag
* Last-Modified
* Age
* Expires
If any are already present, none of the above response headers are set.
"""
response = cherrypy.serving.response
headers = response.headers
cacheable = False
if not force:
# some header names that indicate that the response can be cached
for indicator in ('Etag', 'Last-Modified', 'Age', 'Expires'):
if indicator in headers:
cacheable = True
break
if not cacheable and not force:
if debug:
cherrypy.log('request is not cacheable', 'TOOLS.EXPIRES')
else:
if debug:
cherrypy.log('request is cacheable', 'TOOLS.EXPIRES')
if isinstance(secs, datetime.timedelta):
secs = (86400 * secs.days) + secs.seconds
if secs == 0:
if force or ("Pragma" not in headers):
headers["Pragma"] = "no-cache"
if cherrypy.serving.request.protocol >= (1, 1):
if force or "Cache-Control" not in headers:
headers["Cache-Control"] = "no-cache, must-revalidate"
# Set an explicit Expires date in the past.
expiry = httputil.HTTPDate(1169942400.0)
else:
expiry = httputil.HTTPDate(response.time + secs)
if force or "Expires" not in headers:
headers["Expires"] = expiry

View File

@@ -0,0 +1,365 @@
"""Code-coverage tools for CherryPy.
To use this module, or the coverage tools in the test suite,
you need to download 'coverage.py', either Gareth Rees' `original
implementation <http://www.garethrees.org/2001/12/04/python-coverage/>`_
or Ned Batchelder's `enhanced version:
<http://www.nedbatchelder.com/code/modules/coverage.html>`_
To turn on coverage tracing, use the following code::
cherrypy.engine.subscribe('start', covercp.start)
DO NOT subscribe anything on the 'start_thread' channel, as previously
recommended. Calling start once in the main thread should be sufficient
to start coverage on all threads. Calling start again in each thread
effectively clears any coverage data gathered up to that point.
Run your code, then use the ``covercp.serve()`` function to browse the
results in a web browser. If you run this module from the command line,
it will call ``serve()`` for you.
"""
import re
import sys
import cgi
from cherrypy._cpcompat import quote_plus
import os, os.path
localFile = os.path.join(os.path.dirname(__file__), "coverage.cache")
the_coverage = None
try:
from coverage import coverage
the_coverage = coverage(data_file=localFile)
def start():
the_coverage.start()
except ImportError:
# Setting the_coverage to None will raise errors
# that need to be trapped downstream.
the_coverage = None
import warnings
warnings.warn("No code coverage will be performed; coverage.py could not be imported.")
def start():
pass
start.priority = 20
TEMPLATE_MENU = """<html>
<head>
<title>CherryPy Coverage Menu</title>
<style>
body {font: 9pt Arial, serif;}
#tree {
font-size: 8pt;
font-family: Andale Mono, monospace;
white-space: pre;
}
#tree a:active, a:focus {
background-color: black;
padding: 1px;
color: white;
border: 0px solid #9999FF;
-moz-outline-style: none;
}
.fail { color: red;}
.pass { color: #888;}
#pct { text-align: right;}
h3 {
font-size: small;
font-weight: bold;
font-style: italic;
margin-top: 5px;
}
input { border: 1px solid #ccc; padding: 2px; }
.directory {
color: #933;
font-style: italic;
font-weight: bold;
font-size: 10pt;
}
.file {
color: #400;
}
a { text-decoration: none; }
#crumbs {
color: white;
font-size: 8pt;
font-family: Andale Mono, monospace;
width: 100%;
background-color: black;
}
#crumbs a {
color: #f88;
}
#options {
line-height: 2.3em;
border: 1px solid black;
background-color: #eee;
padding: 4px;
}
#exclude {
width: 100%;
margin-bottom: 3px;
border: 1px solid #999;
}
#submit {
background-color: black;
color: white;
border: 0;
margin-bottom: -9px;
}
</style>
</head>
<body>
<h2>CherryPy Coverage</h2>"""
TEMPLATE_FORM = """
<div id="options">
<form action='menu' method=GET>
<input type='hidden' name='base' value='%(base)s' />
Show percentages <input type='checkbox' %(showpct)s name='showpct' value='checked' /><br />
Hide files over <input type='text' id='pct' name='pct' value='%(pct)s' size='3' />%%<br />
Exclude files matching<br />
<input type='text' id='exclude' name='exclude' value='%(exclude)s' size='20' />
<br />
<input type='submit' value='Change view' id="submit"/>
</form>
</div>"""
TEMPLATE_FRAMESET = """<html>
<head><title>CherryPy coverage data</title></head>
<frameset cols='250, 1*'>
<frame src='menu?base=%s' />
<frame name='main' src='' />
</frameset>
</html>
"""
TEMPLATE_COVERAGE = """<html>
<head>
<title>Coverage for %(name)s</title>
<style>
h2 { margin-bottom: .25em; }
p { margin: .25em; }
.covered { color: #000; background-color: #fff; }
.notcovered { color: #fee; background-color: #500; }
.excluded { color: #00f; background-color: #fff; }
table .covered, table .notcovered, table .excluded
{ font-family: Andale Mono, monospace;
font-size: 10pt; white-space: pre; }
.lineno { background-color: #eee;}
.notcovered .lineno { background-color: #000;}
table { border-collapse: collapse;
</style>
</head>
<body>
<h2>%(name)s</h2>
<p>%(fullpath)s</p>
<p>Coverage: %(pc)s%%</p>"""
TEMPLATE_LOC_COVERED = """<tr class="covered">
<td class="lineno">%s&nbsp;</td>
<td>%s</td>
</tr>\n"""
TEMPLATE_LOC_NOT_COVERED = """<tr class="notcovered">
<td class="lineno">%s&nbsp;</td>
<td>%s</td>
</tr>\n"""
TEMPLATE_LOC_EXCLUDED = """<tr class="excluded">
<td class="lineno">%s&nbsp;</td>
<td>%s</td>
</tr>\n"""
TEMPLATE_ITEM = "%s%s<a class='file' href='report?name=%s' target='main'>%s</a>\n"
def _percent(statements, missing):
s = len(statements)
e = s - len(missing)
if s > 0:
return int(round(100.0 * e / s))
return 0
def _show_branch(root, base, path, pct=0, showpct=False, exclude="",
coverage=the_coverage):
# Show the directory name and any of our children
dirs = [k for k, v in root.items() if v]
dirs.sort()
for name in dirs:
newpath = os.path.join(path, name)
if newpath.lower().startswith(base):
relpath = newpath[len(base):]
yield "| " * relpath.count(os.sep)
yield "<a class='directory' href='menu?base=%s&exclude=%s'>%s</a>\n" % \
(newpath, quote_plus(exclude), name)
for chunk in _show_branch(root[name], base, newpath, pct, showpct, exclude, coverage=coverage):
yield chunk
# Now list the files
if path.lower().startswith(base):
relpath = path[len(base):]
files = [k for k, v in root.items() if not v]
files.sort()
for name in files:
newpath = os.path.join(path, name)
pc_str = ""
if showpct:
try:
_, statements, _, missing, _ = coverage.analysis2(newpath)
except:
# Yes, we really want to pass on all errors.
pass
else:
pc = _percent(statements, missing)
pc_str = ("%3d%% " % pc).replace(' ','&nbsp;')
if pc < float(pct) or pc == -1:
pc_str = "<span class='fail'>%s</span>" % pc_str
else:
pc_str = "<span class='pass'>%s</span>" % pc_str
yield TEMPLATE_ITEM % ("| " * (relpath.count(os.sep) + 1),
pc_str, newpath, name)
def _skip_file(path, exclude):
if exclude:
return bool(re.search(exclude, path))
def _graft(path, tree):
d = tree
p = path
atoms = []
while True:
p, tail = os.path.split(p)
if not tail:
break
atoms.append(tail)
atoms.append(p)
if p != "/":
atoms.append("/")
atoms.reverse()
for node in atoms:
if node:
d = d.setdefault(node, {})
def get_tree(base, exclude, coverage=the_coverage):
"""Return covered module names as a nested dict."""
tree = {}
runs = coverage.data.executed_files()
for path in runs:
if not _skip_file(path, exclude) and not os.path.isdir(path):
_graft(path, tree)
return tree
class CoverStats(object):
def __init__(self, coverage, root=None):
self.coverage = coverage
if root is None:
# Guess initial depth. Files outside this path will not be
# reachable from the web interface.
import cherrypy
root = os.path.dirname(cherrypy.__file__)
self.root = root
def index(self):
return TEMPLATE_FRAMESET % self.root.lower()
index.exposed = True
def menu(self, base="/", pct="50", showpct="",
exclude=r'python\d\.\d|test|tut\d|tutorial'):
# The coverage module uses all-lower-case names.
base = base.lower().rstrip(os.sep)
yield TEMPLATE_MENU
yield TEMPLATE_FORM % locals()
# Start by showing links for parent paths
yield "<div id='crumbs'>"
path = ""
atoms = base.split(os.sep)
atoms.pop()
for atom in atoms:
path += atom + os.sep
yield ("<a href='menu?base=%s&exclude=%s'>%s</a> %s"
% (path, quote_plus(exclude), atom, os.sep))
yield "</div>"
yield "<div id='tree'>"
# Then display the tree
tree = get_tree(base, exclude, self.coverage)
if not tree:
yield "<p>No modules covered.</p>"
else:
for chunk in _show_branch(tree, base, "/", pct,
showpct=='checked', exclude, coverage=self.coverage):
yield chunk
yield "</div>"
yield "</body></html>"
menu.exposed = True
def annotated_file(self, filename, statements, excluded, missing):
source = open(filename, 'r')
buffer = []
for lineno, line in enumerate(source.readlines()):
lineno += 1
line = line.strip("\n\r")
empty_the_buffer = True
if lineno in excluded:
template = TEMPLATE_LOC_EXCLUDED
elif lineno in missing:
template = TEMPLATE_LOC_NOT_COVERED
elif lineno in statements:
template = TEMPLATE_LOC_COVERED
else:
empty_the_buffer = False
buffer.append((lineno, line))
if empty_the_buffer:
for lno, pastline in buffer:
yield template % (lno, cgi.escape(pastline))
buffer = []
yield template % (lineno, cgi.escape(line))
def report(self, name):
filename, statements, excluded, missing, _ = self.coverage.analysis2(name)
pc = _percent(statements, missing)
yield TEMPLATE_COVERAGE % dict(name=os.path.basename(name),
fullpath=name,
pc=pc)
yield '<table>\n'
for line in self.annotated_file(filename, statements, excluded,
missing):
yield line
yield '</table>'
yield '</body>'
yield '</html>'
report.exposed = True
def serve(path=localFile, port=8080, root=None):
if coverage is None:
raise ImportError("The coverage module could not be imported.")
from coverage import coverage
cov = coverage(data_file = path)
cov.load()
import cherrypy
cherrypy.config.update({'server.socket_port': int(port),
'server.thread_pool': 10,
'environment': "production",
})
cherrypy.quickstart(CoverStats(cov, root))
if __name__ == "__main__":
serve(*tuple(sys.argv[1:]))

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,388 @@
import struct
import time
import cherrypy
from cherrypy._cpcompat import basestring, BytesIO, ntob, set, unicodestr
from cherrypy.lib import file_generator
from cherrypy.lib import set_vary_header
def decode(encoding=None, default_encoding='utf-8'):
"""Replace or extend the list of charsets used to decode a request entity.
Either argument may be a single string or a list of strings.
encoding
If not None, restricts the set of charsets attempted while decoding
a request entity to the given set (even if a different charset is given in
the Content-Type request header).
default_encoding
Only in effect if the 'encoding' argument is not given.
If given, the set of charsets attempted while decoding a request entity is
*extended* with the given value(s).
"""
body = cherrypy.request.body
if encoding is not None:
if not isinstance(encoding, list):
encoding = [encoding]
body.attempt_charsets = encoding
elif default_encoding:
if not isinstance(default_encoding, list):
default_encoding = [default_encoding]
body.attempt_charsets = body.attempt_charsets + default_encoding
class ResponseEncoder:
default_encoding = 'utf-8'
failmsg = "Response body could not be encoded with %r."
encoding = None
errors = 'strict'
text_only = True
add_charset = True
debug = False
def __init__(self, **kwargs):
for k, v in kwargs.items():
setattr(self, k, v)
self.attempted_charsets = set()
request = cherrypy.serving.request
if request.handler is not None:
# Replace request.handler with self
if self.debug:
cherrypy.log('Replacing request.handler', 'TOOLS.ENCODE')
self.oldhandler = request.handler
request.handler = self
def encode_stream(self, encoding):
"""Encode a streaming response body.
Use a generator wrapper, and just pray it works as the stream is
being written out.
"""
if encoding in self.attempted_charsets:
return False
self.attempted_charsets.add(encoding)
def encoder(body):
for chunk in body:
if isinstance(chunk, unicodestr):
chunk = chunk.encode(encoding, self.errors)
yield chunk
self.body = encoder(self.body)
return True
def encode_string(self, encoding):
"""Encode a buffered response body."""
if encoding in self.attempted_charsets:
return False
self.attempted_charsets.add(encoding)
try:
body = []
for chunk in self.body:
if isinstance(chunk, unicodestr):
chunk = chunk.encode(encoding, self.errors)
body.append(chunk)
self.body = body
except (LookupError, UnicodeError):
return False
else:
return True
def find_acceptable_charset(self):
request = cherrypy.serving.request
response = cherrypy.serving.response
if self.debug:
cherrypy.log('response.stream %r' % response.stream, 'TOOLS.ENCODE')
if response.stream:
encoder = self.encode_stream
else:
encoder = self.encode_string
if "Content-Length" in response.headers:
# Delete Content-Length header so finalize() recalcs it.
# Encoded strings may be of different lengths from their
# unicode equivalents, and even from each other. For example:
# >>> t = u"\u7007\u3040"
# >>> len(t)
# 2
# >>> len(t.encode("UTF-8"))
# 6
# >>> len(t.encode("utf7"))
# 8
del response.headers["Content-Length"]
# Parse the Accept-Charset request header, and try to provide one
# of the requested charsets (in order of user preference).
encs = request.headers.elements('Accept-Charset')
charsets = [enc.value.lower() for enc in encs]
if self.debug:
cherrypy.log('charsets %s' % repr(charsets), 'TOOLS.ENCODE')
if self.encoding is not None:
# If specified, force this encoding to be used, or fail.
encoding = self.encoding.lower()
if self.debug:
cherrypy.log('Specified encoding %r' % encoding, 'TOOLS.ENCODE')
if (not charsets) or "*" in charsets or encoding in charsets:
if self.debug:
cherrypy.log('Attempting encoding %r' % encoding, 'TOOLS.ENCODE')
if encoder(encoding):
return encoding
else:
if not encs:
if self.debug:
cherrypy.log('Attempting default encoding %r' %
self.default_encoding, 'TOOLS.ENCODE')
# Any character-set is acceptable.
if encoder(self.default_encoding):
return self.default_encoding
else:
raise cherrypy.HTTPError(500, self.failmsg % self.default_encoding)
else:
for element in encs:
if element.qvalue > 0:
if element.value == "*":
# Matches any charset. Try our default.
if self.debug:
cherrypy.log('Attempting default encoding due '
'to %r' % element, 'TOOLS.ENCODE')
if encoder(self.default_encoding):
return self.default_encoding
else:
encoding = element.value
if self.debug:
cherrypy.log('Attempting encoding %s (qvalue >'
'0)' % element, 'TOOLS.ENCODE')
if encoder(encoding):
return encoding
if "*" not in charsets:
# If no "*" is present in an Accept-Charset field, then all
# character sets not explicitly mentioned get a quality
# value of 0, except for ISO-8859-1, which gets a quality
# value of 1 if not explicitly mentioned.
iso = 'iso-8859-1'
if iso not in charsets:
if self.debug:
cherrypy.log('Attempting ISO-8859-1 encoding',
'TOOLS.ENCODE')
if encoder(iso):
return iso
# No suitable encoding found.
ac = request.headers.get('Accept-Charset')
if ac is None:
msg = "Your client did not send an Accept-Charset header."
else:
msg = "Your client sent this Accept-Charset header: %s." % ac
msg += " We tried these charsets: %s." % ", ".join(self.attempted_charsets)
raise cherrypy.HTTPError(406, msg)
def __call__(self, *args, **kwargs):
response = cherrypy.serving.response
self.body = self.oldhandler(*args, **kwargs)
if isinstance(self.body, basestring):
# strings get wrapped in a list because iterating over a single
# item list is much faster than iterating over every character
# in a long string.
if self.body:
self.body = [self.body]
else:
# [''] doesn't evaluate to False, so replace it with [].
self.body = []
elif hasattr(self.body, 'read'):
self.body = file_generator(self.body)
elif self.body is None:
self.body = []
ct = response.headers.elements("Content-Type")
if self.debug:
cherrypy.log('Content-Type: %r' % [str(h) for h in ct], 'TOOLS.ENCODE')
if ct:
ct = ct[0]
if self.text_only:
if ct.value.lower().startswith("text/"):
if self.debug:
cherrypy.log('Content-Type %s starts with "text/"' % ct,
'TOOLS.ENCODE')
do_find = True
else:
if self.debug:
cherrypy.log('Not finding because Content-Type %s does '
'not start with "text/"' % ct,
'TOOLS.ENCODE')
do_find = False
else:
if self.debug:
cherrypy.log('Finding because not text_only', 'TOOLS.ENCODE')
do_find = True
if do_find:
# Set "charset=..." param on response Content-Type header
ct.params['charset'] = self.find_acceptable_charset()
if self.add_charset:
if self.debug:
cherrypy.log('Setting Content-Type %s' % ct,
'TOOLS.ENCODE')
response.headers["Content-Type"] = str(ct)
return self.body
# GZIP
def compress(body, compress_level):
"""Compress 'body' at the given compress_level."""
import zlib
# See http://www.gzip.org/zlib/rfc-gzip.html
yield ntob('\x1f\x8b') # ID1 and ID2: gzip marker
yield ntob('\x08') # CM: compression method
yield ntob('\x00') # FLG: none set
# MTIME: 4 bytes
yield struct.pack("<L", int(time.time()) & int('FFFFFFFF', 16))
yield ntob('\x02') # XFL: max compression, slowest algo
yield ntob('\xff') # OS: unknown
crc = zlib.crc32(ntob(""))
size = 0
zobj = zlib.compressobj(compress_level,
zlib.DEFLATED, -zlib.MAX_WBITS,
zlib.DEF_MEM_LEVEL, 0)
for line in body:
size += len(line)
crc = zlib.crc32(line, crc)
yield zobj.compress(line)
yield zobj.flush()
# CRC32: 4 bytes
yield struct.pack("<L", crc & int('FFFFFFFF', 16))
# ISIZE: 4 bytes
yield struct.pack("<L", size & int('FFFFFFFF', 16))
def decompress(body):
import gzip
zbuf = BytesIO()
zbuf.write(body)
zbuf.seek(0)
zfile = gzip.GzipFile(mode='rb', fileobj=zbuf)
data = zfile.read()
zfile.close()
return data
def gzip(compress_level=5, mime_types=['text/html', 'text/plain'], debug=False):
"""Try to gzip the response body if Content-Type in mime_types.
cherrypy.response.headers['Content-Type'] must be set to one of the
values in the mime_types arg before calling this function.
The provided list of mime-types must be of one of the following form:
* type/subtype
* type/*
* type/*+subtype
No compression is performed if any of the following hold:
* The client sends no Accept-Encoding request header
* No 'gzip' or 'x-gzip' is present in the Accept-Encoding header
* No 'gzip' or 'x-gzip' with a qvalue > 0 is present
* The 'identity' value is given with a qvalue > 0.
"""
request = cherrypy.serving.request
response = cherrypy.serving.response
set_vary_header(response, "Accept-Encoding")
if not response.body:
# Response body is empty (might be a 304 for instance)
if debug:
cherrypy.log('No response body', context='TOOLS.GZIP')
return
# If returning cached content (which should already have been gzipped),
# don't re-zip.
if getattr(request, "cached", False):
if debug:
cherrypy.log('Not gzipping cached response', context='TOOLS.GZIP')
return
acceptable = request.headers.elements('Accept-Encoding')
if not acceptable:
# If no Accept-Encoding field is present in a request,
# the server MAY assume that the client will accept any
# content coding. In this case, if "identity" is one of
# the available content-codings, then the server SHOULD use
# the "identity" content-coding, unless it has additional
# information that a different content-coding is meaningful
# to the client.
if debug:
cherrypy.log('No Accept-Encoding', context='TOOLS.GZIP')
return
ct = response.headers.get('Content-Type', '').split(';')[0]
for coding in acceptable:
if coding.value == 'identity' and coding.qvalue != 0:
if debug:
cherrypy.log('Non-zero identity qvalue: %s' % coding,
context='TOOLS.GZIP')
return
if coding.value in ('gzip', 'x-gzip'):
if coding.qvalue == 0:
if debug:
cherrypy.log('Zero gzip qvalue: %s' % coding,
context='TOOLS.GZIP')
return
if ct not in mime_types:
# If the list of provided mime-types contains tokens
# such as 'text/*' or 'application/*+xml',
# we go through them and find the most appropriate one
# based on the given content-type.
# The pattern matching is only caring about the most
# common cases, as stated above, and doesn't support
# for extra parameters.
found = False
if '/' in ct:
ct_media_type, ct_sub_type = ct.split('/')
for mime_type in mime_types:
if '/' in mime_type:
media_type, sub_type = mime_type.split('/')
if ct_media_type == media_type:
if sub_type == '*':
found = True
break
elif '+' in sub_type and '+' in ct_sub_type:
ct_left, ct_right = ct_sub_type.split('+')
left, right = sub_type.split('+')
if left == '*' and ct_right == right:
found = True
break
if not found:
if debug:
cherrypy.log('Content-Type %s not in mime_types %r' %
(ct, mime_types), context='TOOLS.GZIP')
return
if debug:
cherrypy.log('Gzipping', context='TOOLS.GZIP')
# Return a generator that compresses the page
response.headers['Content-Encoding'] = 'gzip'
response.body = compress(response.body, compress_level)
if "Content-Length" in response.headers:
# Delete Content-Length header so finalize() recalcs it.
del response.headers["Content-Length"]
return
if debug:
cherrypy.log('No acceptable encoding found.', context='GZIP')
cherrypy.HTTPError(406, "identity, gzip").set_response()

View File

@@ -0,0 +1,7 @@
import warnings
warnings.warn('cherrypy.lib.http has been deprecated and will be removed '
'in CherryPy 3.3 use cherrypy.lib.httputil instead.',
DeprecationWarning)
from cherrypy.lib.httputil import *

View File

@@ -0,0 +1,354 @@
"""
This module defines functions to implement HTTP Digest Authentication (:rfc:`2617`).
This has full compliance with 'Digest' and 'Basic' authentication methods. In
'Digest' it supports both MD5 and MD5-sess algorithms.
Usage:
First use 'doAuth' to request the client authentication for a
certain resource. You should send an httplib.UNAUTHORIZED response to the
client so he knows he has to authenticate itself.
Then use 'parseAuthorization' to retrieve the 'auth_map' used in
'checkResponse'.
To use 'checkResponse' you must have already verified the password associated
with the 'username' key in 'auth_map' dict. Then you use the 'checkResponse'
function to verify if the password matches the one sent by the client.
SUPPORTED_ALGORITHM - list of supported 'Digest' algorithms
SUPPORTED_QOP - list of supported 'Digest' 'qop'.
"""
__version__ = 1, 0, 1
__author__ = "Tiago Cogumbreiro <cogumbreiro@users.sf.net>"
__credits__ = """
Peter van Kampen for its recipe which implement most of Digest authentication:
http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/302378
"""
__license__ = """
Copyright (c) 2005, Tiago Cogumbreiro <cogumbreiro@users.sf.net>
All rights reserved.
Redistribution and use in source and binary forms, with or without modification,
are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright notice,
this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
* Neither the name of Sylvain Hellegouarch nor the names of his contributors
may be used to endorse or promote products derived from this software
without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
__all__ = ("digestAuth", "basicAuth", "doAuth", "checkResponse",
"parseAuthorization", "SUPPORTED_ALGORITHM", "md5SessionKey",
"calculateNonce", "SUPPORTED_QOP")
################################################################################
import time
from cherrypy._cpcompat import base64_decode, ntob, md5
from cherrypy._cpcompat import parse_http_list, parse_keqv_list
MD5 = "MD5"
MD5_SESS = "MD5-sess"
AUTH = "auth"
AUTH_INT = "auth-int"
SUPPORTED_ALGORITHM = (MD5, MD5_SESS)
SUPPORTED_QOP = (AUTH, AUTH_INT)
################################################################################
# doAuth
#
DIGEST_AUTH_ENCODERS = {
MD5: lambda val: md5(ntob(val)).hexdigest(),
MD5_SESS: lambda val: md5(ntob(val)).hexdigest(),
# SHA: lambda val: sha.new(ntob(val)).hexdigest (),
}
def calculateNonce (realm, algorithm = MD5):
"""This is an auxaliary function that calculates 'nonce' value. It is used
to handle sessions."""
global SUPPORTED_ALGORITHM, DIGEST_AUTH_ENCODERS
assert algorithm in SUPPORTED_ALGORITHM
try:
encoder = DIGEST_AUTH_ENCODERS[algorithm]
except KeyError:
raise NotImplementedError ("The chosen algorithm (%s) does not have "\
"an implementation yet" % algorithm)
return encoder ("%d:%s" % (time.time(), realm))
def digestAuth (realm, algorithm = MD5, nonce = None, qop = AUTH):
"""Challenges the client for a Digest authentication."""
global SUPPORTED_ALGORITHM, DIGEST_AUTH_ENCODERS, SUPPORTED_QOP
assert algorithm in SUPPORTED_ALGORITHM
assert qop in SUPPORTED_QOP
if nonce is None:
nonce = calculateNonce (realm, algorithm)
return 'Digest realm="%s", nonce="%s", algorithm="%s", qop="%s"' % (
realm, nonce, algorithm, qop
)
def basicAuth (realm):
"""Challengenes the client for a Basic authentication."""
assert '"' not in realm, "Realms cannot contain the \" (quote) character."
return 'Basic realm="%s"' % realm
def doAuth (realm):
"""'doAuth' function returns the challenge string b giving priority over
Digest and fallback to Basic authentication when the browser doesn't
support the first one.
This should be set in the HTTP header under the key 'WWW-Authenticate'."""
return digestAuth (realm) + " " + basicAuth (realm)
################################################################################
# Parse authorization parameters
#
def _parseDigestAuthorization (auth_params):
# Convert the auth params to a dict
items = parse_http_list(auth_params)
params = parse_keqv_list(items)
# Now validate the params
# Check for required parameters
required = ["username", "realm", "nonce", "uri", "response"]
for k in required:
if k not in params:
return None
# If qop is sent then cnonce and nc MUST be present
if "qop" in params and not ("cnonce" in params \
and "nc" in params):
return None
# If qop is not sent, neither cnonce nor nc can be present
if ("cnonce" in params or "nc" in params) and \
"qop" not in params:
return None
return params
def _parseBasicAuthorization (auth_params):
username, password = base64_decode(auth_params).split(":", 1)
return {"username": username, "password": password}
AUTH_SCHEMES = {
"basic": _parseBasicAuthorization,
"digest": _parseDigestAuthorization,
}
def parseAuthorization (credentials):
"""parseAuthorization will convert the value of the 'Authorization' key in
the HTTP header to a map itself. If the parsing fails 'None' is returned.
"""
global AUTH_SCHEMES
auth_scheme, auth_params = credentials.split(" ", 1)
auth_scheme = auth_scheme.lower ()
parser = AUTH_SCHEMES[auth_scheme]
params = parser (auth_params)
if params is None:
return
assert "auth_scheme" not in params
params["auth_scheme"] = auth_scheme
return params
################################################################################
# Check provided response for a valid password
#
def md5SessionKey (params, password):
"""
If the "algorithm" directive's value is "MD5-sess", then A1
[the session key] is calculated only once - on the first request by the
client following receipt of a WWW-Authenticate challenge from the server.
This creates a 'session key' for the authentication of subsequent
requests and responses which is different for each "authentication
session", thus limiting the amount of material hashed with any one
key.
Because the server need only use the hash of the user
credentials in order to create the A1 value, this construction could
be used in conjunction with a third party authentication service so
that the web server would not need the actual password value. The
specification of such a protocol is beyond the scope of this
specification.
"""
keys = ("username", "realm", "nonce", "cnonce")
params_copy = {}
for key in keys:
params_copy[key] = params[key]
params_copy["algorithm"] = MD5_SESS
return _A1 (params_copy, password)
def _A1(params, password):
algorithm = params.get ("algorithm", MD5)
H = DIGEST_AUTH_ENCODERS[algorithm]
if algorithm == MD5:
# If the "algorithm" directive's value is "MD5" or is
# unspecified, then A1 is:
# A1 = unq(username-value) ":" unq(realm-value) ":" passwd
return "%s:%s:%s" % (params["username"], params["realm"], password)
elif algorithm == MD5_SESS:
# This is A1 if qop is set
# A1 = H( unq(username-value) ":" unq(realm-value) ":" passwd )
# ":" unq(nonce-value) ":" unq(cnonce-value)
h_a1 = H ("%s:%s:%s" % (params["username"], params["realm"], password))
return "%s:%s:%s" % (h_a1, params["nonce"], params["cnonce"])
def _A2(params, method, kwargs):
# If the "qop" directive's value is "auth" or is unspecified, then A2 is:
# A2 = Method ":" digest-uri-value
qop = params.get ("qop", "auth")
if qop == "auth":
return method + ":" + params["uri"]
elif qop == "auth-int":
# If the "qop" value is "auth-int", then A2 is:
# A2 = Method ":" digest-uri-value ":" H(entity-body)
entity_body = kwargs.get ("entity_body", "")
H = kwargs["H"]
return "%s:%s:%s" % (
method,
params["uri"],
H(entity_body)
)
else:
raise NotImplementedError ("The 'qop' method is unknown: %s" % qop)
def _computeDigestResponse(auth_map, password, method = "GET", A1 = None,**kwargs):
"""
Generates a response respecting the algorithm defined in RFC 2617
"""
params = auth_map
algorithm = params.get ("algorithm", MD5)
H = DIGEST_AUTH_ENCODERS[algorithm]
KD = lambda secret, data: H(secret + ":" + data)
qop = params.get ("qop", None)
H_A2 = H(_A2(params, method, kwargs))
if algorithm == MD5_SESS and A1 is not None:
H_A1 = H(A1)
else:
H_A1 = H(_A1(params, password))
if qop in ("auth", "auth-int"):
# If the "qop" value is "auth" or "auth-int":
# request-digest = <"> < KD ( H(A1), unq(nonce-value)
# ":" nc-value
# ":" unq(cnonce-value)
# ":" unq(qop-value)
# ":" H(A2)
# ) <">
request = "%s:%s:%s:%s:%s" % (
params["nonce"],
params["nc"],
params["cnonce"],
params["qop"],
H_A2,
)
elif qop is None:
# If the "qop" directive is not present (this construction is
# for compatibility with RFC 2069):
# request-digest =
# <"> < KD ( H(A1), unq(nonce-value) ":" H(A2) ) > <">
request = "%s:%s" % (params["nonce"], H_A2)
return KD(H_A1, request)
def _checkDigestResponse(auth_map, password, method = "GET", A1 = None, **kwargs):
"""This function is used to verify the response given by the client when
he tries to authenticate.
Optional arguments:
entity_body - when 'qop' is set to 'auth-int' you MUST provide the
raw data you are going to send to the client (usually the
HTML page.
request_uri - the uri from the request line compared with the 'uri'
directive of the authorization map. They must represent
the same resource (unused at this time).
"""
if auth_map['realm'] != kwargs.get('realm', None):
return False
response = _computeDigestResponse(auth_map, password, method, A1,**kwargs)
return response == auth_map["response"]
def _checkBasicResponse (auth_map, password, method='GET', encrypt=None, **kwargs):
# Note that the Basic response doesn't provide the realm value so we cannot
# test it
try:
return encrypt(auth_map["password"], auth_map["username"]) == password
except TypeError:
return encrypt(auth_map["password"]) == password
AUTH_RESPONSES = {
"basic": _checkBasicResponse,
"digest": _checkDigestResponse,
}
def checkResponse (auth_map, password, method = "GET", encrypt=None, **kwargs):
"""'checkResponse' compares the auth_map with the password and optionally
other arguments that each implementation might need.
If the response is of type 'Basic' then the function has the following
signature::
checkBasicResponse (auth_map, password) -> bool
If the response is of type 'Digest' then the function has the following
signature::
checkDigestResponse (auth_map, password, method = 'GET', A1 = None) -> bool
The 'A1' argument is only used in MD5_SESS algorithm based responses.
Check md5SessionKey() for more info.
"""
checker = AUTH_RESPONSES[auth_map["auth_scheme"]]
return checker (auth_map, password, method=method, encrypt=encrypt, **kwargs)

View File

@@ -0,0 +1,469 @@
"""HTTP library functions.
This module contains functions for building an HTTP application
framework: any one, not just one whose name starts with "Ch". ;) If you
reference any modules from some popular framework inside *this* module,
FuManChu will personally hang you up by your thumbs and submit you
to a public caning.
"""
from binascii import b2a_base64
from cherrypy._cpcompat import BaseHTTPRequestHandler, HTTPDate, ntob, ntou, reversed, sorted
from cherrypy._cpcompat import basestring, iteritems, unicodestr, unquote_qs
response_codes = BaseHTTPRequestHandler.responses.copy()
# From http://www.cherrypy.org/ticket/361
response_codes[500] = ('Internal Server Error',
'The server encountered an unexpected condition '
'which prevented it from fulfilling the request.')
response_codes[503] = ('Service Unavailable',
'The server is currently unable to handle the '
'request due to a temporary overloading or '
'maintenance of the server.')
import re
import urllib
def urljoin(*atoms):
"""Return the given path \*atoms, joined into a single URL.
This will correctly join a SCRIPT_NAME and PATH_INFO into the
original URL, even if either atom is blank.
"""
url = "/".join([x for x in atoms if x])
while "//" in url:
url = url.replace("//", "/")
# Special-case the final url of "", and return "/" instead.
return url or "/"
def protocol_from_http(protocol_str):
"""Return a protocol tuple from the given 'HTTP/x.y' string."""
return int(protocol_str[5]), int(protocol_str[7])
def get_ranges(headervalue, content_length):
"""Return a list of (start, stop) indices from a Range header, or None.
Each (start, stop) tuple will be composed of two ints, which are suitable
for use in a slicing operation. That is, the header "Range: bytes=3-6",
if applied against a Python string, is requesting resource[3:7]. This
function will return the list [(3, 7)].
If this function returns an empty list, you should return HTTP 416.
"""
if not headervalue:
return None
result = []
bytesunit, byteranges = headervalue.split("=", 1)
for brange in byteranges.split(","):
start, stop = [x.strip() for x in brange.split("-", 1)]
if start:
if not stop:
stop = content_length - 1
start, stop = int(start), int(stop)
if start >= content_length:
# From rfc 2616 sec 14.16:
# "If the server receives a request (other than one
# including an If-Range request-header field) with an
# unsatisfiable Range request-header field (that is,
# all of whose byte-range-spec values have a first-byte-pos
# value greater than the current length of the selected
# resource), it SHOULD return a response code of 416
# (Requested range not satisfiable)."
continue
if stop < start:
# From rfc 2616 sec 14.16:
# "If the server ignores a byte-range-spec because it
# is syntactically invalid, the server SHOULD treat
# the request as if the invalid Range header field
# did not exist. (Normally, this means return a 200
# response containing the full entity)."
return None
result.append((start, stop + 1))
else:
if not stop:
# See rfc quote above.
return None
# Negative subscript (last N bytes)
result.append((content_length - int(stop), content_length))
return result
class HeaderElement(object):
"""An element (with parameters) from an HTTP header's element list."""
def __init__(self, value, params=None):
self.value = value
if params is None:
params = {}
self.params = params
def __cmp__(self, other):
return cmp(self.value, other.value)
def __str__(self):
p = [";%s=%s" % (k, v) for k, v in iteritems(self.params)]
return "%s%s" % (self.value, "".join(p))
def __unicode__(self):
return ntou(self.__str__())
def parse(elementstr):
"""Transform 'token;key=val' to ('token', {'key': 'val'})."""
# Split the element into a value and parameters. The 'value' may
# be of the form, "token=token", but we don't split that here.
atoms = [x.strip() for x in elementstr.split(";") if x.strip()]
if not atoms:
initial_value = ''
else:
initial_value = atoms.pop(0).strip()
params = {}
for atom in atoms:
atom = [x.strip() for x in atom.split("=", 1) if x.strip()]
key = atom.pop(0)
if atom:
val = atom[0]
else:
val = ""
params[key] = val
return initial_value, params
parse = staticmethod(parse)
def from_str(cls, elementstr):
"""Construct an instance from a string of the form 'token;key=val'."""
ival, params = cls.parse(elementstr)
return cls(ival, params)
from_str = classmethod(from_str)
q_separator = re.compile(r'; *q *=')
class AcceptElement(HeaderElement):
"""An element (with parameters) from an Accept* header's element list.
AcceptElement objects are comparable; the more-preferred object will be
"less than" the less-preferred object. They are also therefore sortable;
if you sort a list of AcceptElement objects, they will be listed in
priority order; the most preferred value will be first. Yes, it should
have been the other way around, but it's too late to fix now.
"""
def from_str(cls, elementstr):
qvalue = None
# The first "q" parameter (if any) separates the initial
# media-range parameter(s) (if any) from the accept-params.
atoms = q_separator.split(elementstr, 1)
media_range = atoms.pop(0).strip()
if atoms:
# The qvalue for an Accept header can have extensions. The other
# headers cannot, but it's easier to parse them as if they did.
qvalue = HeaderElement.from_str(atoms[0].strip())
media_type, params = cls.parse(media_range)
if qvalue is not None:
params["q"] = qvalue
return cls(media_type, params)
from_str = classmethod(from_str)
def qvalue(self):
val = self.params.get("q", "1")
if isinstance(val, HeaderElement):
val = val.value
return float(val)
qvalue = property(qvalue, doc="The qvalue, or priority, of this value.")
def __cmp__(self, other):
diff = cmp(self.qvalue, other.qvalue)
if diff == 0:
diff = cmp(str(self), str(other))
return diff
def header_elements(fieldname, fieldvalue):
"""Return a sorted HeaderElement list from a comma-separated header string."""
if not fieldvalue:
return []
result = []
for element in fieldvalue.split(","):
if fieldname.startswith("Accept") or fieldname == 'TE':
hv = AcceptElement.from_str(element)
else:
hv = HeaderElement.from_str(element)
result.append(hv)
return list(reversed(sorted(result)))
def decode_TEXT(value):
r"""Decode :rfc:`2047` TEXT (e.g. "=?utf-8?q?f=C3=BCr?=" -> u"f\xfcr")."""
from email.Header import decode_header
atoms = decode_header(value)
decodedvalue = ""
for atom, charset in atoms:
if charset is not None:
atom = atom.decode(charset)
decodedvalue += atom
return decodedvalue
def valid_status(status):
"""Return legal HTTP status Code, Reason-phrase and Message.
The status arg must be an int, or a str that begins with an int.
If status is an int, or a str and no reason-phrase is supplied,
a default reason-phrase will be provided.
"""
if not status:
status = 200
status = str(status)
parts = status.split(" ", 1)
if len(parts) == 1:
# No reason supplied.
code, = parts
reason = None
else:
code, reason = parts
reason = reason.strip()
try:
code = int(code)
except ValueError:
raise ValueError("Illegal response status from server "
"(%s is non-numeric)." % repr(code))
if code < 100 or code > 599:
raise ValueError("Illegal response status from server "
"(%s is out of range)." % repr(code))
if code not in response_codes:
# code is unknown but not illegal
default_reason, message = "", ""
else:
default_reason, message = response_codes[code]
if reason is None:
reason = default_reason
return code, reason, message
def _parse_qs(qs, keep_blank_values=0, strict_parsing=0, encoding='utf-8'):
"""Parse a query given as a string argument.
Arguments:
qs: URL-encoded query string to be parsed
keep_blank_values: flag indicating whether blank values in
URL encoded queries should be treated as blank strings. A
true value indicates that blanks should be retained as blank
strings. The default false value indicates that blank values
are to be ignored and treated as if they were not included.
strict_parsing: flag indicating what to do with parsing errors. If
false (the default), errors are silently ignored. If true,
errors raise a ValueError exception.
Returns a dict, as G-d intended.
"""
pairs = [s2 for s1 in qs.split('&') for s2 in s1.split(';')]
d = {}
for name_value in pairs:
if not name_value and not strict_parsing:
continue
nv = name_value.split('=', 1)
if len(nv) != 2:
if strict_parsing:
raise ValueError("bad query field: %r" % (name_value,))
# Handle case of a control-name with no equal sign
if keep_blank_values:
nv.append('')
else:
continue
if len(nv[1]) or keep_blank_values:
name = unquote_qs(nv[0], encoding)
value = unquote_qs(nv[1], encoding)
if name in d:
if not isinstance(d[name], list):
d[name] = [d[name]]
d[name].append(value)
else:
d[name] = value
return d
image_map_pattern = re.compile(r"[0-9]+,[0-9]+")
def parse_query_string(query_string, keep_blank_values=True, encoding='utf-8'):
"""Build a params dictionary from a query_string.
Duplicate key/value pairs in the provided query_string will be
returned as {'key': [val1, val2, ...]}. Single key/values will
be returned as strings: {'key': 'value'}.
"""
if image_map_pattern.match(query_string):
# Server-side image map. Map the coords to 'x' and 'y'
# (like CGI::Request does).
pm = query_string.split(",")
pm = {'x': int(pm[0]), 'y': int(pm[1])}
else:
pm = _parse_qs(query_string, keep_blank_values, encoding=encoding)
return pm
class CaseInsensitiveDict(dict):
"""A case-insensitive dict subclass.
Each key is changed on entry to str(key).title().
"""
def __getitem__(self, key):
return dict.__getitem__(self, str(key).title())
def __setitem__(self, key, value):
dict.__setitem__(self, str(key).title(), value)
def __delitem__(self, key):
dict.__delitem__(self, str(key).title())
def __contains__(self, key):
return dict.__contains__(self, str(key).title())
def get(self, key, default=None):
return dict.get(self, str(key).title(), default)
def has_key(self, key):
return dict.has_key(self, str(key).title())
def update(self, E):
for k in E.keys():
self[str(k).title()] = E[k]
def fromkeys(cls, seq, value=None):
newdict = cls()
for k in seq:
newdict[str(k).title()] = value
return newdict
fromkeys = classmethod(fromkeys)
def setdefault(self, key, x=None):
key = str(key).title()
try:
return self[key]
except KeyError:
self[key] = x
return x
def pop(self, key, default):
return dict.pop(self, str(key).title(), default)
# TEXT = <any OCTET except CTLs, but including LWS>
#
# A CRLF is allowed in the definition of TEXT only as part of a header
# field continuation. It is expected that the folding LWS will be
# replaced with a single SP before interpretation of the TEXT value."
header_translate_table = ''.join([chr(i) for i in xrange(256)])
header_translate_deletechars = ''.join([chr(i) for i in xrange(32)]) + chr(127)
class HeaderMap(CaseInsensitiveDict):
"""A dict subclass for HTTP request and response headers.
Each key is changed on entry to str(key).title(). This allows headers
to be case-insensitive and avoid duplicates.
Values are header values (decoded according to :rfc:`2047` if necessary).
"""
protocol=(1, 1)
encodings = ["ISO-8859-1"]
# Someday, when http-bis is done, this will probably get dropped
# since few servers, clients, or intermediaries do it. But until then,
# we're going to obey the spec as is.
# "Words of *TEXT MAY contain characters from character sets other than
# ISO-8859-1 only when encoded according to the rules of RFC 2047."
use_rfc_2047 = True
def elements(self, key):
"""Return a sorted list of HeaderElements for the given header."""
key = str(key).title()
value = self.get(key)
return header_elements(key, value)
def values(self, key):
"""Return a sorted list of HeaderElement.value for the given header."""
return [e.value for e in self.elements(key)]
def output(self):
"""Transform self into a list of (name, value) tuples."""
header_list = []
for k, v in self.items():
if isinstance(k, unicodestr):
k = self.encode(k)
if not isinstance(v, basestring):
v = str(v)
if isinstance(v, unicodestr):
v = self.encode(v)
# See header_translate_* constants above.
# Replace only if you really know what you're doing.
k = k.translate(header_translate_table, header_translate_deletechars)
v = v.translate(header_translate_table, header_translate_deletechars)
header_list.append((k, v))
return header_list
def encode(self, v):
"""Return the given header name or value, encoded for HTTP output."""
for enc in self.encodings:
try:
return v.encode(enc)
except UnicodeEncodeError:
continue
if self.protocol == (1, 1) and self.use_rfc_2047:
# Encode RFC-2047 TEXT
# (e.g. u"\u8200" -> "=?utf-8?b?6IiA?=").
# We do our own here instead of using the email module
# because we never want to fold lines--folding has
# been deprecated by the HTTP working group.
v = b2a_base64(v.encode('utf-8'))
return (ntob('=?utf-8?b?') + v.strip(ntob('\n')) + ntob('?='))
raise ValueError("Could not encode header part %r using "
"any of the encodings %r." %
(v, self.encodings))
class Host(object):
"""An internet address.
name
Should be the client's host name. If not available (because no DNS
lookup is performed), the IP address should be used instead.
"""
ip = "0.0.0.0"
port = 80
name = "unknown.tld"
def __init__(self, ip, port, name=None):
self.ip = ip
self.port = port
if name is None:
name = ip
self.name = name
def __repr__(self):
return "httputil.Host(%r, %r, %r)" % (self.ip, self.port, self.name)

View File

@@ -0,0 +1,87 @@
import sys
import cherrypy
from cherrypy._cpcompat import basestring, ntou, json, json_encode, json_decode
def json_processor(entity):
"""Read application/json data into request.json."""
if not entity.headers.get(ntou("Content-Length"), ntou("")):
raise cherrypy.HTTPError(411)
body = entity.fp.read()
try:
cherrypy.serving.request.json = json_decode(body.decode('utf-8'))
except ValueError:
raise cherrypy.HTTPError(400, 'Invalid JSON document')
def json_in(content_type=[ntou('application/json'), ntou('text/javascript')],
force=True, debug=False, processor = json_processor):
"""Add a processor to parse JSON request entities:
The default processor places the parsed data into request.json.
Incoming request entities which match the given content_type(s) will
be deserialized from JSON to the Python equivalent, and the result
stored at cherrypy.request.json. The 'content_type' argument may
be a Content-Type string or a list of allowable Content-Type strings.
If the 'force' argument is True (the default), then entities of other
content types will not be allowed; "415 Unsupported Media Type" is
raised instead.
Supply your own processor to use a custom decoder, or to handle the parsed
data differently. The processor can be configured via
tools.json_in.processor or via the decorator method.
Note that the deserializer requires the client send a Content-Length
request header, or it will raise "411 Length Required". If for any
other reason the request entity cannot be deserialized from JSON,
it will raise "400 Bad Request: Invalid JSON document".
You must be using Python 2.6 or greater, or have the 'simplejson'
package importable; otherwise, ValueError is raised during processing.
"""
request = cherrypy.serving.request
if isinstance(content_type, basestring):
content_type = [content_type]
if force:
if debug:
cherrypy.log('Removing body processors %s' %
repr(request.body.processors.keys()), 'TOOLS.JSON_IN')
request.body.processors.clear()
request.body.default_proc = cherrypy.HTTPError(
415, 'Expected an entity of content type %s' %
', '.join(content_type))
for ct in content_type:
if debug:
cherrypy.log('Adding body processor for %s' % ct, 'TOOLS.JSON_IN')
request.body.processors[ct] = processor
def json_handler(*args, **kwargs):
value = cherrypy.serving.request._json_inner_handler(*args, **kwargs)
return json_encode(value)
def json_out(content_type='application/json', debug=False, handler=json_handler):
"""Wrap request.handler to serialize its output to JSON. Sets Content-Type.
If the given content_type is None, the Content-Type response header
is not set.
Provide your own handler to use a custom encoder. For example
cherrypy.config['tools.json_out.handler'] = <function>, or
@json_out(handler=function).
You must be using Python 2.6 or greater, or have the 'simplejson'
package importable; otherwise, ValueError is raised during processing.
"""
request = cherrypy.serving.request
if debug:
cherrypy.log('Replacing %s with JSON handler' % request.handler,
'TOOLS.JSON_OUT')
request._json_inner_handler = request.handler
request.handler = handler
if content_type is not None:
if debug:
cherrypy.log('Setting Content-Type to %s' % ct, 'TOOLS.JSON_OUT')
cherrypy.serving.response.headers['Content-Type'] = content_type

View File

@@ -0,0 +1,208 @@
"""Profiler tools for CherryPy.
CherryPy users
==============
You can profile any of your pages as follows::
from cherrypy.lib import profiler
class Root:
p = profile.Profiler("/path/to/profile/dir")
def index(self):
self.p.run(self._index)
index.exposed = True
def _index(self):
return "Hello, world!"
cherrypy.tree.mount(Root())
You can also turn on profiling for all requests
using the ``make_app`` function as WSGI middleware.
CherryPy developers
===================
This module can be used whenever you make changes to CherryPy,
to get a quick sanity-check on overall CP performance. Use the
``--profile`` flag when running the test suite. Then, use the ``serve()``
function to browse the results in a web browser. If you run this
module from the command line, it will call ``serve()`` for you.
"""
def new_func_strip_path(func_name):
"""Make profiler output more readable by adding ``__init__`` modules' parents"""
filename, line, name = func_name
if filename.endswith("__init__.py"):
return os.path.basename(filename[:-12]) + filename[-12:], line, name
return os.path.basename(filename), line, name
try:
import profile
import pstats
pstats.func_strip_path = new_func_strip_path
except ImportError:
profile = None
pstats = None
import os, os.path
import sys
import warnings
from cherrypy._cpcompat import BytesIO
_count = 0
class Profiler(object):
def __init__(self, path=None):
if not path:
path = os.path.join(os.path.dirname(__file__), "profile")
self.path = path
if not os.path.exists(path):
os.makedirs(path)
def run(self, func, *args, **params):
"""Dump profile data into self.path."""
global _count
c = _count = _count + 1
path = os.path.join(self.path, "cp_%04d.prof" % c)
prof = profile.Profile()
result = prof.runcall(func, *args, **params)
prof.dump_stats(path)
return result
def statfiles(self):
""":rtype: list of available profiles.
"""
return [f for f in os.listdir(self.path)
if f.startswith("cp_") and f.endswith(".prof")]
def stats(self, filename, sortby='cumulative'):
""":rtype stats(index): output of print_stats() for the given profile.
"""
sio = BytesIO()
if sys.version_info >= (2, 5):
s = pstats.Stats(os.path.join(self.path, filename), stream=sio)
s.strip_dirs()
s.sort_stats(sortby)
s.print_stats()
else:
# pstats.Stats before Python 2.5 didn't take a 'stream' arg,
# but just printed to stdout. So re-route stdout.
s = pstats.Stats(os.path.join(self.path, filename))
s.strip_dirs()
s.sort_stats(sortby)
oldout = sys.stdout
try:
sys.stdout = sio
s.print_stats()
finally:
sys.stdout = oldout
response = sio.getvalue()
sio.close()
return response
def index(self):
return """<html>
<head><title>CherryPy profile data</title></head>
<frameset cols='200, 1*'>
<frame src='menu' />
<frame name='main' src='' />
</frameset>
</html>
"""
index.exposed = True
def menu(self):
yield "<h2>Profiling runs</h2>"
yield "<p>Click on one of the runs below to see profiling data.</p>"
runs = self.statfiles()
runs.sort()
for i in runs:
yield "<a href='report?filename=%s' target='main'>%s</a><br />" % (i, i)
menu.exposed = True
def report(self, filename):
import cherrypy
cherrypy.response.headers['Content-Type'] = 'text/plain'
return self.stats(filename)
report.exposed = True
class ProfileAggregator(Profiler):
def __init__(self, path=None):
Profiler.__init__(self, path)
global _count
self.count = _count = _count + 1
self.profiler = profile.Profile()
def run(self, func, *args):
path = os.path.join(self.path, "cp_%04d.prof" % self.count)
result = self.profiler.runcall(func, *args)
self.profiler.dump_stats(path)
return result
class make_app:
def __init__(self, nextapp, path=None, aggregate=False):
"""Make a WSGI middleware app which wraps 'nextapp' with profiling.
nextapp
the WSGI application to wrap, usually an instance of
cherrypy.Application.
path
where to dump the profiling output.
aggregate
if True, profile data for all HTTP requests will go in
a single file. If False (the default), each HTTP request will
dump its profile data into a separate file.
"""
if profile is None or pstats is None:
msg = ("Your installation of Python does not have a profile module. "
"If you're on Debian, try `sudo apt-get install python-profiler`. "
"See http://www.cherrypy.org/wiki/ProfilingOnDebian for details.")
warnings.warn(msg)
self.nextapp = nextapp
self.aggregate = aggregate
if aggregate:
self.profiler = ProfileAggregator(path)
else:
self.profiler = Profiler(path)
def __call__(self, environ, start_response):
def gather():
result = []
for line in self.nextapp(environ, start_response):
result.append(line)
return result
return self.profiler.run(gather)
def serve(path=None, port=8080):
if profile is None or pstats is None:
msg = ("Your installation of Python does not have a profile module. "
"If you're on Debian, try `sudo apt-get install python-profiler`. "
"See http://www.cherrypy.org/wiki/ProfilingOnDebian for details.")
warnings.warn(msg)
import cherrypy
cherrypy.config.update({'server.socket_port': int(port),
'server.thread_pool': 10,
'environment': "production",
})
cherrypy.quickstart(Profiler(path))
if __name__ == "__main__":
serve(*tuple(sys.argv[1:]))

View File

@@ -0,0 +1,351 @@
"""Generic configuration system using unrepr.
Configuration data may be supplied as a Python dictionary, as a filename,
or as an open file object. When you supply a filename or file, Python's
builtin ConfigParser is used (with some extensions).
Namespaces
----------
Configuration keys are separated into namespaces by the first "." in the key.
The only key that cannot exist in a namespace is the "environment" entry.
This special entry 'imports' other config entries from a template stored in
the Config.environments dict.
You can define your own namespaces to be called when new config is merged
by adding a named handler to Config.namespaces. The name can be any string,
and the handler must be either a callable or a context manager.
"""
try:
# Python 3.0+
from configparser import ConfigParser
except ImportError:
from ConfigParser import ConfigParser
try:
set
except NameError:
from sets import Set as set
import sys
def as_dict(config):
"""Return a dict from 'config' whether it is a dict, file, or filename."""
if isinstance(config, basestring):
config = Parser().dict_from_file(config)
elif hasattr(config, 'read'):
config = Parser().dict_from_file(config)
return config
class NamespaceSet(dict):
"""A dict of config namespace names and handlers.
Each config entry should begin with a namespace name; the corresponding
namespace handler will be called once for each config entry in that
namespace, and will be passed two arguments: the config key (with the
namespace removed) and the config value.
Namespace handlers may be any Python callable; they may also be
Python 2.5-style 'context managers', in which case their __enter__
method should return a callable to be used as the handler.
See cherrypy.tools (the Toolbox class) for an example.
"""
def __call__(self, config):
"""Iterate through config and pass it to each namespace handler.
config
A flat dict, where keys use dots to separate
namespaces, and values are arbitrary.
The first name in each config key is used to look up the corresponding
namespace handler. For example, a config entry of {'tools.gzip.on': v}
will call the 'tools' namespace handler with the args: ('gzip.on', v)
"""
# Separate the given config into namespaces
ns_confs = {}
for k in config:
if "." in k:
ns, name = k.split(".", 1)
bucket = ns_confs.setdefault(ns, {})
bucket[name] = config[k]
# I chose __enter__ and __exit__ so someday this could be
# rewritten using Python 2.5's 'with' statement:
# for ns, handler in self.iteritems():
# with handler as callable:
# for k, v in ns_confs.get(ns, {}).iteritems():
# callable(k, v)
for ns, handler in self.items():
exit = getattr(handler, "__exit__", None)
if exit:
callable = handler.__enter__()
no_exc = True
try:
try:
for k, v in ns_confs.get(ns, {}).items():
callable(k, v)
except:
# The exceptional case is handled here
no_exc = False
if exit is None:
raise
if not exit(*sys.exc_info()):
raise
# The exception is swallowed if exit() returns true
finally:
# The normal and non-local-goto cases are handled here
if no_exc and exit:
exit(None, None, None)
else:
for k, v in ns_confs.get(ns, {}).items():
handler(k, v)
def __repr__(self):
return "%s.%s(%s)" % (self.__module__, self.__class__.__name__,
dict.__repr__(self))
def __copy__(self):
newobj = self.__class__()
newobj.update(self)
return newobj
copy = __copy__
class Config(dict):
"""A dict-like set of configuration data, with defaults and namespaces.
May take a file, filename, or dict.
"""
defaults = {}
environments = {}
namespaces = NamespaceSet()
def __init__(self, file=None, **kwargs):
self.reset()
if file is not None:
self.update(file)
if kwargs:
self.update(kwargs)
def reset(self):
"""Reset self to default values."""
self.clear()
dict.update(self, self.defaults)
def update(self, config):
"""Update self from a dict, file or filename."""
if isinstance(config, basestring):
# Filename
config = Parser().dict_from_file(config)
elif hasattr(config, 'read'):
# Open file object
config = Parser().dict_from_file(config)
else:
config = config.copy()
self._apply(config)
def _apply(self, config):
"""Update self from a dict."""
which_env = config.get('environment')
if which_env:
env = self.environments[which_env]
for k in env:
if k not in config:
config[k] = env[k]
dict.update(self, config)
self.namespaces(config)
def __setitem__(self, k, v):
dict.__setitem__(self, k, v)
self.namespaces({k: v})
class Parser(ConfigParser):
"""Sub-class of ConfigParser that keeps the case of options and that
raises an exception if the file cannot be read.
"""
def optionxform(self, optionstr):
return optionstr
def read(self, filenames):
if isinstance(filenames, basestring):
filenames = [filenames]
for filename in filenames:
# try:
# fp = open(filename)
# except IOError:
# continue
fp = open(filename)
try:
self._read(fp, filename)
finally:
fp.close()
def as_dict(self, raw=False, vars=None):
"""Convert an INI file to a dictionary"""
# Load INI file into a dict
result = {}
for section in self.sections():
if section not in result:
result[section] = {}
for option in self.options(section):
value = self.get(section, option, raw, vars)
try:
value = unrepr(value)
except Exception, x:
msg = ("Config error in section: %r, option: %r, "
"value: %r. Config values must be valid Python." %
(section, option, value))
raise ValueError(msg, x.__class__.__name__, x.args)
result[section][option] = value
return result
def dict_from_file(self, file):
if hasattr(file, 'read'):
self.readfp(file)
else:
self.read(file)
return self.as_dict()
# public domain "unrepr" implementation, found on the web and then improved.
class _Builder:
def build(self, o):
m = getattr(self, 'build_' + o.__class__.__name__, None)
if m is None:
raise TypeError("unrepr does not recognize %s" %
repr(o.__class__.__name__))
return m(o)
def build_Subscript(self, o):
expr, flags, subs = o.getChildren()
expr = self.build(expr)
subs = self.build(subs)
return expr[subs]
def build_CallFunc(self, o):
children = map(self.build, o.getChildren())
callee = children.pop(0)
kwargs = children.pop() or {}
starargs = children.pop() or ()
args = tuple(children) + tuple(starargs)
return callee(*args, **kwargs)
def build_List(self, o):
return map(self.build, o.getChildren())
def build_Const(self, o):
return o.value
def build_Dict(self, o):
d = {}
i = iter(map(self.build, o.getChildren()))
for el in i:
d[el] = i.next()
return d
def build_Tuple(self, o):
return tuple(self.build_List(o))
def build_Name(self, o):
name = o.name
if name == 'None':
return None
if name == 'True':
return True
if name == 'False':
return False
# See if the Name is a package or module. If it is, import it.
try:
return modules(name)
except ImportError:
pass
# See if the Name is in builtins.
try:
import __builtin__
return getattr(__builtin__, name)
except AttributeError:
pass
raise TypeError("unrepr could not resolve the name %s" % repr(name))
def build_Add(self, o):
left, right = map(self.build, o.getChildren())
return left + right
def build_Getattr(self, o):
parent = self.build(o.expr)
return getattr(parent, o.attrname)
def build_NoneType(self, o):
return None
def build_UnarySub(self, o):
return -self.build(o.getChildren()[0])
def build_UnaryAdd(self, o):
return self.build(o.getChildren()[0])
def _astnode(s):
"""Return a Python ast Node compiled from a string."""
try:
import compiler
except ImportError:
# Fallback to eval when compiler package is not available,
# e.g. IronPython 1.0.
return eval(s)
p = compiler.parse("__tempvalue__ = " + s)
return p.getChildren()[1].getChildren()[0].getChildren()[1]
def unrepr(s):
"""Return a Python object compiled from a string."""
if not s:
return s
obj = _astnode(s)
return _Builder().build(obj)
def modules(modulePath):
"""Load a module and retrieve a reference to that module."""
try:
mod = sys.modules[modulePath]
if mod is None:
raise KeyError()
except KeyError:
# The last [''] is important.
mod = __import__(modulePath, globals(), locals(), [''])
return mod
def attributes(full_attribute_name):
"""Load a module and retrieve an attribute of that module."""
# Parse out the path, module, and attribute
last_dot = full_attribute_name.rfind(".")
attr_name = full_attribute_name[last_dot + 1:]
mod_path = full_attribute_name[:last_dot]
mod = modules(mod_path)
# Let an AttributeError propagate outward.
try:
attr = getattr(mod, attr_name)
except AttributeError:
raise AttributeError("'%s' object has no attribute '%s'"
% (mod_path, attr_name))
# Return a reference to the attribute.
return attr

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,352 @@
import logging
import mimetypes
mimetypes.init()
mimetypes.types_map['.dwg']='image/x-dwg'
mimetypes.types_map['.ico']='image/x-icon'
mimetypes.types_map['.bz2']='application/x-bzip2'
mimetypes.types_map['.gz']='application/x-gzip'
import os
import re
import stat
import time
import cherrypy
from cherrypy._cpcompat import ntob, unquote
from cherrypy.lib import cptools, httputil, file_generator_limited
def serve_file(path, content_type=None, disposition=None, name=None, debug=False):
"""Set status, headers, and body in order to serve the given path.
The Content-Type header will be set to the content_type arg, if provided.
If not provided, the Content-Type will be guessed by the file extension
of the 'path' argument.
If disposition is not None, the Content-Disposition header will be set
to "<disposition>; filename=<name>". If name is None, it will be set
to the basename of path. If disposition is None, no Content-Disposition
header will be written.
"""
response = cherrypy.serving.response
# If path is relative, users should fix it by making path absolute.
# That is, CherryPy should not guess where the application root is.
# It certainly should *not* use cwd (since CP may be invoked from a
# variety of paths). If using tools.staticdir, you can make your relative
# paths become absolute by supplying a value for "tools.staticdir.root".
if not os.path.isabs(path):
msg = "'%s' is not an absolute path." % path
if debug:
cherrypy.log(msg, 'TOOLS.STATICFILE')
raise ValueError(msg)
try:
st = os.stat(path)
except OSError:
if debug:
cherrypy.log('os.stat(%r) failed' % path, 'TOOLS.STATIC')
raise cherrypy.NotFound()
# Check if path is a directory.
if stat.S_ISDIR(st.st_mode):
# Let the caller deal with it as they like.
if debug:
cherrypy.log('%r is a directory' % path, 'TOOLS.STATIC')
raise cherrypy.NotFound()
# Set the Last-Modified response header, so that
# modified-since validation code can work.
response.headers['Last-Modified'] = httputil.HTTPDate(st.st_mtime)
cptools.validate_since()
if content_type is None:
# Set content-type based on filename extension
ext = ""
i = path.rfind('.')
if i != -1:
ext = path[i:].lower()
content_type = mimetypes.types_map.get(ext, None)
if content_type is not None:
response.headers['Content-Type'] = content_type
if debug:
cherrypy.log('Content-Type: %r' % content_type, 'TOOLS.STATIC')
cd = None
if disposition is not None:
if name is None:
name = os.path.basename(path)
cd = '%s; filename="%s"' % (disposition, name)
response.headers["Content-Disposition"] = cd
if debug:
cherrypy.log('Content-Disposition: %r' % cd, 'TOOLS.STATIC')
# Set Content-Length and use an iterable (file object)
# this way CP won't load the whole file in memory
content_length = st.st_size
fileobj = open(path, 'rb')
return _serve_fileobj(fileobj, content_type, content_length, debug=debug)
def serve_fileobj(fileobj, content_type=None, disposition=None, name=None,
debug=False):
"""Set status, headers, and body in order to serve the given file object.
The Content-Type header will be set to the content_type arg, if provided.
If disposition is not None, the Content-Disposition header will be set
to "<disposition>; filename=<name>". If name is None, 'filename' will
not be set. If disposition is None, no Content-Disposition header will
be written.
CAUTION: If the request contains a 'Range' header, one or more seek()s will
be performed on the file object. This may cause undesired behavior if
the file object is not seekable. It could also produce undesired results
if the caller set the read position of the file object prior to calling
serve_fileobj(), expecting that the data would be served starting from that
position.
"""
response = cherrypy.serving.response
try:
st = os.fstat(fileobj.fileno())
except AttributeError:
if debug:
cherrypy.log('os has no fstat attribute', 'TOOLS.STATIC')
content_length = None
else:
# Set the Last-Modified response header, so that
# modified-since validation code can work.
response.headers['Last-Modified'] = httputil.HTTPDate(st.st_mtime)
cptools.validate_since()
content_length = st.st_size
if content_type is not None:
response.headers['Content-Type'] = content_type
if debug:
cherrypy.log('Content-Type: %r' % content_type, 'TOOLS.STATIC')
cd = None
if disposition is not None:
if name is None:
cd = disposition
else:
cd = '%s; filename="%s"' % (disposition, name)
response.headers["Content-Disposition"] = cd
if debug:
cherrypy.log('Content-Disposition: %r' % cd, 'TOOLS.STATIC')
return _serve_fileobj(fileobj, content_type, content_length, debug=debug)
def _serve_fileobj(fileobj, content_type, content_length, debug=False):
"""Internal. Set response.body to the given file object, perhaps ranged."""
response = cherrypy.serving.response
# HTTP/1.0 didn't have Range/Accept-Ranges headers, or the 206 code
request = cherrypy.serving.request
if request.protocol >= (1, 1):
response.headers["Accept-Ranges"] = "bytes"
r = httputil.get_ranges(request.headers.get('Range'), content_length)
if r == []:
response.headers['Content-Range'] = "bytes */%s" % content_length
message = "Invalid Range (first-byte-pos greater than Content-Length)"
if debug:
cherrypy.log(message, 'TOOLS.STATIC')
raise cherrypy.HTTPError(416, message)
if r:
if len(r) == 1:
# Return a single-part response.
start, stop = r[0]
if stop > content_length:
stop = content_length
r_len = stop - start
if debug:
cherrypy.log('Single part; start: %r, stop: %r' % (start, stop),
'TOOLS.STATIC')
response.status = "206 Partial Content"
response.headers['Content-Range'] = (
"bytes %s-%s/%s" % (start, stop - 1, content_length))
response.headers['Content-Length'] = r_len
fileobj.seek(start)
response.body = file_generator_limited(fileobj, r_len)
else:
# Return a multipart/byteranges response.
response.status = "206 Partial Content"
from mimetools import choose_boundary
boundary = choose_boundary()
ct = "multipart/byteranges; boundary=%s" % boundary
response.headers['Content-Type'] = ct
if "Content-Length" in response.headers:
# Delete Content-Length header so finalize() recalcs it.
del response.headers["Content-Length"]
def file_ranges():
# Apache compatibility:
yield ntob("\r\n")
for start, stop in r:
if debug:
cherrypy.log('Multipart; start: %r, stop: %r' % (start, stop),
'TOOLS.STATIC')
yield ntob("--" + boundary, 'ascii')
yield ntob("\r\nContent-type: %s" % content_type, 'ascii')
yield ntob("\r\nContent-range: bytes %s-%s/%s\r\n\r\n"
% (start, stop - 1, content_length), 'ascii')
fileobj.seek(start)
for chunk in file_generator_limited(fileobj, stop-start):
yield chunk
yield ntob("\r\n")
# Final boundary
yield ntob("--" + boundary + "--", 'ascii')
# Apache compatibility:
yield ntob("\r\n")
response.body = file_ranges()
return response.body
else:
if debug:
cherrypy.log('No byteranges requested', 'TOOLS.STATIC')
# Set Content-Length and use an iterable (file object)
# this way CP won't load the whole file in memory
response.headers['Content-Length'] = content_length
response.body = fileobj
return response.body
def serve_download(path, name=None):
"""Serve 'path' as an application/x-download attachment."""
# This is such a common idiom I felt it deserved its own wrapper.
return serve_file(path, "application/x-download", "attachment", name)
def _attempt(filename, content_types, debug=False):
if debug:
cherrypy.log('Attempting %r (content_types %r)' %
(filename, content_types), 'TOOLS.STATICDIR')
try:
# you can set the content types for a
# complete directory per extension
content_type = None
if content_types:
r, ext = os.path.splitext(filename)
content_type = content_types.get(ext[1:], None)
serve_file(filename, content_type=content_type, debug=debug)
return True
except cherrypy.NotFound:
# If we didn't find the static file, continue handling the
# request. We might find a dynamic handler instead.
if debug:
cherrypy.log('NotFound', 'TOOLS.STATICFILE')
return False
def staticdir(section, dir, root="", match="", content_types=None, index="",
debug=False):
"""Serve a static resource from the given (root +) dir.
match
If given, request.path_info will be searched for the given
regular expression before attempting to serve static content.
content_types
If given, it should be a Python dictionary of
{file-extension: content-type} pairs, where 'file-extension' is
a string (e.g. "gif") and 'content-type' is the value to write
out in the Content-Type response header (e.g. "image/gif").
index
If provided, it should be the (relative) name of a file to
serve for directory requests. For example, if the dir argument is
'/home/me', the Request-URI is 'myapp', and the index arg is
'index.html', the file '/home/me/myapp/index.html' will be sought.
"""
request = cherrypy.serving.request
if request.method not in ('GET', 'HEAD'):
if debug:
cherrypy.log('request.method not GET or HEAD', 'TOOLS.STATICDIR')
return False
if match and not re.search(match, request.path_info):
if debug:
cherrypy.log('request.path_info %r does not match pattern %r' %
(request.path_info, match), 'TOOLS.STATICDIR')
return False
# Allow the use of '~' to refer to a user's home directory.
dir = os.path.expanduser(dir)
# If dir is relative, make absolute using "root".
if not os.path.isabs(dir):
if not root:
msg = "Static dir requires an absolute dir (or root)."
if debug:
cherrypy.log(msg, 'TOOLS.STATICDIR')
raise ValueError(msg)
dir = os.path.join(root, dir)
# Determine where we are in the object tree relative to 'section'
# (where the static tool was defined).
if section == 'global':
section = "/"
section = section.rstrip(r"\/")
branch = request.path_info[len(section) + 1:]
branch = unquote(branch.lstrip(r"\/"))
# If branch is "", filename will end in a slash
filename = os.path.join(dir, branch)
if debug:
cherrypy.log('Checking file %r to fulfill %r' %
(filename, request.path_info), 'TOOLS.STATICDIR')
# There's a chance that the branch pulled from the URL might
# have ".." or similar uplevel attacks in it. Check that the final
# filename is a child of dir.
if not os.path.normpath(filename).startswith(os.path.normpath(dir)):
raise cherrypy.HTTPError(403) # Forbidden
handled = _attempt(filename, content_types)
if not handled:
# Check for an index file if a folder was requested.
if index:
handled = _attempt(os.path.join(filename, index), content_types)
if handled:
request.is_index = filename[-1] in (r"\/")
return handled
def staticfile(filename, root=None, match="", content_types=None, debug=False):
"""Serve a static resource from the given (root +) filename.
match
If given, request.path_info will be searched for the given
regular expression before attempting to serve static content.
content_types
If given, it should be a Python dictionary of
{file-extension: content-type} pairs, where 'file-extension' is
a string (e.g. "gif") and 'content-type' is the value to write
out in the Content-Type response header (e.g. "image/gif").
"""
request = cherrypy.serving.request
if request.method not in ('GET', 'HEAD'):
if debug:
cherrypy.log('request.method not GET or HEAD', 'TOOLS.STATICFILE')
return False
if match and not re.search(match, request.path_info):
if debug:
cherrypy.log('request.path_info %r does not match pattern %r' %
(request.path_info, match), 'TOOLS.STATICFILE')
return False
# If filename is relative, make absolute using "root".
if not os.path.isabs(filename):
if not root:
msg = "Static tool requires an absolute filename (got '%s')." % filename
if debug:
cherrypy.log(msg, 'TOOLS.STATICFILE')
raise ValueError(msg)
filename = os.path.join(root, filename)
return _attempt(filename, content_types, debug=debug)

View File

@@ -0,0 +1,49 @@
import sys
import cherrypy
def process_body():
"""Return (params, method) from request body."""
try:
import xmlrpclib
return xmlrpclib.loads(cherrypy.request.body.read())
except Exception:
return ('ERROR PARAMS', ), 'ERRORMETHOD'
def patched_path(path):
"""Return 'path', doctored for RPC."""
if not path.endswith('/'):
path += '/'
if path.startswith('/RPC2/'):
# strip the first /rpc2
path = path[5:]
return path
def _set_response(body):
# The XML-RPC spec (http://www.xmlrpc.com/spec) says:
# "Unless there's a lower-level error, always return 200 OK."
# Since Python's xmlrpclib interprets a non-200 response
# as a "Protocol Error", we'll just return 200 every time.
response = cherrypy.response
response.status = '200 OK'
response.body = body
response.headers['Content-Type'] = 'text/xml'
response.headers['Content-Length'] = len(body)
def respond(body, encoding='utf-8', allow_none=0):
from xmlrpclib import Fault, dumps
if not isinstance(body, Fault):
body = (body,)
_set_response(dumps(body, methodresponse=1,
encoding=encoding,
allow_none=allow_none))
def on_error(*args, **kwargs):
body = str(sys.exc_info()[1])
from xmlrpclib import Fault, dumps
_set_response(dumps(Fault(1, body)))