Task Queue
This module provides a lightweight implementation of a distributed task producer (TaskScheduler) and consumer (TaskConsumer). The middleware that distributes tasks is configured through the Broker interface. A Redis broker is provided for convenience.
Tasks
Tasks are standard Python async functions decorated with the task decorator.
Two types of task are implemented:
- Simple concurrent tasks: they run concurrently with the task consumer, so they must be IO-bound (no heavy CPU-bound operations)
from fluid.scheduler import task, TaskRun

# http_cli and datastore_cli stand for async clients defined by the application

@task
async def fetch_data(ctx: TaskRun):
    # fetch data
    data = await http_cli.get("https://...")
    data_id = await datastore_cli.store(data)
    # trigger another task
    ctx.task_manager.queue("heavy_calculation", data_id=data_id)
- CPU-bound tasks: they run in a subprocess
from fluid.scheduler import task, TaskRun

@task(cpu_bound=True)
async def heavy_calculation(ctx: TaskRun):
    data = await datastore_cli.get(ctx.params["data_id"])
    # perform some heavy calculation
    ...
    # trigger another task
    ctx.task_manager.queue("fetch_data")
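The reason for running CPU-bound work in a subprocess is that a long computation inside the consumer's event loop would block every other task. The following is a minimal standalone sketch of that idea using only the standard library. It is not how the library implements cpu_bound=True; the helper names run_in_subprocess and main are hypothetical and exist only for illustration.

```python
import asyncio
import sys
from typing import Tuple


async def run_in_subprocess(n: int) -> int:
    """Sum the squares of 0..n-1 in a separate Python process (hypothetical helper)."""
    code = f"print(sum(i * i for i in range({n})))"
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-c", code,
        stdout=asyncio.subprocess.PIPE,
    )
    # Awaiting the child process yields control, so the event loop keeps running
    stdout, _ = await proc.communicate()
    if proc.returncode != 0:
        raise RuntimeError(f"subprocess exited with code {proc.returncode}")
    return int(stdout.decode().strip())


async def main() -> Tuple[int, int]:
    ticks = 0

    async def heartbeat() -> None:
        # Stands in for IO-bound tasks that must stay responsive
        nonlocal ticks
        while True:
            ticks += 1
            await asyncio.sleep(0.01)

    beat = asyncio.create_task(heartbeat())
    try:
        total = await run_in_subprocess(1000)
    finally:
        beat.cancel()
    return total, ticks
```

Calling asyncio.run(main()) returns the computed total together with the number of heartbeat ticks observed while the subprocess was working, which shows that the loop was not blocked.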
Both types of task can be scheduled periodically via the schedule keyword argument:
import asyncio
from datetime import timedelta

from fluid.scheduler import task, TaskContext, every

@task(schedule=every(timedelta(seconds=1)))
async def scheduled(context: TaskContext) -> str:
    await asyncio.sleep(0.1)
    return "OK"
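How every turns a timedelta into repeated runs is not shown in this section. As a rough illustration of interval scheduling in general, the sketch below repeats an async job with plain asyncio. The run_every helper and its times parameter are hypothetical and are not part of fluid.scheduler.

```python
import asyncio
from datetime import timedelta
from typing import Awaitable, Callable, List


async def run_every(
    interval: timedelta,
    job: Callable[[], Awaitable[str]],
    times: int,
) -> List[str]:
    """Run `job` `times` times, sleeping `interval` between runs (hypothetical helper)."""
    results: List[str] = []
    for i in range(times):
        results.append(await job())
        # No need to wait after the final run
        if i < times - 1:
            await asyncio.sleep(interval.total_seconds())
    return results


async def scheduled() -> str:
    # Same body as the scheduled task above, with a shorter sleep
    await asyncio.sleep(0.01)
    return "OK"
```

For example, asyncio.run(run_every(timedelta(milliseconds=10), scheduled, times=3)) collects the return values of three consecutive runs. A real scheduler would typically run indefinitely and queue the task through the broker instead of calling it directly.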
Broker
A task broker must implement three abstract methods:
@abstractmethod
async def queue_task(self, queued_task: QueuedTask) -> TaskRun:
    """Queue a task"""

@abstractmethod
async def get_task_run(self) -> Optional[TaskRun]:
    """Get a Task run from the task queue"""

@abstractmethod
async def queue_length(self) -> Dict[str, int]:
    """Length of task queues"""
The library ships a Redis broker for convenience.
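To make the three-method contract concrete, here is a minimal in-memory sketch. It is self-contained and does not import the library: the QueuedTask, TaskRun and Broker classes below are simplified, hypothetical stand-ins whose fields are assumptions, and the single "default" queue name is also an assumption. Only the three method signatures are taken from the text above.

```python
import asyncio
from abc import ABC, abstractmethod
from collections import deque
from dataclasses import dataclass, field
from typing import Any, Deque, Dict, Optional
from uuid import uuid4


# Hypothetical stand-ins, NOT the library's QueuedTask and TaskRun classes
@dataclass
class QueuedTask:
    name: str
    params: Dict[str, Any] = field(default_factory=dict)


@dataclass
class TaskRun:
    id: str
    name: str
    params: Dict[str, Any]


class Broker(ABC):
    @abstractmethod
    async def queue_task(self, queued_task: QueuedTask) -> TaskRun:
        """Queue a task"""

    @abstractmethod
    async def get_task_run(self) -> Optional[TaskRun]:
        """Get a Task run from the task queue"""

    @abstractmethod
    async def queue_length(self) -> Dict[str, int]:
        """Length of task queues"""


class InMemoryBroker(Broker):
    """Single-process FIFO broker, useful only for illustration or tests."""

    def __init__(self) -> None:
        self._queue: Deque[TaskRun] = deque()

    async def queue_task(self, queued_task: QueuedTask) -> TaskRun:
        run = TaskRun(uuid4().hex, queued_task.name, dict(queued_task.params))
        self._queue.append(run)
        return run

    async def get_task_run(self) -> Optional[TaskRun]:
        # Return None when the queue is empty instead of blocking
        return self._queue.popleft() if self._queue else None

    async def queue_length(self) -> Dict[str, int]:
        # One queue only; the "default" key is an assumption of this sketch
        return {"default": len(self._queue)}


async def demo():
    broker = InMemoryBroker()
    await broker.queue_task(QueuedTask("fetch_data"))
    await broker.queue_task(QueuedTask("heavy_calculation", {"data_id": 1}))
    before = await broker.queue_length()
    first = await broker.get_task_run()
    after = await broker.queue_length()
    return before, first.name, after
```

Running asyncio.run(demo()) queues two tasks and pops the first one in FIFO order. A distributed broker such as the Redis one would keep the queue in an external store so that producers and consumers can live in different processes.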