Source code for pulsar.async.proxy

from pulsar import CommandNotFound
from pulsar.utils.pep import default_timer

import signal

from .futures import create_future, chain_future
from .consts import ACTOR_ACTION_TIMEOUT


__all__ = ['ActorProxy',
           'ActorProxyMonitor',
           'get_proxy',
           'command',
           'get_command']

global_commands_table = {}


def get_proxy(obj, safe=False):
    if isinstance(obj, ActorProxy):
        return obj
    elif hasattr(obj, 'proxy'):
        return get_proxy(obj.proxy)
    else:
        if safe:
            return None
        else:
            raise ValueError('"%s" is not an actor or actor proxy.' % obj)


def actor_identity(actor):
    return actor.aid if hasattr(actor, 'aid') else actor


def get_command(name):
    '''Get the command function *name*'''
    command = global_commands_table.get(name.lower())
    if not command:
        raise CommandNotFound(name)
    return command


[docs]class command: '''Decorator for pulsar command functions. :parameter ack: ``True`` if the command acknowledge the sender with a response. Usually is set to ``True`` (which is also the default value). ''' def __init__(self, ack=True): self.ack = ack def __call__(self, f): self.name = f.__name__.lower() def command_function(request, args, kwargs): return f(request, *args, **kwargs) command_function.ack = self.ack command_function.__name__ = self.name command_function.__doc__ = f.__doc__ global_commands_table[self.name] = command_function return command_function
def actor_proxy_future(aid, future=None): self = create_future() if isinstance(aid, ActorProxyMonitor): assert future is None aid.callback = self self.aid = aid.aid else: self.aid = aid chain_future(future, next=self) return self
[docs]class ActorProxy: '''A proxy for a remote :class:`.Actor`. This is a lightweight class which delegates function calls to the underlying remote object. It is picklable and therefore can be send from actor to actor using :ref:`actor message passing <tutorials-messages>`. For example, lets say we have a proxy ``a``, to send a message to it:: from pulsar import send send(a, 'echo', 'hello there!') will send the :ref:`command <actor_commands>` ``echo`` to actor ``a`` with parameter ``"hello there!"``. .. attribute:: aid Unique ID for the remote :class:`.Actor` .. attribute:: address the socket address of the underlying :attr:`.Actor.mailbox`. ''' def __init__(self, impl): self.aid = impl.aid self.name = impl.name self.cfg = impl.cfg self.address = getattr(impl, 'address', None) def __repr__(self): return '%s(%s)' % (self.name, self.aid) __str__ = __repr__ @property def proxy(self): return self def __eq__(self, o): o = get_proxy(o, True) return o and self.aid == o.aid def __ne__(self, o): return not self.__eq__(o)
[docs]class ActorProxyMonitor(ActorProxy): '''A specialised :class:`.ActorProxy` class. It contains additional information about the remote underlying :class:`.Actor`. Instances of this class serialise into :class:`.ActorProxy`. The :class:`.ActorProxyMonitor` is special since it lives in the :class:`.Arbiter` domain and it is used by the :class:`.Arbiter` (or a :class:`.Monitor`) to monitor the state of the spawned actor. .. attribute:: impl The :class:`.Concurrency` instance for the remote :class:`.Actor`. This dictionary is constantly updated by the remote actor by sending the :ref:`info message <actor_info_command>`. .. attribute:: info Dictionary of information regarding the remote :class:`.Actor` .. attribute:: mailbox This is the connection with the remote actor. It is available once the :ref:`actor handshake <handshake>` between the actor and the monitor has completed. The :attr:`mailbox` is a server-side :class:`.MailboxProtocol` instance and it is used by the :func:`.send` function to send messages to the remote actor. ''' monitor = None def __init__(self, impl): self.impl = impl self.info = {} self.mailbox = None self.callback = None self.spawning_start = None self.stopping_start = None super().__init__(impl) @property def notified(self): '''Last time this :class:`.ActorProxyMonitor` was notified by the remote actor.''' return self.info.get('last_notified') @property def proxy(self): '''The :class:`.ActorProxy` for this monitor.''' return ActorProxy(self) def __reduce__(self): return self.proxy.__reduce__()
[docs] def is_alive(self): '''``True`` if underlying actor is alive. ''' return self.impl.is_alive()
[docs] def kill(self): '''Terminate life of underlying actor. ''' self.impl.kill(signal.SIGKILL)
def stop(self, sig=None): sig = sig or signal.SIGTERM self.impl.kill(sig)
[docs] def join(self, timeout=None): '''Wait until the underlying actor terminates. If ``timeout`` is provided, it raises an exception if the timeout is reached. ''' self.impl.join(timeout=timeout)
[docs] def start(self): '''Start the remote actor. ''' self.spawning_start = default_timer() self.impl.start()
def should_be_alive(self): if not self.mailbox: return default_timer() - self.spawning_start > ACTOR_ACTION_TIMEOUT else: return True def should_terminate(self): if self.stopping_start is None: self.stopping_start = default_timer() return False else: dt = default_timer() - self.stopping_start return dt if dt >= ACTOR_ACTION_TIMEOUT else False