from collections import Mapping
from inspect import isgeneratorfunction
from functools import wraps, partial
from asyncio import Future, CancelledError, TimeoutError, sleep, gather
from .consts import MAX_ASYNC_WHILE
from .access import (get_event_loop, LOGGER, isfuture, isawaitable,
ensure_future, create_future)
# Names exported by a star-import of this module. ``Bench``, ``MultiFuture``,
# ``as_exception`` and ``return_false`` are defined below but are not listed.
__all__ = ['maybe_async',
'run_in_loop',
'add_errback',
'task_callback',
'multi_async',
'as_coroutine',
'as_gather',
'task',
'async_while',
'chain_future',
'future_result_exc',
'AsyncObject']
def return_false():
    '''Zero-argument callable which always evaluates to ``False``.

    Used as the fallback for a missing ``get_debug`` method.
    '''
    return False
def chain_future(future, callback=None, errback=None, next=None):
    '''Chain a :class:`~asyncio.Future` to an existing ``future``.

    This function `chains` the ``next`` future to an existing ``future``.
    When the input ``future`` receives a result the optional
    ``callback`` is executed and its result is set as the result of ``next``.
    If an exception occurs the optional ``errback`` is executed and its
    return value becomes the result of ``next``.

    A cancelled ``future`` is handled like a failed one: the
    :class:`~asyncio.CancelledError` is passed to ``errback`` when given,
    otherwise it is set as the exception of ``next``.

    If ``callback`` or ``errback`` returns a :class:`~asyncio.Future`,
    that future is in turn chained to ``next``.

    :param future: the original :class:`~asyncio.Future` (can be a coroutine)
    :param callback: optional callback to execute on the result of ``future``
    :param errback: optional callback to execute on the exception of ``future``
    :param next: optional :class:`~asyncio.Future` to chain.
        If not provided a new future is created
    :return: the future ``next``
    '''
    loop = next._loop if next is not None else None
    future = ensure_future(future, loop=loop)
    if next is None:
        next = create_future(future._loop)

    def _callback(fut):
        # CancelledError is listed explicitly: since Python 3.8 it derives
        # from BaseException, so ``except Exception`` alone would let it
        # escape this callback and leave ``next`` pending forever.
        try:
            try:
                result = fut.result()
            except (Exception, CancelledError) as exc:
                if errback:
                    result = errback(exc)
                else:
                    raise
            else:
                if callback:
                    result = callback(result)
        except (Exception, CancelledError) as exc:
            next.set_exception(exc)
        else:
            if isinstance(result, Future):
                # the handler returned another future: resolve ``next``
                # only once that one is done
                chain_future(result, next=next)
            else:
                next.set_result(result)

    future.add_done_callback(_callback)
    return next
def as_exception(future):
    '''Return the error carried by ``future``, if any.

    :param future: a :class:`~asyncio.Future`
    :return: the stored exception, a new :class:`~asyncio.CancelledError`
        when ``future`` was cancelled, otherwise ``None``
    '''
    if not future._exception:
        return CancelledError() if future.cancelled() else None
    return future.exception()
def add_errback(future, callback, loop=None):
    '''Add a ``callback`` to a ``future`` executed only if an exception
    or cancellation has occurred.

    :param future: a :class:`~asyncio.Future` or anything accepted by
        ``ensure_future`` (for example a coroutine)
    :param callback: callable invoked with the exception of ``future``, or
        with a new :class:`~asyncio.CancelledError` if it was cancelled
    :param loop: optional event loop forwarded to ``ensure_future``
    :return: the future the ``callback`` was attached to
    '''
    def _error_back(fut):
        if fut._exception:
            callback(fut.exception())
        elif fut.cancelled():
            callback(CancelledError())

    # forward the caller's loop: it used to be discarded (``loop=None``)
    future = ensure_future(future, loop=loop)
    future.add_done_callback(_error_back)
    return future
def future_result_exc(future):
    '''Split a finished ``future`` into a ``(result, exception)`` pair.

    The :class:`.Future` must be ``done``. Exactly one element of the
    returned tuple is meaningful: on cancellation or failure the result is
    ``None``, on success the exception is ``None``.
    '''
    if not future.cancelled():
        if not future._exception:
            return future.result(), None
        error = future.exception()
    else:
        error = CancelledError()
    return None, error
def task_callback(callback):
    '''Wrap ``callback`` so that it can be used as a future done-callback.

    The returned function receives a done future, calls ``callback`` with
    that future's result and passes what ``callback`` returns to
    ``ensure_future`` on the same loop as the done future.

    :param callback: callable accepting the future result and returning
        something ``ensure_future`` accepts (for example a coroutine)
    :return: a function of one argument (the done future) returning the
        value of ``ensure_future``
    '''
    @wraps(callback)
    def _task_callback(fut):
        # ``loop`` goes by keyword, as everywhere else in this module;
        # asyncio's ``ensure_future`` does not accept it positionally
        return ensure_future(callback(fut.result()), loop=fut._loop)

    return _task_callback
