Source code for pulsar.async.events

from collections import deque
from functools import partial

from asyncio import Future, InvalidStateError, ensure_future

from .access import _EVENT_LOOP_CLASSES
from .futures import future_result_exc, AsyncObject


__all__ = ['EventHandler', 'Event', 'OneTime', 'AbortEvent']


class AbortEvent(Exception):
    '''Use this exception to abort events
    '''
    pass


[docs]class AbstractEvent(AsyncObject): """Abstract event handler """ _handlers = None _fired = 0 @property def handlers(self): if self._handlers is None: self._handlers = [] return self._handlers
[docs] def bind(self, callback): '''Bind a ``callback`` to this event. ''' if callback not in self.handlers: self.handlers.append(callback)
[docs] def remove_callback(self, callback): '''Remove a callback from the list ''' handlers = self.handlers filtered_callbacks = [f for f in handlers if f != callback] removed_count = len(handlers) - len(filtered_callbacks) if removed_count: self.clear() self._handlers.extend(filtered_callbacks) return removed_count
[docs] def fired(self): '''The number of times this event has fired''' return self._fired
[docs] def fire(self, arg, **kwargs): '''Fire this event.''' raise NotImplementedError
def clear(self): if self._handlers: self._handlers[:] = []
[docs]class Event(AbstractEvent): '''The default implementation of :class:`AbstractEvent`. ''' def __init__(self, loop=None, name=None): self._loop = loop self._name = name or self.__class__.__name__.lower() def __repr__(self): return '%s: %s' % (self._name, self._handlers) __str__ = __repr__ def fire(self, arg, **kwargs): self._fired += self._fired + 1 if self._handlers: for hnd in self._handlers: try: hnd(arg, **kwargs) except Exception: self.logger.exception('Exception while firing %s', self)
[docs]class OneTime(Future, AbstractEvent): '''An :class:`AbstractEvent` which can be fired once only. This event handler is a subclass of :class:`.Future`. Implemented mainly for the one time events of the :class:`EventHandler`. ''' def __init__(self, *, loop=None, name=None): super().__init__(loop=loop) self._processing = False self._name = name or self.__class__.__name__.lower() def __repr__(self): return '%s: %s' % (self._name, super().__repr__()) __str__ = __repr__ @property def handlers(self): if self._handlers is None: self._handlers = deque() return self._handlers
[docs] def bind(self, callback): '''Bind a ``callback`` to this event. ''' super().bind(callback) if self._fired and not self._processing: arg, exc = future_result_exc(self) self._process(arg, exc, {})
[docs] def fire(self, arg, exc=None, **kwargs): '''The callback handlers registered via the :meth:~AbstractEvent.bind` method are executed first. :param arg: the argument :param exc: optional exception ''' if self._fired: raise InvalidStateError('already fired') self._fired = 1 self._process(arg, exc, kwargs) return self
def clear(self): if self._handlers: self._handlers.clear() def _process(self, arg, exc, kwargs, future=None): while self._handlers: self._processing = True hnd = self._handlers.popleft() try: result = hnd(arg, exc=exc, **kwargs) except AbortEvent as e: exc = e except Exception: self.logger.exception('Exception while firing onetime event') else: # If result is a future, resume process once done try: result = ensure_future(result, loop=self._loop) except TypeError: pass else: result.add_done_callback( partial(self._process, arg, exc, kwargs)) return self._processing = False if not self.done(): if exc: self.set_exception(exc) else: self.set_result(arg)
[docs]class EventHandler(AsyncObject): '''A Mixin for handling events on :ref:`async objects <async-object>`. It handles :class:`OneTime` events and :class:`Event` that occur several times. ''' ONE_TIME_EVENTS = () '''Event names which occur once only.''' MANY_TIMES_EVENTS = () '''Event names which occur several times.''' def __init__(self, loop=None, one_time_events=None, many_times_events=None): assert isinstance(loop, _EVENT_LOOP_CLASSES) self._loop = loop one = self.ONE_TIME_EVENTS if one_time_events: one = set(one) one.update(one_time_events) events = dict(((name, OneTime(loop=loop, name=name)) for name in one)) many = self.MANY_TIMES_EVENTS if many_times_events: many = set(many) many.update(many_times_events) events.update(((name, Event(loop=loop, name=name)) for name in many)) self._events = events @property def events(self): '''The dictionary of all events. ''' return self._events
[docs] def event(self, name): '''Returns the :class:`Event` at ``name``. If no event is registered for ``name`` returns nothing. ''' return self._events.get(name)
def fired_event(self, name): event = self._events.get(name) return event._fired if event else 0
[docs] def bind_event(self, name, callback): '''Register a ``callback`` with ``event``. The callback must be a callable accepting one positional parameter and at least the ``exc`` optional parameter:: def callback(arg, ext=None): ... o.bind_event('start', callback) the instance firing the event or the first positional argument passed to the :meth:`fire_event` method. :param name: the event name. If the event is not available a warning message is logged. :param callback: a callable receiving one positional parameter. It can also be a list/tuple of callables. :return: nothing. ''' if name not in self._events: self._events[name] = Event() event = self._events[name] event.bind(callback)
[docs] def remove_callback(self, name, callback): '''Remove a ``callback`` from event ``name`` ''' if name in self._events: event = self._events[name] return event.remove_callback(callback)
[docs] def bind_events(self, **events): '''Register all known events found in ``events`` key-valued parameters. ''' for name in self._events: if name in events: self.bind_event(name, events[name])
[docs] def fire_event(self, name, *args, **kwargs): """Dispatches ``arg`` or ``self`` to event ``name`` listeners. * If event at ``name`` is a one-time event, it makes sure that it was not fired before. :param args: optional argument passed as positional parameter to the event handler. :param kwargs: optional key-valued parameters to pass to the event handler. Can only be used for :ref:`many times events <many-times-event>`. :return: the :class:`Event` fired """ if not args: arg = self elif len(args) == 1: arg = args[0] else: raise TypeError('fire_event expected at most 1 argument got %s' % len(args)) event = self.event(name) if event: try: event.fire(arg, **kwargs) except InvalidStateError: self.logger.error('Event %s already fired' % name) return event else: self.logger.warning('Unknown event "%s" for %s', name, self)
[docs] def copy_many_times_events(self, other): '''Copy :ref:`many times events <many-times-event>` from ``other``. All many times events of ``other`` are copied to this handler provided the events handlers already exist. ''' if isinstance(other, EventHandler): events = self._events for name, event in other._events.items(): if isinstance(event, Event) and event._handlers: ev = events.get(name) # If the event is available add it if ev: for callback in event._handlers: ev.bind(callback)