# Source code for pulsar.async.mixins

from .access import create_future


class FlowControl:
    """A protocol mixin for flow control logic.

    This implements the protocol methods :meth:`pause_writing` and
    :meth:`resume_writing`.

    The class it is mixed into must provide ``bind_event``, ``_transport``,
    ``_loop`` and ``logger``; they are used here but not defined here.
    """
    # True between a pause_writing() call and the matching resume_writing().
    _paused = False
    # Future created after a write performed while paused, None otherwise.
    _write_waiter = None

    def __init__(self, low_limit=None, high_limit=None, **kw):
        """Store the write-buffer limits and register the event callbacks.

        :param low_limit: low-water mark forwarded to
            ``set_write_buffer_limits`` once the connection is made.
        :param high_limit: high-water mark forwarded the same way.
        :param kw: extra keyword arguments, accepted and ignored.
        """
        self._low_limit = low_limit
        self._high_limit = high_limit
        self.bind_event('connection_made', self._set_flow_limits)
        self.bind_event('connection_lost', self._wakeup_waiter)
        self.bind_event('after_write', self._make_write_waiter)

    def pause_writing(self):
        """Called by the transport when the buffer goes over the
        high-water mark.

        Reading from the transport is paused as well. Successive calls to
        this method will fail unless :meth:`resume_writing` is called first.
        """
        assert not self._paused, 'writing is already paused'
        self._paused = True
        self._transport.pause_reading()

    def resume_writing(self, exc=None):
        """Resume writing.

        A pending write waiter is released: it receives ``None`` as result,
        or ``exc`` as exception when one is given. Reading from the
        transport is resumed as well. Successive calls to this method will
        fail unless :meth:`pause_writing` is called first.

        :param exc: optional exception to set on the pending write waiter.
        """
        assert self._paused, 'writing is not paused'
        self._paused = False
        waiter = self._write_waiter
        if waiter is not None:
            self._write_waiter = None
            # The waiter may already be finished (for example cancelled);
            # setting a result on it then would raise.
            if not waiter.done():
                if exc is None:
                    waiter.set_result(None)
                else:
                    waiter.set_exception(exc)
        self._transport.resume_reading()

    # INTERNAL CALLBACKS

    def _set_flow_limits(self, _, exc=None):
        # Callback for the connection_made event: apply the configured
        # limits unless the event carries an exception.
        if not exc:
            self._transport.set_write_buffer_limits(self._low_limit,
                                                    self._high_limit)

    def _wakeup_waiter(self, _, exc=None):
        # Callback for the connection_lost event: wake up the writer if
        # currently paused.
        if not self._paused:
            return
        self.resume_writing(exc=exc)

    def _make_write_waiter(self, _, exc=None):
        # Callback for the after_write event: while paused, install a fresh
        # future as the write waiter.
        if self._paused:
            waiter = self._write_waiter
            assert waiter is None or waiter.cancelled()
            waiter = create_future(self._loop)
            self.logger.debug('Waiting for write buffer to drain')
            self._write_waiter = waiter
class Timeout:
    """Adds a timeout for idle connections to protocols.

    The class it is mixed into must provide ``bind_event``, ``close``,
    ``closed``, ``_loop`` and ``logger``; they are used here but not
    defined here.
    """
    # None until the timeout property is first assigned.
    _timeout = None
    # Handle returned by ``_loop.call_later`` for the pending idle timer.
    _timeout_handler = None

    @property
    def timeout(self):
        """The idle :attr:`timeout` of this protocol.

        ``None`` until first assigned. Assigning a new value registers the
        event callbacks (first assignment only) and restarts the idle
        timer; a falsy value is stored as ``0`` and schedules no timer.
        """
        return self._timeout

    @timeout.setter
    def timeout(self, timeout):
        # Bind the callbacks only once: _timeout stops being None after the
        # first assignment because falsy values are stored as 0.
        if self._timeout is None:
            self.bind_event('connection_made', self._add_timeout)
            self.bind_event('connection_lost', self._cancel_timeout)
            self.bind_event('before_write', self._cancel_timeout)
            self.bind_event('after_write', self._add_timeout)
            self.bind_event('data_received', self._cancel_timeout)
            self.bind_event('data_processed', self._add_timeout)
        self._timeout = timeout or 0
        self._add_timeout(None)

    # INTERNALS

    def _timed_out(self):
        # Timer callback: the handle has fired, so drop the stale reference
        # before closing the idle connection.
        self._timeout_handler = None
        self.close()
        self.logger.debug('Closed idle %s.', self)

    def _add_timeout(self, _, exc=None, **kw):
        # Restart the idle timer; nothing is scheduled when the protocol is
        # closed, the timeout is falsy or the event carries an exception.
        if not self.closed:
            self._cancel_timeout(_, exc=exc)
            if self._timeout and not exc:
                self._timeout_handler = self._loop.call_later(
                    self._timeout, self._timed_out)

    def _cancel_timeout(self, _, exc=None, **kw):
        # Cancel the pending idle timer, if any.
        if self._timeout_handler:
            self._timeout_handler.cancel()
            self._timeout_handler = None