# Source code for pulsar.async.access

import asyncio
import logging
import os
import selectors
import threading
from asyncio import Future
from asyncio import ensure_future
from collections import OrderedDict
from concurrent.futures import ThreadPoolExecutor
from inspect import isawaitable
from threading import current_thread

from pulsar.utils.config import Global
from pulsar.utils.system import current_process


# Names exported by a star-import of this module.
# NOTE(review): ``set_actor`` and ``EventLoopPolicy`` are defined further down
# in this module but are not listed here -- confirm that is intentional.
__all__ = ['get_event_loop',
           'new_event_loop',
           'get_actor',
           'cfg',
           'cfg_value',
           'isfuture',
           'create_future',
           'is_mainthread',
           'process_data',
           'thread_data',
           'logger',
           'NOTHING',
           'EVENT_LOOPS',
           'Future',
           'reraise',
           'isawaitable',
           'ensure_future',
           'CANCELLED_ERRORS']


# Tuple of base classes identifying an asyncio event loop, in a form usable
# directly as the second argument of ``isinstance``.
_EVENT_LOOP_CLASSES = (asyncio.AbstractEventLoop,)
# Tuple of exception types that signal cancellation, in a form usable directly
# in an ``except`` clause.
CANCELLED_ERRORS = (asyncio.CancelledError,)


def reraise(tp, value, tb=None):
    '''Re-raise the exception ``value`` with the traceback ``tb``.

    :param tp: exception class; only used to build the exception when
        ``value`` is ``None``.
    :param value: exception instance to raise, or ``None`` to raise a
        fresh ``tp()``.
    :param tb: traceback to attach to the exception. When it differs from
        the one already carried by ``value`` it replaces it.
    :raises: always raises ``value`` (or the newly created ``tp()``).
    '''
    try:
        if value is None:
            # Accept a ``(type, None, tb)`` triple instead of failing with
            # an unrelated AttributeError on ``None.__traceback__``.
            value = tp()
        if value.__traceback__ is not tb:
            raise value.with_traceback(tb)
        raise value
    finally:
        # Drop the local references so this frame does not keep the
        # exception and its traceback alive in a reference cycle.
        value = None
        tb = None


def isfuture(x):
    '''Tell whether ``x`` is an :class:`asyncio.Future` instance.'''
    if isinstance(x, Future):
        return True
    return False


def create_future(loop=None):
    '''Create a new future bound to ``loop``.

    When ``loop`` is not given (or is falsy) the loop returned by
    ``get_event_loop()`` is used. The loop's own ``create_future`` is
    preferred; if that lookup or call raises ``AttributeError`` a plain
    :class:`asyncio.Future` attached to the loop is returned instead.
    '''
    if not loop:
        loop = get_event_loop()
    try:
        future = loop.create_future()
    except AttributeError:
        future = asyncio.Future(loop=loop)
    return future


# Fallback logger returned by ``logger()`` when the loop has no ``logger``
# attribute of its own.
LOGGER = logging.getLogger('pulsar')
# Sentinel for "no value supplied"; it is the default of ``thread_data``'s
# ``value`` parameter, so ``None`` can still be passed as a real value.
NOTHING = object()
# Ordered mapping of event-loop name -> zero-argument callable that builds
# the loop. Order matters: the first key is used as the default loop name.
EVENT_LOOPS = OrderedDict()

# Loop factory of the currently installed asyncio policy; it is called with a
# selector instance by the factories built in ``make_loop_factory``.
# NOTE(review): ``_loop_factory`` is a private attribute of the policy object
# -- confirm it exists on every supported Python version and platform.
DefaultLoopClass = asyncio.get_event_loop_policy()._loop_factory


def make_loop_factory(selector):
    '''Build a zero-argument callable creating a loop for ``selector``.

    ``selector`` is itself a zero-argument callable; each call of the
    returned factory creates a fresh selector and hands it to
    ``DefaultLoopClass``.
    '''

    def loop_factory():
        selector_instance = selector()
        return DefaultLoopClass(selector_instance)

    return loop_factory


# Register one loop factory per selector implementation available on this
# platform, keyed by the lower-case selector name ('epoll', 'kqueue', ...).
# The classes are looked up on the stdlib ``selectors`` module: the
# ``asyncio`` package does not reliably expose a ``selectors`` attribute, so
# going through ``asyncio.selectors`` can fail at import time.
for selector in ('Epoll', 'Kqueue', 'Poll', 'Select'):
    name = '%sSelector' % selector
    # A selector class missing on this platform simply yields ``None``.
    selector_class = getattr(selectors, name, None)
    if selector_class:
        EVENT_LOOPS[selector.lower()] = make_loop_factory(selector_class)


# Optional dependency: register uvloop's loop class under the 'uv' key when
# the package can be imported. Any failure raised while importing or
# registering it is swallowed on purpose, so 'uv' is simply absent from
# ``EVENT_LOOPS`` in that case.
try:    # add uvloop if available
    import uvloop
    EVENT_LOOPS['uv'] = uvloop.Loop
except Exception:     # pragma    nocover
    pass


# Pick the default value of the event-loop setting.
# While building the documentation a human-readable description is used
# instead of a real loop name; otherwise the first registered loop name is
# the default, or ``None`` when no loop has been registered at all.
# NOTE(review): the documentation text lists uvloop first, but the default
# chosen here is whichever key was inserted first into ``EVENT_LOOPS`` --
# confirm the registration order matches the documented preference.
if os.environ.get('BUILDING-PULSAR-DOCS') == 'yes':     # pragma nocover
    default_loop = ('uvloop if available, epoll on linux, '
                    'kqueue on mac, select on windows')
else:
    default_loop = next(iter(EVENT_LOOPS), None)


