'''Actors communicate with each other by sending and receiving messages.
The :mod:`pulsar.async.mailbox` module implements the message passing layer
via a bidirectional socket connections between the :class:`.Arbiter`
and any :class:`.Actor`.
Message sending is asynchronous and safe, the message is guaranteed to
eventually reach the recipient, provided that the recipient exists.
The implementation details are outlined below:
* Messages are sent via the :func:`.send` function, which is a proxy for
the actor :meth:`~.Actor.send` method.
Here is how you ping actor ``abc`` in a coroutine::
from pulsar import send
async def example():
result = await send('abc', 'ping')
* The :class:`.Arbiter` :attr:`~pulsar.Actor.mailbox` is a :class:`.TcpServer`
accepting connections from remote actors.
* The :attr:`.Actor.mailbox` is a :class:`.MailboxClient` of the arbiter
mailbox server.
* When an actor sends a message to another actor, the arbiter mailbox behaves
as a proxy server by routing the message to the targeted actor.
* Communication is bidirectional and there is **only one connection** between
the arbiter and any given actor.
* Messages are encoded and decoded using the unmasked websocket protocol
implemented in :func:`.frame_parser`.
* If, for some reasons, the connection between an actor and the arbiter
get broken, the actor will eventually stop running and garbaged collected.
Implementation
=========================
For the curious this is how the internal protocol is implemented
Protocol
~~~~~~~~~~~~
.. autoclass:: MailboxProtocol
:members:
:member-order: bysource
Client
~~~~~~~~~~~~
.. autoclass:: MailboxClient
:members:
:member-order: bysource
'''
import socket
import pickle
from collections import namedtuple
from pulsar import ProtocolError, CommandError
from pulsar.utils.internet import nice_address
from pulsar.utils.websocket import frame_parser
from pulsar.utils.string import gen_unique_id
from .access import get_actor, isawaitable, create_future, ensure_future
from .futures import task
from .proxy import actor_identity, get_proxy, get_command, ActorProxy
from .protocols import Protocol
from .clients import AbstractClient
CommandRequest = namedtuple('CommandRequest', 'actor caller connection')
def create_aid():
return gen_unique_id()[:8]
async def command_in_context(command, caller, target, args, kwargs,
connection=None):
cmnd = get_command(command)
if not cmnd:
raise CommandError('unknown %s' % command)
request = CommandRequest(target, caller, connection)
result = cmnd(request, args, kwargs)
if isawaitable(result):
result = await result
return result
class ProxyMailbox:
'''A proxy for the arbiter :class:`Mailbox`.
'''
active_connections = 0
def __init__(self, actor):
mailbox = actor.monitor.mailbox
if isinstance(mailbox, ProxyMailbox):
mailbox = mailbox.mailbox
self.mailbox = mailbox
def __repr__(self):
return self.mailbox.__repr__()
def __str__(self):
return self.mailbox.__str__()
def __getattr__(self, name):
return getattr(self.mailbox, name)
def _run(self):
pass
def close(self):
pass
class Message:
'''A message which travels from actor to actor.
'''
def __init__(self, data, waiter=None):
self.data = data
self.waiter = waiter
def __repr__(self):
return self.data.get('command', 'unknown')
__str__ = __repr__
@classmethod
def command(cls, command, sender, target, args, kwargs):
command = get_command(command)
data = {'command': command.__name__,
'sender': actor_identity(sender),
'target': actor_identity(target),
'args': args if args is not None else (),
'kwargs': kwargs if kwargs is not None else {}}
waiter = create_future()
if command.ack:
data['ack'] = create_aid()
else:
waiter.set_result(None)
return cls(data, waiter)
@classmethod
def callback(cls, result, ack):
data = {'command': 'callback', 'result': result, 'ack': ack}
return cls(data)
[docs]class MailboxProtocol(Protocol):
'''The :class:`.Protocol` for internal message passing between actors.
Encoding and decoding uses the unmasked websocket protocol.
'''
def __init__(self, **kw):
super().__init__(**kw)
self._pending_responses = {}
self._parser = frame_parser(kind=2, pyparser=True)
actor = get_actor()
if actor.is_arbiter():
self.bind_event('connection_lost', self._connection_lost)
[docs] def request(self, command, sender, target, args, kwargs):
'''Used by the server to send messages to the client.'''
req = Message.command(command, sender, target, args, kwargs)
self._start(req)
return req.waiter
def data_received(self, data):
# Feed data into the parser
msg = self._parser.decode(data)
while msg:
try:
message = pickle.loads(msg.body)
except Exception as e:
raise ProtocolError('Could not decode message body: %s' % e)
ensure_future(self._on_message(message), loop=self._loop)
msg = self._parser.decode()
########################################################################
# INTERNALS
def _start(self, req):
if req.waiter and 'ack' in req.data:
self._pending_responses[req.data['ack']] = req.waiter
try:
self._write(req)
except Exception as exc:
req.waiter.set_exception(exc)
else:
self._write(req)
def _connection_lost(self, _, exc=None):
if exc:
actor = get_actor()
if actor.is_running():
actor.logger.warning('Connection lost with actor')
async def _on_message(self, message):
actor = get_actor()
command = message.get('command')
ack = message.get('ack')
if command == 'callback':
if not ack:
raise ProtocolError('A callback without id')
try:
pending = self._pending_responses.pop(ack)
except KeyError:
raise KeyError('Callback %s not in pending callbacks' % ack)
pending.set_result(message.get('result'))
else:
try:
target = actor.get_actor(message['target'])
if target is None:
raise CommandError('cannot execute "%s", unknown actor '
'"%s"' % (command, message['target']))
# Get the caller proxy without throwing
caller = get_proxy(actor.get_actor(message['sender']),
safe=True)
if isinstance(target, ActorProxy):
# route the message to the actor proxy
if caller is None:
raise CommandError(
"'%s' got message from unknown '%s'"
% (actor, message['sender']))
result = await actor.send(target, command,
*message['args'],
**message['kwargs'])
else:
result = await command_in_context(command, caller, target,
message['args'],
message['kwargs'],
self)
except CommandError as exc:
self.logger.warning('Command error: %s' % exc)
result = None
except Exception as exc:
self.logger.exception('Unhandled exception')
result = None
if ack:
self._start(Message.callback(result, ack))
def _write(self, req):
obj = pickle.dumps(req.data, protocol=2)
data = self._parser.encode(obj, opcode=2)
try:
self._transport.write(data)
except (socket.error, RuntimeError):
actor = get_actor()
if actor.is_running() and not actor.is_arbiter():
actor.logger.warning('Lost connection with arbiter')
actor._loop.stop()
[docs]class MailboxClient(AbstractClient):
'''Used by actors to send messages to other actors via the arbiter.
'''
protocol_factory = MailboxProtocol
def __init__(self, address, actor, loop):
super().__init__(loop)
self.address = address
self.name = 'Mailbox for %s' % actor
self._connection = None
def connect(self):
return self.create_connection(self.address)
def __repr__(self):
return '%s %s' % (self.name, nice_address(self.address))
@task
async def request(self, command, sender, target, args, kwargs):
# the request method
if self._connection is None:
self._connection = await self.connect()
self._connection.bind_event('connection_lost', self._lost)
req = Message.command(command, sender, target, args, kwargs)
self._connection._start(req)
response = await req.waiter
return response
def close(self):
if self._connection:
self._connection.close()
def start_serving(self): # pragma nocover
pass
def _lost(self, _, exc=None):
# When the connection is lost, stop the event loop
if self._loop.is_running():
self._loop.stop()