Source code for examples.httpbin.manage

'''Pulsar HTTP test application::

    python manage.py

Implementation
======================

.. autoclass:: HttpBin
   :members:
   :member-order: bysource

Server Hooks
===================

This example shows how to use
:ref:`server hooks <setting-section-application-hooks>` to log each request

.. automodule:: examples.httpbin.config
   :members:

'''
import os
import sys
import string
from functools import partial
from itertools import repeat, chain
from random import random
from base64 import b64encode

import pulsar
from pulsar import (HttpRedirect, HttpException, version, JAPANESE, CHINESE,
                    ensure_future)
from pulsar.utils.httpurl import (Headers, ENCODE_URL_METHODS,
                                  ENCODE_BODY_METHODS)
from pulsar.utils.html import escape
from pulsar.apps import wsgi, ws
from pulsar.apps.wsgi import (route, Html, Json, HtmlDocument, GZipMiddleware,
                              String)
from pulsar.utils.structures import MultiValueDict
from pulsar.utils.system import json

METHODS = frozenset(chain((m.lower() for m in ENCODE_URL_METHODS),
                          (m.lower() for m in ENCODE_BODY_METHODS)))
pyversion = '.'.join(map(str, sys.version_info[:3]))
ASSET_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'assets')
FAVICON = os.path.join(ASSET_DIR, 'favicon.ico')
characters = string.ascii_letters + string.digits


def asset(name, mode='r'):
    name = os.path.join(ASSET_DIR, name)
    if os.path.isfile(name):
        with open(name, mode) as file:
            data = file.read()
        return data


class BaseRouter(wsgi.Router):
    ########################################################################
    #    INTERNALS
    def bind_server_event(self, request, event, handler):
        consumer = request.environ['pulsar.connection'].current_consumer()
        consumer.bind_event(event, handler)

    def info_data_response(self, request, **params):
        data = self.info_data(request, **params)
        return Json(data).http_response(request)

    def info_data(self, request, **params):
        headers = self.getheaders(request)
        data = {'method': request.method,
                'headers': headers,
                'pulsar': self.pulsar_info(request)}
        if request.method in ENCODE_URL_METHODS:
            data['args'] = dict(request.url_data)
        else:
            args, files = request.data_and_files()
            jfiles = MultiValueDict()
            if files:
                for name, parts in files.lists():
                    for part in parts:
                        try:
                            part = part.string()
                        except UnicodeError:
                            part = part.base64()
                        jfiles[name] = part
            if isinstance(args, MultiValueDict):
                args = dict(args)

            data.update((('args', args),
                         ('files', dict(jfiles))))
        data.update(params)
        return data

    def getheaders(self, request):
        headers = Headers(kind='client')
        for k in request.environ:
            if k.startswith('HTTP_'):
                headers[k[5:].replace('_', '-')] = request.environ[k]
        return dict(headers)

    def pulsar_info(self, request):
        return request.get('pulsar.connection').info()


