Skip to content

Task Consumer

The task consumer is a TaskManager which is also a Workers that consumes tasks from the task queue and executes them. It can be imported from fluid.scheduler:

from fastapi.scheduler import TaskConsumer

fluid.scheduler.TaskConsumer

TaskConsumer(**config)

Bases: TaskManager, Workers

The Task Consumer is a Task Manager responsible for consuming tasks from a task queue

Source code in fluid/scheduler/consumer.py
def __init__(self, **config: Any) -> None:
    super().__init__(**config)
    Workers.__init__(self)
    self._async_dispatcher_worker = AsyncConsumer(AsyncTaskDispatcher())
    self._concurrent_tasks: dict[str, dict[str, TaskRun]] = defaultdict(dict)
    self._task_to_queue: deque[str | Task] = deque()
    self._priority_task_run_queue: deque[TaskRun] = deque()
    self._queue_tasks_worker = WorkerFunction(
        self._queue_task, name="queue-task-worker"
    )
    self.add_workers(self._queue_tasks_worker)
    self.add_workers(self._async_dispatcher_worker)
    for i in range(self.config.max_concurrent_tasks):
        worker_name = f"task-worker-{i+1}"
        self.add_workers(
            WorkerFunction(
                partial(self._consume_tasks, worker_name), name=worker_name
            )
        )

worker_name property

worker_name

The name of the worker

num_workers property

num_workers

running property

running

state instance-attribute

state = {}

config instance-attribute

config = TaskManagerConfig(**kwargs)

dispatcher instance-attribute

dispatcher = TaskDispatcher()

broker instance-attribute

broker = from_url(broker_url)

registry property

registry

type property

type

num_concurrent_tasks property

num_concurrent_tasks

The number of concurrent_tasks running in the consumer

status async

status()
Source code in fluid/utils/worker.py
async def status(self) -> dict:
    return await self._workers.status()

gracefully_stop

gracefully_stop()
Source code in fluid/utils/worker.py
def gracefully_stop(self) -> None:
    self._workers.gracefully_stop()

is_running

is_running()
Source code in fluid/utils/worker.py
def is_running(self) -> bool:
    return self._running

is_stopping

is_stopping()
Source code in fluid/utils/worker.py
def is_stopping(self) -> bool:
    return self._workers.is_stopping()

run async

run()

run the workers

Source code in fluid/utils/worker.py
async def run(self) -> None:
    """run the workers"""
    with self.start_running():
        async with self.safe_run():
            workers, _ = self._workers.workers_tasks()
            self._workers.workers = tuple(workers)
            self._workers.tasks = tuple(
                self.create_task(worker) for worker in workers
            )
            await asyncio.gather(*self._workers.tasks)
        await self.shutdown()

start_running

start_running()
Source code in fluid/utils/worker.py
@contextmanager
def start_running(self) -> Generator:
    if self._running:
        raise RuntimeError("Worker is already running")
    self._running = True
    try:
        logger.info("%s started running", self.worker_name)
        yield
    finally:
        self._running = False
        logger.warning("%s stopped running", self.worker_name)

add_workers

add_workers(*workers)

add workers to the workers

Source code in fluid/utils/worker.py
def add_workers(self, *workers: Worker) -> None:
    """add workers to the workers"""
    workers_, _ = self._workers.workers_tasks()
    for worker in workers:
        if worker not in workers_:
            workers_.append(worker)

wait_for_exit async

wait_for_exit()
Source code in fluid/utils/worker.py
async def wait_for_exit(self) -> None:
    if self._workers_task is not None:
        await self._workers_task

create_task

create_task(worker)
Source code in fluid/utils/worker.py
def create_task(self, worker: Worker) -> asyncio.Task:
    return asyncio.create_task(
        self._run_worker(worker), name=f"{self.worker_name}-{worker.worker_name}"
    )

on_shutdown async

on_shutdown()
Source code in fluid/scheduler/consumer.py
async def on_shutdown(self) -> None:
    await self.broker.close()

shutdown async

shutdown()

shutdown the workers

