
=========
TaskTiger
=========

.. image:: https://github.com/closeio/tasktiger/actions/workflows/test.yaml/badge.svg?event=push
  :target: https://github.com/closeio/tasktiger/actions/workflows/test.yaml

TaskTiger is a Python task queue using Redis.

(Interested in working on projects like this? Close_ is looking for `great engineers`_ to join our team.)

.. _Close: http://close.com
.. _great engineers: http://jobs.close.com

.. contents:: Contents

Features
--------

  • Per-task fork or synchronous worker

    By default, TaskTiger forks a subprocess for each task. This comes with several benefits: Memory leaks caused by tasks are avoided since the subprocess is terminated when the task is finished. A hard time limit can be set for each task, after which the task is killed if it hasn't completed. To ensure performance, any necessary Python modules can be preloaded in the parent process.

    TaskTiger also supports synchronous workers, which perform better because there is no forking overhead and tasks can reuse network connections. To prevent memory leaks from accumulating, workers can be set to shut down after a certain amount of time, at which point a supervisor can restart them. Workers also automatically exit on hard timeouts to prevent an inconsistent process state.

  • Unique queues

    TaskTiger has the option to avoid duplicate tasks in the task queue. In some cases it is desirable to combine multiple similar tasks. For example, imagine a task that indexes objects (e.g. to make them searchable). If an object is already present in the task queue and hasn't been processed yet, a unique queue will ensure that the indexing task doesn't have to do duplicate work. However, if the task is already running while it's queued, the task will be executed another time to ensure that the indexing task always picks up the latest state.

  • Task locks

    TaskTiger can ensure that no more than one instance of tasks with similar arguments executes at a time by acquiring a lock. If a task hits a lock, it is requeued and scheduled for later execution after a configurable interval.

  • Task retrying

    TaskTiger lets you retry exceptions (all exceptions or a list of specific ones) and comes with configurable retry intervals (fixed, linear, exponential, custom).

  • Flexible queues

    Tasks can be easily queued in separate queues. Workers can be configured to only process specific queues, and they pick tasks from a randomly chosen queue, ensuring that all queues are processed equally. TaskTiger also supports subqueues, which are separated by a period. For example, you can have per-customer queues in the form ``process_emails.CUSTOMER_ID`` and start a worker to process ``process_emails`` and any of its subqueues. Since tasks are picked from a random queue, all customers get equal treatment: if one customer is queueing many tasks, it can't block other customers' tasks from being processed. A maximum queue size can also be enforced.

  • Batch queues

    Batch queues can be used to combine multiple queued tasks into one. That way, your task function can process multiple sets of arguments at the same time, which can improve performance. The batch size is configurable.

  • Scheduled and periodic tasks

    Tasks can be scheduled for execution at a specific time. Tasks can also be executed periodically (e.g. every five seconds).

  • Structured logging

    TaskTiger supports JSON-style logging via structlog, allowing more flexibility for tools to analyze the log. For example, you can use TaskTiger together with Logstash, Elasticsearch, and Kibana.

    The structlog processor tasktiger.logging.tasktiger_processor can be used to inject the current task id into all log messages.

  • Reliability

    TaskTiger atomically moves tasks between queue states, and will re-execute tasks after a timeout if a worker crashes.

  • Error handling

    If an exception occurs during task execution and the task is not set up to be retried, TaskTiger stores the execution tracebacks in an error queue. The task can then be retried or deleted manually. TaskTiger can be easily integrated with error reporting services like Rollbar.

  • Admin interface

    A simple admin interface using flask-admin exists as a separate project (tasktiger-admin_).
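The unique-queue behavior described above depends on recognizing two queued tasks as the same task. The sketch below shows one way such an identity could be derived, by hashing the function name together with its arguments. The helper ``unique_task_id`` and the JSON-plus-SHA-256 scheme are assumptions made for this illustration and are not presented as TaskTiger's internal implementation.

```python
import hashlib
import json


def unique_task_id(func_name, args=(), kwargs=None):
    """Return a deterministic id for a (function, args, kwargs) triple.

    Hypothetical helper, written for illustration only.
    """
    payload = json.dumps(
        {"func": func_name, "args": list(args), "kwargs": kwargs or {}},
        sort_keys=True,  # keyword order must not change the resulting hash
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


# A dict keyed by task id stands in for a unique queue: queueing the same
# task twice leaves a single entry.
queue = {}
for object_id in (42, 42, 7):
    task_id = unique_task_id("tasks.index_object", args=(object_id,))
    queue.setdefault(task_id, ("tasks.index_object", object_id))

print(len(queue))
```

Object 42 is queued twice but appears once in ``queue``, which is the deduplication a unique queue provides while the task is still waiting.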

.. _tasktiger-admin: https://github.com/closeio/tasktiger-admin
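To make the retry interval styles named in the feature list (fixed, linear, exponential) concrete, here is a plain-Python sketch of how the wait before each retry could grow under each style. The function ``retry_delay`` and its parameters ``delay``, ``increment`` and ``factor`` are hypothetical names chosen for this example; they are not TaskTiger's API.

```python
def retry_delay(method, attempt, delay=60, increment=30, factor=2):
    """Seconds to wait before retry number `attempt` (1-based).

    Hypothetical helper, written for illustration only.
    """
    if method == "fixed":
        # Same wait before every retry.
        return delay
    if method == "linear":
        # Wait grows by a constant amount per retry.
        return delay + increment * (attempt - 1)
    if method == "exponential":
        # Wait is multiplied by a constant factor per retry.
        return delay * factor ** (attempt - 1)
    raise ValueError(f"unknown retry method: {method}")


for method in ("fixed", "linear", "exponential"):
    print(method, [retry_delay(method, n) for n in (1, 2, 3, 4)])
```

Exponential backoff is often chosen when failures are likely caused by an overloaded dependency, since it reduces pressure quickly; a fixed interval is simpler to reason about.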

Quick start
-----------

It is easy to get started with TaskTiger.

Create a file that contains the task(s).

.. code:: python

  # tasks.py
  def my_task():
      print('Hello')

Queue the task using the delay method.

.. code:: python

  In [1]: import tasktiger, tasks
  In [2]: tiger = tasktiger.TaskTiger()
  In [3]: tiger.delay(tasks.my_task)

Run a worker (make sure the task code can be found, e.g. using PYTHONPATH).

.. code:: bash

  % PYTHONPATH=. tasktiger
  {"timestamp": "2015-08-27T21:00:09.135344Z", "queues": null, "pid": 69840, "event": "ready", "level": "info"}
  {"task_id": "6fa07a91642363593cddef7a9e0c70ae3480921231710aa7648b467e637baa79", "level": "debug", "timestamp": "2015-08-27T21:03:56.727051Z", "pid": 69840, "queue": "default", "child_pid": 70171, "event": "processing"}
  Hello
  {"task_id": "6fa07a91642363593cddef7a9e0c70ae3480921231710aa7648b467e637baa79", "level": "debug", "timestamp": "2015-08-27T21:03:56.732457Z", "pid": 69840, "queue": "default", "event": "done"}
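Because each worker log line is a JSON object, the output can be processed with standard tooling. The sketch below parses abridged copies of the lines shown above and pairs the ``processing`` and ``done`` events of a task to estimate how long it ran. The function names ``parse_timestamp`` and ``task_durations`` are assumptions made for this example; plain output printed by a task, such as ``Hello``, is skipped.

```python
import json
from datetime import datetime

TASK_ID = "6fa07a91642363593cddef7a9e0c70ae3480921231710aa7648b467e637baa79"

# Abridged copies of the worker output from the quick start.
LOG_LINES = [
    '{"timestamp": "2015-08-27T21:00:09.135344Z", "queues": null, '
    '"pid": 69840, "event": "ready", "level": "info"}',
    '{"task_id": "%s", "timestamp": "2015-08-27T21:03:56.727051Z", '
    '"queue": "default", "event": "processing"}' % TASK_ID,
    "Hello",
    '{"task_id": "%s", "timestamp": "2015-08-27T21:03:56.732457Z", '
    '"queue": "default", "event": "done"}' % TASK_ID,
]


def parse_timestamp(value):
    """Parse the UTC timestamp format used in the log lines above."""
    return datetime.strptime(value, "%Y-%m-%dT%H:%M:%S.%fZ")


def task_durations(lines):
    """Map task id to seconds elapsed between 'processing' and 'done'."""
    started = {}
    durations = {}
    for line in lines:
        try:
            record = json.loads(line)
        except ValueError:
            continue  # not JSON, e.g. text printed by the task itself
        if not isinstance(record, dict) or "task_id" not in record:
            continue  # worker-level events such as "ready"
        when = parse_timestamp(record["timestamp"])
        if record["event"] == "processing":
            started[record["task_id"]] = when
        elif record["event"] == "done" and record["task_id"] in started:
            elapsed = when - started.pop(record["task_id"])
            durations[record["task_id"]] = elapsed.total_seconds()
    return durations


print(task_durations(LOG_LINES))
```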

Configuration
-------------

A TaskTiger object keeps track of TaskTiger's settings and is used to decorate and queue tasks. The constructor takes the following arguments:

  • connection

    Redis connection object. The connection should be initialized with decode_responses=True to avoid encoding problems on Python 3.

  • config

    Dict with config options. Most configuration options don't need to be changed, and a full list can be seen within TaskTiger's __init__ method.

    Here are a few commonly used options:

    • ALWAYS_EAGER

      If set to True, all tasks except future tasks (those whose ``when`` is a future time) will be executed locally by blocking until the task returns. This is useful for testing purposes.

    • BATCH_QUEUES

      Set up queues that will be processed in batch, i.e. multiple jobs are taken out of the queue at the same time and passed as a list to the worker method. Takes a dict where the key represents the queue name and the value represents the batch size. Note that the task needs to be declared as batch=True. Also note that any subqueues will be automatically treated as batch queues, and the batch value of the most specific subqueue name takes precedence.

    • ONLY_QUEUES

      If set to a non-empty list of queue names, a worker only processes the given queues (and their subqueues), unless explicit queues are passed to the command line.

  • setup_structlog

    If set to True, sets up structured logging using structlog when initializing TaskTiger. This makes writing custom worker scripts easier since it doesn't require the user to set up structlog in advance.

Example:

.. code:: python

  import tasktiger
  from redis import Redis
  conn = Redis(db=1, decode_responses=True)
  tiger = tasktiger.TaskTiger(connection=conn, config={
      'BATCH_QUEUES': {
          # Batch up to 50 tasks that are queued in the my_batch_queue or any
          # of its subqueues, except for the send_email subqueue which only
          # processes up to 10 tasks at a time.
          'my_batch_queue': 50,
          'my_batch_queue.send_email': 10,
      },
  })
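The rule that the most specific subqueue name takes precedence can be sketched as a lookup that tries the full queue name first and then drops trailing period-separated components. The helper ``batch_size_for`` is a hypothetical name used only for this illustration; it is not part of TaskTiger and may differ from how the library resolves batch sizes internally.

```python
def batch_size_for(queue, batch_queues):
    """Return the batch size that applies to `queue`, or None if unbatched.

    Hypothetical helper, written for illustration only.
    """
    parts = queue.split(".")
    # Try the full name first, then each shorter parent queue in turn.
    for end in range(len(parts), 0, -1):
        candidate = ".".join(parts[:end])
        if candidate in batch_queues:
            return batch_queues[candidate]
    return None


BATCH_QUEUES = {
    "my_batch_queue": 50,
    "my_batch_queue.send_email": 10,
}

for name in (
    "my_batch_queue",
    "my_batch_queue.send_email",
    "my_batch_queue.send_email.customer1",
    "my_batch_queue.reports",
    "default",
):
    print(name, batch_size_for(name, BATCH_QUEUES))
```

Matching is done per component rather than by string prefix, so a queue named ``my_batch_queue_old`` would not inherit the settings of ``my_batch_queue`` in this sketch.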

Task decorator
--------------

TaskTiger provides a task decorator to specify task options. Note that simple tasks don't need to be decorated. However, decorating the task allows you to use an alternative syntax to queue the task, which is compatible with Celery:

.. code:: python

  # tasks.py
  import tasktiger
  tiger = tasktiger.TaskTiger()

  @tiger.task()
  def my_task(name, n=None):
      print('Hello', name)

.. code:: python

  In [1]: import tasks
  # The following are equivalent. However, the second syntax can only be used
  # if the task is decorated.
  In [2]: tasks.tiger.delay(tasks.my_task, args=('John',), kwargs={'n': 1})
  In [3]: tasks.my_task.delay('John', n=1)

Task options
------------

Tasks support a variety of options that can be specified either in the task decorator, or when queueing a task. For the latter, the delay method must be called on the TaskTiger object, and any options in the task decorator are overridden.

.. code:: python

  @tiger.task(queue='myqueue', unique=True)
  def my_task():
      print('Hello')

.. code:: python

  # The task will be queued in "otherqueue", even though the task decorator
  # says "myqueue".
  tiger.delay(my_task, queue='otherqueue')
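The precedence rule above, where options passed at queueing time win over those given in the decorator, amounts to a dictionary merge. The ``ToyQueue`` class below is a hypothetical stand-in written for this illustration: it keeps tasks in memory, does not talk to Redis, and is not TaskTiger's implementation. The default option values are likewise assumptions.

```python
class ToyQueue:
    """In-memory stand-in that shows how task options could be resolved."""

    def __init__(self):
        self.queued = []

    def task(self, **decorator_options):
        """Decorator that remembers the options on the function object."""
        def wrap(func):
            func._task_options = decorator_options
            return func
        return wrap

    def delay(self, func, args=(), kwargs=None, **call_options):
        """Queue `func`; options given here override decorator options."""
        options = {"queue": "default", "unique": False}  # assumed defaults
        options.update(getattr(func, "_task_options", {}))
        options.update(call_options)
        self.queued.append((func.__name__, tuple(args), kwargs or {}, options))
        return options


toy = ToyQueue()


@toy.task(queue="myqueue", unique=True)
def my_task():
    print("Hello")


print(toy.delay(my_task))                      # decorator options apply
print(toy.delay(my_task, queue="otherqueue"))  # call-time option wins
```

Only the ``queue`` option is replaced in the second call; ``unique=True`` from the decorator still applies because the call did not override it.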

When queueing a task, the task needs to be defined in a module other than the Python file which is being executed. In other words, the task can't be in the ``__main__`` module. Otherwise, TaskTiger reports an error.
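One way to see why this restriction is needed: a worker runs in a separate process and has to import the task function by name, and a function defined in ``__main__`` has no module path that another process can import. The helpers ``importable_name`` and ``resolve`` below are a hypothetical sketch of that idea using only the standard library; they are not TaskTiger's code.

```python
import importlib
import json


def importable_name(func):
    """Return a dotted path a separate process could import `func` from."""
    if func.__module__ == "__main__":
        raise ValueError("task functions cannot be defined in __main__")
    return f"{func.__module__}.{func.__name__}"


def resolve(name):
    """Import the module part of `name` and return the named attribute."""
    module_name, _, attr = name.rpartition(".")
    return getattr(importlib.import_module(module_name), attr)


# A function from an importable module round-trips through its name.
name = importable_name(json.dumps)
print(name, resolve(name) is json.dumps)
```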

The following options are supported by both delay and the task decorator:

  • queue

    Name of the queue where the task will be queued.

  • hard_timeout

    If the task runs longer than the given number of seconds, it will be killed and marked as failed.

  • unique

    Boolean to indicate whether the task will only be queued if there is no similar task with the same function, arguments, and keyword arguments in the queue. Note that multiple similar tasks may still be executed at the same time since the task will still be inserted into the queue if another one is being processed. Requeueing an already scheduled unique task will not change the time it was originally scheduled to execute at.

  • unique_key

    If set, this implies unique=True and specifies the list of kwargs to use to construct the unique key. By default, all args and kwargs are serialized and hashed.

  • lock

    Boo
