RustMQ
No description available
Install / Use
/learn @Khanh-21522203/RustMQREADME
Rust-MQ
A Kafka-like message queue system written in Rust with both CLI and library support.
Features
- Multiple Modes: Run as broker, producer, or consumer
- Multi-Broker Support: Distributed cluster with Raft consensus
- High Availability: Automatic leader election and failover
- High Performance: 800K+ msg/s throughput, <10ms latency
- Pluggable Raft Transport: gRPC (default) or SBE+TCP (5.4× faster)
- Zero-Allocation Codec: Hand-rolled SBE encoding + lock-free buffer pool on the SBE+TCP path
- YAML Configuration: Easy configuration management
- Reusable Library: Use as a library in your Rust projects
- CLI Support: Run as standalone tools
- Graceful Shutdown: Proper SIGINT/SIGTERM handling
- Batch Processing: Automatic message batching (2M+ msg/s with large batches)
- Auto-commit: Automatic offset management
- Consumer Groups: Offset tracking and load balancing
- SOLID Architecture: Clean, maintainable codebase
Architecture
Rust-MQ supports two deployment modes:
- Single Broker: Simple in-memory storage for development and testing
- Multi-Broker Cluster: Distributed system with Raft consensus for production
See docs/architecture.md for detailed architecture documentation with diagrams.
Quick Start
1. Build the Project
cargo build --release
2. Run Broker
Single Broker Mode:
# Terminal 1
cargo run -- --mode broker
Multi-Broker Cluster (3 nodes):
# Terminal 1 - Broker 1 (Bootstrap)
./target/release/Rust-MQ --mode broker --config config/broker-1.yaml
# Terminal 2 - Broker 2
./target/release/Rust-MQ --mode broker --config config/broker-2.yaml
# Terminal 3 - Broker 3
./target/release/Rust-MQ --mode broker --config config/broker-3.yaml
Or use the convenience script:
./scripts/start-cluster.sh
3. Run Consumer
# Terminal 2
cargo run -- --mode consumer --config config/consumer.yaml
4. Send Messages
# Terminal 3
cargo run -- --mode producer --config config/producer.yaml
# Type messages line by line
CLI Usage
Command-Line Interface
rust-mq --mode <MODE> [--config <CONFIG>] [--broker <BROKER>]
Options:
-m, --mode <MODE> broker, producer, or consumer
-c, --config <CONFIG> Path to YAML configuration file
--broker <BROKER> Override broker address
-h, --help Print help
Examples
# Start broker
cargo run -- --mode broker
# Producer with config
cargo run -- --mode producer --config config/producer.yaml
# Consumer with config
cargo run -- --mode consumer --config config/consumer.yaml
# Override broker address
cargo run -- --mode consumer --config config/consumer.yaml --broker http://192.168.1.100:50051
Library Usage
Producer Example
use rust_mq::client::{Producer, ProducerConfig, ProducerMessage};
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let config = ProducerConfig {
topic: "my-topic".to_string(),
partition: 0,
partitioning: "fixed".to_string(),
num_partitions: 1,
required_acks: 1,
timeout_ms: 5000,
batch_size: 100,
flush_interval_ms: 100,
};
let mut producer = Producer::new("http://localhost:50051", config).await?;
producer.start().await?;
// Send messages
producer.send(ProducerMessage::new(b"Hello, Kafka!")).await?;
producer.shutdown().await?;
Ok(())
}
Consumer Example
use rust_mq::client::{Consumer, ConsumerConfig, ConsumedMessage, MessageHandler};
struct MyHandler;
#[async_trait::async_trait]
impl MessageHandler for MyHandler {
async fn handle(&mut self, message: ConsumedMessage) -> anyhow::Result<()> {
println!("Received: {:?}", message.value_as_string()?);
Ok(())
}
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let config = ConsumerConfig {
topic: "my-topic".to_string(),
partitions: vec![0],
group_id: Some("my-group".to_string()),
offset: -2,
max_bytes: 1_048_576,
max_wait_ms: 1000,
min_bytes: 1,
auto_commit: true,
auto_commit_interval_ms: 5000,
poll_interval_ms: 1000,
};
let mut consumer = Consumer::new("http://localhost:50051", config).await?;
consumer.start(MyHandler).await?;
tokio::signal::ctrl_c().await?;
consumer.shutdown().await?;
Ok(())
}
Structured Payload Encoding/Decoding
Rust-MQ stores message values as raw bytes. You can use the built-in codec module to wrap your payload with a simple versioned envelope:
use serde::{Deserialize, Serialize};
use rust_mq::codec::{MessageEnvelope, encode, decode};
use rust_mq::client::ProducerMessage;
#[derive(Debug, Serialize, Deserialize)]
struct OrderCreated {
order_id: String,
amount_cents: u64,
}
// Encode before produce
let envelope = MessageEnvelope::new(
"order.created",
1,
OrderCreated { order_id: "ORD-1001".into(), amount_cents: 1250 },
);
let bytes = encode(&envelope)?; // compact binary (bincode)
producer.send(ProducerMessage::new(bytes)).await?;
// Decode after consume
let decoded: MessageEnvelope<OrderCreated> = decode(&consumed_message.value)?;
println!("event={} order={}", decoded.event_type, decoded.payload.order_id);
Configuration
Example configuration files are provided in the config/ directory:
config/producer.yaml- Producer configurationconfig/consumer.yaml- Consumer configurationconfig/example-full.yaml- Complete configuration example
Configuration Structure
broker:
address: "http://localhost:50051"
timeout_secs: 30
max_retries: 3
producer:
topic: "events"
partition: 0
partitioning: "fixed"
num_partitions: 1
required_acks: 1
timeout_ms: 5000
batch_size: 100
flush_interval_ms: 100
consumer:
topic: "events"
partitions: [0]
group_id: "my-consumer-group"
offset: -2 # -2=earliest, -1=latest, 0+=specific
max_bytes: 1048576
max_wait_ms: 1000
min_bytes: 1
auto_commit: true
auto_commit_interval_ms: 5000
poll_interval_ms: 1000
Examples
Run the Complete Example
# Demonstrates producer-consumer communication
cargo run --example producer_consumer
Output:
=== Rust-MQ Producer-Consumer Example ===
Step 1: Starting broker...
✓ Broker started at 0.0.0.0:50051
Step 2: Creating producer...
✓ Producer started
Step 3: Creating consumer...
✓ Consumer started
Step 4: Sending messages...
[Producer] Sent: Message number 1
[Consumer] Message #1: [demo-topic:0:0] Message number 1
...
Architecture
┌─────────────────────────────────────────┐
│ CLI Layer (main.rs) │
│ Mode selection, argument parsing │
└──────────────────┬──────────────────────┘
│
┌──────────────────▼──────────────────────┐
│ Configuration Layer (config.rs) │
│ YAML parsing, validation │
└──────────────────┬──────────────────────┘
│
┌──────────────────▼──────────────────────┐
│ Client Layer (producer.rs │
│ consumer.rs) │
│ Business logic, batching, polling │
└──────────────────┬──────────────────────┘
│
┌──────────────────▼──────────────────────┐
│ gRPC Client Layer │
│ (kafka_broker_client.rs) │
└──────────────────┬──────────────────────┘
│
┌──────────────────▼──────────────────────┐
│ Broker (Server Side) │
│ gRPC Server → Broker Core → Storage │
└─────────────────────────────────────────┘
Key Design Principles
- Separation of Concerns: CLI, configuration, and business logic are separate
- Dependency Injection: Components accept configuration, not hardcoded values
- Interface Segregation: MessageHandler trait for custom processing
- Open/Closed Principle: Easy to extend with new handlers
- Graceful Shutdown: Proper cleanup and resource management
- Type Safety: Leverages Rust's type system for correctness
Project Structure
Rust-MQ/
├── src/
│ ├── main.rs # CLI entry point
│ ├── lib.rs # Library entry point
│ ├── api/ # Protobuf definitions
│ ├── broker/ # Broker implementation
│ │ ├── core.rs # Broker logic
│ │ ├── storage.rs # Storage layer
│ │ ├── raft_transport.rs # RaftTransport trait
│ │ └── sbe_tcp/ # SBE + TCP transport
│ │ ├── codec.rs # Hand-rolled SBE encoder/decoder
│ │ ├── pool.rs # Lock-free buffer pool
│ │ ├── connection.rs # Persistent TCP connection manager
│ │ ├── server.rs # TCP inbound server
│ │ └── transport.rs # RaftTransport impl
│ └── client/ # Client library
│ ├── config.rs # Configuration
│ ├── producer.rs # Producer implementation
│ ├── consumer.rs # Consumer implementation
│ └── kafka_broker_client.rs # gRPC client
├── config/
│ ├── docker/ # gRPC cluster configs
│ └── docker/sbe-tcp/ # SBE+TCP cluster configs
├── docker-compose.yml # gRPC cluster
├── docker-compose-sbe-tcp.yml # SBE+TCP cluster
├── examples/ # Usage examples
└── docs/ # Documentation
Raft Transport Configuration
Add transport to your broker YAML to switch inter-broker communication:
# gRPC (default — works out of the box, no extra config needed)
transport: "grpc"
# SBE + TCP (recommended for production — 5.4× faster than gRPC)
transport: "sbe_tcp"
Testing
# Run all tests
cargo test
# Run with logging
RUST_LOG=debug c