Source code in fluid/utils/worker.py
async def shutdown(self) -> None:
    """shutdown the workers"""
    if self._has_shutdown:
        return
    self._has_shutdown = True
    logger.warning(
        "gracefully stopping %d workers: %s",
        self.num_workers,
        ", ".join(w.worker_name for w in self._workers.workers),
    )
    self.gracefully_stop()
    try:
        async with async_timeout.timeout(self._stopping_grace_period):
            await self.wait_for_exit()
        await self.on_shutdown()
        return
    except asyncio.TimeoutError:
        logger.warning(
            "could not stop workers %s gracefully after %s"
            " seconds - force shutdown",
            ", ".join(
                task.get_name() for task in self._workers.tasks if not task.done()
            ),
            self._stopping_grace_period,
        )
    except asyncio.CancelledError:
        pass
    self._force_shutdown = True
    self._workers.cancel()
    try:
        await self.wait_for_exit()
    except asyncio.CancelledError:
        pass
    await self.on_shutdown()

bail_out

bail_out(reason, code=1)
Source code in fluid/utils/worker.py
def bail_out(self, reason: str, code: int = 1) -> None:
    self.gracefully_stop()

safe_run async

safe_run()

Context manager to run a worker safely

Source code in fluid/utils/worker.py
@asynccontextmanager
async def safe_run(self) -> AsyncGenerator:
    """Context manager to run a worker safely"""
    try:
        yield
    except asyncio.CancelledError:
        if self._force_shutdown:
            # we are shutting down, this is expected
            pass
        raise
    except Exception as e:
        reason = f"unhandled exception while running workers: {e}"
        logger.exception(reason)
        asyncio.get_event_loop().call_soon(self.bail_out, reason, 2)
    else:
        # worker finished without error
        # make sure we are shutting down
        asyncio.get_event_loop().call_soon(self.bail_out, "worker exit", 1)

remove_workers

remove_workers(*workers)

remove workers from the workers

Source code in fluid/utils/worker.py
def remove_workers(self, *workers: Worker) -> None:
    "remove workers from the workers"
    workers_, _ = self._workers.workers_tasks()
    for worker in workers:
        try:
            workers_.remove(worker)
        except ValueError:
            pass

startup async

startup()

start the workers

Source code in fluid/utils/worker.py
async def startup(self) -> None:
    """start the workers"""
    if self._workers_task is None:
        self._workers_task = asyncio.create_task(self.run(), name=self.worker_name)
        for args in self._delayed_callbacks:
            self._delayed_callback(*args)
        self._delayed_callbacks = []

register_callback

register_callback(
    callback, seconds, jitter=0.0, periodic=False
)

Register a callback

The callback can be periodic or not.

Source code in fluid/utils/worker.py
def register_callback(
    self,
    callback: Callable[[], None],
    seconds: float,
    jitter: float = 0.0,
    periodic: bool | float = False,
) -> None:
    """Register a callback

    The callback can be periodic or not.
    """
    if periodic is True:
        periodic_float = seconds
    elif periodic is False:
        periodic_float = 0.0
    else:
        periodic_float = periodic
    if not self.running:
        self._delayed_callbacks.append((callback, seconds, jitter, periodic_float))
    else:
        self._delayed_callback(callback, seconds, jitter, periodic_float)

enter_async_context async

enter_async_context(cm)
Source code in fluid/scheduler/consumer.py
async def enter_async_context(self, cm: Any) -> Any:
    return await self._stack.enter_async_context(cm)

execute async

execute(task, **params)

Execute a task and wait for it to finish

Source code in fluid/scheduler/consumer.py
async def execute(self, task: Task | str, **params: Any) -> TaskRun:
    """Execute a task and wait for it to finish"""
    task_run = self.create_task_run(task, **params)
    await task_run.execute()
    return task_run

execute_sync

execute_sync(task, **params)
Source code in fluid/scheduler/consumer.py
def execute_sync(self, task: Task | str, **params: Any) -> TaskRun:
    return asyncio.run(self._execute_and_exit(task, **params))

register_task

register_task(task)

Register a task with the task manager

Only tasks registered can be executed by a task manager

