SkillAgentSearch skills...

Strego

A modern distributed task queue for Go. Redis Streams with consumer groups for reliable task processing, scheduled tasks, automatic retries with exponential backoff, dead letter queues, and built-in web UI. Built-in idempotency support. Optional PostgreSQL for task history.

Install / Use

/learn @erennakbas/Strego
About this skill

Quality Score

0/100

Supported Platforms

Universal

README

strego

Go Reference Go Report Card Tests codecov License: MIT Release

A modern distributed task queue library for Go, built on Redis Streams with consumer groups for reliable task processing and horizontal scaling.

Features

  • Mostly Exactly-Once Delivery - Redis Streams consumer groups + mandatory application-level idempotency
  • Redis Streams - Native consumer groups, automatic crash recovery, horizontal scaling
  • Typed Handlers - Generic handlers with automatic payload decoding, zero boilerplate
  • JSON by Default - Human-readable, debuggable, widely supported (custom serializers available)
  • Dead Letter Queue - Failed tasks after max retries
  • Scheduled Tasks - Delayed execution with future processing time
  • Retry with Backoff - Exponential backoff strategy
  • Crash Recovery - Automatic claiming of orphaned tasks from dead workers
  • Multi-worker - Horizontal scaling with consumer groups
  • PostgreSQL (optional) - Task history, search, filtering for UI
  • Built-in Web UI - Real-time dashboard with consumer group monitoring
    • Redis Streams: Live queue stats, active consumers
    • PostgreSQL: Task history and search (optional)
  • Logrus logging - Structured logging out of the box

Delivery Guarantees

Strego provides mostly exactly-once delivery under normal operation:

  • Redis Streams consumer groups ensure each task delivered to ONE worker per group
  • Pending list + XAUTOCLAIM recover tasks from crashed workers
  • Application-level idempotency (mandatory) handles duplicate business operations
  • Normal operation = exactly-once - Worker processes, ACKs, task completes once

When Tasks Can Be Processed Twice

  1. Long-running handlers - Worker still processing when ClaimStaleAfter expires

    • Solution: Set ClaimStaleAfter > max handler duration (see IDEMPOTENCY.md)
  2. Duplicate business operations - Same operation enqueued multiple times (e.g., network retries, API idempotency key misuse, concurrent requests)

    • Each enqueue creates a unique task ID, but represents the same business operation
    • Solution: Use WithUniqueKey() for enqueue-time deduplication + mandatory business ID validation in handler

When Tasks Are Lost (Redis Failure)

CRITICAL: Strego cannot prevent task loss if Redis crashes!

  • Redis in-memory mode (default) - All tasks in Redis lost on crash
  • AOF/RDB persistence - Still loses recent tasks (seconds to minutes)
  • Redis down - Application cannot enqueue or process tasks (system-wide outage)

Reality check:

  • Redis failure = system-wide outage (not just task loss)
  • Most deployments use Redis in-memory for speed
  • If you need durability: Use PostgreSQL store + Redis persistence + monitoring + alerts

See IDEMPOTENCY.md for detailed scenarios and best practices.

Documentation

Installation

go get github.com/erennakbas/strego@latest

Or install a specific version:

go get github.com/erennakbas/strego@v0.3.1

Quick Start

Producer

package main

import (
    "context"
    "fmt"
    "time"
    
    "github.com/redis/go-redis/v9"
    "github.com/sirupsen/logrus"
    
    "github.com/erennakbas/strego"
    "github.com/erennakbas/strego/broker"
    brokerRedis "github.com/erennakbas/strego/broker/redis"
)

func main() {
    // Setup logger
    logger := logrus.New()
    
    // Connect to Redis
    redisClient := redis.NewClient(&redis.Options{
        Addr: "localhost:6379",
    })

    // Create broker
    b := brokerRedis.NewBroker(redisClient, brokerRedis.WithConsumerConfig(broker.ConsumerConfig{
        Group:         "my-app",
        BatchSize:     10,
        BlockDuration: 5 * time.Second,
    }))
    
    client := strego.NewClient(b, strego.WithClientLogger(logger))

    // Create and enqueue a task
    task := strego.NewTaskFromBytes("email:send",
        []byte(`{"to":"user@example.com","subject":"Welcome!"}`),
        strego.WithQueue("default"),
        strego.WithMaxRetry(3),
    )

    info, err := client.Enqueue(context.Background(), task)
    if err != nil {
        logger.WithError(err).Fatal("failed to enqueue")
    }

    logger.WithField("task_id", info.ID).Info("task enqueued")
}

Consumer

package main

import (
    "context"
    "time"
    
    "github.com/redis/go-redis/v9"
    "github.com/sirupsen/logrus"
    
    "github.com/erennakbas/strego"
    "github.com/erennakbas/strego/broker"
    brokerRedis "github.com/erennakbas/strego/broker/redis"
)

