<div align="center">

# AioTasks

🚀 **Modern Async Task Queue for Python 3.12+**

A Celery-like task manager that distributes asyncio coroutines

</div>

## 📋 Table of Contents
- What is AioTasks?
- Features
- Installation
- Quick Start
- Core Concepts
- Result Backend
- Periodic Tasks
- Dead Letter Queue
- Celery Interoperability
- Pool Support
- Backends
- CLI Reference
- Examples
- Migration Guide
- FAQ
- Contributing
- License
## 🎯 What is AioTasks?

AioTasks is a modern, high-performance task queue built on Python's asyncio. If you're familiar with Celery, you'll feel right at home: AioTasks provides a nearly identical API but is designed specifically for async/await workflows.
Perfect for:
- 🌐 Web applications (FastAPI, aiohttp, Django async)
- 📧 Background task processing (emails, notifications, reports)
- 🔄 Periodic tasks and scheduling
- 📊 Data pipelines and ETL jobs
- 🤖 Microservices communication
- 🔗 Celery integration and gradual migration
## ✨ Features

### Core Features
- 🎭 **Celery-Compatible API** - Same syntax, just `aiotasks` instead of `celery`
- ⚡ **Native AsyncIO** - Built from scratch for async/await
- 🔄 **Multiple Backends** - Memory, Redis, RabbitMQ (AMQP), ZeroMQ
- 🏊 **Pool Support** - async (coroutines), thread, or process pools (Celery-like `--pool`)
- 🔁 **Smart Retry Logic** - Exponential backoff with tenacity
- 📊 **Task Acknowledgment** - Reliable ACK/NACK support
- ⏱️ **TTL Support** - Automatic task expiration
- 🎯 **Type Safe** - Complete type hints with modern Python
- 🐍 **Python 3.12+** - Pattern matching, StrEnum, PEP 604, type aliases
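The retry behavior described above is ordinary exponential backoff with jitter; here is a stdlib-only sketch of the pattern itself (illustrative only, not the library's internal code, which uses tenacity):

```python
import asyncio
import random

async def retry_with_backoff(coro_fn, max_retries=5, base_delay=1.0, cap=30.0):
    """Retry an async callable with exponential backoff and jitter."""
    for attempt in range(max_retries):
        try:
            return await coro_fn()
        except Exception:
            if attempt == max_retries - 1:
                raise
            # Delay doubles each attempt (base, 2x, 4x, ...) capped at `cap`
            delay = min(cap, base_delay * 2 ** attempt)
            await asyncio.sleep(delay * random.uniform(0.5, 1.0))

async def main():
    calls = 0

    async def flaky():
        nonlocal calls
        calls += 1
        if calls < 3:
            raise RuntimeError("transient failure")
        return "ok"

    print(await retry_with_backoff(flaky, base_delay=0.01))  # prints: ok

asyncio.run(main())
```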
### 🔗 Celery Interoperability (NEW in v2.3)

Full Celery Protocol v2 compatibility enables seamless interoperability:
- Send tasks from AioTasks, process with Celery workers
- Send tasks from Celery, process with AioTasks workers
- Mixed worker pools (some Celery, some AioTasks)
- Gradual migration from Celery to AioTasks
- FastAPI + existing Celery infrastructure
```python
# Enable with one parameter
app = AioTasks(
    'myapp',
    broker='redis://localhost:6379/0',
    celery_compat=True,  # ✨ That's it!
)
```
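For context, Celery's message protocol v2 (the format that `celery_compat=True` targets) carries task metadata in message headers and a three-element body of positional args, keyword args, and embedded options. A simplified sketch of that envelope (field set abridged; this is background on the wire format, not AioTasks code):

```python
import json
import uuid

task_id = str(uuid.uuid4())
message = {
    "headers": {  # task metadata lives in the headers in protocol v2
        "task": "myapp.send_email",
        "id": task_id,
        "lang": "py",
        "root_id": task_id,
        "parent_id": None,
    },
    "properties": {
        "correlation_id": task_id,
        "content_type": "application/json",
        "content_encoding": "utf-8",
    },
    # Body is a 3-tuple: positional args, keyword args, embedded options
    "body": json.dumps([
        ["user@example.com"],
        {"subject": "Hello"},
        {"callbacks": None, "errbacks": None, "chain": None, "chord": None},
    ]),
}
print(json.loads(message["body"])[0])  # prints: ['user@example.com']
```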
### 📦 Result Backend (NEW in v2.3)

Store and retrieve task results with multiple backends:
- Get task results by ID
- Wait for task completion with timeout
- Memory and Redis backends
- Automatic result expiration (TTL)
```python
# Configure result backend
app = AioTasks(
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/1',  # Results stored here
)

# Get results
result = await app.wait_for_result(task_id, timeout=60)
print(result.result)  # Task return value
```
### ⏰ Periodic Tasks (NEW in v2.3)

Celery Beat compatible scheduler for periodic tasks:
- Interval schedules (every N seconds/minutes/hours)
- Cron-style schedules
- Start/stop scheduler independently
- Persistent schedule configuration
```python
from aiotasks import every, crontab

# Every hour
app.add_periodic_task(
    name="cleanup",
    schedule=every(hours=1),
    task="cleanup_old_data",
)

# Daily at 7:30 AM
app.add_periodic_task(
    name="daily_report",
    schedule=crontab(hour="7", minute="30"),
    task="generate_report",
)

await app.start_scheduler()
```
### 💀 Dead Letter Queue (NEW in v2.3)

Handle failed tasks gracefully:
- Automatic DLQ for tasks that exhaust retries
- Inspect failed tasks with full error details
- Retry failed tasks manually or in batch
- DLQ statistics and monitoring
```python
# List failed tasks
failed = await app.list_failed_tasks(limit=10)

# Retry a specific failed task
await app.retry_failed_task(task_id)

# Retry all failed email tasks
count = await app.retry_failed_tasks(task_name="send_email")

# Get statistics
stats = app.get_dlq_stats()
print(f"Total failed: {stats['total_tasks']}")
```
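Conceptually, a dead letter queue is just a side channel that receives a task, together with its error details, once its final retry fails. A stdlib-only sketch of the pattern (not AioTasks internals):

```python
import asyncio

dlq: list[dict] = []  # failed tasks land here with their error details

async def run_with_dlq(name, coro_fn, max_retries=3):
    last_error = None
    for _ in range(max_retries):
        try:
            return await coro_fn()
        except Exception as exc:
            last_error = exc
    # Retries exhausted: record the failure instead of silently dropping it
    dlq.append({"task": name, "error": repr(last_error), "retries": max_retries})
    return None

async def main():
    async def always_fails():
        raise ValueError("SMTP unreachable")

    await run_with_dlq("send_email", always_fails)
    print(dlq[0]["task"], dlq[0]["retries"])  # prints: send_email 3

asyncio.run(main())
```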
## 📦 Installation

One command installs everything:

```bash
pip install aiotasks
```
### 🎁 What's Included?

- ✅ **All Brokers**: Redis, RabbitMQ (AMQP), ZeroMQ, Memory
- ✅ **Performance**: uvloop, ujson
- ✅ **FastAPI**: Full integration included
- ✅ **CLI Tools**: Celery-compatible commands
- ✅ **Type Safety**: Complete type hints

No optional dependencies needed: everything is included by default!
## 🚀 Quick Start

### 1. Define Your App (Celery-Style)
```python
import asyncio
from aiotasks import AioTasks

# Create app (just like Celery!)
app = AioTasks("myapp", broker="redis://localhost:6379/0")

# Define tasks
@app.task()
async def send_email(to: str, subject: str, body: str):
    await asyncio.sleep(1)  # Simulate sending
    print(f"📧 Email sent to {to}")
    return {"status": "sent"}
```
### 2. Run the Worker

```bash
# Celery-compatible CLI - same syntax!
aiotasks -A myapp worker -l INFO -c 10
```
### 3. Queue Tasks

```python
async def main():
    app.run()
    await send_email.delay("user@example.com", "Hello", "World!")
    await app.wait(timeout=10, exit_on_finish=True)
    app.stop()

asyncio.run(main())
```
## 📦 Result Backend

### Overview

The result backend lets you store and retrieve task results. This is essential for APIs that need to wait for task completion or check task status.

### Configuration
```python
from aiotasks import AioTasks

# Memory backend (development)
app = AioTasks(
    broker='memory://',
    backend='memory://',  # Results kept in memory
)

# Redis backend (production)
app = AioTasks(
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/1',  # Results stored in Redis
    task_ttl=3600,  # Results expire after 1 hour
)
```
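The `task_ttl` setting above means stored results expire automatically. The mechanics of such a TTL store can be sketched with a dict of (value, deadline) pairs and lazy eviction (illustrative only, not the library's actual memory backend):

```python
import time

class TTLStore:
    """Minimal key-value store whose entries expire after `ttl` seconds."""

    def __init__(self, ttl: float):
        self.ttl = ttl
        self._data: dict[str, tuple[object, float]] = {}

    def set(self, key, value):
        self._data[key] = (value, time.monotonic() + self.ttl)

    def get(self, key):
        entry = self._data.get(key)
        if entry is None:
            return None
        value, deadline = entry
        if time.monotonic() >= deadline:
            del self._data[key]  # lazily evict expired results
            return None
        return value

store = TTLStore(ttl=0.05)
store.set("task-123", {"status": "success", "result": 9})
print(store.get("task-123")["status"])  # prints: success
time.sleep(0.06)
print(store.get("task-123"))  # prints: None (entry expired)
```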
### Usage
```python
# Define task
@app.task()
async def calculate(x: int, y: int) -> int:
    await asyncio.sleep(2)
    return x + y

# Queue task and get task ID
task_ctx = calculate.delay(4, 5)
task_id = task_ctx.task_id

# Option 1: Poll for the result
result = await app.get_result(task_id)
if result and result.status == "success":
    print(result.result)  # 9

# Option 2: Wait for the result (with timeout)
try:
    result = await app.wait_for_result(task_id, timeout=30)
    print(result.result)  # 9
except TimeoutError:
    print("Task didn't complete in time")
except RuntimeError as e:
    print(f"Task failed: {e}")
```
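Waiting with a timeout amounts to polling the backend until the task reaches a terminal status or the deadline passes; the pattern itself can be sketched with `asyncio.wait_for` and a fake backend (an illustration of the concept, not the library's implementation):

```python
import asyncio

async def wait_for_result(get_result, task_id, timeout=30, poll_interval=0.1):
    """Poll `get_result` until a terminal status or until `timeout` elapses."""
    async def _poll():
        while True:
            result = await get_result(task_id)
            if result and result["status"] in ("success", "failure"):
                return result
            await asyncio.sleep(poll_interval)
    return await asyncio.wait_for(_poll(), timeout)

async def main():
    # Fake backend: pending, then started, then success
    states = iter([None, {"status": "started"}, {"status": "success", "result": 9}])

    async def fake_backend(task_id):
        return next(states)

    result = await wait_for_result(fake_backend, "t1", timeout=5, poll_interval=0.01)
    print(result["result"])  # prints: 9

asyncio.run(main())
```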
### Result Object

```python
class TaskResult:
    task_id: str            # Task identifier
    status: str             # pending, started, success, failure
    result: Any             # Return value (if successful)
    error: str              # Error message (if failed)
    traceback: str          # Full traceback (if failed)
    started_at: datetime    # When the task started
    completed_at: datetime  # When the task completed
```
### FastAPI Integration

```python
from fastapi import FastAPI
from aiotasks import AioTasks

app_api = FastAPI()
tasks = AioTasks(broker='redis://...', backend='redis://...')

@app_api.post("/process")
async def process_data(data: dict):
    # Queue the task (process_job is a task defined elsewhere with @tasks.task())
    task_ctx = process_job.delay(data)
    return {"task_id": task_ctx.task_id, "status": "processing"}

@app_api.get("/status/{task_id}")
async def get_status(task_id: str):
    result = await tasks.get_result(task_id)
    if result is None:
        return {"status": "not_found"}
    return {
        "status": result.status,
        "result": result.result if result.status == "success" else None,
        "error": result.error if result.status == "failure" else None,
    }
```
## ⏰ Periodic Tasks

### Overview

Periodic tasks let you schedule tasks to run automatically at specific intervals or times, similar to Celery Beat.

### Creating Schedules
```python
from aiotasks import AioTasks, every, crontab

app = AioTasks(broker='redis://localhost:6379/0')

# Define your tasks
@app.task()
async def cleanup_old_data():
    # Cleanup logic
    pass

@app.task()
async def send_daily_report():
    # Report logic
    pass

@app.task()
async def health_check():
    # Health check logic
    pass
```
### Interval Schedules

Run tasks at fixed intervals:
```python
# Every 30 seconds
app.add_periodic_task(
    name="health_check",
    schedule=every(seconds=30),
    task="health_check",
)

# Every 5 minutes
app.add_periodic_task(
    name="quick_cleanup",
    schedule=every(minutes=5),
    task="cleanup_old_data",
)

# Every 2 hours
app.add_periodic_task(
    name="hourly_sync",
    schedule=every(hours=2),
    task="sync_data",
)

# Every day
app.add_periodic_task(
    name="daily_backup",
    schedule=every(days=1),
    task="backup_database",
)
```
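An interval schedule boils down to a fixed `timedelta` plus a "next run" timestamp. A sketch of how an `every(...)` helper might map onto that (an assumed shape for illustration, not the library's code):

```python
from datetime import datetime, timedelta

def every(*, seconds=0, minutes=0, hours=0, days=0) -> timedelta:
    """Collapse the interval units into a single timedelta (illustrative)."""
    return timedelta(seconds=seconds, minutes=minutes, hours=hours, days=days)

def next_run(last_run: datetime, interval: timedelta) -> datetime:
    """The scheduler fires again one interval after the last run."""
    return last_run + interval

interval = every(hours=2)
last = datetime(2025, 1, 1, 12, 0)
print(next_run(last, interval))  # prints: 2025-01-01 14:00:00
```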
### Cron Schedules

Use cron-style expressions for more complex schedules:
```python
# Every 15 minutes
app.add_periodic_task(
    name="frequent_check",
    schedule=crontab(minute="*/15"),
    task="check_status",
)

# Daily at 7:30 AM
app.add_periodic_task(
    name="morning_report",
    schedule=crontab(hour="7", minute="30"),
    task="send_daily_report",
)

# Every Monday at 9 AM
app.add_periodic_task(
    name="weekly_cleanup",
    schedule=crontab(hour="9", minute="0", day_of_week="1"),
    task="weekly_maintenance",
)

# First day of month at midnight
app.add_periodic_task(
    name="monthly_billing",
    schedule=crontab(hour="0", minute="0", day_of_month="1"),
    task="process_billing",
)

# Every 2 hours at :30 (2:30, 4:30, ...)
app.add_periodic_task(
    name="bi_hourly_sync",
    schedule=crontab(hour="*/2", minute="30"),
    task="sync_data",
)
```
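A cron entry like `crontab(hour="7", minute="30")` fires at the next matching wall-clock time. For the simple fixed hour/minute case, that "next fire time" computation can be sketched with the stdlib (no wildcard support; purely illustrative):

```python
from datetime import datetime, timedelta

def next_daily_run(now: datetime, hour: int, minute: int) -> datetime:
    """Next occurrence of hour:minute - later today if still ahead, else tomorrow."""
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= now:
        candidate += timedelta(days=1)
    return candidate

now = datetime(2025, 1, 1, 8, 0)
print(next_daily_run(now, 7, 30))  # prints: 2025-01-02 07:30:00 (7:30 has passed)
print(next_daily_run(now, 9, 0))   # prints: 2025-01-01 09:00:00
```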