Agentflow
Orchestrate thousands of agents and harnesses as a graph programatically
Install / Use
/learn @shouc/AgentflowREADME
AgentFlow
Orchestrate codex, claude, and kimi agents in dependency graphs with parallel fanout, iterative cycles, and remote execution on SSH/EC2/ECS.
94-node pipeline: plan → 64 workers → 8 batch merges → 16 reviews → 4 review merges → synthesis
Install
One line:
curl -fsSL https://raw.githubusercontent.com/shouc/agentflow/master/install.sh | bash
This installs agentflow, adds it to PATH, and installs the skill for Codex and Claude Code.
Or manually:
python3 -m venv .venv && . .venv/bin/activate
pip install -e .[dev]
Quick Start
from agentflow import Graph, codex, claude
with Graph("my-pipeline", concurrency=3) as g:
plan = codex(task_id="plan", prompt="Inspect the repo and plan the work.", tools="read_only")
impl = claude(task_id="impl", prompt="Implement the plan:\n{{ nodes.plan.output }}", tools="read_write")
review = codex(task_id="review", prompt="Review:\n{{ nodes.impl.output }}")
plan >> impl >> review
print(g.to_json())
agentflow run pipeline.py --output summary
Or just ask Codex (the agentflow skill is auto-installed):
codex "Use agentflow to fan out 10 codex agents, each telling a unique joke, then merge their outputs and pick the funniest one. Write the pipeline and run it."
Parallel Fanout
Fan a node into many parallel copies with fanout():
from agentflow import Graph, codex, fanout, merge
with Graph("code-review", concurrency=8) as g:
scan = codex(task_id="scan", prompt="List the top 5 files to review.")
review = fanout(
codex(task_id="review", prompt="Review {{ item.file }}:\n{{ nodes.scan.output }}"),
[{"file": "api.py"}, {"file": "auth.py"}, {"file": "db.py"}],
)
summary = codex(task_id="summary", prompt=(
"Merge findings:\n{% for r in fanouts.review.nodes %}{{ r.output }}\n{% endfor %}"
))
scan >> review >> summary
print(g.to_json())
fanout(node, source) dispatches on type:
int-- N identical copies:fanout(node, 128)list-- one per item:fanout(node, [{"repo": "api"}, ...])dict-- cartesian product:fanout(node, {"axis1": [...], "axis2": [...]})
Reduce with merge(node, source, size=N) (batch) or merge(node, source, by=["field"]) (group).
Iterative Cycles
Loop until a stop condition with on_failure:
from agentflow import Graph, codex, claude
with Graph("iterative-impl", max_iterations=5) as g:
write = codex(
task_id="write",
prompt="Write a Python email validator.\n{% if nodes.review.output %}Fix: {{ nodes.review.output }}{% endif %}",
tools="read_write",
)
review = claude(
task_id="review",
prompt="Review:\n{{ nodes.write.output }}\nIf complete, say LGTM. Otherwise list issues.",
success_criteria=[{"kind": "output_contains", "value": "LGTM"}],
)
write >> review
review.on_failure >> write # loop until LGTM or max_iterations
print(g.to_json())
Remote Execution
Run agents on remote machines -- zero config needed:
# EC2 (auto-discovers AMI, key pair, VPC)
codex(task_id="remote", prompt="...", target={"kind": "ec2", "region": "us-east-1"})
# ECS Fargate (auto-discovers VPC, builds agent image)
codex(task_id="remote", prompt="...", target={"kind": "ecs", "region": "us-east-1"})
# SSH
codex(task_id="remote", prompt="...", target={"kind": "ssh", "host": "server", "username": "deploy"})
Shared instances across nodes:
plan = codex(task_id="plan", prompt="...", target={"kind": "ec2", "shared": "dev-box"})
impl = codex(task_id="impl", prompt="...", target={"kind": "ec2", "shared": "dev-box"})
plan >> impl # same EC2 instance, files persist
Scratchboard
Shared memory file across all agents:
with Graph("campaign", scratchboard=True) as g:
shards = fanout(codex(task_id="fuzz", prompt="..."), 128)
Examples
| Example | What it does |
|---|---|
| airflow_like.py | Basic pipeline: plan → implement → review → merge |
| code_review.py | Fan out code review across files, merge findings |
| dep_audit.py | Audit each dependency for security/license issues |
| test_gap.py | Find untested modules, suggest tests per module |
| multi_agent_debate.py | Codex vs Claude: independent solve + cross-critique |
| release_check.py | Parallel release gate: tests + security + changelog |
| iterative_impl.py | Write → review → fix cycle until LGTM |
| airflow_like_fuzz_batched.py | 128-shard fanout with batch merge + periodic monitor |
| airflow_like_fuzz_grouped.py | Matrix fanout with grouped reducers |
| ec2_remote.py | Run codex on a remote EC2 instance |
| ecs_fargate.py | Run codex on ECS Fargate |
CLI
agentflow run pipeline.py # run a pipeline
agentflow run pipeline.py --output summary
agentflow inspect pipeline.py # show expanded graph
agentflow validate pipeline.py # check without running
agentflow templates # list starter templates
agentflow init > pipeline.py # scaffold a starter
Acknowledgements
Related Skills
node-connect
344.1kDiagnose OpenClaw node connection and pairing failures for Android, iOS, and macOS companion apps
frontend-design
96.8kCreate distinctive, production-grade frontend interfaces with high design quality. Use this skill when the user asks to build web components, pages, or applications. Generates creative, polished code that avoids generic AI aesthetics.
openai-whisper-api
344.1kTranscribe audio via OpenAI Audio Transcriptions API (Whisper).
qqbot-media
344.1kQQBot 富媒体收发能力。使用 <qqmedia> 标签,系统根据文件扩展名自动识别类型(图片/语音/视频/文件)。
