Skip to content

Workers

Workers are the main building block for asynchronous programming with aio-fluid. They are responsible for running tasks and managing their lifecycle. There are several worker classes which can be imported from fluid.utils.worker:

from fluid.utils.worker import StoppingWorker

fluid.utils.worker.Worker

Worker(name='')

Bases: ABC

The base class of a worker that can be run

Source code in fluid/utils/worker.py
def __init__(self, name: str = "") -> None:
    """Create the worker and record its name.

    Args:
        name: Explicit worker name. When empty, the name is computed by
            applying ``underscore`` to the class name.
    """
    self._name: str = name or underscore(type(self).__name__)

worker_name property

worker_name

The name of the worker

num_workers property

num_workers

The number of workers in this worker

status abstractmethod async

status()

Get the status of the worker.

Source code in fluid/utils/worker.py
@abstractmethod
async def status(self) -> dict:
    """
    Get the status of the worker.

    Abstract coroutine: concrete workers must override it and return a
    dictionary.
    """

gracefully_stop abstractmethod

gracefully_stop()

gracefully stop the worker

Source code in fluid/utils/worker.py
@abstractmethod
def gracefully_stop(self) -> None:
    """Gracefully stop the worker.

    Abstract method: concrete workers must provide the implementation.
    """

is_running abstractmethod

is_running()

Is the worker running?

Source code in fluid/utils/worker.py
@abstractmethod
def is_running(self) -> bool:
    """Is the worker running?

    Abstract method: concrete workers must override it and return a boolean.
    """

is_stopping abstractmethod

is_stopping()

Is the worker stopping?

Source code in fluid/utils/worker.py
@abstractmethod
def is_stopping(self) -> bool:
    """Is the worker stopping?

    Abstract method: concrete workers must override it and return a boolean.
    """

run abstractmethod async

run()

run the worker

Source code in fluid/utils/worker.py
@abstractmethod
async def run(self) -> None:
    """Run the worker.

    Abstract coroutine: concrete workers must provide the implementation.
    """

fluid.utils.worker.RunningWorker

RunningWorker(name='')

Bases: Worker

A Worker that can be started

Source code in fluid/utils/worker.py
def __init__(self, name: str = "") -> None:
    """Create the worker in the not-running state.

    Args:
        name: Worker name, forwarded unchanged to the parent initialiser.
    """
    super().__init__(name)
    # flag reported by is_running() and toggled by start_running()
    self._running: bool = False

worker_name property

worker_name

The name of the worker

num_workers property

num_workers

The number of workers in this worker

status abstractmethod async

status()

Get the status of the worker.

Source code in fluid/utils/worker.py
@abstractmethod
async def status(self) -> dict:
    """
    Get the status of the worker.

    Abstract coroutine: concrete workers must override it and return a
    dictionary.
    """

gracefully_stop abstractmethod

gracefully_stop()

gracefully stop the worker

Source code in fluid/utils/worker.py
@abstractmethod
def gracefully_stop(self) -> None:
    """Gracefully stop the worker.

    Abstract method: concrete workers must provide the implementation.
    """

is_stopping abstractmethod

is_stopping()

Is the worker stopping?

Source code in fluid/utils/worker.py
@abstractmethod
def is_stopping(self) -> bool:
    """Is the worker stopping?

    Abstract method: concrete workers must override it and return a boolean.
    """

run abstractmethod async

run()

run the worker

Source code in fluid/utils/worker.py
@abstractmethod
async def run(self) -> None:
    """Run the worker.

    Abstract coroutine: concrete workers must provide the implementation.
    """

is_running

is_running()
Source code in fluid/utils/worker.py
def is_running(self) -> bool:
    """Return the current value of the ``_running`` flag."""
    return self._running

start_running

start_running()
Source code in fluid/utils/worker.py
@contextmanager
def start_running(self) -> Generator:
    """Flag the worker as running for the duration of a ``with`` block.

    Raises:
        RuntimeError: If the worker is already flagged as running.
    """
    if self._running:
        raise RuntimeError("Worker is already running")
    self._running = True
    try:
        logger.info("%s started running", self.worker_name)
        yield
    finally:
        # the flag is cleared even when the managed block raises
        self._running = False
        logger.warning("%s stopped running", self.worker_name)

fluid.utils.worker.StoppingWorker

StoppingWorker(name='')

Bases: RunningWorker

A Worker that can be stopped

Source code in fluid/utils/worker.py
def __init__(self, name: str = "") -> None:
    """Create the worker with the stopping flag cleared.

    Args:
        name: Worker name, forwarded unchanged to the parent initialiser.
    """
    super().__init__(name)
    # flag reported by is_stopping() and set by gracefully_stop()
    self._stopping: bool = False

