SkillAgentSearch skills...

Groupmq

A fast, reliable Redis-backed per-group FIFO queue for Node + TypeScript with guaranteed job ordering and parallel processing across groups.

Install / Use

/learn @Openpanel-dev/Groupmq
About this skill

Quality Score

0/100

Supported Platforms

Universal

README

<p align="center"> <img src="website/public/favicon/web-app-manifest-512x512.png" width="200px" height="200px" /> <h1 align="center"><b>GroupMQ, Redis Group Queue</b></h1> <p align="center"> A fast, reliable Redis-backed per-group FIFO queue for Node + TypeScript with guaranteed job ordering and parallel processing across groups. <br /> <br /> <a href="https://openpanel-dev.github.io/groupmq/">Website</a> · <a href="https://openpanel.dev">Created by OpenPanel.dev</a> </p> <br /> <br /> </p>

Install

npm i groupmq ioredis

Quick start

import Redis from "ioredis";
import { Queue, Worker } from "groupmq";

const redis = new Redis("redis://127.0.0.1:6379");

const queue = new Queue({
  redis,
  namespace: "orders", // Will be prefixed with 'groupmq:'
  jobTimeoutMs: 30_000, // How long before job times out
  logger: true, // Enable logging (optional)
});

await queue.add({
  groupId: "user:42",
  data: { type: "charge", amount: 999 },
  orderMs: Date.now(), // or event.createdAtMs
  maxAttempts: 5,
});

const worker = new Worker({
  queue,
  concurrency: 1, // Process 1 job at a time (can increase for parallel processing)
  handler: async (job) => {
    console.log(`Processing:`, job.data);
  },
});

worker.run();

Key Features

Key Features

  • Per-group FIFO ordering - Jobs within the same group process in strict order, perfect for user workflows, data pipelines, and sequential operations
  • Parallel processing across groups - Process multiple groups simultaneously while maintaining order within each group
  • BullMQ-compatible API - Familiar interface with enhanced group-based capabilities
  • High performance - High throughput with low latency (see benchmarks)
  • Built-in ordering strategies - Handle out-of-order job arrivals with 'none', 'scheduler', or 'in-memory' methods
  • Automatic recovery - Stalled job detection and connection error handling with exponential backoff
  • Production ready - Atomic operations, graceful shutdown, and comprehensive logging
  • Zero polling - Efficient blocking operations prevent wasteful Redis calls

Inspiration from BullMQ

GroupMQ is heavily inspired by BullMQ, a fantastic library and one of the most popular Redis-based job queue libraries for Node.js. We've taken many core concepts and design patterns from BullMQ while adapting them for our specific use case of per-group FIFO processing.

Key differences from BullMQ:

  • Per-group FIFO ordering, jobs within the same group are processed in strict order
  • Group-based concurrency, only one job per group can be active at a time
  • Ordered processing, built-in support for orderMs timestamp-based ordering
  • Cross-group parallelism, multiple groups can be processed simultaneously
  • No job types, simplified to a single job, instead use union typed data { type: 'paint', data: { ... } } | { type: 'repair', data: { ... } }

We're grateful to the BullMQ team for their excellent work and the foundation they've provided for the Redis job queue ecosystem.

Third-Party Code Attribution

While GroupMQ is inspired by BullMQ's design and concepts, we have also directly copied some code from BullMQ:

  • src/async-fifo-queue.ts - This file contains code copied from BullMQ's AsyncFifoQueue implementation. BullMQ's implementation is well-designed and fits our needs perfectly, so we've used it directly rather than reimplementing it.

This code is used under the MIT License. The original copyright notice and license can be found at:

  • BullMQ Repository: https://github.com/taskforcesh/bullmq
  • BullMQ License: https://github.com/taskforcesh/bullmq/blob/main/LICENSE

Original copyright: Copyright (c) Taskforce.sh and contributors

Queue Options

type QueueOptions = {
  redis: Redis;                    // Redis client instance (required)
  namespace: string;                // Unique queue name, gets 'groupmq:' prefix (required)
  logger?: boolean | LoggerInterface; // Enable logging (default: false)
  jobTimeoutMs?: number;            // Job processing timeout (default: 30000ms)
  maxAttempts?: number;             // Default max retry attempts (default: 3)
  reserveScanLimit?: number;        // Groups to scan when reserving (default: 20)
  keepCompleted?: number;           // Number of completed jobs to retain (default: 0)
  keepFailed?: number;              // Number of failed jobs to retain (default: 0)
  schedulerLockTtlMs?: number;      // Scheduler lock TTL (default: 1500ms)
  orderingMethod?: OrderingMethod;  // Ordering strategy (default: 'none')
  orderingWindowMs?: number;        // Time window for ordering (required for non-'none' methods)
  orderingMaxWaitMultiplier?: number; // Max grace period multiplier for in-memory (default: 3)
  orderingGracePeriodDecay?: number;  // Grace period decay factor for in-memory (default: 1.0)
  orderingMaxBatchSize?: number;      // Max jobs to collect in batch for in-memory (default: 10)
};