# The setting is only declared when a default loop could be determined,
# i.e. when at least one event loop was registered (or docs are being built).
if default_loop:
    # Configuration entry describing which event loop to use.
    # NOTE(review): how ``Global`` consumes these class attributes is not
    # visible in this file -- the per-attribute notes below are assumptions.
    class EventLoopSetting(Global):
        # Key of the setting (presumably how it is looked up in the config).
        name = "event_loop"
        # Presumably the command-line flag(s) mapped to this setting.
        flags = ["--io"]
        # Allowed values: the names of the registered event loops.
        choices = tuple(EVENT_LOOPS)
        # Value used when the setting is not given explicitly.
        default = default_loop
        # Human-readable description of the setting.
        desc = """\
            Specify the event loop used for I/O event polling.

            The default value is the best possible for the system running the
            application.
            """

# Re-export asyncio's loop accessors under this module's namespace; both
# names are part of ``__all__``.
get_event_loop = asyncio.get_event_loop
new_event_loop = asyncio.new_event_loop


def is_mainthread(thread=None):
    '''Tell whether ``thread`` is the main thread.

    When ``thread`` is ``None`` the calling thread is checked instead.
    '''
    if thread is None:
        thread = current_thread()
    main_thread_class = threading._MainThread
    return isinstance(thread, main_thread_class)


def logger(loop=None, logger=None):
    '''Return the logger attached to ``loop``.

    When ``loop`` is not given (or is falsy) the loop returned by
    ``get_event_loop()`` is inspected. If that loop has no ``logger``
    attribute the module-level ``LOGGER`` is returned. The ``logger``
    parameter is accepted but not used.
    '''
    target = loop or get_event_loop()
    return getattr(target, 'logger', LOGGER)


def process_data(name=None):
    '''Access the data dictionary attached to the current process.

    The dictionary lives in the ``_pulsar_local`` attribute of the object
    returned by ``current_process()`` and is created on first use.

    With a truthy ``name`` the value stored under that key is returned
    (``None`` when missing); otherwise the dictionary itself is returned.
    '''
    process = current_process()
    try:
        store = process._pulsar_local
    except AttributeError:
        store = process._pulsar_local = {}
    if name:
        return store.get(name)
    return store


def thread_data(name, value=NOTHING, ct=None):
    '''Get, and optionally set, the entry ``name`` in the data of thread ``ct``.

    ``ct`` defaults to the current thread. The main thread shares the
    process dictionary returned by ``process_data()``; any other thread
    keeps its own dictionary in its ``_pulsar_local`` attribute.

    When ``value`` is left at the ``NOTHING`` sentinel the call is a pure
    lookup. Otherwise ``value`` is stored under ``name`` unless the key is
    already present: storing the very same object again is a no-op, while
    a different object raises :class:`RuntimeError`.

    The value held under ``name`` after the call is returned (``None``
    when the key is missing).
    '''
    ct = ct or current_thread()
    if is_mainthread(ct):
        store = process_data()
    else:
        try:
            store = ct._pulsar_local
        except AttributeError:
            store = ct._pulsar_local = {}
    if value is NOTHING:
        return store.get(name)
    if name not in store:
        store[name] = value
    elif store[name] is not value:
        raise RuntimeError(
            '%s is already available on this thread' % name)
    return store.get(name)


def get_actor():
    '''Return the value stored under ``'actor'`` for the current thread.

    This is a plain lookup through ``thread_data('actor')``; the result is
    ``None`` when nothing has been stored.
    '''
    return thread_data('actor')


def set_actor(actor):
    '''Store ``actor`` under ``'actor'`` for the current thread.

    Delegates to ``thread_data('actor', actor)`` and returns its result.
    ``thread_data`` raises :class:`RuntimeError` when a different object is
    already stored under that key.
    '''
    return thread_data('actor', actor)


def cfg():
    '''Return the ``cfg`` attribute of the current actor.

    Returns ``None`` when ``get_actor()`` yields no (or a falsy) actor.
    '''
    actor = get_actor()
    if actor:
        return actor.cfg
    return None


def cfg_value(setting, value=None):
    '''Return ``value``, falling back to the current actor's configuration.

    :param setting: key looked up with ``actor.cfg.get(setting)``.
    :param value: explicit value; anything other than ``None`` is returned
        unchanged without consulting the actor.
    :return: ``value`` when it is not ``None``; otherwise the configured
        value when an actor is available, else ``None``.
    '''
    if value is None:
        actor = get_actor()
        if actor:
            return actor.cfg.get(setting)
    return value


class EventLoopPolicy(asyncio.DefaultEventLoopPolicy):
    '''Event loop policy building loops from the ``EVENT_LOOPS`` registry.

    :param name: key of the loop factory in ``EVENT_LOOPS``.
    :param workers: passed to :class:`ThreadPoolExecutor` to size the
        default executor of every loop created by this policy.
    :param debug: when truthy, new loops are switched to debug mode.
    '''

    def __init__(self, name, workers, debug):
        super().__init__()
        self.name = name
        self.workers = workers
        self.debug = debug

    @property
    def _local(self):
        # The policy's local state is kept on the current process object
        # (attribute ``_event_loop_policy``) instead of on the policy
        # instance, and is created lazily from the inherited ``_Local``.
        local = getattr(current_process(), '_event_loop_policy', None)
        if local is None:
            self._local = local = self._Local()
        return local

    @_local.setter
    def _local(self, v):
        current_process()._event_loop_policy = v

    def _loop_factory(self):
        '''Create a loop of the configured kind and set it up.'''
        loop = EVENT_LOOPS[self.name]()
        loop.set_default_executor(ThreadPoolExecutor(self.workers))
        if self.debug:
            loop.set_debug(True)
        return loop