sdax - Structured Declarative Async eXecution

Python 3.11+ | License: MIT

sdax is a lightweight, high-performance, in-process micro-orchestrator for Python's asyncio. It is designed to manage complex, tiered, parallel asynchronous tasks with a declarative API, guaranteeing a correct and predictable order of execution.

It is ideal for building the internal logic of a single, fast operation, such as a complex API endpoint, where multiple dependent I/O calls (to databases, feature flags, or other services) must be reliably initialized, executed, and torn down.

Key Features

  • Graph-based scheduler (DAG): Primary execution model is a task dependency graph (directed acyclic graph). Tasks depend on other tasks by name; the analyzer groups tasks into parallelizable waves.
  • Elevator adapter (levels): A level-based API is provided as an adapter that builds a graph under the hood to simulate the classic "elevator" model.
  • Structured Lifecycle: Rigid pre_execute -> execute -> post_execute lifecycle for all tasks.
  • Guaranteed Cleanup: post_execute runs for any task whose pre_execute started (even if it failed or was cancelled) to ensure resources are released.
  • Immutable Builder Pattern: Build processors via fluent APIs producing immutable, reusable instances.
  • Concurrent Execution Safe: Multiple concurrent runs are fully isolated.
  • Declarative & Flexible: Task functions are frozen dataclasses with optional timeouts/retries and independent per-phase configuration.
  • Lightweight: Minimal dependencies, minimal overhead (see Performance).

Installation

pip install sdax

Or for development:

git clone https://github.com/owebeeone/sdax.git
cd sdax
pip install -e .

TaskFunction Helper Functions

SDAX provides convenient helper functions to simplify TaskFunction creation:

task_func() - Standard Async Functions

from sdax import task_func

# Simple usage
task = task_func(my_async_function)

# With custom configuration
task = task_func(my_function, timeout=30.0, retries=3, 
                 retryable_exceptions=(ValueError, RuntimeError))

task_group_func() - Functions with TaskGroup Access

from sdax import task_group_func

# For functions that need to create subtasks
async def parent_task(ctx, task_group):
    subtask = task_group.create_task(subtask_func(), name="subtask")
    return await subtask

task = task_group_func(parent_task, retries=2)

task_sync_func() - Synchronous Function Wrapper

from sdax import task_sync_func

# Wrap synchronous functions for async compatibility
def sync_function(ctx):
    # Quick synchronous work (validation, simple calculations)
    ctx["result"] = ctx.get("input", 0) * 2
    return ctx["result"]

task = task_sync_func(sync_function, retries=1)

# ⚠️ WARNING: Sync functions block the event loop!
# Use only for quick operations, not CPU-intensive work
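For blocking work that is too slow to run on the event loop, one common alternative is to write an async function that hands the blocking call to a worker thread with asyncio.to_thread(). The sketch below uses only the standard library; blocking_lookup, lookup_task, and the heartbeat counter are hypothetical names for illustration and are not part of sdax. Note that threads help with blocking I/O; heavily CPU-bound Python code may still need a process pool.

```python
import asyncio
import time


def blocking_lookup(key: str) -> str:
    """Hypothetical blocking call (stand-in for a synchronous client library)."""
    time.sleep(0.05)
    return key.upper()


async def lookup_task(ctx: dict) -> None:
    # to_thread() runs the blocking call in a worker thread and awaits its result,
    # so the event loop stays free to run other tasks in the meantime.
    ctx["value"] = await asyncio.to_thread(blocking_lookup, ctx["key"])


async def main() -> dict:
    ctx = {"key": "user-123"}
    ticks = 0

    async def heartbeat() -> None:
        # Counts how often the event loop gets to run while the lookup is in flight.
        nonlocal ticks
        while True:
            await asyncio.sleep(0.01)
            ticks += 1

    beat = asyncio.create_task(heartbeat())
    await lookup_task(ctx)
    beat.cancel()
    await asyncio.gather(beat, return_exceptions=True)
    ctx["ticks"] = ticks
    return ctx


result = asyncio.run(main())
```

Because lookup_task is an ordinary async function, it could then be wrapped with task_func() rather than task_sync_func().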

join() - Graph Join Nodes

from sdax import join, AsyncTask, task_func
from sdax.sdax_core import AsyncDagTaskProcessor

# Creates an empty "join" node in the graph
# Acts as a synchronization point for multiple dependencies
# A join node has no pre_execute, execute, or post_execute functions

processor = (
    AsyncDagTaskProcessor.builder()
    .add_task(AsyncTask("TaskA", execute=task_func(func_a)), depends_on=())
    .add_task(AsyncTask("TaskB", execute=task_func(func_b)), depends_on=())
    .add_task(join("WaitForBoth"), depends_on=("TaskA", "TaskB"))  # Synchronizes TaskA and TaskB
    .add_task(AsyncTask("TaskC", execute=task_func(func_c)), depends_on=("WaitForBoth",))
    .build()
)

Default Configuration

All helpers provide sensible defaults:

  • timeout=None (infinite timeout)
  • retries=0 (no retries)
  • initial_delay=1.0 seconds
  • backoff_factor=2.0 (exponential backoff)
  • retryable_exceptions=(TimeoutError, ConnectionError, RetryableException)
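To make the retry defaults concrete, the sketch below computes the wait before each retry under a plain exponential-backoff rule: initial_delay multiplied by backoff_factor once per previous retry. This formula is an assumption for illustration; sdax's actual schedule may differ (for example, by adding jitter). retry_delays is a hypothetical helper, not an sdax function.

```python
def retry_delays(
    retries: int,
    initial_delay: float = 1.0,
    backoff_factor: float = 2.0,
) -> list[float]:
    """Return the assumed wait (in seconds) before each retry attempt."""
    return [initial_delay * backoff_factor**attempt for attempt in range(retries)]


# With the default delay settings and retries=3, the waits double each time.
delays = retry_delays(3)
total_wait = sum(delays)
```

Under this assumption, three retries with the defaults wait 1, 2, and 4 seconds, so a task that keeps failing adds about 7 seconds before the final error surfaces, not counting the time spent in each attempt.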

Quick Start

Graph-based (task dependency graph):

import asyncio
from dataclasses import dataclass
from typing import Any
from sdax import AsyncTask, task_func
from sdax.sdax_core import AsyncDagTaskProcessor

@dataclass
class TaskContext:
    db_connection: Any = None
    user_id: int | None = None
    feature_flags: dict | None = None

async def open_db(ctx: TaskContext):
    # create_database_connection() is a placeholder for your own connection factory
    ctx.db_connection = await create_database_connection()
    print("Database opened")

async def close_db(ctx: TaskContext):
    if ctx.db_connection:
        await ctx.db_connection.close()
        print("Database closed")

async def check_auth(ctx: TaskContext):
    await asyncio.sleep(0.1)
    ctx.user_id = 123

async def load_feature_flags(ctx: TaskContext):
    await asyncio.sleep(0.2)
    ctx.feature_flags = {"new_api": True}

async def fetch_user_data(ctx: TaskContext):
    if not ctx.user_id:
        raise ValueError("Auth failed")
    await asyncio.sleep(0.1)

# Fluent builder pattern with helper functions
processor = (
    AsyncDagTaskProcessor[TaskContext]
    .builder()
    .add_task(
        AsyncTask(
            name="Database", 
            pre_execute=task_func(open_db), 
            post_execute=task_func(close_db)
        ), 
        depends_on=()
    )
    .add_task(
        AsyncTask(name="Auth", pre_execute=task_func(check_auth)), 
        depends_on=("Database",)
    )
    .add_task(
        AsyncTask(name="Flags", pre_execute=task_func(load_feature_flags)), 
        depends_on=("Database",)
    )
    .add_task(
        AsyncTask(name="Fetch", execute=task_func(fetch_user_data)), 
        depends_on=("Auth",)
    )
    .build()
)

# await processor.process_tasks(TaskContext())

Note: Task names use string keys by default, but SDAX supports generic key types for more complex scenarios (see Advanced Features).

Elevator adapter (level-based API; builds a graph under the hood):

from sdax import AsyncTaskProcessor, AsyncTask, task_func

processor = (
    AsyncTaskProcessor.builder()
    .add_task(AsyncTask("Database", pre_execute=task_func(open_db), post_execute=task_func(close_db)), level=0)
    .add_task(AsyncTask("Auth", pre_execute=task_func(check_auth)), level=1)
    .add_task(AsyncTask("Flags", pre_execute=task_func(load_feature_flags)), level=1)
    .add_task(AsyncTask("Fetch", execute=task_func(fetch_user_data)), level=2)
    .build()
)
# await processor.process_tasks(TaskContext())
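One way to picture how a level-based description can be expressed as a graph is to make every task depend on all tasks in the nearest lower level. The sketch below does exactly that with plain dictionaries. It is an assumption about the general idea, not a description of how sdax's adapter is implemented, and levels_to_dependencies is a hypothetical helper.

```python
def levels_to_dependencies(levels: dict[str, int]) -> dict[str, tuple[str, ...]]:
    """Map task name -> names of all tasks in the nearest lower level."""
    by_level: dict[int, list[str]] = {}
    for name, level in levels.items():
        by_level.setdefault(level, []).append(name)

    dependencies: dict[str, tuple[str, ...]] = {}
    previous: tuple[str, ...] = ()  # the lowest level has no prerequisites
    for level in sorted(by_level):
        for name in by_level[level]:
            dependencies[name] = previous
        previous = tuple(by_level[level])
    return dependencies


graph = levels_to_dependencies({"Database": 0, "Auth": 1, "Flags": 1, "Fetch": 2})
```

Under this mapping, Fetch waits for both Auth and Flags, whereas the graph-based Quick Start lets Fetch depend on Auth alone. That difference is the main reason to prefer explicit dependencies when a task does not need everything in the level below it.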

Important: Cleanup Guarantees & Resource Management

Critical Behavior (warning): post_execute runs for any task whose pre_execute was started, even if:

  • pre_execute raised an exception
  • pre_execute was cancelled (due to a sibling task failure)
  • pre_execute timed out

This is by design for resource management. If your pre_execute acquires resources (opens files, database connections, locks), your post_execute must be idempotent and handle partial initialization.

Example: Safe Resource Management

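import asyncio
from dataclasses import dataclass

some_lock = asyncio.Lock()  # placeholder for a lock shared with other code

@dataclass
class TaskContext:
    lock: asyncio.Lock | None = None
    lock_acquired: bool = False

async def acquire_lock(ctx: TaskContext):
    await some_lock.acquire()
    # Record ownership only after acquire() returns. If pre_execute is
    # cancelled while waiting, both fields keep their defaults.
    ctx.lock = some_lock
    ctx.lock_acquired = True

async def release_lock(ctx: TaskContext):
    # GOOD: Check if we actually acquired the lock
    if ctx.lock_acquired and ctx.lock:
        ctx.lock.release()  # asyncio.Lock.release() is synchronous
        ctx.lock_acquired = False  # a second call becomes a no-op

async def release_lock_defensively(ctx: TaskContext):
    # GOOD: Or use try/except for safety
    try:
        if ctx.lock:
            ctx.lock.release()
            ctx.lock = None
    except RuntimeError:
        pass  # Already released or never acquired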

Why this matters: In parallel execution, if one task fails, all other tasks in that level are cancelled. Without guaranteed cleanup, you'd leak resources.

Execution Model

Task dependency graph (directed acyclic graph, DAG)

sdax's primary execution model is driven by a task dependency graph (a directed acyclic graph, DAG), where tasks declare dependencies on other tasks by name; the level-based API is an adapter on top of it. The analyzer groups tasks into waves: a wave is a set of tasks that share the same effective prerequisite tasks and can start together as soon as those prerequisites complete.

  • Waves are start barriers only: Dependencies remain task-to-task; waves do not depend on waves. A wave becomes ready when all of its prerequisite tasks have completed their pre_execute successfully.
  • Phases:
    • pre_execute: scheduled by waves. On the first failure, remaining pre_execute tasks are cancelled; any task whose pre was started still gets post_execute later.
    • execute: runs in a single TaskGroup after all pre phases complete.
    • post_execute: runs in reverse dependency order (via reverse graph waves). Cleanup is best-effort; failures are collected without cancelling sibling cleanup.
  • Validation: The TaskAnalyzer validates the graph (cycles, missing deps) at build time.
  • Immutability: The analyzer output and processors are immutable and safe to reuse across concurrent executions.
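As a rough mental model of the ordering described above, the sketch below validates a dependency mapping and groups it into topological generations with the standard-library graphlib module, then reverses the result to get a cleanup order. This is only an approximation: sdax's waves are grouped by shared prerequisites and act as start barriers, so they can be finer-grained than these generations. start_order is a hypothetical helper and not part of sdax.

```python
from graphlib import TopologicalSorter


def start_order(deps: dict[str, tuple[str, ...]]) -> list[list[str]]:
    """Group tasks into generations that could start together."""
    # Reject dependencies on tasks that were never declared.
    missing = {dep for names in deps.values() for dep in names} - deps.keys()
    if missing:
        raise ValueError(f"Unknown dependencies: {sorted(missing)}")

    sorter = TopologicalSorter(deps)
    sorter.prepare()  # raises graphlib.CycleError if the graph has a cycle

    generations: list[list[str]] = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        generations.append(ready)
        sorter.done(*ready)
    return generations


# The dependency graph from the Quick Start example.
deps = {
    "Database": (),
    "Auth": ("Database",),
    "Flags": ("Database",),
    "Fetch": ("Auth",),
}
pre_order = start_order(deps)
post_order = list(reversed(pre_order))  # cleanup runs in reverse dependency order
```

For the Quick Start graph, Database starts first, Auth and Flags follow together, and Fetch comes last; cleanup walks the same groups backwards so that Database is closed only after its dependents have finished.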

Advanced example with complex dependencies:

from dataclasses import dataclass
from typing import Any

from sdax import AsyncTask, task_func, join, RetryableException
from sdax.sdax_core import AsyncDagTaskProcessor

@dataclass
class DatabaseContext:
    connection: Any = None
    
