Source code for pulsar.async.mailbox

'''Actors communicate with each other by sending and receiving messages.
The :mod:`pulsar.async.mailbox` module implements the message passing layer
via bidirectional socket connections between the :class:`.Arbiter`
and any :class:`.Actor`.

Message sending is asynchronous and safe: the message is guaranteed to
eventually reach the recipient, provided that the recipient exists.

The implementation details are outlined below:

* Messages are sent via the :func:`.send` function, which is a proxy for
  the actor :meth:`~.Actor.send` method.
  Here is how you ping actor ``abc`` in a coroutine::

      from pulsar import send

      async def example():
          result = await send('abc', 'ping')

* The :class:`.Arbiter` :attr:`~pulsar.Actor.mailbox` is a :class:`.TcpServer`
  accepting connections from remote actors.
* The :attr:`.Actor.mailbox` is a :class:`.MailboxClient` of the arbiter
  mailbox server.
* When an actor sends a message to another actor, the arbiter mailbox behaves
  as a proxy server by routing the message to the targeted actor.
* Communication is bidirectional and there is **only one connection** between
  the arbiter and any given actor.
* Messages are encoded and decoded using the unmasked websocket protocol
  implemented in :func:`.frame_parser`.
* If, for some reason, the connection between an actor and the arbiter
  gets broken, the actor will eventually stop running and be garbage
  collected.

  For the curious this is how the internal protocol is implemented


.. autoclass:: MailboxProtocol
  :member-order: bysource


.. autoclass:: MailboxClient
  :member-order: bysource
'''

import socket
import pickle
from collections import namedtuple

from pulsar import ProtocolError, CommandError
from pulsar.utils.internet import nice_address
from pulsar.utils.websocket import frame_parser
from pulsar.utils.string import gen_unique_id

from .access import get_actor, isawaitable, create_future, ensure_future
from .futures import task
from .proxy import actor_identity, get_proxy, get_command, ActorProxy
from .protocols import Protocol
from .clients import AbstractClient

# Context record handed to a command callable: the actor the command runs
# on, the caller that sent it, and the connection it arrived through.
CommandRequest = namedtuple(
    'CommandRequest',
    ('actor', 'caller', 'connection'),
)

def create_aid():
    '''Return a short acknowledgement id.

    The id is the first eight characters of a value produced by
    :func:`gen_unique_id`.
    '''
    unique = gen_unique_id()
    return unique[:8]

async def command_in_context(command, caller, target, args, kwargs,
                             connection=None):
    '''Execute ``command`` on ``target`` on behalf of ``caller``.

    :param command: name used to look up the command via ``get_command``.
    :param caller: the sender of the command, stored on the request.
    :param target: the actor the command is executed on.
    :param args: positional arguments forwarded to the command.
    :param kwargs: keyword arguments forwarded to the command.
    :param connection: optional connection the command arrived through;
        stored on the :class:`CommandRequest`.
    :return: the command result, awaited first if it is awaitable.
    :raises CommandError: if no command is registered under ``command``.
    '''
    cmnd = get_command(command)
    if not cmnd:
        raise CommandError('unknown %s' % command)
    request = CommandRequest(target, caller, connection)
    result = cmnd(request, args, kwargs)
    # Commands may be plain callables or coroutines; only await the latter.
    if isawaitable(result):
        result = await result
    return result

class ProxyMailbox:
    '''A proxy for the arbiter :class:`Mailbox`.

    The proxy wraps the mailbox of the actor's monitor and forwards every
    attribute it does not define itself to that mailbox.
    '''
    active_connections = 0

    def __init__(self, actor):
        '''Wrap the mailbox of ``actor.monitor``.

        If that mailbox is itself a :class:`ProxyMailbox` it is unwrapped,
        so proxies never nest.
        '''
        mailbox = actor.monitor.mailbox
        if isinstance(mailbox, ProxyMailbox):
            mailbox = mailbox.mailbox
        self.mailbox = mailbox

    def __repr__(self):
        return self.mailbox.__repr__()

    def __str__(self):
        return self.mailbox.__str__()

    def __getattr__(self, name):
        # Only invoked when normal lookup fails. Guard ``mailbox`` itself so
        # that an instance whose ``__init__`` has not run raises
        # AttributeError instead of recursing until RecursionError.
        if name == 'mailbox':
            raise AttributeError(name)
        return getattr(self.mailbox, name)

    def _run(self):
        '''No-op: the proxy does not run anything itself.'''
        pass

    def close(self):
        '''No-op: closing the proxy leaves the wrapped mailbox untouched.'''
        pass

class Message:
    '''A message which travels from actor to actor.

    ``data`` is the dictionary which gets serialised and written to the
    connection; ``waiter`` is an optional future resolved with the
    response to the message.
    '''
    def __init__(self, data, waiter=None):
        self.data = data
        self.waiter = waiter

    def __repr__(self):
        return self.data.get('command', 'unknown')
    __str__ = __repr__

    @classmethod
    def command(cls, command, sender, target, args, kwargs):
        '''Build the message for executing ``command`` on ``target``.

        ``args`` and ``kwargs`` default to an empty tuple and an empty
        dictionary when ``None``. The returned message always carries a
        waiter future.
        '''
        command = get_command(command)
        data = {'command': command.__name__,
                'sender': actor_identity(sender),
                'target': actor_identity(target),
                'args': args if args is not None else (),
                'kwargs': kwargs if kwargs is not None else {}}
        waiter = create_future()
        if command.ack:
            data['ack'] = create_aid()
        else:
            # Without an ``ack`` id no callback message can ever resolve
            # the waiter, so resolve it now; otherwise anything awaiting it
            # would wait forever.
            waiter.set_result(None)
        return cls(data, waiter)

    @classmethod
    def callback(cls, result, ack):
        '''Build the callback message carrying ``result`` for id ``ack``.'''
        data = {'command': 'callback', 'result': result, 'ack': ack}
        return cls(data)

