
VarMQ


A high-performance message queue and worker pool system for Go that simplifies concurrent task processing. Through Go generics, it provides type safety without sacrificing performance.

With VarMQ, you can process messages asynchronously, handle errors properly, store data persistently, and scale across systems using adapters. All through a clean, intuitive API that feels natural to Go developers.

✨ Features

  • ⚡ High performance: Optimized for high throughput with minimal overhead, even under heavy load. see benchmarks
  • 🛠️ Variants of queue types:
    • Standard queues for in-memory processing
    • Priority queues for importance-based ordering
    • Persistent queues for durability across restarts
    • Distributed queues for processing across multiple systems
  • 🧩 Worker abstractions:
    • NewWorker - Fire-and-forget operations (most performant)
    • NewErrWorker - Returns only error (when result isn't needed)
    • NewResultWorker - Returns result and error
  • 🚦 Concurrency control: Fine-grained control over worker pool size, dynamic tuning, and idle worker management
  • 🧬 Multi Queue Binding: Bind multiple queues to a single worker
  • 💾 Persistence: Support for durable storage through adapter interfaces
  • 🌐 Distribution: Scale processing across multiple instances via adapter interfaces
  • 🧩 Extensible: Build your own storage adapters by implementing VarMQ's internal queue interfaces

Quick Start

Installation

go get github.com/goptics/varmq

Basic Usage

package main

import (
    "fmt"
    "time"

    "github.com/goptics/varmq"
)

func main() {
  worker := varmq.NewWorker(func(j varmq.Job[int]) {
    fmt.Printf("Processing %d\n", j.Data())
    time.Sleep(500 * time.Millisecond)
  }, 10) // with concurrency 10 or set 0 for parallelism
  defer worker.WaitUntilIdle()
  queue := worker.BindQueue()

  for i := range 100 {
    queue.Add(i)
  }
}

↗️ Run it on Playground

Priority Queue

You can use a priority queue to order jobs by priority: a lower number means higher priority.

// just bind priority queue
queue := worker.BindPriorityQueue()

// add jobs to priority queue
for i := range 10 {
    queue.Add(i, i%2) // prioritize even tasks
}

↗️ Run it on Playground

💡 Highlighted Features

Persistent and Distributed Queues

VarMQ supports both persistent and distributed queue processing through adapter interfaces:

  • Persistent Queues: Store jobs durably so they survive program restarts
  • Distributed Queues: Process jobs across multiple systems

Usage is simple:

// For persistent queues (with any IPersistentQueue adapter)
queue := worker.WithPersistentQueue(persistentQueueAdapter)

// For distributed queues (with any IDistributedQueue adapter)
queue := worker.WithDistributedQueue(distributedQueueAdapter)

See complete working examples in the examples directory.

Create your own adapters by implementing the IPersistentQueue or IDistributedQueue interfaces.

[!Note] Before testing examples, make sure to start the Redis server using docker compose up -d.

Built-in adapters

  • ⚡ Redis: redisq
  • 🗃️ SQLite: sqliteq
  • 🦆 DuckDB: duckq
  • 🐘 PostgreSQL: 🔄 Upcoming

Multi Queue Binds

Bind multiple queues to a single worker, enabling efficient processing of jobs from different sources with configurable strategies. The worker supports four strategies:

  1. Priority (default - prioritizes higher priority queues that have pending jobs)
  2. RoundRobin (cycles through queues equally)
  3. MaxLen (prioritizes queues with more jobs)
  4. MinLen (prioritizes queues with fewer jobs)

worker := varmq.NewWorker(func(j varmq.Job[string]) {
	fmt.Println("Processing:", j.Data())
	time.Sleep(500 * time.Millisecond) // Simulate work
}) // change the strategy via varmq.WithStrategy; the default is varmq.Priority
defer worker.WaitUntilIdle()

// Queues bound earlier get higher priority by default (chronological order).
// You can override a queue's priority with the varmq.WithQueuePriority option.
q1 := worker.BindQueue()         // highest
q2 := worker.BindQueue()         // medium
pq := worker.BindPriorityQueue() // lowest

for i := range 15 {
	q2.Add(fmt.Sprintf("Task queue-2 %d", i))
}

for i := range 10 {
	pq.Add(fmt.Sprintf("Task priority-queue %d", i), i%2) // prioritize even tasks
}

for i := range 10 {
	q1.Add(fmt.Sprintf("Task queue-1 %d", i))
}

↗️ Run it on Playground

Result and Error Worker

VarMQ provides a NewResultWorker that returns both the result and error for each job processed. This is useful when you need to handle both success and failure cases.

worker := varmq.NewResultWorker(func(j varmq.Job[string]) (int, error) {
	fmt.Println("Processing:", j.Data())
	time.Sleep(500 * time.Millisecond) // Simulate work
	data := j.Data()

	if data == "error" {
		return 0, errors.New("error occurred")
	}

	return len(data), nil
})
defer worker.WaitUntilIdle()
queue := worker.BindQueue()

// Add jobs to the queue (non-blocking)
if job, ok := queue.Add("The length of this string is 31"); ok {
	fmt.Println("Job 1 added to queue.")

	go func() {
		result, _ := job.Result()
		fmt.Println("Result:", result)
	}()
}

if job, ok := queue.Add("error"); ok {
	fmt.Println("Job 2 added to queue.")

	go func() {
		_, err := job.Result()
		fmt.Println("Error:", err)
	}()
}

↗️ Run it on Playground

NewErrWorker is similar to NewResultWorker but returns only an error.

Function Helpers

VarMQ provides helper functions that enable direct function submission, similar to the Submit() pattern in other pool packages such as Pond or Ants.

  • Func(): For basic functions with no return values - use with NewWorker
  • ErrFunc(): For functions that return errors - use with NewErrWorker
  • ResultFunc[R](): For functions that return a result and error - use with NewResultWorker

worker := varmq.NewWorker(varmq.Func(), 10)
defer worker.WaitUntilIdle()

queue := worker.BindQueue()

for i := range 100 {
    queue.Add(func() {
        time.Sleep(500 * time.Millisecond)
        fmt.Println("Processing", i)
    })
}

↗️ Run it on Playground

[!Important] Function helpers don't support persistence or distribution since functions cannot be serialized.

Benchmarks

goos: linux
goarch: amd64
pkg: github.com/goptics/varmq
cpu: 13th Gen Intel(R) Core(TM) i7-13700

Add Operation

Command: go test -run=^$ -benchmem -bench '^(BenchmarkAdd)$' -cpu=1

Why -cpu=1? The benchmark never uses more than one concurrent worker, so a single CPU measures its cost most accurately.

| Worker Type  | Queue Type     | Time (ns/op) | Memory (B/op) | Allocations (allocs/op) |
| ------------ | -------------- | ------------ | ------------- | ----------------------- |
| Worker       | Queue          | 889.6        | 112           | 2                       |
|              | Priority       | 965.7        | 128           | 3                       |
| ErrWorker    | ErrQueue       | 977.8        | 288           | 5                       |
|              | ErrPriority    | 1063         | 304           | 6                       |
| ResultWorker | ResultQueue    | 977.3        | 337           | 5                       |
|              | ResultPriority | 1061         | 352           | 6                       |

AddAll Operation

Command: go test -run=^$ -benchmem -bench '^(BenchmarkAddAll)$' -cpu=1

| Worker Type  | Queue Type     | Time (ns/op) | Memory (B/op) | Allocations (allocs/op) |
| ------------ | -------------- | ------------ | ------------- | ----------------------- |
| Worker       | Queue          | 580,399      | 130,760       | 3,002                   |
|              | Priority       | 716,784      | 146,136       | 4,002                   |
| ErrWorker    | ErrQueue       | 617,236      | 155,276       | 3,505                   |
|              | ErrPriority    | 753,532      | 170,657       | 4,505                   |
| ResultWorker | ResultQueue    | 608,826      | 171,848       | 3,005                   |
|              | ResultPriority | 742,789      | 187,258       | 4,005                   |

[!Note] AddAll benchmarks use a batch of 1000 items per call. The reported numbers (ns/op, B/op, allocs/op) are per batch, not per item.
