Rmq
Message queue system written in Go and backed by Redis
Overview
rmq is short for Redis message queue. It's a message queue system written in Go and backed by Redis.
Basic Usage
Let's take a look at how to use rmq.
Import
Of course you need to import rmq wherever you want to use it.
import "github.com/adjust/rmq/v5"
Connection
Before we get to queues, we first need to establish a connection. Each rmq connection has a name (used in statistics) and Redis connection details including which Redis database to use. The most basic Redis connection uses a TCP connection to a given host and a port:
connection, err := rmq.OpenConnection("my service", "tcp", "localhost:6379", 1, errChan)
It's also possible to access a Redis listening on a Unix socket:
connection, err := rmq.OpenConnection("my service", "unix", "/tmp/redis.sock", 1, errChan)
For more flexible setup you can pass Redis options or create your own Redis client:
connection, err := rmq.OpenConnectionWithRedisOptions("my service", redisOptions, errChan)
connection, err := rmq.OpenConnectionWithRedisClient("my service", redisClient, errChan)
If the Redis instance can't be reached you will receive an error indicating this.
Please also note the errChan parameter. There is some rmq logic running in
the background which can run into Redis errors. If you pass an error channel to
the OpenConnection() functions rmq will send those background errors to this
channel so you can handle them asynchronously. For more details about this and
handling suggestions see the section about handling background errors below.
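The errChan passed to OpenConnection() above isn't shown being created. A minimal sketch, assuming you just want to log background errors, would be:

```go
// buffered so rmq's background goroutines don't block while reporting errors
errChan := make(chan error, 10)

// drain background errors asynchronously (see the background errors section)
go func() {
	for err := range errChan {
		log.Print("rmq background error: ", err)
	}
}()
```

The buffer size of 10 is an arbitrary choice; any reasonable capacity works as long as something is draining the channel.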
Connecting to a Redis cluster
In order to connect to a Redis cluster please use OpenClusterConnection():
redisClusterOptions := &redis.ClusterOptions{ /* ... */ }
redisClusterClient := redis.NewClusterClient(redisClusterOptions)
connection, err := rmq.OpenClusterConnection("my service", redisClusterClient, errChan)
Note that such an rmq cluster connection uses different Redis keys than rmq connections
opened by OpenConnection() or similar. If you have used a Redis instance
with OpenConnection(), it is NOT SAFE to reuse that rmq system by connecting
to it via OpenClusterConnection(). The cluster state won't be compatible and
this will likely lead to data loss.
If you've previously used OpenConnection() or similar you should only consider
using OpenClusterConnection() with a fresh Redis cluster.
Queues
Once we have a connection we can use it to finally access queues. Each queue must have a unique name by which we address it. Queues are created once they are accessed. There is no need to declare them in advance. Here we open a queue named "tasks":
taskQueue, err := connection.OpenQueue("tasks")
Again, Redis errors might be returned here.
Producers
An empty queue is boring, let's add some deliveries! Internally all deliveries are saved to Redis lists as strings. This is how you can publish a string payload to a queue:
delivery := "task payload"
err := taskQueue.Publish(delivery)
In practice, however, it's more common to have instances of some struct that we
want to publish to a queue. Assuming task is of some type like Task, this
is how to publish the JSON representation of that task:
// create task
taskBytes, err := json.Marshal(task)
if err != nil {
// handle error
}
err = taskQueue.PublishBytes(taskBytes)
For a full example see example/producer.
Consumers
Now that our queue is starting to fill up, let's add a consumer. After opening the queue as before, we tell it to start consuming before we can add any consumers.
err := taskQueue.StartConsuming(10, time.Second)
This sets the prefetch limit to 10 and the poll duration to one second. This means the queue will fetch up to 10 deliveries at a time before handing them to the consumers. To avoid idle consumers while the queue is full, the prefetch limit should always be greater than the number of consumers you are going to add. If the queue is empty, the poll duration sets how long rmq waits before checking Redis for new deliveries.
Once this is set up, we can actually add consumers to the consuming queue.
taskConsumer := &TaskConsumer{}
name, err := taskQueue.AddConsumer("task-consumer", taskConsumer)
To uniquely identify each consumer internally rmq creates a random name with
the given prefix. For example in this case name might be
task-consumer-WB1zaq. This name is only used in statistics.
In our example above the injected taskConsumer (of type *TaskConsumer) must
implement the rmq.Consumer interface. For example:
func (consumer *TaskConsumer) Consume(delivery rmq.Delivery) {
	var task Task
	if err := json.Unmarshal([]byte(delivery.Payload()), &task); err != nil {
		// handle json error
		if err := delivery.Reject(); err != nil {
			// handle reject error
		}
		return
	}

	// perform task
	log.Printf("performing task %s", task)
	if err := delivery.Ack(); err != nil {
		// handle ack error
	}
}
First we unmarshal the JSON package found in the delivery payload. If this fails we reject the delivery. Otherwise we perform the task and ack the delivery.
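Since each consumer handles only one delivery at a time (see the lifecycle section below), you will usually want several consumers on the same queue. A sketch, staying under the prefetch limit of 10 used above:

```go
// add nine consumers; rmq appends a random suffix to each name internally
for i := 0; i < 9; i++ {
	name := fmt.Sprintf("task-consumer-%d", i)
	if _, err := taskQueue.AddConsumer(name, &TaskConsumer{}); err != nil {
		// handle error
	}
}
```

Whether the consumers share one struct instance or each get their own is up to you; sharing is only safe if Consume() doesn't mutate shared state.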
If you don't actually need a consumer struct you can use AddConsumerFunc
instead and pass a consumer function which handles an rmq.Delivery:
name, err := taskQueue.AddConsumerFunc(func(delivery rmq.Delivery) {
// handle delivery and call Ack() or Reject() on it
})
Please note that delivery.Ack() and similar functions have a built-in retry
mechanism which will block your consumers in some cases. This is because
failing to acknowledge a delivery is potentially dangerous. For details
see the section about background errors below.
For a full example see example/consumer.
Consumer Lifecycle
As described above you can add consumers to a queue. For each consumer rmq
takes one of the prefetched unacked deliveries from the delivery channel and
passes it to the consumer's Consume() function. The next delivery will only
be passed to the same consumer once the prior Consume() call returns. So each
consumer will only be consuming a single delivery at any given time.
Furthermore each Consume() call is expected to call either delivery.Ack(),
delivery.Reject() or delivery.Push() (see below). If that's not the case
these deliveries will remain unacked and the prefetch goroutine won't make
progress after a while. So make sure you always call exactly one of those
functions in your Consume() implementations.
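delivery.Push() only makes sense if the queue has a push queue attached; as far as I understand rmq's API this is configured with SetPushQueue, roughly like so (the "tasks-failed" queue name is an arbitrary choice):

```go
// deliveries pushed from taskQueue end up in the ready list of this queue
failedQueue, err := connection.OpenQueue("tasks-failed")
if err != nil {
	// handle error
}
taskQueue.SetPushQueue(failedQueue)

// inside Consume() you can then call delivery.Push() instead of Reject()
// to move the delivery to the failed queue for later inspection or retry
```

The push queue is just a regular queue, so you can attach consumers to it as well, for example to implement a retry stage.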
Background Errors
It's recommended to inject an error channel into the OpenConnection()
functions. This section describes its purpose and how you might use it to
monitor rmq's background Redis errors.
There are three sources of background errors which rmq detects (and handles internally):
- The OpenConnection() functions spawn a goroutine which keeps a heartbeat Redis key alive. This is important so that the cleaner (see below) can tell which connections are still alive and must not be cleaned yet. If the heartbeat goroutine fails to update the heartbeat key repeatedly for too long, the cleaner might clean up the connection prematurely. To avoid this the connection automatically stops all consumers after 45 consecutive heartbeat errors. This magic number is based on the details of the heartbeat key: the heartbeat tries to update the key every second with a TTL of one minute, so only after 60 failed attempts would the heartbeat key expire. Every time this goroutine runs into a Redis error it gets sent to the error channel as HeartbeatError.
- The StartConsuming() function spawns a goroutine which is responsible for prefetching deliveries from the Redis ready list and moving them into a delivery channel. This delivery channel feeds into your consumers' Consume() functions. If the prefetch goroutine runs into Redis errors, no new deliveries will reach your consumers until it can fetch new ones. These Redis errors are not dangerous; your consumers will simply idle until the Redis connection recovers. Every time this goroutine runs into a Redis error it gets sent to the error channel as ConsumeError.
- The delivery functions Ack(), Reject() and Push() have a built-in retry mechanism. This is because failing to acknowledge a delivery is potentially dangerous: the consumer has already handled the delivery, so if it can't ack it, the cleaner might move it back to the ready list and another consumer might consume it again in the future, leading to double delivery. So if a delivery fails to be acked because of a Redis error, the Ack() call will block and retry once a second until it either succeeds or until consuming gets stopped (see below). In the latter case the Ack() call returns rmq.ErrorConsumingStopped, which you should handle in your consume function. For example you might want to log the delivery so you can manually remove it from the unacked or ready list before you start new consumers, or at least know which deliveries might end up being consumed twice. Every time these functions run into a Redis error it gets sent to the error channel as DeliveryError.
Each of those error types has a field Count which tells you how often the
operation failed consecutively. This indicates for how long the affected Redis
instance has been unavailable.
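Assuming the three error types are exported as rmq.HeartbeatError, rmq.ConsumeError and rmq.DeliveryError (as their names above suggest), the channel can be drained with a type switch, sketched here:

```go
go func() {
	for err := range errChan {
		switch err := err.(type) {
		case *rmq.HeartbeatError:
			log.Printf("heartbeat error (%d consecutive): %v", err.Count, err)
		case *rmq.ConsumeError:
			log.Printf("consume error (%d consecutive): %v", err.Count, err)
		case *rmq.DeliveryError:
			log.Printf("delivery error (%d consecutive): %v", err.Count, err)
		default:
			log.Print("other error: ", err)
		}
	}
}()
```

Instead of just logging, you could alert once Count crosses a threshold, since a growing Count means the Redis outage is ongoing.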