RustDagcuter
Dagcuter is a Rust library for executing Directed Acyclic Graphs (DAGs) of tasks. It manages task dependencies, detects circular dependencies, and supports customizable task lifecycles (PreExecution, Execute, and PostExecution). It also enables concurrent execution of independent tasks for improved performance.
Install / Use
/learn @busyster996/RustDagcuterREADME
Dagcuter 🚀
RustDagcuter is a Rust library for executing directed acyclic graphs (DAGs) of tasks. It manages task dependencies, detects cyclic dependencies, and supports customizable task lifecycles (pre-execution, post-execution). It also supports concurrent execution of independent tasks to improve performance.
✨ Core functions
- Intelligent dependency management: Automatically parse and schedule multi-task dependencies.
- Loop detection: Real-time discovery and prevention of loop dependencies.
- High concurrent execution: Topological sorting drives parallel operation, making full use of multi-cores.
- Exponential backoff retry: Built-in configurable retry strategy; supports custom intervals, multiples and maximum times.
- Graceful cancellation: Supports mid-way cancellation and resource release.
- Execution tracking: Real-time printing of task status and execution order.
- Type safety: Static type guarantee, compile-time error checking.
- Zero cost abstraction: Minimal runtime overhead.
- Life cycle hook: Custom logic can be inserted before/after task execution.
🏗️ Project structure
dagcuter/
├─ src/
│ ├─ lib.rs # Core exports and type definitions
│ └─ executor.rs # DAG Executor Core Logic
├─ examples/ # Example code
| ├─ src/
| │ └─ main.rs
| └─ Cargo.toml
├─ Cargo.toml
└─ README.md
🚀 Quick start
- Add dependencies in
Cargo.toml:
rs-dagcuter = { version = "0.1.0" }
tokio = { version = "1.0", features = ["full"] }
async-trait = "0.1"
tokio-util = "0.7"
serde_json = "1.0"
chrono = "0.4"
- Write the task and execute it:
use rs_dagcuter::*;
use async_trait::async_trait;
use std::collections::HashMap;
use tokio_util::sync::CancellationToken;
use std::sync::Arc;
// 示例任务实现
struct ExampleTask {
name: String,
deps: Vec<String>,
}
#[async_trait]
impl Task for ExampleTask {
fn name(&self) -> &str {
&self.name
}
fn dependencies(&self) -> Vec<String> {
self.deps.clone()
}
fn retry_policy(&self) -> Option<RetryPolicy> {
Some(RetryPolicy {
max_attempts: 3,
..Default::default()
})
}
async fn execute(
&self,
_ctx: CancellationToken,
_input: &TaskInput,
) -> Result<TaskResult, Error> {
println!("执行任务: {}", self.name);
// 模拟任务执行时间
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
let mut result = HashMap::new();
result.insert("status".to_string(), serde_json::json!("completed"));
result.insert("task_name".to_string(), serde_json::json!(self.name));
result.insert("timestamp".to_string(), serde_json::json!(chrono::Utc::now().to_rfc3339()));
Ok(result)
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut tasks: HashMap<String, BoxTask> = HashMap::new();
tasks.insert("task1".to_string(), Arc::new(ExampleTask {
name: "task1".to_string(),
deps: vec![],
}));
tasks.insert("task2".to_string(), Arc::new(ExampleTask {
name: "task2".to_string(),
deps: vec!["task1".to_string()],
}));
tasks.insert("task3".to_string(), Arc::new(ExampleTask {
name: "task3".to_string(),
deps: vec!["task1".to_string()],
}));
tasks.insert("task4".to_string(), Arc::new(ExampleTask {
name: "task4".to_string(),
deps: vec!["task2".to_string(), "task3".to_string()],
}));
tasks.insert("task5".to_string(), Arc::new(ExampleTask {
name: "task5".to_string(),
deps: vec!["task2".to_string()],
}));
tasks.insert("task6".to_string(), Arc::new(ExampleTask {
name: "task6".to_string(),
deps: vec!["task1".to_string(), "task4".to_string(), "task5".to_string()],
}));
let mut dag = Dag::new(tasks)?;
let ctx = CancellationToken::new();
println!("=== 任务依赖图 ===");
dag.print_graph();
println!("=== 开始执行任务 ===");
let start = std::time::Instant::now();
let results = dag.execute(ctx).await?;
let duration = start.elapsed();
println!("=== 执行完成 ===");
println!("执行时间: {:?}", duration);
println!("执行结果: {:#?}", results);
println!("执行顺序: {}", dag.execution_order().await);
Ok(())
}
- Run the example:
cd example
cargo run
📚 API Overview
Task attribute
#[async_trait]
pub trait Task: Send + Sync {
fn name(&self) -> &str;
fn dependencies(&self) -> Vec<String>;
fn retry_policy(&self) -> Option<RetryPolicy>;
async fn pre_execution(
&self,
_ctx: CancellationToken,
_input: &TaskInput,
) -> Result<(), Error> {
Ok(())
}
async fn execute(
&self,
ctx: CancellationToken,
input: &TaskInput,
) -> Result<TaskResult, Error>;
async fn post_execution(
&self,
_ctx: CancellationToken,
_output: &TaskResult,
) -> Result<(), Error> {
Ok(())
}
}
RetryPolicy
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetryPolicy {
pub interval: Duration, // Initial retry interval
pub max_interval: Duration, // Maximum retry interval
pub max_attempts: i32, // Maximum number of retries
pub multiplier: f64, // Retry interval exponential
}
impl Default for RetryPolicy {
fn default() -> Self {
Self {
interval: Duration::from_secs(1),
max_interval: Duration::from_secs(30),
max_attempts: 1,
multiplier: 2.0,
}
}
}
Dag
impl Dag {
/// Create a new DAG instance
pub fn new(tasks: HashMap<String, BoxTask>) -> Result<Self, Error>;
/// Execute all tasks in the DAG
pub async fn execute(
&mut self,
ctx: CancellationToken,
) -> Result<HashMap<String, TaskResult>, Error>;
/// Get the execution order of the DAG
pub async fn execution_order(&self) -> String;
/// Print the DAG graph
pub fn print_graph(&self);
}
Error
#[derive(Error, Debug)]
pub enum Error {
#[error("Circular dependency detected")]
CircularDependency,
#[error("Task execution failed: {0}")]
TaskExecution(String),
#[error("Context cancelled: {0}")]
ContextCancelled(String),
#[error("Retry failed: {0}")]
RetryFailed(String),
}
🔧 Advanced usage
-
Custom retry: adjust
interval,multiplier,max_attempts -
Lifecycle hook: override
pre_execution/post_execution -
Cancellation and timeout: combine
CancellationTokento control execution -
Complex data flow: process
TaskInputinexecuteand return a customTaskResult
📝 License
This project adopts the MIT protocol, see LICENSE for details.
Related Skills
openhue
345.4kControl Philips Hue lights and scenes via the OpenHue CLI.
sag
345.4kElevenLabs text-to-speech with mac-style say UX.
weather
345.4kGet current weather and forecasts via wttr.in or Open-Meteo
tweakcc
1.5kCustomize Claude Code's system prompts, create custom toolsets, input pattern highlighters, themes/thinking verbs/spinners, customize input box & user message styling, support AGENTS.md, unlock private/unreleased features, and much more. Supports both native/npm installs on all platforms.