[docs]class HttpBin(BaseRouter): '''The main :class:`.Router` for the HttpBin application '''
[docs] def get(self, request): '''The home page of this router''' ul = Html('ul') for router in sorted(self.routes, key=lambda r: r.creation_count): a = router.link(escape(router.route.path)) a.addClass(router.name) for method in METHODS: if router.getparam(method): a.addClass(method) li = Html('li', a, ' %s' % router.getparam('title', '')) ul.append(li) title = 'Pulsar' html = request.html_document html.head.title = title html.head.links.append('httpbin.css') html.head.links.append('favicon.ico', rel="icon", type='image/x-icon') html.head.scripts.append('httpbin.js') ul = ul.render(request) templ = asset('template.html') body = templ % (title, JAPANESE, CHINESE, version, pyversion, ul) html.body.append(body) return html.http_response(request)
def head(self, request): return self.get(request) @route(title='Returns GET data') def get_get(self, request): return self.info_data_response(request) @route(title='Returns POST data') def post_post(self, request): return self.info_data_response(request) @route(title='Returns Post bytes data') def post_post_chunks(self, request): data, _ = request.data_and_files() content_type = request.get('CONTENT_TYPE') request.response.content_type = content_type request.response.content = data return request.response @route(title='Returns PATCH data') def patch_patch(self, request): return self.info_data_response(request) @route(title='Returns PUT data') def put_put(self, request): return self.info_data_response(request) @route(title='Returns DELETE data') def delete_delete(self, request): return self.info_data_response(request) @route('redirect/<int(min=1,max=10):times>', defaults={'times': 5}, title='302 Redirect n times') def redirect(self, request): num = request.urlargs['times'] - 1 if num: raise HttpRedirect('/redirect/%s' % num) else: raise HttpRedirect('/get') @route('getsize/<int(min=1,max=8388608):size>', defaults={'size': 150000}, title='Returns a preset size of data (limit at 8MB)') def getsize(self, request): size = request.urlargs['size'] data = {'size': size, 'data': 'd' * size} return self.info_data_response(request, **data) @route(title='Returns gzip encoded data') def gzip(self, request): response = self.info_data_response(request, gzipped=True) return GZipMiddleware(10)(request.environ, response) @route(title='Returns cookie data') def cookies(self, request): cookies = request.cookies d = dict(((c.key, c.value) for c in cookies.values())) return Json({'cookies': d}).http_response(request) @route('cookies/set/<name>/<value>', title='Sets a simple cookie', defaults={'name': 'package', 'value': 'pulsar'}) def request_cookies_set(self, request): key = request.urlargs['name'] value = request.urlargs['value'] request.response.set_cookie(key, value=value) request.response.status_code = 302 request.response.headers['location'] = '/cookies' return request.response @route('status/<int(min=100,max=505):status>', title='Returns given HTTP Status code', defaults={'status': 418}) def status(self, request): request.response.content_type = 'text/html' msg = request.url_data.get('message', 'test error') raise HttpException(msg, status=request.urlargs['status']) @route(title='Returns response headers') def response_headers(self, request): class Gen: headers = None def __call__(self, server, **kw): self.headers = server.headers def generate(self): # yield a byte so that headers are sent yield b'' # we must have the headers now yield json.dumps(dict(self.headers)) gen = Gen() self.bind_server_event(request, 'on_headers', gen) request.response.content = gen.generate() request.response.content_type = 'application/json' return request.response @route('basic-auth/<username>/<password>', title='Challenges HTTPBasic Auth', defaults={'username': 'username', 'password': 'password'}) def challenge_auth(self, request): auth = request.get('http.authorization') if auth and auth.authenticated(request.environ, **request.urlargs): return Json({'authenticated': True, 'username': auth.username}).http_response(request) raise wsgi.HttpAuthenticate('basic') @route('digest-auth/<username>/<password>/<qop>', title='Challenges HTTP Digest Auth', defaults={'username': 'username', 'password': 'password', 'qop': 'auth'}) def challenge_digest_auth(self, request): auth = request.get('http.authorization') if auth and auth.authenticated(request.environ, **request.urlargs): return Json({'authenticated': True, 'username': auth.username}).http_response(request) raise wsgi.HttpAuthenticate('digest', qop=[request.urlargs['qop']]) @route('stream/<int(min=1):m>/<int(min=1):n>', title='Stream m chunk of data n times', defaults={'m': 300, 'n': 20}) def request_stream(self, request): m = request.urlargs['m'] n = request.urlargs['n'] request.response.content_type = 'text/plain' request.response.content = repeat(b'a' * m, n) return request.response @route(title='A web socket graph') def websocket(self, request): data = open(os.path.join(os.path.dirname(__file__), 'assets', 'websocket.html')).read() scheme = 'wss' if request.is_secure else 'ws' host = request.get('HTTP_HOST') data = data % {'address': '%s://%s/graph-data' % (scheme, host)} request.response.content_type = 'text/html' request.response.content = data return request.response @route(title='Live server statistics')
[docs] def stats(self, request): '''Live stats for the server. Try sending lots of requests ''' # scheme = 'wss' if request.is_secure else 'ws' # host = request.get('HTTP_HOST') # address = '%s://%s/stats' % (scheme, host) doc = HtmlDocument(title='Live server stats', media_path='/assets/') # docs.head.scripts return doc.http_response(request)
@route('clip/<int(min=256,max=16777216):chunk_size>', defaults={'chunk_size': 4096}, title='Show a video clip') def clip(self, request): c = request.urlargs['chunk_size'] filepath = os.path.join(ASSET_DIR, 'clip.mp4') return wsgi.file_response(request, filepath, c) @route('servername', title='display the server name') def servername(self, request): name = request.get('SERVER_NAME') return String(name, '\n').http_response(request) @route(title="Pulsar is several languages") def get_pulsar(self, request): data = [ 'pulsar', pulsar.JAPANESE, pulsar.CHINESE, pulsar.HINDI ] return Json(data).http_response(request) ######################################################################## # BENCHMARK ROUTES @route() def json(self, request): return Json({'message': "Hello, World!"}).http_response(request) @route() def plaintext(self, request): return String('Hello, World!').http_response(request)
class Upload(BaseRouter): response_content_types = ['multipart/form-data'] def put(self, request): return ensure_future(self._async_put(request)) async def _async_put(self, request): headers = self.getheaders(request) data = {'method': request.method, 'headers': headers, 'pulsar': self.pulsar_info(request), 'args': MultiValueDict(), 'files': MultiValueDict()} request.cache.response_data = data await request.data_and_files(stream=partial(self.stream, request)) data['args'] = dict(data['args']) data['files'] = dict(data['files']) return Json(data).http_response(request) def stream(self, request, part): if request.cache.current_data is not part: request.cache.current_data = part request.cache.current_data_buffer = [] request.cache.current_data_buffer.append(part.recv()) if part.complete(): data_store = request.cache.response_data data = b''.join(request.cache.current_data_buffer) store = data_store['args'] if part.is_file(): store = data_store['files'] try: data.decode('utf-8') except UnicodeError: data = b64encode(data) store[part.name] = data.decode('utf-8') class ExpectFail(BaseRouter): def post(self, request): request.get('wsgi.input').fail() class Graph(ws.WS): def on_message(self, websocket, msg): websocket.write(json.dumps([(i, random()) for i in range(100)])) class Site(wsgi.LazyWsgi): def setup(self, environ): router = HttpBin('/') return wsgi.WsgiHandler([ExpectFail('expect'), Upload('upload'), wsgi.wait_for_body_middleware, wsgi.clean_path_middleware, wsgi.authorization_middleware, wsgi.MediaRouter('media', ASSET_DIR, show_indexes=True), ws.WebSocket('/graph-data', Graph()), router]) def server(description=None, **kwargs): description = description or 'Pulsar HttpBin' return wsgi.WSGIServer(Site(), description=description, **kwargs) if __name__ == '__main__': # pragma nocover server().start()