⚡ FlashQ

The task queue that works out of the box — no Redis, no RabbitMQ, just pip install flashq and go.

License: MIT


Why FlashQ?

Every Python developer has been there: you need background tasks, you look at Celery, and suddenly you need Redis or RabbitMQ running, a separate broker config, and 200 lines of boilerplate before your first task runs.

FlashQ changes that. SQLite is the default backend — zero external dependencies, zero config. Your tasks persist across restarts, and you can scale to PostgreSQL or Redis when you need to.

from flashq import FlashQ

app = FlashQ()  # That's it. Uses SQLite by default.

@app.task()
def send_email(to: str, subject: str) -> None:
    print(f"Sending email to {to}: {subject}")

# Enqueue a task
send_email.delay(to="user@example.com", subject="Welcome!")

Features

| Feature | FlashQ | Celery | Dramatiq | Huey | TaskIQ |
|---------|:------:|:------:|:--------:|:----:|:------:|
| Zero-config setup | ✅ | ❌ | ❌ | ⚠️ | ❌ |
| SQLite backend | ✅ | ❌ | ❌ | ✅ | ❌ |
| PostgreSQL backend | ✅ | ❌ | ❌ | ❌ | ⚠️ |
| Redis backend | ✅ | ✅ | ✅ | ✅ | ✅ |
| Async + Sync tasks | ✅ | ❌ | ❌ | ❌ | Async only |
| Type-safe .delay() | ✅ | ❌ | ⚠️ | ❌ | ✅ |
| Task chains/groups | ✅ | ✅ | ❌ | ❌ | ❌ |
| Middleware system | ✅ | ✅ | ✅ | ❌ | ✅ |
| Rate limiting | ✅ | ✅ | ❌ | ❌ | ❌ |
| Web dashboard | ✅ | ⚠️ | ❌ | ❌ | ❌ |
| Dead letter queue | ✅ | ❌ | ❌ | ❌ | ❌ |
| Task timeouts | ✅ | ✅ | ✅ | ❌ | ✅ |
| Periodic/cron scheduler | ✅ | ⚠️ | ❌ | ✅ | ⚠️ |
| Zero dependencies | ✅ | ❌ | ❌ | ❌ | ❌ |

Installation

# Core (SQLite only — zero dependencies!)
pip install flashq

# With Redis
pip install "flashq[redis]"

# With PostgreSQL
pip install "flashq[postgres]"

# Development
pip install "flashq[dev]"

Try It Now!

See FlashQ in action with the built-in demo — zero config needed:

pip install flashq
python -m flashq.demo

You'll see 20 tasks created, enqueued, and processed by workers, with results printed in real time.

Quick Start

1. Define tasks

# tasks.py
from flashq import FlashQ

app = FlashQ()

@app.task()
def add(x: int, y: int) -> int:
    return x + y

@app.task(queue="emails", max_retries=5, retry_delay=30.0)
def send_email(to: str, subject: str, body: str) -> dict:
    return {"status": "sent", "to": to}

@app.task(timeout=120.0)  # Kill if takes >2 min
async def process_image(url: str) -> str:
    # Async tasks just work™
    result = await download_and_resize(url)
    return result

2. Enqueue tasks

from tasks import add, send_email

# Simple dispatch
handle = add.delay(2, 3)

# With options
handle = send_email.apply(
    kwargs={"to": "user@example.com", "subject": "Hi", "body": "Hello!"},
    countdown=60,  # delay by 60 seconds
)

# Check result
result = handle.get_result()
if result and result.is_success:
    print(f"Result: {result.result}")

3. Start the worker

flashq worker tasks:app
 ⚡ FlashQ Worker
 ├─ name:        worker-12345
 ├─ backend:     SQLiteBackend
 ├─ queues:      default
 ├─ concurrency: 4
 ├─ tasks:       3
 │    └─ tasks.add
 │    └─ tasks.send_email
 │    └─ tasks.process_image
 └─ Ready! Waiting for tasks...

Task Composition

Chain tasks sequentially, run them in parallel, or combine both:

from flashq import chain, group, chord

# Chain: sequential — result of each passed to next
pipe = chain(
    download.s("https://example.com/data.csv"),
    parse_csv.s(),
    store_results.s(table="imports"),
)
pipe.delay(app)

# Group: parallel execution
batch = group(
    send_email.s(to="alice@test.com", subject="Hi"),
    send_email.s(to="bob@test.com", subject="Hi"),
    send_email.s(to="carol@test.com", subject="Hi"),
)
handle = batch.delay(app)
results = handle.get_results(timeout=30)

# Chord: parallel + callback when all complete
workflow = chord(
    group(fetch_price.s("AAPL"), fetch_price.s("GOOG"), fetch_price.s("MSFT")),
    aggregate_prices.s(),
)
workflow.delay(app)
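
To make the three primitives concrete, here is a minimal plain-Python sketch of the semantics described above, with no FlashQ involved. The helper names `run_chain`, `run_group`, and `run_chord` are hypothetical and exist only for illustration; FlashQ's own canvas dispatches through the backend and workers rather than calling functions inline.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import reduce
from typing import Any, Callable, Sequence


def run_chain(first_arg: Any, steps: Sequence[Callable[[Any], Any]]) -> Any:
    """Sequential: each step receives the previous step's result."""
    return reduce(lambda acc, step: step(acc), steps, first_arg)


def run_group(calls: Sequence[Callable[[], Any]]) -> list:
    """Parallel: run independent calls, return results in submission order."""
    with ThreadPoolExecutor(max_workers=max(1, len(calls))) as pool:
        futures = [pool.submit(call) for call in calls]
        return [future.result() for future in futures]


def run_chord(calls: Sequence[Callable[[], Any]], callback: Callable[[list], Any]) -> Any:
    """Parallel fan-out, then a single callback over the collected results."""
    return callback(run_group(calls))


chain_result = run_chain(2, [lambda x: x + 1, lambda x: x * 10])
group_result = run_group([lambda: 1, lambda: 2, lambda: 3])
chord_result = run_chord([lambda: 10, lambda: 20, lambda: 30], sum)
print(chain_result, group_result, chord_result)
```

The chord is just a group followed by a callback, which is why it only fires once every member of the group has finished.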

Middleware

Intercept task lifecycle events for logging, monitoring, or custom logic:

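import time

from flashq import FlashQ, Middleware

# statsd, sentry, and alert_ops_team are placeholders for your own clients.

class MetricsMiddleware(Middleware):
    def before_execute(self, message):
        # Stored on the shared instance for brevity; with concurrency > 1,
        # keep start times per task (or in thread-local storage) instead.
        self.start = time.time()
        return message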

    def after_execute(self, message, result):
        duration = time.time() - self.start
        statsd.timing(f"task.{message.task_name}.duration", duration)

    def on_error(self, message, exc):
        sentry.capture_exception(exc)
        return False  # Don't suppress

    def on_dead(self, message, exc):
        alert_ops_team(f"Task {message.task_name} permanently failed: {exc}")

app = FlashQ()
app.add_middleware(MetricsMiddleware())

Built-in middlewares: LoggingMiddleware, TimeoutMiddleware, RateLimiter.

Rate Limiting

from flashq.ratelimit import RateLimiter

limiter = RateLimiter(default_rate="100/m")  # 100 tasks/minute global
limiter.configure("send_email", rate="10/m")  # 10 emails/minute
limiter.configure("api_call", rate="60/h")    # 60 API calls/hour

app.add_middleware(limiter)
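
For readers unfamiliar with rate strings such as "10/m", the sketch below shows one way such a string could be parsed and enforced with a sliding window. It is an illustration only, not FlashQ's implementation: `parse_rate` and `SlidingWindowLimiter` are hypothetical names, and only the "m" and "h" units that appear above are handled.

```python
import time
from collections import deque

# Units taken from the examples above: "m" = minute, "h" = hour.
_PERIODS = {"m": 60.0, "h": 3600.0}


def parse_rate(rate: str) -> tuple[int, float]:
    """Turn a string such as "10/m" into (limit, window_seconds)."""
    count, unit = rate.split("/")
    return int(count), _PERIODS[unit]


class SlidingWindowLimiter:
    """Allow at most `limit` calls within any `window`-second span."""

    def __init__(self, rate: str, clock=time.monotonic):
        self.limit, self.window = parse_rate(rate)
        self.clock = clock          # injectable so the demo needs no sleeping
        self.stamps = deque()       # timestamps of recently allowed calls

    def allow(self) -> bool:
        now = self.clock()
        # Forget calls that have fallen out of the window.
        while self.stamps and now - self.stamps[0] >= self.window:
            self.stamps.popleft()
        if len(self.stamps) < self.limit:
            self.stamps.append(now)
            return True
        return False


fake_now = [0.0]
demo_limiter = SlidingWindowLimiter("2/m", clock=lambda: fake_now[0])
first_three = [demo_limiter.allow() for _ in range(3)]
fake_now[0] = 60.0                  # one minute later the window has cleared
after_window = demo_limiter.allow()
print(first_three, after_window)
```

A sliding window is only one option; a token bucket would permit short bursts while holding the same average rate.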

Web Dashboard

Built-in monitoring UI — no extra services needed:

flashq dashboard myapp:app --port 5555

Open http://localhost:5555 to see:

  • Real-time task counts by state (pending, running, success, failure)
  • Task list with filtering by state, queue, and search
  • Task detail modal with args, result, error, traceback
  • Actions: cancel, revoke, purge queue
  • Auto-refreshing every 5 seconds

# Or embed in your own ASGI app
from flashq.dashboard import create_dashboard

dashboard = create_dashboard(app, prefix="/admin/tasks")
# Mount alongside your FastAPI/Starlette app

Dead Letter Queue

Inspect and replay permanently failed tasks:

from flashq.dlq import DeadLetterQueue

dlq = DeadLetterQueue(app)
app.add_middleware(dlq.middleware())  # Auto-capture dead tasks

# Later...
for task in dlq.list():
    print(f"{task.task_name}: {task.error}")

dlq.replay(task_id="abc123")  # Re-enqueue with reset retries
dlq.replay_all()              # Replay everything
dlq.purge()                   # Clear DLQ

Periodic Tasks

from flashq import FlashQ, every, cron
from flashq.scheduler import Scheduler

app = FlashQ()

@app.task(name="cleanup")
def cleanup_old_data():
    delete_old_records(days=30)

@app.task(name="daily_report")
def daily_report():
    generate_and_send_report()

scheduler = Scheduler(app)
scheduler.add("cleanup", every(hours=6))
scheduler.add("daily_report", cron("0 9 * * 1-5"))  # 9 AM weekdays
scheduler.start()
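
If the cron expression is unfamiliar: its five fields are minute, hour, day of month, month, and day of week, so "0 9 * * 1-5" means minute 0 of hour 9, Monday through Friday. The sketch below checks a datetime against such an expression. `cron_matches` is a hypothetical helper, not part of FlashQ, and it is deliberately simplified: it handles only "*", single numbers, and "a-b" ranges, with no lists or step values.

```python
from datetime import datetime


def _field_matches(field: str, value: int) -> bool:
    """Match one cron field: "*", a single number, or an "a-b" range."""
    if field == "*":
        return True
    if "-" in field:
        low, high = field.split("-")
        return int(low) <= value <= int(high)
    return int(field) == value


def cron_matches(expr: str, when: datetime) -> bool:
    """Return True if `when` falls on a minute selected by `expr`."""
    minute, hour, day, month, weekday = expr.split()
    cron_weekday = when.isoweekday() % 7  # cron numbering: 0 = Sunday, 1 = Monday
    return (
        _field_matches(minute, when.minute)
        and _field_matches(hour, when.hour)
        and _field_matches(day, when.day)
        and _field_matches(month, when.month)
        and _field_matches(weekday, cron_weekday)
    )


monday_9am = datetime(2024, 1, 1, 9, 0)     # 1 January 2024 was a Monday
saturday_9am = datetime(2024, 1, 6, 9, 0)
monday_10am = datetime(2024, 1, 1, 10, 0)

print(cron_matches("0 9 * * 1-5", monday_9am))
print(cron_matches("0 9 * * 1-5", saturday_9am))
print(cron_matches("0 9 * * 1-5", monday_10am))
```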

Retry & Error Handling

import requests

@app.task(max_retries=5, retry_delay=30.0, retry_backoff=True)
def flaky_task():
    # Retries: 30s → 60s → 120s → 240s → 480s (exponential backoff)
    response = requests.get("https://unreliable-api.com")
    response.raise_for_status()

from flashq.exceptions import TaskRetryError

@app.task(max_retries=10)
def smart_retry():
    try:
        do_something()
    except TemporaryError:
        raise TaskRetryError(countdown=5.0)  # Custom retry delay
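
The retry schedule in the `flaky_task` comment follows from doubling the base delay on each attempt. A small sketch of that arithmetic, assuming a plain factor-of-two backoff with no jitter or upper cap (this README does not say whether FlashQ applies either); `backoff_delays` is a hypothetical helper:

```python
def backoff_delays(retry_delay: float, max_retries: int) -> list[float]:
    """Delay before each retry: the base delay doubled on every attempt."""
    return [retry_delay * 2 ** attempt for attempt in range(max_retries)]


delays = backoff_delays(30.0, 5)
print(delays)  # [30.0, 60.0, 120.0, 240.0, 480.0]
```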

Backends

SQLite (Default — Zero Config)

app = FlashQ()  # Creates flashq.db in current dir
app = FlashQ(backend=SQLiteBackend(path="/var/lib/flashq/tasks.db"))
app = FlashQ(backend=SQLiteBackend(path=":memory:"))  # For testing
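
To show why SQLite alone is enough for a persistent queue, here is a rough sketch using only the standard library. The table layout and the `open_queue`, `enqueue`, and `dequeue` helpers are assumptions made for illustration; they are not FlashQ's actual schema or API.

```python
import json
import sqlite3


def open_queue(path: str = ":memory:") -> sqlite3.Connection:
    # isolation_level=None puts the connection in autocommit mode,
    # so transactions are controlled explicitly below.
    conn = sqlite3.connect(path, isolation_level=None)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS tasks ("
        " id INTEGER PRIMARY KEY AUTOINCREMENT,"
        " name TEXT NOT NULL,"
        " payload TEXT NOT NULL,"
        " state TEXT NOT NULL DEFAULT 'pending')"
    )
    return conn


def enqueue(conn: sqlite3.Connection, name: str, **kwargs) -> int:
    """Store a task as a row; with a file path it survives process restarts."""
    cur = conn.execute(
        "INSERT INTO tasks (name, payload) VALUES (?, ?)",
        (name, json.dumps(kwargs)),
    )
    return cur.lastrowid


def dequeue(conn: sqlite3.Connection):
    """Claim the oldest pending task, or return None if the queue is empty."""
    conn.execute("BEGIN IMMEDIATE")  # take the write lock before reading
    try:
        row = conn.execute(
            "SELECT id, name, payload FROM tasks"
            " WHERE state = 'pending' ORDER BY id LIMIT 1"
        ).fetchone()
        if row is not None:
            conn.execute("UPDATE tasks SET state = 'running' WHERE id = ?", (row[0],))
        conn.execute("COMMIT")
    except Exception:
        conn.execute("ROLLBACK")
        raise
    return None if row is None else (row[0], row[1], json.loads(row[2]))


conn = open_queue()
enqueue(conn, "send_email", to="user@example.com")
enqueue(conn, "add", x=2, y=3)
first, second, third = dequeue(conn), dequeue(conn), dequeue(conn)
print(first, second, third)
```

Selecting and updating inside one `BEGIN IMMEDIATE` transaction is what keeps two workers from claiming the same row.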

PostgreSQL

Uses LISTEN/NOTIFY for instant task delivery + FOR UPDATE SKIP LOCKED for atomic dequeue.

from flashq.backends.postgres import PostgresBackend
app = FlashQ(backend=PostgresBackend("postgresql://localhost/mydb"))

Redis

Uses sorted sets for scheduling and Lua scripts for atomic operations.

from flashq.backends.redis import RedisBackend
app = FlashQ(backend=RedisBackend("redis://localhost:6379/0"))

CLI

flashq worker myapp:app                  # Start worker
flashq worker myapp:app -q emails,sms    # Specific queues
flashq worker myapp:app -c 16            # 16 concurrent threads
flashq dashboard myapp:app               # Start web dashboard
flashq dashboard myapp:app -p 8080       # Custom port
flashq info myapp:app                    # Queue stats
flashq purge myapp:app -f                # Purge queue

Benchmarks

Measured on Apple Silicon (M-series), Python 3.12, SQLite backend:

| Benchmark | Tasks | Time | Throughput | Avg Latency |
|-----------|------:|-----:|-----------:|------------:|
| Enqueue (write only) | 10,000 | 0.52s | 19,298/s | 0.05ms |
| Roundtrip (c=4) | 1,000 | 25.7s | 39/s | 0.02ms |
| I/O-bound (c=8) | 500 | 6.7s | 75/s | 11.6ms |
| CPU-bound (c=4) | 500 | 12.8s | 39/s | 0.03ms |

Concurrency scaling (500 tasks):

| Workers | Throughput |
|:-------:|-----------:|
| 1 | 10 tasks/s |
| 2 | 19 tasks/s |
| 4 | 38 tasks/s |
| 8 | 100 tasks/s |

💡 In this run, throughput roughly doubles each time the worker count doubles from 1 to 4, and reaches 100 tasks/s at 8 workers. For I/O-bound workloads, use higher concurrency.

Run benchmarks yourself:

python benchmarks/bench.py

Architecture

Your App → FlashQ → Backend (SQLite/PG/Redis) → Worker(s)
                ↕
          Middleware Stack
          Rate Limiter
          Scheduler
          Dead Letter Queue

FlashQ uses a clean, modular architecture:

  • Backend: Pluggable storage (SQLite, PostgreSQL, Redis)
  • Worker: Thread pool executor with graceful shutdown
  • Middleware: Intercepts every stage of task lifecycle
  • Scheduler: Interval and cron-based periodic dispatch
  • Canvas: Task composition (chain, group, chord)
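
As an illustration of the "thread pool executor with graceful shutdown" idea in the Worker bullet, the sketch below polls an in-memory queue and hands jobs to a pool. On shutdown it drains what is already queued and waits for in-flight jobs. `MiniWorker` is a hypothetical stand-in, not FlashQ's worker, which reads from a backend and may define graceful shutdown differently.

```python
import queue
import threading
from concurrent.futures import ThreadPoolExecutor


class MiniWorker:
    def __init__(self, concurrency: int = 4):
        self.jobs = queue.Queue()
        self.results = []
        self._lock = threading.Lock()
        self._stop = threading.Event()
        self._pool = ThreadPoolExecutor(max_workers=concurrency)
        self._poller = threading.Thread(target=self._poll, daemon=True)

    def start(self) -> None:
        self._poller.start()

    def submit(self, func, *args) -> None:
        self.jobs.put((func, args))

    def _run(self, func, args) -> None:
        value = func(*args)
        with self._lock:            # results list is shared across threads
            self.results.append(value)

    def _poll(self) -> None:
        # Keep going until a stop was requested AND the queue is drained.
        while not (self._stop.is_set() and self.jobs.empty()):
            try:
                func, args = self.jobs.get(timeout=0.05)
            except queue.Empty:
                continue
            self._pool.submit(self._run, func, args)

    def shutdown(self) -> None:
        self._stop.set()                 # stop after draining queued jobs
        self._poller.join()
        self._pool.shutdown(wait=True)   # wait for in-flight jobs to finish


worker = MiniWorker(concurrency=2)
worker.start()
for n in range(5):
    worker.submit(pow, n, 2)
worker.shutdown()
squares = sorted(worker.results)
print(squares)
```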

Roadmap

  • [x] Core engine with S
