sdax - Structured Declarative Async eXecution
sdax is a lightweight, high-performance, in-process micro-orchestrator for Python's asyncio. It is designed to manage complex, tiered, parallel asynchronous tasks with a declarative API, guaranteeing a correct and predictable order of execution.
It is ideal for building the internal logic of a single, fast operation, such as a complex API endpoint, where multiple dependent I/O calls (to databases, feature flags, or other services) must be reliably initialized, executed, and torn down.
Key Features
- Graph-based scheduler (DAG): Primary execution model is a task dependency graph (directed acyclic graph). Tasks depend on other tasks by name; the analyzer groups tasks into parallelizable waves.
- Elevator adapter (levels): A level-based API is provided as an adapter that builds a graph under the hood to simulate the classic "elevator" model.
- Structured Lifecycle: Rigid `pre-execute -> execute -> post-execute` lifecycle for all tasks.
- Guaranteed Cleanup: `post-execute` runs for any task whose `pre-execute` started (even if it failed or was cancelled) to ensure resources are released.
- Immutable Builder Pattern: Build processors via fluent APIs producing immutable, reusable instances.
- Concurrent Execution Safe: Multiple concurrent runs are fully isolated.
- Declarative & Flexible: Task functions are frozen dataclasses with optional timeouts/retries and independent per-phase configuration.
- Lightweight: Minimal dependencies, minimal overhead (see Performance).
Installation
```bash
pip install sdax
```
Or for development:
```bash
git clone https://github.com/owebeeone/sdax.git
cd sdax
pip install -e .
```
TaskFunction Helper Functions
SDAX provides convenient helper functions to simplify TaskFunction creation:
task_func() - Standard Async Functions
```python
from sdax import task_func

# Simple usage
task = task_func(my_async_function)

# With custom configuration
task = task_func(my_function, timeout=30.0, retries=3,
                 retryable_exceptions=(ValueError, RuntimeError))
```
task_group_func() - Functions with TaskGroup Access
```python
from sdax import task_group_func

# For functions that need to create subtasks
async def parent_task(ctx, task_group):
    subtask = task_group.create_task(subtask_func(), name="subtask")
    return await subtask

task = task_group_func(parent_task, retries=2)
```
task_sync_func() - Synchronous Function Wrapper
```python
from sdax import task_sync_func

# Wrap synchronous functions for async compatibility
def sync_function(ctx):
    # Quick synchronous work (validation, simple calculations)
    ctx["result"] = ctx.get("input", 0) * 2
    return ctx["result"]

task = task_sync_func(sync_function, retries=1)

# ⚠️ WARNING: Sync functions block the event loop!
# Use only for quick operations, not CPU-intensive work
```
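Because a synchronous task function blocks the event loop for its entire duration, CPU-heavy work should be offloaded to a worker thread instead. This is plain `asyncio`, not an sdax helper (the function names below are illustrative); a sketch:

```python
import asyncio
import hashlib

def expensive_hash(data: bytes, rounds: int) -> bytes:
    # CPU-bound work that would stall the event loop if run inline
    for _ in range(rounds):
        data = hashlib.sha256(data).digest()
    return data

async def hash_task(ctx: dict):
    # asyncio.to_thread runs the sync function in a worker thread,
    # keeping the event loop free for other tasks running in parallel
    ctx["digest"] = await asyncio.to_thread(expensive_hash, ctx["payload"], 10_000)

async def main():
    ctx = {"payload": b"hello"}
    await hash_task(ctx)
    return ctx["digest"]

digest = asyncio.run(main())
print(len(digest))  # 32 (a SHA-256 digest)
```

An async wrapper like `hash_task` can then be passed to `task_func()` like any other coroutine function.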
join() - Graph Join Nodes
```python
from sdax import join, AsyncTask, task_func
from sdax.sdax_core import AsyncDagTaskProcessor

# join() creates an empty "join" node in the graph: a synchronization
# point for multiple dependencies. A join node has no pre_execute,
# execute, or post_execute functions.
processor = (
    AsyncDagTaskProcessor.builder()
    .add_task(AsyncTask("TaskA", execute=task_func(func_a)), depends_on=())
    .add_task(AsyncTask("TaskB", execute=task_func(func_b)), depends_on=())
    .add_task(join("WaitForBoth"), depends_on=("TaskA", "TaskB"))  # Synchronizes TaskA and TaskB
    .add_task(AsyncTask("TaskC", execute=task_func(func_c)), depends_on=("WaitForBoth",))
    .build()
)
```
Default Configuration
All helpers provide sensible defaults:
- `timeout=None` (infinite timeout)
- `retries=0` (no retries)
- `initial_delay=1.0` seconds
- `backoff_factor=2.0` (exponential backoff)
- `retryable_exceptions=(TimeoutError, ConnectionError, RetryableException)`
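To make the retry defaults concrete: with exponential backoff, the delay before retry attempt `n` is `initial_delay * backoff_factor**(n-1)`. A standalone sketch of that schedule (an illustration of the semantics, not sdax's internal implementation):

```python
def retry_delays(retries: int, initial_delay: float = 1.0,
                 backoff_factor: float = 2.0) -> list[float]:
    # Delay inserted before each retry: 1.0s, 2.0s, 4.0s, ... with the defaults
    return [initial_delay * backoff_factor**n for n in range(retries)]

print(retry_delays(3))  # [1.0, 2.0, 4.0]
```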
Quick Start
Graph-based (task dependency graph):
```python
import asyncio
from dataclasses import dataclass
from typing import Any

from sdax import AsyncTask, task_func
from sdax.sdax_core import AsyncDagTaskProcessor

@dataclass
class TaskContext:
    db_connection: Any = None
    user_id: int | None = None
    feature_flags: dict | None = None

async def open_db(ctx: TaskContext):
    ctx.db_connection = await create_database_connection()
    print("Database opened")

async def close_db(ctx: TaskContext):
    if ctx.db_connection:
        await ctx.db_connection.close()
        print("Database closed")

async def check_auth(ctx: TaskContext):
    await asyncio.sleep(0.1)
    ctx.user_id = 123

async def load_feature_flags(ctx: TaskContext):
    await asyncio.sleep(0.2)
    ctx.feature_flags = {"new_api": True}

async def fetch_user_data(ctx: TaskContext):
    if not ctx.user_id:
        raise ValueError("Auth failed")
    await asyncio.sleep(0.1)

# Fluent builder pattern with helper functions
processor = (
    AsyncDagTaskProcessor[TaskContext]
    .builder()
    .add_task(
        AsyncTask(
            name="Database",
            pre_execute=task_func(open_db),
            post_execute=task_func(close_db),
        ),
        depends_on=(),
    )
    .add_task(
        AsyncTask(name="Auth", pre_execute=task_func(check_auth)),
        depends_on=("Database",),
    )
    .add_task(
        AsyncTask(name="Flags", pre_execute=task_func(load_feature_flags)),
        depends_on=("Database",),
    )
    .add_task(
        AsyncTask(name="Fetch", execute=task_func(fetch_user_data)),
        depends_on=("Auth",),
    )
    .build()
)

# await processor.process_tasks(TaskContext())
```
Note: Task names use string keys by default, but SDAX supports generic key types for more complex scenarios (see Advanced Features).
Elevator adapter (level-based API; builds a graph under the hood):
```python
from sdax import AsyncTaskProcessor, AsyncTask, task_func

processor = (
    AsyncTaskProcessor.builder()
    .add_task(AsyncTask("Database", pre_execute=task_func(open_db), post_execute=task_func(close_db)), level=0)
    .add_task(AsyncTask("Auth", pre_execute=task_func(check_auth)), level=1)
    .add_task(AsyncTask("Flags", pre_execute=task_func(load_feature_flags)), level=1)
    .add_task(AsyncTask("Fetch", execute=task_func(fetch_user_data)), level=2)
    .build()
)

# await processor.process_tasks(TaskContext())
```
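The level adapter can be thought of as sugar over the graph builder: each task at a level depends on the tasks of the level below it. A rough sketch of that translation (the adapter's actual rules are internal to sdax and may differ, e.g. around sparse levels):

```python
from collections import defaultdict

def levels_to_deps(levels: dict[str, int]) -> dict[str, tuple[str, ...]]:
    # Group task names by level, then point each level's tasks
    # at every task in the next-lowest populated level
    by_level: dict[int, list[str]] = defaultdict(list)
    for name, level in levels.items():
        by_level[level].append(name)
    ordered = sorted(by_level)
    deps: dict[str, tuple[str, ...]] = {}
    for i, level in enumerate(ordered):
        prev = tuple(by_level[ordered[i - 1]]) if i > 0 else ()
        for name in by_level[level]:
            deps[name] = prev
    return deps

deps = levels_to_deps({"Database": 0, "Auth": 1, "Flags": 1, "Fetch": 2})
print(deps["Fetch"])  # ('Auth', 'Flags')
```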
Important: Cleanup Guarantees & Resource Management
Critical Behavior (warning): post_execute runs for any task whose pre_execute was started, even if:
- `pre_execute` raised an exception
- `pre_execute` was cancelled (due to a sibling task failure)
- `pre_execute` timed out
This is by design for resource management. If your pre_execute acquires resources (opens files, database connections, locks), your post_execute must be idempotent and handle partial initialization.
Example: Safe Resource Management
```python
@dataclass
class TaskContext:
    lock: asyncio.Lock | None = None
    lock_acquired: bool = False

async def acquire_lock(ctx: TaskContext):
    ctx.lock = some_lock
    await ctx.lock.acquire()
    # If cancelled here, the lock is held but the flag is not yet set
    ctx.lock_acquired = True

async def release_lock(ctx: TaskContext):
    # GOOD: check that we actually acquired the lock
    if ctx.lock_acquired and ctx.lock:
        ctx.lock.release()

# GOOD alternative: tolerate partial initialization with try/except
async def release_lock_defensive(ctx: TaskContext):
    try:
        if ctx.lock and ctx.lock.locked():
            ctx.lock.release()
    except RuntimeError:
        pass  # Already released or never acquired
```
Why this matters: In parallel execution, if one task fails, all other tasks in that level are cancelled. Without guaranteed cleanup, you'd leak resources.
Execution Model
Task dependency graph (directed acyclic graph, DAG)
In addition to level-based execution, sdax supports execution driven by a task dependency graph (a directed acyclic graph, DAG), where tasks declare dependencies on other tasks by name. The analyzer groups tasks into waves: a wave is a set of tasks that share the same effective prerequisite tasks and can start together as soon as those prerequisites complete.
- Waves are start barriers only: Dependencies remain task-to-task; waves do not depend on waves. A wave becomes ready when all of its prerequisite tasks have completed their `pre_execute` successfully.
- Phases:
  - `pre_execute`: scheduled by waves. On the first failure, remaining `pre_execute` tasks are cancelled; any task whose pre was started still gets `post_execute` later.
  - `execute`: runs in a single TaskGroup after all pre phases complete.
  - `post_execute`: runs in reverse dependency order (via reverse graph waves). Cleanup is best-effort; failures are collected without cancelling sibling cleanup.
- Validation: The `TaskAnalyzer` validates the graph (cycles, missing deps) at build time.
- Immutability: The analyzer output and processors are immutable and safe to reuse across concurrent executions.
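Wave grouping can be sketched independently of sdax: tasks whose effective (transitive) prerequisite sets are identical can start together. A simplified model of that analysis, using the Quick Start graph (sdax's `TaskAnalyzer` may use a different, more efficient scheme, and additionally validates acyclicity, which this sketch assumes):

```python
deps = {
    "Database": (),
    "Auth": ("Database",),
    "Flags": ("Database",),
    "Fetch": ("Auth",),
}

def effective_prereqs(graph: dict[str, tuple[str, ...]]) -> dict[str, frozenset[str]]:
    # Transitive closure of each task's dependencies (memoized)
    result: dict[str, frozenset[str]] = {}

    def closure(name: str) -> frozenset[str]:
        if name not in result:
            direct = graph[name]
            result[name] = frozenset(direct).union(*(closure(d) for d in direct))
        return result[name]

    for name in graph:
        closure(name)
    return result

def waves(graph: dict[str, tuple[str, ...]]) -> dict[frozenset[str], set[str]]:
    # Tasks sharing the same effective prerequisites form one wave
    grouped: dict[frozenset[str], set[str]] = {}
    for name, prereqs in effective_prereqs(graph).items():
        grouped.setdefault(prereqs, set()).add(name)
    return grouped

for prereqs, tasks in waves(deps).items():
    print(sorted(prereqs), "->", sorted(tasks))
```

Here `Auth` and `Flags` fall into the same wave (both depend only on `Database`), while `Fetch` waits on `Auth` without waiting on `Flags`.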
Advanced example with complex dependencies:
```python
from sdax import AsyncTask, task_func, join, RetryableException
from sdax.sdax_core import AsyncDagTaskProcessor

@dataclass
class DatabaseContext:
    connection: Any = None
```
