Skip to content

Task Run

It can be imported from fluid.scheduler:

from fluid.scheduler import TaskRun

fluid.scheduler.TaskRun

Bases: BaseModel

A TaskRun contains all the data generated by a Task run

id instance-attribute

id

task instance-attribute

task

priority instance-attribute

priority

params instance-attribute

params

state class-attribute instance-attribute

state = init

task_manager class-attribute instance-attribute

task_manager = Field(exclude=True, repr=False)

queued class-attribute instance-attribute

queued = None

start class-attribute instance-attribute

start = None

end class-attribute instance-attribute

end = None

logger property

logger

in_queue property

in_queue

duration property

duration

duration_ms property

duration_ms

total property

total

name property

name

name_id property

name_id

is_done property

is_done

is_failure property

is_failure

execute async

execute()
Source code in fluid/scheduler/models.py
async def execute(self) -> None:
    """Run the task's executor for this run and record the outcome.

    The run is first moved to ``TaskState.running``, then
    ``self.task.executor`` is awaited with this run as its argument.

    Raises:
        Exception: whatever ``set_state`` or the executor raised; the run
            is moved to ``TaskState.failure`` before it is re-raised.
    """
    try:
        self.set_state(TaskState.running)
        run_task = self.task.executor
        await run_task(self)
    except Exception:
        # Record the failure, then let the caller see the original error.
        self.set_state(TaskState.failure)
        raise
    # Only reached when the executor returned without raising.
    self.set_state(TaskState.success)

serialize_task

serialize_task(task, _info)
Source code in fluid/scheduler/models.py
@field_serializer("task")
def serialize_task(self, task: Task, _info: Any) -> str:
    """Serialize the ``task`` field as the task's ``name``.

    Args:
        task: the task attached to this run.
        _info: serialization info passed by the serializer hook; unused.

    Returns:
        The value of ``task.name``.
    """
    task_name = task.name
    return task_name

params_dump_json

params_dump_json()
Source code in fluid/scheduler/models.py
def params_dump_json(self) -> str:
    """Dump this run's ``params`` using the task's own ``params_dump_json``.

    Returns:
        Whatever ``self.task.params_dump_json`` returns for ``self.params``.
    """
    owner_task = self.task
    return owner_task.params_dump_json(self.params)

set_state

set_state(state, state_time=None)
Source code in fluid/scheduler/models.py
def set_state(
    self,
    state: TaskState,
    state_time: datetime | None = None,
) -> None:
    """Move this run to ``state``, stamping the matching timestamp.

    Accepted transitions:

    * ``init -> queued`` sets ``queued``
    * ``queued -> running`` sets ``start``
    * ``running -> success | aborted | rate_limited | failure`` sets ``end``

    A request that skips steps (for example ``init -> running`` or
    ``queued -> failure``) is replayed through the intermediate states, each
    stamped with the same time. Requesting the current state is a no-op.
    Every applied step calls ``self._dispatch()``.

    Args:
        state: the state to move to.
        state_time: time to record for the transition; it is passed through
            ``as_utc`` first (presumably normalizing to UTC and handling
            ``None`` -- confirm against ``as_utc``).

    Raises:
        TaskRunError: if the transition is not one of those listed above.
    """
    previous = self.state
    if previous == state:
        return
    when = as_utc(state_time)
    finishing = (
        TaskState.success,
        TaskState.aborted,
        TaskState.rate_limited,
        TaskState.failure,
    )
    if previous == TaskState.init:
        if state != TaskState.queued:
            # Skipping ahead from init: go through queued first.
            self.set_state(TaskState.queued, when)
            self.set_state(state, when)
            return
        self.queued = when
    elif previous == TaskState.queued and state == TaskState.running:
        self.start = when
    elif previous == TaskState.queued and state in finishing:
        # Finishing straight from queued: go through running first.
        self.set_state(TaskState.running, when)
        self.set_state(state, when)
        return
    elif previous == TaskState.running and state in finishing:
        self.end = when
    else:
        raise TaskRunError(f"invalid state transition {self.state} -> {state}")
    # Shared tail for every direct (single-step) transition.
    self.state = state
    self._dispatch()

lock

lock(timeout)
Source code in fluid/scheduler/models.py
def lock(self, timeout: float | None) -> Lock:
    """Ask the task manager's broker for a lock keyed by this run's ``name``.

    Args:
        timeout: forwarded unchanged to the broker as the ``timeout``
            keyword argument.

    Returns:
        The object returned by ``self.task_manager.broker.lock``.
    """
    broker = self.task_manager.broker
    return broker.lock(self.name, timeout=timeout)