Pipelens
Track, inspect, and visualize every step in your pipeline. Built for debugging complex pipelines such as non-deterministic AI workflows.
PipeLens (Formerly Steps-track)
<img src="./docs/pipelens.png">
⭐ Building LLM applications without a framework? ⭐
⭐ Not sure what’s slow or what’s happening under the hood? ⭐
⭐ You’ve come to the right place! ⭐
What is PipeLens?
PipeLens is an observability tool for tracking, visualizing, and inspecting the intermediate steps of a pipeline-based or multi-step application. It automatically captures and stores the intermediate data, results, and execution time of each step in a pipeline, then visualizes the execution details so you can debug and analyze runs through an analytics dashboard.
It supports both Python and TypeScript / Node.js.
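To make the idea concrete, here is a minimal, self-contained Python sketch of what capturing a step's recorded data, result, and execution time can look like. It does not use PipeLens and is not its implementation; `ToyTracker`, `StepRecord`, and the two step functions are hypothetical names invented for illustration.

```python
import time
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List


@dataclass
class StepRecord:
    """What a tracker might keep for one step (hypothetical shape)."""
    name: str
    started_at: float
    ended_at: float = 0.0
    result: Any = None
    records: Dict[str, Any] = field(default_factory=dict)

    @property
    def duration_ms(self) -> float:
        return (self.ended_at - self.started_at) * 1000


class ToyTracker:
    """Toy stand-in for a step tracker; not the PipeLens API."""

    def __init__(self) -> None:
        self.steps: List[StepRecord] = []

    def step(self, name: str, fn: Callable[[StepRecord], Any]) -> Any:
        rec = StepRecord(name=name, started_at=time.perf_counter())
        try:
            rec.result = fn(rec)  # the step body may add entries to rec.records
            return rec.result
        finally:
            # Always close the record, even if the step raised.
            rec.ended_at = time.perf_counter()
            self.steps.append(rec)


def retrieve(rec: StepRecord) -> List[str]:
    rec.records["query"] = "what is pipelens?"
    return ["doc_1", "doc_2"]


def generate(rec: StepRecord) -> str:
    rec.records["prompt_tokens"] = 42
    return "some answer"


tracker = ToyTracker()
tracker.step("retrieve", retrieve)
tracker.step("generate", generate)

for s in tracker.steps:
    print(f"{s.name}: {s.duration_ms:.3f} ms, result={s.result!r}, records={s.records}")
```

Each record ends up holding the three things the description mentions: the data recorded inside the step, the returned result, and the elapsed time.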
<details> <summary>Some Background of the Project</summary>
PipeLens is a lightweight inspection and debugging tool originally built to monitor an agentic Retrieval-Augmented Generation (RAG) pipeline running in a production environment, where visibility, performance, and stability are critical.
When chaining multiple LLM agents with custom logic and dynamic inputs, the non-deterministic output of each step often leads to dynamic routing of logic and behavior. I needed an inspection tool, but the existing ones didn't provide the granularity required to trace what happened inside each step of the pipeline.
So I built PipeLens to do just that: trace, inspect, and understand every step of each request. It helped me quickly spot bottlenecks, unexpected behaviors, and performance drags, and address them effectively.
I'm open-sourcing it in the hope that it helps others building and operating complex LLM pipelines.
Contributions welcome!
</details>
⭐ Sounds interesting? Kindly give it a Star! ⭐
Features
1. Tracking Pipeline Steps
- Tracking: Define steps in your pipeline to track intermediate data, results, and execution time
- Visualizing: Export the details and generate basic visualizations, including Gantt charts and execution graphs
- Event Emitting: Listen to step events for real-time monitoring and custom handling
- ES6 Decorators: Easy integration with ES6 decorators
- LLM Tracking Extension: Simple tracker optimized for LLM usage
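As a rough illustration of the event-emitting idea in the list above, the sketch below wires listeners to step lifecycle events with a toy publish/subscribe helper. The event names (`step-start`, `step-success`, `step-error`) and the `ToyEmitter` / `run_step` helpers are assumptions made for this example and may not match the events PipeLens actually emits.

```python
from collections import defaultdict
from typing import Any, Callable, DefaultDict, Dict, List

Listener = Callable[[Dict[str, Any]], None]


class ToyEmitter:
    """Minimal publish/subscribe helper; the event names used below are made up."""

    def __init__(self) -> None:
        self._listeners: DefaultDict[str, List[Listener]] = defaultdict(list)

    def on(self, event: str, listener: Listener) -> None:
        self._listeners[event].append(listener)

    def emit(self, event: str, payload: Dict[str, Any]) -> None:
        # Iterate over a copy so listeners can safely register more listeners.
        for listener in list(self._listeners[event]):
            listener(payload)


def run_step(emitter: ToyEmitter, name: str, fn: Callable[[], Any]) -> Any:
    """Run fn and announce its lifecycle through the emitter."""
    emitter.emit("step-start", {"step": name})
    try:
        result = fn()
    except Exception as exc:
        emitter.emit("step-error", {"step": name, "error": repr(exc)})
        raise
    emitter.emit("step-success", {"step": name, "result": result})
    return result


log: List[str] = []
emitter = ToyEmitter()
emitter.on("step-start", lambda p: log.append(f"start:{p['step']}"))
emitter.on("step-success", lambda p: log.append(f"ok:{p['step']}={p['result']}"))
emitter.on("step-error", lambda p: log.append(f"error:{p['step']}"))

run_step(emitter, "parse", lambda: 1 + 1)
try:
    run_step(emitter, "fetch", lambda: 1 / 0)
except ZeroDivisionError:
    pass  # the error listener has already logged the failure

print(log)
```

Listeners of this kind are where real-time monitoring or custom handling (alerts, metrics, logging) would typically hook in.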
2. Using Dashboard
Monitor and analyze pipeline executions through an interactive web interface
- Detailed Steps Data and Results Inspection
- Real-time Execution Monitoring
- Gantt Chart Visualization for pipeline
- Step Execution Stats
Note: PipeLens is designed for any pipeline-based / multi-step logic, especially agentic LLM pipelines.
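The step statistics and Gantt view listed above can be understood as simple aggregations over recorded start and end times. The following sketch computes per-step stats across runs and renders a text Gantt chart. The timing data, `Span`, `step_stats`, and `text_gantt` are all made up for illustration and say nothing about how the dashboard itself is implemented.

```python
from statistics import mean
from typing import Dict, List, NamedTuple


class Span(NamedTuple):
    step: str
    start_ms: int
    end_ms: int


# Made-up timings for two runs of the same pipeline.
runs: List[List[Span]] = [
    [Span("retrieve", 0, 120), Span("rerank", 120, 200), Span("generate", 200, 900)],
    [Span("retrieve", 0, 80), Span("rerank", 80, 180), Span("generate", 180, 1180)],
]


def step_stats(all_runs: List[List[Span]]) -> Dict[str, Dict[str, float]]:
    """Aggregate count / mean / max duration per step name."""
    durations: Dict[str, List[int]] = {}
    for run in all_runs:
        for span in run:
            durations.setdefault(span.step, []).append(span.end_ms - span.start_ms)
    return {
        name: {"count": len(d), "avg_ms": mean(d), "max_ms": max(d)}
        for name, d in durations.items()
    }


def text_gantt(run: List[Span], width: int = 30) -> List[str]:
    """Render one run as fixed-width rows, one bar per step."""
    total = max(span.end_ms for span in run)
    rows = []
    for span in run:
        left = span.start_ms * width // total
        right = max(left + 1, span.end_ms * width // total)  # at least one cell wide
        bar = " " * left + "#" * (right - left) + " " * (width - right)
        rows.append(f"{span.step:<10}|{bar}|")
    return rows


stats = step_stats(runs)
print(stats["generate"])
print("\n".join(text_gantt(runs[0])))
```

In this toy data the `generate` step dominates both the average duration and the width of its Gantt bar, which is the kind of bottleneck such views are meant to surface.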
Getting Started
This repository is a monorepo containing the following packages:
- TypeScript / Python libraries that provide the basic tracker and chart generation functions for your pipeline
- Dashboard that visualizes tracked data and lets you monitor it for analysis.
Setup from your codebase
Installation
# TypeScript
npm install --save pipelens
# Python
pip install pipelens
Define steps in your code where you intend to track
<details> <summary>TypeScript</summary>
// ================================================
// Using TypeScript Decorators
// Note: TypeScript allows decorators only on classes and class members,
// so in a real project these functions would be written as class methods.
// ================================================
import { Pipeline, Step, WithStep } from 'pipelens';

@WithStep('some_step')
async function someStep(inputStr: string, st: Step) {
  // ... some logic ...
  st.record('key', 'value'); // Record data for analysis
  return 'some_result'; // Results are automatically recorded
}

@WithStep('child')
async function childFunc(param: number, st: Step) {
  // ... some logic ...
}

@WithStep('parent')
async function parentFunc(st: Step) {
  // Track nested steps
  await childFunc(1, st);
  await childFunc(2, st);
  // Track parallel steps
  await Promise.all([
    childFunc(3, st),
    childFunc(4, st),
  ]);
}

async function runPipeline(st: Step) {
  await someStep('some_value', st);
  await parentFunc(st);
}

const pipeline = new Pipeline('my_pipeline');
await pipeline.track(runPipeline);
// ================================================
// Or you may optionally run without decorators
// ================================================
import { Pipeline, Step } from 'pipelens';

// Create pipeline
const pipeline = new Pipeline('my-pipeline');

// Define your pipeline logic without decorators
async function pipelineLogic(st: Step) {
  // Helper function for your business logic
  async function someTask(someArgs: string, step: Step) {
    // ... your logic ...
    step.record('key', 'value'); // Record data for analysis
    return 'some_result'; // Results are automatically recorded
  }

  // Track a simple step
  async function someStep(step: Step) {
    return await someTask('your_args', step);
  }
  const result = await st.step('some_step', someStep);

  // Track nested steps
  async function parentStep(step: Step) {
    async function child1(step: Step) {
      return await someTask('args', step);
    }
    async function child2(step: Step) {
      return await someTask('args', step);
    }
    await step.step('child_1', child1);
    await step.step('child_2', child2);
  }
  await st.step('parent', parentStep);

  // Track parallel steps
  async function parallel1(step: Step) {
    return await someTask('args', step);
  }
  async function parallel2(step: Step) {
    return await someTask('args', step);
  }
  await Promise.all([
    st.step('parallel_1', parallel1),
    st.step('parallel_2', parallel2),
  ]);
}

// Run the pipeline
await pipeline.track(pipelineLogic);
// ================================================
// Some simple visualization functions to use if you
// don't want to integrate with dashboard
// ================================================
// Generate a Gantt chart Buffer using quickchart.io
const ganttChartBuffer = await pipeline.ganttQuickchart();
// Generate a Gantt chart HTML file with Google Charts
const ganttChartHtml = await pipeline.ganttGoogleChartHtml();
// Generate an execution graph URL
const executionGraphUrl = pipeline.executionGraphQuickchart();
// Get the hierarchical output of all steps
const stepsHierarchy = pipeline.outputNested();
</details>
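For readers who want to see the nested and parallel patterns run without installing anything, here is a toy Python sketch. `ToyStep` is a hypothetical stand-in that mirrors only the `step(name, fn)` call shape used in the examples; it is not the PipeLens `Step` class, and its `nested()` output format is invented for this example.

```python
import asyncio
import time
from typing import Any, Awaitable, Callable, Dict, List


class ToyStep:
    """Toy step node that records children and timing; not the PipeLens Step class."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.children: List["ToyStep"] = []
        self.duration_s = 0.0

    async def step(self, name: str, fn: Callable[["ToyStep"], Awaitable[Any]]) -> Any:
        child = ToyStep(name)
        self.children.append(child)  # attach before awaiting so order reflects start order
        started = time.perf_counter()
        try:
            return await fn(child)
        finally:
            child.duration_s = time.perf_counter() - started

    def nested(self) -> Dict[str, Any]:
        return {"name": self.name, "children": [c.nested() for c in self.children]}


async def work(step: ToyStep) -> str:
    await asyncio.sleep(0.05)  # pretend to do something slow
    return step.name


async def parent(step: ToyStep) -> None:
    # Sequential children: total time is roughly the sum of both.
    await step.step("child_1", work)
    await step.step("child_2", work)


async def main() -> ToyStep:
    root = ToyStep("my_pipeline")
    await root.step("parent", parent)
    # Parallel siblings: total time is roughly that of the slowest one.
    await asyncio.gather(root.step("parallel_1", work), root.step("parallel_2", work))
    return root


root = asyncio.run(main())
print(root.nested())
```

Passing the child step object into the callback is what lets nested calls attach to the right parent, which is the same reason the examples thread `st` / `step` through every function.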
<details>
<summary>Python</summary>
# ================================================
# Using Python Decorators
# ================================================
import asyncio

from pipelens import Pipeline, Step, with_step


@with_step('some_step')
async def some_step(input_str: str, st: Step):
    # ... some logic ...
    await st.record('key', 'value')  # Record data for analysis
    return 'some_result'  # Results are automatically recorded


@with_step('child')
async def child_func(param: int, st: Step):
    # ... some logic ...
    pass


@with_step('parent')
async def parent_func(st: Step):
    # Track nested steps
    await child_func(1, st)
    await child_func(2, st)
    # Track parallel steps
    await asyncio.gather(
        child_func(3, st),
        child_func(4, st),
    )


async def run_pipeline(st: Step):
    await some_step('some_value', st)
    await parent_func(st)


pipeline = Pipeline('my_pipeline')
# Top-level await: run this inside an async function or an asyncio-aware REPL
await pipeline.track(run_pipeline)
# ================================================
# Or you may optionally run without decorators
# ================================================
import asyncio
from pipelens import Pipeline, Step

# Create pipeline
pipeline = Pipeline('my-pipeline')


async def pipeline_logic(st: Step):
    # Helper function for your business logic
    async def some_task(some_args: str, step: Step):
        # ... your logic ...
        await step.record('key', 'value')  # Record data for analysis
        return 'some_result'  # Results are automatically recorded

    # Track a simple step
    async def some_step(step: Step):
        return await some_task('your_args', step)

    result = await st.step('some_step', some_step)

    # Track nested steps
    async def parent_step(step: Step):
        async def child_1(step: Step):
            return await some_task('args', step)

        async def child_2(step: Step):
            return await some_task('args', step)

        await step.step('child_1', child_1)
        await step.step('child_2', child_2)

    await st.step('parent', parent_step)

    # Track parallel steps
    async def parallel_1(step: Step):
        return await some_task('args', step)

    async def parallel_2(step: Step):
        return await some_task('args', step)

    await asyncio.gather(
        st.step('parallel_1', parallel_1),
        st.step('parallel_2', parallel_2)
    )


# Run the pipeline
await pipeline.track(pipeline_logic)
# ================================================
# Some simple visualization functions to use if you
# don't want to integrate with dashboard
# ================================================
# Generate a Gantt chart Buffer using quickchart.io
gantt_chart_buffer = await pipeline.gantt_quickchart()
# Generate a Gantt chart HTML file with Google Charts
gantt_chart_html = await pipeline.gantt_google_chart_html()
# Generate an execution graph URL
execution_graph_url = pipeline.execution_gr
