ModelQ
ModelQ is a lightweight Python library for scheduling and queuing machine learning inference tasks. It's designed as a faster and simpler alternative to Celery for ML workloads, using Redis and threading to efficiently run background tasks.
ModelQ is developed and maintained by the team at Modelslab.
About Modelslab: Modelslab provides powerful APIs for AI-native applications including:
- Image generation
- Uncensored chat
- Video generation
- Audio generation
- And much more
✨ Features
- ✅ Retry support (automatic and manual)
- ⏱ Timeout handling for long-running tasks
- 🔁 Manual retry using `RetryTaskException`
- 🎮 Streaming results from tasks in real-time
- 🧹 Middleware hooks for task lifecycle events
- ⚡ Fast, non-blocking concurrency using threads
- 🧵 Built-in decorators to register tasks quickly
- 💃 Redis-based task queueing
- 🖥️ CLI interface for orchestration
- 🔢 Pydantic model support for task validation and typing
- 🌐 Auto-generated REST API for tasks
- 🚫 Task cancellation for queued or running tasks
- 📊 Progress tracking for long-running tasks
- 📜 Task history with configurable retention
📦 Installation

```bash
pip install modelq
```
🚀 Auto-Generated REST API
One of ModelQ's most powerful features is the ability to expose your tasks as HTTP endpoints automatically.
By running a single command, every registered task becomes an API route:
```bash
modelq serve-api --app-path main:modelq_app --host 0.0.0.0 --port 8000
```
How It Works
- Each task registered with `@q.task(...)` is turned into a POST endpoint under `/tasks/{task_name}`
- If your task uses Pydantic input/output, the endpoint will validate the request and return a proper response schema
- The API is built using FastAPI, so you get automatic Swagger docs at:
http://localhost:8000/docs
Example Usage
```bash
curl -X POST http://localhost:8000/tasks/add \
  -H "Content-Type: application/json" \
  -d '{"a": 3, "b": 7}'
```
You can now build ML inference APIs without needing to write any web code!
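The same call can be made from plain Python. This is a minimal sketch using only the standard library, assuming the `add` task from the Basic Usage section is registered and the API server is running locally; the endpoint pattern `/tasks/{task_name}` comes from the section above.

```python
import json
from urllib.request import Request, urlopen

BASE_URL = "http://localhost:8000"

def build_task_request(task_name: str, payload: dict) -> Request:
    """Build a POST request for a ModelQ auto-generated task endpoint."""
    url = f"{BASE_URL}/tasks/{task_name}"
    data = json.dumps(payload).encode("utf-8")
    return Request(url, data=data, headers={"Content-Type": "application/json"})

req = build_task_request("add", {"a": 3, "b": 7})

# With the server running, send it like this:
# with urlopen(req) as resp:
#     print(json.loads(resp.read()))
```

`build_task_request` is just an illustrative helper, not part of ModelQ; in practice you would likely use `requests` or `httpx` directly.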
🖥️ CLI Usage
You can interact with ModelQ using the modelq command-line tool. All commands require an --app-path parameter to locate your ModelQ instance in module:object format.
Start Workers
```bash
modelq run-workers main:modelq_app --workers 2
```
Start background worker threads for executing tasks.
Check Queue Status
```bash
modelq status --app-path main:modelq_app
```
Show number of servers, queued tasks, and registered task types.
List Queued Tasks
```bash
modelq list-queued --app-path main:modelq_app
```
Display a list of all currently queued task IDs and their names.
Clear the Queue
```bash
modelq clear-queue --app-path main:modelq_app
```
Remove all tasks from the queue.
Remove a Specific Task
```bash
modelq remove-task --app-path main:modelq_app --task-id <task_id>
```
Remove a specific task from the queue by ID.
Serve API
```bash
modelq serve-api --app-path main:modelq_app --host 0.0.0.0 --port 8000 --log-level info
```
Start a FastAPI server for ModelQ to accept task submissions over HTTP.
Version
```bash
modelq version
```
Print the current version of ModelQ CLI.
More commands like requeue-stuck, prune-results, and get-task-status are coming soon.
🧠 Basic Usage
```python
from modelq import ModelQ
from modelq.exceptions import RetryTaskException
from redis import Redis
import time

imagine_db = Redis(host="localhost", port=6379, db=0)
q = ModelQ(redis_client=imagine_db)

@q.task(timeout=10, retries=2)
def add(a, b):
    return a + b

@q.task(stream=True)
def stream_multiples(x):
    for i in range(5):
        time.sleep(1)
        yield f"{i+1} * {x} = {(i+1) * x}"

@q.task()
def fragile(x):
    if x < 5:
        raise RetryTaskException("Try again.")
    return x

q.start_workers()

task = add(2, 3)
print(task.get_result(q.redis_client))
```
🔑 Custom Task IDs
By default, ModelQ generates a UUID for each task. You can provide your own task ID using the _task_id parameter to correlate tasks with your database records:
```python
from modelq import ModelQ
from redis import Redis

redis_client = Redis(host="localhost", port=6379, db=0)
mq = ModelQ(redis_client=redis_client)

@mq.task()
def process_order(order_data: dict):
    # Process the order...
    return {"status": "completed"}

mq.start_workers()

# Use your database record ID as the task ID
order_id = "order-12345"
task = process_order({"item": "widget"}, _task_id=order_id)
print(task.task_id)  # 'order-12345'

# Later, retrieve the task using the same ID
status = mq.get_task_status(order_id)
details = mq.get_task_details(order_id)
```
This is useful when you want to:
- Track tasks using your existing database primary keys
- Easily correlate queue tasks with database records
- Look up task status without storing the generated UUID
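A small polling loop built on `mq.get_task_status` can wait for a custom-ID task to finish. This is a sketch under assumptions: the terminal status strings (`"completed"`, `"failed"`, `"cancelled"`) are not confirmed by this README, so check them against what your ModelQ version actually reports.

```python
import time

# Assumed terminal states -- verify against your ModelQ version.
TERMINAL_STATES = {"completed", "failed", "cancelled"}

def wait_for_task(mq, task_id, poll_interval=0.5, timeout=60.0):
    """Poll mq.get_task_status(task_id) until a terminal state is reached."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        status = mq.get_task_status(task_id)
        if status in TERMINAL_STATES:
            return status
        time.sleep(poll_interval)
    raise TimeoutError(f"Task {task_id} did not finish within {timeout}s")
```

Used with the example above, `wait_for_task(mq, order_id)` blocks until `process_order` completes or the timeout expires.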
🔢 Pydantic Support
ModelQ supports Pydantic models as both input and output types for tasks. This allows automatic validation of input parameters and structured return values.
Example
```python
from pydantic import BaseModel, Field
from redis import Redis
from modelq import ModelQ
import time

class AddIn(BaseModel):
    a: int = Field(ge=0)
    b: int = Field(ge=0)

class AddOut(BaseModel):
    total: int

redis_client = Redis(host="localhost", port=6379, db=0)
mq = ModelQ(redis_client=redis_client)

@mq.task(schema=AddIn, returns=AddOut)
def add(payload: AddIn) -> AddOut:
    print(f"Processing addition: {payload.a} + {payload.b}.")
    time.sleep(10)  # Simulate some processing time
    return AddOut(total=payload.a + payload.b)
```
Getting the Result
After enqueueing the task, use the returned handle (here `job`) to fetch the typed result:

```python
output = job.get_result(mq.redis_client, returns=AddOut)
```
ModelQ will validate inputs using Pydantic and serialize/deserialize results seamlessly.
⚙️ Middleware Support
ModelQ allows you to plug in custom middleware to hook into events:
Supported Events
- `before_worker_boot`
- `after_worker_boot`
- `before_worker_shutdown`
- `after_worker_shutdown`
- `before_enqueue`
- `after_enqueue`
- `on_error`
Example
```python
from modelq.app.middleware import Middleware

class LoggingMiddleware(Middleware):
    def before_enqueue(self, *args, **kwargs):
        print("Task about to be enqueued")

    def on_error(self, task, error):
        print(f"Error in task {task.task_id}: {error}")
```
Attach to ModelQ instance:
```python
q.middleware = LoggingMiddleware()
```
🚫 Task Cancellation
ModelQ supports cancelling tasks that are queued or in progress. This is useful for long-running ML inference tasks that need to be stopped.
Cancelling a Task
```python
from modelq import ModelQ
from redis import Redis

redis_client = Redis(host="localhost", port=6379, db=0)
mq = ModelQ(redis_client=redis_client)

# Enqueue a task (my_long_task is a task registered elsewhere with @mq.task())
task = my_long_task({"data": "value"})

# Cancel the task
cancelled = mq.cancel_task(task.task_id)
if cancelled:
    print(f"Task {task.task_id} was cancelled")
```
Checking Cancellation Status
```python
# Check if a task was cancelled
if mq.is_task_cancelled(task.task_id):
    print("Task was cancelled")

# Get all cancelled tasks
cancelled_tasks = mq.get_cancelled_tasks(limit=100)
for t in cancelled_tasks:
    print(f"Cancelled: {t['task_id']} - {t['task_name']}")
```
Handling Cancellation Inside a Task
For long-running tasks, you should periodically check for cancellation and exit gracefully:
```python
@mq.task()
def long_running_task(params: dict):
    items = params.get("items", [])
    results = []
    for i, item in enumerate(items):
        # Check if task was cancelled
        task_id = params.get("_task_id")  # Task ID is injected
        if task_id and mq.is_task_cancelled(task_id):
            return {"status": "cancelled", "processed": i}
        # Process item
        result = process_item(item)
        results.append(result)
    return {"status": "completed", "results": results}
```
Cancellation in Streaming Tasks
Streaming tasks automatically check for cancellation and will stop yielding results:
```python
task = my_streaming_task({"prompt": "Generate text"})

# Start consuming stream in another thread/process
# ...

# Cancel from main thread
mq.cancel_task(task.task_id)
# The stream will stop gracefully
```
📊 Progress Tracking
For long-running tasks, you can report progress to let clients know how far along the task is.
Reporting Progress Inside a Task
```python
@mq.task()
def train_model(params: dict):
    task_id = params.get("_task_id")
    epochs = params.get("epochs", 10)
    for epoch in range(epochs):
        # Report progress (0.0 to 1.0)
        progress = (epoch + 1) / epochs
        mq.report_progress(task_id, progress, f"Training epoch {epoch + 1}/{epochs}")
        # Do actual training
        train_epoch(epoch)
    return {"status": "completed", "epochs": epochs}
```
Getting Progress from Client
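One way to surface progress on the client side is to poll `mq.get_task_details` (shown in the Custom Task IDs section) and render whatever progress field it exposes. This is a hedged sketch only: the `progress`, `message`, and `status` keys in the returned details are assumptions, not confirmed ModelQ API, so verify the actual shape against your version.

```python
import time

def format_progress(progress: float, message: str = "") -> str:
    """Render a 0.0-1.0 progress value as a percentage line."""
    pct = round(progress * 100)
    return f"[{pct:3d}%] {message}".rstrip()

def watch_progress(mq, task_id, interval=1.0):
    """Poll task details and print progress updates.

    Assumes get_task_details returns a dict with 'progress',
    'message', and 'status' keys -- an assumption to verify.
    """
    while True:
        details = mq.get_task_details(task_id) or {}
        print(format_progress(details.get("progress", 0.0),
                              details.get("message", "")))
        if details.get("status") in ("completed", "failed", "cancelled"):
            return details
        time.sleep(interval)
```

With the `train_model` task above, `watch_progress(mq, task.task_id)` would print one line per poll until the task reaches a terminal state.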