SkillAgentSearch skills...

Vorq

Distributed task queue for TypeScript with type-safe workflows, pluggable transports (Redis, RabbitMQ), and optional persistence.

Install / Use

/learn @baccaraaa/Vorq

README

<p align="center"> <img src="banner.png" alt="Vorq — Distributed Task Queue for TypeScript" width="100%" /> </p> <p align="center"> Distributed task queue for TypeScript with <strong>type-safe workflows</strong>, pluggable transports (Redis, RabbitMQ), and optional persistence. </p>

npm version License: MIT CI TypeScript

Features

  • Priority queues -- critical, high, medium, and low priority levels
  • Delayed tasks -- schedule tasks to execute after a specified delay
  • Retry with exponential backoff -- configurable retry policies with fixed, exponential, and jitter strategies
  • Dead letter queue -- automatic routing of permanently failed tasks
  • Task dependencies (DAG) -- define execution order with directed acyclic graphs
  • Type-safe workflows -- multi-step pipelines with compile-time validation between steps
  • Cron scheduler -- recurring tasks with cron expressions
  • Pluggable transports -- Redis and RabbitMQ adapters, in-memory for testing
  • Optional PostgreSQL persistence -- task history and metrics via Prisma
  • NestJS integration -- first-class module with @Worker, @Task, and @Scheduled decorators
  • Framework-agnostic core -- use with any Node.js application

Quick Start

Install the core package and a transport adapter:

npm install @vorq/core @vorq/redis

Create a queue, register a worker, and enqueue a task:

import { Vorq, Priority } from "@vorq/core";
import { RedisTransport } from "@vorq/redis";

const vorq = new Vorq({
  transport: new RedisTransport({ host: "localhost", port: 6379 }),
});

await vorq.createQueue("emails");

vorq.registerWorker("emails", async (ctx) => {
  console.log(`Sending email to ${ctx.payload.to}`);
  // send the email...
});

await vorq.start();

await vorq.enqueue("emails", {
  name: "welcome-email",
  payload: { to: "user@example.com", subject: "Welcome" },
});

Packages

| Package | Description | npm | |---------|-------------|-----| | @vorq/core | Queue engine, workers, retry/DLQ, DAG, scheduler, persistence | npm | | @vorq/redis | Redis transport adapter | npm | | @vorq/rabbitmq | RabbitMQ transport adapter | npm | | @vorq/nestjs | NestJS module with decorators | npm |

API Overview

Creating Queues

const queue = await vorq.createQueue("notifications", {
  maxPriority: 10,
  deadLetterQueue: "dlq:notifications",
  messageTtl: 60000,
  maxLength: 10000,
  rateLimiter: { maxPerInterval: 100, interval: 1000 },
});

// Enqueue directly through the queue handle
await queue.enqueue({ name: "push", payload: { userId: "abc" } });

// Pause and resume processing
await queue.pause();
await queue.resume();

Enqueueing Tasks

// Basic task
await vorq.enqueue("emails", {
  name: "send-receipt",
  payload: { orderId: "12345" },
});

// With priority and delay
await vorq.enqueue("emails", {
  name: "send-receipt",
  payload: { orderId: "12345" },
  options: {
    priority: Priority.HIGH,
    delay: 5000, // 5 second delay
    maxRetries: 5,
    timeout: 30000,
  },
});

// Batch enqueue
await vorq.enqueueBatch("emails", [
  { name: "send-receipt", payload: { orderId: "001" } },
  { name: "send-receipt", payload: { orderId: "002" } },
  { name: "send-receipt", payload: { orderId: "003" } },
]);

Workers

Workers process tasks from a queue. The handler receives a TaskContext with the task payload, metadata, and utilities:

vorq.registerWorker<{ url: string }>(
  "image-processing",
  async (ctx) => {
    ctx.log(`Processing image: ${ctx.payload.url}`);

    // Report progress
    await ctx.progress(25);
    const thumbnail = await generateThumbnail(ctx.payload.url);

    await ctx.progress(75);
    await uploadThumbnail(thumbnail);

    await ctx.progress(100);
    return { thumbnailUrl: thumbnail.url };
  },
  {
    concurrency: 4,
    pollInterval: 500,
    lockDuration: 60000,
    batchSize: 1,
  },
);

// Pause and resume a worker
const worker = vorq.registerWorker("emails", handler);
worker.pause();
worker.resume();
console.log(worker.isRunning());

Task Dependencies (DAG)

Define execution order so that a task only runs after its dependencies complete:

const taskA = await vorq.enqueue("pipeline", {
  name: "extract",
  payload: { source: "s3://bucket/data.csv" },
});

const taskB = await vorq.enqueue("pipeline", {
  name: "transform",
  payload: { format: "parquet" },
  options: { dependsOn: [taskA] },
});

await vorq.enqueue("pipeline", {
  name: "load",
  payload: { destination: "warehouse" },
  options: { dependsOn: [taskB] },
});

