Skip to content

Task Manager

It can be imported from fluid.scheduler:

from fastapi.scheduler import TaskManager

fluid.scheduler.TaskManager

TaskManager(**kwargs)

The task manager is the main entry point for managing tasks

Source code in fluid/scheduler/consumer.py
def __init__(self, **kwargs: Any) -> None:
    self.state: dict[str, Any] = {}
    self.config: TaskManagerConfig = TaskManagerConfig(**kwargs)
    self.dispatcher: Annotated[
        TaskDispatcher,
        Doc(
            """
            A dispatcher of task run events.

            Register handlers to listen for task run events.
            """
        ),
    ] = TaskDispatcher()
    self.broker = TaskBroker.from_url(self.config.broker_url)
    self._stack = AsyncExitStack()

state instance-attribute

state = {}

config instance-attribute

config = TaskManagerConfig(**kwargs)

dispatcher instance-attribute

dispatcher = TaskDispatcher()

broker instance-attribute

broker = from_url(broker_url)

registry property

registry

type property

type

enter_async_context async

enter_async_context(cm)
Source code in fluid/scheduler/consumer.py
async def enter_async_context(self, cm: Any) -> Any:
    return await self._stack.enter_async_context(cm)

execute async

execute(task, **params)

Execute a task and wait for it to finish

Source code in fluid/scheduler/consumer.py
async def execute(self, task: Task | str, **params: Any) -> TaskRun:
    """Execute a task and wait for it to finish"""
    task_run = self.create_task_run(task, **params)
    await task_run.execute()
    return task_run

on_shutdown async

on_shutdown()
Source code in fluid/scheduler/consumer.py
async def on_shutdown(self) -> None:
    await self.broker.close()

execute_sync

execute_sync(task, **params)
Source code in fluid/scheduler/consumer.py
def execute_sync(self, task: Task | str, **params: Any) -> TaskRun:
    return asyncio.run(self._execute_and_exit(task, **params))

register_task

register_task(task)

Register a task with the task manager

Only tasks registered can be executed by a task manager

Source code in fluid/scheduler/consumer.py
def register_task(self, task: Task) -> None:
    """Register a task with the task manager

    Only tasks registered can be executed by a task manager
    """
    self.broker.register_task(task)

queue async

queue(task, priority=None, **params)

Queue a task for execution

This methods fires two events:

  • queue: when the task is about to be queued
  • queued: after the task is queued
Source code in fluid/scheduler/consumer.py
async def queue(
    self,
    task: str | Task,
    priority: TaskPriority | None = None,
    **params: Any,
) -> TaskRun:
    """Queue a task for execution

    This methods fires two events:

    - queue: when the task is about to be queued
    - queued: after the task is queued
    """
    task_run = self.create_task_run(task, priority=priority, **params)
    self.dispatcher.dispatch(task_run)
    task_run.set_state(TaskState.queued)
    await self.broker.queue_task(task_run)
    return task_run

create_task_run

create_task_run(task, run_id='', priority=None, **params)

Create a TaskRun in init state

Source code in fluid/scheduler/consumer.py
def create_task_run(
    self,
    task: str | Task,
    run_id: str = "",
    priority: TaskPriority | None = None,
    **params: Any,
) -> TaskRun:
    """Create a TaskRun in `init` state"""
    if isinstance(task, str):
        task = self.broker.task_from_registry(task)
    run_id = run_id or self.broker.new_uuid()
    return TaskRun(
        id=run_id,
        task=task,
        priority=priority or task.priority,
        params=params,
        task_manager=self,
    )

register_from_module

register_from_module(module)
Source code in fluid/scheduler/consumer.py
def register_from_module(self, module: Any) -> None:
    for name in dir(module):
        if name.startswith("_"):
            continue
        if isinstance(obj := getattr(module, name), Task):
            self.register_task(obj)

register_async_handler

register_async_handler(event, handler)

Register an async handler for a given event

This method is a no op for a TaskManager that is not a worker

Source code in fluid/scheduler/consumer.py
def register_async_handler(self, event: str, handler: AsyncHandler) -> None:
    """Register an async handler for a given event

    This method is a no op for a TaskManager that is not a worker
    """

unregister_async_handler

unregister_async_handler(event)

Unregister an async handler for a given event

This method is a no op for a TaskManager that is not a worker

Source code in fluid/scheduler/consumer.py
def unregister_async_handler(self, event: Event | str) -> AsyncHandler | None:
    """Unregister an async handler for a given event

    This method is a no op for a TaskManager that is not a worker
    """
    return None

cli

cli(**kwargs)

Create the task manager command line interface

Source code in fluid/scheduler/consumer.py
def cli(self, **kwargs: Any) -> Any:
    """Create the task manager command line interface"""
    try:
        from fluid.scheduler.cli import TaskManagerCLI
    except ImportError:
        raise ImportError(
            "TaskManagerCLI is not available - "
            "install with `pip install aio-fluid[cli]`"
        ) from None
    return TaskManagerCLI(self, **kwargs)