SkillAgentSearch skills...

Aiotasks

A Celery like task manager that distributes Asyncio coroutines

Install / Use

/learn @cr0hn/Aiotasks
About this skill

Quality Score

0/100

Supported Platforms

Universal

README

AioTasks

<div align="center">

🚀 Modern Async Task Queue for Python 3.12+

A Celery-like task manager that distributes asyncio coroutines

PyPI version Python versions License CI/CD

</div>

📋 Table of Contents


🎯 What is AioTasks?

AioTasks is a modern, high-performance task queue built on Python's asyncio. If you're familiar with Celery, you'll feel right at home - AioTasks provides a nearly identical API but is designed specifically for async/await workflows.

Perfect for:

  • 🌐 Web applications (FastAPI, aiohttp, Django async)
  • 📧 Background task processing (emails, notifications, reports)
  • 🔄 Periodic tasks and scheduling
  • 📊 Data pipelines and ETL jobs
  • 🤖 Microservices communication
  • 🔗 Celery integration and gradual migration

✨ Features

Core Features

  • 🎭 Celery-Compatible API - Same syntax, just aiotasks instead of celery
  • ⚡ Native AsyncIO - Built from scratch for async/await
  • 🔄 Multiple Backends - Memory, Redis, RabbitMQ (AMQP), ZeroMQ
  • 🏊 Pool Support - async (coroutines), thread, or process pools (Celery-like --pool)
  • 🔁 Smart Retry Logic - Exponential backoff with tenacity
  • 📊 Task Acknowledgment - Reliable ACK/NACK support
  • ⏱️ TTL Support - Automatic task expiration
  • 🎯 Type Safe - Complete type hints with modern Python
  • 🐍 Python 3.12+ - Pattern matching, StrEnum, PEP 604, type aliases

🔗 Celery Interoperability NEW in v2.3

Full Celery Protocol v2 compatibility enables seamless interoperability:

  • Send tasks from AioTasks, process with Celery workers
  • Send tasks from Celery, process with AioTasks workers
  • Mixed worker pools (some Celery, some AioTasks)
  • Gradual migration from Celery to AioTasks
  • FastAPI + existing Celery infrastructure
# Enable with one parameter
app = AioTasks(
    'myapp',
    broker='redis://localhost:6379/0',
    celery_compat=True,  # ✨ That's it!
)

📦 Result Backend NEW in v2.3

Store and retrieve task results with multiple backends:

  • Get task results by ID
  • Wait for task completion with timeout
  • Memory and Redis backends
  • Automatic result expiration (TTL)
# Configure result backend
app = AioTasks(
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/1',  # Results stored here
)

# Get results
result = await app.wait_for_result(task_id, timeout=60)
print(result.result)  # Task return value

⏰ Periodic Tasks NEW in v2.3

Celery Beat compatible scheduler for periodic tasks:

  • Interval schedules (every N seconds/minutes/hours)
  • Cron-style schedules
  • Start/stop scheduler independently
  • Persistent schedule configuration
from aiotasks import every, crontab

# Every hour
app.add_periodic_task(
    name="cleanup",
    schedule=every(hours=1),
    task="cleanup_old_data",
)

# Daily at 7:30 AM
app.add_periodic_task(
    name="daily_report",
    schedule=crontab(hour="7", minute="30"),
    task="generate_report",
)

await app.start_scheduler()

💀 Dead Letter Queue NEW in v2.3

Handle failed tasks gracefully:

  • Automatic DLQ for tasks that exhaust retries
  • Inspect failed tasks with full error details
  • Retry failed tasks manually or in batch
  • DLQ statistics and monitoring
# List failed tasks
failed = await app.list_failed_tasks(limit=10)

# Retry specific failed task
await app.retry_failed_task(task_id)

# Retry all failed email tasks
count = await app.retry_failed_tasks(task_name="send_email")

# Get statistics
stats = app.get_dlq_stats()
print(f"Total failed: {stats['total_tasks']}")

📦 Installation

One command installs everything:

pip install aiotasks

🎁 What's Included?

All Brokers: Redis, RabbitMQ (AMQP), ZeroMQ, Memory ✅ Performance: uvloop, ujson ✅ FastAPI: Full integration included ✅ CLI Tools: Celery-compatible commands ✅ Type Safety: Complete type hints

No optional dependencies needed - everything is included by default!


🚀 Quick Start

1. Define Your App (Celery-Style)

import asyncio
from aiotasks import AioTasks

# Create app (just like Celery!)
app = AioTasks("myapp", broker="redis://localhost:6379/0")