worker_name property

worker_name

The name of the worker

num_workers property

num_workers

The number of workers in this worker

is_running

is_running()
Source code in fluid/utils/worker.py
def is_running(self) -> bool:
    """Return the current value of the ``_running`` flag."""
    return self._running

run abstractmethod async

run()

run the worker

Source code in fluid/utils/worker.py
@abstractmethod
async def run(self) -> None:
    """Run the worker.

    Abstract coroutine: concrete workers must provide the implementation.
    """

start_running

start_running()
Source code in fluid/utils/worker.py
@contextmanager
def start_running(self) -> Generator:
    """Flag the worker as running for the duration of a ``with`` block.

    Raises:
        RuntimeError: If the worker is already flagged as running.
    """
    if self._running:
        raise RuntimeError("Worker is already running")
    self._running = True
    try:
        logger.info("%s started running", self.worker_name)
        yield
    finally:
        # the flag is cleared even when the managed block raises
        self._running = False
        logger.warning("%s stopped running", self.worker_name)

is_stopping

is_stopping()
Source code in fluid/utils/worker.py
def is_stopping(self) -> bool:
    """Return the current value of the ``_stopping`` flag."""
    return self._stopping

gracefully_stop

gracefully_stop()
Source code in fluid/utils/worker.py
def gracefully_stop(self) -> None:
    """Request a graceful stop by setting the ``_stopping`` flag."""
    self._stopping = True

status async

status()
Source code in fluid/utils/worker.py
async def status(self) -> dict:
    """Report the stopping and running state of the worker.

    Returns:
        A dict whose ``"stopping"`` and ``"running"`` entries hold the
        results of ``is_stopping()`` and ``is_running()``.
    """
    stopping = self.is_stopping()
    running = self.is_running()
    return dict(stopping=stopping, running=running)

fluid.utils.worker.WorkerFunction

WorkerFunction(run_function, heartbeat=0, name='')

Bases: StoppingWorker

A Worker that runs a coroutine function

Source code in fluid/utils/worker.py
def __init__(
    self,
    run_function: Callable[[], Awaitable[None]],
    heartbeat: float | int = 0,
    name: str = "",
) -> None:
    """Create a worker around a coroutine function.

    Args:
        run_function: Zero-argument callable returning an awaitable; stored
            as ``_run_function``.
        heartbeat: Stored as ``_heartbeat``.
        name: Worker name, forwarded to the parent initialiser.
    """
    super().__init__(name=name)
    self._run_function = run_function
    self._heartbeat = heartbeat

worker_name property

worker_name

The name of the worker

num_workers property

num_workers

The number of workers in this worker

status async

status()
Source code in fluid/utils/worker.py
async def status(self) -> dict:
    """Report the stopping and running state of the worker.

    Returns:
        A dict whose ``"stopping"`` and ``"running"`` entries hold the
        results of ``is_stopping()`` and ``is_running()``.
    """
    stopping = self.is_stopping()
    running = self.is_running()
    return dict(stopping=stopping, running=running)

gracefully_stop

gracefully_stop()
Source code in fluid/utils/worker.py
def gracefully_stop(self) -> None:
    """Request a graceful stop by setting the ``_stopping`` flag."""
    self._stopping = True

is_running

is_running()
Source code in fluid/utils/worker.py
def is_running(self) -> bool:
    """Return the current value of the ``_running`` flag."""
    return self._running

is_stopping

is_stopping()
Source code in fluid/utils/worker.py
def is_stopping(self) -> bool:
    """Return the current value of the ``_stopping`` flag."""
    return self._stopping

start_running

start_running()
Source code in fluid/utils/worker.py
@contextmanager
def start_running(self) -> Generator:
    """Flag the worker as running for the duration of a ``with`` block.

    Raises:
        RuntimeError: If the worker is already flagged as running.
    """
    if self._running:
        raise RuntimeError("Worker is already running")
    self._running = True
    try:
        logger.info("%s started running", self.worker_name)
        yield
    finally:
        # the flag is cleared even when the managed block raises
        self._running = False
        logger.warning("%s stopped running", self.worker_name)

run async

run()
Source code in fluid/utils/worker.py
async def run(self) -> None:
    """Call the wrapped coroutine function repeatedly until a stop is requested.

    After every call the worker sleeps for ``_heartbeat`` seconds before
    checking the stopping flag again.
    """
    with self.start_running():
        while True:
            if self.is_stopping():
                break
            await self._run_function()
            await asyncio.sleep(self._heartbeat)