func main() {
    logger := logrus.New()
    
    // Connect to Redis
    redisClient := redis.NewClient(&redis.Options{
        Addr: "localhost:6379",
    })

    // Create broker
    b := brokerRedis.NewBroker(redisClient, brokerRedis.WithConsumerConfig(broker.ConsumerConfig{
        Group:           "my-app",
        BatchSize:       10,
        BlockDuration:   5 * time.Second,
        ClaimStaleAfter: 5 * time.Minute, // Claim orphaned tasks from dead workers
        Concurrency:     10,               // Process up to 10 tasks concurrently per batch
    }))
    
    // Create server
    server := strego.NewServer(b,
        strego.WithQueues("default", "critical"),
        strego.WithServerLogger(logger),
    )

    // Register handlers (always implement application-level idempotency for critical ops!)
    server.HandleFunc("email:send", func(ctx context.Context, task *strego.Task) error {
        logrus.WithField("payload", string(task.Payload())).Info("processing email")
        
        // Application-level idempotency check (recommended!)
        // if db.EmailAlreadySent(emailID) { return nil }
        
        // Your email sending logic here
        // SendEmail(...)
        
        // Record in database
        // db.MarkEmailAsSent(emailID)
        
        return nil
    })

    // Start processing (blocks until shutdown)
    if err := server.Start(); err != nil {
        logger.WithError(err).Fatal("server error")
    }
}

With Web UI

import "github.com/erennakbas/strego/ui"

// Create UI server
uiServer, _ := ui.NewServer(ui.Config{
    Addr:   ":8080",
    Broker: b,
    Store:  pgStore, // optional PostgreSQL store
    Logger: logger,
})

go uiServer.Start()
// Visit http://localhost:8080

Task Options

task := strego.NewTaskFromBytes("task:type", payload,
    strego.WithQueue("critical"),          // Queue name
    strego.WithMaxRetry(5),                // Max retry attempts
    strego.WithTimeout(30*time.Second),    // Processing timeout
    strego.WithProcessIn(10*time.Minute),  // Delay execution
    strego.WithPriority(5),                // Priority (0-10)
    strego.WithUniqueKey("key", 1*time.Hour), // Deduplication
    strego.WithLabels(map[string]string{   // Custom labels
        "user_id": "123",
    }),
)

Custom Serializers

Strego uses JSON by default for wide compatibility and debuggability. For performance-critical workloads, you can use custom serializers:

// Define custom serializer (e.g., Protobuf)
type ProtobufSerializer struct{}

func (p ProtobufSerializer) Marshal(v interface{}) ([]byte, error) {
    return proto.Marshal(v.(proto.Message))
}

func (p ProtobufSerializer) Unmarshal(data []byte, v interface{}) error {
    return proto.Unmarshal(data, v.(proto.Message))
}

func (p ProtobufSerializer) Name() string { return "protobuf" }

// Client: Enqueue with custom serializer
task, _ := strego.NewTaskWith("payment", payment, protobufSerializer)
client.Enqueue(ctx, task)

// Server: Handler with custom serializer
strego.HandleWith(server, "payment", protobufSerializer,
    func(ctx context.Context, payment *PaymentProto, task *strego.Task) error {
        return processPayment(payment)
    })

Performance:

  • Protobuf: 3-5x faster, 2-10x smaller
  • Msgpack: 2-3x faster, 2x smaller
  • JSON (default): Human-readable, debuggable

Tradeoff: Binary payloads lose debuggability in Redis/PostgreSQL.

See examples/custom-serializer for complete example.

Broker Configuration

b := brokerRedis.NewBroker(redisClient, brokerRedis.WithConsumerConfig(broker.ConsumerConfig{
    Group:                  "my-workers",         // Consumer group name
    Consumer:               "",                   // Auto-generated if empty
    BatchSize:              10,                   // Tasks to fetch per batch
    BlockDuration:          5 * time.Second,      // Wait time for new messages
    ClaimStaleAfter:        5 * time.Minute,      // Claim orphaned tasks from crashed workers
    ClaimCheckInterval:     30 * time.Second,     // How often to check for stale tasks
    SkipOwnPendingRecovery: false,                // Recover own pending messages on startup
    Concurrency:            1,                    // Concurrent tasks per batch (1=sequential)
}))

Configuration Options:

  • Group: Consumer group name. Multiple workers in the same group share the workload. **⚠️ IMPORTANT: Us
View on GitHub
GitHub Stars12
CategoryData
Updated2mo ago
Forks0

Languages

Go

Security Score

90/100

Audited on Jan 27, 2026

No findings