def maybe_async(value, loop=None):
    '''Handle a possible asynchronous ``value``.

    ``value`` is handed to ``ensure_future``; when that call rejects it
    with a :class:`TypeError` the ``value`` is considered synchronous and
    is returned unchanged.

    :parameter value: the value to convert to an asynchronous instance
        if it needs to.
    :parameter loop: optional :class:`.EventLoop`.
    :return: a :class:`.Future` or a synchronous ``value``.
    '''
    try:
        outcome = ensure_future(value, loop=loop)
    except TypeError:
        outcome = value
    return outcome
async def as_coroutine(value):
    '''Coroutine resolving ``value``.

    An awaitable ``value`` is awaited and its result returned; any other
    ``value`` is returned as it is.
    '''
    if not isawaitable(value):
        return value
    return await value
def as_gather(*args):
    """Same as :func:`~.asyncio.gather` but allows sync values.

    Every argument is wrapped with :func:`as_coroutine` before being
    handed to :func:`~.asyncio.gather`.
    """
    return gather(*map(as_coroutine, args))
def task(function):
    '''Decorator turning the result of ``function`` into a future.

    Calling the decorated function builds a coroutine from ``function``
    and passes it to ``ensure_future``. A generator function is used to
    build that object directly; any other callable is called inside a
    coroutine which awaits the returned value when it is awaitable.

    :param function: a callable which can return coroutines,
        :class:`.asyncio.Future` or synchronous data. Can be a method of
        an :ref:`async object <async-object>`, in which case the loop
        is given by the object ``_loop`` attribute.
    :return: a function returning the value of ``ensure_future``
    '''
    if isgeneratorfunction(function):
        make_coro = function
    else:
        async def make_coro(*args, **kw):
            outcome = function(*args, **kw)
            return (await outcome) if isawaitable(outcome) else outcome

    @wraps(function)
    def _task(*args, **kwargs):
        # the loop, if any, comes from the first positional argument
        # (``self`` when a method is decorated)
        owner_loop = None
        if args:
            owner_loop = getattr(args[0], '_loop', None)
        return ensure_future(make_coro(*args, **kwargs), loop=owner_loop)

    return _task
def run_in_loop(loop, callable, *args, **kwargs):
    '''Run ``callable`` in the event ``loop`` thread.

    The call is scheduled with ``loop.call_soon_threadsafe``. If
    ``callable`` raises an :class:`Exception` it is set on the returned
    future; if it returns something ``ensure_future`` accepts, the returned
    future is chained to it; any other value is set as the result.

    :param loop: The event loop where ``callable`` is run
    :param callable: the function to execute
    :param args: positional arguments passed to ``callable``
    :param kwargs: key-valued arguments passed to ``callable``
    :return: a :class:`~asyncio.Future`
    '''
    waiter = create_future(loop)

    def _run():
        try:
            outcome = callable(*args, **kwargs)
        except Exception as exc:
            waiter.set_exception(exc)
            return
        try:
            pending = ensure_future(outcome, loop=loop)
        except TypeError:
            # not an asynchronous value: it is the final result
            waiter.set_result(outcome)
            return
        chain_future(pending, next=waiter)

    loop.call_soon_threadsafe(_run)
    return waiter
async def async_while(timeout, while_clause, *args):
    '''The asynchronous equivalent of ``while while_clause(*args):``

    Use this function within a :ref:`coroutine <coroutine>` when you need
    to wait for ``while_clause`` to stop being satisfied. Between two
    evaluations of the clause the coroutine sleeps; the sleep interval
    starts at 0.1 seconds, grows by 0.1 seconds at each iteration and is
    capped at ``MAX_ASYNC_WHILE``.

    :parameter timeout: a timeout in seconds after which this function
        stops; a false value (``None``, ``0``) disables the timeout.
    :parameter while_clause: while clause callable.
    :parameter args: optional arguments to pass to the ``while_clause``
        callable.
    :return: the last value returned by ``while_clause``: a false value
        when the clause stopped being satisfied, a true value when the
        ``timeout`` expired first.
    '''
    loop = get_event_loop()
    start = loop.time()
    di = 0.1
    interval = 0
    result = while_clause(*args)
    while result:
        interval = min(interval + di, MAX_ASYNC_WHILE)
        try:
            # No ``loop`` argument: asyncio.sleep lost that parameter in
            # Python 3.10 and always runs on the running loop.
            # NOTE(review): assumes ``get_event_loop()`` above returns the
            # running loop when called inside a coroutine - confirm.
            await sleep(interval)
        except TimeoutError:
            pass
        if timeout and loop.time() - start >= timeout:
            break
        result = while_clause(*args)
    return result
