Dagengine

Type-safe DAG execution engine for AI workflows

@dagengine/core

<div align="center">

🚀 Type-Safe DAG Engine for AI Workflows

Define task dependencies. Get automatic parallelization, cost tracking, and 10x speedup.

🚀 Quick Start • 📖 Documentation • 💬 Discussions • 🐛 Issues • 📦 Examples

</div>

🎯 What is dagengine?

dagengine is a TypeScript DAG engine that turns sequential AI workflows into parallel ones automatically.

The Problem

// ❌ What most developers do (sequential, slow, expensive)
for (const item of items) {
  const sentiment = await ai.analyze(item);  // Wait...
  const topics = await ai.extract(item);     // Wait...
  const summary = await ai.summarize(item);  // Wait...
}
// Result: 100 items × 15 seconds = 25 minutes, $15

The Solution

// ✅ With dagengine (parallel, fast, cheap)
const engine = new DagEngine({ plugin: new MyPlugin() });
const result = await engine.process(items);
// Result: 100 items in 2.5 minutes, $5

10x faster. 67% cheaper. Zero orchestration code.

Define dependencies → get automatic parallelization.
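The timing figures above are consistent with a simple batching model. The sketch below is a rough estimate only, assuming about ten items in flight at once; the README does not state a concurrency level, so `concurrency` is an assumption chosen to reproduce the 2.5-minute figure. It models wall-clock time only, not cost.

```typescript
// Back-of-the-envelope model: items are processed in batches of `concurrency`,
// and each batch takes as long as one item's full analysis.
// `concurrency` is an assumed parameter for illustration, not a documented dagengine option.
function estimateWallClockSeconds(
  itemCount: number,
  secondsPerItem: number,
  concurrency: number
): number {
  if (concurrency < 1) throw new RangeError("concurrency must be at least 1");
  const batches = Math.ceil(itemCount / concurrency);
  return batches * secondsPerItem;
}

const sequentialSeconds = estimateWallClockSeconds(100, 15, 1); // 1500 s = 25 min
const parallelSeconds = estimateWallClockSeconds(100, 15, 10);  // 150 s = 2.5 min
const speedup = sequentialSeconds / parallelSeconds;            // 10
```

Real speedups depend on provider rate limits and on how many dimensions can actually run side by side.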


🚀 5-Minute Quick Start

Install

npm install @dagengine/core

Requirements: Node.js ≥ 18.0.0, TypeScript ≥ 5.0 (recommended)

Example: Analyze Customer Reviews

import { DagEngine, Plugin, type PromptContext, type ProviderSelection } from '@dagengine/core';

// Define result types (optional but helps with TypeScript)
interface SentimentResult {
	sentiment: "positive" | "negative" | "neutral";
	score: number;
}

interface TopicsResult {
	topics: string[];
}

// 1. Define your workflow
class ReviewAnalyzer extends Plugin {
	constructor() {
		super('analyzer', 'Review Analyzer', 'Analyzes reviews');
		this.dimensions = ['sentiment', 'topics', 'summary'];
	}

	defineDependencies(): Record<string, string[]> {
		return {
			summary: ['sentiment', 'topics']
		};
	}

	createPrompt(context: PromptContext): string {
		const content = context.sections[0]?.content || '';

		if (context.dimension === 'sentiment') {
			return `Analyze sentiment: "${content}"
      Return JSON: {"sentiment": "positive|negative|neutral", "score": 0-1}`;
		}

		if (context.dimension === 'topics') {
			return `Extract topics: "${content}"
      Return JSON: {"topics": ["topic1", "topic2"]}`;
		}

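		if (context.dimension === 'summary') {
			const sentiment = context.dependencies.sentiment?.data as SentimentResult | undefined;
			const topics = context.dependencies.topics?.data as TopicsResult | undefined;

			// Fall back to neutral wording if a dependency returned no data
			const tone = sentiment?.sentiment ?? 'neutral';
			const topicList = topics?.topics?.join(', ') || 'the main points';

			return `Create a ${tone} summary covering ${topicList}:
      "${content}"
      Return JSON: {"summary": "summary text"}`;
		}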

		throw new Error(`Unknown dimension: ${context.dimension}`);
	}

	selectProvider(): ProviderSelection {
		return {
			provider: 'anthropic',
			options: { model: 'claude-3-5-haiku-20241022' }
		};
	}
}

// 2. Process your data
async function main(): Promise<void> {
	// Validate API key
	if (!process.env.ANTHROPIC_API_KEY) {
		console.error('❌ Missing ANTHROPIC_API_KEY environment variable');
		console.error('Set it with: export ANTHROPIC_API_KEY="your-key"');
		process.exit(1);
	}

	// Create engine
	const engine = new DagEngine({
		plugin: new ReviewAnalyzer(),
		providers: {
			anthropic: { apiKey: process.env.ANTHROPIC_API_KEY },
		}
	});

	// Prepare reviews
	const reviews = [
		{ content: 'Great product!', metadata: { id: 1 } },
		{ content: 'Not good.', metadata: { id: 2 } }
	];

	// Process
	const result = await engine.process(reviews);

	// Display results
	console.log(JSON.stringify(result.sections[0]?.results, null, 4));
}

// 3. Run with error handling
main().catch((error: Error) => {
	console.error('❌ Processing failed:', error.message);
	process.exit(1);
});

What just happened?

  • ✅ sentiment and topics ran in parallel (neither has dependencies)
  • ✅ summary waited for both to complete
  • ✅ All sections were processed in parallel
  • ✅ 2 reviews × 3 dimensions = 6 AI calls, scheduled automatically by the engine
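The prompts in the example ask the model for JSON, and the summary step casts the result with `as SentimentResult`, which TypeScript does not check at runtime. A minimal sketch of a runtime check follows. `isSentimentResult` and `parseSentiment` are hypothetical helpers, not part of `@dagengine/core`, and whether the engine validates responses itself is not stated here.

```typescript
interface SentimentResult {
  sentiment: "positive" | "negative" | "neutral";
  score: number;
}

// Hypothetical helper: checks that a parsed model response
// really has the shape the prompt asked for.
function isSentimentResult(value: unknown): value is SentimentResult {
  if (typeof value !== "object" || value === null) return false;
  const record = value as Record<string, unknown>;
  const sentiment = record["sentiment"];
  const score = record["score"];
  const labels = ["positive", "negative", "neutral"];
  return (
    typeof sentiment === "string" &&
    labels.includes(sentiment) &&
    typeof score === "number" &&
    score >= 0 &&
    score <= 1
  );
}

// Returns undefined instead of throwing when the text is not usable.
function parseSentiment(raw: string): SentimentResult | undefined {
  try {
    const parsed: unknown = JSON.parse(raw);
    return isSentimentResult(parsed) ? parsed : undefined;
  } catch {
    return undefined; // not valid JSON at all
  }
}

const validResult = parseSentiment('{"sentiment": "positive", "score": 0.92}');
const invalidResult = parseSentiment('{"sentiment": "great", "score": 2}'); // undefined
```

A check like this could run wherever dependency data is read, so a malformed response becomes a handled case instead of a runtime `TypeError`.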

Next: Full Documentation • Examples • Production Guide


📊 Why Choose dagengine?

| Feature | DIY Code | LangChain | dagengine |
|---------|----------|-----------|-----------|
| Setup | Manual loops | Learn LCEL | 2 methods |
| Parallelization | Manual | Manual | Automatic |
| Cost Tracking | Manual calc | Manual calc | Built-in |
| TypeScript | ✅ Full | ⚠️ Partial | ✅ Full |
| Code (100 items) | 150 lines | 80 lines | 25 lines |
| Best For | Small scripts | RAG/Agents | Orchestration |

Use dagengine when:

  • ✅ Processing 100+ items with multiple AI analyses
  • ✅ Want automatic parallelization without complexity
  • ✅ Need built-in cost tracking
  • ✅ TypeScript projects

Skip dagengine when:

  • ❌ Single AI calls (overkill)
  • ❌ Need RAG/agents (use LangChain)
  • ❌ Python projects (we're TypeScript-only)

⚡ Key Features

<table> <tr> <td width="50%" valign="top">

🎯 Zero Infrastructure

Define task dependencies once. The engine handles execution order, parallelization, and coordination automatically. No queues, workers, or complex orchestration code.

💰 Cost Optimized

Skip low-value processing with conditional execution. Route tasks to optimal models. Track costs per dimension in real-time with automatic token counting.

🔄 Production Ready

Automatic retry with exponential backoff. Provider fallback chains. Graceful error recovery with partial results. Battle-tested reliability.

</td> <td width="50%" valign="top">

🌍 Multi-Provider Support

Use Anthropic Claude, OpenAI GPT, Google Gemini with a unified interface. Switch providers per dimension. Mix models in one workflow.

🪝 18 Lifecycle Hooks

Full async/await support. Integrate databases, caches, APIs at every processing stage. Transform data mid-pipeline. Complete control when you need it.

📊 Real-Time Tracking

Built-in cost and token tracking per dimension and provider. Progress callbacks with throughput metrics. Detailed breakdowns in results.

</td> </tr> </table>
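The "automatic retry with exponential backoff" listed under Production Ready follows a general pattern that can be sketched as a delay schedule. This illustrates the technique only; the parameter names and defaults are assumptions and are not taken from dagengine's configuration.

```typescript
// Delay before retry attempt n (0-based) is baseMs * factor^n, capped at maxMs.
// All parameter names and default values here are illustrative assumptions.
function backoffDelays(
  retries: number,
  baseMs = 500,
  factor = 2,
  maxMs = 8000
): number[] {
  const delays: number[] = [];
  for (let attempt = 0; attempt < retries; attempt++) {
    delays.push(Math.min(baseMs * Math.pow(factor, attempt), maxMs));
  }
  return delays;
}

const schedule = backoffDelays(6); // [500, 1000, 2000, 4000, 8000, 8000]
```

The cap keeps long retry chains from waiting indefinitely. Production implementations often add random jitter as well, which is omitted here for brevity.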

💡 Core Concepts

1️⃣ Sections (Your Data)

const sections = [
  { 
    content: 'Customer review text here',
    metadata: { id: 1, userId: 123, productId: 'SKU-789' }
  }
];

Sections are the pieces of data you analyze (reviews, emails, documents, etc.).
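In practice, sections are usually built from existing records. A small sketch of that mapping follows. The `Section` shape is inferred from the example above and may differ from the library's own type, and `ReviewRow` is a hypothetical input format.

```typescript
// Assumed shape, inferred from the example above; the library's own type may differ.
interface Section {
  content: string;
  metadata: Record<string, unknown>;
}

// Hypothetical raw record, e.g. a row loaded from a database or CSV file.
interface ReviewRow {
  id: number;
  userId: number;
  productId: string;
  text: string;
}

function toSections(rows: ReviewRow[]): Section[] {
  return rows
    .filter((row) => row.text.trim().length > 0) // drop empty reviews before paying for AI calls
    .map((row) => ({
      content: row.text.trim(),
      metadata: { id: row.id, userId: row.userId, productId: row.productId },
    }));
}

const sectionsFromRows = toSections([
  { id: 1, userId: 123, productId: "SKU-789", text: "  Customer review text here " },
  { id: 2, userId: 456, productId: "SKU-789", text: "   " },
]);
// sectionsFromRows has one entry; the blank review was dropped.
```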

2️⃣ Dimensions (Your Tasks)

this.dimensions = ['sentiment', 'topics', 'summary'];

Dimensions are the analyses you run. Each dimension processes all sections.

3️⃣ Dependencies (Execution Order)

defineDependencies() {
  return {
    sentiment: [],           // No dependencies (runs first)
    topics: [],              // No dependencies (runs first)
    summary: ['sentiment', 'topics']  // Waits for both
  };
}

Dependencies control execution order. The engine automatically runs independent dimensions in parallel.

Execution Plan:
sentiment ──┐
            ├─→ Both run in parallel → summary
topics ─────┘
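One way to derive such a plan from a dependency map is a layered topological sort. The sketch below is a generic version of that technique and makes no claim about how dagengine schedules work internally; `executionWaves` is a hypothetical name.

```typescript
// Groups dimensions into "waves": every dimension in a wave has all of its
// dependencies satisfied by earlier waves, so a wave can run in parallel.
// Generic layered topological sort, not dagengine's actual scheduler.
function executionWaves(
  dimensions: string[],
  dependencies: Record<string, string[]>
): string[][] {
  const done = new Set<string>();
  let remaining = [...dimensions];
  const waves: string[][] = [];

  while (remaining.length > 0) {
    const ready = remaining.filter((dim) =>
      (dependencies[dim] ?? []).every((dep) => done.has(dep))
    );
    if (ready.length === 0) {
      // Nothing can make progress: a cycle or a dependency that is not a dimension.
      throw new Error(`Cycle or unknown dependency among: ${remaining.join(", ")}`);
    }
    ready.forEach((dim) => done.add(dim));
    remaining = remaining.filter((dim) => !done.has(dim));
    waves.push(ready);
  }
  return waves;
}

const waves = executionWaves(["sentiment", "topics", "summary"], {
  summary: ["sentiment", "topics"],
});
// waves: [["sentiment", "topics"], ["summary"]]
```

Dimensions with no entry in the map are treated as having no dependencies, which matches the quick-start example where only `summary` is listed.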

4️⃣ Two Dimension Types

Section Dimensions (default) - Analyze each item independently:

this.dimensions = ['sentiment'];  // Runs once per section

Global Dimensions - Analyze all items together:

this.dimensions = [
  { name: 'categorize', scope: 'global' }  // Runs once for all sections
];
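The difference between the two scopes shows up in the number of model calls. A rough count, ignoring skips and retries, can be sketched as follows. The `DimensionSpec` type is an assumption modeled on the snippets above, and the `'section'` scope value is hypothetical.

```typescript
// Assumed type, modeled on the snippets above; 'section' as an explicit scope is hypothetical.
type DimensionSpec = string | { name: string; scope: "section" | "global" };

// Counts the model calls implied by the two scopes, ignoring skips and retries:
// a section dimension runs once per section, a global dimension runs once in total.
function countCalls(dimensions: DimensionSpec[], sectionCount: number): number {
  return dimensions.reduce((total, dim) => {
    const isGlobal = typeof dim !== "string" && dim.scope === "global";
    return total + (isGlobal ? 1 : sectionCount);
  }, 0);
}

const quickStartCalls = countCalls(["sentiment", "topics", "summary"], 2); // 6
const mixedCalls = countCalls(
  ["sentiment", { name: "categorize", scope: "global" }],
  100
); // 100 + 1 = 101
```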

🎨 Advanced Features

Cost Optimization with Skip Logic

class SmartAnalyzer extends Plugin {
  dimensions = ['quality_check', 'deep_analysis'];
  
  defineDependencies() {
    return { deep_analysis: ['quality_check'] };
  }

  shouldSkipSectionDimension(context) {
    if (context.dimension === 'deep_analysis') {
      // Treat a missing quality_check result as low quality instead of throwing
      const quality = context.dependencies.quality_check?.data;
      return (quality?.score ?? 0) < 0.7;  // Skip low-quality items
    }
    return false;
  }

  selectProvider(dimension) {
    if (dimension === 'quality_check') {
      return {
        provider: 'anthropic',
        options: { model: 'claude-3-5-haiku-20241022' }  // Cheap model
      };
    }
    
    return {
      provider: 'anthropic',
      options: { model: 'claude-3-7-sonnet-20250219' }  // Expensive model
    };
  }
}

Result: if 40 of 100 items pass the quality check, deep_analysis runs 40 times instead of 100 (60% fewer expensive API calls).
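The saving depends entirely on how many items clear the threshold. A small sketch of that arithmetic follows, using made-up scores. `expensiveCallsAvoided` is a hypothetical helper, and the 0.7 threshold mirrors the skip rule above.

```typescript
// Mirrors the skip rule above: deep analysis is skipped when score < threshold.
function expensiveCallsAvoided(scores: number[], threshold = 0.7) {
  const kept = scores.filter((score) => score >= threshold).length;
  const skipped = scores.length - kept;
  return {
    kept,
    skipped,
    reductionPercent: scores.length === 0 ? 0 : (skipped * 100) / scores.length,
  };
}

// Made-up scores for illustration: 2 of 5 items clear the 0.7 bar.
const stats = expensiveCallsAvoided([0.9, 0.4, 0.65, 0.8, 0.1]);
// stats: { kept: 2, skipped: 3, reductionPercent: 60 }
```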

Provider Fallback Chains

selectProvider() {
  return {
    provider: 'anthropic',
    options: { model: 'claude-sonnet-4-5-20250929' },
    fallbacks: [
      { provider: 'openai', options: { model: 'gpt-4o' } },
      { provider: 'gemini', options: { model: 'gemini-2.5-pro' } }
    ]
  };
}

Automatic failover: if Anthropic fails, the engine tries OpenAI, then Gemini.
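The failover order can be illustrated with a plain loop over the primary provider and its fallbacks. This is a sketch of the general pattern, not the engine's implementation. The types are local stand-ins, `callWithFallback` is a hypothetical name, and `attempt` is synchronous only to keep the example short (real provider calls are asynchronous).

```typescript
// Local stand-in types for this sketch; the library's own types may differ.
interface ProviderChoice {
  provider: string;
  options: { model: string };
}
interface SelectionWithFallbacks extends ProviderChoice {
  fallbacks?: ProviderChoice[];
}

// Tries the primary provider, then each fallback in order, returning the first success.
// `attempt` stands in for a real API call.
function callWithFallback<T>(
  selection: SelectionWithFallbacks,
  attempt: (choice: ProviderChoice) => T
): { provider: string; result: T } {
  const chain: ProviderChoice[] = [selection, ...(selection.fallbacks ?? [])];
  const errors: string[] = [];
  for (const choice of chain) {
    try {
      return { provider: choice.provider, result: attempt(choice) };
    } catch (error) {
      const message = error instanceof Error ? error.message : String(error);
      errors.push(`${choice.provider}: ${message}`);
    }
  }
  throw new Error(`All providers failed (${errors.join("; ")})`);
}

const selection: SelectionWithFallbacks = {
  provider: "anthropic",
  options: { model: "claude-sonnet-4-5-20250929" },
  fallbacks: [
    { provider: "openai", options: { model: "gpt-4o" } },
    { provider: "gemini", options: { model: "gemini-2.5-pro" } },
  ],
};

// Simulate an Anthropic outage: the first attempt throws, OpenAI answers.
const outcome = callWithFallback(selection, (choice) => {
  if (choice.provider === "anthropic") throw new Error("503 overloaded");
  return `answered by ${choice.options.model}`;
});
// outcome.provider is "openai"
```

Collecting the per-provider error messages makes the final failure easier to diagnose when every provider in the chain is down.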

Data Transformations

class CategoryAnalyzer extends Plugin {
  dimensions = [
    'classify',
    { name: 'group_by_category', scope: 'global' },
    'analyze_category'
  ];

  transformSections(context) {
    if (context.dimension === 'group_by_category') {
      const categories = context.result.data.categories;
      
      // Transform: 100 sections → 5 category groups
      return categories.map(cat => ({
        content: cat.items.join('\n\n'),
        metadata: { category: cat.name, count: cat.items.length }
      }));
    }
  }
}

Result: Analyze 5 category groups instead of 100 individual items (95% fewer API calls)
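The grouping step itself is ordinary data reshaping. Below is a standalone sketch of merging sections by category, independent of the plugin API. `groupByCategory` and the `labels` array are hypothetical, and the `Section` shape is inferred from earlier examples.

```typescript
// Assumed shape, inferred from earlier examples.
interface Section {
  content: string;
  metadata: Record<string, unknown>;
}

// Groups sections by a per-section category label and merges each group into one section.
// `labels[i]` is the (hypothetical) category assigned to `sections[i]` by an earlier step.
function groupByCategory(sections: Section[], labels: string[]): Section[] {
  const groups = new Map<string, string[]>();
  sections.forEach((section, index) => {
    const category = labels[index] ?? "uncategorized";
    const items = groups.get(category) ?? [];
    items.push(section.content);
    groups.set(category, items);
  });
  // Map preserves insertion order, so groups appear in order of first occurrence.
  return Array.from(groups, ([category, items]) => ({
    content: items.join("\n\n"),
    metadata: { category, count: items.length },
  }));
}

const grouped = groupByCategory(
  [
    { content: "Late delivery", metadata: { id: 1 } },
    { content: "Love the design", metadata: { id: 2 } },
    { content: "Package arrived damaged", metadata: { id: 3 } },
  ],
  ["shipping", "product", "shipping"]
);
// grouped: two sections, "shipping" (count 2) then "product" (count 1)
```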

Async Integration Hooks

class DatabaseInt