# strego

A modern distributed task queue library for Go, built on Redis Streams with consumer groups for reliable task processing and horizontal scaling.
## Features
- Mostly Exactly-Once Delivery - Redis Streams consumer groups + mandatory application-level idempotency
- Redis Streams - Native consumer groups, automatic crash recovery, horizontal scaling
- Typed Handlers - Generic handlers with automatic payload decoding, zero boilerplate
- JSON by Default - Human-readable, debuggable, widely supported (custom serializers available)
- Dead Letter Queue - Failed tasks after max retries
- Scheduled Tasks - Delayed execution with future processing time
- Retry with Backoff - Exponential backoff strategy
- Crash Recovery - Automatic claiming of orphaned tasks from dead workers
- Multi-worker - Horizontal scaling with consumer groups
- PostgreSQL (optional) - Task history, search, filtering for UI
- Built-in Web UI - Real-time dashboard with consumer group monitoring
  - Redis Streams: Live queue stats, active consumers
  - PostgreSQL: Task history and search (optional)
- Logrus logging - Structured logging out of the box
## Delivery Guarantees
Strego provides mostly exactly-once delivery under normal operation:
- ✅ Redis Streams consumer groups ensure each task is delivered to ONE worker per group
- ✅ Pending list + XAUTOCLAIM recover tasks from crashed workers
- ✅ Application-level idempotency (mandatory) handles duplicate business operations
- ✅ Normal operation = exactly-once - Worker processes, ACKs, task completes once
### When Tasks Can Be Processed Twice
- Long-running handlers - Worker is still processing when `ClaimStaleAfter` expires, so another worker claims the task
  - Solution: Set `ClaimStaleAfter` > max handler duration (see IDEMPOTENCY.md)
- Duplicate business operations - Same operation enqueued multiple times (e.g., network retries, API idempotency key misuse, concurrent requests)
  - Each enqueue creates a unique task ID, but represents the same business operation
  - Solution: Use `WithUniqueKey()` for enqueue-time deduplication + mandatory business ID validation in the handler
### When Tasks Are Lost (Redis Failure)
**CRITICAL:** Strego cannot prevent task loss if Redis crashes!
- ❌ Redis in-memory mode (default) - All tasks in Redis lost on crash
- ❌ AOF/RDB persistence - Can still lose recent tasks (up to ~1 second with AOF `everysec`, minutes with RDB snapshots)
- ❌ Redis down - Application cannot enqueue or process tasks (system-wide outage)
Reality check:
- Redis failure = system-wide outage (not just task loss)
- Most deployments use Redis in-memory for speed
- If you need durability: Use PostgreSQL store + Redis persistence + monitoring + alerts
See IDEMPOTENCY.md for detailed scenarios and best practices.
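If task loss must be minimized, Redis persistence narrows (but does not close) the loss window. A typical `redis.conf` fragment for this setup; the exact values are illustrative, not a strego requirement:

```conf
# redis.conf — shrink (not eliminate) the task-loss window
appendonly yes        # enable the append-only file
appendfsync everysec  # fsync once per second: at most ~1s of writes lost
save 900 1            # keep periodic RDB snapshots as a fallback
```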
## Documentation
- Architecture Documentation - Detailed architecture diagrams and design decisions
- Idempotency Implementation - Delivery guarantees, edge cases, and best practices
## Installation

```bash
go get github.com/erennakbas/strego@latest
```

Or install a specific version:

```bash
go get github.com/erennakbas/strego@v0.3.1
```
## Quick Start

### Producer
```go
package main

import (
	"context"
	"time"

	"github.com/redis/go-redis/v9"
	"github.com/sirupsen/logrus"

	"github.com/erennakbas/strego"
	"github.com/erennakbas/strego/broker"
	brokerRedis "github.com/erennakbas/strego/broker/redis"
)

func main() {
	// Setup logger
	logger := logrus.New()

	// Connect to Redis
	redisClient := redis.NewClient(&redis.Options{
		Addr: "localhost:6379",
	})

	// Create broker
	b := brokerRedis.NewBroker(redisClient, brokerRedis.WithConsumerConfig(broker.ConsumerConfig{
		Group:         "my-app",
		BatchSize:     10,
		BlockDuration: 5 * time.Second,
	}))

	client := strego.NewClient(b, strego.WithClientLogger(logger))

	// Create and enqueue a task
	task := strego.NewTaskFromBytes("email:send",
		[]byte(`{"to":"user@example.com","subject":"Welcome!"}`),
		strego.WithQueue("default"),
		strego.WithMaxRetry(3),
	)

	info, err := client.Enqueue(context.Background(), task)
	if err != nil {
		logger.WithError(err).Fatal("failed to enqueue")
	}
	logger.WithField("task_id", info.ID).Info("task enqueued")
}
```
### Consumer
```go
package main

import (
	"context"
	"time"

	"github.com/redis/go-redis/v9"
	"github.com/sirupsen/logrus"

	"github.com/erennakbas/strego"
	"github.com/erennakbas/strego/broker"
	brokerRedis "github.com/erennakbas/strego/broker/redis"
)

func main() {
	logger := logrus.New()

	// Connect to Redis
	redisClient := redis.NewClient(&redis.Options{
		Addr: "localhost:6379",
	})

	// Create broker
	b := brokerRedis.NewBroker(redisClient, brokerRedis.WithConsumerConfig(broker.ConsumerConfig{
		Group:           "my-app",
		BatchSize:       10,
		BlockDuration:   5 * time.Second,
		ClaimStaleAfter: 5 * time.Minute, // Claim orphaned tasks from dead workers
		Concurrency:     10,              // Process up to 10 tasks concurrently per batch
	}))

	// Create server
	server := strego.NewServer(b,
		strego.WithQueues("default", "critical"),
		strego.WithServerLogger(logger),
	)

	// Register handlers (always implement application-level idempotency for critical ops!)
	server.HandleFunc("email:send", func(ctx context.Context, task *strego.Task) error {
		logger.WithField("payload", string(task.Payload())).Info("processing email")

		// Application-level idempotency check (recommended!)
		// if db.EmailAlreadySent(emailID) { return nil }

		// Your email sending logic here
		// SendEmail(...)

		// Record in database
		// db.MarkEmailAsSent(emailID)

		return nil
	})

	// Start processing (blocks until shutdown)
	if err := server.Start(); err != nil {
		logger.WithError(err).Fatal("server error")
	}
}
```
### With Web UI
```go
import "github.com/erennakbas/strego/ui"

// Create UI server
uiServer, _ := ui.NewServer(ui.Config{
	Addr:   ":8080",
	Broker: b,
	Store:  pgStore, // optional PostgreSQL store
	Logger: logger,
})
go uiServer.Start()

// Visit http://localhost:8080
```
## Task Options
```go
task := strego.NewTaskFromBytes("task:type", payload,
	strego.WithQueue("critical"),             // Queue name
	strego.WithMaxRetry(5),                   // Max retry attempts
	strego.WithTimeout(30*time.Second),       // Processing timeout
	strego.WithProcessIn(10*time.Minute),     // Delay execution
	strego.WithPriority(5),                   // Priority (0-10)
	strego.WithUniqueKey("key", 1*time.Hour), // Deduplication
	strego.WithLabels(map[string]string{      // Custom labels
		"user_id": "123",
	}),
)
```
## Custom Serializers
Strego uses JSON by default for wide compatibility and debuggability. For performance-critical workloads, you can use custom serializers:
```go
// Define custom serializer (e.g., Protobuf)
type ProtobufSerializer struct{}

func (p ProtobufSerializer) Marshal(v interface{}) ([]byte, error) {
	return proto.Marshal(v.(proto.Message))
}

func (p ProtobufSerializer) Unmarshal(data []byte, v interface{}) error {
	return proto.Unmarshal(data, v.(proto.Message))
}

func (p ProtobufSerializer) Name() string { return "protobuf" }

// Client: Enqueue with custom serializer
task, _ := strego.NewTaskWith("payment", payment, protobufSerializer)
client.Enqueue(ctx, task)

// Server: Handler with custom serializer
strego.HandleWith(server, "payment", protobufSerializer,
	func(ctx context.Context, payment *PaymentProto, task *strego.Task) error {
		return processPayment(payment)
	})
```
Performance:
- Protobuf: 3-5x faster, 2-10x smaller
- Msgpack: 2-3x faster, 2x smaller
- JSON (default): Human-readable, debuggable
Tradeoff: Binary payloads lose debuggability in Redis/PostgreSQL.
See examples/custom-serializer for complete example.
## Broker Configuration
```go
b := brokerRedis.NewBroker(redisClient, brokerRedis.WithConsumerConfig(broker.ConsumerConfig{
	Group:                  "my-workers",     // Consumer group name
	Consumer:               "",               // Auto-generated if empty
	BatchSize:              10,               // Tasks to fetch per batch
	BlockDuration:          5 * time.Second,  // Wait time for new messages
	ClaimStaleAfter:        5 * time.Minute,  // Claim orphaned tasks from crashed workers
	ClaimCheckInterval:     30 * time.Second, // How often to check for stale tasks
	SkipOwnPendingRecovery: false,            // Recover own pending messages on startup
	Concurrency:            1,                // Concurrent tasks per batch (1=sequential)
}))
```
Configuration Options:
- Group: Consumer group name. Multiple workers in the same group share the workload. **⚠️ IMPORTANT: Us
