Pipelens

Track, inspect, and visualize every step in your pipeline. Built for debugging complex pipelines such as non-deterministic AI workflows.

PipeLens (formerly Steps-track)

<img src="./docs/pipelens.png">

Building LLM applications without a framework?

Not sure what’s slow or what’s happening under the hood?

You’ve come to the right place!

What is PipeLens?

PipeLens is an observability tool for tracking, visualizing, and inspecting the intermediate steps of a pipeline-based or multi-step application. It automatically captures and stores the intermediate data, results, and execution time of each step in a pipeline, then visualizes the execution details in an analytics dashboard to make debugging and analysis easier.

It supports both Python and TypeScript / Node.js.
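To make "capturing intermediate data, results and execution times" concrete, here is a minimal, self-contained sketch of the idea. It is not the PipeLens implementation: `StepInfo`, `MiniStep` and `demo` are hypothetical names used only for illustration, and the real library does considerably more.

```python
import asyncio
import time
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable, Dict, List


@dataclass
class StepInfo:
    """What gets captured for one step: recorded data, result, and timing."""
    name: str
    records: Dict[str, Any] = field(default_factory=dict)
    result: Any = None
    started_at: float = 0.0
    ended_at: float = 0.0
    children: List["StepInfo"] = field(default_factory=list)

    @property
    def duration(self) -> float:
        return self.ended_at - self.started_at


class MiniStep:
    """Hypothetical stand-in for a tracked step; not the PipeLens `Step` class."""

    def __init__(self, name: str) -> None:
        self.info = StepInfo(name)

    def record(self, key: str, value: Any) -> None:
        # Store intermediate data under a key for later inspection.
        self.info.records[key] = value

    async def step(self, name: str, fn: Callable[["MiniStep"], Awaitable[Any]]) -> Any:
        # Run `fn` as a child step, capturing its result and wall-clock time.
        child = MiniStep(name)
        self.info.children.append(child.info)
        child.info.started_at = time.perf_counter()
        try:
            child.info.result = await fn(child)
            return child.info.result
        finally:
            child.info.ended_at = time.perf_counter()


async def demo() -> StepInfo:
    root = MiniStep("my_pipeline")

    async def retrieve(st: MiniStep) -> List[str]:
        st.record("query", "what is pipelens?")
        await asyncio.sleep(0.01)  # Simulate I/O
        return ["doc_a", "doc_b"]

    async def generate(st: MiniStep) -> str:
        st.record("num_docs", len(docs))
        return "answer based on " + ", ".join(docs)

    docs = await root.step("retrieve", retrieve)
    await root.step("generate", generate)
    return root.info
```

Calling `asyncio.run(demo())` returns a small tree of step records that a dashboard or chart generator could then consume.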

<details> <summary>Some Background of the Project</summary>

PipeLens is a lightweight inspection and debugging tool originally built to monitor an agentic Retrieval-Augmented Generation (RAG) pipeline running in a production environment—where visibility, performance, and stability are critical.

When chaining multiple LLM agents with custom logic and dynamic inputs, the non-deterministic output of each step often leads to dynamic logic routes and behaviors. I needed an inspection tool, but the existing tools didn't provide the granularity I needed to trace what happened inside each step of the pipeline.

So I built PipeLens to do just that: trace, inspect, and understand every step of each request. It helped me quickly spot bottlenecks, unexpected behaviors and performance drags, and address them effectively.

I'm open-sourcing it in the hope that it helps others building and operating complex LLM pipelines.

Contributions welcome!

</details>

Sounds interesting? Kindly give it a Star!

Features

1. Tracking Pipeline Steps

  • Tracking: Define steps in a pipeline to track intermediate data, results, and execution time
  • Visualizing: Export the details and generate basic visualizations, including Gantt charts and execution graphs
  • Event Emitting: Listen to step events for real-time monitoring and custom handling
  • Decorators: Easy integration through TypeScript and Python decorators
  • LLM Tracking Extension: Simple tracker optimized for LLM usage
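As a rough illustration of the event-emitting idea, the sketch below wraps a step in start / success / error events. Everything here is an assumption made for the example: `StepEvents`, `run_step` and the event names `step-start`, `step-success` and `step-error` are hypothetical and may not match what PipeLens actually emits.

```python
import time
from typing import Any, Callable, Dict, List

Listener = Callable[[Dict[str, Any]], None]


class StepEvents:
    """Tiny event emitter; the event names used below are illustrative only."""

    def __init__(self) -> None:
        self._listeners: Dict[str, List[Listener]] = {}

    def on(self, event: str, listener: Listener) -> None:
        self._listeners.setdefault(event, []).append(listener)

    def emit(self, event: str, payload: Dict[str, Any]) -> None:
        # Copy the list so a listener may register others while we iterate.
        for listener in list(self._listeners.get(event, [])):
            listener(payload)


def run_step(events: StepEvents, name: str, fn: Callable[[], Any]) -> Any:
    """Run `fn` and emit start / success / error events around it."""
    events.emit("step-start", {"name": name})
    started = time.perf_counter()
    try:
        result = fn()
    except Exception as exc:
        events.emit("step-error", {"name": name, "error": repr(exc)})
        raise
    elapsed = time.perf_counter() - started
    events.emit("step-success", {"name": name, "result": result, "seconds": elapsed})
    return result
```

A listener registered with `events.on("step-success", ...)` could forward each payload to a log, a metrics system, or a live dashboard.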

2. Using Dashboard

Monitor and analyze pipeline executions through an interactive web interface

  • Detailed Steps Data and Results Inspection
  • Real-time Execution Monitoring
  • Gantt Chart Visualization for pipelines
  • Step Execution Stats
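For a sense of what step execution stats might involve, here is a minimal sketch that aggregates durations per step name across runs. The `StepStats` shape and the `summarize` helper are hypothetical; the dashboard's actual metrics are not specified here.

```python
from statistics import mean
from typing import Dict, Iterable, List, NamedTuple, Tuple


class StepStats(NamedTuple):
    count: int
    avg_ms: float
    max_ms: float


def summarize(runs: Iterable[Tuple[str, float]]) -> Dict[str, StepStats]:
    """Group (step_name, duration_ms) pairs and compute simple per-step stats."""
    by_step: Dict[str, List[float]] = {}
    for name, duration_ms in runs:
        by_step.setdefault(name, []).append(duration_ms)
    return {
        name: StepStats(len(durations), mean(durations), max(durations))
        for name, durations in by_step.items()
    }
```

Feeding it the durations collected over many pipeline runs makes it easy to spot which step is consistently slow and which only spikes occasionally.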

Note: PipeLens is designed for any pipeline-based or multi-step logic, especially agentic LLM pipelines.

Getting Started

This repository is a monorepo containing the following packages:

  • TypeScript / Python libraries that provide a basic tracker and chart generation functions for your pipeline
  • Dashboard that visualizes tracked data and lets you monitor it for analysis
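To show what chart generation from tracked timings boils down to, the sketch below renders a plain-text Gantt chart from `(name, start_ms, end_ms)` spans. It is only an illustration: `text_gantt` is a hypothetical helper, and the library's own chart functions produce real charts rather than text.

```python
from typing import List, Tuple


def text_gantt(spans: List[Tuple[str, float, float]], width: int = 40) -> str:
    """Render (name, start_ms, end_ms) spans as one text bar per step."""
    if not spans:
        return ""
    t0 = min(start for _, start, _ in spans)
    t1 = max(end for _, _, end in spans)
    total = max(t1 - t0, 1e-9)  # Avoid division by zero for instant pipelines
    label_w = max(len(name) for name, _, _ in spans)

    lines = []
    for name, start, end in spans:
        # Scale timestamps to column positions; every bar is at least 1 wide.
        left = min(int((start - t0) / total * width), width - 1)
        right = min(max(int((end - t0) / total * width), left + 1), width)
        bar = " " * left + "#" * (right - left) + " " * (width - right)
        lines.append(f"{name.ljust(label_w)} |{bar}|")
    return "\n".join(lines)
```

Sequential steps show up as bars that follow one another, while parallel steps overlap on the time axis.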

Setup from your codebase

Installation

# TypeScript
npm install --save pipelens

# Python
pip install pipelens

Define the steps in your code that you intend to track

<details> <summary>TypeScript</summary>
// ================================================
// Using TypeScript Decorators
// ================================================
import { Pipeline, Step, WithStep } from 'pipelens';

// TypeScript decorators apply to classes and class members, not to standalone
// functions, so the tracked steps are declared as methods of a class here.
// `MyPipeline` is an illustrative name, not part of the library.
class MyPipeline {
  @WithStep('some_step')
  async someStep(inputStr: string, st: Step) {
    // ... some logic ...
    st.record('key', 'value'); // Record data for analysis
    return 'some_result'; // Results are automatically recorded
  }

  @WithStep('child')
  async childFunc(param: number, st: Step) {
    // ... some logic ...
  }

  @WithStep('parent')
  async parentFunc(st: Step) {
    // Track nested steps
    await this.childFunc(1, st);
    await this.childFunc(2, st);

    // Track parallel steps
    await Promise.all([
      this.childFunc(3, st),
      this.childFunc(4, st),
    ]);
  }

  async run(st: Step) {
    await this.someStep('some_value', st);
    await this.parentFunc(st);
  }
}

const steps = new MyPipeline();
const pipeline = new Pipeline('my_pipeline');
await pipeline.track((st: Step) => steps.run(st));

// ================================================
// Or you may optionally run without decorators
// ================================================
import { Pipeline, Step } from 'pipelens';

// Create pipeline
const pipeline = new Pipeline('my-pipeline');

// Define your pipeline logic without decorators
async function pipelineLogic(st: Step) {
  // Helper function for your business logic
  async function someTask(someArgs: string, step: Step) {
    // ... your logic ...
    step.record('key', 'value'); // Record data for analysis
    return 'some_result'; // Results are automatically recorded
  }

  // Track a simple step
  async function someStep(step: Step) {
    return await someTask('your_args', step);
  }
  
  const result = await st.step('some_step', someStep);
  
  // Track nested steps
  async function parentStep(step: Step) {
    async function child1(step: Step) {
      return await someTask('args', step);
    }
    
    async function child2(step: Step) {
      return await someTask('args', step);
    }
    
    await step.step('child_1', child1);
    await step.step('child_2', child2);
  }
  
  await st.step('parent', parentStep);
  
  // Track parallel steps
  async function parallel1(step: Step) {
    return await someTask('args', step);
  }
  
  async function parallel2(step: Step) {
    return await someTask('args', step);
  }
  
  await Promise.all([
    st.step('parallel_1', parallel1),
    st.step('parallel_2', parallel2),
  ]);
}

// Run the pipeline
await pipeline.track(pipelineLogic);


// ================================================
// Some simple visualization functions to use if you 
// don't want to integrate with dashboard
// ================================================

// Generate a Gantt chart Buffer using quickchart.io
const ganttChartBuffer = await pipeline.ganttQuickchart();

// Generate a Gantt chart HTML file with Google Charts
const ganttChartHtml = await pipeline.ganttGoogleChartHtml();

// Generate an execution graph URL
const executionGraphUrl = pipeline.executionGraphQuickchart();

// Get the hierarchical output of all steps
const stepsHierarchy = pipeline.outputNested();
</details> <details> <summary>Python</summary>
# ================================================
# Using Python Decorators
# ================================================
from pipelens import Pipeline, Step, with_step

@with_step('some_step')
async def some_step(input_str: str, st: Step):
    # ... some logic ...
    await st.record('key', 'value')  # Record data for analysis
    return 'some_result'  # Results are automatically recorded

@with_step('child')
async def child_func(param: int, st: Step):
    # ... some logic ...
    pass  # Placeholder so the function body is valid Python

@with_step('parent')
async def parent_func(st: Step):
    # Track nested steps
    await child_func(1, st)
    await child_func(2, st)
    
    # Track parallel steps
    import asyncio
    await asyncio.gather(
        child_func(3, st),
        child_func(4, st),
    )

async def run_pipeline(st: Step):
    await some_step('some_value', st)
    await parent_func(st)

pipeline = Pipeline('my_pipeline')
# `await` needs an async context: run this inside an `async def` (or a notebook cell)
await pipeline.track(run_pipeline)

# ================================================
# Or you may optionally run without decorators
# ================================================
import asyncio
from pipelens import Pipeline, Step

# Create pipeline
pipeline = Pipeline('my-pipeline')

async def pipeline_logic(st: Step):
    # Helper function for your business logic
    async def some_task(some_args: str, step: Step):
        # ... your logic ...
        await step.record('key', 'value')  # Record data for analysis
        return 'some_result'  # Results are automatically recorded

    # Track a simple step
    async def some_step(step: Step):
        return await some_task('your_args', step)
    
    result = await st.step('some_step', some_step)
    
    # Track nested steps
    async def parent_step(step: Step):
        async def child_1(step: Step):
            return await some_task('args', step)
        
        async def child_2(step: Step):
            return await some_task('args', step)
        
        await step.step('child_1', child_1)
        await step.step('child_2', child_2)
    
    await st.step('parent', parent_step)
    
    # Track parallel steps
    async def parallel_1(step: Step):
        return await some_task('args', step)
    
    async def parallel_2(step: Step):
        return await some_task('args', step)
    
    await asyncio.gather(
        st.step('parallel_1', parallel_1),
        st.step('parallel_2', parallel_2)
    )

# Run the pipeline (call this from inside an `async def` or a notebook cell)
await pipeline.track(pipeline_logic)

# ================================================
# Some simple visualization functions to use if you 
# don't want to integrate with dashboard
# ================================================

# Generate a Gantt chart Buffer using quickchart.io
gantt_chart_buffer = await pipeline.gantt_quickchart()

# Generate a Gantt chart HTML file with Google Charts
gantt_chart_html = await pipeline.gantt_google_chart_html()

# Generate an execution graph URL
execution_graph_url = pipeline.execution_gr