Task Run
It can be imported from fluid.scheduler:
from fluid.scheduler import TaskRun
fluid.scheduler.TaskRun
Bases: BaseModel
A TaskRun contains all the data generated by a Task run
priority
instance-attribute
params
instance-attribute
state
class-attribute
instance-attribute
task_manager
class-attribute
instance-attribute
task_manager = Field(exclude=True, repr=False)
queued
class-attribute
instance-attribute
start
class-attribute
instance-attribute
end
class-attribute
instance-attribute
execute
async
Source code in fluid/scheduler/models.py
| async def execute(self) -> None:
try:
self.set_state(TaskState.running)
await self.task.executor(self)
except Exception:
self.set_state(TaskState.failure)
raise
else:
self.set_state(TaskState.success)
|
serialize_task
serialize_task(task, _info)
Source code in fluid/scheduler/models.py
| @field_serializer("task")
def serialize_task(self, task: Task, _info: Any) -> str:
return task.name
|
params_dump_json
Source code in fluid/scheduler/models.py
| def params_dump_json(self) -> str:
return self.task.params_dump_json(self.params)
|
set_state
set_state(state, state_time=None)
Source code in fluid/scheduler/models.py
| def set_state(
self,
state: TaskState,
state_time: datetime | None = None,
) -> None:
if self.state == state:
return
state_time = as_utc(state_time)
match (self.state, state):
case (TaskState.init, TaskState.queued):
self.queued = state_time
self.state = state
self._dispatch()
case (TaskState.init, _):
self.set_state(TaskState.queued, state_time)
self.set_state(state, state_time)
case (TaskState.queued, TaskState.running):
self.start = state_time
self.state = state
self._dispatch()
case (
TaskState.queued,
TaskState.success
| TaskState.aborted
| TaskState.rate_limited
| TaskState.failure,
):
self.set_state(TaskState.running, state_time)
self.set_state(state, state_time)
case (
TaskState.running,
TaskState.success
| TaskState.aborted
| TaskState.rate_limited
| TaskState.failure,
):
self.end = state_time
self.state = state
self._dispatch()
case _:
raise TaskRunError(f"invalid state transition {self.state} -> {state}")
|
lock
Source code in fluid/scheduler/models.py
| def lock(self, timeout: float | None) -> Lock:
return self.task_manager.broker.lock(self.name, timeout=timeout)
|