class MailboxProtocol(Protocol):
    '''The :class:`.Protocol` for internal message passing between actors.

    Encoding and decoding uses the unmasked websocket protocol.
    '''
    def __init__(self, **kw):
        super().__init__(**kw)
        # Waiters of sent messages, keyed by their ``ack`` id
        self._pending_responses = {}
        self._parser = frame_parser(kind=2, pyparser=True)
        actor = get_actor()
        if actor.is_arbiter():
            self.bind_event('connection_lost', self._connection_lost)

    def request(self, command, sender, target, args, kwargs):
        '''Used by the server to send messages to the client.

        Return the waiter of the message which has been sent.
        '''
        req = Message.command(command, sender, target, args, kwargs)
        self._start(req)
        return req.waiter

    def data_received(self, data):
        '''Decode incoming frames and schedule each message for handling.

        :raises ProtocolError: if a frame body cannot be unpickled.
        '''
        # Feed data into the parser
        msg = self._parser.decode(data)
        while msg:
            try:
                # NOTE(security): pickle can execute arbitrary code while
                # loading; this is only safe if every peer connected to the
                # mailbox is trusted.
                message = pickle.loads(msg.body)
            except Exception as e:
                raise ProtocolError(
                    'Could not decode message body: %s' % e) from e
            ensure_future(self._on_message(message), loop=self._loop)
            # Keep draining frames already buffered by the parser
            msg = self._parser.decode()

    ########################################################################
    # INTERNALS
    def _start(self, req):
        '''Write ``req`` and, if it expects a callback, register its waiter.
        '''
        if req.waiter and 'ack' in req.data:
            self._pending_responses[req.data['ack']] = req.waiter
            try:
                self._write(req)
            except Exception as exc:
                # Report the failure through the waiter rather than raising
                req.waiter.set_exception(exc)
        else:
            self._write(req)

    def _connection_lost(self, _, exc=None):
        '''Log a warning when the connection was lost with an error.'''
        if exc:
            actor = get_actor()
            if actor.is_running():
                actor.logger.warning('Connection lost with actor')

    async def _on_message(self, message):
        '''Handle one decoded message.

        A ``callback`` message resolves the pending waiter registered under
        its ``ack`` id. Any other message is a command: it is either routed
        to the target actor proxy or executed in this actor, and when the
        message carries an ``ack`` id the result is sent back as a callback.
        '''
        actor = get_actor()
        command = message.get('command')
        ack = message.get('ack')
        if command == 'callback':
            if not ack:
                raise ProtocolError('A callback without id')
            try:
                pending = self._pending_responses.pop(ack)
            except KeyError:
                raise KeyError('Callback %s not in pending callbacks' % ack)
            pending.set_result(message.get('result'))
        else:
            try:
                target = actor.get_actor(message['target'])
                if target is None:
                    raise CommandError('cannot execute "%s", unknown actor '
                                       '"%s"' % (command, message['target']))
                # Get the caller proxy without throwing
                caller = get_proxy(actor.get_actor(message['sender']),
                                   safe=True)
                if isinstance(target, ActorProxy):
                    # route the message to the actor proxy
                    if caller is None:
                        raise CommandError(
                            "'%s' got message from unknown '%s'"
                            % (actor, message['sender']))
                    result = await actor.send(target, command,
                                              *message['args'],
                                              **message['kwargs'])
                else:
                    result = await command_in_context(command, caller,
                                                      target,
                                                      message['args'],
                                                      message['kwargs'],
                                                      self)
            except CommandError as exc:
                self.logger.warning('Command error: %s' % exc)
                result = None
            except Exception:
                # Failures are logged here; the sender receives ``None``
                self.logger.exception('Unhandled exception')
                result = None
            if ack:
                self._start(Message.callback(result, ack))

    def _write(self, req):
        '''Pickle the message data, frame it and write it to the transport.

        If the write fails in a running actor which is not the arbiter, a
        warning is logged and the actor event loop is stopped.
        '''
        obj = pickle.dumps(req.data, protocol=2)
        data = self._parser.encode(obj, opcode=2)
        try:
            self._transport.write(data)
        except (socket.error, RuntimeError):
            actor = get_actor()
            if actor.is_running() and not actor.is_arbiter():
                actor.logger.warning('Lost connection with arbiter')
                actor._loop.stop()
class MailboxClient(AbstractClient):
    '''Used by actors to send messages to other actors via the arbiter.

    A single connection to ``address`` is opened on the first request and
    reused for the following ones.
    '''
    protocol_factory = MailboxProtocol

    def __init__(self, address, actor, loop):
        super().__init__(loop)
        self.address = address
        self.name = 'Mailbox for %s' % actor
        self._connection = None

    def connect(self):
        '''Create a connection to the mailbox address.'''
        return self.create_connection(self.address)

    def __repr__(self):
        return '%s %s' % (self.name, nice_address(self.address))

    @task
    async def request(self, command, sender, target, args, kwargs):
        '''Send ``command`` from ``sender`` to ``target``.

        Return the result delivered to the message waiter.
        '''
        # the request method
        if self._connection is None:
            self._connection = await self.connect()
            self._connection.bind_event('connection_lost', self._lost)
        req = Message.command(command, sender, target, args, kwargs)
        self._connection._start(req)
        response = await req.waiter
        return response

    def close(self):
        '''Close the connection, if one has been established.'''
        if self._connection:
            self._connection.close()

    def start_serving(self):    # pragma    nocover
        pass

    def _lost(self, _, exc=None):
        # When the connection is lost, stop the event loop
        if self._loop.is_running():
            self._loop.stop()