Source code for pulsar.async.concurrency

import os
import sys
import itertools
import asyncio
import pickle
from time import time
from collections import OrderedDict
from multiprocessing import current_process
from multiprocessing.reduction import ForkingPickler

import pulsar
from pulsar import system, MonitorStarted, HaltServer, Config
from pulsar.utils.log import logger_fds
from pulsar.utils.tools import Pidfile
from pulsar.utils import autoreload

from .proxy import ActorProxyMonitor, get_proxy, actor_proxy_future
from .access import get_actor, set_actor, logger, EventLoopPolicy
from .threads import Thread
from .mailbox import MailboxClient, MailboxProtocol, ProxyMailbox, create_aid
from .futures import ensure_future, add_errback, chain_future, create_future
from .protocols import TcpServer
from .actor import Actor
from .consts import (ACTOR_STATES, ACTOR_TIMEOUT_TOLE, MIN_NOTIFY, MAX_NOTIFY,
                     MONITOR_TASK_PERIOD)
from .process import ProcessMixin, signal_from_exitcode

__all__ = ['arbiter']


SUBPROCESS = os.path.join(os.path.dirname(__file__), '_subprocess.py')


[docs]def arbiter(**params): '''Obtain the ``arbiter``. It returns the arbiter instance only if we are on the arbiter context domain, otherwise it returns nothing. ''' arbiter = get_actor() if arbiter is None: # Create the arbiter return set_actor(_spawn_actor('arbiter', None, **params)) elif arbiter.is_arbiter(): return arbiter
[docs]class Concurrency: '''Actor :class:`.Concurrency`. Responsible for the actual spawning of actors according to a concurrency implementation. Instances are picklable and are shared between the :class:`.Actor` and its :class:`.ActorProxyMonitor`. This is an abstract class, derived classes must implement the ``start`` method. :param concurrency: string indicating the concurrency implementation. Valid choices are ``monitor``, ``process``, ``thread``. :param actor_class: :class:`.Actor` or one of its subclasses. :param timeout: timeout in seconds for the actor. :param kwargs: additional key-valued arguments to be passed to the actor constructor. ''' monitors = None managed_actors = None registered = None actor_class = Actor @classmethod def make(cls, kind, cfg, name, aid, **kw): self = cls() self.aid = aid self.age = next(_actor_counter) self.name = name or self.actor_class.__name__.lower() self.kind = kind self.cfg = cfg self.params = kw return self.create_actor() def identity(self, actor): return actor.aid @property def unique_name(self): return '%s.%s' % (self.name, self.aid) def __repr__(self): return self.unique_name __str__ = __repr__ def before_start(self, actor): pass def is_process(self): return False def is_arbiter(self): return False def is_monitor(self): return False def get_actor(self, actor, aid, check_monitor=True): if aid == actor.aid: return actor elif aid == 'monitor': return actor.monitor
[docs] def spawn(self, actor, aid=None, **params): '''Spawn a new actor from ``actor``. ''' aid = aid or create_aid() future = actor.send('arbiter', 'spawn', aid=aid, **params) return actor_proxy_future(aid, future)
[docs] def run_actor(self, actor): '''Start running the ``actor``. ''' set_actor(actor) if not actor.mailbox.address: ensure_future(actor.mailbox.start_serving(), loop=actor._loop) actor._loop.run_forever()
def add_monitor(self, actor, monitor_name, **params): raise RuntimeError('Cannot add monitors to %s' % actor)
[docs] def setup_event_loop(self, actor): '''Set up the event loop for ``actor``. ''' actor._logger = self.cfg.configured_logger('pulsar.%s' % actor.name) try: loop = asyncio.get_event_loop() except RuntimeError: if self.cfg and self.cfg.concurrency == 'thread': loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) else: raise if not hasattr(loop, 'logger'): loop.logger = actor._logger actor.mailbox = self.create_mailbox(actor, loop) return loop
[docs] def hand_shake(self, actor): '''Perform the hand shake for ``actor`` The hand shake occurs when the ``actor`` is in starting state. It performs the following actions: * set the ``actor`` as the actor of the current thread * bind two additional callbacks to the ``start`` event * fire the ``start`` event If the hand shake is successful, the actor will eventually results in a running state. ''' try: assert actor.state == ACTOR_STATES.STARTING if actor.cfg.debug: actor.logger.debug('starting handshake') actor.bind_event('start', self._switch_to_run) actor.bind_event('start', self.periodic_task) actor.bind_event('start', self._acknowledge_start) actor.fire_event('start') except Exception as exc: actor.stop(exc)
def create_actor(self): self.daemon = True self.params['monitor'] = get_proxy(self.params['monitor']) return ActorProxyMonitor(self)
[docs] def create_mailbox(self, actor, loop): '''Create the mailbox for ``actor``.''' client = MailboxClient(actor.monitor.address, actor, loop) loop.call_soon_threadsafe(self.hand_shake, actor) return client
[docs] def periodic_task(self, actor, **kw): '''Implement the :ref:`actor period task <actor-periodic-task>`. This is an internal method called periodically by the :attr:`.Actor._loop` to ping the actor monitor. If successful return a :class:`~asyncio.Future` called back with the acknowledgement from the monitor. ''' actor.next_periodic_task = None ack = None if actor.is_running(): if actor.cfg.debug: actor.logger.debug('notify monitor') # if an error occurs, shut down the actor ack = actor.send('monitor', 'notify', actor.info()) add_errback(ack, actor.stop) actor.fire_event('periodic_task') next = max(ACTOR_TIMEOUT_TOLE*actor.cfg.timeout, MIN_NOTIFY) else: next = 0 actor.next_periodic_task = actor._loop.call_later( min(next, MAX_NOTIFY), self.periodic_task, actor) return ack
[docs] def stop(self, actor, exc=None, exit_code=None): '''Gracefully stop the ``actor``. ''' if actor.state <= ACTOR_STATES.RUN: # The actor has not started the stopping process. Starts it now. actor.state = ACTOR_STATES.STOPPING actor.event('start').clear() if exc: if not exit_code: exit_code = getattr(exc, 'exit_code', 1) if exit_code == 1: exc_info = sys.exc_info() if exc_info[0] is not None: actor.logger.critical('Stopping', exc_info=exc_info) else: actor.logger.critical('Stopping: %s', exc) elif exit_code == 2: actor.logger.error(str(exc)) elif exit_code: actor.stream.writeln(str(exc)) else: if not exit_code: exit_code = getattr(actor._loop, 'exit_code', 0) # actor.exit_code = exit_code stopping = actor.fire_event('stopping') if not stopping.done() and actor._loop.is_running(): actor.logger.debug('asynchronous stopping') def cbk(_): return self._stop_actor(actor) return chain_future(stopping, callback=cbk, errback=cbk) else: if actor.logger: actor.logger.debug('stopping') return self._stop_actor(actor) elif actor.stopped(): return self._stop_actor(actor, True)
def _remove_signals(self, actor): pass def _stop_actor(self, actor, finished=False): # Stop the actor if finished is True # otherwise starts the stopping process if finished: self._remove_signals(actor) return True # if actor._loop.is_running(): actor.logger.debug('Closing mailbox') actor.mailbox.close() else: actor.state = ACTOR_STATES.CLOSE actor.logger.debug('Exiting actor with exit code 1') actor.exit_code = 1 actor.mailbox.abort() return actor.stop() def _switch_to_run(self, actor, exc=None): if exc is None and actor.state < ACTOR_STATES.RUN: actor.state = ACTOR_STATES.RUN elif exc: actor.stop(exc) def _acknowledge_start(self, actor, exc=None): if exc is None: actor.logger.info('started') else: actor.stop(exc) def _remove_actor(self, monitor, actor, log=True): raise RuntimeError('Cannot remove actor')
class MonitorMixin: def identity(self, actor): return actor.name def create_actor(self): self.managed_actors = {} actor = self.actor_class(self) actor.bind_event('on_info', self._info_monitor) return actor def start(self): '''does nothing,''' pass def get_actor(self, actor, aid, check_monitor=True): # Delegate get_actor to the arbiter if aid == actor.aid: return actor elif aid == 'monitor': return actor.monitor or actor elif aid in self.managed_actors: return self.managed_actors[aid] elif actor.monitor and check_monitor: return actor.monitor.get_actor(aid) def spawn(self, monitor, kind=None, **params): '''Spawn a new :class:`Actor` and return its :class:`.ActorProxyMonitor`. ''' proxy = _spawn_actor(kind, monitor, **params) # Add to the list of managed actors if this is a remote actor if isinstance(proxy, Actor): self._register(proxy) return proxy else: proxy.monitor = monitor self.managed_actors[proxy.aid] = proxy future = actor_proxy_future(proxy) proxy.start() return future def manage_actors(self, monitor, stop=False): '''Remove :class:`Actor` which are not alive from the :class:`PoolMixin.managed_actors` and return the number of actors still alive. :parameter stop: if ``True`` stops all alive actor. ''' alive = 0 if self.managed_actors: for aid, actor in list(self.managed_actors.items()): alive += self.manage_actor(monitor, actor, stop) return alive def manage_actor(self, monitor, actor, stop=False): '''If an actor failed to notify itself to the arbiter for more than the timeout, stop the actor. :param actor: the :class:`Actor` to manage. :param stop: if ``True``, stop the actor. :return: if the actor is alive 0 if it is not. ''' if not monitor.is_running(): stop = True if not actor.is_alive(): if not actor.should_be_alive() and not stop: return 1 actor.join() monitor._remove_actor(actor) return 0 timeout = None started_stopping = bool(actor.stopping_start) # if started_stopping is True, set stop to True stop = stop or started_stopping if not stop and actor.notified: gap = time() - actor.notified stop = timeout = gap > actor.cfg.timeout if stop: # we are stopping the actor dt = actor.should_terminate() if not actor.mailbox or dt: if not actor.mailbox: monitor.logger.warning('kill %s - no mailbox.', actor) else: monitor.logger.warning('kill %s - could not stop ' 'after %.2f seconds.', actor, dt) actor.kill() self._remove_actor(monitor, actor) return 0 elif not started_stopping: if timeout: monitor.logger.warning('Stopping %s. Timeout %.2f', actor, timeout) else: monitor.logger.info('Stopping %s.', actor) actor.stop() return 1 def spawn_actors(self, monitor): '''Spawn new actors if needed. ''' to_spawn = monitor.cfg.workers - len(self.managed_actors) if monitor.cfg.workers and to_spawn > 0: for _ in range(to_spawn): monitor.spawn() def stop_actors(self, monitor): """Maintain the number of workers by spawning or killing as required """ if monitor.cfg.workers: num_to_kill = len(self.managed_actors) - monitor.cfg.workers for i in range(num_to_kill, 0, -1): w, kage = 0, sys.maxsize for worker in self.managed_actors.values(): age = worker.impl.age if age < kage: w, kage = w, age self.manage_actor(monitor, w, True) def _close_actors(self, monitor): # # Close all managed actors at once and wait for completion sig = signal_from_exitcode(monitor.exit_code) for worker in self.managed_actors.values(): worker.stop(sig) waiter = create_future(monitor._loop) def _check(_, **kw): if not self.managed_actors: monitor.remove_callback('periodic_task', _check) waiter.set_result(None) monitor.bind_event('periodic_task', _check) return waiter def _remove_actor(self, monitor, actor, log=True): removed = self.managed_actors.pop(actor.aid, None) if log and removed: log = False monitor.logger.warning('Removed %s', actor) if monitor.monitor: monitor.monitor._remove_actor(actor, log) return removed def _info_monitor(self, actor, info=None): if actor.started(): info['actor'].update({'concurrency': actor.cfg.concurrency, 'workers': len(self.managed_actors)}) info['workers'] = [a.info for a in self.managed_actors.values() if a.info] return info def _register(self, arbiter): raise HaltServer('Critical error') ############################################################################ # CONCURRENCY IMPLEMENTATIONS
[docs]class MonitorConcurrency(MonitorMixin, Concurrency): ''':class:`.Concurrency` class for a :class:`.Monitor`. Monitors live in the **main thread** of the master process and therefore do not require to be spawned. ''' def is_monitor(self): return True def setup_event_loop(self, actor): actor._logger = self.cfg.configured_logger('pulsar.%s' % actor.name) actor.mailbox = ProxyMailbox(actor) loop = actor.mailbox._loop loop.call_soon(actor.start) return loop def run_actor(self, actor): actor._loop.call_soon(self.hand_shake, actor) raise MonitorStarted def create_mailbox(self, actor, loop): raise NotImplementedError
[docs] def periodic_task(self, monitor, **kw): '''Override the :meth:`.Concurrency.periodic_task` to implement the :class:`.Monitor` :ref:`periodic task <actor-periodic-task>`.''' interval = 0 monitor.next_periodic_task = None if monitor.started(): interval = MONITOR_TASK_PERIOD self.manage_actors(monitor) # if monitor.is_running(): self.spawn_actors(monitor) self.stop_actors(monitor) elif monitor.cfg.debug: monitor.logger.debug('still stopping') # monitor.fire_event('periodic_task') # if not monitor.closed(): monitor.next_periodic_task = monitor._loop.call_later( interval, self.periodic_task, monitor)
def _stop_actor(self, actor, finished=False): # remove all workers from this monitor if finished: return def _cleanup(_): if actor.cfg.debug: actor.logger.debug('monitor is now stopping') actor.state = ACTOR_STATES.CLOSE if actor.next_periodic_task: actor.next_periodic_task.cancel() self.stop(actor) return chain_future(self._close_actors(actor), callback=_cleanup, errback=_cleanup)
[docs]class ArbiterConcurrency(MonitorMixin, ProcessMixin, Concurrency): '''Concurrency implementation for the ``arbiter`` ''' pid_file = None def is_arbiter(self): return True def create_actor(self): cfg = self.cfg policy = EventLoopPolicy(cfg.event_loop, cfg.thread_workers, cfg.debug) asyncio.set_event_loop_policy(policy) if cfg.daemon: # pragma nocover # Daemonize the system if not cfg.pid_file: cfg.set('pid_file', 'pulsar.pid') system.daemonize(keep_fds=logger_fds()) self.aid = self.name actor = super().create_actor() self.monitors = OrderedDict() self.registered = {self.identity(actor): actor} actor.bind_event('start', self._start_arbiter) return actor
[docs] def get_actor(self, actor, aid, check_monitor=True): '''Given an actor unique id return the actor proxy.''' a = super().get_actor(actor, aid) if a is None: if aid in self.monitors: # Check in monitors aid return self.monitors[aid] elif aid in self.managed_actors: return self.managed_actors[aid] elif aid in self.registered: return self.registered[aid] else: # Finally check in workers in monitors for m in self.monitors.values(): a = m.get_actor(aid, check_monitor=False) if a is not None: return a else: return a
[docs] def add_monitor(self, actor, monitor_name, **params): '''Add a new ``monitor``. :param monitor_class: a :class:`.Monitor` class. :param monitor_name: a unique name for the monitor. :param kwargs: dictionary of key-valued parameters for the monitor. :return: the :class:`.Monitor` added. ''' if monitor_name in self.registered: raise KeyError('Monitor "%s" already available' % monitor_name) params.update(actor.actorparams()) params['name'] = monitor_name params['kind'] = 'monitor' return actor.spawn(**params)
[docs] def create_mailbox(self, actor, loop): '''Override :meth:`.Concurrency.create_mailbox` to create the mailbox server. ''' mailbox = TcpServer(MailboxProtocol, loop, ('127.0.0.1', 0), name='mailbox') # when the mailbox stop, close the event loop too mailbox.bind_event('stop', lambda _, **kw: loop.stop()) mailbox.bind_event( 'start', lambda _, **kw: loop.call_soon(self.hand_shake, actor)) return mailbox
[docs] def periodic_task(self, actor, **kw): '''Override the :meth:`.Concurrency.periodic_task` to implement the :class:`.Arbiter` :ref:`periodic task <actor-periodic-task>`.''' interval = 0 actor.next_periodic_task = None # if actor.started(): # managed actors job self.manage_actors(actor) for m in list(self.monitors.values()): if m.closed(): actor._remove_actor(m) interval = MONITOR_TASK_PERIOD if not actor.is_running() and actor.cfg.debug: actor.logger.debug('still stopping') # actor.fire_event('periodic_task') if not actor.closed(): actor.next_periodic_task = actor._loop.call_later( interval, self.periodic_task, actor) if actor.cfg.reload and autoreload.check_changes(): # reload changes actor.stop(exit_code=autoreload.EXIT_CODE)
def _stop_actor(self, actor, finished=False): # remove all actors and monitors if finished: self._stop_arbiter(actor) elif actor._loop.is_running(): self._exit_arbiter(actor) else: actor.logger.debug('Restarts event loop to stop actors') actor._loop.call_soon(self._exit_arbiter, actor) actor._run(False) def _exit_arbiter(self, actor, done=False): if done: actor.logger.debug('Closing mailbox server') actor.state = ACTOR_STATES.CLOSE actor._loop.create_task(actor.mailbox.close()) else: monitors = len(self.monitors) managed = len(self.managed_actors) if monitors or managed: actor.logger.debug('Closing %d monitors and %d actors', monitors, managed) ensure_future(self._close_all(actor)) else: self._exit_arbiter(actor, True) async def _close_all(self, actor): # Close monitors and actors try: waiters = [] for m in tuple(self.monitors.values()): stop = m.stop(exit_code=actor.exit_code) if stop: waiters.append(stop) waiters.append(self._close_actors(actor)) await asyncio.gather(*waiters) except Exception: actor.logger.exception('Exception while closing arbiter') self._exit_arbiter(actor, True) def _remove_actor(self, arbiter, actor, log=True): a = super()._remove_actor(arbiter, actor, False) b = self.registered.pop(self.identity(actor), None) c = self.monitors.pop(self.identity(actor), None) removed = a or b or c if removed and log: arbiter.logger.warning('Removed %s', actor) return removed def _stop_arbiter(self, actor): # pragma nocover actor.stop_coverage() self._remove_signals(actor) p = self.pid_file if p is not None: actor.logger.debug('Removing %s' % p.fname) p.unlink() self.pid_file = None if self.managed_actors: actor.state = ACTOR_STATES.TERMINATE actor.exit_code = actor.exit_code or 0 if actor.exit_code == autoreload.EXIT_CODE: actor.logger.info("Code changed, reloading server") actor._exit = True else: # actor.logger.info("Bye (exit code = %s)", exit_code) actor.stream.writeln("\nBye (exit code = %s)" % actor.exit_code) try: actor.cfg.when_exit(actor) except Exception: pass if actor.exit_code and actor._exit: sys.exit(actor.exit_code) def _start_arbiter(self, actor, exc=None): if not os.environ.get('SERVER_SOFTWARE'): os.environ["SERVER_SOFTWARE"] = pulsar.SERVER_SOFTWARE pid_file = actor.cfg.pid_file if pid_file is not None: actor.logger.info('Create pid file %s', pid_file) try: p = Pidfile(pid_file) p.create(actor.pid) except RuntimeError as e: raise HaltServer('ERROR. %s' % str(e), exit_code=2) self.pid_file = p def _info_monitor(self, actor, info=None): data = info monitors = {} for m in self.monitors.values(): info = m.info() if info: actor = info['actor'] monitors[actor['name']] = info server = data.pop('actor') server.update({'version': pulsar.__version__, 'python_version': sys.version, 'name': pulsar.SERVER_NAME, 'number_of_monitors': len(self.monitors), 'number_of_actors': len(self.managed_actors)}) server.pop('is_process', None) server.pop('ppid', None) server.pop('actor_id', None) server.pop('age', None) data['server'] = server data['workers'] = [a.info for a in self.managed_actors.values()] data['monitors'] = monitors return data def _register(self, actor): aid = self.identity(actor) self.registered[aid] = actor self.monitors[aid] = actor
def run_actor(self): self._actor = actor = self.actor_class(self) try: actor.start() finally: try: actor.cfg.when_exit(actor) except Exception: # pragma nocover pass log = actor.logger or logger() actor.stop_coverage() log.info('Bye from "%s"', actor) class ActorThread(Concurrency, Thread): '''Actor on a thread of the master process ''' _actor = None def before_start(self, actor): self.set_loop(actor._loop) def run(self): run_actor(self) class ActorProcess(ProcessMixin, Concurrency): '''Actor on a Operative system process. ''' process = None def start(self): loop = asyncio.get_event_loop() return ensure_future(self._start(loop), loop=loop) async def _start(self, loop): import inspect data = { 'path': sys.path.copy(), 'impl': bytes(ForkingPickler.dumps(self)), 'main': inspect.getfile(sys.modules['__main__']), 'authkey': bytes(current_process().authkey) } self.process = await asyncio.create_subprocess_exec( sys.executable, SUBPROCESS, stdin=asyncio.subprocess.PIPE, loop=loop ) await self.process.communicate(pickle.dumps(data)) def is_alive(self): if self.process: code = self.process.returncode return code is None return False def join(self, timeout=None): pass def kill(self, sig): self.process.send_signal(sig) class ActorCoroutine(Concurrency): def start(self): run_actor(self) def kill(self, sig): self._actor.exit_code = int(sig) def is_alive(self): actor = self._actor return actor.exit_code is None and actor._loop.is_running() def join(self, timeout=None): pass def run_actor(self, actor): raise MonitorStarted concurrency_models = { 'arbiter': ArbiterConcurrency, 'monitor': MonitorConcurrency, 'coroutine': ActorCoroutine, 'thread': ActorThread, 'process': ActorProcess } def _spawn_actor(kind, monitor, cfg=None, name=None, aid=None, **kw): # Internal function which spawns a new Actor and return its # ActorProxyMonitor. # *cls* is the Actor class # *monitor* can be either the arbiter or a monitor if monitor: params = monitor.actorparams() name = params.pop('name', name) aid = params.pop('aid', aid) cfg = params.pop('cfg', cfg) # get config if not available if cfg is None: if monitor: cfg = monitor.cfg.copy() else: cfg = Config() if not aid: aid = create_aid() if not monitor: # monitor not available, this is the arbiter assert kind == 'arbiter' name = kind params = {} if not cfg.exc_id: cfg.set('exc_id', aid) # for key, value in kw.items(): if key in cfg.settings: cfg.set(key, value) else: params[key] = value # if monitor: kind = kind or cfg.concurrency if not kind: raise TypeError('Cannot spawn') model = concurrency_models.get(kind) if model: return model.make(kind, cfg, name, aid, monitor=monitor, **params) else: raise ValueError('Concurrency %s not supported in pulsar' % kind) _actor_counter = itertools.count(1)