Skip to content

Event Dispatchers

A set of classes for dispatching events, they can be imported from fluid.utils.dispatcher:

from fluid.utils.dispatcher import Dispatcher

fluid.utils.dispatcher.BaseDispatcher

BaseDispatcher()

Bases: Generic[MessageType, MessageHandlerType], ABC

Source code in fluid/utils/dispatcher.py
def __init__(self) -> None:
    self._msg_handlers: defaultdict[str, dict[str, MessageHandlerType]] = (
        defaultdict(
            dict,
        )
    )

register_handler

register_handler(event, handler)
Source code in fluid/utils/dispatcher.py
def register_handler(
    self,
    event: Event | str,
    handler: MessageHandlerType,
) -> MessageHandlerType | None:
    event = Event.from_string_or_event(event)
    previous = self._msg_handlers[event.type].get(event.tag)
    self._msg_handlers[event.type][event.tag] = handler
    return previous

unregister_handler

unregister_handler(event)
Source code in fluid/utils/dispatcher.py
def unregister_handler(self, event: Event | str) -> MessageHandlerType | None:
    event = Event.from_string_or_event(event)
    return self._msg_handlers[event.type].pop(event.tag, None)

get_handlers

get_handlers(message)
Source code in fluid/utils/dispatcher.py
def get_handlers(
    self,
    message: MessageType,
) -> dict[str, MessageHandlerType] | None:
    message_type = str(self.message_type(message))
    return self._msg_handlers.get(message_type)

message_type abstractmethod

message_type(message)

return the message type

Source code in fluid/utils/dispatcher.py
@abstractmethod
def message_type(self, message: MessageType) -> str:
    """return the message type"""

fluid.utils.dispatcher.Dispatcher

Dispatcher()

Bases: BaseDispatcher[MessageType, Callable[[MessageType], None]]

Dispatcher for sync handlers

Source code in fluid/utils/dispatcher.py
def __init__(self) -> None:
    self._msg_handlers: defaultdict[str, dict[str, MessageHandlerType]] = (
        defaultdict(
            dict,
        )
    )

register_handler

register_handler(event, handler)
Source code in fluid/utils/dispatcher.py
def register_handler(
    self,
    event: Event | str,
    handler: MessageHandlerType,
) -> MessageHandlerType | None:
    event = Event.from_string_or_event(event)
    previous = self._msg_handlers[event.type].get(event.tag)
    self._msg_handlers[event.type][event.tag] = handler
    return previous

unregister_handler

unregister_handler(event)
Source code in fluid/utils/dispatcher.py
def unregister_handler(self, event: Event | str) -> MessageHandlerType | None:
    event = Event.from_string_or_event(event)
    return self._msg_handlers[event.type].pop(event.tag, None)

get_handlers

get_handlers(message)
Source code in fluid/utils/dispatcher.py
def get_handlers(
    self,
    message: MessageType,
) -> dict[str, MessageHandlerType] | None:
    message_type = str(self.message_type(message))
    return self._msg_handlers.get(message_type)

message_type abstractmethod

message_type(message)

return the message type

Source code in fluid/utils/dispatcher.py
@abstractmethod
def message_type(self, message: MessageType) -> str:
    """return the message type"""

dispatch

dispatch(message)

dispatch the message

Source code in fluid/utils/dispatcher.py
def dispatch(self, message: MessageType) -> int:
    """dispatch the message"""
    handlers = self.get_handlers(message)
    if handlers:
        for handler in handlers.values():
            handler(message)
    return len(handlers or ())

fluid.utils.dispatcher.AsyncDispatcher

AsyncDispatcher()

Bases: BaseDispatcher[MessageType, Callable[[MessageType], Awaitable[None]]]

Dispatcher for async handlers

Source code in fluid/utils/dispatcher.py
def __init__(self) -> None:
    self._msg_handlers: defaultdict[str, dict[str, MessageHandlerType]] = (
        defaultdict(
            dict,
        )
    )

register_handler

register_handler(event, handler)
Source code in fluid/utils/dispatcher.py
def register_handler(
    self,
    event: Event | str,
    handler: MessageHandlerType,
) -> MessageHandlerType | None:
    event = Event.from_string_or_event(event)
    previous = self._msg_handlers[event.type].get(event.tag)
    self._msg_handlers[event.type][event.tag] = handler
    return previous

unregister_handler

unregister_handler(event)
Source code in fluid/utils/dispatcher.py
def unregister_handler(self, event: Event | str) -> MessageHandlerType | None:
    event = Event.from_string_or_event(event)
    return self._msg_handlers[event.type].pop(event.tag, None)

get_handlers

get_handlers(message)
Source code in fluid/utils/dispatcher.py
def get_handlers(
    self,
    message: MessageType,
) -> dict[str, MessageHandlerType] | None:
    message_type = str(self.message_type(message))
    return self._msg_handlers.get(message_type)

message_type abstractmethod

message_type(message)

return the message type

Source code in fluid/utils/dispatcher.py
@abstractmethod
def message_type(self, message: MessageType) -> str:
    """return the message type"""

dispatch async

dispatch(message)

Dispatch the message and wait for all handlers to complete

Source code in fluid/utils/dispatcher.py
async def dispatch(self, message: MessageType) -> int:
    """Dispatch the message and wait for all handlers to complete"""
    handlers = self.get_handlers(message)
    if handlers:
        await asyncio.gather(*[handler(message) for handler in handlers.values()])
    return len(handlers or ())