# Rigatoni

A high-performance, type-safe CDC/data replication framework for Rust, focused on real-time data pipelines.
<p align="center">
  <img src="assets/logo.webp" alt="Rigatoni Logo" width="180" />
</p>
## 🎯 Overview

Rigatoni is a modern Change Data Capture (CDC) and data replication framework built for speed, reliability, and developer experience. Leveraging Rust's type system and async/await, it provides production-ready data pipelines for streaming real-time workloads from databases to data lakes and other destinations.
Currently supporting:
- MongoDB Change Streams - Real-time CDC (Change Data Capture) from MongoDB
- S3 Destination - Export to AWS S3 with multiple formats (JSON, CSV, Parquet, Avro)
- Redis State Store - Distributed state management for multi-instance deployments
- Pipeline Orchestration - Multi-worker architecture with retry logic and state management
- Metrics & Observability - Prometheus metrics with Grafana dashboards
- Async-first design - Powered by Tokio for high throughput
- Type-safe transformations - Compile-time guarantees with Rust's type system
- Modular architecture - Extensible with feature flags
## ⚡ Performance
Rigatoni delivers exceptional performance for high-throughput CDC workloads:
- ~780ns per event - Core processing with linear scaling
- ~1.2μs per event - JSON serialization
- 7.65ms for 1000 events - S3 writes with ZSTD compression
- 10K-100K events/sec - Production-ready throughput
- Sub-millisecond - State store operations
See our detailed benchmarks for comprehensive performance analysis.
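As a rough sanity check, the headline numbers are mutually consistent. For example, the batched S3 write figure above implies a per-writer ceiling comfortably inside the quoted throughput range (ignoring network variance); a quick back-of-the-envelope calculation:

```rust
fn main() {
    // From the benchmarks above: 1000 events written in 7.65 ms (S3 + ZSTD)
    let events = 1000.0_f64;
    let batch_ms = 7.65_f64;

    // Implied single-writer throughput in events per second
    let events_per_sec = events / (batch_ms / 1000.0);
    println!("~{events_per_sec:.0} events/sec per writer");
}
```

At roughly 130K events/sec per writer, the quoted 10K-100K events/sec production figure leaves headroom for real-world overhead.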
## ✨ Features
- 🚀 High Performance: Async/await architecture with Tokio for concurrent processing
- 🔒 Type Safety: Leverage Rust's type system for data transformation guarantees
- 📊 MongoDB CDC: Real-time change stream listening with resume token support
- 📦 S3 Integration: Multiple formats (JSON, CSV, Parquet, Avro) with compression (gzip, zstd)
- 🗄️ Distributed State: Redis-backed state store for multi-instance deployments
- 🔐 Distributed Locking: Redis-based locking for horizontal scaling without duplicates
- 🔄 Retry Logic: Exponential backoff with configurable limits
- 🎯 Batching: Automatic batching based on size and time windows
- 🎨 Composable Pipelines: Build data replication workflows from simple, testable components
- 📊 Metrics: Prometheus metrics for throughput, latency, errors, and health
- 📝 Observability: Comprehensive tracing, metrics, and Grafana dashboards
- 🧪 Testable: Mock destinations and comprehensive test utilities
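The retry feature above is described as exponential backoff with configurable limits. As a conceptual illustration only (this helper is not part of the Rigatoni API), capped exponential backoff is typically computed like this:

```rust
use std::time::Duration;

/// Capped exponential backoff: base * 2^attempt, clamped to `max`.
/// Illustrative sketch, not Rigatoni's actual implementation.
fn backoff_delay(attempt: u32, base: Duration, max: Duration) -> Duration {
    let factor = 2u32.checked_pow(attempt).unwrap_or(u32::MAX);
    base.checked_mul(factor).map_or(max, |d| d.min(max))
}

fn main() {
    let base = Duration::from_millis(100);
    let max = Duration::from_secs(30);
    for attempt in 0..6 {
        // 100ms, 200ms, 400ms, ... doubling until the cap is reached
        println!("retry {attempt}: wait {:?}", backoff_delay(attempt, base, max));
    }
}
```

Production implementations usually add random jitter on top of the doubling schedule to avoid thundering-herd retries.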
## 🏗️ Architecture
Rigatoni is organized as a workspace with three main crates:
```text
rigatoni/
├── rigatoni-core/          # Core traits and pipeline orchestration
├── rigatoni-destinations/  # Destination implementations
└── rigatoni-stores/        # State store implementations
```
### Core Concepts
- Source: Extract data from systems (MongoDB change streams)
- Destination: Load data into target systems (S3 with multiple formats)
- Store: Manage pipeline state for reliability (in-memory, Redis)
- Pipeline: Orchestrate the entire data replication workflow with error handling
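These concepts map naturally onto Rust traits. The sketch below illustrates the shape of the abstractions with a tiny synchronous in-memory pipeline; Rigatoni's real traits are async and richer, and all names here are illustrative, not the actual API:

```rust
/// Simplified, synchronous sketches of the core abstractions.
trait Source {
    fn poll(&mut self) -> Option<String>; // next change event, if any
}

trait Destination {
    fn write(&mut self, batch: &[String]);
}

trait Store {
    fn save_checkpoint(&mut self, token: &str);
    fn load_checkpoint(&self) -> Option<String>;
}

/// A source backed by a vector of pre-recorded events.
struct VecSource(Vec<String>);
impl Source for VecSource {
    fn poll(&mut self) -> Option<String> {
        if self.0.is_empty() { None } else { Some(self.0.remove(0)) }
    }
}

/// A destination that just collects what it receives.
struct CollectDestination(Vec<String>);
impl Destination for CollectDestination {
    fn write(&mut self, batch: &[String]) {
        self.0.extend_from_slice(batch);
    }
}

/// An in-memory store holding the last checkpoint.
struct MemStore(Option<String>);
impl Store for MemStore {
    fn save_checkpoint(&mut self, token: &str) { self.0 = Some(token.to_string()); }
    fn load_checkpoint(&self) -> Option<String> { self.0.clone() }
}

/// The pipeline drains the source, writes a batch, and checkpoints progress.
fn run_pipeline(src: &mut impl Source, dst: &mut impl Destination, store: &mut impl Store) {
    let mut batch = Vec::new();
    while let Some(event) = src.poll() {
        batch.push(event);
    }
    if let Some(last) = batch.last().cloned() {
        dst.write(&batch);
        store.save_checkpoint(&last);
    }
}

fn main() {
    let mut src = VecSource(vec!["e1".into(), "e2".into()]);
    let mut dst = CollectDestination(Vec::new());
    let mut store = MemStore(None);
    run_pipeline(&mut src, &mut dst, &mut store);
    assert_eq!(store.load_checkpoint().as_deref(), Some("e2"));
}
```

The key design point is that each role is swappable: the same pipeline loop works against a mock destination in tests and a real S3 destination in production.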
## 🚀 Quick Start
### Prerequisites
- Rust 1.88 or later
- AWS credentials configured for S3 access
### Installation

Add Rigatoni to your `Cargo.toml`:
```toml
[dependencies]
rigatoni-core = "0.2.0"
rigatoni-destinations = { version = "0.2.0", features = ["s3"] }
rigatoni-stores = { version = "0.2.0", features = ["memory"] }
```
### Basic Example: MongoDB to S3 Pipeline
```rust
use rigatoni_core::pipeline::{Pipeline, PipelineConfig};
use rigatoni_destinations::s3::{S3Config, S3Destination};
use rigatoni_stores::memory::MemoryStore;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Configure state store (in-memory for simplicity)
    let store = MemoryStore::new();

    // Configure S3 destination
    let s3_config = S3Config::builder()
        .bucket("my-data-lake")
        .region("us-east-1")
        .prefix("mongodb-cdc")
        .build()?;
    let destination = S3Destination::new(s3_config).await?;

    // Configure pipeline - watch entire database
    let config = PipelineConfig::builder()
        .mongodb_uri("mongodb://localhost:27017/?replicaSet=rs0")
        .database("mydb")
        .watch_database() // Watch all collections in the database
        .batch_size(1000)
        .build()?;

    // Create and run pipeline
    let mut pipeline = Pipeline::new(config, store, destination).await?;
    pipeline.start().await?;

    Ok(())
}
```
### Watch Levels
Rigatoni supports three levels of change stream watching:
```rust
// Watch specific collections only
let config = PipelineConfig::builder()
    .mongodb_uri("mongodb://localhost:27017/?replicaSet=rs0")
    .database("mydb")
    .watch_collections(vec!["users".to_string(), "orders".to_string()])
    .build()?;

// Watch all collections in a database (recommended)
let config = PipelineConfig::builder()
    .mongodb_uri("mongodb://localhost:27017/?replicaSet=rs0")
    .database("mydb")
    .watch_database() // Automatically picks up new collections
    .build()?;

// Watch all databases in the deployment (cluster-wide)
let config = PipelineConfig::builder()
    .mongodb_uri("mongodb://localhost:27017/?replicaSet=rs0")
    .database("mydb")
    .watch_deployment() // Requires MongoDB 4.0+ and cluster-wide permissions
    .build()?;
```
### Distributed State with Redis
For multi-instance deployments, use Redis to share state across pipeline instances:
```rust
use rigatoni_core::pipeline::{Pipeline, PipelineConfig};
use rigatoni_destinations::s3::{S3Config, S3Destination};
use rigatoni_stores::redis::{RedisStore, RedisConfig};
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Configure Redis state store
    let redis_config = RedisConfig::builder()
        .url("redis://localhost:6379")
        .pool_size(10)
        .ttl(Duration::from_secs(7 * 24 * 60 * 60)) // 7 days
        .build()?;
    let store = RedisStore::new(redis_config).await?;

    // Configure S3 destination
    let s3_config = S3Config::builder()
        .bucket("my-data-lake")
        .region("us-east-1")
        .build()?;
    let destination = S3Destination::new(s3_config).await?;

    // Configure pipeline with Redis store
    let config = PipelineConfig::builder()
        .mongodb_uri("mongodb://localhost:27017/?replicaSet=rs0")
        .database("mydb")
        .watch_collections(vec!["users".to_string(), "orders".to_string()])
        .build()?;

    // Create and run pipeline with distributed state
    let mut pipeline = Pipeline::new(config, store, destination).await?;
    pipeline.start().await?;

    Ok(())
}
```
### Horizontal Scaling with Distributed Locking
Rigatoni supports horizontal scaling with distributed locking to prevent duplicate event processing:
```rust
use rigatoni_core::pipeline::{Pipeline, PipelineConfig, DistributedLockConfig};
use rigatoni_destinations::s3::{S3Config, S3Destination};
use rigatoni_stores::redis::{RedisStore, RedisConfig};
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Configure Redis store (required for distributed locking)
    let redis_config = RedisConfig::builder()
        .url("redis://localhost:6379")
        .pool_size(10)
        .build()?;
    let store = RedisStore::new(redis_config).await?;

    // Configure S3 destination
    let s3_config = S3Config::builder()
        .bucket("my-data-lake")
        .region("us-east-1")
        .build()?;
    let destination = S3Destination::new(s3_config).await?;

    // Configure pipeline with distributed locking
    let config = PipelineConfig::builder()
        .mongodb_uri("mongodb://localhost:27017/?replicaSet=rs0")
        .database("mydb")
        .watch_collections(vec!["users".to_string(), "orders".to_string()])
        .distributed_lock(DistributedLockConfig {
            enabled: true,
            ttl: Duration::from_secs(30),              // Lock expires if holder crashes
            refresh_interval: Duration::from_secs(10), // Heartbeat interval
            retry_interval: Duration::from_secs(5),    // Retry claiming locks
        })
        .build()?;

    // Create and run pipeline
    let mut pipeline = Pipeline::new(config, store, destination).await?;
    pipeline.start().await?;

    Ok(())
}
```
**How it works:**
- Each collection is protected by a distributed lock (stored in Redis)
- Only one instance processes a collection at a time
- If an instance crashes, its locks expire after TTL (default 30s)
- Other instances automatically take over orphaned collections
- Total throughput scales linearly with number of instances
```text
Instance 1          Instance 2          Instance 3
    |                   |                   |
    v                   v                   v
Acquires locks      Acquires locks      Acquires locks
  "users"             "orders"            "products"
    |                   |                   |
    v                   v                   v
Process events      Process events      Process events
(no duplicates!)    (no duplicates!)    (no duplicates!)
```
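The expiry-based takeover described above can be modeled with a simple TTL lock. The sketch below demonstrates the semantics in memory so it runs standalone; Rigatoni's real locks live in Redis, and every name here is illustrative rather than the actual API:

```rust
use std::time::{Duration, Instant};

/// In-memory model of a TTL lock, illustrating expiry-based takeover.
struct TtlLock {
    holder: Option<(String, Instant)>, // (instance id, expiry time)
    ttl: Duration,
}

impl TtlLock {
    fn new(ttl: Duration) -> Self {
        Self { holder: None, ttl }
    }

    /// Try to claim the lock at time `now`; succeeds if free or expired.
    fn try_acquire(&mut self, instance: &str, now: Instant) -> bool {
        match &self.holder {
            Some((_, expiry)) if *expiry > now => false, // held and not expired
            _ => {
                self.holder = Some((instance.to_string(), now + self.ttl));
                true
            }
        }
    }

    /// Heartbeat: the current holder pushes the expiry forward.
    fn refresh(&mut self, instance: &str, now: Instant) -> bool {
        match &mut self.holder {
            Some((h, expiry)) if h == instance && *expiry > now => {
                *expiry = now + self.ttl;
                true
            }
            _ => false,
        }
    }
}

fn main() {
    let mut lock = TtlLock::new(Duration::from_secs(30));
    let t0 = Instant::now();

    assert!(lock.try_acquire("instance-1", t0));  // instance 1 claims "users"
    assert!(!lock.try_acquire("instance-2", t0)); // instance 2 is blocked

    // Heartbeat at t0+10s extends the lease to t0+40s
    assert!(lock.refresh("instance-1", t0 + Duration::from_secs(10)));

    // Instance 1 "crashes" (no more heartbeats); after the TTL elapses,
    // instance 2 takes over the orphaned collection.
    let after_ttl = t0 + Duration::from_secs(41);
    assert!(lock.try_acquire("instance-2", after_ttl));
    println!("takeover after expiry works");
}
```

Redis implements the same pattern atomically with `SET key value NX PX ttl`; the in-memory version above only models the timing behavior.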
See Multi-Instance Deployment Guide for Kubernetes examples, configuration tuning, and failure handling.
See Getting Started for detailed tutorials and Redis Configuration Guide for production deployment.
### Metrics and Monitoring
Rigatoni exposes Prometheus metrics for throughput, latency, errors, and health, and includes Grafana dashboards for monitoring.