fluid.utils.worker.QueueConsumer

QueueConsumer(name='')

Bases: StoppingWorker, MessageProducer[MessageType]

A Worker that can receive messages

This worker can receive messages but not consume them.

Source code in fluid/utils/worker.py
def __init__(self, name: str = "") -> None:
    """Create the consumer with an empty message queue.

    Args:
        name: Worker name, forwarded to the parent initialiser.
    """
    super().__init__(name=name)
    # queue filled by send() and read by get_message()
    self._queue: asyncio.Queue[MessageType | None] = asyncio.Queue()

worker_name property

worker_name

The name of the worker

num_workers property

num_workers

The number of workers in this worker

gracefully_stop

gracefully_stop()
Source code in fluid/utils/worker.py
def gracefully_stop(self) -> None:
    """Request a graceful stop by setting the ``_stopping`` flag."""
    self._stopping = True

is_running

is_running()
Source code in fluid/utils/worker.py
def is_running(self) -> bool:
    """Return the current value of the ``_running`` flag."""
    return self._running

is_stopping

is_stopping()
Source code in fluid/utils/worker.py
def is_stopping(self) -> bool:
    """Return the current value of the ``_stopping`` flag."""
    return self._stopping

run abstractmethod async

run()

run the worker

Source code in fluid/utils/worker.py
@abstractmethod
async def run(self) -> None:
    """Run the worker.

    Abstract coroutine: concrete workers must provide the implementation.
    """

start_running

start_running()
Source code in fluid/utils/worker.py
@contextmanager
def start_running(self) -> Generator:
    """Flag the worker as running for the duration of a ``with`` block.

    Raises:
        RuntimeError: If the worker is already flagged as running.
    """
    if self._running:
        raise RuntimeError("Worker is already running")
    self._running = True
    try:
        logger.info("%s started running", self.worker_name)
        yield
    finally:
        # the flag is cleared even when the managed block raises
        self._running = False
        logger.warning("%s stopped running", self.worker_name)

get_message async

get_message(timeout=0.5)
Source code in fluid/utils/worker.py
async def get_message(self, timeout: float = 0.5) -> MessageType | None:
    try:
        async with asyncio.timeout(timeout):
            return await self._queue.get()
    except asyncio.TimeoutError:
        return None
    except (asyncio.CancelledError, RuntimeError):
        if not self.is_stopping():
            raise
    return None

queue_size

queue_size()
Source code in fluid/utils/worker.py
def queue_size(self) -> int:
    """Return the number of items currently in the internal queue."""
    return self._queue.qsize()

status async

status()
Source code in fluid/utils/worker.py
async def status(self) -> dict:
    """Extend the inherited status with the current queue size.

    Returns:
        The dict produced by the parent ``status`` with an added
        ``"queue_size"`` entry.
    """
    report = await super().status()
    report["queue_size"] = self.queue_size()
    return report

send

send(message)

Send a message into the worker

Source code in fluid/utils/worker.py
def send(self, message: MessageType | None) -> None:
    """Send a message into the worker

    The message is placed on the internal queue with ``put_nowait``.
    """
    self._queue.put_nowait(message)

fluid.utils.worker.QueueConsumerWorker

QueueConsumerWorker(on_message, name='')

Bases: QueueConsumer[MessageType]

A Worker that can receive and consume messages

Source code in fluid/utils/worker.py
def __init__(
    self,
    on_message: Callable[[MessageType], Awaitable[None]],
    name: str = "",
) -> None:
    """Create a consumer that handles messages with ``on_message``.

    Args:
        on_message: Callable taking a message and returning an awaitable;
            stored as the ``on_message`` attribute.
        name: Worker name, forwarded to the parent initialiser.
    """
    super().__init__(name=name)
    self.on_message = on_message

worker_name property

worker_name

The name of the worker

num_workers property

num_workers

The number of workers in this worker

on_message instance-attribute

on_message = on_message

send

send(message)

Send a message into the worker

Source code in fluid/utils/worker.py
def send(self, message: MessageType | None) -> None:
    """Send a message into the worker

    The message is placed on the internal queue with ``put_nowait``.
    """
    self._queue.put_nowait(message)

status async

status()
Source code in fluid/utils/worker.py
async def status(self) -> dict:
    """Extend the inherited status with the current queue size.

    Returns:
        The dict produced by the parent ``status`` with an added
        ``"queue_size"`` entry.
    """
    report = await super().status()
    report["queue_size"] = self.queue_size()
    return report

gracefully_stop

