Disque
Golang client for Disque, the Persistent Distributed Job Priority Queue
Install / Use
/learn @goware/DisqueREADME
disque
Golang client for Disque, the Persistent Distributed Job Priority Queue.
- Persistent - Jobs can be either in-memory or persisted on disk<sup>[1]</sup>.
- Distributed - Disque pool. Multiple producers, multiple consumers.
- Job Priority Queue - Multiple queues. Consumers
Get()from higher priority queues first. - Fault tolerant - Jobs must be replicated to N nodes before
Add()returns. Consumer mustAck()the job within a specifiedRetryAftertimeout or the job will be re-queued automatically.
Note: The examples below ignore error handling for readability.
Producer
import (
"github.com/goware/disque"
)
func main() {
// Connect to Disque pool.
jobs, _ := disque.New("127.0.0.1:7711") // Accepts more arguments.
defer jobs.Close()
// Enqueue three jobs with different priorities.
job1, _ := jobs.Add(data1, "high")
job2, _ := jobs.Add(data2, "low")
job3, _ := jobs.Add(data3, "urgent")
// Block until job3 is done.
jobs.Wait(job3)
}
Consumer (worker)
import (
"github.com/goware/disque"
)
func main() {
// Connect to Disque pool.
jobs, _ := disque.New("127.0.0.1:7711") // Accepts more arguments.
defer jobs.Close()
for {
// Get job from highest priority queue possible. Blocks by default.
job, _ := jobs.Get("urgent", "high", "low") // Left-to-right priority.
// Do some hard work with the job data.
if err := Process(job.Data); err != nil {
// Failed. Re-queue the job.
jobs.Nack(job)
}
// Acknowledge (dequeue) the job.
jobs.Ack(job)
}
}
Default configuration
| Config option | Default value | Description | | ------------- |:-------------:| ------------ | | Timeout | 0 | Block on each operation until it returns. | | Replicate | 0 | Job doesn't need to be replicated before Add() returns. | | Delay | 0 | Job is enqueued immediately. | | RetryAfter | 0 | Don't re-queue job automatically. | | TTL | 0 | Job lives until it's ACKed. | | MaxLen | 0 | Unlimited queue. |
Custom configuration
jobs, _ := disque.New("127.0.0.1:7711")
config := disque.Config{
Timeout: time.Second, // Each operation fails after 1s timeout elapses.
Replicate: 2, // Replicates job to 2+ nodes before Add() returns.
Delay: time.Hour, // Schedules the job (enqueues after 1h).
RetryAfter: time.Minute, // Re-queues the job after 1min of not being ACKed.
TTL: 24 * time.Hour, // Removes the job from the queue after one day.
MaxLen: 1000, // Fails if there are 1000+ jobs in the queue.
}
// Apply globally.
jobs.Use(config)
// Apply to a single operation.
jobs.With(config).Add(data, "queue")
// Apply single option to a single operation.
jobs.Timeout(time.Second).Get("queue", "queue2")
jobs.MaxLen(1000).RetryAfter(time.Minute).Add(data, "queue")
jobs.TTL(24 * time.Hour).Add(data, "queue")
License
Disque is licensed under the MIT License.
Related Skills
node-connect
349.0kDiagnose OpenClaw node connection and pairing failures for Android, iOS, and macOS companion apps
frontend-design
109.4kCreate distinctive, production-grade frontend interfaces with high design quality. Use this skill when the user asks to build web components, pages, or applications. Generates creative, polished code that avoids generic AI aesthetics.
openai-whisper-api
349.0kTranscribe audio via OpenAI Audio Transcriptions API (Whisper).
qqbot-media
349.0kQQBot 富媒体收发能力。使用 <qqmedia> 标签,系统根据文件扩展名自动识别类型(图片/语音/视频/文件)。

