Source code for pulsar.async.actor

import sys
import os
import pickle
from itertools import chain
from threading import current_thread

from pulsar import HaltServer, CommandError, MonitorStarted, system
from pulsar.utils.log import WritelnDecorator

from .events import EventHandler
from .proxy import ActorProxy, ActorProxyMonitor, actor_identity
from .mailbox import command_in_context
from .access import get_actor
from .cov import Coverage
from .consts import ACTOR_STATES


__all__ = ['is_actor', 'send', 'spawn',
           'Actor', 'get_stream']


def is_actor(obj):
    return isinstance(obj, Actor)


def get_stream(cfg):
    '''Obtain the python stream handler given a config dictionary.
    '''
    stream = sys.stderr
    return WritelnDecorator(stream)


[docs]def send(target, action, *args, **params): '''Send a :ref:`message <api-remote_commands>` to ``target`` The message is to perform a given ``action``. The actor sending the message is obtained via the :func:`get_actor` function. :parameter target: the :class:`Actor` id or an :class:`.ActorProxy` or name of the target actor which will receive the message. :parameter action: the :ref:`remote command <api-remote_commands>` to perform in the ``target`` :class:`Actor`. :parameter args: positional arguments to pass to the :ref:`remote command <api-remote_commands>` ``action``. :parameter params: dictionary of parameters to pass to :ref:`remote command <api-remote_commands>` ``action``. :return: an :class:`~asyncio.Future` if the action acknowledge the caller or `None`. Typical example:: >>> r = send(p,'ping') >>> r.result() 'pong' ''' actor = get_actor() if not actor: raise RuntimeError('No actor available, cannot send messages') else: return actor.send(target, action, *args, **params)
[docs]def spawn(**kwargs): '''Spawn a new :class:`.Actor` and return an :class:`~asyncio.Future`. **Parameter kwargs** These optional parameters are: * ``aid`` the actor id * ``name`` the actor name * :ref:`actor hooks <actor-hooks>` such as ``start``, ``stopping`` and ``periodic_task`` :return: an :class:`~asyncio.Future`. A typical usage:: >>> def do_something(actor): ... >>> a = spawn(start=do_something, ...) >>> a.aid 'ba42b02b' >>> a.called True >>> p = a.result() >>> p.address ('127.0.0.1', 46691) ''' actor = get_actor() if not actor: raise RuntimeError('No actor available, cannot spawn') else: return actor.spawn(**kwargs)
[docs]class Actor(EventHandler, Coverage): '''The base class for parallel execution in pulsar. In computer science, the **Actor model** is a mathematical model of concurrent computation that treats *actors* as the universal primitives of computation. In response to a message that it receives, an actor can make local decisions, create more actors, send more messages, and determine how to respond to the next message received. The current implementation allows for actors to perform specific tasks such as listening to a socket, acting as http server, consuming a task queue and so forth. To spawn a new actor:: >>> from pulsar import spawn >>> a = spawn() >>> a.is_alive() True Here ``a`` is actually a reference to the remote actor, it is an :class:`.ActorProxy`. **ATTRIBUTES** .. attribute:: name The name of this :class:`Actor`. .. attribute:: aid Unique ID for this :class:`Actor`. .. attribute:: impl The :class:`.Concurrency` implementation for this :class:`Actor`. .. attribute:: _loop An :ref:`event loop <asyncio-event-loop>` which listen for input/output events on sockets or socket-like objects. It is the driver of the :class:`Actor`. If the :attr:`_loop` stops, the :class:`Actor` stops running and goes out of scope. .. attribute:: mailbox Used to send and receive :ref:`actor messages <tutorials-messages>`. .. attribute:: address The socket address for this :attr:`Actor.mailbox`. .. attribute:: proxy Instance of a :class:`.ActorProxy` holding a reference to this :class:`Actor`. The proxy is a lightweight representation of the actor which can be shared across different processes (i.e. it is picklable). .. attribute:: state The actor :ref:`numeric state <actor-states>`. .. attribute:: extra A dictionary which can be populated with extra parameters useful for other actors. This dictionary is included in the dictionary returned by the :meth:`info` method. Check the :ref:`info command <actor_info_command>` for how to obtain information about an actor. .. attribute:: info_state Current state description string. One of ``initial``, ``running``, ``stopping``, ``closed`` and ``terminated``. .. attribute:: next_periodic_task The :class:`asyncio.Handle` for the next :ref:`actor periodic task <actor-periodic-task>`. .. attribute:: stream A ``stream`` handler to write information messages without using the :attr:`~.AsyncObject.logger`. ''' ONE_TIME_EVENTS = ('start', 'stopping') MANY_TIMES_EVENTS = ('on_info', 'on_params', 'periodic_task') exit_code = None mailbox = None monitor = None next_periodic_task = None def __init__(self, impl): self.state = ACTOR_STATES.INITIAL self.__impl = impl self.servers = {} self.extra = {} self.stream = get_stream(self.cfg) self.tid = current_thread().ident self.pid = os.getpid() hooks = [] for name in chain(self.ONE_TIME_EVENTS, self.MANY_TIMES_EVENTS): hook = impl.params.pop(name, None) if hook: hooks.append((name, hook)) for name, value in impl.params.items(): setattr(self, name, value) del impl.params super().__init__(impl.setup_event_loop(self)) for name, hook in hooks: self.bind_event(name, hook) try: self.cfg.post_fork(self) except Exception: # pragma nocover pass def __repr__(self): return self.impl.unique_name def __str__(self): return self.__repr__() # ############################################################# PROPERTIES @property def name(self): return self.__impl.name @property def aid(self): return self.__impl.aid @property def impl(self): return self.__impl @property def cfg(self): return self.__impl.cfg @property def proxy(self): return ActorProxy(self) @property def address(self): return self.mailbox.address @property def info_state(self): return ACTOR_STATES.DESCRIPTION[self.state] @property def monitors(self): '''Dictionary of monitors or None''' return self.__impl.monitors @property def managed_actors(self): '''Dictionary of managed actors or None''' return self.__impl.managed_actors @property def registered(self): '''Dictionary of registered actors or None''' return self.__impl.registered ####################################################################### # HIGH LEVEL API METHODS #######################################################################
[docs] def start(self, exit=True): '''Called after forking to start the actor's life. This is where logging is configured, the :attr:`mailbox` is registered and the :attr:`_loop` is initialised and started. Calling this method more than once does nothing. ''' if self.state == ACTOR_STATES.INITIAL: self.__impl.before_start(self) self._started = self._loop.time() self._exit = exit self.state = ACTOR_STATES.STARTING self._run()
[docs] def send(self, target, action, *args, **kwargs): '''Send a message to ``target`` to perform ``action`` with given positional ``args`` and key-valued ``kwargs``. Returns a coroutine or a Future. ''' target = self.monitor if target == 'monitor' else target mailbox = self.mailbox if isinstance(target, ActorProxyMonitor): mailbox = target.mailbox else: actor = self.get_actor(target) if isinstance(actor, Actor): # this occur when sending a message from arbiter to monitors or # vice-versa. return command_in_context(action, self, actor, args, kwargs) elif isinstance(actor, ActorProxyMonitor): mailbox = actor.mailbox if hasattr(mailbox, 'request'): # if not mailbox.closed: return mailbox.request(action, self, target, args, kwargs) else: raise CommandError('Cannot execute "%s" in %s. Unknown actor %s.' % (action, self, target))
[docs] def spawn(self, **params): '''Spawn a new actor ''' return self.__impl.spawn(self, **params)
[docs] def stop(self, exc=None, exit_code=None): '''Gracefully stop the :class:`Actor`. Implemented by the :meth:`.Concurrency.stop` method of the :attr:`impl` attribute.''' return self.__impl.stop(self, exc, exit_code)
def add_monitor(self, monitor_name, **params): return self.__impl.add_monitor(self, monitor_name, **params)
[docs] def actorparams(self): '''Returns a dictionary of parameters for spawning actors. The dictionary is passed to the spawn method when creating new actors. Fire the :ref:`on_params actor hook <actor-hooks>`. ''' data = {} self.fire_event('on_params', params=data) return data
# ############################################################## STATES
[docs] def is_running(self): '''``True`` if actor is running, that is when the :attr:`state` is equal to :ref:`ACTOR_STATES.RUN <actor-states>` and the loop is running.''' return self.state == ACTOR_STATES.RUN and self._loop.is_running()
[docs] def started(self): '''``True`` if actor has started. It does not necessarily mean it is running. Its state is greater or equal :ref:`ACTOR_STATES.RUN <actor-states>`. ''' return self.state >= ACTOR_STATES.RUN
[docs] def after_run(self): """``True`` when the actor is sopping or has already stopped """ return self.state > ACTOR_STATES.RUN
[docs] def closed(self): '''``True`` if actor has exited in an clean fashion. Its :attr:`state` is :ref:`ACTOR_STATES.CLOSE <actor-states>`. ''' return self.state == ACTOR_STATES.CLOSE
[docs] def stopped(self): '''``True`` if actor has exited. Its :attr:`state` is greater or equal to :ref:`ACTOR_STATES.CLOSE <actor-states>`. ''' return self.state >= ACTOR_STATES.CLOSE
[docs] def is_arbiter(self): '''``True`` if ``self`` is the ``arbiter``''' return self.__impl.is_arbiter()
[docs] def is_monitor(self): '''``True`` if ``self`` is a ``monitor``''' return self.__impl.is_monitor()
[docs] def is_process(self): '''boolean indicating if this is an actor on a child process.''' return self.__impl.is_process()
def __reduce__(self): raise pickle.PicklingError('{0} - Cannot pickle Actor instances' .format(self)) ####################################################################### # INTERNALS #######################################################################
[docs] def get_actor(self, aid, check_monitor=True): '''Given an actor unique id return the actor proxy. ''' aid = actor_identity(aid) return self.__impl.get_actor(self, aid, check_monitor=check_monitor)
[docs] def info(self): '''Return a nested dictionary of information related to the actor status and performance. The dictionary contains the following entries: * ``actor`` a dictionary containing information regarding the type of actor and its status. * ``events`` a dictionary of information about the :ref:`event loop <asyncio-event-loop>` running the actor. * ``extra`` the :attr:`extra` attribute (you can use it to add stuff). * ``system`` system info. This method is invoked when you run the :ref:`info command <actor_info_command>` from another actor. ''' if not self.started(): return isp = self.is_process() actor = {'name': self.name, 'state': self.info_state, 'actor_id': self.aid, 'uptime': self._loop.time() - self._started, 'thread_id': self.tid, 'process_id': self.pid, 'is_process': isp, 'age': self.impl.age} data = {'actor': actor, 'extra': self.extra} if isp: data['system'] = system.process_info(self.pid) self.fire_event('on_info', info=data) return data
def _run(self, initial=True): exc = None if initial: try: self.cfg.when_ready(self) except Exception: # pragma nocover self.logger.exception('Unhandled exception in when_ready hook') try: exc = self.__impl.run_actor(self) except MonitorStarted: return except (Exception, HaltServer) as exc: return self.stop(exc) except BaseException: pass return self.stop() def _remove_actor(self, actor, log=True): return self.__impl._remove_actor(self, actor, log=log)