Source code for bus_ride.messagebus

import logging
from collections import defaultdict
from collections.abc import Callable, Sequence
from typing import Any, Literal, TypeVar, Union, final

from attrs import define, field
from attrs.validators import in_

from .command import Command
from .event import Event
from .exceptions import MessageBusConfigurationError, UnknownMessageTypeError
from .handler import Handler, HandlerReturnType, HandlerType
from .receiver import Receiver, ReceiverReturnType
from .returnmessage import ReturnMessage

logger = logging.getLogger(__name__)

Message = Union[Command, Event, ReturnMessage]
MessageType = TypeVar("MessageType", bound=Union[Command, Event, ReturnMessage])


LOG_LEVELS = ["debug", "info", "warning", "error", "disabled"]


[docs] @define class MessageBus: """ This is a basic implementation of Message Bus. Create a subclass of this if you need to change logging and/or exception handling. Usage: .. code-block:: python results = MessageBus(my_message)() If you're doing CQRS and/or don't use :py:class:`~bus_ride.returnmessage.ReturnMessage` objects. .. code-block:: python MessageBus(my_message)() Alternatively, you can use :py:func:`~bus_ride.handle_message`. It has the same arguments and return values. (It won't work if you have subclassed ``MessageBus``). """ initial_message: Message = field() log_level: Literal["debug", "info", "disabled"] = field( default="info", kw_only=True, validator=in_(("debug", "info", "disabled")), metadata={"description": "The log level for non-error activity."}, ) error_log_level: Literal["info", "warning", "error", "disabled"] = field( default="error", kw_only=True, validator=in_(("info", "warning", "error", "disabled")), metadata={"description": "The log level for errors."}, ) return_messages: list[Any] = field(init=False) receiver_map: dict = field(init=False) handler_map: dict = field(init=False) _logger: Callable = field(init=False) _error_logger: Callable = field(init=False) def __attrs_post_init__(self): self.return_messages = [] self.receiver_map = {} self.handler_map = defaultdict(list) self._logger = ( getattr(logger, self.log_level) if self.log_level != "disabled" else None ) self._error_logger = ( getattr(logger, self.error_log_level) if self.error_log_level != "disabled" else None ) @final def __call__(self) -> list[ReturnMessage]: self._process_message(self.initial_message) return self.return_messages
[docs] def log_activity(self, *args): """ Handles logging of actions to a standard Python logger. Override in a subclass for custom handling. """ if self._logger: self._logger(*args)
[docs] def log_error(self, msg: str, exc: Exception | None = None, **kwargs): """ Logs an error including display of ``exc``. Any remaining keyword arguments are passed to the standard Python logger. Override in a subclass for custom handling. """ if self._error_logger is not None: self._error_logger(msg, exc_info=exc, **kwargs)
@final def _process_message(self, message: Message) -> Sequence[Message]: try: self.message_dispatcher(message) except Exception as exc: # noqa: BLE001 self.handle_exception(exc, message) return []
[docs] def message_dispatcher(self, message: Message) -> None: """ Override this in a subclass to add retry logic around the call to ``dispatch_message``. Otherwise, any exceptions that occur are raised immediately. Recommended: `tenacity`_ .. _tenacity: https://tenacity.readthedocs.io/en/latest/index.html """ self.dispatch_message(message)
@final def dispatch_message(self, message: Message) -> None: match message: case Command(): new_receiver_messages: ReceiverReturnType = self._execute_command( message ) for new_message in new_receiver_messages: self._process_message(new_message) case Event(): event: Event = message new_event_messages: HandlerReturnType = self._handle_event(event) for new_message in new_event_messages: self._process_message(new_message) case ReturnMessage(): if message not in self.return_messages: self.return_messages.append(message) case _: raise UnknownMessageTypeError(type(message))
[docs] def handle_exception(self, exc: Exception, message: Message) -> None: """ Override this in a subclass to customize exception handling. A note from the book to keep in mind in your code: .. note:: “Since we don’t know who’s handling an event, senders should not care whether the receivers succeeded or failed.” By default, exceptions raised will be logged and raised. """ if self._error_logger is not None: self.log_error( f"An error occurred while processing message: {message}", exc, ) raise exc
@final def _execute_command(self, command: Command) -> list[Message]: self.log_activity(f"Executing command: {command!s}") receiver = self._get_receiver(command) note = f"No receiver found for command: {command!s}" if receiver is None: self.log_error(note) raise MessageBusConfigurationError(note) return receiver.execute_command() @final def _get_receiver(self, command: Command) -> Receiver | None: receiver_cls: type[Receiver] | None = self._get_receiver_cls(command) if receiver_cls is None: return None return receiver_cls(command=command) @final def _get_receiver_cls(self, command: Command) -> type[Receiver] | None: if command.__class__ in self.receiver_map: return self.receiver_map[command.__class__] receiver_cls: type[Receiver] | None for receiver_cls in Receiver.__subclasses__(): if receiver_cls.handles_command(command): self.receiver_map[command.__class__] = receiver_cls return receiver_cls msg = f"No receiver found for command: {command!s}" raise MessageBusConfigurationError(msg) @final def _handle_event(self, event: Event) -> HandlerReturnType: self.log_activity(f"Handling event: {event!s}") handler_classes: list[type[Handler]] = self.get_event_handlers(event) if not handler_classes: msg = f"No handler found for event: {event!s}" raise MessageBusConfigurationError(msg) new_messages: HandlerReturnType = [] handler_cls: HandlerType for handler_cls in handler_classes: handler = handler_cls(event) new_messages.extend(handler.handle_event()) return new_messages @final def get_event_handlers(self, event: Event) -> list[type[Handler]]: if event.__class__ in self.handler_map: return self.handler_map[event.__class__] for handler_subclass in Handler.__subclasses__(): handler_cls: HandlerType = handler_subclass self.register_matching_handler(event, handler_cls) return self.handler_map[event.__class__] def register_matching_handler(self, event: Event, handler_cls: HandlerType): """ Register a handler class for the given event class. """ if ( type(event) in handler_cls.handles_events and handler_cls not in self.handler_map[event.__class__] ): self.handler_map[event.__class__].append(handler_cls)
[docs] def handle_message( message: Message, *, log_level: Literal["debug", "info", "disabled"] = "disabled", error_log_level: Literal["warning", "error", "disabled"] = "disabled", ): """ Convenience function to process a message using :py:class:`~bus_ride.bus_ride.MessageBus`. Usage: .. code-block:: python return_messages = handle_message(my_message) If you're doing CQRS and/or don't use :py:class:`~bus_ride.returnmessage.ReturnMessage` objects. .. code-block:: python handle_message(my_message) """ return MessageBus( message, log_level=log_level, error_log_level=error_log_level, )()