# Vorq

Distributed task queue for TypeScript with type-safe workflows, pluggable transports (Redis, RabbitMQ), and optional persistence.
## Features
- Priority queues -- critical, high, medium, and low priority levels
- Delayed tasks -- schedule tasks to execute after a specified delay
- Retry with exponential backoff -- configurable retry policies with fixed, exponential, and jitter strategies
- Dead letter queue -- automatic routing of permanently failed tasks
- Task dependencies (DAG) -- define execution order with directed acyclic graphs
- Type-safe workflows -- multi-step pipelines with compile-time validation between steps
- Cron scheduler -- recurring tasks with cron expressions
- Pluggable transports -- Redis and RabbitMQ adapters, in-memory for testing
- Optional PostgreSQL persistence -- task history and metrics via Prisma
- NestJS integration -- first-class module with `@Worker`, `@Task`, and `@Scheduled` decorators
- Framework-agnostic core -- use with any Node.js application
## Quick Start

Install the core package and a transport adapter:

```bash
npm install @vorq/core @vorq/redis
```
Create a queue, register a worker, and enqueue a task:

```ts
import { Vorq, Priority } from "@vorq/core";
import { RedisTransport } from "@vorq/redis";

const vorq = new Vorq({
  transport: new RedisTransport({ host: "localhost", port: 6379 }),
});

await vorq.createQueue("emails");

vorq.registerWorker("emails", async (ctx) => {
  console.log(`Sending email to ${ctx.payload.to}`);
  // send the email...
});

await vorq.start();

await vorq.enqueue("emails", {
  name: "welcome-email",
  payload: { to: "user@example.com", subject: "Welcome" },
});
```
## Packages

| Package | Description |
|---------|-------------|
| `@vorq/core` | Queue engine, workers, retry/DLQ, DAG, scheduler, persistence |
| `@vorq/redis` | Redis transport adapter |
| `@vorq/rabbitmq` | RabbitMQ transport adapter |
| `@vorq/nestjs` | NestJS module with decorators |
## API Overview

### Creating Queues

```ts
const queue = await vorq.createQueue("notifications", {
  maxPriority: 10,
  deadLetterQueue: "dlq:notifications",
  messageTtl: 60000,
  maxLength: 10000,
  rateLimiter: { maxPerInterval: 100, interval: 1000 },
});

// Enqueue directly through the queue handle
await queue.enqueue({ name: "push", payload: { userId: "abc" } });

// Pause and resume processing
await queue.pause();
await queue.resume();
```
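The `rateLimiter` option above caps how many tasks are dequeued per interval. A standalone sketch of the idea -- a fixed-window counter, not the engine's actual implementation -- looks like this:

```ts
// Fixed-window rate limiter: allow at most maxPerInterval
// operations in each interval-millisecond window.
class WindowLimiter {
  private windowStart = 0;
  private count = 0;

  constructor(
    private maxPerInterval: number,
    private interval: number,
  ) {}

  // Returns true if an operation is allowed at time `now` (ms).
  tryAcquire(now: number): boolean {
    if (now - this.windowStart >= this.interval) {
      // New window: reset the counter.
      this.windowStart = now;
      this.count = 0;
    }
    if (this.count < this.maxPerInterval) {
      this.count++;
      return true;
    }
    return false;
  }
}

const limiter = new WindowLimiter(2, 1000);
console.log(limiter.tryAcquire(0));    // true
console.log(limiter.tryAcquire(10));   // true
console.log(limiter.tryAcquire(20));   // false -- window full
console.log(limiter.tryAcquire(1050)); // true -- new window
```

Production limiters often smooth this with a token bucket, but the window model matches the `{ maxPerInterval, interval }` shape of the option.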
### Enqueueing Tasks

```ts
// Basic task
await vorq.enqueue("emails", {
  name: "send-receipt",
  payload: { orderId: "12345" },
});

// With priority and delay
await vorq.enqueue("emails", {
  name: "send-receipt",
  payload: { orderId: "12345" },
  options: {
    priority: Priority.HIGH,
    delay: 5000, // 5 second delay
    maxRetries: 5,
    timeout: 30000,
  },
});

// Batch enqueue
await vorq.enqueueBatch("emails", [
  { name: "send-receipt", payload: { orderId: "001" } },
  { name: "send-receipt", payload: { orderId: "002" } },
  { name: "send-receipt", payload: { orderId: "003" } },
]);
```
### Workers

Workers process tasks from a queue. The handler receives a `TaskContext` with the task payload, metadata, and utilities:

```ts
vorq.registerWorker<{ url: string }>(
  "image-processing",
  async (ctx) => {
    ctx.log(`Processing image: ${ctx.payload.url}`);

    // Report progress
    await ctx.progress(25);
    const thumbnail = await generateThumbnail(ctx.payload.url);
    await ctx.progress(75);
    await uploadThumbnail(thumbnail);
    await ctx.progress(100);

    return { thumbnailUrl: thumbnail.url };
  },
  {
    concurrency: 4,
    pollInterval: 500,
    lockDuration: 60000,
    batchSize: 1,
  },
);

// Pause and resume a worker
const worker = vorq.registerWorker("emails", handler);
worker.pause();
worker.resume();
console.log(worker.isRunning());
```
### Task Dependencies (DAG)

Define execution order so that a task only runs after its dependencies complete:

```ts
const taskA = await vorq.enqueue("pipeline", {
  name: "extract",
  payload: { source: "s3://bucket/data.csv" },
});

const taskB = await vorq.enqueue("pipeline", {
  name: "transform",
  payload: { format: "parquet" },
  options: { dependsOn: [taskA] },
});

await vorq.enqueue("pipeline", {
  name: "load",
  payload: { destination: "warehouse" },
  options: { dependsOn: [taskB] },
});
```
If a dependency fails and is sent to the dead letter queue, all downstream tasks are automatically abandoned.
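The cascade is easy to picture with a standalone sketch -- illustrating the idea, not Vorq's internals: given each task's `dependsOn` list, dead-lettering one task abandons every transitive dependent.

```ts
// deps maps a task id to the ids it depends on.
type Deps = Record<string, string[]>;

// Collect every task that transitively depends on `failed`.
function abandonedBy(failed: string, deps: Deps): Set<string> {
  const abandoned = new Set<string>();
  let changed = true;
  while (changed) {
    changed = false;
    for (const [task, dependsOn] of Object.entries(deps)) {
      if (abandoned.has(task)) continue;
      if (dependsOn.some((d) => d === failed || abandoned.has(d))) {
        abandoned.add(task);
        changed = true;
      }
    }
  }
  return abandoned;
}

// extract -> transform -> load, mirroring the pipeline above
const pipeline: Deps = {
  transform: ["extract"],
  load: ["transform"],
};
console.log([...abandonedBy("extract", pipeline)]); // ["transform", "load"]
```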
### Type-safe Workflows

Define multi-step pipelines where TypeScript validates at compile time that each step's output matches downstream expectations:

```ts
const etl = vorq
  .workflow<{ url: string }>("etl-pipeline")
  .step("fetch", { timeout: 30_000, maxRetries: 3 }, async (ctx) => {
    const res = await fetch(ctx.input.url);
    return { data: await res.json() };
  })
  .step("transform", async (ctx) => {
    // ctx.results.fetch is fully typed -- { data: unknown }
    const rows = normalize(ctx.results.fetch.data);
    return { rows, count: rows.length };
  })
  .step("load", { maxRetries: 2 }, async (ctx) => {
    // ctx.results.transform is fully typed -- { rows: ..., count: number }
    await db.insertMany(ctx.results.transform.rows);
    return { inserted: ctx.results.transform.count };
  })
  .build();

const result = await etl.run({ url: "https://api.example.com/data" });

if (result.status === "completed") {
  console.log(`Inserted ${result.results.load.inserted} rows`);
  // ^^^ No optional chaining -- TS knows it's complete
}
```
Compile-time guarantees:

- Accessing a non-existent step result -> TypeScript error
- Duplicate step names -> TypeScript error
- Discriminated union on `result`: `completed` gives full types, `failed` gives partial
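The discriminated-union guarantee can be shown in isolation. This simplified shape (the `WorkflowResult` name and fields are purely illustrative, not the library's actual types) demonstrates how narrowing on `status` removes the need for optional chaining:

```ts
type WorkflowResult =
  | { status: "completed"; results: { load: { inserted: number } } }
  | {
      status: "failed";
      error: string;
      results: Partial<{ load: { inserted: number } }>;
    };

function report(result: WorkflowResult): string {
  if (result.status === "completed") {
    // Narrowed: results.load is guaranteed, no optional chaining needed.
    return `inserted ${result.results.load.inserted}`;
  }
  // Narrowed to "failed": load may be absent, so it is optional here.
  return `failed: ${result.error} (inserted ${result.results.load?.inserted ?? 0})`;
}

console.log(report({ status: "completed", results: { load: { inserted: 3 } } }));
// "inserted 3"
```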
### Scheduling (Cron)

Schedule recurring tasks with cron expressions:

```ts
vorq.schedule("reports", "0 9 * * MON", {
  name: "weekly-report",
  payload: { type: "sales" },
});

// Prevent overlapping runs
vorq.schedule(
  "cleanup",
  "*/5 * * * *",
  { name: "temp-cleanup", payload: {} },
  { overlap: false },
);
```
### Dead Letter Queue

Tasks that exhaust their retries are moved to the dead letter queue:

```ts
// Retrieve dead-lettered tasks
const deadLetters = await vorq.getDLQ("emails");
for (const record of deadLetters) {
  console.log(record.taskId, record.error, record.attempts);
}

// Retry individual tasks or the entire DLQ
await vorq.retryFromDLQ(deadLetters[0].taskId);
await vorq.retryAllFromDLQ("emails");

// Purge the DLQ
await vorq.purgeDLQ("emails");
```
### Events

Listen to lifecycle events for monitoring, logging, or custom integrations:

```ts
vorq.on("task.enqueued", (data) => {
  console.log(`Task ${data.taskId} enqueued to ${data.queue}`);
});

vorq.on("task.completed", (data) => {
  console.log(`Task ${data.taskId} completed in ${data.duration}ms`);
});

vorq.on("task.failed", (data) => {
  console.error(`Task ${data.taskId} failed on attempt ${data.attempt}:`, data.error);
});

vorq.on("task.retrying", (data) => {
  console.log(`Retrying ${data.taskId}, attempt ${data.attempt}, next delay ${data.nextDelay}ms`);
});

vorq.on("task.deadLettered", (data) => {
  alertOps(`Task ${data.taskId} dead-lettered after ${data.attempts} attempts`);
});

vorq.on("task.progress", (data) => {
  console.log(`Task ${data.taskId}: ${data.percent}%`);
});
```
### Retry and Backoff

Configure retry behavior globally or per task:

```ts
import {
  Vorq,
  ExponentialBackoff,
  ExponentialJitterBackoff,
  FixedBackoff,
} from "@vorq/core";

// Global defaults
const vorq = new Vorq({
  transport,
  defaults: {
    retryPolicy: {
      maxRetries: 5,
      backoff: new ExponentialBackoff(1000, 60000), // 1s base, 60s max
      retryableErrors: ["TimeoutError", "NetworkError"],
    },
  },
});

// Available backoff strategies:
new FixedBackoff(2000); // constant 2s delay
new ExponentialBackoff(1000, 30000); // 1s, 2s, 4s, ... up to 30s
new ExponentialJitterBackoff(1000, 30000); // exponential with random jitter

// Per-task override
await vorq.enqueue("emails", {
  name: "send-notification",
  payload: { userId: "abc" },
  options: {
    maxRetries: 10,
    backoff: new FixedBackoff(5000),
  },
});
```
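Each strategy maps an attempt number to a delay. The schedules above can be sketched in plain TypeScript (illustrative, not the library's source; the jitter variant is assumed to be "full jitter", i.e. uniform in `[0, exponential delay]`):

```ts
// attempt is zero-based: 0 for the first retry.
const fixed = (base: number) => (_attempt: number) => base;

const exponential = (base: number, max: number) => (attempt: number) =>
  Math.min(base * 2 ** attempt, max);

// "Full jitter": pick uniformly in [0, exponential delay].
const exponentialJitter = (base: number, max: number) => (attempt: number) =>
  Math.random() * Math.min(base * 2 ** attempt, max);

const delay = exponential(1000, 30000);
console.log([0, 1, 2, 3, 4, 5].map(delay));
// [1000, 2000, 4000, 8000, 16000, 30000] -- capped at the 30s max
```

Jitter matters under load: if many tasks fail at once, pure exponential backoff retries them in synchronized waves, while jitter spreads the retries out.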
## Transport Adapters

Vorq ships with two production-ready transports. Pick the one that fits your infrastructure:

| Capability | Redis (`@vorq/redis`) | RabbitMQ (`@vorq/rabbitmq`) |
|---|---|---|
| Priority mechanism | Sorted sets per priority level | Native priority queues (`x-max-priority`) |
| Delayed tasks | Sorted set with score = timestamp | Dead-letter exchange + per-message TTL |
| Scaling | Add replicas; partition by key prefix | Add consumers; built-in competing-consumer pattern |
| Persistence | AOF / RDB snapshots | Durable queues + persistent messages |
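The Redis row's "sorted set with score = timestamp" delay mechanism can be illustrated with a toy in-memory model (not the adapter's code): delayed tasks are stored with their due time as the score, and a poller moves every task whose score has passed.

```ts
type Delayed = { id: string; dueAt: number };

// Toy stand-in for a Redis sorted set keyed by due timestamp.
const delayedSet: Delayed[] = [];

function scheduleTask(id: string, now: number, delayMs: number): void {
  delayedSet.push({ id, dueAt: now + delayMs });
  delayedSet.sort((a, b) => a.dueAt - b.dueAt); // sorted-set invariant
}

// Rough equivalent of ZRANGEBYSCORE key -inf now followed by ZREM:
// pop every task that is due, in due-time order.
function popDue(now: number): string[] {
  const due: string[] = [];
  while (delayedSet.length > 0 && delayedSet[0].dueAt <= now) {
    due.push(delayedSet.shift()!.id);
  }
  return due;
}

scheduleTask("a", 0, 5000);
scheduleTask("b", 0, 1000);
console.log(popDue(2000)); // ["b"] -- "a" is not due until t=5000
console.log(popDue(6000)); // ["a"]
```

RabbitMQ achieves the same effect differently: the message sits in a holding queue with a per-message TTL, and expiry routes it through a dead-letter exchange into the live queue.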