Source code for pulsar.apps.greenio.pool
import sys
import threading
import logging
from collections import deque
from pulsar import create_future, ensure_future, AsyncObject, get_event_loop
from .utils import wait, GreenletWorker, isawaitable, getcurrent
# Number of worker greenlets a pool may start when the ``max_workers``
# constructor argument is omitted (or falsy).
_DEFAULT_WORKERS = 100
# Upper bound applied to the ``max_workers`` constructor argument.
_MAX_WORKERS = 1000
class _DONE:
    """Sentinel a worker greenlet switches back to its parent with.

    It is only ever compared by identity (``task is not _DONE``) and is
    never instantiated.
    """
class GreenPool(AsyncObject):
    """A pool of running greenlets.

    This pool maintains a group of greenlets to perform asynchronous
    tasks via the :meth:`submit` method.

    Tasks are kept in a FIFO queue and handed to the first idle worker
    greenlet; workers are started on demand, up to :attr:`max_workers`.
    """
    worker_name = 'exec'

    def __init__(self, max_workers=None, loop=None):
        """Create an empty pool; worker greenlets are started lazily.

        :param max_workers: maximum number of worker greenlets. A falsy
            value selects ``_DEFAULT_WORKERS``; the value is capped at
            ``_MAX_WORKERS``.
        :param loop: event loop used to create futures and schedule
            callbacks. Defaults to ``get_event_loop()``.
        """
        self._loop = loop or get_event_loop()
        self._max_workers = min(max_workers or _DEFAULT_WORKERS, _MAX_WORKERS)
        # every worker greenlet started by this pool and not yet retired
        self._greenlets = set()
        # workers currently idle, i.e. ready to receive a task
        self._available = set()
        # pending tasks (FIFO: appended on the left, popped on the right)
        self._queue = deque()
        self._shutdown = False
        # future resolved once shutdown has retired every worker
        self._waiter = None
        self._logger = logging.getLogger('pulsar.greenpool')
        self._shutdown_lock = threading.Lock()
        self.wait = wait

    @property
    def max_workers(self):
        """Maximum number of worker greenlets this pool may start."""
        return self._max_workers

    @max_workers.setter
    def max_workers(self, value):
        value = int(value)
        # NOTE: this check is skipped when Python runs with ``-O``
        assert value > 0
        self._max_workers = value

    @property
    def in_green_worker(self):
        """True if the current greenlet is a green pool worker
        """
        return isinstance(getcurrent(), GreenletWorker)

    @property
    def closed(self):
        """True if this pool is closed and no task can be queued
        """
        return self._shutdown

    def submit(self, func, *args, **kwargs):
        """Equivalent to ``func(*args, **kwargs)``.

        This method creates a new task for function ``func`` and adds it to
        the queue.

        Return a :class:`~asyncio.Future` called back once the task
        has finished.

        When called from inside a green worker no task is queued: ``func``
        is executed immediately in the current greenlet and the value
        obtained by waiting on its result is returned instead of a future.

        :raise RuntimeError: if the pool has been shut down.
        """
        with self._shutdown_lock:
            if self._shutdown:
                raise RuntimeError(
                    'cannot schedule new futures after shutdown')
            if not self.in_green_worker:
                future = create_future(self._loop)
                self._put((future, func, args, kwargs))
                return future
        # Already inside a worker: run inline.  This is done *after* the
        # lock has been released because ``wait`` may switch to another
        # greenlet of the same thread, and the lock is not re-entrant: a
        # nested ``submit``/``shutdown`` would otherwise block forever.
        return wait(func(*args, **kwargs))

    def shutdown(self, wait=True):
        """Stop accepting tasks and retire the worker greenlets.

        Tasks queued before this call are still dispatched before the
        workers are retired.

        :param wait: when true, return a future which is resolved once
            every worker greenlet has been removed from the pool.
        :return: the completion future if ``wait`` is true, otherwise
            ``None``.
        """
        with self._shutdown_lock:
            self._shutdown = True
            waiter = None
            if wait:
                # Register the waiter *before* queueing the sentinel: when
                # no worker exists the queue check resolves it right away.
                # A still pending waiter from a previous call is reused.
                if self._waiter is None:
                    self._waiter = create_future(self._loop)
                waiter = self._waiter
            # ``None`` is the sentinel asking one worker to retire
            self._put(None)
            return waiter

    def getcurrent(self):
        """Return the greenlet currently executing."""
        return getcurrent()

    # INTERNALS
    def _adjust_greenlet_count(self):
        # Start one more worker when nobody is idle, the pool is still
        # open and the limit has not been reached.
        # Return the set of idle workers (falsy when empty).
        if (not self._shutdown and not self._available and
                len(self._greenlets) < self._max_workers):
            green = GreenletWorker(self._green_run)
            self._greenlets.add(green)
            self.logger.debug('Num greenlets: %d', len(self._greenlets))
            # run the worker up to the point where it registers itself
            # as available and switches back here
            green.switch()
        return self._available

    def _put(self, task):
        # Run in the main greenlet of the event-loop thread
        # (also called by a retiring worker, see ``_green_run``)
        self._queue.appendleft(task)
        self._check_queue()

    def _resolve_waiter(self):
        # Notify whoever is waiting on ``shutdown`` that it has completed
        waiter, self._waiter = self._waiter, None
        if waiter is not None and not waiter.done():
            waiter.set_result(None)

    def _check_queue(self):
        # Run in the main greenlet of the event-loop thread
        # (also reached from a retiring worker through ``_put``)
        if not self._adjust_greenlet_count():
            if self._shutdown and not self._greenlets:
                # No worker is left and none can be started once the pool
                # is shut down, so polling again would spin forever.
                # Real tasks are always dispatched before a retirement
                # sentinel (FIFO), hence only sentinels can be pending.
                self._queue.clear()
                self._resolve_waiter()
                return
            self.logger.debug('No greenlet available')
            # every worker is busy: poll again at the next loop iteration
            return self._loop.call_soon(self._check_queue)
        try:
            task = self._queue.pop()
        except IndexError:
            return
        ensure_future(self._green_task(self._available.pop(), task),
                      loop=self._loop)

    async def _green_task(self, green, task):
        # Coroutine executing in the main greenlet
        # This coroutine is executed for every task put into the queue
        while task is not _DONE:
            # switch to the greenlet to start the task
            task = green.switch(task)
            # if an asynchronous result is returned, await
            while isawaitable(task):
                try:
                    task = await task
                except Exception as exc:
                    # Raise the error inside the worker greenlet.
                    # This call can return an asynchronous component
                    exc_info = sys.exc_info()
                    if not exc_info[0]:
                        exc_info = (exc, None, None)
                    task = green.throw(*exc_info)

    def _green_run(self):
        # The run method of a worker greenlet
        task = True
        while task:
            green = getcurrent()
            parent = green.parent
            assert parent
            # add greenlet in the available greenlets
            self._available.add(green)
            task = parent.switch(_DONE)  # switch back to the main execution
            if task:
                future, func, args, kwargs = task
                try:
                    try:
                        result = wait(func(*args, **kwargs), True)
                    except StopIteration as exc:  # See PEP 479
                        raise RuntimeError('Unhandled StopIteration') from exc
                except Exception as exc:
                    # The future may have been cancelled by the caller in
                    # the meantime; setting it again would raise inside
                    # the worker and kill it while still registered.
                    if not future.done():
                        future.set_exception(exc)
                else:
                    if not future.done():
                        future.set_result(result)
            else:  # Greenlet cleanup
                self._greenlets.remove(green)
                if self._greenlets:
                    # pass the retirement sentinel on to the next worker
                    self._put(None)
                else:
                    self._resolve_waiter()
                parent.switch(_DONE)