Source code for pulsar.apps.rpc.handlers

import inspect

from pulsar import HttpException
from pulsar.utils.tools import checkarity

__all__ = ['RpcHandler', 'rpc_method', 'InvalidRequest', 'InvalidParams',
           'NoSuchFunction', 'InternalError']

_exceptions = {}


def rpc_exception(cls):
    global _exceptions
    code = cls.fault_code
    _exceptions[code] = cls
    return cls


@rpc_exception
class InvalidRequest(HttpException):
    status = 400
    fault_code = -32600
    msg = 'Invalid RPC request'

    def __init__(self, msg=None, data=None):
        self.msg = msg or self.msg
        self.data = data


@rpc_exception
class ParseExcetion(InvalidRequest):
    fault_code = -32700
    msg = 'Parse error'


@rpc_exception
class NoSuchFunction(InvalidRequest):
    fault_code = -32601
    msg = 'The method does not exist'


@rpc_exception
class InvalidParams(InvalidRequest):
    fault_code = -32602
    msg = 'Invalid method parameters'


@rpc_exception
class InternalError(InvalidRequest):
    fault_code = -32603
    msg = 'Internal error'


def exception(code, msg):
    global _exceptions
    cls = _exceptions.get(code, Exception)
    raise cls(msg)


[docs]def rpc_method(func, doc=None, format='json', request_handler=None): '''A decorator which exposes a function ``func`` as an rpc function. :param func: The function to expose. :param doc: Optional doc string. If not provided the doc string of ``func`` will be used. :param format: Optional output format. :param request_handler: function which takes ``request``, ``format`` and ``kwargs`` and return a new ``kwargs`` to be passed to ``func``. It can be used to add additional parameters based on request and format. ''' def _(self, *args, **kwargs): request = args[0] if request_handler: kwargs = request_handler(request, format, kwargs) request.format = kwargs.pop('format', format) try: return func(*args, **kwargs) except TypeError: msg = checkarity(func, args, kwargs) if msg: raise InvalidParams('Invalid Parameters. %s' % msg) else: raise _.__doc__ = doc or func.__doc__ _.__name__ = func.__name__ _.FromApi = True return _
class MetaRpcHandler(type): '''A metaclass for rpc handlers. Add a limited ammount of magic to RPC handlers. ''' def __new__(cls, name, bases, attrs): make = super().__new__ if attrs.pop('virtual', None): return make(cls, name, bases, attrs) funcprefix = attrs.get('serve_as') if not funcprefix: for base in bases[::-1]: if isinstance(base, MetaRpcHandler): funcprefix = base.serve_as if funcprefix: break if funcprefix: rpc = set() fprefix = '%s_' % funcprefix n = len(fprefix) for key, method in list(attrs.items()): if hasattr(method, '__call__') and key.startswith(fprefix): method_name = key[n:] rpc.add(method_name) for base in bases[::-1]: if hasattr(base, 'rpc_methods'): rpc.update(base.rpc_methods) attrs['rpc_methods'] = frozenset(rpc) return make(cls, name, bases, attrs)
[docs]class RpcHandler(metaclass=MetaRpcHandler): '''Base class for classes to handle remote procedure calls. ''' serve_as = 'rpc' '''Prefix for class methods providing remote services. Default: ``rpc``.''' separator = '.' '''HTTP method allowed by this handler.''' virtual = True def __init__(self, subhandlers=None, title=None, documentation=None): self._parent = None self.subHandlers = {} self.title = title or self.__class__.__name__ self.documentation = documentation or self.__doc__ if subhandlers: for prefix, handler in subhandlers.items(): if inspect.isclass(handler): handler = handler() self.putSubHandler(prefix, handler) @property def parent(self): '''The parent :class:`RpcHandler` or ``None`` if this is the root handler.''' return self._parent @property def root(self): '''The root :class:`RpcHandler` or ``self`` if this is the root handler.''' return self._parent.root if self._parent is not None else self
[docs] def isroot(self): '''``True`` if this is the root handler.''' return self._parent is None
[docs] def putSubHandler(self, prefix, handler): '''Add a sub :class:`RpcHandler` with prefix ``prefix``. :keyword prefix: a string defining the prefix of the subhandler :keyword handler: the sub-handler. ''' self.subHandlers[prefix] = handler handler._parent = self return self
[docs] def getSubHandler(self, prefix): '''Get a sub :class:`RpcHandler` at ``prefix``.''' return self.subHandlers.get(prefix)
def get_handler(self, method): if not method: raise NoSuchFunction('RPC method not supplied') bits = method.split(self.separator, 1) handler = self method_name = bits[-1] for bit in bits[:-1]: subhandler = handler.getSubHandler(bit) if subhandler is None: method_name = method break else: handler = subhandler if method_name in handler.rpc_methods: return getattr(handler, '%s_%s' % (self.serve_as, method_name)) else: raise NoSuchFunction('RPC method "%s" not available.' % method) def listFunctions(self, prefix=''): for name in sorted(self.rpc_methods): method = getattr(self, '%s_%s' % (self.serve_as, name)) docs = method.__doc__ or 'No docs' doc = {'doc': ' '.join(docs.split('\n')), 'section': prefix} yield '%s%s' % (prefix, name), doc for name, handler in self.subHandlers.items(): pfx = '%s%s%s' % (prefix, name, self.separator) for f, doc in handler.listFunctions(pfx): yield f, doc def _docs(self): for name, data in self.listFunctions(): link = '.. _functions-{0}:'.format(name) title = name under = (2+len(title))*'-' yield '\n'.join((link, '', title, under, '', data['doc'], '\n')) def docs(self): return '\n'.join(self._docs())