gracefully_stop()
Source code in fluid/utils/worker.py
def gracefully_stop(self) -> None:
    """Request a graceful stop by setting the ``_stopping`` flag."""
    self._stopping = True

is_running

is_running()
Source code in fluid/utils/worker.py
def is_running(self) -> bool:
    """Return the current value of the ``_running`` flag."""
    return self._running

is_stopping

is_stopping()
Source code in fluid/utils/worker.py
def is_stopping(self) -> bool:
    """Return the current value of the ``_stopping`` flag."""
    return self._stopping

start_running

start_running()
Source code in fluid/utils/worker.py
@contextmanager
def start_running(self) -> Generator:
    """Flag the worker as running for the duration of a ``with`` block.

    Raises:
        RuntimeError: If the worker is already flagged as running.
    """
    if self._running:
        raise RuntimeError("Worker is already running")
    self._running = True
    try:
        logger.info("%s started running", self.worker_name)
        yield
    finally:
        # the flag is cleared even when the managed block raises
        self._running = False
        logger.warning("%s stopped running", self.worker_name)

get_message async

get_message(timeout=0.5)
Source code in fluid/utils/worker.py
async def get_message(self, timeout: float = 0.5) -> MessageType | None:
    try:
        async with asyncio.timeout(timeout):
            return await self._queue.get()
    except asyncio.TimeoutError:
        return None
    except (asyncio.CancelledError, RuntimeError):
        if not self.is_stopping():
            raise
    return None

queue_size

queue_size()
Source code in fluid/utils/worker.py
def queue_size(self) -> int:
    """Return the number of items currently in the internal queue."""
    return self._queue.qsize()

run async

run()
Source code in fluid/utils/worker.py
async def run(self) -> None:
    """Consume queued messages until a stop is requested.

    Each message obtained from ``get_message`` that is not ``None`` is
    awaited through ``on_message``; ``None`` results are skipped.
    """
    with self.start_running():
        while True:
            if self.is_stopping():
                break
            payload = await self.get_message()
            if payload is None:
                continue
            await self.on_message(payload)

fluid.utils.worker.AsyncConsumer

AsyncConsumer(dispatcher, name='')

Bases: QueueConsumer[MessageType]

A Worker that can dispatch async callbacks

Source code in fluid/utils/worker.py
def __init__(
    self, dispatcher: AsyncDispatcher[MessageType], name: str = ""
) -> None:
    """Create a consumer that forwards messages to ``dispatcher``.

    Args:
        dispatcher: Dispatcher stored as the ``dispatcher`` attribute.
        name: Worker name, forwarded to the parent initialiser.
    """
    super().__init__(name)
    self.dispatcher: AsyncDispatcher[MessageType] = dispatcher

worker_name property

worker_name

The name of the worker

num_workers property

num_workers

The number of workers in this worker

AsyncCallback instance-attribute

AsyncCallback

dispatcher instance-attribute

dispatcher = dispatcher

send

send(message)

Send a message into the worker

Source code in fluid/utils/worker.py
def send(self, message: MessageType | None) -> None:
    """Send a message into the worker

    The message is placed on the internal queue with ``put_nowait``.
    """
    self._queue.put_nowait(message)

status async

status()
Source code in fluid/utils/worker.py
async def status(self) -> dict:
    """Extend the inherited status with the current queue size.

    Returns:
        The dict produced by the parent ``status`` with an added
        ``"queue_size"`` entry.
    """
    report = await super().status()
    report["queue_size"] = self.queue_size()
    return report

gracefully_stop

gracefully_stop()
Source code in fluid/utils/worker.py
def gracefully_stop(self) -> None:
    """Request a graceful stop by setting the ``_stopping`` flag."""
    self._stopping = True

is_running

is_running()
Source code in fluid/utils/worker.py
def is_running(self) -> bool:
    """Return the current value of the ``_running`` flag."""
    return self._running

is_stopping

is_stopping()
Source code in fluid/utils/worker.py
def is_stopping(self) -> bool:
    """Return the current value of the ``_stopping`` flag."""
    return self._stopping

start_running

start_running()
Source code in fluid/utils/worker.py
@contextmanager
def start_running(self) -> Generator:
    """Flag the worker as running for the duration of a ``with`` block.

    Raises:
        RuntimeError: If the worker is already flagged as running.
    """
    if self._running:
        raise RuntimeError("Worker is already running")
    self._running = True
    try:
        logger.info("%s started running", self.worker_name)
        yield
    finally:
        # the flag is cleared even when the managed block raises
        self._running = False
        logger.warning("%s stopped running", self.worker_name)

get_message async

