Message Bus

class MessageBus(initial_message: Command | Event | ReturnMessage, *, log_level: Literal['debug', 'info', 'disabled'] = 'info', error_log_level: Literal['info', 'warning', 'error', 'disabled'] = 'error')[source]

This is a basic implementation of Message Bus. Create a subclass of this if you need to change logging and/or exception handling.

Usage:

results = MessageBus(my_message)()

If you’re doing CQRS and/or don’t use ReturnMessage objects.

MessageBus(my_message)()

Alternatively, you can use handle_message(). It has the same arguments and return values. (It won’t work if you have subclassed MessageBus).

handle_exception(exc: Exception, message: Command | Event | ReturnMessage) None[source]

Override this in a subclass to customize exception handling.

A note from the book to keep in mind in your code:

Note

“Since we don’t know who’s handling an event, senders should not care whether the receivers succeeded or failed.”

By default, exceptions raised will be logged and raised.

log_activity(*args)[source]

Handles logging of actions to a standard Python logger. Override in a subclass for custom handling.

log_error(msg: str, exc: Exception | None = None, **kwargs)[source]

Logs an error including display of exc. Any remaining keyword arguments are passed to the standard Python logger. Override in a subclass for custom handling.

message_dispatcher(message: Command | Event | ReturnMessage) None[source]

Override this in a subclass to add retry logic around the call to dispatch_message. Otherwise, any exceptions that occur are raised immediately.

Recommended: tenacity

Top-level package for MessageBus.

handle_message(message: Command | Event | ReturnMessage, *, log_level: Literal['debug', 'info', 'disabled'] = 'disabled', error_log_level: Literal['warning', 'error', 'disabled'] = 'disabled')[source]

Convenience function to process a message using MessageBus.

Usage:

return_messages = handle_message(my_message)

If you’re doing CQRS and/or don’t use ReturnMessage objects.

handle_message(my_message)