🚀 Kew - A Fast, Redis-backed Task Queue Manager for Python
Why Kew?
Building async applications often means dealing with background tasks. Existing solutions like Celery require separate worker processes and complex configuration. Kew takes a different approach:
- Runs in Your Process: No separate workers to manage - tasks run in your existing async process
- True Async: Native async/await support - no sync/async bridges needed
- Precise Control: Semaphore-based concurrency ensures exact worker limits
- Simple Setup: Just Redis and a few lines of code to get started
- Fast: Single-roundtrip atomic task submission via Lua scripts
How It Works
Kew manages task execution using a combination of Redis for persistence and asyncio for processing:
graph LR
A[Application] -->|Submit Task| B[Task Queue]
B -->|Semaphore Control| C[Worker Pool]
C -->|Execute Task| D[Task Processing]
D -->|Success| E[Complete]
D -->|Error| F[Circuit Breaker]
F -->|Retry/Reset| B
style A fill:#f9f,stroke:#333
style B fill:#bbf,stroke:#333
style C fill:#bfb,stroke:#333
style D fill:#fbb,stroke:#333
Tasks flow through several states with built-in error handling:
stateDiagram-v2
[*] --> Submitted: Task Created
Submitted --> Queued: Priority Assignment
Queued --> Processing: Worker Available
Processing --> Completed: Success
Processing --> Retry: Error (retries remaining)
Retry --> Queued: Backoff Delay
Processing --> Failed: Error (no retries)
Failed --> CircuitOpen: Multiple Failures
CircuitOpen --> Queued: Circuit Reset
Completed --> [*]
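The transitions in the diagram can also be written as a lookup table. This is a minimal sketch for illustration only: `TaskState`, `TRANSITIONS`, and `can_transition` are hypothetical names, and the status values Kew actually stores may differ.

```python
from enum import Enum


class TaskState(Enum):
    # Hypothetical state names mirroring the diagram, not Kew's own enum.
    SUBMITTED = "submitted"
    QUEUED = "queued"
    PROCESSING = "processing"
    RETRY = "retry"
    COMPLETED = "completed"
    FAILED = "failed"
    CIRCUIT_OPEN = "circuit_open"


# Allowed moves, copied edge by edge from the state diagram.
TRANSITIONS = {
    TaskState.SUBMITTED: {TaskState.QUEUED},
    TaskState.QUEUED: {TaskState.PROCESSING},
    TaskState.PROCESSING: {
        TaskState.COMPLETED,
        TaskState.RETRY,
        TaskState.FAILED,
    },
    TaskState.RETRY: {TaskState.QUEUED},
    TaskState.FAILED: {TaskState.CIRCUIT_OPEN},
    TaskState.CIRCUIT_OPEN: {TaskState.QUEUED},
    TaskState.COMPLETED: set(),
}


def can_transition(current: TaskState, target: TaskState) -> bool:
    """Return True if the diagram has an edge from current to target."""
    return target in TRANSITIONS[current]
```

A table like this makes it easy to see that `Completed` is the only terminal state and that every error path eventually leads back to `Queued`.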
Quick Start
- Install Kew:
pip install kew
- Create a simple task processor:
import asyncio
from kew import TaskQueueManager, QueueConfig, QueuePriority

async def process_order(order_id: str):
    # Simulate order processing
    await asyncio.sleep(1)
    return f"Order {order_id} processed"

async def main():
    # Initialize queue manager
    manager = TaskQueueManager(redis_url="redis://localhost:6379")
    await manager.initialize()

    # Create processing queue
    await manager.create_queue(QueueConfig(
        name="orders",
        max_workers=4,  # Only 4 concurrent tasks
        max_size=1000
    ))

    # Submit some tasks
    tasks = []
    for i in range(10):
        task = await manager.submit_task(
            task_id=f"order-{i}",
            queue_name="orders",
            task_type="process_order",
            task_func=process_order,
            priority=QueuePriority.MEDIUM,
            order_id=str(i)
        )
        tasks.append(task)

    # Check results
    # Ten 1-second tasks on 4 workers need about 3 seconds,
    # so wait a little longer than that in this simple example
    await asyncio.sleep(3.5)
    for task in tasks:
        status = await manager.get_task_status(task.task_id)
        print(f"{task.task_id}: {status.result}")

if __name__ == "__main__":
    asyncio.run(main())
Key Features
Concurrency Control
# Strictly enforce 4 concurrent tasks max
await manager.create_queue(QueueConfig(
    name="api_calls",
    max_workers=4  # Guaranteed not to exceed
))
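The effect of a semaphore-based limit can be seen with a plain `asyncio.Semaphore`. The sketch below is not Kew's implementation; `run_limited` is a hypothetical helper that only measures how many dummy jobs are in flight at once.

```python
import asyncio


async def run_limited(n_tasks: int, max_workers: int) -> int:
    """Run n_tasks dummy jobs and return the peak number running at once."""
    semaphore = asyncio.Semaphore(max_workers)
    running = 0
    peak = 0

    async def job() -> None:
        nonlocal running, peak
        async with semaphore:          # at most max_workers pass this point
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(0.01)  # stand-in for real work
            running -= 1

    await asyncio.gather(*(job() for _ in range(n_tasks)))
    return peak


if __name__ == "__main__":
    print(asyncio.run(run_limited(n_tasks=10, max_workers=4)))
```

With ten jobs and `max_workers=4`, the measured peak is 4: the remaining jobs wait on the semaphore until a slot frees up.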
Priority Queues
# High priority queue for urgent tasks
await manager.create_queue(QueueConfig(
    name="urgent",
    priority=QueuePriority.HIGH
))

# Lower priority for batch processing
await manager.create_queue(QueueConfig(
    name="batch",
    priority=QueuePriority.LOW
))
Retry with Exponential Backoff (v0.2.0)
await manager.create_queue(QueueConfig(
    name="flaky_api",
    max_workers=4,
    max_retries=3,    # Retry up to 3 times on failure
    retry_delay=1.0,  # Base delay of 1 second (doubles each retry)
))

# Tasks that fail will be re-queued automatically:
# Attempt 1: immediate
# Attempt 2: +1s delay
# Attempt 3: +2s delay
# Attempt 4: +4s delay (or fail permanently)
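The delay schedule in the comments follows from doubling the base delay on each retry. A minimal sketch of that arithmetic, assuming pure doubling with no jitter or upper cap (Kew may apply either); `backoff_delays` is a hypothetical helper, not part of the library:

```python
def backoff_delays(max_retries: int, retry_delay: float) -> list[float]:
    """Delay before each retry: retry_delay, then doubling each time."""
    return [retry_delay * (2 ** attempt) for attempt in range(max_retries)]


print(backoff_delays(max_retries=3, retry_delay=1.0))  # [1.0, 2.0, 4.0]
```

With `max_retries=3` and `retry_delay=1.0`, a task that keeps failing is attempted four times in total and spends 7 seconds waiting between attempts.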
Deferred Execution (v0.2.0)
from datetime import datetime, timedelta

# Defer by a duration
await manager.submit_task(
    task_id="send-reminder",
    queue_name="emails",
    task_type="reminder",
    task_func=send_reminder,
    priority=QueuePriority.MEDIUM,
    _defer_by=300.0,  # Execute 5 minutes from now
    user_id="abc123",
)

# Defer until a specific time
await manager.submit_task(
    task_id="morning-report",
    queue_name="reports",
    task_type="report",
    task_func=generate_report,
    priority=QueuePriority.LOW,
    _defer_until=datetime(2025, 1, 15, 9, 0, 0),  # Run at 9 AM
)
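Both options boil down to a single "earliest start" timestamp. The sketch below shows one way to resolve it; `resolve_run_at` is a hypothetical helper, and how Kew itself handles a past `_defer_until` or both arguments at once is an assumption here, not documented behavior.

```python
from datetime import datetime, timedelta
from typing import Optional


def resolve_run_at(
    now: datetime,
    defer_by: Optional[float] = None,
    defer_until: Optional[datetime] = None,
) -> datetime:
    """Return the earliest time a task may start."""
    if defer_by is not None and defer_until is not None:
        # Assumption: the two options are mutually exclusive.
        raise ValueError("pass defer_by or defer_until, not both")
    if defer_by is not None:
        return now + timedelta(seconds=defer_by)
    if defer_until is not None:
        # Assumption: a time in the past means "run as soon as possible".
        return max(now, defer_until)
    return now
```

For example, `resolve_run_at(datetime(2025, 1, 15, 8, 0), defer_by=300.0)` yields 08:05 on the same day.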
Lifecycle Hooks (v0.2.0)
async def on_start(task_info):
    print(f"Task {task_info.task_id} started")

async def on_complete(task_info):
    await metrics.record("task.completed", task_info.task_id)

async def on_fail(task_info, error):
    await alert_channel.send(f"Task {task_info.task_id} failed: {error}")

manager = TaskQueueManager(
    redis_url="redis://localhost:6379",
    on_task_start=on_start,
    on_task_complete=on_complete,
    on_task_fail=on_fail,
)
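To make the call order concrete, here is a rough sketch of how a runner could invoke such hooks around a task. `run_with_hooks` is hypothetical and passes a bare task ID for brevity, whereas Kew's hooks receive a `task_info` object; whether Kew re-raises the error after `on_task_fail` is also an assumption.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional

Hook = Optional[Callable[..., Awaitable[None]]]


async def run_with_hooks(
    task_id: str,
    func: Callable[[], Awaitable[Any]],
    on_start: Hook = None,
    on_complete: Hook = None,
    on_fail: Hook = None,
) -> Any:
    """Await func(), calling the optional hooks before and after it."""
    if on_start is not None:
        await on_start(task_id)
    try:
        result = await func()
    except Exception as error:
        if on_fail is not None:
            await on_fail(task_id, error)
        raise  # assumption: the failure still propagates to the caller
    if on_complete is not None:
        await on_complete(task_id)
    return result
```

A successful task triggers the start hook and then the completion hook; a failing task triggers the start hook and then the failure hook, never the completion hook.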
Circuit Breakers
A Redis-backed, per-queue circuit breaker tracks consecutive failures and temporarily opens the circuit to protect downstream services. It resets automatically via key expiry.
await manager.create_queue(QueueConfig(
    name="external_api",
    max_workers=4,
    max_circuit_breaker_failures=5,    # Open after 5 consecutive failures
    circuit_breaker_reset_timeout=30,  # Auto-close after 30 seconds
))
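The two settings above map onto a small state machine. The following is an in-memory sketch of the idea, not Kew's Redis-backed implementation: the `CircuitBreaker` class and its injectable `clock` are hypothetical, and the timeout check stands in for the Redis key expiry.

```python
import time
from typing import Callable, Optional


class CircuitBreaker:
    """Opens after max_failures consecutive failures, closes after reset_timeout."""

    def __init__(
        self,
        max_failures: int,
        reset_timeout: float,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.max_failures = max_failures
        self.reset_timeout = reset_timeout
        self.clock = clock
        self.failures = 0
        self.opened_at: Optional[float] = None

    def is_open(self) -> bool:
        if self.opened_at is None:
            return False
        if self.clock() - self.opened_at >= self.reset_timeout:
            # Timeout elapsed: close the circuit, like an expiring key would.
            self.opened_at = None
            self.failures = 0
            return False
        return True

    def record_success(self) -> None:
        self.failures = 0  # only consecutive failures count

    def record_failure(self) -> None:
        self.failures += 1
        if self.failures >= self.max_failures:
            self.opened_at = self.clock()
```

A caller would check `is_open()` before running a task and skip or re-queue the task while the circuit is open.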
Backpressure
from kew.exceptions import QueueProcessorError

await manager.create_queue(QueueConfig(
    name="bounded_queue",
    max_workers=2,
    max_size=100,  # Reject submissions beyond 100 queued tasks
))

# Inside a request handler:
try:
    await manager.submit_task(...)
except QueueProcessorError:
    # Queue is full - apply backpressure to caller
    return {"status": "busy", "retry_after": 5}
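The same reject-when-full pattern can be tried without Redis using a bounded `asyncio.Queue`. This is only an analogy for how `max_size` behaves; `try_enqueue` is a hypothetical helper and `asyncio.QueueFull` plays the role of `QueueProcessorError`.

```python
import asyncio


def try_enqueue(queue: asyncio.Queue, item: str) -> dict:
    """Enqueue without waiting; report 'busy' when the queue is full."""
    try:
        queue.put_nowait(item)
    except asyncio.QueueFull:
        return {"status": "busy", "retry_after": 5}
    return {"status": "queued"}


async def main() -> list:
    queue: asyncio.Queue = asyncio.Queue(maxsize=2)
    # The third submission exceeds maxsize and is rejected immediately.
    return [try_enqueue(queue, f"task-{i}") for i in range(3)]


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Rejecting immediately, rather than blocking the caller, lets the caller decide whether to retry later, shed load, or surface the condition to its own client.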
Batch Submit (v0.2.1)
Submit thousands of tasks in a single call. Kew sends them to Redis in batches, so each round-trip carries many tasks instead of one:
tasks = [
    {
        "task_id": f"order-{i}",
        "task_type": "process",
        "task_func": process_order,
        "priority": QueuePriority.MEDIUM,
        "kwargs": {"order_id": i},
    }
    for i in range(1000)
]

# Single call, batched internally in chunks of 50
results = await manager.submit_tasks("orders", tasks)
# Reported at ~33,000 tasks/sec, about 12x faster than sequential submit_task();
# the CI benchmark under Performance measured ~16,202/sec (about 10x)
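Splitting a task list into chunks of 50 can be pictured with a few lines of plain Python. `chunked` is a hypothetical helper; the chunk size comes from the comment above, and how Kew sends each chunk to Redis is not shown here.

```python
from typing import Iterator, Sequence


def chunked(items: Sequence, size: int = 50) -> Iterator[Sequence]:
    """Yield consecutive slices of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


batches = list(chunked(list(range(1000))))
print(len(batches), len(batches[0]))  # 20 50
```

Under that assumption, 1,000 tasks become 20 chunks, which is far fewer round-trips than 1,000 individual `submit_task()` calls.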
Task Monitoring
# Check task status
status = await manager.get_task_status("task-123")
print(f"Status: {status.status}")
print(f"Result: {status.result}")
print(f"Error: {status.error}")
print(f"Retries: {status.retry_count}")
# Get all currently running tasks
ongoing = await manager.get_ongoing_tasks()
# Monitor queue health
queue_status = await manager.get_queue_status("api_calls")
print(f"Active Tasks: {queue_status['current_workers']}")
print(f"Circuit Breaker: {queue_status['circuit_breaker_status']}")
Real-World Examples
Async Web Application
from fastapi import FastAPI
from kew import TaskQueueManager, QueueConfig, QueuePriority

app = FastAPI()
manager = TaskQueueManager()

@app.on_event("startup")
async def startup():
    await manager.initialize()
    await manager.create_queue(QueueConfig(
        name="emails",
        max_workers=2,
        max_retries=3,    # Retry failed email sends
        retry_delay=5.0,  # 5s base backoff
    ))

@app.post("/signup")
async def signup(email: str):
    # Handle signup immediately
    user = await create_user(email)

    # Queue welcome email for background processing
    await manager.submit_task(
        task_id=f"welcome-{user.id}",
        queue_name="emails",
        task_type="send_welcome_email",
        task_func=send_welcome_email,
        priority=QueuePriority.MEDIUM,
        user_id=user.id
    )
    return {"status": "success"}
<!-- BENCHMARK_START -->
Performance
v0.2.1 vs arq (head-to-head benchmark)
Single-process enqueue throughput on Redis 7, measured in CI:
| Metric | kew v0.2.1 | arq v0.27.0 | Winner |
|--------|-----------|-----------|--------|
| Mean enqueue latency | 0.67ms | 0.62ms | arq |
| Sequential throughput | ~1,525/sec | ~1,585/sec | arq |
| Concurrent (gather) | ~3,148/sec | N/A | kew |
| Batch (submit_tasks()) | ~16,202/sec | N/A | kew 10x |
| End-to-end throughput | ~351/sec | N/A* | kew |
*arq requires separate worker processes; kew runs tasks in-process.
Numbers from GitHub Actions on `ubuntu-latest` (2026-02-16).
Version progression
| Version | Throughput | vs v0.1.4 |
|---------|-----------|-----------|
| v0.1.4 | ~850/sec | 1x |
| v0.1.8 | ~1,550/sec | 1.8x |
| v0.2.0 | ~2,990/sec | 3.5x |
| v0.2.1 (sequential) | ~1,525/sec | 1.8x |
| v0.2.1 (concurrent) | ~3,148/sec | 3.7x |
| v0.2.1 (batch) | ~16,202/sec | 19.1x |
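The "vs v0.1.4" column is each throughput figure divided by the v0.1.4 baseline and rounded to one decimal place. A short check of that arithmetic, using only the numbers from the table:

```python
baseline = 850  # v0.1.4 throughput in tasks/sec

throughput = {
    "v0.1.8": 1550,
    "v0.2.0": 2990,
    "v0.2.1 (sequential)": 1525,
    "v0.2.1 (concurrent)": 3148,
    "v0.2.1 (batch)": 16202,
}

# Speedup relative to the baseline, rounded like the table.
speedup = {name: round(rate / baseline, 1) for name, rate in throughput.items()}

for name, factor in speedup.items():
    print(f"{name}: {factor}x")
```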
Key optimizations
- v0.2.1: Lock-free submit (Lua atomicity), batch Lua script for N tasks in 1 RTT
- v0.2.0: Atomic Lua script, binary Redis, per-queue locks, semaphore reorder, active task SET
- v0.1.8: Redis pipelining & batching
Version History
See the full changelog in CHANGELOG.md.