get_message(timeout=0.5)
Source code in fluid/utils/worker.py
async def get_message(self, timeout: float = 0.5) -> MessageType | None:
    try:
        async with asyncio.timeout(timeout):
            return await self._queue.get()
    except asyncio.TimeoutError:
        return None
    except (asyncio.CancelledError, RuntimeError):
        if not self.is_stopping():
            raise
    return None

queue_size

queue_size()
Source code in fluid/utils/worker.py
def queue_size(self) -> int:
    """Return the number of items currently in the internal queue."""
    return self._queue.qsize()

run async

run()
Source code in fluid/utils/worker.py
async def run(self) -> None:
    """Dispatch queued messages until a stop is requested.

    Each message obtained from ``get_message`` that is not ``None`` is
    awaited through ``dispatcher.dispatch``; ``None`` results are skipped.
    """
    with self.start_running():
        while True:
            if self.is_stopping():
                break
            payload = await self.get_message()
            if payload is None:
                continue
            await self.dispatcher.dispatch(payload)

fluid.utils.worker.Workers

Workers(
    *workers,
    name="",
    stopping_grace_period=STOPPING_GRACE_PERIOD
)

Bases: MultipleWorkers

A worker managing several workers

Source code in fluid/utils/worker.py
def __init__(
    self,
    *workers: Worker,
    name: str = "",
    stopping_grace_period: int = settings.STOPPING_GRACE_PERIOD,
) -> None:
    """Create a group managing several workers.

    Args:
        *workers: Workers handed to the parent initialiser.
        name: Name of the group, forwarded to the parent initialiser.
        stopping_grace_period: Forwarded to the parent initialiser; defaults
            to ``settings.STOPPING_GRACE_PERIOD``.
    """
    super().__init__(
        *workers, name=name, stopping_grace_period=stopping_grace_period
    )
    # task executing run(); created by startup()
    self._workers_task: asyncio.Task | None = None
    # (callback, seconds, jitter, periodic) tuples queued by
    # register_callback() while the group is not running
    self._delayed_callbacks: list[
        tuple[Callable[[], None], float, float, float]
    ] = []

worker_name property

worker_name

The name of the worker

num_workers property

num_workers

running property

running

status async

status()
Source code in fluid/utils/worker.py
async def status(self) -> dict:
    """Return the status reported by the ``_workers`` container."""
    return await self._workers.status()

gracefully_stop

gracefully_stop()
Source code in fluid/utils/worker.py
def gracefully_stop(self) -> None:
    """Forward the graceful-stop request to the ``_workers`` container."""
    self._workers.gracefully_stop()

is_running

is_running()
Source code in fluid/utils/worker.py
def is_running(self) -> bool:
    """Return the current value of the ``_running`` flag."""
    return self._running

is_stopping

is_stopping()
Source code in fluid/utils/worker.py
def is_stopping(self) -> bool:
    """Return the stopping state reported by the ``_workers`` container."""
    return self._workers.is_stopping()

start_running

start_running()
Source code in fluid/utils/worker.py
@contextmanager
def start_running(self) -> Generator:
    """Flag the worker as running for the duration of a ``with`` block.

    Raises:
        RuntimeError: If the worker is already flagged as running.
    """
    if self._running:
        raise RuntimeError("Worker is already running")
    self._running = True
    try:
        logger.info("%s started running", self.worker_name)
        yield
    finally:
        # the flag is cleared even when the managed block raises
        self._running = False
        logger.warning("%s stopped running", self.worker_name)

create_task

create_task(worker)
Source code in fluid/utils/worker.py
def create_task(self, worker: Worker) -> asyncio.Task:
    """Schedule ``_run_worker(worker)`` as an asyncio task.

    Args:
        worker: The worker to run.

    Returns:
        The created task, named ``"<group name>-<worker name>"``.
    """
    coroutine = self._run_worker(worker)
    task_name = f"{self.worker_name}-{worker.worker_name}"
    return asyncio.create_task(coroutine, name=task_name)

on_shutdown async

on_shutdown()

called after the workers are stopped

Source code in fluid/utils/worker.py
async def on_shutdown(self) -> None:
    """called after the workers are stopped

    This implementation does nothing; ``shutdown`` awaits it as its final
    step.
    """

shutdown async

shutdown()

shutdown the workers

