Dagengine
Type-safe DAG execution engine for AI workflows
Install / Use
/learn @dagengine/DagengineREADME
@dagengine/core
<div align="center">🚀 Type-Safe DAG Engine for AI Workflows
Define task dependencies. Get automatic parallelization, cost tracking, and 10x speedup.
🚀 Quick Start • 📖 Documentation • 💬 Discussions • 🐛 Issues • 📦 Examples
</div>🎯 What is dagengine?
dagengine is a TypeScript DAG engine that turns sequential AI workflows into parallel ones automatically.
The Problem
// ❌ What most developers do (sequential, slow, expensive)
for (const item of items) {
const sentiment = await ai.analyze(item); // Wait...
const topics = await ai.extract(item); // Wait...
const summary = await ai.summarize(item); // Wait...
}
// Result: 100 items × 15 seconds = 25 minutes, $15
The Solution
// ✅ With dagengine (parallel, fast, cheap)
const engine = new DagEngine({ plugin: new MyPlugin() });
const result = await engine.process(items);
// Result: 100 items in 2.5 minutes, $5
10x faster. 67% cheaper. Zero orchestration code.
Define dependencies → get automatic parallelization.
🚀 5-Minute Quick Start
Install
npm install @dagengine/core
Requirements: Node.js ≥ 18.0.0, TypeScript ≥ 5.0 (recommended)
Example: Analyze Customer Reviews
import { DagEngine, Plugin, type PromptContext, type ProviderSelection } from '@dagengine/core';
// Define result types (optional but helps with TypeScript)
interface SentimentResult {
sentiment: "positive" | "negative" | "neutral";
score: number;
}
interface TopicsResult {
topics: string[];
}
// 1. Define your workflow
class ReviewAnalyzer extends Plugin {
constructor() {
super('analyzer', 'Review Analyzer', 'Analyzes reviews');
this.dimensions = ['sentiment', 'topics', 'summary'];
}
defineDependencies(): Record<string, string[]> {
return {
summary: ['sentiment', 'topics']
};
}
createPrompt(context: PromptContext): string {
const content = context.sections[0]?.content || '';
if (context.dimension === 'sentiment') {
return `Analyze sentiment: "${content}"
Return JSON: {"sentiment": "positive|negative|neutral", "score": 0-1}`;
}
if (context.dimension === 'topics') {
return `Extract topics: "${content}"
Return JSON: {"topics": ["topic1", "topic2"]}`;
}
if (context.dimension === 'summary') {
const sentiment = context.dependencies.sentiment?.data as SentimentResult;
const topics = context.dependencies.topics?.data as TopicsResult;
return `Create a ${sentiment.sentiment} summary covering ${topics.topics.join(', ')}:
"${content}"
Return JSON: {"summary": "summary text"}`;
}
throw new Error(`Unknown dimension: ${context.dimension}`);
}
selectProvider(): ProviderSelection {
return {
provider: 'anthropic',
options: { model: 'claude-3-5-haiku-20241022' }
};
}
}
// 2. Process your data
async function main(): Promise<void> {
// Validate API key
if (!process.env.ANTHROPIC_API_KEY) {
console.error('❌ Missing ANTHROPIC_API_KEY environment variable');
console.error('Set it with: export ANTHROPIC_API_KEY="your-key"');
process.exit(1);
}
// Create engine
const engine = new DagEngine({
plugin: new ReviewAnalyzer(),
providers: {
anthropic: { apiKey: process.env.ANTHROPIC_API_KEY },
}
});
// Prepare reviews
const reviews = [
{ content: 'Great product!', metadata: { id: 1 } },
{ content: 'Not good.', metadata: { id: 2 } }
];
// Process
const result = await engine.process(reviews);
// Display results
console.log(JSON.stringify(result.sections[0]?.results, null, 4));
}
// 3. Run with error handling
main().catch((error: Error) => {
console.error('❌ Processing failed:', error.message);
process.exit(1);
});
What just happened?
- ✅
sentimentandtopicsran in parallel (both have no dependencies) - ✅
summarywaited for both to complete - ✅ All sections processed in parallel
- ✅ 2 reviews × 3 dimensions = 6 AI calls, all optimized automatically
Next: Full Documentation • Examples • Production Guide
📊 Why Choose dagengine?
| Feature | DIY Code | LangChain | dagengine | |---------|----------|-----------|-----------| | Setup | Manual loops | Learn LCEL | 2 methods | | Parallelization | Manual | Manual | Automatic | | Cost Tracking | Manual calc | Manual calc | Built-in | | TypeScript | ✅ Full | ⚠️ Partial | ✅ Full | | Code (100 items) | 150 lines | 80 lines | 25 lines | | Best For | Small scripts | RAG/Agents | Orchestration |
Use dagengine when:
- ✅ Processing 100+ items with multiple AI analyses
- ✅ Want automatic parallelization without complexity
- ✅ Need built-in cost tracking
- ✅ TypeScript projects
Skip dagengine when:
- ❌ Single AI calls (overkill)
- ❌ Need RAG/agents (use LangChain)
- ❌ Python projects (we're TypeScript-only)
⚡ Key Features
<table> <tr> <td width="50%" valign="top">🎯 Zero Infrastructure
Define task dependencies once. Engine handles execution order, parallelization, and coordination automatically. No queues, workers, or complex orchestration code.
💰 Cost Optimized
Skip low-value processing with conditional execution. Route tasks to optimal models. Track costs per dimension in real-time with automatic token counting.
🔄 Production Ready
Automatic retry with exponential backoff. Provider fallback chains. Graceful error recovery with partial results. Battle-tested reliability.
</td> <td width="50%" valign="top">🌍 Multi-Provider Support
Use Anthropic Claude, OpenAI GPT, Google Gemini with a unified interface. Switch providers per dimension. Mix models in one workflow.
🪝 18 Lifecycle Hooks
Full async/await support. Integrate databases, caches, APIs at every processing stage. Transform data mid-pipeline. Complete control when you need it.
📊 Real-Time Tracking
Built-in cost and token tracking per dimension and provider. Progress callbacks with throughput metrics. Detailed breakdowns in results.
</td> </tr> </table>💡 Core Concepts
1️⃣ Sections (Your Data)
const sections = [
{
content: 'Customer review text here',
metadata: { id: 1, userId: 123, productId: 'SKU-789' }
}
];
Sections are the pieces of data you analyze (reviews, emails, documents, etc.).
2️⃣ Dimensions (Your Tasks)
this.dimensions = ['sentiment', 'topics', 'summary'];
Dimensions are the analyses you run. Each dimension processes all sections.
3️⃣ Dependencies (Execution Order)
defineDependencies() {
return {
sentiment: [], // No dependencies (runs first)
topics: [], // No dependencies (runs first)
summary: ['sentiment', 'topics'] // Waits for both
};
}
Dependencies control execution order. Engine automatically parallelizes independent tasks.
Execution Plan:
sentiment ──┐
├─→ Both run in parallel → summary
topics ─────┘
4️⃣ Two Dimension Types
Section Dimensions (default) - Analyze each item independently:
this.dimensions = ['sentiment']; // Runs once per section
Global Dimensions - Analyze all items together:
this.dimensions = [
{ name: 'categorize', scope: 'global' } // Runs once for all sections
];
🎨 Advanced Features
Cost Optimization with Skip Logic
class SmartAnalyzer extends Plugin {
dimensions = ['quality_check', 'deep_analysis'];
defineDependencies() {
return { deep_analysis: ['quality_check'] };
}
shouldSkipSectionDimension(context) {
if (context.dimension === 'deep_analysis') {
const quality = context.dependencies.quality_check.data;
return quality.score < 0.7; // Skip low-quality items
}
return false;
}
selectProvider(dimension) {
if (dimension === 'quality_check') {
return {
provider: 'anthropic',
options: { model: 'claude-3-5-haiku-20241022' } // Cheap model
};
}
return {
provider: 'anthropic',
options: { model: 'claude-3-7-sonnet-20250219' } // Expensive model
};
}
}
Result: 100 items → 40 high-quality → 60% fewer expensive API calls
Provider Fallback Chains
selectProvider() {
return {
provider: 'anthropic',
options: { model: 'claude-sonnet-4-5-20250929' },
fallbacks: [
{ provider: 'openai', options: { model: 'gpt-4o' } },
{ provider: 'gemini', options: { model: 'gemini-2.5-pro' } }
]
};
}
Automatic failover: If Anthropic fails, automatically tries OpenAI, then Gemini.
Data Transformations
class CategoryAnalyzer extends Plugin {
dimensions = [
'classify',
{ name: 'group_by_category', scope: 'global' },
'analyze_category'
];
transformSections(context) {
if (context.dimension === 'group_by_category') {
const categories = context.result.data.categories;
// Transform: 100 sections → 5 category groups
return categories.map(cat => ({
content: cat.items.join('\n\n'),
metadata: { category: cat.name, count: cat.items.length }
}));
}
}
}
Result: Analyze 5 category groups instead of 100 individual items (95% fewer API calls)
Async Integration Hooks
class DatabaseInt