# ############################################################## Bench
class Bench:
    '''Run an asynchronous call a fixed number of times and time it.

    An instance is called with the function to execute and its arguments;
    the call returns a future which results in the instance itself once
    all the executions have completed.
    '''
    start = None
    '''Value of the loop ``time()`` taken when the execution starts'''
    finish = None
    '''Value of the loop ``time()`` taken when the execution finishes'''
    result = ()
    '''Tuple of results'''

    def __init__(self, times, loop=None):
        if not loop:
            loop = get_event_loop()
        self._loop = loop
        self.times = times

    @property
    def taken(self):
        '''The total time taken for execution, ``None`` if not finished
        '''
        if not self.finish:
            return None
        return self.finish - self.start

    def __call__(self, func, *args, **kwargs):
        self.start = self._loop.time()
        # lazily produce one call of ``func`` per requested execution
        calls = (func(*args, **kwargs) for _ in range(self.times))
        self.result = multi_async(calls, loop=self._loop)
        return chain_future(self.result, callback=self._done)

    def _done(self, result):
        # record the finish time and freeze the collected results
        self.finish = self._loop.time()
        self.result = tuple(result)
        return self
# ############################################################## AsyncObject
class AsyncObject:
    '''Interface for :ref:`async objects <async-object>`

    .. attribute:: _loop

        The :ref:`event loop <asyncio-event-loop>` associated with this
        object

    .. attribute:: _logger

        Optional logger instance, used by the :attr:`logger` attribute
    '''
    _logger = None
    _loop = None

    @property
    def logger(self):
        '''The logger for this object.

        It is the :attr:`_logger` when set, otherwise the ``logger``
        attribute of the :attr:`_loop`, falling back to the module
        default logger.
        '''
        if self._logger:
            return self._logger
        return getattr(self._loop, 'logger', LOGGER)

    @property
    def debug(self):
        '''True when in debug mode, as reported by the :attr:`_loop`
        ``get_debug`` method (``False`` when there is no such method)
        '''
        get_debug = getattr(self._loop, 'get_debug', return_false)
        return get_debug()

    def timeit(self, method, times, *args, **kwargs):
        '''Useful utility for benchmarking an asynchronous ``method``.

        :param method: the name of the ``method`` to execute
        :param times: number of times to execute the ``method``
        :param args: positional arguments to pass to the ``method``
        :param kwargs: key-valued arguments to pass to the ``method``
        :return: a :class:`~asyncio.Future` which results in a
            :class:`Bench` object if successful

        The usage is simple::

            >>> b = self.timeit('asyncmethod', 100)
        '''
        return Bench(times, loop=self._loop)(
            getattr(self, method), *args, **kwargs)
# ############################################################## MultiFuture
class MultiFuture(Future):
    '''Handle several futures at once.

    The future collects the values of ``data`` in a container (a list, or
    an instance of ``type`` for mappings) and is resolved with that
    container once every asynchronous value has completed.

    :param data: optional iterable or mapping of values, which can be
        synchronous or asynchronous
    :param loop: optional event loop
    :param type: optional container class; only honoured when it is a
        ``Mapping`` subclass, otherwise a ``list`` is used
    :param raise_on_error: when ``True`` the first failure is set as the
        exception of this future; when ``False`` failures are stored in
        :attr:`failures` and in the result container
    '''
    def __init__(self, data=None, loop=None, type=None, raise_on_error=True):
        super().__init__(loop=loop)
        self._futures = {}
        self._failures = []
        self._raise_on_error = raise_on_error
        if data is None:
            type = list
            data = ()
        else:
            type = type or data.__class__
            if issubclass(type, Mapping):
                data = data.items()
            else:
                type = list
                data = enumerate(data)
        self._stream = type()
        for key, value in data:
            value = self._get_set_item(key, maybe_async(value, loop))
            if isfuture(value):
                # still pending: track it and wait for completion
                self._futures[key] = value
                value.add_done_callback(partial(self._future_done, key))
            elif self.done():
                # a failure has already resolved this future
                break
        self._check()

    @property
    def failures(self):
        '''List of exceptions collected from failed values'''
        return self._failures

    # INTERNALS
    def _check(self):
        # resolve once there is nothing left to wait for
        if not self._futures and not self.done():
            self.set_result(self._stream)

    def _future_done(self, key, future, inthread=False):
        # done-callback of the tracked futures; when ``future`` belongs to
        # another loop the handling is re-scheduled on this future's loop
        if not inthread and future._loop is not self._loop:
            self._loop.call_soon_threadsafe(
                self._future_done, key, future, True)
            return
        self._futures.pop(key, None)
        if not self.done():
            self._get_set_item(key, future)
            self._check()

    def _get_set_item(self, key, value):
        # Store ``value`` under ``key`` and return what was stored; a done
        # future is replaced by its result or its exception. Returns
        # ``None`` when a failure was set as this future's exception.
        if isfuture(value) and value.done():
            exc = as_exception(value)
            if not exc:
                value = value._result
            elif self._raise_on_error:
                self._futures.clear()
                self.set_exception(exc)
                return
            else:
                self._failures.append(exc)
                value = exc
        stream = self._stream
        if isinstance(stream, list) and key == len(stream):
            stream.append(value)
        else:
            stream[key] = value
        return value
# Backward compatibility: ``multi_async`` is an alias of ``MultiFuture``; it is
# the name listed in ``__all__`` and the one ``Bench.__call__`` uses.
multi_async = MultiFuture