Routilux ⚡
Routilux — Event-driven workflow orchestration. Too many pipelines to tame? One event queue for orchestration, concurrency, and resume-from-checkpoint. Build in minutes, recover anytime.
✨ Why Routilux?
- 🚀 Event queue: Non-blocking, one model for sequential and concurrent execution
- 🔗 Flexible wiring: Many-to-many routines, smart routing
- 📊 State built-in: Execution state, metrics, history out of the box
- 🛡️ Error policies: STOP / CONTINUE / RETRY / SKIP with automatic recovery
- ⚡ Concurrent execution: I/O parallelized without blocking the main flow
- 💾 Checkpoint & resume: Save and restore at any node; survive interruptions
- 🎯 Production-ready: Error handling, tracing, and monitoring
- 🎨 Simple API: Flow auto-detection; fewer parameters in most cases
🎯 Perfect For
- Data Pipelines: ETL processes, data transformation workflows
- API Orchestration: Coordinating multiple API calls with complex dependencies
- Event Processing: Real-time event streams and reactive systems
- Workflow Automation: Business process automation and task scheduling
- Microservices Coordination: Managing interactions between services
- LLM Agent Workflows: Complex AI agent orchestration and chaining
📦 Installation
⚡ One-Line Install (Recommended)
# Mac / Linux - Auto-detects best method (uv > pipx > pip)
curl -fsSL https://raw.githubusercontent.com/lzjever/routilux/main/install.sh | bash
# Or with wget
wget -qO- https://raw.githubusercontent.com/lzjever/routilux/main/install.sh | bash
Installation options:
# Use pipx instead of uv
METHOD=pipx curl -fsSL https://raw.githubusercontent.com/lzjever/routilux/main/install.sh | bash
# Install specific version
VERSION=0.14.0 curl -fsSL https://raw.githubusercontent.com/lzjever/routilux/main/install.sh | bash
Recommended: Isolated CLI Installation (pipx)
The best way to install the Routilux CLI without affecting your system Python:
# Install CLI with isolated environment
pipx install "routilux[cli]"
# Use anywhere
routilux --help
routilux run workflow.yaml
Why pipx?
- ✅ Creates isolated virtual environment (no dependency conflicts)
- ✅ CLI available globally
- ✅ Easy to update: `pipx upgrade routilux`
- ✅ Works on Mac and Linux
Alternative: uv tool
Using uv (faster than pipx):
# Install
uv tool install "routilux[cli]"
# Use
routilux --help
macOS / Linux: Homebrew
# Add tap and install
brew tap lzjever/routilux
brew install routilux
# Or directly
brew install lzjever/routilux/routilux
Standard pip Install
For library use or development:
# Library only
pip install routilux
# With CLI support
pip install "routilux[cli]"
Development Setup with uv (Recommended)
This project uses uv for fast dependency management. Install uv first:
curl -LsSf https://astral.sh/uv/install.sh | sh
Then set up the development environment:
Recommended: For active development
# Install package with all development dependencies (recommended)
make dev-install
# Or manually with uv (dev group is installed by default)
uv sync --group docs --all-extras
Alternative: Dependencies only (for CI/CD or code review)
# Create virtual environment and install dependencies only (without installing the package)
# Useful for: CI/CD pipelines, code review, or when you only need development tools
make setup-venv
# Later, if you need to install the package:
make install
Understanding dependency groups vs extras:
- Dependency groups (`dev`, `docs`): Development dependencies that are not published to PyPI. The `dev` group is installed by default with `uv sync`.
- Extras: Currently none, but may be added in the future.
All make commands automatically use uv if available and fall back to pip otherwise.
Development Install (Legacy - using pip)
For development with all dependencies using pip:
pip install -e ".[dev]"
# Or using Makefile
make dev-install
🖥️ CLI
Routilux includes a command-line interface for workflow management:
# Install with CLI support
pip install "routilux[cli]"
# Run a workflow
routilux run --workflow flow.yaml
# Start server
routilux server start
# See all commands
routilux --help
CLI Commands
- `routilux init` - Initialize a new project with example files
- `routilux run` - Execute a workflow from a DSL file
- `routilux server` - Start the HTTP server for API access
- `routilux job` - Submit and manage jobs
- `routilux list` - List available routines or flows
- `routilux validate` - Validate a workflow DSL file
See CLI Documentation for details.
Server with Flow Loading
Start the HTTP server with flow auto-loading:
# Start server with flows directory
routilux server start --flows-dir ./flows --port 8080
# Built-in routines (Mapper, Filter, etc.) are automatically available
# Flows from ./flows/*.yaml are loaded at startup
# Hot reload enabled - flow files are watched for changes
Job Management
Submit and manage jobs via CLI:
# Submit job locally
routilux job submit --flow myflow --routine processor --data '{"input": "value"}'
# Submit job to remote server
routilux job submit --server http://localhost:8080 --flow myflow --routine processor --data '{}'
# Check job status
routilux job status <job_id>
# List jobs
routilux job list --flow myflow
🚀 Quick Start
Create Your First Workflow in 3 Steps
Step 1: Define a Routine
from routilux import Routine
class DataProcessor(Routine):
def __init__(self):
super().__init__()
# Define input slot
self.input_slot = self.define_slot("input", handler=self.process_data)
# Define output event
self.output_event = self.define_event("output", ["result"])
def process_data(self, data=None, **kwargs):
# Flow is automatically detected from routine context
result = f"Processed: {data}"
self._stats["processed_count"] = self._stats.get("processed_count", 0) + 1
self.emit("output", result=result) # No need to pass flow!
Step 2: Create and Connect a Flow
from routilux import Flow
flow = Flow(flow_id="my_workflow")
processor1 = DataProcessor()
processor2 = DataProcessor()
id1 = flow.add_routine(processor1, "processor1")
id2 = flow.add_routine(processor2, "processor2")
# Connect: processor1's output → processor2's input
flow.connect(id1, "output", id2, "input")
Step 3: Execute
job_state = flow.execute(id1, entry_params={"data": "Hello, Routilux!"})
print(job_state.status) # "completed"
print(processor1.stats()) # {"processed_count": 1}
🎉 Done! You've created your first workflow.
💡 Key Features
🔄 Event Queue Architecture
Routines communicate through events and slots using a unified event queue pattern:
# Multiple routines can listen to the same event
flow.connect(processor1, "output", processor2, "input")
flow.connect(processor1, "output", processor3, "input") # Fan-out
# Multiple events can feed into the same slot
flow.connect(processor1, "output", aggregator, "input")
flow.connect(processor2, "output", aggregator, "input") # Fan-in
# emit() is non-blocking - returns immediately after enqueuing tasks
# Flow is automatically detected from routine context
self.emit("output", data="value") # No flow parameter needed!
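For readers new to the pattern, here is a minimal plain-Python sketch of the same fan-out idea. It does not use Routilux; `MiniBus` and its methods are made up for illustration, mirroring how a non-blocking `emit()` only enqueues work that a dispatcher later drains in FIFO order:

```python
from collections import defaultdict, deque

class MiniBus:
    """Toy event queue: emit() only enqueues; run() drains in FIFO order."""

    def __init__(self):
        self.listeners = defaultdict(list)   # event name -> list of handlers
        self.queue = deque()

    def connect(self, event, handler):
        self.listeners[event].append(handler)

    def emit(self, event, payload):
        # Non-blocking: enqueue one task per listener and return immediately.
        for handler in self.listeners[event]:
            self.queue.append((handler, payload))

    def run(self):
        while self.queue:
            handler, payload = self.queue.popleft()
            handler(payload)

bus = MiniBus()
results = []
bus.connect("output", lambda d: results.append("A:" + d))  # fan-out target 1
bus.connect("output", lambda d: results.append("B:" + d))  # fan-out target 2
bus.emit("output", "x")   # returns immediately; nothing has run yet
bus.run()                 # drains the queue: both listeners fire, in order
```

The same queue also gives fan-in for free: any number of emitters can append tasks destined for one handler.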
🎛️ Flexible State Management
Track everything automatically:
# Access routine state
stats = routine.stats() # {"processed_count": 42, "errors": 0}
# Track execution history
history = job_state.get_execution_history()
# Performance metrics
perf = flow.execution_tracker.get_routine_performance("processor1")
🛡️ Built-in Error Handling
Choose the right strategy for your use case:
from routilux import ErrorHandler, ErrorStrategy
# Stop on error (default)
flow.set_error_handler(ErrorHandler(ErrorStrategy.STOP))
# Continue and log errors
flow.set_error_handler(ErrorHandler(ErrorStrategy.CONTINUE))
# Retry with exponential backoff
flow.set_error_handler(ErrorHandler(
ErrorStrategy.RETRY,
max_retries=3,
retry_delay=1.0,
backoff_multiplier=2.0
))
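To make the RETRY semantics concrete, here is a standalone sketch of retry with exponential backoff. This is not Routilux's implementation; `retry_call` and `flaky` are illustrative names, and only the parameter meanings (`max_retries`, `retry_delay`, `backoff_multiplier`) are taken from the example above:

```python
import time

def retry_call(fn, max_retries=3, retry_delay=1.0, backoff_multiplier=2.0):
    """Retry fn() on exception, multiplying the wait each time (1s, 2s, 4s, ...)."""
    delay = retry_delay
    for attempt in range(max_retries + 1):
        try:
            return fn()
        except Exception:
            if attempt == max_retries:
                raise                     # retries exhausted: re-raise
            time.sleep(delay)
            delay *= backoff_multiplier   # exponential backoff

attempts = []
def flaky():
    attempts.append(1)
    if len(attempts) < 3:
        raise RuntimeError("transient failure")
    return "ok"

result = retry_call(flaky, retry_delay=0.01)  # succeeds on the third attempt
```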
⚡ Unified Execution Model
Both sequential and concurrent modes use the same event queue mechanism:
# Sequential mode (default): max_workers=1
flow = Flow() # Sequential by default
# Concurrent mode: max_workers>1
flow.set_execution_strategy("concurrent", max_workers=4)
# Tasks are processed fairly in queue order
# Long chains don't block shorter ones
job_state = flow.execute(entry_routine_id)
flow.wait_for_completion() # Wait for async tasks
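The key property of a unified execution model is that the worker count is a tuning knob, not a different programming model. A rough analogy using only the standard library (not Routilux's scheduler; `process` is a stand-in for a routine handler):

```python
from concurrent.futures import ThreadPoolExecutor

def process(task):
    # Stand-in for an I/O-bound routine handler.
    return task * 2

tasks = [1, 2, 3, 4]

# max_workers=1: tasks run strictly one after another (sequential mode).
with ThreadPoolExecutor(max_workers=1) as pool:
    sequential = list(pool.map(process, tasks))

# max_workers=4: same tasks, same code path, now overlapping (concurrent mode).
with ThreadPoolExecutor(max_workers=4) as pool:
    concurrent = list(pool.map(process, tasks))

# Same results either way; only the degree of parallelism changed.
```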
💾 Persistence & Recovery
Save and resume workflows:
from routilux import JobState

# Save workflow state
job_state.save("workflow_state.json")
# Later, resume from saved state
saved_state = JobState.load("workflow_state.json")
flow.resume(saved_state)
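For intuition, the checkpoint/resume cycle can be sketched in plain Python with a JSON file. This is a conceptual sketch only, assuming nothing about `JobState`'s internals; `save_state`, `load_state`, and the state layout are invented for the example:

```python
import json
import os
import tempfile

def save_state(state, path):
    with open(path, "w") as f:
        json.dump(state, f)

def load_state(path):
    with open(path) as f:
        return json.load(f)

# Simulate: process one item, checkpoint, then "resume" from the saved file.
path = os.path.join(tempfile.mkdtemp(), "state.json")
state = {"done": [], "pending": ["a", "b", "c"]}
state["done"].append(state["pending"].pop(0))  # process item "a"
save_state(state, path)                        # checkpoint survives a crash

resumed = load_state(path)                     # a later run picks up here
```

Because the checkpoint records what is done and what is pending, a restarted process can continue from `resumed["pending"]` instead of starting over.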
