Microrabbit
MicroRabbit: A lightweight, asynchronous Python framework for RabbitMQ that simplifies building microservices and distributed systems. Features include easy message routing, plugin support, and intuitive client configuration.
Install / Use
/learn @TonnoBelloSnello/MicrorabbitREADME
MicroRabbit
MicroRabbit is a lightweight, asynchronous Python framework for working with RabbitMQ. It simplifies the process of setting up RabbitMQ consumers and publishers, making it easy to build microservices and distributed systems.
Features
- Asynchronous message handling using
asyncio - Simple decorator-based message routing
- Plugin system for modular code organization
- Easy-to-use client configuration
- Built-in logging support
- Customizable type annotations for message data
Installation
pip install microrabbit
Quick Start
Here's a simple example of how to use MicroRabbit:
import asyncio
import logging
from pydantic import BaseModel
from microrabbit import Client
from microrabbit.types import QueueOptions, ConsumerOptions, ConnectionOptions
class Message(BaseModel):
test: str
client = Client(
host="amqp://guest:guest@localhost/",
plugins="./plugins",
connection_type="ROBUST",
connection_options=ConnectionOptions()
)
log = logging.getLogger(__file__)
logging.basicConfig(level=logging.INFO)
@Client.on_message("queue_name")
async def test(data: dict) -> dict:
log.info(f"Received message {data}")
return {"connected": True}
@Client.on_message("queue_name2", queue_options=QueueOptions(exclusive=True), consume_options=ConsumerOptions(no_ack=True))
async def test2(data: int|Message) -> dict:
log.info(f"Received message {data.test}")
return {"connected": True}
@client.on_ready
async def on_ready():
log.info("[*] Waiting for messages. To exit press CTRL+C")
result = await client.simple_publish("queue_name2", {"test": "data"}, timeout=2, decode=True)
log.info(result)
if __name__ == "__main__":
asyncio.run(client.run())
Usage
Client Configuration
Create a Client instance with the following parameters:
host: RabbitMQ server URLinstance_id: Unique identifier for the client if not provided it will be generated automatically (optional)plugins: Path to the plugins folder (optional)connection_type: Connection typestr (NORMAL, ROBUST)orCONNECTION_TYPE(optional)connection_options: Connection optionsConnectionOptions(optional)
from microrabbit import Client
from microrabbit.types import CONNECTION_TYPE, ConnectionOptions
client = Client(
host="amqp://guest:guest@localhost/",
instance_id="unique_id",
plugins="./plugins",
connection_type=CONNECTION_TYPE.NORMAL,
connection_options=ConnectionOptions(ssl=True)
)
Message Handling
Use the @Client.on_message decorator to define a message handler. The decorator takes the queue name as an argument.
Arguments:
queue_name: Name of the queueinstance_id: Unique identifier for the client if not provided it will be setted as global, when a client runs the queue will be consumed (optional)queue_options: Queue optionsQueueOptions(optional)consume_options: Consumer optionsConsumerOptions(optional)
from microrabbit import Client
from microrabbit.types import QueueOptions
@Client.on_message("queue_name", queue_options=QueueOptions(exclusive=True))
async def handler(data: dict):
# Process the message
return response_data # Serializeable data
Message Data Types
In the handler function, you can specify the data type of the message using type annotations for parameters and return values.
if the types are not valid a ValueError will be raised:
from microrabbit import Client
from typing import Union
from pydantic import BaseModel
class Message(BaseModel):
test: str
@Client.on_message("queue_name")
async def handler(data: Union[Message, int]) -> Union[Message, int]:
print(data.test)
return data # Could be a Message object or an int, you can return any serializable object or any BaseModel object
Ready Event
Use the @client.on_ready decorator to define a function that runs when the client is ready:
from microrabbit import Client
client = Client(
host="amqp://guest:guest@localhost/",
plugins="./plugins"
)
@client.on_ready
async def on_ready():
print("Client is ready")
Running the Client
Run the client using asyncio.run(client.run()):
import asyncio
from microrabbit import Client
client = Client(
host="amqp://guest:guest@localhost/",
plugins="./plugins"
)
if __name__ == "__main__":
asyncio.run(client.run())
Publishing Messages
Use the simple_publish method to publish a message to a queue:
result = await client.simple_publish("queue_name", {"test": "data"}, timeout=2, decode=True)
Running with context manager
import asyncio
from microrabbit import Client
client = Client(
host="amqp://guest:guest@localhost/",
plugins="./plugins"
)
async def main():
async with client:
await client.run()
if __name__ == "__main__":
asyncio.run(main())
Plugins
MicroRabbit supports a plugin system. Place your plugin files in the specified plugins folder, and they will be automatically loaded by the client.
Plugin Example
# ./plugins/test_plugin.py
from microrabbit import Client
@Client.on_message("test_queue")
async def test_handler(data: dict):
print(f"Received message: {data}")
return {"status": "ok"}
Advanced Usage
Queue Options
Use the QueueOptions class to specify queue options:
from microrabbit.types import QueueOptions
@Client.on_message("queue_name", queue_options=QueueOptions(exclusive=True))
async def handler(data: dict):
# Process the message
return response_data
Consumer Options
Use the ConsumerOptions class to specify consumer options:
from microrabbit.types import ConsumerOptions
@Client.on_message("queue_name", consume_options=ConsumerOptions(no_ack=True))
async def handler(data: dict):
# Process the message
return response_data
Contributing
Contributions are welcome! For feature requests, bug reports, or questions, please open an issue. If you would like to contribute code, please submit a pull request.
License
This project is licensed under the MIT License - see the LICENSE file for details.
