Bunqueue
⚡ High-performance job queue for Bun. SQLite persistence, DLQ, cron jobs, S3 backups. Built for AI agents and automation
Install / Use
/learn @egeominotti/BunqueueREADME
Quickstart
bun add bunqueue
import { Bunqueue } from 'bunqueue/client';
const app = new Bunqueue('emails', {
embedded: true,
processor: async (job) => {
console.log(`Sending to ${job.data.to}`);
return { sent: true };
},
});
await app.add('send', { to: 'alice@example.com' });
That's it. Queue + Worker in one object. No Redis, no config, no setup.
Simple Mode
Simple Mode gives you a Queue and a Worker in a single object. Add jobs, process them, add middleware, schedule crons — all from one place.
Use Bunqueue when producer and consumer are in the same process. For distributed systems, use Queue + Worker separately. For AI agent workflows, use the MCP Server instead — agents control queues via natural language without writing code.
new Bunqueue('emails', opts)
│
├── this.queue = new Queue('emails', ...)
├── this.worker = new Worker('emails', ...)
│
└── Subsystems (all optional):
├── RetryEngine ── jitter, fibonacci, exponential, custom
├── CircuitBreaker ── pauses worker after N failures
├── BatchAccumulator ── groups N jobs into one call
├── TriggerManager ── "on complete → create job B"
├── TtlChecker ── rejects expired jobs
├── PriorityAger ── boosts old jobs' priority
├── CancellationManager ── AbortController per job
└── DedupDebounceMerger ── deduplication & debounce
Processing pipeline per job:
Job → Circuit Breaker → TTL check → AbortController → Retry → Middleware → Processor
</details>
Routes
Route jobs to different handlers by name:
const app = new Bunqueue('notifications', {
embedded: true,
routes: {
'send-email': async (job) => {
await sendEmail(job.data.to);
return { channel: 'email' };
},
'send-sms': async (job) => {
await sendSMS(job.data.to);
return { channel: 'sms' };
},
},
});
await app.add('send-email', { to: 'alice' });
await app.add('send-sms', { to: 'bob' });
Note: Use one of
processor,routes, orbatch. Passing multiple or none throws an error.
Middleware
Wraps every job execution. Execution order is onion-style: mw1 → mw2 → processor → mw2 → mw1. When no middleware is added, zero overhead.
// Timing middleware
app.use(async (job, next) => {
const start = Date.now();
const result = await next();
console.log(`${job.name}: ${Date.now() - start}ms`);
return result;
});
// Error recovery middleware
app.use(async (job, next) => {
try {
return await next();
} catch (err) {
return { recovered: true, error: err.message };
}
});
Batch Processing
Accumulates N jobs and processes them together. Flushes when buffer reaches size or timeout expires. On close(), remaining jobs are flushed.
const app = new Bunqueue('db-inserts', {
embedded: true,
batch: {
size: 50,
timeout: 2000,
processor: async (jobs) => {
const rows = jobs.map(j => j.data.row);
await db.insertMany('table', rows);
return jobs.map(() => ({ inserted: true }));
},
},
});
Advanced Retry
5 strategies + retry predicate:
const app = new Bunqueue('api-calls', {
embedded: true,
processor: async (job) => {
const res = await fetch(job.data.url);
if (!res.ok) throw new Error(`HTTP ${res.status}`);
return { status: res.status };
},
retry: {
maxAttempts: 5,
delay: 1000,
strategy: 'jitter',
retryIf: (error) => error.message.includes('503'),
},
});
| Strategy | Formula | Use case |
| --- | --- | --- |
| fixed | delay every time | Rate-limited APIs |
| exponential | delay × 2^attempt | General purpose |
| jitter | delay × 2^attempt × random(0.5-1.0) | Thundering herd prevention |
| fibonacci | delay × fib(attempt) | Gradual backoff |
| custom | customBackoff(attempt, error) → ms | Anything |
This is in-process retry — the job stays active. Different from core
attempts/backoffwhich re-queues.
Graceful Cancellation
Cancel running jobs with AbortController:
const app = new Bunqueue('encoding', {
embedded: true,
processor: async (job) => {
const signal = app.getSignal(job.id);
for (const chunk of chunks) {
if (signal?.aborted) throw new Error('Cancelled');
await encode(chunk);
}
return { done: true };
},
});
const job = await app.add('video', { file: 'big.mp4' });
app.cancel(job.id); // cancel immediately
app.cancel(job.id, 5000); // cancel after 5s grace period
The signal works with fetch too: await fetch(url, { signal }).
Circuit Breaker
Pauses the worker after too many consecutive failures:
CLOSED ──→ failures ≥ threshold ──→ OPEN (worker paused)
│
←── success ──── HALF-OPEN ←── timeout expires
const app = new Bunqueue('payments', {
embedded: true,
processor: async (job) => paymentGateway.charge(job.data),
circuitBreaker: {
threshold: 5,
resetTimeout: 30000,
onOpen: () => alert('Gateway down!'),
onClose: () => alert('Gateway recovered'),
},
});
app.getCircuitState(); // 'closed' | 'open' | 'half-open'
app.resetCircuit(); // force close + resume worker
Event Triggers
Create follow-up jobs automatically when a job completes or fails:
const app = new Bunqueue('orders', {
embedded: true,
routes: {
'place-order': async (job) => ({ orderId: job.data.id, total: 99 }),
'send-receipt': async (job) => ({ sent: true }),
'fraud-alert': async (job) => ({ alerted: true }),
},
});
app.trigger({
on: 'place-order',
create: 'send-receipt',
data: (result, job) => ({ id: job.data.id }),
});
// Conditional trigger (only for large orders)
app.trigger({
on: 'place-order',
create: 'fraud-alert',
data: (result) => ({ amount: result.total }),
condition: (result) => result.total > 1000,
});
// Chain triggers
app
.trigger({ on: 'step-1', create: 'step-2', data: (r) => r })
.trigger({ on: 'step-2', create: 'step-3', data: (r) => r });
Job TTL
Expire unprocessed jobs. Checked when the worker picks up the job:
const app = new Bunqueue('otp', {
embedded: true,
processor: async (job) => verifyOTP(job.data.code),
ttl: {
defaultTtl: 300000,
perName: {
'verify-otp': 60000,
'daily-report': 0,
},
},
});
app.setDefaultTtl(120000);
app.setNameTtl('flash-sale', 30000);
Resolution: perName[job.name] → defaultTtl → 0 (no TTL).
Priority Aging
Automatically boosts priority of old waiting jobs to prevent starvation:
const app = new Bunqueue('tasks', {
embedded: true,
processor: async (job) => ({ done: true }),
priorityAging: {
interval: 60000,
minAge: 300000,
boost: 2,
maxPriority: 100,
maxScan: 200,
},
});
A job with priority 1, after 5 min: 3, after 10 min: 5, … capped at 100.
Deduplication
Prevent duplicate jobs. Jobs with the same name + data get the same dedup ID:
const app = new Bunqueue('webhooks', {
embedded: true,
processor: async (job) => processWebhook(job.data),
deduplication: {
ttl: 60000,
extend: false,
replace: false,
},
});
await app.add('hook', { event: 'user.created', userId: '123' });
await app.add('hook', { event: 'user.created', userId: '123' }); // deduplicated!
await app.add('hook', { event: 'user.updated', userId: '123' }); // different data → new job
Override per-job: await app.add('task', data, { deduplication: { id: 'my-id', ttl: 5000 } }).
Debouncing
Coalesce rapid same-name jobs. Only the last one in the TTL window gets processed:
const app = new Bunqueue('search', {
embedded: true,
processor: async (job) => executeSearch(job.data.query),
debounce: { ttl: 500 },
});
await app.add('search', { query: 'h' });
await app.add('search', { query: 'he' });
await app.add('search', { query: 'hello' }); // only this one processes
Rate Limiting
const app = new Bunqueue('api', {
embedded: true,
processor: async (job) => callExternalAPI(job.data),
rateLimit: { max: 100, duration: 1000 },
});
// Per-group rate limiting (e.g., per customer)
const app2 = new Bunqueue('api', {
embedded: true,
processor: async (job) => callAPI(job.data),
rateLimit: { max: 10, duration: 1000, group