type OrderingMethod = 'none' | 'scheduler' | 'in-memory';

Ordering Methods:

  • 'none' - No ordering guarantees (fastest, zero overhead, no extra latency)
  • 'scheduler' - Redis buffering for large windows (≥1000ms, requires scheduler, adds latency)
  • 'in-memory' - Worker collection for small windows (50-500ms, no scheduler, adds latency per batch)

See Ordering Methods for detailed comparison.

Worker Options

type WorkerOptions<T> = {
  queue: Queue<T>;                           // Queue instance to process jobs from (required)
  handler: (job: ReservedJob<T>) => Promise<unknown>; // Job processing function (required)
  name?: string;                             // Worker name for logging (default: queue.name)
  logger?: boolean | LoggerInterface;        // Enable logging (default: false)
  concurrency?: number;                      // Number of jobs to process in parallel (default: 1)
  heartbeatMs?: number;                      // Heartbeat interval (default: Math.max(1000, jobTimeoutMs/3))
  onError?: (err: unknown, job?: ReservedJob<T>) => void; // Error handler
  maxAttempts?: number;                      // Max retry attempts (default: queue.maxAttempts)
  backoff?: BackoffStrategy;                 // Retry backoff function (default: exponential with jitter)
  enableCleanup?: boolean;                   // Periodic cleanup (default: true)
  cleanupIntervalMs?: number;                // Cleanup frequency (default: 60000ms)
  schedulerIntervalMs?: number;              // Scheduler frequency (default: adaptive)
  blockingTimeoutSec?: number;               // Blocking reserve timeout (default: 5s)
  atomicCompletion?: boolean;                // Atomic completion + next reserve (default: true)
  stalledInterval?: number;                  // Check if stalled every N ms (default: 30000)
  maxStalledCount?: number;                  // Fail after N stalls (default: 1)
  stalledGracePeriod?: number;               // Grace period before considering stalled (default: 0)
};

type BackoffStrategy = (attempt: number) => number; // returns delay in ms

Job Options

When adding a job to the queue:

await queue.add({
  groupId: string;           // Required: Group ID for FIFO processing
  data: T;                   // Required: Job payload data
  orderMs?: number;          // Timestamp for ordering (default: Date.now())
  maxAttempts?: number;      // Max retry attempts (default: queue.maxAttempts)
  jobId?: string;            // Custom job ID (default: auto-generated UUID)
  delay?: number;            // Delay in ms before job becomes available
  runAt?: Date | number;     // Specific time to run the job
  repeat?: RepeatOptions;    // Repeating job configuration (cron or interval)
});

type RepeatOptions = 
  | { every: number }                    // Repeat every N milliseconds
  | { pattern: string };                 // Cron pattern (standard 5-field format)

Example with delay:

await queue.add({
  groupId: 'user:123',
  data: { action: 'send-reminder' },
  delay: 3600000, // Run in 1 hour
});

Example with specific time:

await queue.add({
  groupId: 'user:123',
  data: { action: 'scheduled-report' },
  runAt: new Date('2025-12-31T23:59:59Z'),
});

Worker Concurrency

Workers support configurable concurrency to process multiple jobs in parallel from different groups:

const worker = new Worker({
  queue,
  concurrency: 8, // Process up to 8 jobs simultaneously
  handler: async (job) => {
    // Jobs from different groups can run in parallel
    // Jobs from the same group still run sequentially
  },
});

Benefits:

  • Higher throughput for multi-group workloads
  • Efficient resource utilization
  • Still maintains per-group FIFO ordering

Considerations:

  • Each job consumes memory and resources
  • Set concurrency based on job duration and system resources
  • Monitor Redis connection pool (ioredis default: 10 connections)

Logging

Both Queue and Worker support optional logging for debugging and monitoring:

// Enable default logger
const queue = new Queue({
  redis,
  namespace: 'orders',
  logger: true, // Logs to console with queue name prefix
});

const worker = new Worker({
  queue,
  logger: true, // Logs to console with worker name prefix
  handler: async (job) => { /* ... */ },
});

Custom logger:

Works out of the box with both pino and winston

import type { LoggerInterface } from 'groupmq';

const customLogger: LoggerInterface = {
  debug: (msg: string, ...args: any[]) => { /* custom logging */ },
  info: (msg: string, ...args: any[]) => { /* custom logging */ },
  warn: (msg: string, ...args: any[]) => { /* custom logging */ },
  error: (msg: string, ...args: any[]) => { /* custom logging */ },
};

const q
View on GitHub
GitHub Stars361
CategoryDevelopment
Updated1d ago
Forks14

Languages

TypeScript

Security Score

85/100

Audited on Mar 26, 2026

No findings