Bases: TaskManager
, Workers
The Task Consumer is a Task Manager responsible for consuming tasks
from a task queue
Source code in fluid/scheduler/consumer.py
| def __init__(self, **config: Any) -> None:
super().__init__(**config)
Workers.__init__(self)
self._async_dispatcher_worker = AsyncConsumer(AsyncTaskDispatcher())
self._concurrent_tasks: dict[str, dict[str, TaskRun]] = defaultdict(dict)
self._task_to_queue: deque[str | Task] = deque()
self._priority_task_run_queue: deque[TaskRun] = deque()
self._queue_tasks_worker = WorkerFunction(
self._queue_task, name="queue-task-worker"
)
self.add_workers(self._queue_tasks_worker)
self.add_workers(self._async_dispatcher_worker)
for i in range(self.config.max_concurrent_tasks):
worker_name = f"task-worker-{i+1}"
self.add_workers(
WorkerFunction(
partial(self._consume_tasks, worker_name), name=worker_name
)
)
|
config
instance-attribute
config = TaskManagerConfig(**kwargs)
dispatcher
instance-attribute
dispatcher = TaskDispatcher()
broker
instance-attribute
num_concurrent_tasks
property
The number of concurrent_tasks running in the consumer
status
async
Source code in fluid/utils/worker.py
| async def status(self) -> dict:
return await self._workers.status()
|
gracefully_stop
Source code in fluid/utils/worker.py
| def gracefully_stop(self) -> None:
self._workers.gracefully_stop()
|
is_running
Source code in fluid/utils/worker.py
| def is_running(self) -> bool:
return self._running
|
is_stopping
Source code in fluid/utils/worker.py
| def is_stopping(self) -> bool:
return self._workers.is_stopping()
|
run
async
run the workers
Source code in fluid/utils/worker.py
| async def run(self) -> None:
"""run the workers"""
with self.start_running():
async with self.safe_run():
workers, _ = self._workers.workers_tasks()
self._workers.workers = tuple(workers)
self._workers.tasks = tuple(
self.create_task(worker) for worker in workers
)
await asyncio.gather(*self._workers.tasks)
await self.shutdown()
|
start_running
Source code in fluid/utils/worker.py
| @contextmanager
def start_running(self) -> Generator:
if self._running:
raise RuntimeError("Worker is already running")
self._running = True
try:
logger.info("%s started running", self.worker_name)
yield
finally:
self._running = False
logger.warning("%s stopped running", self.worker_name)
|
add_workers
add workers to the workers
Source code in fluid/utils/worker.py
| def add_workers(self, *workers: Worker) -> None:
"""add workers to the workers"""
workers_, _ = self._workers.workers_tasks()
for worker in workers:
if worker not in workers_:
workers_.append(worker)
|
wait_for_exit
async
Source code in fluid/utils/worker.py
| async def wait_for_exit(self) -> None:
if self._workers_task is not None:
await self._workers_task
|
create_task
Source code in fluid/utils/worker.py
| def create_task(self, worker: Worker) -> asyncio.Task:
return asyncio.create_task(
self._run_worker(worker), name=f"{self.worker_name}-{worker.worker_name}"
)
|
on_shutdown
async
Source code in fluid/scheduler/consumer.py
| async def on_shutdown(self) -> None:
await self.broker.close()
|
shutdown
async
shutdown the workers
Source code in fluid/utils/worker.py
| async def shutdown(self) -> None:
"""shutdown the workers"""
if self._has_shutdown:
return
self._has_shutdown = True
logger.warning(
"gracefully stopping %d workers: %s",
self.num_workers,
", ".join(w.worker_name for w in self._workers.workers),
)
self.gracefully_stop()
try:
async with async_timeout.timeout(self._stopping_grace_period):
await self.wait_for_exit()
await self.on_shutdown()
return
except asyncio.TimeoutError:
logger.warning(
"could not stop workers %s gracefully after %s"
" seconds - force shutdown",
", ".join(
task.get_name() for task in self._workers.tasks if not task.done()
),
self._stopping_grace_period,
)
except asyncio.CancelledError:
pass
self._force_shutdown = True
self._workers.cancel()
try:
await self.wait_for_exit()
except asyncio.CancelledError:
pass
await self.on_shutdown()
|
bail_out
Source code in fluid/utils/worker.py
| def bail_out(self, reason: str, code: int = 1) -> None:
self.gracefully_stop()
|
safe_run
async
Context manager to run a worker safely
Source code in fluid/utils/worker.py
| @asynccontextmanager
async def safe_run(self) -> AsyncGenerator:
"""Context manager to run a worker safely"""
try:
yield
except asyncio.CancelledError:
if self._force_shutdown:
# we are shutting down, this is expected
pass
raise
except Exception as e:
reason = f"unhandled exception while running workers: {e}"
logger.exception(reason)
asyncio.get_event_loop().call_soon(self.bail_out, reason, 2)
else:
# worker finished without error
# make sure we are shutting down
asyncio.get_event_loop().call_soon(self.bail_out, "worker exit", 1)
|
remove_workers
remove workers from the workers
Source code in fluid/utils/worker.py
| def remove_workers(self, *workers: Worker) -> None:
"remove workers from the workers"
workers_, _ = self._workers.workers_tasks()
for worker in workers:
try:
workers_.remove(worker)
except ValueError:
pass
|
startup
async
start the workers
Source code in fluid/utils/worker.py
| async def startup(self) -> None:
"""start the workers"""
if self._workers_task is None:
self._workers_task = asyncio.create_task(self.run(), name=self.worker_name)
for args in self._delayed_callbacks:
self._delayed_callback(*args)
self._delayed_callbacks = []
|
register_callback
register_callback(
callback, seconds, jitter=0.0, periodic=False
)
Register a callback
The callback can be periodic or not.
Source code in fluid/utils/worker.py
| def register_callback(
self,
callback: Callable[[], None],
seconds: float,
jitter: float = 0.0,
periodic: bool | float = False,
) -> None:
"""Register a callback
The callback can be periodic or not.
"""
if periodic is True:
periodic_float = seconds
elif periodic is False:
periodic_float = 0.0
else:
periodic_float = periodic
if not self.running:
self._delayed_callbacks.append((callback, seconds, jitter, periodic_float))
else:
self._delayed_callback(callback, seconds, jitter, periodic_float)
|
enter_async_context
async
Source code in fluid/scheduler/consumer.py
| async def enter_async_context(self, cm: Any) -> Any:
return await self._stack.enter_async_context(cm)
|
execute
async
Execute a task and wait for it to finish
Source code in fluid/scheduler/consumer.py
| async def execute(self, task: Task | str, **params: Any) -> TaskRun:
"""Execute a task and wait for it to finish"""
task_run = self.create_task_run(task, **params)
await task_run.execute()
return task_run
|
execute_sync
execute_sync(task, **params)
Source code in fluid/scheduler/consumer.py
| def execute_sync(self, task: Task | str, **params: Any) -> TaskRun:
return asyncio.run(self._execute_and_exit(task, **params))
|
register_task
Register a task with the task manager
Only tasks registered can be executed by a task manager
Source code in fluid/scheduler/consumer.py
| def register_task(self, task: Task) -> None:
"""Register a task with the task manager
Only tasks registered can be executed by a task manager
"""
self.broker.register_task(task)
|
queue
async
queue(task, priority=None, **params)
Queue a task for execution
This methods fires two events:
- queue: when the task is about to be queued
- queued: after the task is queued
Source code in fluid/scheduler/consumer.py
| async def queue(
self,
task: str | Task,
priority: TaskPriority | None = None,
**params: Any,
) -> TaskRun:
"""Queue a task for execution
This methods fires two events:
- queue: when the task is about to be queued
- queued: after the task is queued
"""
task_run = self.create_task_run(task, priority=priority, **params)
self.dispatcher.dispatch(task_run)
task_run.set_state(TaskState.queued)
await self.broker.queue_task(task_run)
return task_run
|
create_task_run
create_task_run(task, run_id='', priority=None, **params)
Create a TaskRun in init
state
Source code in fluid/scheduler/consumer.py
| def create_task_run(
self,
task: str | Task,
run_id: str = "",
priority: TaskPriority | None = None,
**params: Any,
) -> TaskRun:
"""Create a TaskRun in `init` state"""
if isinstance(task, str):
task = self.broker.task_from_registry(task)
run_id = run_id or self.broker.new_uuid()
return TaskRun(
id=run_id,
task=task,
priority=priority or task.priority,
params=params,
task_manager=self,
)
|
register_from_module
register_from_module(module)
Source code in fluid/scheduler/consumer.py
| def register_from_module(self, module: Any) -> None:
for name in dir(module):
if name.startswith("_"):
continue
if isinstance(obj := getattr(module, name), Task):
self.register_task(obj)
|
cli
Create the task manager command line interface
Source code in fluid/scheduler/consumer.py
| def cli(self, **kwargs: Any) -> Any:
"""Create the task manager command line interface"""
try:
from fluid.scheduler.cli import TaskManagerCLI
except ImportError:
raise ImportError(
"TaskManagerCLI is not available - "
"install with `pip install aio-fluid[cli]`"
) from None
return TaskManagerCLI(self, **kwargs)
|
sync_queue
Source code in fluid/scheduler/consumer.py
| def sync_queue(self, task: str | Task) -> None:
self._task_to_queue.appendleft(task)
|
sync_priority_queue
sync_priority_queue(task)
Source code in fluid/scheduler/consumer.py
| def sync_priority_queue(self, task: str | Task) -> None:
self._priority_task_run_queue.appendleft(self.create_task_run(task))
|
num_concurrent_tasks_for
num_concurrent_tasks_for(task_name)
The number of concurrent tasks for a given task_name
Source code in fluid/scheduler/consumer.py
| def num_concurrent_tasks_for(self, task_name: str) -> int:
"""The number of concurrent tasks for a given task_name"""
return len(self._concurrent_tasks[task_name])
|
queue_and_wait
async
queue_and_wait(task, *, timeout=2, **params)
Queue a task and wait for it to finish
Source code in fluid/scheduler/consumer.py
| async def queue_and_wait(
self, task: str, *, timeout: int = 2, **params: Any
) -> TaskRun:
"""Queue a task and wait for it to finish"""
with TaskRunWaiter(self) as waiter:
return await waiter.wait(await self.queue(task, **params), timeout=timeout)
|
register_async_handler
register_async_handler(event, handler)
Source code in fluid/scheduler/consumer.py
| def register_async_handler(self, event: Event | str, handler: AsyncHandler) -> None:
event = Event.from_string_or_event(event)
self.dispatcher.register_handler(
f"{event.type}.async_dispatch",
self._async_dispatcher_worker.send,
)
self._async_dispatcher_worker.dispatcher.register_handler(event, handler)
|
unregister_async_handler
unregister_async_handler(event)
Source code in fluid/scheduler/consumer.py
| def unregister_async_handler(self, event: Event | str) -> AsyncHandler | None:
return self._async_dispatcher_worker.dispatcher.unregister_handler(event)
|