Bubus

📢 Production-ready Python event bus library with support for async and sync handlers, forwarding between buses with parent event tracking and loop prevention, FIFO and concurrency options, and WAL persistence. Powers the browser-use library.

bubus: 📢 Production-ready event bus library for Python

Bubus is a fully-featured, Pydantic-powered event bus library for async Python.

It's designed for quickly building event-driven applications with Python in a way that "just works" with async support, proper support for nested events, and real concurrency control.

It provides a pydantic-based API for implementing publish-subscribe patterns with type safety, async/sync handler support, and advanced features like event forwarding between buses.

♾️ It's inspired by the simplicity of async and events in JS; we aim to bring a fully type-checked EventTarget-style API to Python.

<br/>

🔢 Quickstart

Install bubus and get started with a simple event-driven application:

pip install bubus

import asyncio
from bubus import EventBus, BaseEvent
from your_auth_events import AuthRequestEvent, AuthResponseEvent
# AuthAPI below is a placeholder for your own API client; import it from wherever it lives

class UserLoginEvent(BaseEvent[str]):
    username: str
    is_admin: bool

async def handle_login(event: UserLoginEvent) -> str:
    auth_request = await event.event_bus.dispatch(AuthRequestEvent(...))  # nested events supported
    auth_response = await event.event_bus.expect(AuthResponseEvent, timeout=30.0)
    return f"User {event.username} logged in admin={event.is_admin} with API response: {await auth_response.event_result()}"

async def main():
    bus = EventBus()
    bus.on(UserLoginEvent, handle_login)
    bus.on(AuthRequestEvent, AuthAPI.post)

    event = bus.dispatch(UserLoginEvent(username="alice", is_admin=True))
    print(await event.event_result())
    # User alice logged in admin=True with API response: {...}

asyncio.run(main())  # await must run inside a coroutine when executed as a script
<br/>
<br/>

✨ Features

<br/>

🔎 Event Pattern Matching

Subscribe to events using multiple patterns:

# By event model class (recommended for best type hinting)
bus.on(UserActionEvent, handler)

# By event type string
bus.on('UserActionEvent', handler)

# Wildcard - handle all events
bus.on('*', universal_handler)
<br/>
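
The three subscription patterns above can be thought of as keys into one handler registry. The following is a minimal standard-library sketch of that idea, not the bubus implementation: `MiniBus`, `handlers_for`, and the demo handlers are hypothetical names introduced only for illustration, and the choice to run type-specific handlers before wildcard ones is an assumption.

```python
from collections import defaultdict
from typing import Any, Callable, Union

Handler = Callable[[Any], Any]


class MiniBus:
    """Illustrative stand-in for a handler registry; not the bubus EventBus."""

    def __init__(self) -> None:
        self._handlers: dict[str, list[Handler]] = defaultdict(list)

    def on(self, pattern: Union[type, str], handler: Handler) -> None:
        # A class pattern is normalized to its name, so the class form and
        # the string form end up under the same key.
        key = pattern if isinstance(pattern, str) else pattern.__name__
        self._handlers[key].append(handler)

    def handlers_for(self, event: Any) -> list[Handler]:
        # Assumption: handlers for the concrete type come first, then wildcards.
        name = type(event).__name__
        return self._handlers.get(name, []) + self._handlers.get("*", [])


class UserActionEvent:
    pass


class OtherEvent:
    pass


def by_class(event: Any) -> str:
    return "by_class"


def by_name(event: Any) -> str:
    return "by_name"


def universal(event: Any) -> str:
    return "universal"


bus = MiniBus()
bus.on(UserActionEvent, by_class)     # by event class
bus.on("UserActionEvent", by_name)    # by event type string
bus.on("*", universal)                # wildcard

matched = [h.__name__ for h in bus.handlers_for(UserActionEvent())]
unmatched = [h.__name__ for h in bus.handlers_for(OtherEvent())]
print(matched)    # ['by_class', 'by_name', 'universal']
print(unmatched)  # ['universal']
```

Registering by class is still the form the README recommends, since a string key carries no type information for the handler's `event` parameter.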

🔀 Async and Sync Handler Support

Register both synchronous and asynchronous handlers for maximum flexibility:

# Async handler
async def async_handler(event: SomeEvent) -> str:
    await asyncio.sleep(0.1)  # Simulate async work
    return "async result"

# Sync handler
def sync_handler(event: SomeEvent) -> str:
    return "sync result"

bus.on(SomeEvent, async_handler)
bus.on(SomeEvent, sync_handler)

Handlers can also be defined under classes for easier organization:

class SomeService:
    some_value = 'this works'

    async def handlers_can_be_methods(self, event: SomeEvent) -> str:
        return self.some_value
    
    @classmethod
    async def handler_can_be_classmethods(cls, event: SomeEvent) -> str:
        return cls.some_value

    @staticmethod
    async def handlers_can_be_staticmethods(event: SomeEvent) -> str:
        return 'this works too'

# All usage patterns behave the same:
bus.on(SomeEvent, SomeService().handlers_can_be_methods)
bus.on(SomeEvent, SomeService.handler_can_be_classmethods)
bus.on(SomeEvent, SomeService.handlers_can_be_staticmethods)
<br/>
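
One common way for a dispatcher to accept both kinds of handler is to call the handler and await the result only when it is awaitable. The sketch below shows that technique with the standard library only; it makes no claim about how bubus does it internally, and `call_handler`, `Service`, and `run_all` are hypothetical names.

```python
import asyncio
import inspect
from typing import Any, Callable


async def call_handler(handler: Callable[[Any], Any], event: Any) -> Any:
    """Call a handler and await its return value only if it is awaitable."""
    result = handler(event)
    if inspect.isawaitable(result):
        result = await result
    return result


async def async_handler(event: str) -> str:
    await asyncio.sleep(0)  # simulate async work
    return f"async saw {event}"


def sync_handler(event: str) -> str:
    return f"sync saw {event}"


class Service:
    prefix = "svc"

    async def method_handler(self, event: str) -> str:
        return f"{self.prefix} method saw {event}"

    @classmethod
    async def class_handler(cls, event: str) -> str:
        return f"{cls.prefix} classmethod saw {event}"

    @staticmethod
    def static_handler(event: str) -> str:
        return f"staticmethod saw {event}"


async def run_all(event: str) -> list:
    # Bound methods, classmethods, and staticmethods are all plain callables
    # taking a single event argument once they are looked up.
    handlers = [
        async_handler,
        sync_handler,
        Service().method_handler,
        Service.class_handler,
        Service.static_handler,
    ]
    return [await call_handler(h, event) for h in handlers]


results = asyncio.run(run_all("ping"))
print(results)
```

Checking the return value rather than the function itself also covers sync callables that happen to return a coroutine.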

🔠 Type-Safe Events with Pydantic

Define events as Pydantic models with full type checking and validation:

from typing import Any
from bubus import BaseEvent

class OrderCreatedEvent(BaseEvent):
    order_id: str
    customer_id: str
    total_amount: float
    items: list[dict[str, Any]]

# Events are automatically validated
event = OrderCreatedEvent(
    order_id="ORD-123",
    customer_id="CUST-456", 
    total_amount=99.99,
    items=[{"sku": "ITEM-1", "quantity": 2}]
)

[!TIP] You can also enforce the types of event handler return values.

<br/>

⏩ Forward Events Between EventBuses

You can define separate EventBus instances in different "microservices" to separate different areas of concern. Buses can be set up to forward events to each other (with automatic loop prevention):

# Create a hierarchy of buses
main_bus = EventBus(name='MainBus')
auth_bus = EventBus(name='AuthBus')
data_bus = EventBus(name='DataBus')

# Share all or specific events between buses
main_bus.on('*', auth_bus.dispatch)  # if main bus gets LoginEvent, will forward to AuthBus
auth_bus.on('*', data_bus.dispatch)  # auth bus will forward everything to DataBus
data_bus.on('*', main_bus.dispatch)  # don't worry! event will only be processed once by each, no infinite loop occurs

# Events flow through the hierarchy with tracking
event = main_bus.dispatch(LoginEvent())
await event
print(event.event_path)  # ['MainBus', 'AuthBus', 'DataBus']  # list of buses that have already processed the event
<br/>
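
The loop prevention described above can be pictured as a check against the path the event has already travelled. This is a simplified, synchronous sketch under that assumption, not bubus code: `MiniBus`, `forward_to`, and `seen` are hypothetical, and forwarding is modelled as a plain list of targets rather than wildcard handlers.

```python
from dataclasses import dataclass, field


@dataclass
class Event:
    name: str
    # Names of the buses that have already processed this event.
    path: list = field(default_factory=list)


class MiniBus:
    """Illustrative bus that forwards every event to its targets."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.forward_to: list = []  # hypothetical forwarding targets
        self.seen: list = []        # event names this bus has processed

    def dispatch(self, event: Event) -> Event:
        # Loop prevention: skip events this bus has already processed.
        if self.name in event.path:
            return event
        event.path.append(self.name)
        self.seen.append(event.name)
        for target in self.forward_to:
            target.dispatch(event)
        return event


main_bus = MiniBus("MainBus")
auth_bus = MiniBus("AuthBus")
data_bus = MiniBus("DataBus")

# Build a cycle: Main -> Auth -> Data -> Main
main_bus.forward_to.append(auth_bus)
auth_bus.forward_to.append(data_bus)
data_bus.forward_to.append(main_bus)

event = main_bus.dispatch(Event("LoginEvent"))
print(event.path)  # ['MainBus', 'AuthBus', 'DataBus']
```

Because the path travels with the event, no bus needs global knowledge of the forwarding topology to break the cycle.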

🔱 Event Results Aggregation

Collect and aggregate results from multiple handlers:

async def load_user_config(event: GetConfigEvent) -> dict[str, Any]:
    return {"debug": True, "port": 8080}

async def load_system_config(event: GetConfigEvent) -> dict[str, Any]:
    return {"debug": False, "timeout": 30}

bus.on(GetConfigEvent, load_user_config)
bus.on(GetConfigEvent, load_system_config)

# Get a single dict merged from all handler results
event = await bus.dispatch(GetConfigEvent())
config = await event.event_results_flat_dict(raise_if_conflicts=False)
# {'debug': False, 'port': 8080, 'timeout': 30}

# Or get individual results
await event.event_results_by_handler_id()
await event.event_results_list()
<br/>
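
The merged output shown above is consistent with later handler results overriding earlier ones when conflicts are allowed. A small sketch of that merge rule follows; `merge_flat_dict` is a hypothetical helper, and the exact conflict semantics (raising only when the same key maps to different values) are an assumption rather than documented bubus behavior.

```python
from typing import Any


def merge_flat_dict(results: list, raise_if_conflicts: bool = True) -> dict:
    """Merge a list of dict results into one dict, in handler order."""
    merged: dict[str, Any] = {}
    for result in results:
        for key, value in result.items():
            conflict = key in merged and merged[key] != value
            if conflict and raise_if_conflicts:
                raise ValueError(
                    f"conflicting values for {key!r}: {merged[key]!r} vs {value!r}"
                )
            merged[key] = value  # later results override earlier ones
    return merged


user_config = {"debug": True, "port": 8080}
system_config = {"debug": False, "timeout": 30}

merged = merge_flat_dict([user_config, system_config], raise_if_conflicts=False)
print(merged)  # {'debug': False, 'port': 8080, 'timeout': 30}
```

With `raise_if_conflicts=True`, the same two inputs would raise in this sketch, because `debug` has different values in each result.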

🚦 FIFO Event Processing

Events are processed in strict FIFO order, maintaining consistency:

# Events are processed in the order they were dispatched
for i in range(10):
    bus.dispatch(ProcessTaskEvent(task_id=i))

# Even with async handlers, order is preserved
await bus.wait_until_idle(timeout=30.0)
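
To see why strict FIFO processing preserves order even when handlers take different amounts of time, here is a standard-library sketch using `asyncio.Queue` with a single consumer. It illustrates the general technique only; the `worker` and `main` names and the sleep timings are made up for the demo.

```python
import asyncio


async def worker(queue: asyncio.Queue, processed: list) -> None:
    """Consume tasks one at a time, in the order they were enqueued."""
    while True:
        task_id = await queue.get()
        # Earlier tasks sleep longer; if handlers ran concurrently,
        # they would finish last.
        await asyncio.sleep(0.001 * (5 - task_id))
        processed.append(task_id)
        queue.task_done()


async def main() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    processed: list = []
    consumer = asyncio.create_task(worker(queue, processed))

    for task_id in range(5):
        queue.put_nowait(task_id)

    # Rough analogue of waiting until the bus is idle, with a timeout.
    await asyncio.wait_for(queue.join(), timeout=5.0)

    consumer.cancel()
    await asyncio.gather(consumer, return_exceptions=True)
    return processed


order = asyncio.run(main())
print(order)  # [0, 1, 2, 3, 4]
```

A single consumer awaiting each task before taking the next is what guarantees the ordering here; running several consumers would trade that guarantee for throughput.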

If a handler dispatches and awaits any child events during execution, those events will jump the FIFO queue and be processed immediately:

def child_handler(event: SomeOtherEvent) -> str:
    return 'xyz123'

async def main_handler(event: MainEvent) -> str:  # must be async because it awaits the child event
    # enqueue event for processing after main_handler exits
    child_event = bus.dispatch(SomeOtherEvent())
    
    # can also await child events to process immediately instead of adding to FIFO queue
    completed_child_event = await child_event
    return f'result from awaiting child event: {await completed_child_event.event_result()}'  # 'xyz123'

bus.on(SomeOtherEvent, child_handler)
bus.on(MainEvent, main_handler)

await bus.dispatch(MainEvent()).event_result()
# result from awaiting child event: xyz123
<br/>

🪆 Dispatch Nested Child Events From Handlers

Automatically track event relationships and the causality tree:

async def parent_handler(event: BaseEvent):
    # handlers can emit more events to be processed asynchronously after this handler completes
    child = ChildEvent()
    child_event_async = event.event_bus.dispatch(child)   # equivalent to bus.dispatch(...)
    assert child.event_status != 'completed'
    assert child_event_async.event_parent_id == event.event_id
    await child_event_async

    # or you can dispatch an event and block until it finishes processing by awaiting the event
    # this recursively waits for all handlers, including if event is forwarded to other buses
    # (note: awaiting an event from inside a handler jumps the FIFO queue and will process it immediately, before any other pending events)
    child_event_sync = await bus.dispatch(ChildEvent())
    # ChildEvent handlers run immediately
    assert child_event_sync.event_status == 'completed'

    # in all cases, parent-child relationships are automagically tracked
    assert child_event_sync.event_parent_id == event.event_id

async def run_main():
    bus.on(ChildEvent, child_handler)
    bus.on(ParentEvent, parent_handler)

    parent_event = bus.dispatch(ParentEvent())
    print(parent_event.event_children)           # show all the child events emitted during handling of an event
    await parent_event
    print(bus.log_tree())
    await bus.stop()

if __name__ == '__main__':
    asyncio.run(run_main())

<img width="100%" alt="show the whole tree of events at any time using the logging helpers" src="https://github.com/user-attachments/assets/f94684a6-7694-4066-b948-46925f47b56c" /><br/> <img width="100%" alt="intelligent timeout handling to differentiate handler that timed out from handler that was interrupted" src="https://github.com/user-attachments/assets/8da341fd-6c26-4c68-8fec-aef1ca55c189" />

<br/><br/>

⏳ Expect an Event to be Dispatched

Wait for specific events to be seen on a bus with optional filtering:

# Block until a specific event is seen (with optional timeout)
request_event = await bus.dispatch(RequestEvent(id=123, table='invoices', request_id=999234))
response_event = await bus.expect(ResponseEvent, timeout=30)
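
Conceptually, waiting for an event with a filter and a timeout amounts to registering a future that a later dispatch resolves. The following standard-library sketch shows that pattern; it is not the bubus implementation, and `MiniBus`, `_waiters`, and the `ResponseEvent` class below are hypothetical. Only the `include` and `timeout` parameter names are taken from the README.

```python
import asyncio
from typing import Any, Callable, Optional


class MiniBus:
    """Illustrative bus supporting only dispatch() and expect()."""

    def __init__(self) -> None:
        self._waiters: list = []  # (event_type, include, future) tuples

    def dispatch(self, event: Any) -> Any:
        for waiter in list(self._waiters):
            event_type, include, future = waiter
            if isinstance(event, event_type) and include(event) and not future.done():
                future.set_result(event)
                self._waiters.remove(waiter)
        return event

    async def expect(
        self,
        event_type: type,
        include: Callable[[Any], bool] = lambda e: True,
        timeout: Optional[float] = None,
    ) -> Any:
        future = asyncio.get_running_loop().create_future()
        waiter = (event_type, include, future)
        self._waiters.append(waiter)
        try:
            return await asyncio.wait_for(future, timeout)
        finally:
            # Clean up on timeout or cancellation.
            if waiter in self._waiters:
                self._waiters.remove(waiter)


class ResponseEvent:
    def __init__(self, request_id: int) -> None:
        self.request_id = request_id


async def main() -> tuple:
    bus = MiniBus()

    async def respond() -> None:
        await asyncio.sleep(0.01)
        bus.dispatch(ResponseEvent(request_id=1))  # filtered out by include
        bus.dispatch(ResponseEvent(request_id=2))  # matches

    responder = asyncio.create_task(respond())
    event = await bus.expect(
        ResponseEvent, include=lambda e: e.request_id == 2, timeout=1.0
    )
    await responder

    timed_out = False
    try:
        await bus.expect(ResponseEvent, timeout=0.01)  # nothing is dispatched
    except asyncio.TimeoutError:
        timed_out = True
    return event.request_id, timed_out


outcome = asyncio.run(main())
print(outcome)  # (2, True)
```

In this sketch an event dispatched before `expect()` is called is never seen by the waiter, so the waiter has to be registered before the response can arrive.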

A more complex real-world example showing off all the features:

async def on_generate_invoice_pdf(event: GenerateInvoiceEvent) -> pdf:
    request_event = await bus.dispatch(APIRequestEvent(  # example: fire a backend request via some RPC client using bubus
        method='invoices.generatePdf',
        invoice_id=event.invoice_id,
        request_id=uuid4(),
    ))
    # ...rpc client should send the request, then call event_bus.dispatch(APIResponseEvent(...)) when it gets a response ...

    # wait for the response event to be fired by the RPC client
    is_our_response = lambda response_event: response_event.request_id == request_event.request_id
    is_successful = lambda response_event: response_event.invoice_id == event.invoice_id and response_event.invoice_url
    try:
        response_event: APIResponseEvent = await bus.expect(
            APIResponseEvent,                                         # wait for events of this type (also accepts str name)
            include=lambda e: is_our_response(e) and is_successful(e), # only include events that match a certain f