# Minigun

A lightweight framework for rapid-fire batch job processing.

## Install / Use

```
/learn @tablecheck/MinigunREADME
```

## Minigun go BRRRRR
Minigun is a high-performance data processing pipeline framework for Ruby.
## Features
- Define multi-stage processing pipelines with a simple, expressive DSL.
- Process data using multiple threads and/or processes for maximum performance.
- Use Copy-On-Write (COW) or IPC forking for efficient parallel processing.
- Direct connections between stages with `from` and `to` options.
- Queue-based routing with selective queue subscriptions.
- Batch accumulation for efficient processing.
- Comprehensive error handling and retry mechanisms.
- Optional MessagePack serialization for faster IPC.
- Data compression for large transfers between processes.
- Smart garbage collection for memory optimization.
In many use cases, Minigun can replace queue systems like Resque, Solid Queue, or Sidekiq. Minigun itself runs entirely in Ruby's memory, and is database- and application-agnostic.
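To illustrate that in-memory model (plain Ruby, not Minigun's API), a `Queue` drained by worker threads plays the role a Redis-backed job queue would play in Sidekiq — the jobs never leave the process:

```ruby
# Hand-rolled sketch of in-process job dispatch: work items flow through
# a thread-safe Queue consumed by worker threads, instead of being
# serialized to an external store.
jobs    = Queue.new
results = Queue.new

workers = 4.times.map do
  Thread.new do
    # Each worker pops jobs until it sees the :done sentinel.
    while (job = jobs.pop) != :done
      results << job * 2 # stand-in for real job logic
    end
  end
end

10.times { |i| jobs << i }
4.times { jobs << :done } # one sentinel per worker
workers.each(&:join)

results.size # => 10
```

A framework like Minigun adds stage wiring, batching, and forking on top of this basic pattern.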
## TODOs
- [ ] Extract examples to examples folder
- [ ] Add more examples based on real-world use cases
- [ ] Add support for named queues and queue-based routing (already there?)
- [ ] Add `parallel` and `sequential` blocks for defining parallel and sequential stages
- [ ] Add support for custom error handling and retry strategies
- [ ] Add support for custom logging and monitoring
- [ ] Add support for custom thread and process management (?)
## Installation
Add this line to your application's Gemfile:
```ruby
gem 'minigun'
```
## Quick Start
```ruby
require 'minigun'

class MyTask
  include Minigun::DSL

  pipeline do
    producer :generate do
      10.times { |i| emit(i) }
    end

    processor :transform do |number|
      emit(number * 2)
    end

    accumulator :batch do |item|
      @items ||= []
      @items << item
      if @items.size >= 5
        batch = @items.dup
        @items.clear
        emit(batch)
      end
    end

    cow_fork :process_batch do |batch|
      # Process the batch in a forked child process
      batch.each { |item| puts "Processing #{item}" }
    end
  end
end

# Run the task
MyTask.new.run
```
## Core Concepts
### Pipeline Stages
Minigun's stage types form a cohesive system: each specialized stage is a variation of a common processor implementation.
- Producer: Generates data for the pipeline. A producer is a processor without input.
- Processor: Transforms data and passes it to the next stage. It can filter, modify, or route data.
- Accumulator: Collects and batches items before forwarding them in groups.
- Consumer: Consumes data without emitting anything further. A consumer is a processor without output.
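The four roles can be sketched in plain Ruby (this is not the Minigun API — just the concept, with queues standing in for stage connections and a hand-rolled end-of-stream sentinel):

```ruby
# Conceptual sketch: each stage is a thread, queues connect them.
q1, q2, q3 = Queue.new, Queue.new, Queue.new
DONE = Object.new # end-of-stream sentinel (Minigun handles this internally)

producer = Thread.new do
  5.times { |i| q1 << i } # Producer: output only, no input
  q1 << DONE
end

processor = Thread.new do
  while (item = q1.pop) != DONE
    q2 << item * 10 # Processor: transforms input into output
  end
  q2 << DONE
end

accumulator = Thread.new do
  batch = []
  while (item = q2.pop) != DONE
    batch << item
    next unless batch.size >= 2 # Accumulator: forwards items in groups
    q3 << batch.dup
    batch.clear
  end
  q3 << batch unless batch.empty? # flush the partial final batch
  q3 << DONE
end

results = []
while (item = q3.pop) != DONE
  results << item # Consumer: input only, emits nothing further
end
[producer, processor, accumulator].each(&:join)
results # => [[0, 10], [20, 30], [40]]
```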
### Fork Variants
For handling batched data processing, two fork implementations are available:
- `cow_fork`: Uses Copy-On-Write fork to efficiently process batches in separate child processes
- `ipc_fork`: Uses IPC-style forking for batch processing with different memory characteristics
These are actually aliases for the consumer stage with specific fork configurations.
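The underlying mechanism can be illustrated in plain Ruby on a POSIX system (again, not the Minigun API): `fork` gives the child copy-on-write access to the parent's in-memory batch, and a pipe carries the result back to the parent.

```ruby
# COW fork sketch: the parent builds a batch in memory; the forked child
# reads it through copy-on-write pages and returns a result over a pipe.
batch = (1..5).to_a

reader, writer = IO.pipe
pid = Process.fork do
  reader.close
  result = batch.sum # child sees the parent's batch without an upfront copy
  writer.write(Marshal.dump(result))
  writer.close
  exit!(0)
end

writer.close
result = Marshal.load(reader.read)
reader.close
Process.wait(pid)

result # => 15
```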
### Custom Stage Classes
Minigun allows you to create custom stage classes to encapsulate complex behavior or implement specialized processing patterns. All stages inherit from the base Stage class and can override its behavior.
#### Execution Modes
Every stage has a `run_mode` that determines how it executes within the pipeline. There are three execution strategies:

```ruby
:autonomous  # Generates data independently (ProducerStage)
:streaming   # Processes stream of items in worker loop (Stage, ConsumerStage)
:composite   # Manages internal stages (PipelineStage)
```
The `run_mode` method controls critical behaviors like:
- Whether the stage needs an input queue
- Whether it needs an executor for concurrent processing
- How the pipeline routes data to and from the stage
- How the stage participates in disconnection detection
#### Creating Custom Stages

To create a custom stage class, inherit from `Minigun::Stage` and implement the required methods:
```ruby
class CustomStage < Minigun::Stage
  # Define execution mode (default: :streaming)
  def run_mode
    :streaming
  end

  # Define how a single item is processed
  def execute(context, item: nil, input_queue: nil, output_queue: nil)
    # Your custom processing logic
    result = process_item(item)
    output_queue << result if output_queue
  end

  # Optional: Customize the stage execution
  def run_stage(stage_ctx)
    # Custom execution implementation
    # See ProducerStage or ConsumerStage for examples
  end

  # Optional: Customize logging type
  def log_type
    "Custom"
  end
end
```
#### Example: Batch Processor Stage
Here's a custom stage that batches items with a timeout:
```ruby
class TimedBatchStage < Minigun::Stage
  attr_reader :batch_size, :timeout

  def initialize(name:, options: {})
    super
    @batch_size = options[:batch_size] || 100
    @timeout = options[:timeout] || 5.0
  end

  def run_mode
    :streaming # Processes items from input queue
  end

  def run_stage(stage_ctx)
    require 'minigun/queue_wrappers'

    wrapped_input = Minigun::InputQueue.new(
      stage_ctx.input_queue,
      stage_ctx.stage_name,
      stage_ctx.sources_expected
    )
    wrapped_output = Minigun::OutputQueue.new(
      stage_ctx.stage_name,
      stage_ctx.dag.downstream(stage_ctx.stage_name).map { |ds|
        stage_ctx.stage_input_queues[ds]
      },
      stage_ctx.stage_input_queues,
      stage_ctx.runtime_edges
    )

    batch = []
    last_flush = Time.now

    loop do
      # Check for timeout
      if !batch.empty? && (Time.now - last_flush) >= @timeout
        wrapped_output << batch.dup
        batch.clear
        last_flush = Time.now
      end

      # Try to get item with timeout
      begin
        item = wrapped_input.pop(timeout: 0.1)

        if item == Minigun::AllUpstreamsDone
          # Flush remaining items
          wrapped_output << batch unless batch.empty?
          break
        end

        batch << item

        # Flush if batch is full
        if batch.size >= @batch_size
          wrapped_output << batch.dup
          batch.clear
          last_flush = Time.now
        end
      rescue ThreadError
        # Timeout, continue to check for flush
      end
    end

    send_end_signals(stage_ctx)
  end
end

# Use in a pipeline
class MyTask
  include Minigun::DSL

  pipeline do
    producer :generate do
      100.times { |i| emit(i) }
    end

    # Use custom stage class
    custom_stage TimedBatchStage, :batch, batch_size: 10, timeout: 2.0

    consumer :process do |batch, output|
      puts "Processing batch of #{batch.size} items"
    end
  end
end
```
#### Example: Stateful Filter Stage
Create a stage that filters based on accumulated state:
```ruby
require 'set'

class DeduplicatorStage < Minigun::Stage
  def initialize(name:, options: {})
    super
    @seen = Set.new
    @mutex = Mutex.new
  end

  def run_mode
    :streaming
  end

  def execute(context, item: nil, input_queue: nil, output_queue: nil)
    key = extract_key(item)

    is_new = @mutex.synchronize do
      if @seen.include?(key)
        false
      else
        @seen.add(key)
        true
      end
    end

    output_queue << item if is_new && output_queue
  end

  private

  def extract_key(item)
    # Override in subclass or pass as option
    item[:id] || item
  end
end
```
#### When to Create Custom Stages
Consider creating custom stage classes when you need:
- Complex State Management: Stages that maintain sophisticated internal state
- Specialized Worker Loops: Custom timing, batching, or control flow logic
- Reusable Patterns: Behavior you want to use across multiple pipelines
- Framework Extensions: Adding new execution modes or patterns to Minigun
- Performance Optimization: Fine-tuned control over threading, batching, or memory
For simple transformations, use the standard `producer`, `processor`, and `consumer` DSL methods. For complex, reusable behavior, create custom stage classes.
## Stage Connections
Minigun supports two types of stage connections:
- Sequential Connections: By default, stages are connected in the order they're defined
- Explicit Connections: Use `from` and `to` options to explicitly define connections
```ruby
# Sequential connection
processor :first_stage do |item|
  item + 1
end

processor :second_stage do |item|
  item * 2
end

# Explicit connections
processor :stage_a, to: [:stage_b, :stage_c] do |item|
  item
end

processor :stage_b, from: :stage_a do |item|
  # Process items from stage_a
end

processor :stage_c, from: :stage_a do |item|
  # Also process items from stage_a
end
```
### Advanced Connection Examples
#### Branching Pipeline

Create a pipeline that branches into multiple downstream paths:
```ruby
pipeline do
  # Producer emits to multiple processors
  producer :user_producer, to: [:email_processor, :notification_processor] do
    User.find_each do |user|
      emit(user)
    end
  end

  # These processors receive data from the same producer
  processor :email_processor, from: :user_producer do |user|
    generate_email(user)
  end

  processor :notification_processor, from: :user_producer do |user|
    generate_notification(user)
  end

  # Connect the email processor to an accumulator
  accumulator :email_accumulator, from: :email_processor do |email|
    @emails ||= []
    @emails << email
    if @emails.size >= 100
      batch = @emails.dup
      @emails.clear
      emit(batch)
    end
  end

  # Process accumulated emails
  cow_fork :email_sender, from: :email_accumulator, processes: 4 do |emails|
    emails.each { |email| send_email(email) }
  end

  # Process notifications directly
  consumer :notification_sender, from: :notification_processor do |notification|
    send_notification(notification)
  end
end
```
#### Diamond-Shaped Pipeline
Create a pipeline that splits and rejoins:
pipeli
