Asyncjobs
Asynchronous Job Processor
Install / Use
/learn @choria-io/AsyncjobsREADME

Overview
This is an Asynchronous Job Queue system that relies on NATS JetStream for storage and general job life cycle management. It is compatible with any NATS JetStream based system like a private hosted JetStream, Choria Streams or a commercial SaaS.
Each Task is stored in JetStream by a unique ID and Work Queue item is made referencing that Task. JetStream will handle dealing with scheduling, retries, acknowledgements and more of the Work Queue item. The stored Task will be updated during the lifecycle.
Multiple processes can process jobs concurrently, thus job processing is both horizontally and vertically scalable. Job handlers are implemented in Go with one process hosting one or many handlers. Other languages can implement Job Handlers using NATS Request-Reply services. Per process concurrency and overall per-queue concurrency controls exist.
This package heavily inspired by hibiken/asynq.
- Documentation
- Community
- Examples
Status
This is a brand-new project, under heavy development. The core Task handling is in good shape and reasonably stable. Task Scheduler is still subject to some change.
Synopsis
Tasks are published to Work Queues:
// establish a connection to the EMAIL work queue using a NATS context
client, _ := asyncjobs.NewClient(asyncjobs.NatsConn(nc), asyncjobs.BindWorkQueue("EMAIL"))
// create a task with the type 'email:new' and body from newEmail()
task, _ := asyncjobs.NewTask("email:new", newEmail())
// store it in the Work Queue
client.EnqueueTask(ctx, task)
Tasks are processes by horizontally and vertically scalable processes. Typically, a Handler handles one type of Task. We have Prometheus integration, concurrency and backoffs configured.
// establish a connection to the EMAIL work queue using a
// NATS context, with concurrency, prometheus stats and backoff
client, _ := asyncjobs.NewClient(
asyncjobs.NatsContext("EMAIL"),
asyncjobs.BindWorkQueue("EMAIL"),
asyncjobs.ClientConcurrency(10),
asyncjobs.PrometheusListenPort(8080),
asyncjobs.RetryBackoffPolicy(asyncjobs.RetryLinearTenMinutes))
router := asyncjobs.NewTaskRouter()
router.Handler("email:new", func(ctx context.Context, log asyncjobs.Logger, task *asyncjobs.Task) (any, error) {
log.Printf("Processing task %s", task.ID)
// do work here using task.Payload
return "sent", nil
})
client.Run(ctx, router)
See our documentation for a deep dive into the use cases, architecture, abilities and more.
Requirements
NATS 2.8.0 or newer with JetStream enabled.
Features
See the Feature List page for a full feature break down.
Related Skills
node-connect
347.2kDiagnose OpenClaw node connection and pairing failures for Android, iOS, and macOS companion apps
xurl
347.2kA CLI tool for making authenticated requests to the X (Twitter) API. Use this skill when you need to post tweets, reply, quote, search, read posts, manage followers, send DMs, upload media, or interact with any X API v2 endpoint.
frontend-design
108.0kCreate distinctive, production-grade frontend interfaces with high design quality. Use this skill when the user asks to build web components, pages, or applications. Generates creative, polished code that avoids generic AI aesthetics.
openai-whisper-api
347.2kTranscribe audio via OpenAI Audio Transcriptions API (Whisper).