Source code in fluid/utils/worker.py
async def shutdown(self) -> None:
    """shutdown the workers

    Requests a graceful stop and waits up to ``_stopping_grace_period``
    seconds for the workers to exit. If that wait times out or is cancelled,
    the worker tasks are cancelled instead. ``on_shutdown`` is awaited at the
    end of either path. Calls after the first one return immediately.
    """
    # only the first call performs the shutdown
    if self._has_shutdown:
        return
    self._has_shutdown = True
    logger.warning(
        "gracefully stopping %d workers: %s",
        self.num_workers,
        ", ".join(w.worker_name for w in self._workers.workers),
    )
    self.gracefully_stop()
    try:
        # graceful path: bounded wait for the workers to finish
        async with asyncio.timeout(self._stopping_grace_period):
            await self.wait_for_exit()
        await self.on_shutdown()
        return
    except asyncio.TimeoutError:
        # grace period elapsed: report the tasks that are still pending
        logger.warning(
            "could not stop workers %s gracefully after %s"
            " seconds - force shutdown",
            ", ".join(
                task.get_name() for task in self._workers.tasks if not task.done()
            ),
            self._stopping_grace_period,
        )
    except asyncio.CancelledError:
        # cancellation during the graceful wait also leads to the forced path
        pass
    # forced path: cancel the tasks and wait for them to unwind
    self._force_shutdown = True
    self._workers.cancel()
    try:
        await self.wait_for_exit()
    except asyncio.CancelledError:
        pass
    await self.on_shutdown()

bail_out

bail_out(reason, code=1)
Source code in fluid/utils/worker.py
def bail_out(self, reason: str, code: int = 1) -> None:
    """Request a graceful stop of the workers.

    Args:
        reason: Description of why the workers are bailing out; not used by
            this implementation.
        code: Numeric code; not used by this implementation.
    """
    self.gracefully_stop()

safe_run async

safe_run()

Context manager to run a worker safely

Source code in fluid/utils/worker.py
@asynccontextmanager
async def safe_run(self) -> AsyncGenerator:
    """Context manager to run a worker safely

    Whatever way the managed block ends, except cancellation, a call to
    ``bail_out`` is scheduled on the running event loop:

    * normal exit schedules ``bail_out("worker exit", 1)``;
    * an ``Exception`` is logged with its traceback and swallowed, and
      ``bail_out(reason, 2)`` is scheduled.

    Raises:
        asyncio.CancelledError: Always propagated to the caller.
    """
    try:
        yield
    except asyncio.CancelledError:
        # cancellation (for instance during a forced shutdown) must always
        # propagate so that the task is marked as cancelled
        raise
    except Exception as e:
        reason = f"unhandled exception while running workers: {e}"
        logger.exception(reason)
        # we are inside a coroutine, so a running loop is guaranteed
        asyncio.get_running_loop().call_soon(self.bail_out, reason, 2)
    else:
        # worker finished without error
        # make sure we are shutting down
        asyncio.get_running_loop().call_soon(self.bail_out, "worker exit", 1)

add_workers

add_workers(*workers)

add workers to the workers

Source code in fluid/utils/worker.py
def add_workers(self, *workers: Worker) -> None:
    """Register workers with the group, skipping ones already present.

    Args:
        *workers: Workers to append to the list returned by
            ``_workers.workers_tasks()``.
    """
    registered, _tasks = self._workers.workers_tasks()
    for candidate in workers:
        if candidate in registered:
            continue
        registered.append(candidate)

run async

run()

run the workers

Source code in fluid/utils/worker.py
async def run(self) -> None:
    """run the workers

    Creates one task per registered worker, waits for all of them inside
    ``safe_run`` and finally awaits ``shutdown``.
    """
    with self.start_running():
        async with self.safe_run():
            workers, _ = self._workers.workers_tasks()
            # replace the worker and task collections with tuples
            self._workers.workers = tuple(workers)
            self._workers.tasks = tuple(
                self.create_task(worker) for worker in workers
            )
            await asyncio.gather(*self._workers.tasks)
        # reached once safe_run exits without propagating an exception
        await self.shutdown()

wait_for_exit async

wait_for_exit()
Source code in fluid/utils/worker.py
async def wait_for_exit(self) -> None:
    """Await the task stored in ``_workers_task``, if there is one."""
    if self._workers_task is not None:
        await self._workers_task

remove_workers

remove_workers(*workers)

remove workers from the workers

Source code in fluid/utils/worker.py
def remove_workers(self, *workers: Worker) -> None:
    """Unregister workers from the group.

    Args:
        *workers: Workers to remove from the list returned by
            ``_workers.workers_tasks()``; workers that are not in the list
            are ignored.
    """
    registered, _tasks = self._workers.workers_tasks()
    for obsolete in workers:
        try:
            registered.remove(obsolete)
        except ValueError:
            # not registered: nothing to remove
            continue

startup async

startup()

start the workers