# Define tasks
@app.task()
async def send_email(to: str, subject: str, body: str):
    await asyncio.sleep(1)  # Simulate sending
    print(f"📧 Email sent to {to}")
    return {"status": "sent"}

2. Run the Worker

# Celery-compatible CLI - same syntax!
aiotasks -A myapp worker -l INFO -c 10

3. Queue Tasks

async def main():
    app.run()
    await send_email.delay("user@example.com", "Hello", "World!")
    await app.wait(timeout=10, exit_on_finish=True)
    app.stop()

asyncio.run(main())

📦 Result Backend

Overview

The Result Backend allows you to store and retrieve task results. This is essential for APIs that need to wait for task completion or check task status.

Configuration

from aiotasks import AioTasks

# Memory backend (development)
app = AioTasks(
    broker='memory://',
    backend='memory://',  # Results in memory
)

# Redis backend (production)
app = AioTasks(
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/1',  # Results in Redis
    task_ttl=3600,  # Results expire after 1 hour
)

Usage

# Define task
@app.task()
async def calculate(x: int, y: int) -> int:
    await asyncio.sleep(2)
    return x + y

# Queue task and get task ID
task_ctx = calculate.delay(4, 5)
task_id = task_ctx.task_id

# Option 1: Poll for result
result = await app.get_result(task_id)
if result and result.status == "success":
    print(result.result)  # 9

# Option 2: Wait for result (with timeout)
try:
    result = await app.wait_for_result(task_id, timeout=30)
    print(result.result)  # 9
except TimeoutError:
    print("Task didn't complete in time")
except RuntimeError as e:
    print(f"Task failed: {e}")

Result Object

class TaskResult:
    task_id: str          # Task identifier
    status: str           # pending, started, success, failure
    result: Any           # Return value (if successful)
    error: str            # Error message (if failed)
    traceback: str        # Full traceback (if failed)
    started_at: datetime  # When task started
    completed_at: datetime  # When task completed

FastAPI Integration

from fastapi import FastAPI, BackgroundTasks
from aiotasks import AioTasks

app_api = FastAPI()
tasks = AioTasks(broker='redis://...', backend='redis://...')

@app_api.post("/process")
async def process_data(data: dict):
    # Queue task
    task_ctx = process_job.delay(data)

    return {"task_id": task_ctx.task_id, "status": "processing"}

@app_api.get("/status/{task_id}")
async def get_status(task_id: str):
    result = await tasks.get_result(task_id)

    if result is None:
        return {"status": "not_found"}

    return {
        "status": result.status,
        "result": result.result if result.status == "success" else None,
        "error": result.error if result.status == "failure" else None,
    }

⏰ Periodic Tasks

Overview

Periodic tasks allow you to schedule tasks to run automatically at specific intervals or times, similar to Celery Beat.

Creating Schedules

from aiotasks import AioTasks, every, crontab

app = AioTasks(broker='redis://localhost:6379/0')

# Define your tasks
@app.task()
async def cleanup_old_data():
    # Cleanup logic
    pass

@app.task()
async def send_daily_report():
    # Report logic
    pass

@app.task()
async def health_check():
    # Health check logic
    pass

Interval Schedules

Run tasks at fixed intervals:

# Every 30 seconds
app.add_periodic_task(
    name="health_check",
    schedule=every(seconds=30),
    task="health_check",
)

# Every 5 minutes
app.add_periodic_task(
    name="quick_cleanup",
    schedule=every(minutes=5),
    task="cleanup_old_data",
)

# Every 2 hours
app.add_periodic_task(
    name="hourly_sync",
    schedule=every(hours=2),
    task="sync_data",
)

# Every day
app.add_periodic_task(
    name="daily_backup",
    schedule=every(days=1),
    task="backup_database",
)

Cron Schedules

Use cron-style expressions for more complex schedules:

# Every 15 minutes
app.add_periodic_task(
    name="frequent_check",
    schedule=crontab(minute="*/15"),
    task="check_status",
)

# Daily at 7:30 AM
app.add_periodic_task(
    name="morning_report",
    schedule=crontab(hour="7", minute="30"),
    task="send_daily_report",
)

# Every Monday at 9 AM
app.add_periodic_task(
    name="weekly_cleanup",
    schedule=crontab(hour="9", minute="0", day_of_week="1"),
    task="weekly_maintenance",
)

# First day of month at midnight
app.add_periodic_task(
    name="monthly_billing",
    schedule=crontab(hour="0", minute="0", day_of_month="1"),
    task="process_billing",
)

# Every 2 hours at :30 (2:30, 4:30,
View on GitHub
GitHub Stars448
CategoryDevelopment
Updated13d ago
Forks36

Languages

Python

Security Score

85/100

Audited on Mar 20, 2026

No findings