Queue
Create task queues, add and take jobs, monitor failed tasks
Install / Use
/learn @tarantool/QueueREADME
[![fast_testing][testing-actions-badge]][testing-actions-url] [![packaging][packaging-actions-badge]][packaging-actions-url] [![publish][publish-actions-badge]][publish-actions-url]
A collection of persistent queue implementations for Tarantool
Table of contents
- Queue types
- The underlying spaces
- Task state diagram
- Queue state diagram
- Installing
- Using the queue module
- Initialization
- Get the module version
- Creating a new queue
- Set queue settings
- Session identify
- Putting a task in a queue
- Taking a task from the queue ("consuming")
- Acknowledging the completion of a task
- Releasing a task
- Peeking at a task
- Burying a task
- Kicking a number of tasks
- Deleting a task
- Dropping a queue
- Releasing all taken tasks
- Getting statistics
- Queue and replication
- Implementation details
Queue types
fifo - a simple queue
Features:
- If there is only one consumer, tasks are scheduled in strict FIFO order.
- If there are many concurrent consumers, FIFO order is preserved on average, but is less strict: concurrent consumers may complete tasks in a different order.
The following options can be specified when creating a fifo queue:
temporary- boolean - if true, the contents do not persist on disk (the queue is in-memory only)if_not_exists- boolean - if true, no error will be returned if the tube already existson_task_change- function name - a callback to be executed on every operation; the expected function syntax isfunction(task, stats_data), wherestats_datais the operation type, andtaskis normalized task data. NOTE: It's better to use:on_task_change()function.
fifo queue does not support:
- task priorities (
pri) - task time to live (
ttl) - task time to execute (
ttr) - delayed execution (
delay)
Example:
-- add a log record on task completion
local function otc_cb(task, stats_data)
if stats_data == 'delete' then
log.info("task %s is done", task[1])
end
end
queue.create_tube('tube_name', 'fifo', {temporary = true, on_task_change = otc_cb})
queue.tube.tube_name:put('my_task_data_1')
queue.tube.tube_name:put('my_task_data_2')
In the example above, the otc_cb function will be called 2 times, on each task
completion. Values for the callback arguments will be taken from the queue.
fifottl - a simple priority queue with support for task time to live
The following options can be specified when creating a fifottl queue:
temporary- boolean - if true, the contents of the queue do not persist on diskif_not_exists- boolean - if true, no error will be returned if the tube already existson_task_change- function name - a callback to be executed on every operation
The following options can be specified when putting a task in a fifottl queue:
pri- task priority (0is the highest priority and is the default)ttl- numeric - time to live for a task put into the queue, in seconds. ifttlis not specified, it is set to infinity (if a task exists in a queue for longer than ttl seconds, it is removed)ttr- numeric - time allotted to the worker to work on a task, in seconds; ifttris not specified, it is set to the same asttl(if a task is being worked on for more thanttrseconds, its status is changed to 'ready' so another worker may take it)delay- time to wait before starting to execute the task, in seconds
Example:
queue.create_tube('tube_name', 'fifottl', {temporary = true})
queue.tube.tube_name:put('my_task_data', { ttl = 60.1, delay = 80 })
In the example above, the task has 60.1 seconds to live, but the start of execution is delayed for 80 seconds. Thus the task actually will exist for up to (60.1 + 80) 140.1 seconds.
A smaller priority value indicates a higher priority, so a task with priority 1 will be executed after a task with priority 0, if all other options are equal.
limfifottl - a simple size-limited priority queue with support for task time to live
Works same as fifottl, but has limitied size and put operation timeout.
The following options can be specified when creating a fifottl queue:
temporary- boolean - if true, the contents of the queue do not persist on diskif_not_exists- boolean - if true, no error will be returned if the tube already existson_task_change- function name - a callback to be executed on every operationcapacity- number - limit size of the queue
The following options can be specified when putting a task in a fifottl queue:
pri- task priority (0is the highest priority and is the default)ttl- numeric - time to live for a task put into the queue, in seconds. ifttlis not specified, it is set to infinity (if a task exists in a queue for longer than ttl seconds, it is removed)ttr- numeric - time allotted to the worker to work on a task, in seconds; ifttris not specified, it is set to the same asttl(if a task is being worked on for more thanttrseconds, its status is changed to 'ready' so another worker may take it)delay- time to wait before starting to execute the task, in secondstimeout- numeric - seconds to wait until queue has free space; iftimeoutis not specified or time is up, and queue has no space, method return Nil
utube - a queue with sub-queues inside
The main idea of this queue backend is the same as in a fifo queue:
the tasks are executed in FIFO order.
However, tasks may be grouped into sub-queues.
It is advised not to use utube methods inside transactions with
read-confirmed isolation level. It can lead to errors when trying to make
parallel tube methods calls with mvcc enabled.
The following options can be specified when creating a utube queue:
-
temporary- boolean - if true, the contents of the queue do not persist on disk -
if_not_exists- boolean - if true, no error will be returned if the tube already exists -
on_task_change- function name - a callback to be executed on every operation -
storage_mode- string - one ofqueue.driver.utube.STORAGE_MODE_DEFAULT("default") - default implementation ofutubequeue.driver.utube.STORAGE_MODE_READY_BUFFER("ready_buffer") - allows processingtakerequests faster, but by the cost ofputoperations speed. Right now this option is supported only formemtxengine. WARNING: this is an experimental storage mode.
Here is a benchmark comparison of these two modes:
-
Benchmark for simple
putandtakemethods. 30k utubes are created with a single task each. Task creation time is calculated. After that 30k consumers are callingtake+ack, each in the separate fiber. Time to ack all tasks is calculated. The results are as follows:| | put (30k) | take+ack | |---------|-----------|----------| | default | 180ms | 1.6s | | ready | 270ms | 1.7s |
-
Benchmark for the busy utubes. 10 tubes are created. Each contains 1000 tasks. After that, 10 consumers are created (each works on his tube only, one tube — one consumer). Each consumer will
take, thenyieldand thenackevery task from their utube (1000 tasks each). After that, we can also run this benchmark with 10k tasks on each utube, 100k tasks and 150k tasks. But all that with 10 utubes and 10 consumers. The results are as follows:| | 1k | 10k | 50k | 150k | |---------|-------|------|------|-------| | default | 53s | 1.5h | 100h | 1000h | | ready | 450ms | 4.7s | 26s | 72s |
The following options can be specified when putting a task in a utube
queue:
utube- the name of the sub-queue. Sub-queues split the task stream according to the sub-queue name: it is not possible to take two tasks out of a sub-queue concurrently, each sub-queue is executed in strict FIFO order, one task at a time.
utube queue does not support:
- task priorities (
pri) - task time to live (
ttl) - task time to execute (
ttr) - delayed execution (
delay)
Example:
Imagine a web crawler, fetching pages from the Internet and finding URLs to fetch more pages. The web crawler is based on a queue, and ea