Source code in fluid/utils/worker.py
async def startup(self) -> None:
    """Start the workers unless they have been started already.

    Creates the task executing ``run()``, then schedules every callback
    that was queued in ``_delayed_callbacks`` and empties that queue.
    """
    if self._workers_task is not None:
        # already started
        return
    self._workers_task = asyncio.create_task(self.run(), name=self.worker_name)
    for queued in self._delayed_callbacks:
        self._delayed_callback(*queued)
    self._delayed_callbacks = []

register_callback

register_callback(
    callback, seconds, jitter=0.0, periodic=False
)

Register a callback

The callback can be periodic or not.

Source code in fluid/utils/worker.py
def register_callback(
    self,
    callback: Callable[[], None],
    seconds: float,
    jitter: float = 0.0,
    periodic: bool | float = False,
) -> None:
    """Register a callback

    The callback can be periodic or not.
    """
    if periodic is True:
        periodic_float = seconds
    elif periodic is False:
        periodic_float = 0.0
    else:
        periodic_float = periodic
    if not self.running:
        self._delayed_callbacks.append((callback, seconds, jitter, periodic_float))
    else:
        self._delayed_callback(callback, seconds, jitter, periodic_float)

fluid.utils.worker.DynamicWorkers

DynamicWorkers(
    *workers,
    name="",
    heartbeat=0.1,
    stopping_grace_period=STOPPING_GRACE_PERIOD
)

Bases: MultipleWorkers

Source code in fluid/utils/worker.py
def __init__(
    self,
    *workers: Worker,
    name: str = "",
    heartbeat: float | int = 0.1,
    stopping_grace_period: int = settings.STOPPING_GRACE_PERIOD,
) -> None:
    """Create a group of workers.

    Args:
        *workers: Workers registered through ``add_workers``.
        name: Name of the group, forwarded to the parent initialiser.
        heartbeat: Stored as ``_heartbeat``.
        stopping_grace_period: Stored as ``_stopping_grace_period``;
            defaults to ``settings.STOPPING_GRACE_PERIOD``.
    """
    super().__init__(name)
    self._heartbeat = heartbeat
    # container holding the workers and their tasks
    self._workers = WorkerTasks()
    self._has_shutdown = False
    self._force_shutdown = False
    self._stopping_grace_period = stopping_grace_period
    # the state above must be in place before workers are registered
    self.add_workers(*workers)

worker_name property

worker_name

The name of the worker

num_workers property

num_workers

status async

status()
Source code in fluid/utils/worker.py
async def status(self) -> dict:
    """Return the status reported by the ``_workers`` container."""
    return await self._workers.status()

gracefully_stop

gracefully_stop()
Source code in fluid/utils/worker.py
def gracefully_stop(self) -> None:
    """Forward the graceful-stop request to the ``_workers`` container."""
    self._workers.gracefully_stop()

is_running

is_running()
Source code in fluid/utils/worker.py
def is_running(self) -> bool:
    """Return the current value of the ``_running`` flag."""
    return self._running

is_stopping

is_stopping()
Source code in fluid/utils/worker.py
def is_stopping(self) -> bool:
    """Return the stopping state reported by the ``_workers`` container."""
    return self._workers.is_stopping()

start_running

start_running()
Source code in fluid/utils/worker.py
@contextmanager
def start_running(self) -> Generator:
    """Flag the worker as running for the duration of a ``with`` block.

    Raises:
        RuntimeError: If the worker is already flagged as running.
    """
    if self._running:
        raise RuntimeError("Worker is already running")
    self._running = True
    try:
        logger.info("%s started running", self.worker_name)
        yield
    finally:
        # the flag is cleared even when the managed block raises
        self._running = False
        logger.warning("%s stopped running", self.worker_name)

create_task

create_task(worker)
Source code in fluid/utils/worker.py
def create_task(self, worker: Worker) -> asyncio.Task:
    """Schedule ``_run_worker(worker)`` as an asyncio task.

    Args:
        worker: The worker to run.

    Returns:
        The created task, named ``"<group name>-<worker name>"``.
    """
    coroutine = self._run_worker(worker)
    task_name = f"{self.worker_name}-{worker.worker_name}"
    return asyncio.create_task(coroutine, name=task_name)

on_shutdown async

on_shutdown()

called after the workers are stopped

Source code in fluid/utils/worker.py
async def on_shutdown(self) -> None:
    """called after the workers are stopped

    This implementation does nothing; ``shutdown`` awaits it as its final
    step.
    """

shutdown async

shutdown()

shutdown the workers

