
# Bunqueue

⚡ High-performance job queue for Bun. SQLite persistence, DLQ, cron jobs, S3 backups. Built for AI agents and automation.


<p align="center"> <a href="https://bunqueue.dev/"> <img src=".github/banner.svg" alt="bunqueue" width="400" /> </a> </p> <p align="center"> <a href="https://www.npmjs.com/package/bunqueue"><img src="https://img.shields.io/npm/v/bunqueue?style=flat-square" alt="npm version"></a> <a href="https://www.npmjs.com/package/bunqueue"><img src="https://img.shields.io/npm/dm/bunqueue?style=flat-square" alt="npm downloads"></a> <a href="https://github.com/egeominotti/bunqueue/actions"><img src="https://img.shields.io/github/actions/workflow/status/egeominotti/bunqueue/ci.yml?style=flat-square&label=CI" alt="CI"></a> <a href="https://github.com/egeominotti/bunqueue/stargazers"><img src="https://img.shields.io/github/stars/egeominotti/bunqueue?style=flat-square" alt="GitHub Stars"></a> <a href="https://github.com/egeominotti/bunqueue/blob/main/LICENSE"><img src="https://img.shields.io/github/license/egeominotti/bunqueue?style=flat-square" alt="License"></a> </p> <p align="center"> High-performance job queue for Bun. Built for AI agents and automation.<br/> Zero external dependencies. MCP-native. TypeScript-first. </p> <p align="center"> <a href="https://bunqueue.dev/">Documentation</a> &middot; <a href="https://bunqueue.dev/guide/benchmarks/">Benchmarks</a> &middot; <a href="https://www.npmjs.com/package/bunqueue">npm</a> </p>

## Quickstart

```sh
bun add bunqueue
```

```ts
import { Bunqueue } from 'bunqueue/client';

const app = new Bunqueue('emails', {
  embedded: true,
  processor: async (job) => {
    console.log(`Sending to ${job.data.to}`);
    return { sent: true };
  },
});

await app.add('send', { to: 'alice@example.com' });
```

That's it. Queue + Worker in one object. No Redis, no config, no setup.


## Simple Mode

Simple Mode gives you a Queue and a Worker in a single object. Add jobs, process them, add middleware, schedule crons — all from one place.

Use the `Bunqueue` class when producer and consumer run in the same process. For distributed systems, use `Queue` and `Worker` separately. For AI agent workflows, use the MCP Server instead — agents control queues via natural language without writing code.

<details>
<summary><b>Architecture</b></summary>

```
new Bunqueue('emails', opts)
    │
    ├── this.queue  = new Queue('emails', ...)
    ├── this.worker = new Worker('emails', ...)
    │
    └── Subsystems (all optional):
        ├── RetryEngine         ── jitter, fibonacci, exponential, custom
        ├── CircuitBreaker      ── pauses worker after N failures
        ├── BatchAccumulator    ── groups N jobs into one call
        ├── TriggerManager      ── "on complete → create job B"
        ├── TtlChecker          ── rejects expired jobs
        ├── PriorityAger        ── boosts old jobs' priority
        ├── CancellationManager ── AbortController per job
        └── DedupDebounceMerger ── deduplication & debounce
```

Processing pipeline per job:

```
Job → Circuit Breaker → TTL check → AbortController → Retry → Middleware → Processor
```

</details>

## Routes

Route jobs to different handlers by name:

```ts
const app = new Bunqueue('notifications', {
  embedded: true,
  routes: {
    'send-email': async (job) => {
      await sendEmail(job.data.to);
      return { channel: 'email' };
    },
    'send-sms': async (job) => {
      await sendSMS(job.data.to);
      return { channel: 'sms' };
    },
  },
});

await app.add('send-email', { to: 'alice' });
await app.add('send-sms', { to: 'bob' });
```

Note: use exactly one of `processor`, `routes`, or `batch`. Passing more than one, or none, throws an error.

## Middleware

Middleware wraps every job execution. Execution order is onion-style: mw1 → mw2 → processor → mw2 → mw1. When no middleware is added, there is zero overhead.

```ts
// Timing middleware
app.use(async (job, next) => {
  const start = Date.now();
  const result = await next();
  console.log(`${job.name}: ${Date.now() - start}ms`);
  return result;
});

// Error recovery middleware
app.use(async (job, next) => {
  try {
    return await next();
  } catch (err) {
    return { recovered: true, error: err.message };
  }
});
```
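The onion ordering can be sketched as a right fold over the middleware list. This is illustrative — `compose` and the types below are not Bunqueue APIs, just a minimal model of the execution order described above:

```ts
// Sketch: onion-style middleware composition (illustrative, not Bunqueue's internals).
// Each middleware receives the job and a next() that runs the rest of the chain.
type Job = { name: string; data: unknown };
type Middleware = (job: Job, next: () => Promise<unknown>) => Promise<unknown>;

function compose(middlewares: Middleware[], processor: (job: Job) => Promise<unknown>) {
  return (job: Job) =>
    middlewares.reduceRight<() => Promise<unknown>>(
      (next, mw) => () => mw(job, next),
      () => processor(job),
    )();
}

// Demonstrates the mw1 → mw2 → processor → mw2 → mw1 ordering:
const order: string[] = [];
const run = compose(
  [
    async (_job, next) => { order.push('mw1:before'); const r = await next(); order.push('mw1:after'); return r; },
    async (_job, next) => { order.push('mw2:before'); const r = await next(); order.push('mw2:after'); return r; },
  ],
  async () => { order.push('processor'); return 'done'; },
);
```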

## Batch Processing

Accumulates N jobs and processes them together. Flushes when the buffer reaches `size` or the `timeout` expires. On `close()`, remaining jobs are flushed.

```ts
const app = new Bunqueue('db-inserts', {
  embedded: true,
  batch: {
    size: 50,
    timeout: 2000,
    processor: async (jobs) => {
      const rows = jobs.map(j => j.data.row);
      await db.insertMany('table', rows);
      return jobs.map(() => ({ inserted: true }));
    },
  },
});
```
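The flush semantics can be sketched as a small accumulator. This is a generic model of the size/timeout behaviour described above, not Bunqueue's internals:

```ts
// Sketch: a size/timeout batch accumulator (illustrative, not Bunqueue's internals).
class BatchAccumulator<T> {
  private buffer: T[] = [];
  private timer: ReturnType<typeof setTimeout> | null = null;

  constructor(
    private size: number,
    private timeoutMs: number,
    private flushFn: (items: T[]) => void,
  ) {}

  add(item: T) {
    this.buffer.push(item);
    if (this.buffer.length >= this.size) {
      this.flush(); // size reached → flush immediately
    } else if (!this.timer) {
      this.timer = setTimeout(() => this.flush(), this.timeoutMs); // first item arms the timer
    }
  }

  flush() {
    if (this.timer) { clearTimeout(this.timer); this.timer = null; }
    if (this.buffer.length === 0) return;
    const items = this.buffer;
    this.buffer = [];
    this.flushFn(items);
  }

  close() { this.flush(); } // on close, remaining items are flushed
}
```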

## Advanced Retry

5 strategies + retry predicate:

```ts
const app = new Bunqueue('api-calls', {
  embedded: true,
  processor: async (job) => {
    const res = await fetch(job.data.url);
    if (!res.ok) throw new Error(`HTTP ${res.status}`);
    return { status: res.status };
  },
  retry: {
    maxAttempts: 5,
    delay: 1000,
    strategy: 'jitter',
    retryIf: (error) => error.message.includes('503'),
  },
});
```

| Strategy | Formula | Use case |
| --- | --- | --- |
| `fixed` | `delay` every time | Rate-limited APIs |
| `exponential` | `delay × 2^attempt` | General purpose |
| `jitter` | `delay × 2^attempt × random(0.5–1.0)` | Thundering herd prevention |
| `fibonacci` | `delay × fib(attempt)` | Gradual backoff |
| `custom` | `customBackoff(attempt, error) → ms` | Anything |

This is in-process retry — the job stays active. Different from core `attempts`/`backoff`, which re-queues.
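The formulas in the table can be sketched as a small delay calculator. This is illustrative — `backoff` is not a Bunqueue API, and a deterministic `rand` is injected so the jitter case is testable:

```ts
// Sketch: delay computation per the strategy table above (attempt is 1-based, delays in ms).
function fib(n: number): number {
  let a = 0, b = 1;
  for (let i = 0; i < n; i++) [a, b] = [b, a + b];
  return a; // fib(1)=1, fib(2)=1, fib(3)=2, ...
}

function backoff(
  strategy: 'fixed' | 'exponential' | 'jitter' | 'fibonacci',
  delay: number,
  attempt: number,
  rand: () => number = Math.random,
): number {
  switch (strategy) {
    case 'fixed':       return delay;
    case 'exponential': return delay * 2 ** attempt;
    case 'jitter':      return delay * 2 ** attempt * (0.5 + 0.5 * rand()); // random(0.5–1.0)
    case 'fibonacci':   return delay * fib(attempt);
  }
}
```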

## Graceful Cancellation

Cancel running jobs with `AbortController`:

```ts
const app = new Bunqueue('encoding', {
  embedded: true,
  processor: async (job) => {
    const signal = app.getSignal(job.id);
    for (const chunk of chunks) {
      if (signal?.aborted) throw new Error('Cancelled');
      await encode(chunk);
    }
    return { done: true };
  },
});

const job = await app.add('video', { file: 'big.mp4' });
app.cancel(job.id);        // cancel immediately
app.cancel(job.id, 5000);  // cancel after 5s grace period
```

The signal works with `fetch` too: `await fetch(url, { signal })`.

## Circuit Breaker

Pauses the worker after too many consecutive failures:

```
CLOSED ──→ failures ≥ threshold ──→ OPEN (worker paused)
                                       │
              ←── success ──── HALF-OPEN ←── timeout expires
```

```ts
const app = new Bunqueue('payments', {
  embedded: true,
  processor: async (job) => paymentGateway.charge(job.data),
  circuitBreaker: {
    threshold: 5,
    resetTimeout: 30000,
    onOpen: () => alert('Gateway down!'),
    onClose: () => alert('Gateway recovered'),
  },
});

app.getCircuitState();  // 'closed' | 'open' | 'half-open'
app.resetCircuit();     // force close + resume worker
```
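A minimal sketch of the state machine above — illustrative, not Bunqueue's internals; a clock is injected so the timeout transition is testable:

```ts
// Sketch: CLOSED → OPEN → HALF-OPEN circuit breaker (illustrative).
type CircuitState = 'closed' | 'open' | 'half-open';

class SketchCircuitBreaker {
  private failures = 0;
  private openedAt = 0;
  private state: CircuitState = 'closed';

  constructor(
    private threshold: number,
    private resetTimeoutMs: number,
    private now: () => number = Date.now,
  ) {}

  getState(): CircuitState {
    // OPEN transitions to HALF-OPEN once the reset timeout expires.
    if (this.state === 'open' && this.now() - this.openedAt >= this.resetTimeoutMs) {
      this.state = 'half-open';
    }
    return this.state;
  }

  onSuccess() { this.failures = 0; this.state = 'closed'; }

  onFailure() {
    this.failures++;
    // Trip on reaching the threshold, or on any failure while probing half-open.
    if (this.failures >= this.threshold || this.state === 'half-open') {
      this.state = 'open';
      this.openedAt = this.now();
    }
  }
}
```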

## Event Triggers

Create follow-up jobs automatically when a job completes or fails:

```ts
const app = new Bunqueue('orders', {
  embedded: true,
  routes: {
    'place-order': async (job) => ({ orderId: job.data.id, total: 99 }),
    'send-receipt': async (job) => ({ sent: true }),
    'fraud-alert': async (job) => ({ alerted: true }),
  },
});

app.trigger({
  on: 'place-order',
  create: 'send-receipt',
  data: (result, job) => ({ id: job.data.id }),
});

// Conditional trigger (only for large orders)
app.trigger({
  on: 'place-order',
  create: 'fraud-alert',
  data: (result) => ({ amount: result.total }),
  condition: (result) => result.total > 1000,
});

// Chain triggers
app
  .trigger({ on: 'step-1', create: 'step-2', data: (r) => r })
  .trigger({ on: 'step-2', create: 'step-3', data: (r) => r });
```
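The dispatch behind triggers can be sketched as a filter over registered triggers at completion time. `fireTriggers` is an illustrative name, not a Bunqueue API:

```ts
// Sketch: which follow-up jobs a completed job spawns (illustrative).
type Trigger = {
  on: string;
  create: string;
  data: (result: any, job: { name: string; data: any }) => any;
  condition?: (result: any) => boolean;
};

function fireTriggers(
  triggers: Trigger[],
  job: { name: string; data: any },
  result: any,
): { name: string; data: any }[] {
  // A completed job may spawn one follow-up job per matching trigger;
  // a missing condition means the trigger always fires.
  return triggers
    .filter((t) => t.on === job.name && (t.condition?.(result) ?? true))
    .map((t) => ({ name: t.create, data: t.data(result, job) }));
}
```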

## Job TTL

Expire unprocessed jobs. The TTL is checked when the worker picks up the job:

```ts
const app = new Bunqueue('otp', {
  embedded: true,
  processor: async (job) => verifyOTP(job.data.code),
  ttl: {
    defaultTtl: 300000,
    perName: {
      'verify-otp': 60000,
      'daily-report': 0,
    },
  },
});

app.setDefaultTtl(120000);
app.setNameTtl('flash-sale', 30000);
```

Resolution order: `perName[job.name]` → `defaultTtl` → `0` (no TTL).
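That resolution order plus the pickup-time check can be sketched as two small functions (illustrative helper names, not Bunqueue APIs):

```ts
// Sketch: TTL resolution and expiry check (illustrative).
function resolveTtl(
  name: string,
  perName: Record<string, number>,
  defaultTtl?: number,
): number {
  // perName wins over defaultTtl; an explicit 0 means "no TTL" and is preserved.
  return perName[name] ?? defaultTtl ?? 0;
}

function isExpired(ttlMs: number, createdAt: number, now: number): boolean {
  return ttlMs > 0 && now - createdAt > ttlMs; // ttl 0 → never expires
}
```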

## Priority Aging

Automatically boosts priority of old waiting jobs to prevent starvation:

```ts
const app = new Bunqueue('tasks', {
  embedded: true,
  processor: async (job) => ({ done: true }),
  priorityAging: {
    interval: 60000,
    minAge: 300000,
    boost: 2,
    maxPriority: 100,
    maxScan: 200,
  },
});
```

A job with priority 1, after 5 min: 3, after 10 min: 5, … capped at 100.
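One reading consistent with the example above is one `boost` per elapsed `minAge` period, capped at `maxPriority`. This is a hedged sketch of that arithmetic only — it may not match Bunqueue's exact scan semantics:

```ts
// Sketch: boosted priority as a function of wait time (one boost per minAge period; illustrative).
function agedPriority(
  base: number,
  ageMs: number,
  minAgeMs: number,
  boost: number,
  maxPriority: number,
): number {
  return Math.min(base + boost * Math.floor(ageMs / minAgeMs), maxPriority);
}
```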

## Deduplication

Prevent duplicate jobs. Jobs with the same name + data get the same dedup ID:

```ts
const app = new Bunqueue('webhooks', {
  embedded: true,
  processor: async (job) => processWebhook(job.data),
  deduplication: {
    ttl: 60000,
    extend: false,
    replace: false,
  },
});

await app.add('hook', { event: 'user.created', userId: '123' });
await app.add('hook', { event: 'user.created', userId: '123' }); // deduplicated!
await app.add('hook', { event: 'user.updated', userId: '123' }); // different data → new job
```

Override per-job: `await app.add('task', data, { deduplication: { id: 'my-id', ttl: 5000 } })`.
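A plausible sketch of the dedup window, assuming the key is derived from the name plus serialized data (Bunqueue's actual key derivation and storage are internal and may differ):

```ts
// Sketch: TTL-windowed deduplication keyed by name + data (illustrative).
class Deduplicator {
  private seen = new Map<string, number>(); // key → expiry timestamp

  constructor(private ttlMs: number, private now: () => number = Date.now) {}

  shouldAccept(name: string, data: unknown, id?: string): boolean {
    // An explicit id (the per-job override) wins over the derived key.
    const key = id ?? `${name}:${JSON.stringify(data)}`;
    const expiry = this.seen.get(key);
    if (expiry !== undefined && expiry > this.now()) return false; // duplicate within TTL
    this.seen.set(key, this.now() + this.ttlMs);
    return true;
  }
}
```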

## Debouncing

Coalesce rapid same-name jobs. Only the last one in the TTL window gets processed:

```ts
const app = new Bunqueue('search', {
  embedded: true,
  processor: async (job) => executeSearch(job.data.query),
  debounce: { ttl: 500 },
});

await app.add('search', { query: 'h' });
await app.add('search', { query: 'he' });
await app.add('search', { query: 'hello' });  // only this one processes
```
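Generically, this last-write-wins behaviour looks like the following sketch (illustrative, not Bunqueue's internals):

```ts
// Sketch: debouncing as last-write-wins within a TTL window (illustrative).
function debounceLast<T>(ttlMs: number, process: (data: T) => void): (data: T) => void {
  let timer: ReturnType<typeof setTimeout> | null = null;
  let latest: T;
  return (data: T) => {
    latest = data;                  // keep only the most recent payload
    if (timer) clearTimeout(timer); // earlier adds within the window are superseded
    timer = setTimeout(() => process(latest), ttlMs);
  };
}
```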

## Rate Limiting

```ts
const app = new Bunqueue('api', {
  embedded: true,
  processor: async (job) => callExternalAPI(job.data),
  rateLimit: { max: 100, duration: 1000 },
});

// Per-group rate limiting (e.g., per customer)
const app2 = new Bunqueue('api', {
  embedded: true,
  processor: async (job) => callAPI(job.data),
  rateLimit: { max: 10, duration: 1000, /* group… */ },
});
```
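The windowed semantics can be sketched generically. `tryAcquire` and the grouping key below are illustrative names, not Bunqueue API, and a clock is injected so the window reset is testable:

```ts
// Sketch: a fixed-window rate limiter with an optional grouping key (illustrative).
class SketchRateLimiter {
  private windows = new Map<string, { start: number; count: number }>();

  constructor(
    private max: number,
    private durationMs: number,
    private now: () => number = Date.now,
  ) {}

  tryAcquire(group = 'default'): boolean {
    const t = this.now();
    const w = this.windows.get(group);
    if (!w || t - w.start >= this.durationMs) {
      this.windows.set(group, { start: t, count: 1 }); // start a new window
      return true;
    }
    if (w.count < this.max) { w.count++; return true; }
    return false; // limit reached for this group's current window
  }
}
```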