Source code in fluid/scheduler/consumer.py
def register_task(self, task: Task) -> None:
    """Register a task with the task manager

    Only tasks registered can be executed by a task manager
    """
    self.broker.register_task(task)

queue async

queue(task, priority=None, **params)

Queue a task for execution

This methods fires two events:

  • queue: when the task is about to be queued
  • queued: after the task is queued
Source code in fluid/scheduler/consumer.py
async def queue(
    self,
    task: str | Task,
    priority: TaskPriority | None = None,
    **params: Any,
) -> TaskRun:
    """Queue a task for execution

    This methods fires two events:

    - queue: when the task is about to be queued
    - queued: after the task is queued
    """
    task_run = self.create_task_run(task, priority=priority, **params)
    self.dispatcher.dispatch(task_run)
    task_run.set_state(TaskState.queued)
    await self.broker.queue_task(task_run)
    return task_run

create_task_run

create_task_run(task, run_id='', priority=None, **params)

Create a TaskRun in init state

Source code in fluid/scheduler/consumer.py
def create_task_run(
    self,
    task: str | Task,
    run_id: str = "",
    priority: TaskPriority | None = None,
    **params: Any,
) -> TaskRun:
    """Create a TaskRun in `init` state"""
    if isinstance(task, str):
        task = self.broker.task_from_registry(task)
    run_id = run_id or self.broker.new_uuid()
    return TaskRun(
        id=run_id,
        task=task,
        priority=priority or task.priority,
        params=params,
        task_manager=self,
    )

register_from_module

register_from_module(module)
Source code in fluid/scheduler/consumer.py
def register_from_module(self, module: Any) -> None:
    for name in dir(module):
        if name.startswith("_"):
            continue
        if isinstance(obj := getattr(module, name), Task):
            self.register_task(obj)

cli

cli(**kwargs)

Create the task manager command line interface

Source code in fluid/scheduler/consumer.py
def cli(self, **kwargs: Any) -> Any:
    """Create the task manager command line interface"""
    try:
        from fluid.scheduler.cli import TaskManagerCLI
    except ImportError:
        raise ImportError(
            "TaskManagerCLI is not available - "
            "install with `pip install aio-fluid[cli]`"
        ) from None
    return TaskManagerCLI(self, **kwargs)

sync_queue

sync_queue(task)
Source code in fluid/scheduler/consumer.py
def sync_queue(self, task: str | Task) -> None:
    self._task_to_queue.appendleft(task)

sync_priority_queue

sync_priority_queue(task)
Source code in fluid/scheduler/consumer.py
def sync_priority_queue(self, task: str | Task) -> None:
    self._priority_task_run_queue.appendleft(self.create_task_run(task))

num_concurrent_tasks_for

num_concurrent_tasks_for(task_name)

The number of concurrent tasks for a given task_name

Source code in fluid/scheduler/consumer.py
def num_concurrent_tasks_for(self, task_name: str) -> int:
    """The number of concurrent tasks for a given task_name"""
    return len(self._concurrent_tasks[task_name])

queue_and_wait async

queue_and_wait(task, *, timeout=2, **params)

Queue a task and wait for it to finish

Source code in fluid/scheduler/consumer.py
async def queue_and_wait(
    self, task: str, *, timeout: int = 2, **params: Any
) -> TaskRun:
    """Queue a task and wait for it to finish"""
    with TaskRunWaiter(self) as waiter:
        return await waiter.wait(await self.queue(task, **params), timeout=timeout)

register_async_handler

register_async_handler(event, handler)
Source code in fluid/scheduler/consumer.py
def register_async_handler(self, event: Event | str, handler: AsyncHandler) -> None:
    event = Event.from_string_or_event(event)
    self.dispatcher.register_handler(
        f"{event.type}.async_dispatch",
        self._async_dispatcher_worker.send,
    )
    self._async_dispatcher_worker.dispatcher.register_handler(event, handler)

unregister_async_handler

unregister_async_handler(event)
Source code in fluid/scheduler/consumer.py
def unregister_async_handler(self, event: Event | str) -> AsyncHandler | None:
    return self._async_dispatcher_worker.dispatcher.unregister_handler(event)