SkillAgentSearch skills...

Raquel

Distributed task queues for Python using SQL and no orchestrator. Simple. Reliable. Flexible. Transparent.

Install / Use

/learn @vduseev/Raquel
About this skill

Quality Score

0/100

Supported Platforms

Universal

README

raquel

<p> <a href="https://pypi.org/pypi/raquel"><img alt="Package version" src="https://img.shields.io/pypi/v/raquel?logo=python&logoColor=white&color=blue"></a> <a href="https://pypi.org/pypi/raquel"><img alt="Supported python versions" src="https://img.shields.io/pypi/pyversions/raquel?logo=python&logoColor=white"></a> </p>

Simple and elegant Job Queues for Python using SQL.

Tired of complex job queues for distributed computing or event-based systems? Do you want full visibility and complete reliability of your job queue? Raquel is a perfect solution for a distributed task queue and background workers.

  • Simple: Use any existing or standalone SQL database. Requires a single table!
  • Flexible: Schedule whatever you want however you want. No frameworks, no restrictions.
  • Reliable: Uses SQL transactions and handles exceptions, retries, and "at least once" execution. SQL guarantees persistent jobs.
  • Transparent: Full visibility into which jobs are running, which failed and why, which are pending, etc. Query anything using SQL.

Table of contents

Installation

pip install raquel

To install with async support, specify the asyncio extra. This simply adds the greenlet package as a dependency.

pip install raquel[asyncio]

Usage

Schedule jobs

In order for the job to be scheduled it needs to be added to the jobs table in the database. As long as it has the right status and timestamp, it will be picked up by the workers.

Jobs can be scheduled using the library or by inserting a row into the jobs table directly.

Using enqueue()

The easiest way to schedule a job is using the enqueue() method. By default, the job is scheduled for immediate execution.

from raquel import Raquel

# Raquel uses SQLAlchemy to connect to most SQL databases. You can pass
# a connection string or a SQLAlchemy engine.
rq = Raquel("postgresql+psycopg2://postgres:postgres@localhost/postgres")

# Enqueing a job is as simple as this
rq.enqueue(queue="messages", payload="Hello, World!")
rq.enqueue(queue="tasks", payload={"data": [1, 2]})

Payload can be any JSON-serializable object or simply a string. It can even be empty. In database, the payload is stored as UTF-8 encoded text for maximum compatibility with all SQL databases, so anything that can be serialized to text can be used as a payload.

By default, jobs end up in the "default" queue. Use the queue parameter to place jobs into different queues.

Using SQL insert

We can also schedule jobs using plain SQL by simply inserting a row into the jobs table. For example, in PostgreSQL:

-- Schedule 3 jobs in the "my-jobs" queue for immediate processing
INSERT INTO jobs 
    (id, queue, status, payload)
VALUES
    (uuid_generate_v4(), 'my-jobs', 'queued', '{"my": "payload"}'),
    (uuid_generate_v4(), 'my-jobs', 'queued', '101'),
    (uuid_generate_v4(), 'my-jobs', 'queued', 'Is this the real life?');

Pick up jobs

While you can manually claim, process, and update the job, you'd also need to handle exceptions, retries and other edge cases. The library provides convenient ways to do this.

Using dequeue()

The dequeue() method is a context manager that yields a Job object for you to work with. If there is no job to process, it will yield None instead.

while True:
    with rq.dequeue("tasks") as job:
        if job:
            do_work(job.payload)
        else:
          time.sleep(1)

The dequeue() will find the next job and claim it. It will also handle the job status, exceptions, retries and everything else automatically.

Failed jobs

Jobs are retried when they fail. When an exception is caught by the dequeue() context manager, the job is rescheduled with an exponential backoff delay.

By default, the job will be retried indefinitely. You can set the max_retry_count or max_age fields to limit the number of retries or the maximum age of the job.

with rq.dequeue("my-queue") as job:
    # Let the context manager handle the exception for you.
    # The exception will be caught and the job will be retried.
    # Under the hood, context manager will call `job.fail()` for you.
    raise Exception("Oh no")
    do_work(job.payload)

You can always handle the exception manually:

with rq.dequeue("my-queue") as job:
    # Catch an exception manually
    try:
        do_work(job.payload)
    except Exception as e:
        # If you mark the job as failed, it will be retried.
        job.fail(str(e))

