Bubus
π’ Production-ready python event bus library with support for async and sync handlers, forwarding betwen busses w/ parent event tracking + loop prevention, FIFO and concurrency options, and WAL persistence. Powers the browser-use library.
Install / Use
/learn @browser-use/BubusREADME
bubus: π’ Production-ready event bus library for Python
Bubus is a fully-featured, Pydantic-powered event bus library for async Python.
It's designed for quickly building event-driven applications with Python in a way that "just works" with async support, proper support for nested events, and real concurrency control.
It provides a pydantic-based API for implementing publish-subscribe patterns with type safety, async/sync handler support, and advanced features like event forwarding between buses.
βΎοΈ It's inspired by the simplicity of async and events in JS, we aim to bring a fully type-checked EventTarget-style API to Python.
π’ Quickstart
Install bubus and get started with a simple event-driven application:
pip install bubus
import asyncio
from bubus import EventBus, BaseEvent
from your_auth_events import AuthRequestEvent, AuthResponseEvent
class UserLoginEvent(BaseEvent[str]):
username: str
is_admin: bool
async def handle_login(event: UserLoginEvent) -> str:
auth_request = await event.event_bus.dispatch(AuthRequestEvent(...)) # nested events supported
auth_response = await event.event_bus.expect(AuthResponseEvent, timeout=30.0)
return f"User {event.username} logged in admin={event.is_admin} with API response: {await auth_response.event_result()}"
bus = EventBus()
bus.on(UserLoginEvent, handle_login)
bus.on(AuthRequestEvent, AuthAPI.post)
event = bus.dispatch(UserLoginEvent(username="alice", is_admin=True))
print(await event.event_result())
# User alice logged in admin=True with API response: {...}
<br/>
<br/>
β¨ Features
<br/>π Event Pattern Matching
Subscribe to events using multiple patterns:
# By event model class (recommended for best type hinting)
bus.on(UserActionEvent, handler)
# By event type string
bus.on('UserActionEvent', handler)
# Wildcard - handle all events
bus.on('*', universal_handler)
<br/>
π Async and Sync Handler Support
Register both synchronous and asynchronous handlers for maximum flexibility:
# Async handler
async def async_handler(event: SomeEvent) -> str:
await asyncio.sleep(0.1) # Simulate async work
return "async result"
# Sync handler
def sync_handler(event: SomeEvent) -> str:
return "sync result"
bus.on(SomeEvent, async_handler)
bus.on(SomeEvent, sync_handler)
Handlers can also be defined under classes for easier organization:
class SomeService:
some_value = 'this works'
async def handlers_can_be_methods(self, event: SomeEvent) -> str:
return self.some_value
@classmethod
async def handler_can_be_classmethods(cls, event: SomeEvent) -> str:
return cls.some_value
@staticmethod
async def handlers_can_be_staticmethods(event: SomeEvent) -> str:
return 'this works too'
# All usage patterns behave the same:
bus.on(SomeEvent, SomeClass().handlers_can_be_methods)
bus.on(SomeEvent, SomeClass.handler_can_be_classmethods)
bus.on(SomeEvent, SomeClass.handlers_can_be_staticmethods)
<br/>
π Type-Safe Events with Pydantic
Define events as Pydantic models with full type checking and validation:
from typing import Any
from bubus import BaseEvent
class OrderCreatedEvent(BaseEvent):
order_id: str
customer_id: str
total_amount: float
items: list[dict[str, Any]]
# Events are automatically validated
event = OrderCreatedEvent(
order_id="ORD-123",
customer_id="CUST-456",
total_amount=99.99,
items=[{"sku": "ITEM-1", "quantity": 2}]
)
<br/>[!TIP] You can also enforce the types of event handler return values.
β© Forward Events Between EventBuss
You can define separate EventBus instances in different "microservices" to separate different areas of concern.
EventBuss can be set up to forward events between each other (with automatic loop prevention):
# Create a hierarchy of buses
main_bus = EventBus(name='MainBus')
auth_bus = EventBus(name='AuthBus')
data_bus = EventBus(name='DataBus')
# Share all or specific events between buses
main_bus.on('*', auth_bus.dispatch) # if main bus gets LoginEvent, will forward to AuthBus
auth_bus.on('*', data_bus.dispatch) # auth bus will forward everything to DataBus
data_bus.on('*', main_bus.dispatch) # don't worry! event will only be processed once by each, no infinite loop occurs
# Events flow through the hierarchy with tracking
event = main_bus.dispatch(LoginEvent())
await event
print(event.event_path) # ['MainBus', 'AuthBus', 'DataBus'] # list of buses that have already procssed the event
<br/>
π± Event Results Aggregation
Collect and aggregate results from multiple handlers:
async def load_user_config(event: GetConfigEvent) -> dict[str, Any]:
return {"debug": True, "port": 8080}
async def load_system_config(event: GetConfigEvent) -> dict[str, Any]:
return {"debug": False, "timeout": 30}
bus.on(GetConfigEvent, load_user_config)
bus.on(GetConfigEvent, load_system_config)
# Get a merger of all dict results
event = await bus.dispatch(GetConfigEvent())
config = await event.event_results_flat_dict(raise_if_conflicts=False)
# {'debug': False, 'port': 8080, 'timeout': 30}
# Or get individual results
await event.event_results_by_handler_id()
await event.event_results_list()
<br/>
π¦ FIFO Event Processing
Events are processed in strict FIFO order, maintaining consistency:
# Events are processed in the order they were dispatched
for i in range(10):
bus.dispatch(ProcessTaskEvent(task_id=i))
# Even with async handlers, order is preserved
await bus.wait_until_idle(timeout=30.0)
If a handler dispatches and awaits any child events during execution, those events will jump the FIFO queue and be processed immediately:
def child_handler(event: SomeOtherEvent) -> str:
return 'xzy123'
def main_handler(event: MainEvent) -> str:
# enqueue event for processing after main_handler exits
child_event = bus.dispatch(SomeOtherEvent())
# can also await child events to process immediately instead of adding to FIFO queue
completed_child_event = await child_event
return f'result from awaiting child event: {await completed_child_event.event_result()}' # 'xyz123'
bus.on(SomeOtherEvent, child_handler)
bus.on(MainEvent, main_handler)
await bus.dispatch(MainEvent()).event_result()
# result from awaiting child event: xyz123
<br/>
πͺ Dispatch Nested Child Events From Handlers
Automatically track event relationships and causality tree:
async def parent_handler(event: BaseEvent):
# handlers can emit more events to be processed asynchronously after this handler completes
child = ChildEvent()
child_event_async = event.event_bus.dispatch(child) # equivalent to bus.dispatch(...)
assert child.event_status != 'completed'
assert child_event_async.event_parent_id == event.event_id
await child_event_async
# or you can dispatch an event and block until it finishes processing by awaiting the event
# this recursively waits for all handlers, including if event is forwarded to other buses
# (note: awaiting an event from inside a handler jumps the FIFO queue and will process it immediately, before any other pending events)
child_event_sync = await bus.dispatch(ChildEvent())
# ChildEvent handlers run immediately
assert child_event_sync.event_status == 'completed'
# in all cases, parent-child relationships are automagically tracked
assert child_event_sync.event_parent_id == event.event_id
async def run_main():
bus.on(ChildEvent, child_handler)
bus.on(ParentEvent, parent_handler)
parent_event = bus.dispatch(ParentEvent())
print(parent_event.event_children) # show all the child events emitted during handling of an event
await parent_event
print(bus.log_tree())
await bus.stop()
if __name__ == '__main__':
asyncio.run(run_main())
<img width="100%" alt="show the whole tree of events at any time using the logging helpers" src="https://github.com/user-attachments/assets/f94684a6-7694-4066-b948-46925f47b56c" /><br/> <img width="100%" alt="intelligent timeout handling to differentiate handler that timed out from handler that was interrupted" src="https://github.com/user-attachments/assets/8da341fd-6c26-4c68-8fec-aef1ca55c189" />
<br/><br/>
β³ Expect an Event to be Dispatched
Wait for specific events to be seen on a bus with optional filtering:
# Block until a specific event is seen (with optional timeout)
request_event = await bus.dispatch(RequestEvent(id=123, table='invoices', request_id=999234))
response_event = await bus.expect(ResponseEvent, timeout=30)
A more complex real-world example showing off all the features:
async def on_generate_invoice_pdf(event: GenerateInvoiceEvent) -> pdf:
request_event = await bus.dispatch(APIRequestEvent( # example: fire a backend request via some RPC client using bubus
method='invoices.generatePdf',
invoice_id=event.invoice_id,
request_id=uuid4(),
))
# ...rpc client should send the request, then call event_bus.dispatch(APIResponseEvent(...)) when it gets a response ...
# wait for the response event to be fired by the RPC client
is_our_response = lambda response_event: response_event.request_id == request_event.request_id
is_succesful = lambda response_event: response_event.invoice_id == event.invoice_id and response_event.invoice_url
try:
response_event: APIResponseEvent = await bus.expect(
APIResponseEvent, # wait for events of this type (also accepts str name)
include=lambda e: is_our_response(e) and is_succesful(e), # only include events that match a certain f
