Nq
Cancellable, Efficient and Reliable Distributed Task Queue in Go
Reliable, Efficient and Cancellable Distributed Task Queue in Go
NQ (NATS Queue) is a Go package for queuing jobs and processing them in the background with workers. It is built on NATS, with a focus on the cancellability of enqueued jobs.
NQ requires a nats-server version that supports both JetStream and the key-value store.
How does it work?
This package was designed so that a task can always be cancelled by the client. Workers can be configured to cancel their tasks and quit immediately upon a network partition (e.g. a disconnect from nats-server).
Features
- Multiple task queues
- Deadline and Timeout for tasks
- Tasks can be cancelled via context
- Horizontally scalable workers
- Automatic failover
- Reconnection to nats-server for automatic failover
- Monitoring and Alerting
- CLI
Task Options Walkthrough
Watch for updates
(Introduced in v0.3)
Listen for updates to a task's metadata, such as its status:
```go
package main

import (
	"encoding/json"
	"log"
	"sync"

	"github.com/dumbmachine/nq"
)

// UrlPayload and QueueDev are declared here so the example is self-contained;
// the queue name matches the one in the output below.
type UrlPayload struct {
	Url string `json:"url"`
}

const QueueDev = "scrap-url-dev"

func main() {
	client := nq.NewPublishClient(nq.NatsClientOpt{
		Addr: "nats://127.0.0.1:4222",
	}, nq.NoAuthentcation(),
	)
	defer client.Close()

	bytesPayload1, err := json.Marshal(UrlPayload{Url: "https://httpstat.us/200?sleep=10000"})
	if err != nil {
		panic(err)
	}

	var wg sync.WaitGroup
	task1 := nq.NewTask(QueueDev, bytesPayload1)
	if ack, err := client.Enqueue(task1); err == nil {
		log.Printf("Watching updates queue=%s taskID=%s payload=%s", ack.Queue, ack.ID, ack.Payload)
		updates, err := client.GetUpdates(ack.ID)
		if err != nil {
			panic(err)
		}
		wg.Add(1)
		// Listen for updates until the channel is closed
		go func() {
			defer wg.Done()
			for msg := range updates {
				log.Printf("Change detected, status=%s", msg.GetStatus())
			}
		}()
	} else {
		log.Printf("err=%s", err)
	}
	wg.Wait()
}
```
```
2022/08/29 22:17:15 Watching updates queue=scrap-url-dev taskID=yzaKwBIcbGEt8sMGgMJcZ0 payload={"url":"https://httpstat.us/200?sleep=10000"}
2022/08/29 22:17:15 Change detected, status=pending
2022/08/29 22:17:16 Change detected, status=processing
2022/08/29 22:17:28 Change detected, status=completed
```
Retrying
By default, a task is submitted for retry if it returns a non-nil error.
```go
// A task that will be retried 2 times before being marked as `failed`
taskWithRetry := nq.NewTask("my-queue", bytesPayload, nq.Retry(2))
```
A custom error-filtering function can be supplied so that a task is marked as failed only on specific errors. Here, if a task fails due to `ErrFailedDueToInvalidApiKeys`, it is considered a failure and will be retried:
```go
var ErrFailedDueToInvalidApiKeys = errors.New("failed to perform task, invalid api keys")

srv := nq.NewServer(nq.NatsClientOpt{Addr: nats.DefaultURL}, nq.Config{
	// Only errors matching ErrFailedDueToInvalidApiKeys count as failures
	IsFailureFn: func(err error) bool {
		return errors.Is(err, ErrFailedDueToInvalidApiKeys)
	},
	ServerName: nq.GenerateServerName(),
})
```
Deadline / Timeout for tasks
```go
// A task that may run until time.Now() + 1 hour
taskWithDeadline := nq.NewTask("my-queue", bytesPayload, nq.Deadline(time.Now().Add(time.Hour)), nq.TaskID("deadlineTaskID"))
// A task that may run for at most 10 minutes
taskWithTimeout := nq.NewTask("my-queue", bytesPayload, nq.Timeout(10*time.Minute), nq.TaskID("timeoutTaskID"))
```
Task cancellations
Tasks that are either waiting for execution or being executed on any worker can be cancelled. Cancelling a task requires its task ID.
```go
// Enqueue a task, then cancel it by the ID returned in the ack
taskSignature := nq.NewTask("my-queue", []byte{})
ack, err := client.Enqueue(taskSignature)
if err != nil {
	log.Fatal(err)
}
client.Cancel(ack.ID)
```
A task function can handle cancellation like so:
```go
// longRunningOperation checks ctx between units of work and stops early
// if the task has been cancelled.
func longRunningOperation(ctx context.Context, task *nq.TaskPayload) error {
	if ctx.Err() != nil {
		return ctx.Err()
	}
	for i := 0; i < 1000; i++ {
		timeout := 20 * time.Millisecond
		log.Printf("sleeping for: %s", timeout)
		time.Sleep(timeout)
		if ctx.Err() != nil {
			return ctx.Err()
		}
	}
	return nil
}
```
NOTE: Successful cancellation depends on the task function respecting `context.Done()`.
Automatic Failover
The `ShutdownOnNatsDisconnect` option shuts down the workers and the server if the connection to nats-server is broken. This is useful when tasks must be cancellable at all times.
Note: once a disconnect is observed, workers stop processing new messages. Running workers are given the `shutdownTimeout` duration to finish; any tasks that have not completed by then are cancelled and remain available in the task queue for future or other workers to process.
In other words, the worker server shuts itself down whenever it can no longer honour a cancel request, e.g. after losing its connection to nats-server.
```go
srv := nq.NewServer(nq.NatsClientOpt{
	Addr: "nats://127.0.0.1:4222",
}, nq.Config{
	ServerName:  nq.GenerateServerName(),
	Concurrency: 2,
	LogLevel:    nq.InfoLevel,
}, nq.ShutdownOnNatsDisconnect(),
)
```
```
$ go run examples/simple.go sub
nq: pid=24914 2022/08/21 15:43:45.650999 INFO: Registered queue=scrap-url-dev
nq: pid=24914 2022/08/21 15:43:45.652720 INFO: Started Server@DumbmachinePro-local/24914
nq: pid=24914 2022/08/21 15:43:45.652739 INFO: [*] Listening for messages
nq: pid=24914 2022/08/21 15:43:45.652742 INFO: cmd/ctrl + c to terminate the process
nq: pid=24914 2022/08/21 15:43:45.652744 INFO: cmd/ctrl + z to stop processing new tasks
nq: pid=24914 2022/08/21 15:43:48.363110 ERROR: Disconnected from nats
nq: pid=24914 2022/08/21 15:43:48.363173 INFO: Starting graceful shutdown
nq: pid=24914 2022/08/21 15:43:53.363535 INFO: Waiting for all workers to finish...
nq: pid=24914 2022/08/21 15:43:53.363550 INFO: All workers have finished
nq: pid=24914 2022/08/21 15:43:53.363570 INFO: Exiting
```
Reconnection
The server can be configured not to shut down, but instead to try to reconnect to nats-server when disconnected.
```go
srv := nq.NewServer(nq.NatsClientOpt{
	Addr:          "nats://127.0.0.1:4222",
	ReconnectWait: 5 * time.Second, // wait between reconnection attempts
	MaxReconnects: 100,             // total reconnection attempts before giving up
}, nq.Config{ServerName: "local-serv-1"})
```
If nats-server is up again:

- With previous state (i.e. with the expected queue data):
  ```
  nq: pid=7988 2022/08/22 17:24:44.349815 INFO: Registered queue=scrap-url-dev
  nq: pid=7988 2022/08/22 17:24:44.356378 INFO: Registered queue=another-one
  nq: pid=7988 2022/08/22 17:24:44.356393 INFO: Started Server@DumbmachinePro-local/7988
  nq: pid=7988 2022/08/22 17:24:44.356444 INFO: [*] Listening for messages
  nq: pid=7988 2022/08/22 17:24:44.356455 INFO: cmd/ctrl + c to terminate the process
  nq: pid=7988 2022/08/22 17:24:44.356459 INFO: cmd/ctrl + z to stop processing new tasks
  disconnected from nats
  2022/08/22 22:55:02 reconnection found nats://127.0.0.1:4222
  nq: pid=7988 2022/08/22 17:25:02.860051 INFO: Re-registering subscriptions to nats-server
  nq: pid=7988 2022/08/22 17:25:02.864988 INFO: Registration successful[nats://127.0.0.1:4222]
  disconnected from nats
  ```
- Without previous state: if the registered queues are not found in nats-server, they are created again:
  ```
  nq: pid=7998 2022/08/22 17:26:44.349815 INFO: Registered queue=scrap-url-dev
  nq: pid=7998 2022/08/22 17:26:44.356378 INFO: Registered queue=another-one
  nq: pid=7998 2022/08/22 17:26:44.356393 INFO: Started Server@DumbmachinePro-local/7998
  nq: pid=7998 2022/08/22 17:26:44.356444 INFO: [*] Listening for messages
  nq: pid=7998 2022/08/22 17:26:44.356455 INFO: cmd/ctrl + c to terminate the process
  nq: pid=7998 2022/08/22 17:26:44.356459 INFO: cmd/ctrl + z to stop processing new tasks
  disconnected from nats
  2022/08/22 22:57:25 reconnection found nats://127.0.0.1:4222
  nq: pid=7998 2022/08/22 17:27:25.518079 INFO: Re-registering subscriptions to nats-server
  nq: pid=7998 2022/08/22 17:27:25.524895 WARN: stream=scrap-url-dev re-registering
  nq: pid=7998 2022/08/22 17:27:25.542725 INFO: Registered queue=scrap-url-dev
  nq: pid=7998 2022/08/22 17:27:25.543668 WARN: stream=another-one re-registering
  nq: pid=7998 2022/08/22 17:27:25.554961 INFO: Registered queue=another-one
  nq: pid=7998 2022/08/22 17:27:25.555002 INFO: Registration successful[nats://127.0.0.1:4222]
  ```
Monitoring and Alerting
Refer to the [NATS monitoring section](https://docs.nats.io/running-a-nats-service/configuration/monitoring).
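As a starting point, nats-server exposes JSON endpoints such as `/varz` on its HTTP monitoring port when started with `-m <port>` (8222 by convention). The sketch below reads two fields from `/varz`; `fetchVarz` and `parseVarz` are hypothetical helpers, and the address is an assumption to adjust for your deployment.

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"time"
)

// varz keeps two fields of the JSON document served by nats-server at /varz.
type varz struct {
	Version     string `json:"version"`
	Connections int    `json:"connections"`
}

// parseVarz decodes a /varz response body; unknown fields are ignored.
func parseVarz(data []byte) (varz, error) {
	var v varz
	err := json.Unmarshal(data, &v)
	return v, err
}

// fetchVarz queries baseURL + "/varz", e.g. "http://127.0.0.1:8222" when
// nats-server was started with monitoring enabled via `-m 8222`.
func fetchVarz(baseURL string) (varz, error) {
	client := &http.Client{Timeout: 5 * time.Second}
	resp, err := client.Get(baseURL + "/varz")
	if err != nil {
		return varz{}, err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return varz{}, fmt.Errorf("unexpected status %s", resp.Status)
	}
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return varz{}, err
	}
	return parseVarz(body)
}

func main() {
	v, err := fetchVarz("http://127.0.0.1:8222")
	if err != nil {
		fmt.Println("monitoring endpoint unreachable:", err)
		return
	}
	fmt.Printf("nats-server %s, %d client connections\n", v.Version, v.Connections)
}
```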
