Asynq
Simple, reliable, and efficient distributed task queue in Rust
Install / Use
/learn @emo-crab/AsynqREADME
Asynq - Rust Distributed Task Queue
Asynq is a simple, reliable, and efficient distributed task queue library written in Rust, backed by Redis, inspired by hibiken/asynq.
🔗 Fully Compatible with Go asynq: This implementation is fully compatible with the Go version of hibiken/asynq, allowing seamless interoperation with Go services.
🌟 Features
- Guaranteed at-least-once execution - Tasks won't be lost
- ⏰ Task scheduling - Support for delayed and scheduled tasks
- 🔄 Automatic retry - Configurable retry policies for failed tasks
- 🛡️ Fault recovery - Automatic task recovery on worker crashes
- 🎯 Priority queues - Support for weighted and strict priority
- ⚡ Low latency - Fast Redis writes with low task enqueue latency
- 🔒 Task deduplication - Support for unique task options
- ⏱️ Timeout control - Per-task timeout and deadline support
- 📦 Task aggregation - Support for batch processing of multiple tasks
- 🔌 Flexible interface - Support for middleware and custom handlers
- ⏸️ Queue pause - Ability to pause/resume specific queues
- 🕒 Periodic tasks - Support for cron-style scheduled tasks
- 🏠 High availability - Support for Redis Cluster
- 🖥️ Web UI - Web-based management interface for queues and tasks
- 🔄 Go compatible - Fully compatible with Go version asynq, can be deployed together
- 🎯 Macro support - Attribute macros for easy handler registration (optional feature)
🚀 Quick Start
Add Dependencies
Add to your Cargo.toml:
[dependencies]
asynq = { version = "0.1", features = ["json"] }
## Enable macro support (optional)
# asynq = { version = "0.1", features = ["json", "macros"] }
## or dev channel
#asynq = { git = "https://github.com/emo-crab/asynq", branch = "main" }
tokio = { version = "1.0", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
Basic Usage
Producer (Enqueue Tasks)
use asynq::{client::Client, task::Task, redis::RedisConnectionType};
use serde::{Serialize, Deserialize};
#[derive(Serialize, Deserialize)]
struct EmailPayload {
to: String,
subject: String,
body: String,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Create Redis configuration
let redis_config = RedisConnectionType::single("redis://127.0.0.1:6379")?;
// Create client
let client = Client::new(redis_config).await?;
// Create task
let payload = EmailPayload {
to: "user@example.com".to_string(),
subject: "Welcome!".to_string(),
body: "Welcome to our service!".to_string(),
};
let task = Task::new_with_json("email:send", &payload)?;
// Enqueue task
let task_info = client.enqueue(task).await?;
println!("Task enqueued with ID: {}", task_info.id);
Ok(())
}
Consumer (Process Tasks)
use asynq::{server::Server,server::Handler,task::Task, redis::RedisConnectionType, config::ServerConfig};
use async_trait::async_trait;
use serde::{Serialize, Deserialize};
use std::collections::HashMap;
#[derive(Serialize, Deserialize)]
struct EmailPayload {
to: String,
subject: String,
body: String,
}
struct EmailProcessor;
#[async_trait]
impl Handler for EmailProcessor {
async fn process_task(&self, task: Task) -> asynq::error::Result<()> {
match task.get_type() {
"email:send" => {
let payload: EmailPayload = task.get_payload_with_json()?;
println!("Sending email to: {}", payload.to);
// Implement actual email sending logic
Ok(())
}
_ => {
Err(asynq::error::Error::other("Unknown task type"))
}
}
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Redis configuration
let redis_config = RedisConnectionType::single("redis://127.0.0.1:6379")?;
// Configure queues
let mut queues = HashMap::new();
queues.insert("critical".to_string(), 6);
queues.insert("default".to_string(), 3);
queues.insert("low".to_string(), 1);
// Server configuration
let config = ServerConfig::new()
.concurrency(4)
.queues(queues);
// Create server
let mut server = Server::new(redis_config, config).await?;
// Start server
server.run(EmailProcessor).await?;
Ok(())
}
Using ServeMux for Task Routing
ServeMux provides Go-like task routing functionality, automatically routing tasks to different handlers based on task type:
use asynq::{serve_mux::ServeMux, task::Task, redis::RedisConnectionType, config::ServerConfig, server::ServerBuilder};
use std::collections::HashMap;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let redis_config = RedisConnectionType::single("redis://127.0.0.1:6379")?;
// Create ServeMux
let mut mux = ServeMux::new();
// Register synchronous handler
mux.handle_func("email:send", |task: Task| {
println!("Processing email:send {:?}",task);
Ok(())
});
// Register asynchronous handler
mux.handle_async_func("image:resize", |task: Task| async move {
println!("Processing image:resize {:?}",task);
// Async processing logic
Ok(())
});
mux.handle_func("payment:process", |task: Task| {
println!("Processing payment {:?}",task);
Ok(())
});
// Configure server
let mut queues = HashMap::new();
queues.insert("default".to_string(), 3);
let config = ServerConfig::new().concurrency(4).queues(queues);
// Create and run server
let mut server = ServerBuilder::new()
.redis_config(redis_config)
.server_config(config)
.build()
.await?;
// ServeMux implements Handler trait, can be passed directly to server.run()
server.run(mux).await?;
Ok(())
}
Features:
- 🎯 Automatically route tasks to corresponding handlers based on task type
- ⚡ Support for both synchronous (
handle_func) and asynchronous (handle_async_func) handlers - 🔄 Fully compatible with Go version ServeMux
- 🛡️ Type-safe with compile-time checking
- 📝 Clean API, easy to use
See examples/servemux_example.rs for more examples.
Task Handler Macros (Optional Feature)
When the macros feature is enabled, you can use attribute macros similar to actix-web's routing macros for cleaner handler definition:
use asynq::{
serve_mux::ServeMux,
task::Task,
task_handler,
task_handler_async,
register_handlers,
register_async_handlers,
redis::RedisConnectionType,
config::ServerConfig,
server::ServerBuilder
};
use std::collections::HashMap;
// Define handlers with attribute macros
#[task_handler("email:send")]
fn handle_email(task: Task) -> asynq::error::Result<()> {
println!("Processing email:send");
Ok(())
}
#[task_handler_async("image:resize")]
async fn handle_image(task: Task) -> asynq::error::Result<()> {
println!("Processing image:resize");
Ok(())
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let redis_config = RedisConnectionType::single("redis://127.0.0.1:6379")?;
// Create ServeMux and register handlers with convenience macros
let mut mux = ServeMux::new();
register_handlers!(mux, handle_email);
register_async_handlers!(mux, handle_image);
// Configure and run server
let mut queues = HashMap::new();
queues.insert("default".to_string(), 3);
let config = ServerConfig::new().concurrency(4).queues(queues);
let mut server = ServerBuilder::new()
.redis_config(redis_config)
.server_config(config)
.build()
.await?;
server.run(mux).await?;
Ok(())
}
Macro Features:
- 🎯 Declarative syntax: Define handlers with clean attribute syntax
- 📝 Reduced boilerplate: Pattern strings are stored with the function
- 🔧 Convenient registration: Use
register_handlers!andregister_async_handlers!macros - 🌐 Familiar pattern: Similar to actix-web's
#[get("/path")]routing macros
See examples/macro_example.rs for a complete example.
📚 Advanced Usage
Delayed Tasks
use std::time::Duration;
// Execute after 5 minutes delay
client.enqueue_in(task, Duration::from_secs(300)).await?;
Unique Tasks (Deduplication)
use std::time::Duration;
// Keep unique within 1 hour
let unique_task = Task::new_with_json("report:daily", &payload)?;
client.enqueue_unique(unique_task, Duration::from_secs(3600)).await?;
Task Groups (Batch Processing)
// Add tasks to group for aggregation
for i in 1..=10 {
let item_task = Task::new_with_json("batch:process", &serde_json::json!({"item": i}))?;
client.add_to_group(item_task, "daily_batch").await?;
}
Task Options
let task = Task::new_with_json("image:resize", &payload)?
.with_queue("image_processing") // Specify queue
.with_max_retry(5) // Maximum retry attempts
.with_timeout(Duration::from_secs(300)) // Timeout
.with_unique_ttl(Duration::from_secs(3600)); // Uniqueness TTL
Priority Queues
let mut queues = HashMap::new();
queues.insert("critical".to_string(), 6); // Highest priority
queues.insert("default".to_string(), 3); // Medium priority
queues.insert("low".to_string(), 1); // Low priority
let config = ServerConfig::new()
.queues(queues)
.strict_priority(true); // Strict priority mode
🏗️ Architecture Design
Asynq uses a modular design with main components:
asynq/
├── src/
│ ├── lib.rs # Library entry and
