=========
TaskTiger
=========

.. image:: https://github.com/closeio/tasktiger/actions/workflows/test.yaml/badge.svg?event=push
    :target: https://github.com/closeio/tasktiger/actions/workflows/test.yaml
TaskTiger is a Python task queue using Redis.
(Interested in working on projects like this? Close_ is looking for `great engineers`_ to join our team.)

.. _Close: http://close.com
.. _great engineers: http://jobs.close.com
.. contents:: Contents
Features
--------

- Per-task fork or synchronous worker

  By default, TaskTiger forks a subprocess for each task. This comes with
  several benefits: Memory leaks caused by tasks are avoided since the
  subprocess is terminated when the task is finished. A hard time limit can be
  set for each task, after which the task is killed if it hasn't completed. To
  ensure performance, any necessary Python modules can be preloaded in the
  parent process.

  TaskTiger also supports synchronous workers, which allow for better
  performance due to no forking overhead, and tasks have the ability to reuse
  network connections. To prevent memory leaks from accumulating, workers can
  be set to shut down after a certain amount of time, at which point a
  supervisor can restart them. Workers also automatically exit on hard
  timeouts to prevent an inconsistent process state.
- Unique queues

  TaskTiger has the option to avoid duplicate tasks in the task queue. In some
  cases it is desirable to combine multiple similar tasks. For example,
  imagine a task that indexes objects (e.g. to make them searchable). If an
  object is already present in the task queue and hasn't been processed yet, a
  unique queue will ensure that the indexing task doesn't have to do duplicate
  work. However, if the task is already running while it's queued, the task
  will be executed another time to ensure that the indexing task always picks
  up the latest state.
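  As a rough sketch of the idea (not TaskTiger's actual implementation), the
  deduplication can be modeled by hashing the task's function name and
  serialized arguments into a key and skipping the enqueue if that key is
  already pending:

  .. code:: python

      import hashlib
      import json

      def unique_key(func_name, args, kwargs):
          # Identical (function, args, kwargs) combinations map to the same
          # key, so queueing a duplicate becomes a no-op.
          payload = json.dumps([func_name, args, kwargs], sort_keys=True)
          return hashlib.sha256(payload.encode()).hexdigest()

      pending = set()

      def enqueue_unique(func_name, args=(), kwargs=None):
          key = unique_key(func_name, args, kwargs or {})
          if key in pending:
              return False  # a similar task is already queued; skip it
          pending.add(key)
          return True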
- Task locks

  TaskTiger can ensure that no more than one instance of a task with similar
  arguments runs at the same time by acquiring a lock. If a task hits a lock,
  it is requeued and scheduled for later execution after a configurable
  interval.
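  A minimal sketch of this locking behavior, using an in-memory dict as a
  stand-in for the Redis-backed lock (names here are illustrative, not
  TaskTiger's API):

  .. code:: python

      import time

      locks = {}  # lock key -> expiry timestamp

      def try_acquire(key, timeout=60, now=None):
          # Returns True if the lock was acquired. If another task holding
          # the same lock key is still running, the caller would requeue the
          # task and retry after the configured interval.
          now = time.time() if now is None else now
          expiry = locks.get(key)
          if expiry is not None and expiry > now:
              return False
          locks[key] = now + timeout
          return True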
- Task retrying

  TaskTiger lets you retry tasks that raise exceptions (all exceptions or a
  list of specific ones) and comes with configurable retry intervals (fixed,
  linear, exponential, custom).
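  The interval schedules can be illustrated with a small helper. This is only
  a sketch of the math behind each schedule; TaskTiger ships its own retry
  helpers, and the parameter names below are not its API:

  .. code:: python

      def retry_interval(method, attempt, base=60, increment=30, factor=2):
          # Seconds to wait before retry number `attempt` (1-based) for each
          # of the interval schedules mentioned above.
          if method == 'fixed':
              return base
          if method == 'linear':
              return base + increment * (attempt - 1)
          if method == 'exponential':
              return base * factor ** (attempt - 1)
          raise ValueError('unknown retry method: %s' % method)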
- Flexible queues

  Tasks can be easily queued in separate queues. Workers pick tasks from a
  randomly chosen queue and can be configured to only process specific queues,
  ensuring that all queues are processed equally. TaskTiger also supports
  subqueues which are separated by a period. For example, you can have
  per-customer queues in the form ``process_emails.CUSTOMER_ID`` and start a
  worker to process ``process_emails`` and any of its subqueues. Since tasks
  are picked from a random queue, all customers get equal treatment: If one
  customer is queueing many tasks it can't block other customers' tasks from
  being processed. A maximum queue size can also be enforced.

- Batch queues

  Batch queues can be used to combine multiple queued tasks into one. That
  way, your task function can process multiple sets of arguments at the same
  time, which can improve performance. The batch size is configurable.
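  The way a batch size is chosen for a queue, with the most specific subqueue
  configuration winning (as with the ``BATCH_QUEUES`` setting described
  below), can be sketched as:

  .. code:: python

      def batch_size(queue, batch_queues):
          # Walk from the most specific subqueue name up to the top-level
          # queue and return the first configured batch size.
          parts = queue.split('.')
          for i in range(len(parts), 0, -1):
              name = '.'.join(parts[:i])
              if name in batch_queues:
                  return batch_queues[name]
          return 1  # not a batch queue: process one task at a time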
- Scheduled and periodic tasks

  Tasks can be scheduled for execution at a specific time. Tasks can also be
  executed periodically (e.g. every five seconds).
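  As an illustration of periodic scheduling (a sketch, not TaskTiger's
  scheduler), the next run of a task that fires every ``period_seconds`` can
  be computed on a grid anchored at a fixed start time:

  .. code:: python

      import datetime

      def next_run(start, period_seconds, now):
          # First execution time strictly after `now`, on a grid of
          # `period_seconds` anchored at `start`.
          elapsed = (now - start).total_seconds()
          periods = int(elapsed // period_seconds) + 1
          return start + datetime.timedelta(seconds=periods * period_seconds)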
- Structured logging

  TaskTiger supports JSON-style logging via structlog, allowing more
  flexibility for tools to analyze the log. For example, you can use TaskTiger
  together with Logstash, Elasticsearch, and Kibana.

  The structlog processor ``tasktiger.logging.tasktiger_processor`` can be
  used to inject the current task id into all log messages.

- Reliability

  TaskTiger atomically moves tasks between queue states, and will re-execute
  tasks after a timeout if a worker crashes.
- Error handling

  If an exception occurs during task execution and the task is not set up to
  be retried, TaskTiger stores the execution tracebacks in an error queue. The
  task can then be retried or deleted manually. TaskTiger can be easily
  integrated with error reporting services like Rollbar.
- Admin interface

  A simple admin interface using flask-admin exists as a separate project
  (tasktiger-admin_).
.. _tasktiger-admin: https://github.com/closeio/tasktiger-admin
Quick start
-----------
It is easy to get started with TaskTiger.
Create a file that contains the task(s).
.. code:: python

    # tasks.py
    def my_task():
        print('Hello')
Queue the task using the ``delay`` method.
.. code:: python

    In [1]: import tasktiger, tasks

    In [2]: tiger = tasktiger.TaskTiger()

    In [3]: tiger.delay(tasks.my_task)
Run a worker (make sure the task code can be found, e.g. using ``PYTHONPATH``).
.. code:: bash

    % PYTHONPATH=. tasktiger
    {"timestamp": "2015-08-27T21:00:09.135344Z", "queues": null, "pid": 69840, "event": "ready", "level": "info"}
    {"task_id": "6fa07a91642363593cddef7a9e0c70ae3480921231710aa7648b467e637baa79", "level": "debug", "timestamp": "2015-08-27T21:03:56.727051Z", "pid": 69840, "queue": "default", "child_pid": 70171, "event": "processing"}
    Hello
    {"task_id": "6fa07a91642363593cddef7a9e0c70ae3480921231710aa7648b467e637baa79", "level": "debug", "timestamp": "2015-08-27T21:03:56.732457Z", "pid": 69840, "queue": "default", "event": "done"}
Configuration
-------------
A TaskTiger object keeps track of TaskTiger's settings and is used to
decorate and queue tasks. The constructor takes the following arguments:
- ``connection``: Redis connection object. The connection should be
  initialized with ``decode_responses=True`` to avoid encoding problems on
  Python 3.

- ``config``: Dict with config options. Most configuration options don't need
  to be changed, and a full list can be seen within ``TaskTiger``'s
  ``__init__`` method. Here are a few commonly used options:

  - ``ALWAYS_EAGER``: If set to ``True``, all tasks except future tasks (where
    ``when`` is a future time) will be executed locally by blocking until the
    task returns. This is useful for testing purposes.

  - ``BATCH_QUEUES``: Set up queues that will be processed in batch, i.e.
    multiple jobs are taken out of the queue at the same time and passed as a
    list to the worker method. Takes a dict where the key represents the queue
    name and the value represents the batch size. Note that the task needs to
    be declared as ``batch=True``. Also note that any subqueues will be
    automatically treated as batch queues, and the batch value of the most
    specific subqueue name takes precedence.

  - ``ONLY_QUEUES``: If set to a non-empty list of queue names, a worker only
    processes the given queues (and their subqueues), unless explicit queues
    are passed to the command line.

- ``setup_structlog``: If set to ``True``, sets up structured logging using
  ``structlog`` when initializing TaskTiger. This makes writing custom worker
  scripts easier since it doesn't require the user to set up ``structlog`` in
  advance.
Example:
.. code:: python

    import tasktiger
    from redis import Redis

    conn = Redis(db=1, decode_responses=True)

    tiger = tasktiger.TaskTiger(connection=conn, config={
        'BATCH_QUEUES': {
            # Batch up to 50 tasks that are queued in the my_batch_queue or
            # any of its subqueues, except for the send_email subqueue which
            # only processes up to 10 tasks at a time.
            'my_batch_queue': 50,
            'my_batch_queue.send_email': 10,
        },
    })
Task decorator
--------------
TaskTiger provides a task decorator to specify task options. Note that simple tasks don't need to be decorated. However, decorating the task allows you to use an alternative syntax to queue the task, which is compatible with Celery:
.. code:: python

    # tasks.py
    import tasktiger

    tiger = tasktiger.TaskTiger()

    @tiger.task()
    def my_task(name, n=None):
        print('Hello', name)
.. code:: python

    In [1]: import tasks

    # The following are equivalent. However, the second syntax can only be
    # used if the task is decorated.
    In [2]: tasks.tiger.delay(tasks.my_task, args=('John',), kwargs={'n': 1})

    In [3]: tasks.my_task.delay('John', n=1)
Task options
------------
Tasks support a variety of options that can be specified either in the task
decorator, or when queueing a task. For the latter, the delay method must
be called on the TaskTiger object, and any options in the task decorator
are overridden.
.. code:: python

    @tiger.task(queue='myqueue', unique=True)
    def my_task():
        print('Hello')
.. code:: python

    # The task will be queued in "otherqueue", even though the task decorator
    # says "myqueue".
    tiger.delay(my_task, queue='otherqueue')
When queueing a task, the task needs to be defined in a module other than the
Python file which is being executed. In other words, the task can't be in the
``__main__`` module. TaskTiger will raise an error otherwise.
The following options are supported by both ``delay`` and the task decorator:
- ``queue``: Name of the queue where the task will be queued.

- ``hard_timeout``: If the task runs longer than the given number of seconds,
  it will be killed and marked as failed.

- ``unique``: Boolean to indicate whether the task will only be queued if
  there is no similar task with the same function, arguments, and keyword
  arguments in the queue. Note that multiple similar tasks may still be
  executed at the same time since the task will still be inserted into the
  queue if another one is being processed. Requeueing an already scheduled
  unique task will not change the time it was originally scheduled to execute
  at.

- ``unique_key``: If set, this implies ``unique=True`` and specifies the list
  of kwargs to use to construct the unique key. By default, all args and
  kwargs are serialized and hashed.
- ``lock``: Boo
