Task Broker
It can be imported from fluid.scheduler
:
from fastapi.scheduler import TaskBroker
fluid.scheduler.TaskBroker
Bases: ABC
Source code in fluid/scheduler/broker.py
| def __init__(self, url: URL) -> None:
self.url: URL = url
self.registry: TaskRegistry = TaskRegistry()
|
registry
instance-attribute
registry = TaskRegistry()
task_queue_names
abstractmethod
property
queue_task
abstractmethod
async
Queue a task
Source code in fluid/scheduler/broker.py
| @abstractmethod
async def queue_task(self, task_run: TaskRun) -> None:
"""Queue a task"""
|
get_task_run
abstractmethod
async
get_task_run(task_manager)
Get a Task run from the task queue
Source code in fluid/scheduler/broker.py
| @abstractmethod
async def get_task_run(self, task_manager: TaskManager) -> TaskRun | None:
"""Get a Task run from the task queue"""
|
queue_length
abstractmethod
async
Length of task queues
Source code in fluid/scheduler/broker.py
| @abstractmethod
async def queue_length(self) -> dict[str, int]:
"""Length of task queues"""
|
get_tasks_info
abstractmethod
async
get_tasks_info(*task_names)
List of TaskInfo objects
Source code in fluid/scheduler/broker.py
| @abstractmethod
async def get_tasks_info(self, *task_names: str) -> list[TaskInfo]:
"""List of TaskInfo objects"""
|
update_task
abstractmethod
async
update_task(task, params)
Update a task dynamic parameters
Source code in fluid/scheduler/broker.py
| @abstractmethod
async def update_task(self, task: Task, params: dict[str, Any]) -> TaskInfo:
"""Update a task dynamic parameters"""
|
close
abstractmethod
async
Close the broker on shutdown
Source code in fluid/scheduler/broker.py
| @abstractmethod
async def close(self) -> None:
"""Close the broker on shutdown"""
|
lock
abstractmethod
Create a lock
Source code in fluid/scheduler/broker.py
| @abstractmethod
def lock(self, name: str, timeout: float | None = None) -> Lock:
"""Create a lock"""
|
new_uuid
Source code in fluid/scheduler/broker.py
| def new_uuid(self) -> str:
return uuid4().hex
|
filter_tasks
async
filter_tasks(scheduled=None, enabled=None)
Source code in fluid/scheduler/broker.py
| async def filter_tasks(
self,
scheduled: bool | None = None,
enabled: bool | None = None,
) -> list[Task]:
task_info = await self.get_tasks_info()
task_map = {info.name: info for info in task_info}
tasks = []
for task in self.registry.values():
if scheduled is not None and bool(task.schedule) is not scheduled:
continue
if enabled is not None and task_map[task.name].enabled is not enabled:
continue
tasks.append(task)
return tasks
|
task_from_registry
Source code in fluid/scheduler/broker.py
| def task_from_registry(self, task: str | Task) -> Task:
if isinstance(task, Task):
self.register_task(task)
return task
else:
if task_ := self.registry.get(task):
return task_
raise UnknownTaskError(task)
|
register_task
Source code in fluid/scheduler/broker.py
| def register_task(self, task: Task) -> None:
self.registry[task.name] = task
|
enable_task
async
enable_task(task_name, enable=True)
Enable or disable a registered task
Source code in fluid/scheduler/broker.py
| async def enable_task(self, task_name: str, enable: bool = True) -> TaskInfo:
"""Enable or disable a registered task"""
task = self.registry.get(task_name)
if not task:
raise UnknownTaskError(task_name)
return await self.update_task(task, dict(enabled=enable))
|
from_url
classmethod
Source code in fluid/scheduler/broker.py
| @classmethod
def from_url(cls, url: str = "") -> TaskBroker:
p = URL(url or broker_url_from_env())
if factory := _brokers.get(p.scheme):
return factory(p)
raise RuntimeError(f"Invalid broker {p}")
|
register_broker
classmethod
register_broker(name, factory)
Source code in fluid/scheduler/broker.py
| @classmethod
def register_broker(cls, name: str, factory: type[TaskBroker]) -> None:
_brokers[name] = factory
|