Bases: BaseDispatcher[MessageType, Callable[[MessageType], Awaitable[None]]]
Dispatcher for asynchronous handlers: each handler is called with the message and returns an awaitable.
Source code in fluid/utils/dispatcher.py
def __init__(self) -> None:
    """Create a dispatcher with no handlers registered."""
    # Two-level registry: event type -> (event tag -> handler). The outer
    # defaultdict creates the per-type mapping the first time it is indexed.
    registry: defaultdict[str, dict[str, MessageHandlerType]] = defaultdict(dict)
    self._msg_handlers = registry
|
register_handler
register_handler(event, handler)
Source code in fluid/utils/dispatcher.py
def register_handler(
    self,
    event: Event | str,
    handler: MessageHandlerType,
) -> MessageHandlerType | None:
    """Register ``handler`` for ``event``, replacing any existing handler.

    ``event`` is normalised with ``Event.from_string_or_event`` and the
    handler is stored under the resulting event type and tag.

    Returns:
        The handler previously stored for the same type and tag, or
        ``None`` if there was none.
    """
    parsed = Event.from_string_or_event(event)
    # Indexing the defaultdict creates the per-type mapping when missing.
    by_tag = self._msg_handlers[parsed.type]
    replaced = by_tag.get(parsed.tag)
    by_tag[parsed.tag] = handler
    return replaced
|
unregister_handler
unregister_handler(event)
Source code in fluid/utils/dispatcher.py
def unregister_handler(self, event: Event | str) -> MessageHandlerType | None:
    """Remove the handler registered for ``event``, if any.

    ``event`` is normalised with ``Event.from_string_or_event``; the handler
    stored under the resulting event type and tag is removed.

    Returns:
        The removed handler, or ``None`` when nothing was registered for
        that type and tag.
    """
    event = Event.from_string_or_event(event)
    # Use .get() rather than [] so that unregistering an event type that was
    # never registered does not make the defaultdict insert an empty entry.
    handlers = self._msg_handlers.get(event.type)
    if handlers is None:
        return None
    return handlers.pop(event.tag, None)
|
get_handlers
Source code in fluid/utils/dispatcher.py
def get_handlers(
    self,
    message: MessageType,
) -> dict[str, MessageHandlerType] | None:
    """Look up the handlers registered for the type of ``message``.

    The lookup key is ``str(self.message_type(message))``.

    Returns:
        The registry's own tag -> handler mapping for that type (not a
        copy), or ``None`` when the registry has no entry for it.
    """
    key = str(self.message_type(message))
    registry = self._msg_handlers
    return registry.get(key)
|
message_type
abstractmethod
Return the message type.
Source code in fluid/utils/dispatcher.py
@abstractmethod
def message_type(self, message: MessageType) -> str:
    """Return the type of ``message``.

    Subclasses must implement this. ``get_handlers`` converts the result to
    ``str`` and uses it as the key for looking up registered handlers.
    """
|
dispatch
async
Dispatch the message and wait for all handlers to complete
Source code in fluid/utils/dispatcher.py
async def dispatch(self, message: MessageType) -> int:
    """Dispatch the message and wait for all handlers to complete

    Every handler returned by ``get_handlers`` for ``message`` is called
    with the message, and the resulting awaitables are awaited together
    with ``asyncio.gather``.

    Returns:
        The number of handlers the message was dispatched to; ``0`` when
        no handlers are registered for the message type.
    """
    handlers = self.get_handlers(message)
    if not handlers:
        return 0
    # Build the awaitables up front and count those, not the live mapping:
    # a handler may register or unregister handlers while it runs, which
    # would otherwise change the reported count after the fact.
    pending = [handler(message) for handler in handlers.values()]
    await asyncio.gather(*pending)
    return len(pending)
|