Minigun

A lightweight framework for rapid-fire batch job processing

Minigun go BRRRRR

Minigun is a high-performance data processing pipeline framework for Ruby.

Features

  • Define multi-stage processing pipelines with a simple, expressive DSL.
  • Process data using multiple threads and/or processes for maximum performance.
  • Use Copy-On-Write (COW) or IPC forking for efficient parallel processing.
  • Direct connections between stages with from and to options.
  • Queue-based routing with selective queue subscriptions.
  • Batch accumulation for efficient processing.
  • Comprehensive error handling and retry mechanisms.
  • Optional MessagePack serialization for faster IPC.
  • Data compression for large transfers between processes.
  • Smart garbage collection for memory optimization.

In many use cases, Minigun can replace queue systems like Resque, Solid Queue, or Sidekiq. Minigun runs entirely in Ruby's memory and is database and application agnostic.

TODOs

  • [ ] Extract examples to examples folder
  • [ ] Add more examples based on real-world use cases
  • [ ] Add support for named queues and queue-based routing (already there?)
  • [ ] Add parallel and sequential blocks for defining parallel and sequential stages
  • [ ] Add support for custom error handling and retry strategies
  • [ ] Add support for custom logging and monitoring
  • [ ] Add support for custom thread and process management (?)

Installation

Add this line to your application's Gemfile:

gem 'minigun'

Quick Start

require 'minigun'

class MyTask
  include Minigun::DSL

  pipeline do
    producer :generate do
      10.times { |i| emit(i) }
    end

    processor :transform do |number|
      emit(number * 2)
    end

    accumulator :batch do |item|
      @items ||= []
      @items << item

      if @items.size >= 5
        batch = @items.dup
        @items.clear
        emit(batch)
      end
    end

    cow_fork :process_batch do |batch|
      # Process the batch in a forked child process
      batch.each { |item| puts "Processing #{item}" }
    end
  end
end

# Run the task
MyTask.new.run

Core Concepts

Pipeline Stages

Minigun's stage types share a common processor implementation, and each specialized stage is a variation of it:

  1. Producer: Generates data for the pipeline. A producer is a processor without input.
  2. Processor: Transforms data and passes it to the next stage. It can filter, modify, or route data.
  3. Accumulator: Collects and batches items before forwarding them in groups.
  4. Consumer: Consumes data without emitting anything further. A consumer is a processor without output.
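The four roles above can be sketched in plain Ruby with threads and queues. This is only an illustration of the concepts, not Minigun's implementation: the run_pipeline helper, the DONE marker, and the queue names are all assumptions introduced here.

```ruby
# Plain-Ruby sketch of the four stage roles. NOT Minigun's implementation.
DONE = :done # hypothetical end-of-stream marker

def run_pipeline(count: 10, batch_size: 5)
  numbers = Queue.new
  doubled = Queue.new
  batches = Queue.new
  results = []

  # Producer: has no input, only emits.
  producer = Thread.new do
    count.times { |i| numbers << i }
    numbers << DONE
  end

  # Processor: transforms each item and passes it on.
  processor = Thread.new do
    while (item = numbers.pop) != DONE
      doubled << item * 2
    end
    doubled << DONE
  end

  # Accumulator: groups items into batches and flushes the remainder at the end.
  accumulator = Thread.new do
    buffer = []
    while (item = doubled.pop) != DONE
      buffer << item
      next if buffer.size < batch_size

      batches << buffer
      buffer = []
    end
    batches << buffer unless buffer.empty?
    batches << DONE
  end

  # Consumer: receives batches and emits nothing further.
  consumer = Thread.new do
    while (batch = batches.pop) != DONE
      results << batch
    end
  end

  [producer, processor, accumulator, consumer].each(&:join)
  results
end

p run_pipeline # => [[0, 2, 4, 6, 8], [10, 12, 14, 16, 18]]
```

Each role differs only in which side of the queue it touches, which is the sense in which a producer is a processor without input and a consumer is a processor without output.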

Fork Variants

For handling batched data processing, two fork implementations are available:

  • cow_fork: Uses Copy-On-Write fork to efficiently process batches in separate child processes
  • ipc_fork: Uses IPC-style forking for batch processing with different memory characteristics

These are actually aliases for the consumer stage with specific fork configurations.
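To make the trade-off concrete, here is a minimal sketch using only Ruby's standard fork, IO.pipe, and Marshal. It shows the general mechanism: a forked child can read the parent's data without copying it, but results have to be sent back explicitly. The process_in_child helper is hypothetical and says nothing about how Minigun wires its fork stages internally. It assumes a platform where fork is available (not Windows).

```ruby
# Sketch of fork-based batch processing with standard Ruby APIs only.
# process_in_child is a hypothetical helper, not part of Minigun.
def process_in_child(batch)
  reader, writer = IO.pipe
  reader.binmode
  writer.binmode

  pid = fork do
    reader.close
    # The child sees the parent's memory as of the fork (copy-on-write),
    # so `batch` is readable here without being serialized first.
    result = batch.map { |item| item * 2 }
    # Results must travel back explicitly; here via Marshal over a pipe.
    writer.write(Marshal.dump(result))
    writer.close
    exit!(0) # skip at_exit handlers inherited from the parent
  end

  writer.close
  payload = reader.read # blocks until the child closes its end of the pipe
  reader.close
  Process.wait(pid)
  Marshal.load(payload)
end

p process_in_child([1, 2, 3]) # => [2, 4, 6]
```

The serialization step on the way back is where options such as MessagePack or compression, listed under Features, would plausibly matter.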

Custom Stage Classes

Minigun allows you to create custom stage classes to encapsulate complex behavior or implement specialized processing patterns. All stages inherit from the base Stage class and can override its behavior.

Execution Modes

Every stage has a run_mode that determines how it executes within the pipeline. There are three execution strategies:

:autonomous  # Generates data independently (ProducerStage)
:streaming   # Processes stream of items in worker loop (Stage, ConsumerStage)
:composite   # Manages internal stages (PipelineStage)

The run_mode method controls critical behaviors like:

  • Whether the stage needs an input queue
  • Whether it needs an executor for concurrent processing
  • How the pipeline routes data to and from the stage
  • How the stage participates in disconnection detection

Creating Custom Stages

To create a custom stage class, inherit from Minigun::Stage and implement the required methods:

class CustomStage < Minigun::Stage
  # Define execution mode (default: :streaming)
  def run_mode
    :streaming
  end

  # Define how a single item is processed
  def execute(context, item: nil, input_queue: nil, output_queue: nil)
    # Your custom processing logic
    result = process_item(item)
    output_queue << result if output_queue
  end

  # Optional: Customize the stage execution
  def run_stage(stage_ctx)
    # Custom execution implementation
    # See ProducerStage or ConsumerStage for examples
  end

  # Optional: Customize logging type
  def log_type
    "Custom"
  end
end

Example: Batch Processor Stage

Here's a custom stage that batches items with a timeout:

class TimedBatchStage < Minigun::Stage
  attr_reader :batch_size, :timeout

  def initialize(name:, options: {})
    super
    @batch_size = options[:batch_size] || 100
    @timeout = options[:timeout] || 5.0
  end

  def run_mode
    :streaming  # Processes items from input queue
  end

  def run_stage(stage_ctx)
    require 'minigun/queue_wrappers'

    wrapped_input = Minigun::InputQueue.new(
      stage_ctx.input_queue,
      stage_ctx.stage_name,
      stage_ctx.sources_expected
    )

    wrapped_output = Minigun::OutputQueue.new(
      stage_ctx.stage_name,
      stage_ctx.dag.downstream(stage_ctx.stage_name).map { |ds|
        stage_ctx.stage_input_queues[ds]
      },
      stage_ctx.stage_input_queues,
      stage_ctx.runtime_edges
    )

    batch = []
    last_flush = Time.now

    loop do
      # Check for timeout
      if !batch.empty? && (Time.now - last_flush) >= @timeout
        wrapped_output << batch.dup
        batch.clear
        last_flush = Time.now
      end

      # Try to get item with timeout
      begin
        item = wrapped_input.pop(timeout: 0.1)

        if item == Minigun::AllUpstreamsDone
          # Flush remaining items
          wrapped_output << batch unless batch.empty?
          break
        end

        batch << item

        # Flush if batch is full
        if batch.size >= @batch_size
          wrapped_output << batch.dup
          batch.clear
          last_flush = Time.now
        end
      rescue ThreadError
        # Timeout, continue to check for flush
      end
    end

    send_end_signals(stage_ctx)
  end
end

# Use in a pipeline
class MyTask
  include Minigun::DSL

  pipeline do
    producer :generate do
      100.times { |i| emit(i) }
    end

    # Use custom stage class
    custom_stage TimedBatchStage, :batch, batch_size: 10, timeout: 2.0

    consumer :process do |batch, output|
      puts "Processing batch of #{batch.size} items"
    end
  end
end
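The flush rule in TimedBatchStage (flush when the batch is full, when the timeout elapses, or when input ends) can be examined in isolation. The sketch below pulls that rule out of the queue loop and takes the clock as an argument so it can be exercised deterministically. TimedBatcher and its method names are hypothetical and are not part of Minigun.

```ruby
# Sketch of the size-or-timeout flush rule, separated from queues and threads.
# TimedBatcher is a hypothetical helper, not a Minigun class.
class TimedBatcher
  def initialize(batch_size:, timeout:, now: 0.0)
    @batch_size = batch_size
    @timeout = timeout
    @batch = []
    @last_flush = now
  end

  # Add an item. Returns a full batch once the size limit is reached, else nil.
  def push(item, now:)
    @batch << item
    flush(now) if @batch.size >= @batch_size
  end

  # Call periodically. Returns a partial batch once the timeout has elapsed, else nil.
  def tick(now:)
    flush(now) if !@batch.empty? && (now - @last_flush) >= @timeout
  end

  # Call at end of input. Returns whatever is pending, or nil if nothing is.
  def finish(now:)
    flush(now) unless @batch.empty?
  end

  private

  # Hand back the current batch and start a fresh one.
  def flush(now)
    out = @batch
    @batch = []
    @last_flush = now
    out
  end
end

batcher = TimedBatcher.new(batch_size: 3, timeout: 2.0)
p batcher.push(:a, now: 0.0) # => nil
p batcher.push(:b, now: 0.5) # => nil
p batcher.tick(now: 1.0)     # => nil (timeout not reached)
p batcher.tick(now: 2.5)     # => [:a, :b]
```

As in the stage above, the timer measures time since the last flush rather than since the first item of the current batch arrived.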

Example: Stateful Filter Stage

Create a stage that filters based on accumulated state:

require 'set' # needed on Ruby versions that do not autoload Set

class DeduplicatorStage < Minigun::Stage
  def initialize(name:, options: {})
    super
    @seen = Set.new
    @mutex = Mutex.new
  end

  def run_mode
    :streaming
  end

  def execute(context, item: nil, input_queue: nil, output_queue: nil)
    key = extract_key(item)

    is_new = @mutex.synchronize do
      if @seen.include?(key)
        false
      else
        @seen.add(key)
        true
      end
    end

    output_queue << item if is_new && output_queue
  end

  private

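  def extract_key(item)
    # Override in subclass or pass as option.
    # Hashes are keyed by :id when present; any other item is its own key
    # (calling item[:id] on an Integer or String would raise TypeError).
    item.is_a?(Hash) && item[:id] || item
  end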
end
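The check-then-insert inside the mutex can also be written with Set#add?, which returns nil when the element is already present. The sketch below shows that variant on its own and runs it from several threads to confirm each key is accepted exactly once. SeenTracker is a hypothetical helper used only for this illustration.

```ruby
require 'set'

# Sketch: Set#add? returns nil if the element is already in the set,
# so the membership check and the insert collapse into one call under the lock.
class SeenTracker # hypothetical helper, not part of Minigun
  def initialize
    @seen = Set.new
    @mutex = Mutex.new
  end

  # True only for the first caller that presents a given key.
  def first_time?(key)
    @mutex.synchronize { !@seen.add?(key).nil? }
  end
end

tracker = SeenTracker.new
accepted = Queue.new

# Four threads each offer the same 100 keys.
threads = 4.times.map do
  Thread.new { 100.times { |i| accepted << i if tracker.first_time?(i) } }
end
threads.each(&:join)

p accepted.size # => 100
```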

When to Create Custom Stages

Consider creating custom stage classes when you need:

  1. Complex State Management: Stages that maintain sophisticated internal state
  2. Specialized Worker Loops: Custom timing, batching, or control flow logic
  3. Reusable Patterns: Behavior you want to use across multiple pipelines
  4. Framework Extensions: Adding new execution modes or patterns to Minigun
  5. Performance Optimization: Fine-tuned control over threading, batching, or memory

For simple transformations, use the standard producer, processor, and consumer DSL methods. For complex, reusable behavior, create custom stage classes.

Stage Connections

Minigun supports two types of stage connections:

  1. Sequential Connections: By default, stages are connected in the order they're defined
  2. Explicit Connections: Use from and to options to explicitly define connections

# Sequential connection
processor :first_stage do |item|
  item + 1
end

processor :second_stage do |item|
  item * 2
end

# Explicit connections
processor :stage_a, to: [:stage_b, :stage_c] do |item|
  item
end

processor :stage_b, from: :stage_a do |item|
  # Process items from stage_a
end

processor :stage_c, from: :stage_a do |item|
  # Also process items from stage_a
end
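One way to picture the two connection types is as rules for building an edge list. The sketch below is purely illustrative: the build_edges helper, the hash shape for stages, and the rule that an explicit from or to suppresses the sequential default are all assumptions, and Minigun's actual resolution logic may differ.

```ruby
# Hypothetical sketch of turning stage definitions into a list of edges.
# The data shape and fallback rule are assumptions, not Minigun internals.
def build_edges(stages)
  edges = []
  stages.each_with_index do |stage, index|
    # Explicit connections declared on this stage.
    Array(stage[:to]).each { |target| edges << [stage[:name], target] }
    Array(stage[:from]).each { |source| edges << [source, stage[:name]] }

    # Sequential default: when neither side declares an explicit connection,
    # link this stage to the next one in definition order.
    following = stages[index + 1]
    next if following.nil? || stage[:to] || following[:from]

    edges << [stage[:name], following[:name]]
  end
  edges.uniq
end

sequential = [{ name: :first_stage }, { name: :second_stage }]
p build_edges(sequential)
# => [[:first_stage, :second_stage]]

explicit = [
  { name: :stage_a, to: %i[stage_b stage_c] },
  { name: :stage_b, from: :stage_a },
  { name: :stage_c, from: :stage_a }
]
p build_edges(explicit)
# => [[:stage_a, :stage_b], [:stage_a, :stage_c]]
```

Declaring the same link from both ends, as the explicit example does, yields a single edge because duplicates are removed.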

Advanced Connection Examples

Branching Pipeline

Create a pipeline that branches based on the type of data:

pipeline do
  # Producer emits to multiple processors
  producer :user_producer, to: [:email_processor, :notification_processor] do
    User.find_each do |user|
      emit(user)
    end
  end

  # These processors receive data from the same producer
  processor :email_processor, from: :user_producer do |user|
    generate_email(user)
  end

  processor :notification_processor, from: :user_producer do |user|
    generate_notification(user)
  end

  # Connect the email processor to an accumulator
  accumulator :email_accumulator, from: :email_processor do |email|
    @emails ||= []
    @emails << email

    if @emails.size >= 100
      batch = @emails.dup
      @emails.clear
      emit(batch)
    end
  end

  # Process accumulated emails
  cow_fork :email_sender, from: :email_accumulator, processes: 4 do |emails|
    emails.each { |email| send_email(email) }
  end

  # Process notifications directly
  consumer :notification_sender, from: :notification_processor do |notification|
    send_notification(notification)
  end
end

Diamond-Shaped Pipeline

Create a pipeline that splits and rejoins:

pipeli