Asynq - Rust Distributed Task Queue

Asynq is a simple, reliable, and efficient distributed task queue library written in Rust, backed by Redis, inspired by hibiken/asynq.

🔗 Fully Compatible with Go asynq: This implementation is fully compatible with the Go version of hibiken/asynq, allowing seamless interoperation with Go services.

🌟 Features

  • Guaranteed at-least-once execution - Tasks won't be lost
  • Task scheduling - Support for delayed and scheduled tasks
  • 🔄 Automatic retry - Configurable retry policies for failed tasks
  • 🛡️ Fault recovery - Automatic task recovery on worker crashes
  • 🎯 Priority queues - Support for weighted and strict priority
  • Low latency - Fast Redis writes with low task enqueue latency
  • 🔒 Task deduplication - Support for unique task options
  • ⏱️ Timeout control - Per-task timeout and deadline support
  • 📦 Task aggregation - Support for batch processing of multiple tasks
  • 🔌 Flexible interface - Support for middleware and custom handlers
  • ⏸️ Queue pause - Ability to pause/resume specific queues
  • 🕒 Periodic tasks - Support for cron-style scheduled tasks
  • 🏠 High availability - Support for Redis Cluster
  • 🖥️ Web UI - Web-based management interface for queues and tasks
  • 🔄 Go compatible - Fully compatible with the Go version of asynq; the two can be deployed side by side
  • 🎯 Macro support - Attribute macros for easy handler registration (optional feature)

🚀 Quick Start

Add Dependencies

Add to your Cargo.toml:

[dependencies]
asynq = { version = "0.1", features = ["json"] }
# Enable macro support (optional)
# asynq = { version = "0.1", features = ["json", "macros"] }
# Or use the development branch
# asynq = { git = "https://github.com/emo-crab/asynq", branch = "main" }
tokio = { version = "1.0", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }

Basic Usage

Producer (Enqueue Tasks)

use asynq::{client::Client, task::Task, redis::RedisConnectionType};
use serde::{Serialize, Deserialize};

#[derive(Serialize, Deserialize)]
struct EmailPayload {
    to: String,
    subject: String,
    body: String,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create Redis configuration
    let redis_config = RedisConnectionType::single("redis://127.0.0.1:6379")?;

    // Create client
    let client = Client::new(redis_config).await?;

    // Create task
    let payload = EmailPayload {
        to: "user@example.com".to_string(),
        subject: "Welcome!".to_string(),
        body: "Welcome to our service!".to_string(),
    };

    let task = Task::new_with_json("email:send", &payload)?;

    // Enqueue task
    let task_info = client.enqueue(task).await?;
    println!("Task enqueued with ID: {}", task_info.id);

    Ok(())
}

Consumer (Process Tasks)

use asynq::{
    config::ServerConfig,
    redis::RedisConnectionType,
    server::{Handler, Server},
    task::Task,
};
use async_trait::async_trait;
use serde::{Serialize, Deserialize};
use std::collections::HashMap;

#[derive(Serialize, Deserialize)]
struct EmailPayload {
    to: String,
    subject: String,
    body: String,
}

struct EmailProcessor;

#[async_trait]
impl Handler for EmailProcessor {
    async fn process_task(&self, task: Task) -> asynq::error::Result<()> {
        match task.get_type() {
            "email:send" => {
                let payload: EmailPayload = task.get_payload_with_json()?;
                println!("Sending email to: {}", payload.to);
                // Implement actual email sending logic
                Ok(())
            }
            _ => {
                Err(asynq::error::Error::other("Unknown task type"))
            }
        }
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Redis configuration
    let redis_config = RedisConnectionType::single("redis://127.0.0.1:6379")?;

    // Configure queues
    let mut queues = HashMap::new();
    queues.insert("critical".to_string(), 6);
    queues.insert("default".to_string(), 3);
    queues.insert("low".to_string(), 1);

    // Server configuration
    let config = ServerConfig::new()
        .concurrency(4)
        .queues(queues);

    // Create server
    let mut server = Server::new(redis_config, config).await?;

    // Start server
    server.run(EmailProcessor).await?;

    Ok(())
}

Using ServeMux for Task Routing

ServeMux provides Go-like task routing functionality, automatically routing tasks to different handlers based on task type:

use asynq::{serve_mux::ServeMux, task::Task, redis::RedisConnectionType, config::ServerConfig, server::ServerBuilder};
use std::collections::HashMap;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let redis_config = RedisConnectionType::single("redis://127.0.0.1:6379")?;

    // Create ServeMux
    let mut mux = ServeMux::new();

    // Register synchronous handler
    mux.handle_func("email:send", |task: Task| {
        println!("Processing email:send {:?}", task);
        Ok(())
    });

    // Register asynchronous handler
    mux.handle_async_func("image:resize", |task: Task| async move {
        println!("Processing image:resize {:?}", task);
        // Async processing logic
        Ok(())
    });

    mux.handle_func("payment:process", |task: Task| {
        println!("Processing payment {:?}", task);
        Ok(())
    });

    // Configure server
    let mut queues = HashMap::new();
    queues.insert("default".to_string(), 3);
    let config = ServerConfig::new().concurrency(4).queues(queues);

    // Create and run server
    let mut server = ServerBuilder::new()
        .redis_config(redis_config)
        .server_config(config)
        .build()
        .await?;

    // ServeMux implements the Handler trait, so it can be passed directly to server.run()
    server.run(mux).await?;

    Ok(())
}

Features:

  • 🎯 Automatically route tasks to corresponding handlers based on task type
  • ⚡ Support for both synchronous (handle_func) and asynchronous (handle_async_func) handlers
  • 🔄 Fully compatible with the Go version's ServeMux
  • 🛡️ Type-safe with compile-time checking
  • 📝 Clean API, easy to use

See examples/servemux_example.rs for more examples.
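
To make the routing idea concrete, here is a minimal, std-only sketch of dispatching by task type. It is not the crate's ServeMux implementation: DemoTask, DemoMux, and dispatch are hypothetical names introduced for illustration, and exact-match lookup is an assumption (the real ServeMux may match patterns differently).

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a task: a type name plus raw payload bytes.
#[derive(Debug, Clone)]
pub struct DemoTask {
    pub kind: String,
    pub payload: Vec<u8>,
}

/// Boxed handler: takes a task and returns a result message or an error message.
type DemoHandler = Box<dyn Fn(&DemoTask) -> Result<String, String> + Send + Sync>;

/// Minimal router keyed by exact task type (an assumption of this sketch).
#[derive(Default)]
pub struct DemoMux {
    handlers: HashMap<String, DemoHandler>,
}

impl DemoMux {
    pub fn new() -> Self {
        Self::default()
    }

    /// Register a handler for one task type; a later registration replaces an earlier one.
    pub fn handle_func<F>(&mut self, kind: &str, f: F)
    where
        F: Fn(&DemoTask) -> Result<String, String> + Send + Sync + 'static,
    {
        self.handlers.insert(kind.to_string(), Box::new(f));
    }

    /// Look up the handler for the task's type and call it.
    pub fn dispatch(&self, task: &DemoTask) -> Result<String, String> {
        match self.handlers.get(&task.kind) {
            Some(handler) => handler(task),
            None => Err(format!("no handler registered for {}", task.kind)),
        }
    }
}
```

Storing handlers as boxed closures lets one map hold handlers of different concrete types. An unregistered task type is surfaced as an error rather than silently dropped, mirroring the "Unknown task type" branch in the consumer example.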

Task Handler Macros (Optional Feature)

When the macros feature is enabled, you can use attribute macros, similar to actix-web's routing macros, for cleaner handler definitions:

use asynq::{
    serve_mux::ServeMux, 
    task::Task, 
    task_handler, 
    task_handler_async,
    register_handlers,
    register_async_handlers,
    redis::RedisConnectionType, 
    config::ServerConfig, 
    server::ServerBuilder
};
use std::collections::HashMap;

// Define handlers with attribute macros
#[task_handler("email:send")]
fn handle_email(task: Task) -> asynq::error::Result<()> {
    println!("Processing email:send");
    Ok(())
}

#[task_handler_async("image:resize")]
async fn handle_image(task: Task) -> asynq::error::Result<()> {
    println!("Processing image:resize");
    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let redis_config = RedisConnectionType::single("redis://127.0.0.1:6379")?;
    
    // Create ServeMux and register handlers with convenience macros
    let mut mux = ServeMux::new();
    register_handlers!(mux, handle_email);
    register_async_handlers!(mux, handle_image);
    
    // Configure and run server
    let mut queues = HashMap::new();
    queues.insert("default".to_string(), 3);
    let config = ServerConfig::new().concurrency(4).queues(queues);
    
    let mut server = ServerBuilder::new()
        .redis_config(redis_config)
        .server_config(config)
        .build()
        .await?;
    
    server.run(mux).await?;
    Ok(())
}

Macro Features:

  • 🎯 Declarative syntax: Define handlers with clean attribute syntax
  • 📝 Reduced boilerplate: Pattern strings are stored with the function
  • 🔧 Convenient registration: Use register_handlers! and register_async_handlers! macros
  • 🌐 Familiar pattern: Similar to actix-web's #[get("/path")] routing macros

See examples/macro_example.rs for a complete example.

📚 Advanced Usage

Delayed Tasks

use std::time::Duration;
// Execute after a 5-minute delay
client.enqueue_in(task, Duration::from_secs(300)).await?;
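
As a rough mental model of delayed execution, the sketch below keeps tasks in a set ordered by due time and releases them once the clock passes that time. It is an in-memory illustration only, not how this crate stores scheduled tasks in Redis. DelayedSet and pop_due are hypothetical names, and timestamps are passed in as plain Unix seconds to keep the example deterministic.

```rust
use std::collections::BTreeMap;

/// In-memory stand-in for a scheduled set, ordered by due time (Unix seconds).
#[derive(Default)]
pub struct DelayedSet {
    // Key: (due time, insertion sequence), so tasks with equal due times keep FIFO order.
    entries: BTreeMap<(u64, u64), String>,
    next_seq: u64,
}

impl DelayedSet {
    pub fn new() -> Self {
        Self::default()
    }

    /// Schedule `task_id` to become ready `delay_secs` after `now`.
    pub fn enqueue_in(&mut self, task_id: &str, now: u64, delay_secs: u64) {
        let due = now + delay_secs;
        self.entries.insert((due, self.next_seq), task_id.to_string());
        self.next_seq += 1;
    }

    /// Remove and return every task whose due time is at or before `now`.
    pub fn pop_due(&mut self, now: u64) -> Vec<String> {
        // split_off returns the entries with keys >= the given key, i.e. the not-yet-due ones.
        let later = self.entries.split_off(&(now + 1, 0));
        let due = std::mem::replace(&mut self.entries, later);
        due.into_values().collect()
    }

    /// Number of tasks still waiting.
    pub fn len(&self) -> usize {
        self.entries.len()
    }
}
```

A worker loop would call pop_due periodically and move the returned tasks to the ready queue.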

Unique Tasks (Deduplication)

use std::time::Duration;

// Reject duplicates of this task for 1 hour
let unique_task = Task::new_with_json("report:daily", &payload)?;
client.enqueue_unique(unique_task, Duration::from_secs(3600)).await?;
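
The deduplication window can be pictured as a lock with an expiry time: the first enqueue takes the lock, and later enqueues with the same key are rejected until it expires. The sketch below models that in memory. UniqueLocks, try_acquire, and the key format are hypothetical; how the crate actually derives the uniqueness key and stores the lock is not shown in this README.

```rust
use std::collections::HashMap;

/// In-memory stand-in for a uniqueness lock table.
#[derive(Default)]
pub struct UniqueLocks {
    // Key: a hypothetical "queue:type:payload" string; value: lock expiry in Unix seconds.
    expires_at: HashMap<String, u64>,
}

impl UniqueLocks {
    pub fn new() -> Self {
        Self::default()
    }

    /// Try to take the lock for `key` at time `now`.
    /// Returns true if the task may be enqueued, false if an unexpired duplicate exists.
    pub fn try_acquire(&mut self, key: &str, now: u64, ttl_secs: u64) -> bool {
        let locked = matches!(self.expires_at.get(key), Some(&expiry) if expiry > now);
        if locked {
            return false;
        }
        self.expires_at.insert(key.to_string(), now + ttl_secs);
        true
    }
}
```

With a TTL of 3600 seconds, a second enqueue 30 minutes later is rejected, while one at the 60-minute mark succeeds and starts a new window.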

Task Groups (Batch Processing)

// Add tasks to group for aggregation
for i in 1..=10 {
    let item_task = Task::new_with_json("batch:process", &serde_json::json!({"item": i}))?;
    client.add_to_group(item_task, "daily_batch").await?;
}
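
Aggregation means tasks added to the same group are held back and handed to a handler as one batch. The following sketch shows one possible trigger, a maximum batch size, using only the standard library. GroupBuffer, max_size, and flush are hypothetical names, and the crate's real aggregation rules (size, delay, or otherwise) are not described in this README.

```rust
use std::collections::HashMap;

/// Collects items per group and releases a batch once it reaches `max_size`.
pub struct GroupBuffer {
    max_size: usize,
    groups: HashMap<String, Vec<u32>>,
}

impl GroupBuffer {
    pub fn new(max_size: usize) -> Self {
        Self { max_size, groups: HashMap::new() }
    }

    /// Add one item; returns the whole batch when the group reaches `max_size`.
    pub fn add(&mut self, group: &str, item: u32) -> Option<Vec<u32>> {
        let batch = self.groups.entry(group.to_string()).or_default();
        batch.push(item);
        if batch.len() >= self.max_size {
            // Removing the entry hands ownership of the batch to the caller.
            self.groups.remove(group)
        } else {
            None
        }
    }

    /// Release whatever is pending for a group, for example when a delay expires.
    pub fn flush(&mut self, group: &str) -> Option<Vec<u32>> {
        self.groups.remove(group)
    }
}
```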

Task Options

use std::time::Duration;

let task = Task::new_with_json("image:resize", &payload)?
    .with_queue("image_processing")     // Specify queue
    .with_max_retry(5)                  // Maximum retry attempts
    .with_timeout(Duration::from_secs(300)) // Timeout
    .with_unique_ttl(Duration::from_secs(3600)); // Uniqueness TTL
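
To show how a retry limit and a retry delay interact, here is a small sketch of capped exponential backoff. This is a generic policy chosen for illustration, not necessarily the delay formula this crate uses. backoff_delay, should_retry, and their parameters are hypothetical names.

```rust
use std::time::Duration;

/// Delay before retry number `attempt` (1-based): base * 2^(attempt - 1), capped at `cap`.
pub fn backoff_delay(attempt: u32, base: Duration, cap: Duration) -> Duration {
    // Clamp the exponent so the shift below cannot overflow a u32.
    let exp = attempt.saturating_sub(1).min(31);
    let factor = 1u32 << exp;
    // checked_mul returns None on overflow; fall back to the cap in that case.
    base.checked_mul(factor).map_or(cap, |d| d.min(cap))
}

/// Whether a failed task should be retried, given how many retries it has already used.
pub fn should_retry(retried: u32, max_retry: u32) -> bool {
    retried < max_retry
}
```

With a 10-second base and a 300-second cap, the delays grow as 10 s, 20 s, 40 s, and so on until they reach the cap. A task configured with a maximum of 5 retries stops being rescheduled after its fifth retry fails.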

Priority Queues

use std::collections::HashMap;

let mut queues = HashMap::new();
queues.insert("critical".to_string(), 6);  // Highest priority
queues.insert("default".to_string(), 3);   // Medium priority
queues.insert("low".to_string(), 1);       // Low priority

let config = ServerConfig::new()
    .queues(queues)
    .strict_priority(true); // Strict priority mode
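
The difference between weighted and strict priority can be sketched as follows. The sketch assumes that, in weighted mode, each queue's share of processing is roughly its weight divided by the sum of all weights, and that, in strict mode, a lower-priority queue is served only when all higher-priority queues are empty. Both function names are hypothetical, and the crate's actual scheduling may differ.

```rust
/// Approximate share of worker attention per queue under weighted priority:
/// weight / sum of all weights (an assumption of this sketch).
pub fn weighted_shares(queues: &[(&str, u32)]) -> Vec<(String, f64)> {
    let total: u32 = queues.iter().map(|(_, w)| *w).sum();
    queues
        .iter()
        .map(|(name, w)| (name.to_string(), f64::from(*w) / f64::from(total)))
        .collect()
}

/// Under strict priority, pick the non-empty queue with the highest weight.
/// `pending[i]` is the number of waiting tasks in `queues[i]`.
pub fn pick_strict<'a>(queues: &[(&'a str, u32)], pending: &[usize]) -> Option<&'a str> {
    let mut best: Option<(&'a str, u32)> = None;
    for (&(name, weight), &count) in queues.iter().zip(pending.iter()) {
        let better = best.map_or(true, |(_, w)| weight > w);
        if count > 0 && better {
            best = Some((name, weight));
        }
    }
    best.map(|(name, _)| name)
}
```

With weights 6, 3, and 1, the weighted shares come out to about 60%, 30%, and 10%. In strict mode, "low" is picked only when both "critical" and "default" have nothing pending.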

🏗️ Architecture Design

Asynq uses a modular design. Its main components are:

asynq/
├── src/
│   ├── lib.rs              # Library entry and 