Source code in fluid/utils/worker.py
async def shutdown(self) -> None:
    """shutdown the workers

    Requests a graceful stop and waits up to ``_stopping_grace_period``
    seconds for the workers to exit. If that wait times out or is cancelled,
    the worker tasks are cancelled instead. ``on_shutdown`` is awaited at the
    end of either path. Calls after the first one return immediately.
    """
    # only the first call performs the shutdown
    if self._has_shutdown:
        return
    self._has_shutdown = True
    logger.warning(
        "gracefully stopping %d workers: %s",
        self.num_workers,
        ", ".join(w.worker_name for w in self._workers.workers),
    )
    self.gracefully_stop()
    try:
        # graceful path: bounded wait for the workers to finish
        async with asyncio.timeout(self._stopping_grace_period):
            await self.wait_for_exit()
        await self.on_shutdown()
        return
    except asyncio.TimeoutError:
        # grace period elapsed: report the tasks that are still pending
        logger.warning(
            "could not stop workers %s gracefully after %s"
            " seconds - force shutdown",
            ", ".join(
                task.get_name() for task in self._workers.tasks if not task.done()
            ),
            self._stopping_grace_period,
        )
    except asyncio.CancelledError:
        # cancellation during the graceful wait also leads to the forced path
        pass
    # forced path: cancel the tasks and wait for them to unwind
    self._force_shutdown = True
    self._workers.cancel()
    try:
        await self.wait_for_exit()
    except asyncio.CancelledError:
        pass
    await self.on_shutdown()

bail_out

bail_out(reason, code=1)
Source code in fluid/utils/worker.py
def bail_out(self, reason: str, code: int = 1) -> None:
    """Request a graceful stop of the workers.

    Args:
        reason: Description of why the workers are bailing out; not used by
            this implementation.
        code: Numeric code; not used by this implementation.
    """
    self.gracefully_stop()

safe_run async

safe_run()

Context manager to run a worker safely

Source code in fluid/utils/worker.py
@asynccontextmanager
async def safe_run(self) -> AsyncGenerator:
    """Context manager to run a worker safely

    Whatever way the managed block ends, except cancellation, a call to
    ``bail_out`` is scheduled on the running event loop:

    * normal exit schedules ``bail_out("worker exit", 1)``;
    * an ``Exception`` is logged with its traceback and swallowed, and
      ``bail_out(reason, 2)`` is scheduled.

    Raises:
        asyncio.CancelledError: Always propagated to the caller.
    """
    try:
        yield
    except asyncio.CancelledError:
        # cancellation (for instance during a forced shutdown) must always
        # propagate so that the task is marked as cancelled
        raise
    except Exception as e:
        reason = f"unhandled exception while running workers: {e}"
        logger.exception(reason)
        # we are inside a coroutine, so a running loop is guaranteed
        asyncio.get_running_loop().call_soon(self.bail_out, reason, 2)
    else:
        # worker finished without error
        # make sure we are shutting down
        asyncio.get_running_loop().call_soon(self.bail_out, "worker exit", 1)

add_workers

add_workers(*workers)

add workers to the workers

They can be added while the workers are running.

Source code in fluid/utils/worker.py
def add_workers(self, *workers: Worker) -> None:
    """Register workers and immediately create a task for each of them.

    They can be added while the workers are running.

    Args:
        *workers: Workers to append to the worker list; the task returned
            by ``create_task`` is appended to the task list in the same
            order.
    """
    registered, running_tasks = self._workers.workers_tasks()
    for newcomer in workers:
        registered.append(newcomer)
        task = self.create_task(newcomer)
        running_tasks.append(task)

run async

run()
Source code in fluid/utils/worker.py
async def run(self) -> None:
    """Keep the group alive until a stop is requested, then shut down.

    Checks ``is_stopping()`` between sleeps of ``_heartbeat`` seconds and
    awaits ``shutdown()`` once it returns true.
    """
    with self.start_running():
        while not self.is_stopping():
            # NOTE(review): this scan only breaks out of the ``for`` loop; it
            # neither leaves the ``while`` loop nor requests a stop itself.
            # Presumably a stopping worker or a finished task is meant to end
            # the group - confirm the intended behaviour.
            for worker, task in zip(self._workers.workers, self._workers.tasks):
                if worker.is_stopping() or task.done():
                    break
            await asyncio.sleep(self._heartbeat)
        await self.shutdown()

wait_for_exit async

wait_for_exit()
Source code in fluid/utils/worker.py
async def wait_for_exit(self) -> None:
    """Wait until every task held in ``_workers.tasks`` has finished."""
    await asyncio.gather(*self._workers.tasks)