If a dependency fails and is sent to the dead letter queue, all downstream tasks are automatically abandoned.

Type-safe Workflows

Define multi-step pipelines where TypeScript validates at compile time that each step's output matches downstream expectations:

const etl = vorq
  .workflow<{ url: string }>("etl-pipeline")
  .step("fetch", { timeout: 30_000, maxRetries: 3 }, async (ctx) => {
    const res = await fetch(ctx.input.url);
    return { data: await res.json() };
  })
  .step("transform", async (ctx) => {
    // ctx.results.fetch is fully typed -- { data: unknown }
    const rows = normalize(ctx.results.fetch.data);
    return { rows, count: rows.length };
  })
  .step("load", { maxRetries: 2 }, async (ctx) => {
    // ctx.results.transform is fully typed -- { rows: ..., count: number }
    await db.insertMany(ctx.results.transform.rows);
    return { inserted: ctx.results.transform.count };
  })
  .build();

const result = await etl.run({ url: "https://api.example.com/data" });

if (result.status === "completed") {
  console.log(`Inserted ${result.results.load.inserted} rows`);
  //                      ^^^ No optional chaining -- TS knows it's complete
}

Compile-time guarantees:

  • Accessing a non-existent step result -> TypeScript error
  • Duplicate step names -> TypeScript error
  • Discriminated union on result: completed gives full types, failed gives partial

Scheduling (Cron)

Schedule recurring tasks with cron expressions:

vorq.schedule("reports", "0 9 * * MON", {
  name: "weekly-report",
  payload: { type: "sales" },
});

// Prevent overlapping runs
vorq.schedule(
  "cleanup",
  "*/5 * * * *",
  { name: "temp-cleanup", payload: {} },
  { overlap: false },
);

Dead Letter Queue

Tasks that exhaust their retries are moved to the dead letter queue:

// Retrieve dead-lettered tasks
const deadLetters = await vorq.getDLQ("emails");

for (const record of deadLetters) {
  console.log(record.taskId, record.error, record.attempts);
}

// Retry individual tasks or the entire DLQ
await vorq.retryFromDLQ(deadLetters[0].taskId);
await vorq.retryAllFromDLQ("emails");

// Purge the DLQ
await vorq.purgeDLQ("emails");

Events

Listen to lifecycle events for monitoring, logging, or custom integrations:

vorq.on("task.enqueued", (data) => {
  console.log(`Task ${data.taskId} enqueued to ${data.queue}`);
});

vorq.on("task.completed", (data) => {
  console.log(`Task ${data.taskId} completed in ${data.duration}ms`);
});

vorq.on("task.failed", (data) => {
  console.error(`Task ${data.taskId} failed on attempt ${data.attempt}:`, data.error);
});

vorq.on("task.retrying", (data) => {
  console.log(`Retrying ${data.taskId}, attempt ${data.attempt}, next delay ${data.nextDelay}ms`);
});

vorq.on("task.deadLettered", (data) => {
  alertOps(`Task ${data.taskId} dead-lettered after ${data.attempts} attempts`);
});

vorq.on("task.progress", (data) => {
  console.log(`Task ${data.taskId}: ${data.percent}%`);
});

Retry and Backoff

Configure retry behavior globally or per task:

import {
  Vorq,
  ExponentialBackoff,
  ExponentialJitterBackoff,
  FixedBackoff,
} from "@vorq/core";

// Global defaults
const vorq = new Vorq({
  transport,
  defaults: {
    retryPolicy: {
      maxRetries: 5,
      backoff: new ExponentialBackoff(1000, 60000), // 1s base, 60s max
      retryableErrors: ["TimeoutError", "NetworkError"],
    },
  },
});

// Available backoff strategies:
new FixedBackoff(2000);                        // constant 2s delay
new ExponentialBackoff(1000, 30000);            // 1s, 2s, 4s, ... up to 30s
new ExponentialJitterBackoff(1000, 30000);      // exponential with random jitter

// Per-task override
await vorq.enqueue("emails", {
  name: "send-notification",
  payload: { userId: "abc" },
  options: {
    maxRetries: 10,
    backoff: new FixedBackoff(5000),
  },
});

Transport Adapters

Vorq ships with two production-ready transports. Pick the one that fits your infrastructure:

| Capability | Redis (@vorq/redis) | RabbitMQ (@vorq/rabbitmq) | |---|---|---| | Priority mechanism | Sorted sets per priority level | Native priority queues (x-max-priority) | | Delayed tasks | Sorted set with score = timestamp | Dead-letter exchange + per-message TTL | | Scaling | Add replicas; partition by key prefix | Add consumers; built-in competing-consumer pattern | | Persistence | AOF / RDB snapshots | Durable queues + persistent

Related Skills

View on GitHub
GitHub Stars6
CategoryDevelopment
Updated9d ago
Forks0

Languages

TypeScript

Security Score

90/100

Audited on Mar 27, 2026

No findings