Source code for pulsar.apps.rpc.jsonrpc

import sys
import json
import logging
import asyncio
from collections import namedtuple

from pulsar import AsyncObject, as_coroutine, new_event_loop, ensure_future
from pulsar.utils.string import gen_unique_id
from pulsar.utils.tools import checkarity
from pulsar.apps.wsgi import Json
from pulsar.apps.http import HttpClient

from .handlers import RpcHandler, InvalidRequest, exception


__all__ = ['JSONRPC', 'JsonProxy', 'JsonBatchProxy']


logger = logging.getLogger('pulsar.jsonrpc')

BatchResponse = namedtuple('BatchResponse', 'id result exception')


[docs]class JSONRPC(RpcHandler): '''An :class:`.RpcHandler` for JSON-RPC services. Design to comply with the `JSON-RPC 2.0`_ Specification. JSON-RPC is a lightweight remote procedure call protocol designed to be simple. A remote method is invoked by sending a request to a remote service, the request is a single object serialised using JSON. .. _`JSON-RPC 2.0`: http://www.jsonrpc.org/specification ''' version = '2.0' def __call__(self, request): return ensure_future(self._execute_request(request)) async def _execute_request(self, request): response = request.response try: data = await as_coroutine(request.body_data()) except ValueError: res, status = self._get_error_and_status(InvalidRequest( status=415, msg='Content-Type must be application/json')) else: # if it's batch request if isinstance(data, list): status = 200 tasks = [self._call(request, each) for each in data] result = await asyncio.gather(*tasks) res = [r[0] for r in result] else: res, status = await self._call(request, data) response.status_code = status return Json(res).http_response(request) async def _call(self, request, data): exc_info = None proc = None try: if (not isinstance(data, dict) or data.get('jsonrpc') != self.version or 'id' not in data): raise InvalidRequest( 'jsonrpc must be supplied and equal to "%s"' % self.version ) params = data.get('params') if isinstance(params, dict): args, kwargs = (), params else: args, kwargs = tuple(params or ()), {} # proc = self.get_handler(data.get('method')) result = await as_coroutine(proc(request, *args, **kwargs)) except Exception as exc: result = exc exc_info = sys.exc_info() else: try: json.dumps(result) except Exception as exc: result = exc exc_info = sys.exc_info() # if exc_info: if isinstance(result, TypeError) and proc: msg = checkarity(proc, args, kwargs, discount=1) else: msg = None rpc_id = data.get('id') if isinstance(data, dict) else None res, status = self._get_error_and_status( result, msg=msg, rpc_id=rpc_id, exc_info=exc_info) else: res = { 'id': data.get('id'), 'jsonrpc': self.version, 'result': result } status = 200 return res, status def _get_error_and_status(self, exc, msg=None, rpc_id=None, exc_info=None): res = {'id': rpc_id, 'jsonrpc': self.version} code = getattr(exc, 'fault_code', None) if not code: code = -32602 if msg else -32603 msg = msg or str(exc) or 'JSON RPC exception' if code == -32603: logger.error(msg, exc_info=exc_info) else: logger.warning(msg) res['error'] = { 'code': code, 'message': msg, 'data': getattr(exc, 'data', '') } return res, getattr(exc, 'status', 400)
class JsonCall: slots = ('_client', '_name') def __init__(self, client, name): self._client = client self._name = name def __repr__(self): return self._name __str__ = __repr__ @property def url(self): return self._client.url @property def name(self): return self._name def __getattr__(self, name): name = "%s%s%s" % (self._name, self._client.separator, name) return self.__class__(self._client, name) def __call__(self, *args, **kwargs): result = self._client._call(self._name, *args, **kwargs) if self._client.sync: return self._client._loop.run_until_complete(result) else: return result
[docs]class JsonProxy(AsyncObject): '''A python Proxy class for :class:`.JSONRPC` Servers. :param url: server location :param version: JSON-RPC server version. Default ``2.0`` :param id: optional request id, generated if not provided. Default ``None``. :param data: Extra data to include in all requests. Default ``None``. :param full_response: return the full Http response rather than just the content. :param http: optional http client. If provided it must have the ``request`` method available which must be of the form:: http.request(url, body=..., method=...) Default ``None``. :param encoding: encoding of the request. Default ``ascii``. Lets say your RPC server is running at ``http://domain.name.com/``:: >>> a = JsonProxy('http://domain.name.com/') >>> a.add(3,4) 7 >>> a.ping() 'pong' ''' separator = '.' default_version = '2.0' default_timeout = 30 def __init__(self, url, version=None, data=None, full_response=False, http=None, timeout=None, sync=False, loop=None, encoding='ascii', **kw): self.sync = sync self._url = url self._version = version or self.__class__.default_version self._full_response = full_response self._data = data if data is not None else {} if not http: timeout = timeout if timeout is not None else self.default_timeout if sync and not loop: loop = new_event_loop() http = HttpClient(timeout=timeout, loop=loop, **kw) http.headers['accept'] = 'application/json, text/*; q=0.5' http.headers['content-type'] = 'application/json' self._http = http self._encoding = encoding @property def url(self): return self._url @property def version(self): return self._version @property def _loop(self): return self._http._loop
[docs] def makeid(self): '''Can be re-implemented by your own Proxy''' return gen_unique_id()
def __repr__(self): return '%s(%s)' % (self.__class__.__name__, self.__url) def __str__(self): return self.__repr__() def __getattr__(self, name): return JsonCall(self, name) async def _call(self, name, *args, **kwargs): data = self._get_data(name, *args, **kwargs) is_ascii = self._encoding == 'ascii' body = json.dumps(data, ensure_ascii=is_ascii).encode(self._encoding) resp = await self._http.post(self._url, data=body) if self._full_response: return resp else: content = resp.json() if resp.is_error: if 'error' not in content: resp.raise_for_status() return self.loads(content) def _get_data(self, func_name, *args, **kwargs): id = self.makeid() params = self.get_params(*args, **kwargs) data = {'method': func_name, 'params': params, 'id': id, 'jsonrpc': self._version} return data
[docs] def get_params(self, *args, **kwargs): ''' Create an array or positional or named parameters Mixing positional and named parameters in one call is not possible. ''' kwargs.update(self._data) if args and kwargs: raise ValueError('Cannot mix positional and named parameters') if args: return list(args) else: return kwargs
@staticmethod def loads(obj): if isinstance(obj, dict): if 'error' in obj: error = obj['error'] raise exception(error.get('code'), error.get('message')) else: return obj.get('result') return obj
class JsonBatchProxy(JsonProxy): """A python Proxy class for :class:`.JSONRPC` Servers implementing batch protocol. :param url: server location :param version: JSON-RPC server version. Default ``2.0`` :param id: optional request id, generated if not provided. Default ``None``. :param data: Extra data to include in all requests. Default ``None``. :param full_response: return the full Http response rather than just the response generator. :param http: optional http client. If provided it must have the ``request`` method available which must be of the form:: http.request(url, body=..., method=...) Default ``None``. :return: generator that returns batch response (a named tuple 'id result exception'). If ``full_response`` is True, then returns Http response. Lets say your RPC server is running at ``http://domain.name.com/``:: >>> a = JsonBatchProxy('http://domain.name.com/') >>> a.add(3,4) 'i86863002653c42278d7c5ff7506d84c7' >>> a.ping() 'i71a9b79eef9b48eea2fb9d691c8e897e' >>> for each in (await a): >>> print(each.id, each.result, each.exception) """ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._batch = [] def discard(self): """Clear pool of batch requests.""" self._batch = [] def __len__(self): return len(self._batch) def _call(self, name, *args, **kwargs): data = self._get_data(name, *args, **kwargs) is_ascii = self._encoding == 'ascii' body = json.dumps(data, ensure_ascii=is_ascii).encode(self._encoding) self._batch.append(body) return data['id'] async def __call__(self): if not self._batch: return resp = await self._http.post( self._url, data=b'[' + b','.join(self._batch) + b']') if self._full_response: self.discard() return resp else: content = resp.json() if resp.is_error: if 'error' not in content: resp.raise_for_status() self.discard() return self._response_gen(content) @staticmethod def _response_gen(content): if not isinstance(content, list): content = [content] for resp in content: try: yield BatchResponse( id=resp['id'], result=JsonBatchProxy.loads(resp), exception=None ) except Exception as err: yield BatchResponse( id=resp['id'], result=None, exception=err ) def __iter__(self): return self()