Rigatoni

<p align="center"> <img src="assets/logo.webp" alt="Rigatoni Logo" width="180" /> </p>

CI · License: Apache-2.0 · Rust Version · Benchmarks

A high-performance, type-safe CDC/Data Replication framework for Rust, focused on real-time data pipelines.

🎯 Overview

Rigatoni is a modern CDC (Change Data Capture) and data replication framework built for speed, reliability, and developer experience. Leveraging Rust's type system and async/await, it provides production-ready data pipelines for real-time streaming workloads from databases to data lakes and other destinations.

Currently supporting:

  • MongoDB Change Streams - Real-time CDC (Change Data Capture) from MongoDB
  • S3 Destination - Export to AWS S3 with multiple formats (JSON, CSV, Parquet, Avro)
  • Redis State Store - Distributed state management for multi-instance deployments
  • Pipeline Orchestration - Multi-worker architecture with retry logic and state management
  • Metrics & Observability - Prometheus metrics with Grafana dashboards
  • Async-first design - Powered by Tokio for high throughput
  • Type-safe transformations - Compile-time guarantees with Rust's type system
  • Modular architecture - Extensible with feature flags

⚡ Performance

Rigatoni delivers exceptional performance for high-throughput CDC workloads:

  • ~780ns per event - Core processing with linear scaling
  • ~1.2μs per event - JSON serialization
  • 7.65ms for 1000 events - S3 writes with ZSTD compression
  • 10K-100K events/sec - Production-ready throughput
  • Sub-millisecond - State store operations

See our detailed benchmarks for comprehensive performance analysis.
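As a sanity check, the per-event figures above translate directly into single-worker throughput ceilings. The sketch below is illustrative arithmetic over the quoted numbers, not a benchmark:

```rust
// Convert a per-event latency (in nanoseconds) into events per second.
fn events_per_sec(ns_per_event: f64) -> f64 {
    1_000_000_000.0 / ns_per_event
}

fn main() {
    // ~780 ns core processing -> roughly 1.28M events/sec per worker
    let core = events_per_sec(780.0);
    // ~1.2 us JSON serialization -> roughly 833K events/sec
    let json = events_per_sec(1_200.0);
    println!("core processing:    ~{core:.0} events/sec");
    println!("JSON serialization: ~{json:.0} events/sec");
    // End-to-end throughput is bounded by the slowest stage plus I/O and
    // batching overhead, which is why the production figure quoted above
    // (10K-100K events/sec) sits well below these per-stage ceilings.
}
```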

✨ Features

  • 🚀 High Performance: Async/await architecture with Tokio for concurrent processing
  • 🔒 Type Safety: Leverage Rust's type system for data transformation guarantees
  • 📊 MongoDB CDC: Real-time change stream listening with resume token support
  • 📦 S3 Integration: Multiple formats (JSON, CSV, Parquet, Avro) with compression (gzip, zstd)
  • 🗄️ Distributed State: Redis-backed state store for multi-instance deployments
  • 🔐 Distributed Locking: Redis-based locking for horizontal scaling without duplicates
  • 🔄 Retry Logic: Exponential backoff with configurable limits
  • 🎯 Batching: Automatic batching based on size and time windows
  • 🎨 Composable Pipelines: Build data replication workflows from simple, testable components
  • 📊 Metrics: Prometheus metrics for throughput, latency, errors, and health
  • 📝 Observability: Comprehensive tracing, metrics, and Grafana dashboards
  • 🧪 Testable: Mock destinations and comprehensive test utilities

🏗️ Architecture

Rigatoni is organized as a workspace with three main crates:

rigatoni/
├── rigatoni-core/           # Core traits and pipeline orchestration
├── rigatoni-destinations/   # Destination implementations
└── rigatoni-stores/         # State store implementations

Core Concepts

  • Source: Extract data from systems (MongoDB change streams)
  • Destination: Load data into target systems (S3 with multiple formats)
  • Store: Manage pipeline state for reliability (in-memory, Redis)
  • Pipeline: Orchestrate the entire data replication workflow with error handling
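To make the separation of concerns concrete, here is an illustrative (hypothetical) sketch of how these abstractions compose. The real traits in rigatoni-core are async and considerably richer; this sync version only shows the shape:

```rust
/// A source yields change events (e.g. from a MongoDB change stream).
trait Source {
    type Event;
    fn next_event(&mut self) -> Option<Self::Event>;
}

/// A destination receives batches of events (e.g. S3 object writes).
trait Destination {
    type Event;
    fn write_batch(&mut self, batch: Vec<Self::Event>) -> Result<(), String>;
}

/// A store persists pipeline state such as resume tokens.
#[allow(dead_code)]
trait Store {
    fn save_resume_token(&mut self, token: &str);
    fn load_resume_token(&self) -> Option<String>;
}

/// In-memory mocks, handy for tests.
struct VecSource { events: Vec<String> }
impl Source for VecSource {
    type Event = String;
    fn next_event(&mut self) -> Option<String> {
        if self.events.is_empty() { None } else { Some(self.events.remove(0)) }
    }
}

struct VecDestination { written: Vec<String> }
impl Destination for VecDestination {
    type Event = String;
    fn write_batch(&mut self, batch: Vec<String>) -> Result<(), String> {
        self.written.extend(batch);
        Ok(())
    }
}

/// A pipeline drains the source and forwards events to the destination
/// (batching collapsed to a single batch for brevity).
fn run<S, D>(src: &mut S, dst: &mut D)
where S: Source<Event = String>, D: Destination<Event = String> {
    let mut batch = Vec::new();
    while let Some(e) = src.next_event() { batch.push(e); }
    dst.write_batch(batch).unwrap();
}

fn main() {
    let mut src = VecSource { events: vec!["a".into(), "b".into()] };
    let mut dst = VecDestination { written: Vec::new() };
    run(&mut src, &mut dst);
    assert_eq!(dst.written, vec!["a", "b"]);
}
```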

🚀 Quick Start

Prerequisites

  • Rust 1.88 or later
  • AWS credentials configured for S3 access

Installation

Add Rigatoni to your Cargo.toml:

[dependencies]
rigatoni-core = "0.2.0"
rigatoni-destinations = { version = "0.2.0", features = ["s3"] }
rigatoni-stores = { version = "0.2.0", features = ["memory"] }

Basic Example: MongoDB to S3 Pipeline

use rigatoni_core::pipeline::{Pipeline, PipelineConfig};
use rigatoni_destinations::s3::{S3Config, S3Destination};
use rigatoni_stores::memory::MemoryStore;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Configure state store (in-memory for simplicity)
    let store = MemoryStore::new();

    // Configure S3 destination
    let s3_config = S3Config::builder()
        .bucket("my-data-lake")
        .region("us-east-1")
        .prefix("mongodb-cdc")
        .build()?;

    let destination = S3Destination::new(s3_config).await?;

    // Configure pipeline - watch entire database
    let config = PipelineConfig::builder()
        .mongodb_uri("mongodb://localhost:27017/?replicaSet=rs0")
        .database("mydb")
        .watch_database()  // Watch all collections in the database
        .batch_size(1000)
        .build()?;

    // Create and run pipeline
    let mut pipeline = Pipeline::new(config, store, destination).await?;
    pipeline.start().await?;

    Ok(())
}

Watch Levels

Rigatoni supports three levels of change stream watching:

// Watch specific collections only
let config = PipelineConfig::builder()
    .mongodb_uri("mongodb://localhost:27017/?replicaSet=rs0")
    .database("mydb")
    .watch_collections(vec!["users".to_string(), "orders".to_string()])
    .build()?;

// Watch all collections in a database (recommended)
let config = PipelineConfig::builder()
    .mongodb_uri("mongodb://localhost:27017/?replicaSet=rs0")
    .database("mydb")
    .watch_database()  // Automatically picks up new collections
    .build()?;

// Watch all databases in the deployment (cluster-wide)
let config = PipelineConfig::builder()
    .mongodb_uri("mongodb://localhost:27017/?replicaSet=rs0")
    .database("mydb")
    .watch_deployment()  // Requires MongoDB 4.0+ and cluster-wide permissions
    .build()?;

Distributed State with Redis

For multi-instance deployments, use Redis to share state across pipeline instances:

use rigatoni_core::pipeline::{Pipeline, PipelineConfig};
use rigatoni_destinations::s3::{S3Config, S3Destination};
use rigatoni_stores::redis::{RedisStore, RedisConfig};
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Configure Redis state store
    let redis_config = RedisConfig::builder()
        .url("redis://localhost:6379")
        .pool_size(10)
        .ttl(Duration::from_secs(7 * 24 * 60 * 60))  // 7 days
        .build()?;

    let store = RedisStore::new(redis_config).await?;

    // Configure S3 destination
    let s3_config = S3Config::builder()
        .bucket("my-data-lake")
        .region("us-east-1")
        .build()?;

    let destination = S3Destination::new(s3_config).await?;

    // Configure pipeline with Redis store
    let config = PipelineConfig::builder()
        .mongodb_uri("mongodb://localhost:27017/?replicaSet=rs0")
        .database("mydb")
        .watch_collections(vec!["users".to_string(), "orders".to_string()])
        .build()?;

    // Create and run pipeline with distributed state
    let mut pipeline = Pipeline::new(config, store, destination).await?;
    pipeline.start().await?;

    Ok(())
}

Horizontal Scaling with Distributed Locking

Rigatoni supports horizontal scaling with distributed locking to prevent duplicate event processing:

use rigatoni_core::pipeline::{Pipeline, PipelineConfig, DistributedLockConfig};
use rigatoni_destinations::s3::{S3Config, S3Destination};
use rigatoni_stores::redis::{RedisStore, RedisConfig};
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Configure Redis store (required for distributed locking)
    let redis_config = RedisConfig::builder()
        .url("redis://localhost:6379")
        .pool_size(10)
        .build()?;

    let store = RedisStore::new(redis_config).await?;

    // Configure S3 destination (as in the earlier examples)
    let s3_config = S3Config::builder()
        .bucket("my-data-lake")
        .region("us-east-1")
        .build()?;

    let destination = S3Destination::new(s3_config).await?;

    // Configure pipeline with distributed locking
    let config = PipelineConfig::builder()
        .mongodb_uri("mongodb://localhost:27017/?replicaSet=rs0")
        .database("mydb")
        .watch_collections(vec!["users".to_string(), "orders".to_string()])
        .distributed_lock(DistributedLockConfig {
            enabled: true,
            ttl: Duration::from_secs(30),           // Lock expires if holder crashes
            refresh_interval: Duration::from_secs(10), // Heartbeat interval
            retry_interval: Duration::from_secs(5),    // Retry claiming locks
        })
        .build()?;

    // Create and run pipeline
    let mut pipeline = Pipeline::new(config, store, destination).await?;
    pipeline.start().await?;

    Ok(())
}

How it works:

  • Each collection is protected by a distributed lock (stored in Redis)
  • Only one instance processes a collection at a time
  • If an instance crashes, its locks expire after TTL (default 30s)
  • Other instances automatically take over orphaned collections
  • Total throughput scales linearly with the number of instances

Instance 1          Instance 2          Instance 3
    |                   |                   |
    v                   v                   v
Acquires locks     Acquires locks     Acquires locks
"users"            "orders"           "products"
    |                   |                   |
    v                   v                   v
Process events     Process events     Process events
(no duplicates!)   (no duplicates!)   (no duplicates!)
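The lease mechanics described above can be modeled in a few lines. This is a toy, single-process model with hypothetical names; the actual implementation holds leases in Redis (atomic set-if-absent with a TTL, plus periodic refresh):

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Toy per-collection lease table: collection -> (holder, expires_at).
struct LockTable {
    leases: HashMap<String, (String, Instant)>,
    ttl: Duration,
}

impl LockTable {
    fn new(ttl: Duration) -> Self {
        Self { leases: HashMap::new(), ttl }
    }

    /// Claim a collection; succeeds if it is unheld, the lease expired,
    /// or the caller already holds it (which also extends the lease).
    fn try_acquire(&mut self, collection: &str, instance: &str, now: Instant) -> bool {
        match self.leases.get(collection) {
            Some((holder, expires)) if *expires > now && holder != instance => false,
            _ => {
                self.leases.insert(collection.into(), (instance.into(), now + self.ttl));
                true
            }
        }
    }

    /// Heartbeat: the holder extends its lease before the TTL runs out.
    fn refresh(&mut self, collection: &str, instance: &str, now: Instant) -> bool {
        self.try_acquire(collection, instance, now)
    }
}

fn main() {
    let mut locks = LockTable::new(Duration::from_secs(30));
    let t0 = Instant::now();

    // Instance 1 claims "users"; instance 2 is rejected while the lease is live.
    assert!(locks.try_acquire("users", "instance-1", t0));
    assert!(!locks.try_acquire("users", "instance-2", t0 + Duration::from_secs(10)));

    // Instance 1 crashes (stops refreshing); after the 30s TTL elapses,
    // instance 2 takes over the orphaned collection.
    assert!(locks.try_acquire("users", "instance-2", t0 + Duration::from_secs(31)));
}
```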

See Multi-Instance Deployment Guide for Kubernetes examples, configuration tuning, and failure handling.

See Getting Started for detailed tutorials and Redis Configuration Guide for production deployment.

Metrics and Monitoring

Rigatoni exposes Prometheus metrics covering throughput, latency, errors, and pipeline health, along with Grafana dashboards for visualization.
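For context, Prometheus scrapes metrics in a simple text exposition format. The sketch below renders counters in that format; the metric names are illustrative, not Rigatoni's actual exported metrics:

```rust
use std::collections::BTreeMap;

/// Render counters in the Prometheus text exposition format
/// ("# TYPE <name> counter" followed by "<name> <value>").
fn render_prometheus(counters: &BTreeMap<&str, u64>) -> String {
    let mut out = String::new();
    for (name, value) in counters {
        out.push_str(&format!("# TYPE {name} counter\n{name} {value}\n"));
    }
    out
}

fn main() {
    let mut counters = BTreeMap::new();
    // Hypothetical metric names, for illustration only.
    counters.insert("rigatoni_events_processed_total", 12_345);
    counters.insert("rigatoni_batches_written_total", 42);
    print!("{}", render_prometheus(&counters));
}
```

In practice you would serve this payload on an HTTP endpoint (conventionally `/metrics`) for Prometheus to scrape.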
