Skip to content

Task Broker

It can be imported from fluid.scheduler:

from fastapi.scheduler import TaskBroker

fluid.scheduler.TaskBroker

TaskBroker(url)

Bases: ABC

Source code in fluid/scheduler/broker.py
def __init__(self, url: URL) -> None:
    self.url: URL = url
    self.registry: TaskRegistry = TaskRegistry()

url instance-attribute

url = url

registry instance-attribute

registry = TaskRegistry()

task_queue_names abstractmethod property

task_queue_names

Names of the task queues

queue_task abstractmethod async

queue_task(task_run)

Queue a task

Source code in fluid/scheduler/broker.py
@abstractmethod
async def queue_task(self, task_run: TaskRun) -> None:
    """Queue a task"""

get_task_run abstractmethod async

get_task_run(task_manager)

Get a Task run from the task queue

Source code in fluid/scheduler/broker.py
@abstractmethod
async def get_task_run(self, task_manager: TaskManager) -> TaskRun | None:
    """Get a Task run from the task queue"""

queue_length abstractmethod async

queue_length()

Length of task queues

Source code in fluid/scheduler/broker.py
@abstractmethod
async def queue_length(self) -> dict[str, int]:
    """Length of task queues"""

get_tasks_info abstractmethod async

get_tasks_info(*task_names)

List of TaskInfo objects

Source code in fluid/scheduler/broker.py
@abstractmethod
async def get_tasks_info(self, *task_names: str) -> list[TaskInfo]:
    """List of TaskInfo objects"""

update_task abstractmethod async

update_task(task, params)

Update a task dynamic parameters

Source code in fluid/scheduler/broker.py
@abstractmethod
async def update_task(self, task: Task, params: dict[str, Any]) -> TaskInfo:
    """Update a task dynamic parameters"""

close abstractmethod async

close()

Close the broker on shutdown

Source code in fluid/scheduler/broker.py
@abstractmethod
async def close(self) -> None:
    """Close the broker on shutdown"""

lock abstractmethod

lock(name, timeout=None)

Create a lock

Source code in fluid/scheduler/broker.py
@abstractmethod
def lock(self, name: str, timeout: float | None = None) -> Lock:
    """Create a lock"""

new_uuid

new_uuid()
Source code in fluid/scheduler/broker.py
def new_uuid(self) -> str:
    return uuid4().hex

filter_tasks async

filter_tasks(scheduled=None, enabled=None)
Source code in fluid/scheduler/broker.py
async def filter_tasks(
    self,
    scheduled: bool | None = None,
    enabled: bool | None = None,
) -> list[Task]:
    task_info = await self.get_tasks_info()
    task_map = {info.name: info for info in task_info}
    tasks = []
    for task in self.registry.values():
        if scheduled is not None and bool(task.schedule) is not scheduled:
            continue
        if enabled is not None and task_map[task.name].enabled is not enabled:
            continue
        tasks.append(task)
    return tasks

task_from_registry

task_from_registry(task)
Source code in fluid/scheduler/broker.py
def task_from_registry(self, task: str | Task) -> Task:
    if isinstance(task, Task):
        self.register_task(task)
        return task
    else:
        if task_ := self.registry.get(task):
            return task_
        raise UnknownTaskError(task)

register_task

register_task(task)
Source code in fluid/scheduler/broker.py
def register_task(self, task: Task) -> None:
    self.registry[task.name] = task

enable_task async

enable_task(task, enable=True)

Enable or disable a registered task

Source code in fluid/scheduler/broker.py
async def enable_task(self, task: str | Task, enable: bool = True) -> TaskInfo:
    """Enable or disable a registered task"""
    task_ = self.task_from_registry(task)
    return await self.update_task(task_, dict(enabled=enable))

from_url classmethod

from_url(url='')
Source code in fluid/scheduler/broker.py
@classmethod
def from_url(cls, url: str = "") -> TaskBroker:
    p = URL(url or broker_url_from_env())
    if factory := _brokers.get(p.scheme):
        return factory(p)
    raise RuntimeError(f"Invalid broker {p}")

register_broker classmethod

register_broker(name, factory)
Source code in fluid/scheduler/broker.py
@classmethod
def register_broker(cls, name: str, factory: type[TaskBroker]) -> None:
    _brokers[name] = factory