# Raquel
Distributed task queues for Python using SQL and no orchestrator. Simple. Reliable. Flexible. Transparent.
<p> <a href="https://pypi.org/pypi/raquel"><img alt="Package version" src="https://img.shields.io/pypi/v/raquel?logo=python&logoColor=white&color=blue"></a> <a href="https://pypi.org/pypi/raquel"><img alt="Supported python versions" src="https://img.shields.io/pypi/pyversions/raquel?logo=python&logoColor=white"></a> </p>

Simple and elegant Job Queues for Python using SQL.
Tired of complex job queues for distributed computing or event-based systems? Do you want full visibility and complete reliability of your job queue? Raquel is a perfect solution for a distributed task queue and background workers.
- Simple: Use any existing or standalone SQL database. Requires a single table!
- Flexible: Schedule whatever you want however you want. No frameworks, no restrictions.
- Reliable: Uses SQL transactions and handles exceptions, retries, and "at least once" execution. SQL guarantees persistent jobs.
- Transparent: Full visibility into which jobs are running, which failed and why, which are pending, etc. Query anything using SQL.
## Installation

```bash
pip install raquel
```

To install with async support, specify the `asyncio` extra. This simply adds the `greenlet` package as a dependency.

```bash
pip install raquel[asyncio]
```
## Usage

### Schedule jobs
In order for a job to be scheduled, it needs to be added to the `jobs` table in the database. As long as it has the right status and timestamp, it will be picked up by a worker.

Jobs can be scheduled using the library or by inserting a row into the `jobs` table directly.
#### Using enqueue()

The easiest way to schedule a job is with the `enqueue()` method. By default, the job is scheduled for immediate execution.

```python
from raquel import Raquel

# Raquel uses SQLAlchemy to connect to most SQL databases. You can pass
# a connection string or a SQLAlchemy engine.
rq = Raquel("postgresql+psycopg2://postgres:postgres@localhost/postgres")

# Enqueuing a job is as simple as this
rq.enqueue(queue="messages", payload="Hello, World!")
rq.enqueue(queue="tasks", payload={"data": [1, 2]})
```
The payload can be any JSON-serializable object or simply a string; it can even be empty. In the database, the payload is stored as UTF-8 encoded text for maximum compatibility with all SQL databases, so anything that can be serialized to text can be used as a payload.
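To make the storage model concrete, here is a minimal sketch of turning payloads into the text that a SQL column would receive. This is plain `json`, independent of Raquel's actual internals, and assumes plain strings are stored verbatim:

```python
import json

# Illustrative only: mimic storing a payload as UTF-8 text the way
# a SQL TEXT column would receive it (not Raquel's internal code).
def to_text(payload):
    # Plain strings pass through; everything else is JSON-serialized.
    if isinstance(payload, str):
        return payload
    return json.dumps(payload)

print(to_text("Hello, World!"))   # Hello, World!
print(to_text({"data": [1, 2]}))  # {"data": [1, 2]}
print(to_text(101))               # 101
```

Since everything ends up as text, reading payloads back is just the reverse: strings come out as-is, and structured payloads can be parsed with `json.loads()`.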
By default, jobs end up in the "default" queue. Use the `queue` parameter to place jobs into different queues.
#### Using SQL insert

We can also schedule jobs using plain SQL by simply inserting a row into the `jobs` table. For example, in PostgreSQL:

```sql
-- Schedule 3 jobs in the "my-jobs" queue for immediate processing
INSERT INTO jobs
    (id, queue, status, payload)
VALUES
    (uuid_generate_v4(), 'my-jobs', 'queued', '{"my": "payload"}'),
    (uuid_generate_v4(), 'my-jobs', 'queued', '101'),
    (uuid_generate_v4(), 'my-jobs', 'queued', 'Is this the real life?');
```
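The same idea works with any SQL database. Here is a toy, self-contained illustration using SQLite; the table below is a simplified stand-in, NOT Raquel's full schema (which has more columns, e.g. timestamps and retry settings):

```python
import sqlite3
import uuid

# Toy stand-in for the jobs table -- illustrative only, not Raquel's schema.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE jobs (id TEXT PRIMARY KEY, queue TEXT, status TEXT, payload TEXT)"
)

# Schedule 3 jobs in the "my-jobs" queue by plain INSERT.
conn.executemany(
    "INSERT INTO jobs (id, queue, status, payload) "
    "VALUES (?, 'my-jobs', 'queued', ?)",
    [
        (str(uuid.uuid4()), p)
        for p in ('{"my": "payload"}', '101', 'Is this the real life?')
    ],
)
conn.commit()

count = conn.execute(
    "SELECT COUNT(*) FROM jobs WHERE queue = 'my-jobs' AND status = 'queued'"
).fetchone()[0]
print(count)  # 3
```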
### Pick up jobs
While you can manually claim, process, and update the job, you'd also need to handle exceptions, retries and other edge cases. The library provides convenient ways to do this.
#### Using dequeue()

The `dequeue()` method is a context manager that yields a `Job` object for you to work with. If there is no job to process, it yields `None` instead.

```python
import time

while True:
    with rq.dequeue("tasks") as job:
        if job:
            do_work(job.payload)
        else:
            time.sleep(1)
```
The `dequeue()` method will find the next job and claim it. It also handles the job status, exceptions, retries, and everything else automatically.
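The crucial property of claiming is atomicity: two workers must never grab the same job. Here is a rough sketch of that idea against a toy table in SQLite; this is a conceptual model, not Raquel's actual SQL:

```python
import sqlite3

# Toy model of an atomic claim. The guard `AND status = 'queued'` in the
# UPDATE ensures that if another worker claimed the row between our SELECT
# and UPDATE, the UPDATE matches zero rows and the claim fails safely.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE jobs (id INTEGER PRIMARY KEY, status TEXT)")
conn.executemany("INSERT INTO jobs (status) VALUES (?)", [("queued",), ("queued",)])

def claim_one(conn):
    # Pick the oldest queued job.
    row = conn.execute(
        "SELECT id FROM jobs WHERE status = 'queued' ORDER BY id LIMIT 1"
    ).fetchone()
    if row is None:
        return None
    # Flip it to 'claimed' in a single guarded statement.
    cur = conn.execute(
        "UPDATE jobs SET status = 'claimed' WHERE id = ? AND status = 'queued'",
        (row[0],),
    )
    return row[0] if cur.rowcount == 1 else None

first = claim_one(conn)
second = claim_one(conn)
third = claim_one(conn)
print(first, second, third)  # 1 2 None
```

Databases like PostgreSQL make this even cleaner with row locks (`FOR UPDATE SKIP LOCKED`), but the guarded-update idea above is the portable core of it.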
#### Failed jobs
Jobs are retried when they fail. When an exception is caught by the `dequeue()` context manager, the job is rescheduled with an exponential backoff delay.

By default, the job will be retried indefinitely. You can set the `max_retry_count` or `max_age` fields to limit the number of retries or the maximum age of the job.
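Exponential backoff means each retry waits roughly twice as long as the previous one. The exact formula and defaults are Raquel's internals; the sketch below is a generic illustration with an assumed base delay of 1 second and an assumed cap:

```python
# Generic exponential backoff sketch -- illustrative, not Raquel's
# exact formula. The delay doubles with each attempt, capped at a maximum
# so retries never wait arbitrarily long.
def backoff_delay(attempt, base=1.0, cap=12 * 3600):
    """Seconds to wait before retry number `attempt` (0-based)."""
    return min(base * (2 ** attempt), cap)

print([backoff_delay(a) for a in range(5)])  # [1.0, 2.0, 4.0, 8.0, 16.0]
```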
```python
with rq.dequeue("my-queue") as job:
    # Let the context manager handle the exception for you.
    # The exception will be caught and the job will be retried.
    # Under the hood, the context manager will call `job.fail()` for you.
    raise Exception("Oh no")
    do_work(job.payload)  # never reached once the exception is raised
```
You can always handle the exception manually:

```python
with rq.dequeue("my-queue") as job:
    # Catch an exception manually
    try:
        do_work(job.payload)
    except Exception as e:
        # If you mark the job as failed, it will be retried.
        job.fail(str(e))
```
Whenever a job fails, the error and the traceback are stored in the `error` and `error_trace` columns. The job status is set to `failed`, the attempt number is incremented, and the job will be retried.
### Reschedule jobs

The `reschedule()` method is used to reprocess the job at a later time. The job will remain in the queue with a new scheduled execution time, and the current attempt won't count towards the maximum number of retries.

This method should only be called inside the `dequeue()` context manager.
```python
from datetime import timedelta

with rq.dequeue("my-queue") as job:
    # Check if we have everything ready to process the job, and if not,
    # reschedule the job to run 10 minutes from now
    if not is_everything_ready_to_process(job.payload):
        job.reschedule(delay=timedelta(minutes=10))
    else:
        # Otherwise, process the job
        do_work(job.payload)
```
When you reschedule a job, its `scheduled_at` field is either updated with the new `at` and `delay` values or left unchanged, and the `finished_at` field is cleared. If the `Job` object had any `error` or `error_trace` values, they are saved to the database. The `attempts` field is incremented.
Here are some fancy ways to reschedule a job using `reschedule()`:

```python
from datetime import datetime, timedelta

# Run when the next day starts
with rq.dequeue("my-queue") as job:
    job.reschedule(
        at=datetime.now().replace(
            hour=0, minute=0, second=0, microsecond=0,
        ) + timedelta(days=1)
    )

# Same but using the `delay` parameter
with rq.dequeue("my-queue") as job:
    job.reschedule(
        at=datetime.now().replace(hour=0, minute=0, second=0, microsecond=0),
        delay=timedelta(days=1),
    )

# Run in 500 milliseconds
with rq.dequeue("my-queue") as job:
    job.reschedule(delay=500)

# Run in `min_retry_delay` milliseconds, as configured for this job
# (default is 1 second)
with rq.dequeue("my-queue") as job:
    job.reschedule()
```
### Reject jobs

In case your worker can't process the job for some reason, you can reject it, allowing it to be immediately claimed by another worker.

This method should only be called inside the `dequeue()` context manager.

Rejecting is very similar to rescheduling the job to run immediately. When you reject the job, the `scheduled_at` field is left unchanged, but the `claimed_at` and `claimed_by` fields are cleared. The job status is set back to `queued`, and the `attempts` field is incremented.
```python
with rq.dequeue("my-queue") as job:
    if job.payload.get("requires_admin"):
        # Reject the job if the worker can't process it.
        job.reject()
    else:
        # Otherwise, process the job
        do_work(job.payload)
```
## Async support

Everything in Raquel is designed to work with both sync and async code. You can use the `AsyncRaquel` class to enqueue and dequeue jobs in an async manner.

Just don't forget the `asyncio` extra when installing the package: `raquel[asyncio]`.
```python
import asyncio
from raquel import AsyncRaquel

rq = AsyncRaquel("postgresql+asyncpg://postgres:postgres@localhost/postgres")

async def main():
    await rq.enqueue("tasks", {'my': {'name_is': 'Slim Shady'}})

asyncio.run(main())
```
In async mode, the `dequeue()` context manager works the same way:

```python
async def main():
    async with rq.dequeue("tasks") as job:
        if job:
            await do_work(job.payload)
        else:
            await asyncio.sleep(1)

asyncio.run(main())
```
## Stats

- List of queues

  ```python
  >>> rq.queues()
  ['default', 'tasks']
  ```

  ```sql
  SELECT queue FROM jobs GROUP BY queue
  ```

- Number of jobs per queue

  ```python
  >>> rq.count("default")
  10
  ```

  ```sql
  SELECT queue, COUNT(*) FROM jobs WHERE queue = 'default' GROUP BY queue
  ```

- Number of jobs per status

  ```python
  >>> rq.stats()
  {'default': QueueStats(name='default', total=10, queued=10, claimed=0, success=0, failed=0, expired=0, exhausted=0, cancelled=0)}
  ```

  ```sql
  SELECT queue, status, COUNT(*) FROM jobs GROUP BY queue, status
  ```

- Failed jobs

  Note that the `failed` jobs are still going to be picked up and reprocessed until they are marked as `success`, `exhausted`, `expired`, or `cancelled`.

  ```python
  >>> rq.count("default", rq.FAILED)
  5
  ```

  ```sql
  SELECT * FROM jobs WHERE queue = 'default' AND status = 'failed'
  ```

- Pending jobs, ready to be picked up by a worker

  >>> rq.count("default", [rq.QUEUE
