Queue

Create task queues, add and take jobs, monitor failed tasks

<a href="http://tarantool.org"> <img src="https://avatars2.githubusercontent.com/u/2344919?v=2&s=250" align="right"> </a>

A collection of persistent queue implementations for Tarantool

Queue types

fifo - a simple queue

Features:

  • If there is only one consumer, tasks are scheduled in strict FIFO order.
  • If there are many concurrent consumers, FIFO order is preserved on average, but is less strict: concurrent consumers may complete tasks in a different order.

The following options can be specified when creating a fifo queue:

  • temporary - boolean - if true, the contents do not persist on disk (the queue is in-memory only)
  • if_not_exists - boolean - if true, no error will be returned if the tube already exists
  • on_task_change - function name - a callback to be executed on every operation; the expected signature is function(task, stats_data), where task is the normalized task data and stats_data is the operation type. NOTE: it is better to set the callback with the :on_task_change() method.

fifo queue does not support:

  • task priorities (pri)
  • task time to live (ttl)
  • task time to execute (ttr)
  • delayed execution (delay)

Example:


```lua
local log = require('log')
local queue = require('queue')

-- add a log record on task completion
local function otc_cb(task, stats_data)
    if stats_data == 'delete' then
        log.info("task %s is done", task[1])
    end
end

queue.create_tube('tube_name', 'fifo', {temporary = true, on_task_change = otc_cb})
queue.tube.tube_name:put('my_task_data_1')
queue.tube.tube_name:put('my_task_data_2')
```

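In the example above, otc_cb is invoked on every operation on the tube, but it writes a log record only for the 'delete' operation, that is, when a task is completed. Once both tasks have been completed, the log contains two records. The values of the callback arguments are supplied by the queue.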
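The example above only puts tasks, so a consumer still has to complete them before anything is logged. A minimal sketch of such a consumer follows. It assumes that the tube provides take(timeout) and ack(task_id) methods and that a task is a tuple of id, state, and data; process_all and handler are hypothetical names introduced for illustration.

```lua
-- Hypothetical helper: drain every ready task from a tube.
-- `tube` is expected to behave like queue.tube.tube_name, i.e. to provide
-- take(timeout) and ack(task_id); `handler` is a caller-supplied function.
local function process_all(tube, handler)
    local processed = 0
    while true do
        -- A zero timeout asks take() to return immediately; nil means
        -- that no task is ready right now.
        local task = tube:take(0)
        if task == nil then
            break
        end
        -- Assumed task layout: task[1] is the id, task[3] is the data.
        handler(task[3])
        -- Acknowledge the task so that it is reported as done.
        tube:ack(task[1])
        processed = processed + 1
    end
    return processed
end
```

With a real tube, it could be called as process_all(queue.tube.tube_name, function(data) ... end); the return value is the number of tasks that were acknowledged.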

fifottl - a simple priority queue with support for task time to live

The following options can be specified when creating a fifottl queue:

  • temporary - boolean - if true, the contents of the queue do not persist on disk
  • if_not_exists - boolean - if true, no error will be returned if the tube already exists
  • on_task_change - function name - a callback to be executed on every operation

The following options can be specified when putting a task in a fifottl queue:

  • pri - task priority (0 is the highest priority and is the default)
  • ttl - numeric - time to live for a task put into the queue, in seconds; a task that stays in the queue for longer than ttl seconds is removed. If ttl is not specified, it is set to infinity.
  • ttr - numeric - time allotted to a worker to work on a task, in seconds; if a task is worked on for longer than ttr seconds, its status is changed back to 'ready' so another worker may take it. If ttr is not specified, it is set to the same value as ttl.
  • delay - time to wait before starting to execute the task, in seconds

Example:


```lua
local queue = require('queue')

queue.create_tube('tube_name', 'fifottl', {temporary = true})
queue.tube.tube_name:put('my_task_data', { ttl = 60.1, delay = 80 })
```

In the example above, the task has 60.1 seconds to live, but the start of execution is delayed by 80 seconds, so the task can exist for up to 60.1 + 80 = 140.1 seconds.
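The arithmetic can be made concrete with a toy timeline in plain Lua. This is only a model of the rule described above, not the queue module's implementation: state_after and the state labels are hypothetical, and the model assumes that no worker ever takes the task.

```lua
-- Toy model of the ttl + delay rule; not the queue module's implementation.
-- Returns the state an untouched task would be in `elapsed` seconds after
-- put(), given the ttl and delay options passed to put().
local function state_after(elapsed, opts)
    local delay = opts.delay or 0
    local ttl = opts.ttl or math.huge  -- no ttl means the task never expires
    if elapsed < delay then
        return 'delayed'
    elseif elapsed < delay + ttl then
        return 'ready'
    end
    return 'removed'
end

local opts = { ttl = 60.1, delay = 80 }
print(state_after(10, opts))   -- delayed
print(state_after(100, opts))  -- ready
print(state_after(141, opts))  -- removed
```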

A smaller priority value indicates a higher priority, so a task with priority 1 will be executed after a task with priority 0, if all other options are equal.
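The ordering rule can be illustrated with a toy model in plain Lua. It is not the queue module's implementation: the seq field and the task table layout are hypothetical, and the model assumes that tasks with equal priority are served in the order they were put.

```lua
-- Toy model of the ordering rule; not the queue module's implementation.
-- Each task records its priority and the order in which it was put.
local tasks = {
    { seq = 1, pri = 1, data = 'low priority, put first' },
    { seq = 2, pri = 0, data = 'high priority, put second' },
    { seq = 3, pri = 0, data = 'high priority, put third' },
}

-- Smaller pri first; tasks with equal pri keep their put order.
table.sort(tasks, function(a, b)
    if a.pri ~= b.pri then
        return a.pri < b.pri
    end
    return a.seq < b.seq
end)

for _, task in ipairs(tasks) do
    print(task.seq, task.pri, task.data)
end
```

In this model the tasks are served in the order 2, 3, 1: both priority 0 tasks go before the priority 1 task, even though the latter was put first.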

limfifottl - a simple size-limited priority queue with support for task time to live

Works the same as fifottl, but has a limited size and a timeout for the put operation.

The following options can be specified when creating a limfifottl queue:

  • temporary - boolean - if true, the contents of the queue do not persist on disk
  • if_not_exists - boolean - if true, no error will be returned if the tube already exists
  • on_task_change - function name - a callback to be executed on every operation
  • capacity - number - limit size of the queue

The following options can be specified when putting a task in a limfifottl queue:

  • pri - task priority (0 is the highest priority and is the default)
  • ttl - numeric - time to live for a task put into the queue, in seconds; a task that stays in the queue for longer than ttl seconds is removed. If ttl is not specified, it is set to infinity.
  • ttr - numeric - time allotted to a worker to work on a task, in seconds; if a task is worked on for longer than ttr seconds, its status is changed back to 'ready' so another worker may take it. If ttr is not specified, it is set to the same value as ttl.
  • delay - time to wait before starting to execute the task, in seconds
  • timeout - numeric - time to wait for free space in the queue, in seconds; if timeout is not specified or has expired and the queue still has no free space, put returns nil
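Because put can return nil on a full queue, callers need to check the result. A minimal sketch of such a check follows; try_put and its error string are hypothetical names introduced for illustration, and the tube is assumed to behave as described in the option list above.

```lua
-- Hypothetical helper: put a task and report whether it was accepted.
-- `tube` is expected to behave like a limfifottl tube, whose put() returns
-- nil when the queue is still full after `timeout` seconds.
local function try_put(tube, data, timeout)
    local task = tube:put(data, { timeout = timeout })
    if task == nil then
        return nil, 'queue is full'
    end
    return task
end
```

With a real tube, it could be called as local task, err = try_put(queue.tube.tube_name, 'my_task_data', 0.5), leaving the caller to decide whether to retry or drop the task when err is set.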

utube - a queue with sub-queues inside

The main idea of this queue backend is the same as in a fifo queue: the tasks are executed in FIFO order. However, tasks may be grouped into sub-queues.

Avoid calling utube methods inside transactions with the read-confirmed isolation level: with MVCC enabled, parallel calls to tube methods can fail with errors.

The following options can be specified when creating a utube queue:

  • temporary - boolean - if true, the contents of the queue do not persist on disk

  • if_not_exists - boolean - if true, no error will be returned if the tube already exists

  • on_task_change - function name - a callback to be executed on every operation

  • storage_mode - string - one of

    • queue.driver.utube.STORAGE_MODE_DEFAULT ("default") - default implementation of utube
    • queue.driver.utube.STORAGE_MODE_READY_BUFFER ("ready_buffer") - processes take requests faster, at the cost of slower put operations. Right now this option is supported only for the memtx engine. WARNING: this is an experimental storage mode.

    Here is a benchmark comparison of these two modes:

    • Benchmark for simple put and take methods. 30k utubes are created with a single task each, and the task creation time is measured. Then 30k consumers call take + ack, each in a separate fiber, and the time to ack all tasks is measured. The results are as follows:

      |         | put (30k) | take+ack |
      |---------|-----------|----------|
      | default | 180ms     | 1.6s     |
      | ready   | 270ms     | 1.7s     |

    • Benchmark for busy utubes. 10 utubes are created, each containing 1000 tasks. Then 10 consumers are created, one per utube, and each consumer takes, yields, and then acks every task from its own utube (1000 tasks each). The benchmark is then repeated with 10k, 100k, and 150k tasks per utube, still with 10 utubes and 10 consumers. The results are as follows:

      |         | 1k    | 10k  | 50k  | 150k  |
      |---------|-------|------|------|-------|
      | default | 53s   | 1.5h | 100h | 1000h |
      | ready   | 450ms | 4.7s | 26s  | 72s   |
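Selecting a storage mode comes down to one option at tube creation time. The sketch below wraps that choice in a helper. create_ready_buffer_tube is a hypothetical name, and the queue module is passed in as an argument so that the snippet does not depend on how the module is loaded; the option name and the constant are the ones listed above.

```lua
-- Hypothetical helper: create a utube tube in the experimental
-- ready_buffer storage mode. `queue` is the module returned by
-- require('queue'), passed in so the choice of mode stays in one place.
local function create_ready_buffer_tube(queue, name)
    return queue.create_tube(name, 'utube', {
        if_not_exists = true,
        storage_mode = queue.driver.utube.STORAGE_MODE_READY_BUFFER,
    })
end
```

Since the mode is experimental and trades put speed for take speed, it is probably worth benchmarking against the default mode on a realistic workload before adopting it.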

The following options can be specified when putting a task in a utube queue:

  • utube - the name of the sub-queue. Sub-queues split the task stream according to the sub-queue name: it is not possible to take two tasks out of a sub-queue concurrently; each sub-queue is executed in strict FIFO order, one task at a time.
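The sub-queue rule can be illustrated with a toy in-memory model in plain Lua. It is not the queue module's implementation: put, take, ack, and the table layout are hypothetical, and the sub-queue names are placeholders.

```lua
-- Toy in-memory model of the sub-queue rule; not the queue module's
-- implementation. Tasks are kept in put order.
local tasks = {}   -- array of { id, utube, data, taken }
local busy = {}    -- busy[name] is true while a task of that sub-queue is taken
local next_id = 0

local function put(utube, data)
    next_id = next_id + 1
    tasks[#tasks + 1] = { id = next_id, utube = utube, data = data, taken = false }
    return next_id
end

-- Return the oldest task whose sub-queue has no task in progress.
local function take()
    for _, task in ipairs(tasks) do
        if not task.taken and not busy[task.utube] then
            task.taken = true
            busy[task.utube] = true
            return task
        end
    end
    return nil
end

-- Finish a taken task and unlock its sub-queue.
local function ack(id)
    for i, task in ipairs(tasks) do
        if task.id == id and task.taken then
            busy[task.utube] = nil
            table.remove(tasks, i)
            return true
        end
    end
    return false
end

put('example.com', 'page 1')
put('example.com', 'page 2')
put('example.org', 'page 3')

local first = take()    -- 'page 1'; example.com is now busy
local second = take()   -- 'page 3'; 'page 2' is skipped
local third = take()    -- nil; nothing else can be taken yet
ack(first.id)
local fourth = take()   -- 'page 2'
print(first.data, second.data, third, fourth.data)
```

The second take skips 'page 2' because its sub-queue already has a task in progress, which is the behaviour the option description calls "one task at a time".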

utube queue does not support:

  • task priorities (pri)
  • task time to live (ttl)
  • task time to execute (ttr)
  • delayed execution (delay)

Example:

Imagine a web crawler, fetching pages from the Internet and finding URLs to fetch more pages. The web crawler is based on a queue, and ea