Whenever job fails, the error and the traceback are stored in the error and error_trace columns. The job status is set to failed and the job will be retried. The attempt number is incremented.

Reschedule jobs

The reschedule() method is used to reprocess the job at a later time. The job will remain in the queue with a new scheduled execution time, and the current attempt won't count towards the maximum number of retries.

This method should only be called inside the dequeue() context manager.

with rq.dequeue("my-queue") as job:
    # Check if we have everything ready to process the job, and if not,
    # reschedule the job to run 10 minutes from now
    if not is_everything_ready_to_process(job.payload):
        job.reschedule(delay=timedelta(minutes=10))
    else:
        # Otherwise, process the job
        do_work(job.payload)

When you reschedule a job, its scheduled_at field is either updated with the new at and delay values or left unchanged. And the finished_at field is cleared. If the Job object had any error or error_trace values, they are saved to the database. The attempts field is incremented.

Here are some fancy ways to reschedule a job using reschedule():

# Run when the next day starts
with rq.dequeue("my-queue") as job:
    job.reschedule(
        at=datetime.now().replace(
            hour=0, minute=0, second=0, microsecond=0,
        ) + timedelta(days=1)
    )

# Same but using the `delay` parameter
with rq.dequeue("my-queue") as job:
    job.reschedule(
        at=datetime.now().replace(hour=0, minute=0, second=0, microsecond=0),
        delta=timedelta(days=1),
    )

# Run in 500 milliseconds
with rq.dequeue("my-queue") as job:
    job.reschedule(delay=500)

# Run in `min_retry_delay` milliseconds, as configured for this job
# (default is 1 second)
with rq.dequeue("my-queue") as job:
    job.reschedule()

Reject jobs

In case your worker can't process the job for some reason, you can reject it, allowing it to be immediately claimed by another worker.

This method should only be called inside the dequeue() context manager.

It is very similar to rescheduling the job to run immediately. When you reject the job, the scheduled_at field is left unchanged, but the claimed_at and claimed_by fields are cleared. The job status is set to queued. And the attempts field is incremented.

with rq.dequeue("my-queue") as job:
    if job.payload.get("requires_admin"):
        # Reject the job if the worker can't process it.
        job.reject()
    else:
        # Otherwise, process the job
        do_work(job.payload)

Async support

Everything in Raquel is designed to work with both sync and async code. You can use the AsyncRaquel class to enqueue and dequeue jobs in an async manner.

Just don't forget the asyncio extra when installing the package: raquel[asyncio].

import asyncio
from raquel import AsyncRaquel

rq = AsyncRaquel("postgresql+asyncpg://postgres:postgres@localhost/postgres")

async def main():
    await rq.enqueue("tasks", {'my': {'name_is': 'Slim Shady'}})

asyncio.run(main())

In async mode, the dequeue() context manager works the same way:

async def main():
    async with rq.dequeue("tasks") as job:
        if job:
            await do_work(job.payload)
        else:
            await asyncio.sleep(1)

asyncio.run(main())

Stats

  • List of queues

    >>> rq.queues()
    ['default', 'tasks']
    
    SELECT queue FROM jobs GROUP BY queue
    
  • Number of jobs per queue

    >>> rq.count("default")
    10
    
    SELECT queue, COUNT(*) FROM jobs WHERE queue = 'default' GROUP BY queue
    
  • Number of jobs per status

    >>> rq.stats()
    {'default': QueueStats(name='default', total=10, queued=10, claimed=0, success=0, failed=0, expired=0, exhausted=0, cancelled=0)}
    
    SELECT queue, status, COUNT(*) FROM jobs GROUP BY queue, status
    
  • Failed jobs

    Note that the failed jobs are still going to be picked up and reprocessed until they are marked as success, exhausted, expired, or cancelled.

    >>> rq.count("default", rq.FAILED)
    5
    
    SELECT * FROM jobs WHERE queue = 'default' AND status = 'failed'
    
  • Pending jobs, ready to be picked up by a worker

    >>> rq.count("default", [rq.QUEUE
    
View on GitHub
GitHub Stars49
CategoryData
Updated1mo ago
Forks3

Languages

Python

Security Score

95/100

Audited on Feb 3, 2026

No findings