Source code for examples.calculator.manage
'''\
This is a a :ref:`JSON-RPC <apps-rpc>` server with some simple functions.
To run the server type::
    python manage.py
Open a new shell and launch python and type::
    >>> from pulsar.apps import rpc
    >>> p = rpc.JsonProxy('http://localhost:8060')
    >>> p.ping()
    'pong'
    >>> p.functions_list()
    [[...
    >>> p.calc.add(3,4)
    7.0
Implementation
-----------------
The calculator rpc functions are implemented by the :class:`Calculator`
handler, while the :class:`Root` handler exposes utility methods from
the :class:`.PulsarServerCommands` handler.
.. autoclass:: Calculator
   :members:
   :member-order: bysource
.. autoclass:: Root
   :members:
   :member-order: bysource
.. autoclass:: Site
   :members:
   :member-order: bysource
'''
from random import normalvariate
from pulsar import as_coroutine
from pulsar.apps import rpc, wsgi
from pulsar.utils.httpurl import JSON_CONTENT_TYPES
def divide(request, a, b):
    '''Divide two numbers.
    This method illustrate how to use the :func:`.rpc_method` decorator.'''
    return float(a)/float(b)
def request_handler(request, format, kwargs):
    '''Dummy request handler for test coverage
    '''
    return kwargs
def randompaths(request, num_paths=1, size=250, mu=0, sigma=1):
    '''Lists of random walks.'''
    r = []
    for p in range(num_paths):
        v = 0
        path = [v]
        r.append(path)
        for t in range(size):
            v += normalvariate(mu, sigma)
            path.append(v)
    return r
class RequestCheck:
    async def __call__(self, request, name):
        data = await as_coroutine(request.body_data())
        assert(data['method'] == name)
        return True
[docs]class Root(rpc.PulsarServerCommands):
    '''Add two rpc methods for testing to the :class:`.PulsarServerCommands`
    handler.
    '''
[docs]    def rpc_dodgy_method(self, request):
        '''This method will fails because the return object is not
        json serialisable.'''
        return Calculator 
    rpc_check_request = RequestCheck() 
[docs]class Calculator(rpc.JSONRPC):
    '''A :class:`.JSONRPC` handler which implements few simple
    remote methods.
    '''
[docs]    def rpc_add(self, request, a, b):
        '''Add two numbers'''
        return float(a) + float(b) 
[docs]    def rpc_subtract(self, request, a, b):
        '''Subtract two numbers'''
        return float(a) - float(b) 
[docs]    def rpc_multiply(self, request, a, b):
        '''Multiply two numbers'''
        return float(a) * float(b) 
    rpc_divide = rpc.rpc_method(divide, request_handler=request_handler)
    rpc_randompaths = rpc.rpc_method(randompaths) 
[docs]class Site(wsgi.LazyWsgi):
    '''WSGI handler for the RPC server'''
[docs]    def setup(self, environ):
        '''Called once to setup the list of wsgi middleware.'''
        json_handler = Root().putSubHandler('calc', Calculator())
        middleware = wsgi.Router('/', post=json_handler,
                                 accept_content_types=JSON_CONTENT_TYPES)
        response = [wsgi.GZipMiddleware(200)]
        return wsgi.WsgiHandler(middleware=[wsgi.wait_for_body_middleware,
                                            middleware],
                                response_middleware=response)  
def server(callable=None, **params):
    return wsgi.WSGIServer(Site(), **params)
if __name__ == '__main__':  # pragma nocover
    server().start()