SkillAgentSearch skills...

DtcQueueBundle

Symfony2/3/4/5 Queue Bundle (for background jobs) supporting Mongo (Doctrine ODM), Mysql (and any Doctrine ORM), RabbitMQ, Beanstalkd, Redis, and ... {write your own}

Install / Use

/learn @mmucklo/DtcQueueBundle

README

DtcQueueBundle

Build Status Scrutinizer Code Quality Code Coverage SymfonyInsight

Allow symfony developers to create background job as easily as: $worker->later()->process(1,2,3)

6.0 Release

See changes

Upgrading from 5.0: see UPGRADING-6.0.md

Supported Queues

  • MongoDB via Doctrine-ODM
  • Mysql / Doctrine 2 supported databases via Doctrine-ORM
  • Beanstalkd via pheanstalk
  • RabbitMQ via php-amqplib
  • Redis support via Predis or PhpRedis, or through SncRedisBundle

Trends

Introduction

This bundle provides a way to easily create and manage queued background jobs

Basic Features:

  • Ease of Use
    • Kickoff background tasks with a line of code or two
    • Easily add background worker services
      • Turn any code into background task with a few lines
  • Atomic operation for jobs
    • For ORM-based queues this is done without relying on transactions.
  • Admin interface
    • Web-based Admin interface with an optional performance graph
  • Command Line Interface
    • Commands to run, manage and debug jobs from console
  • Job Archival
    • ORM and ODM managers have built-in job-archival for finished jobs
  • Logs errors from worker
  • Various safety checks for things such as stalled jobs, exception jobs
    • Allows for reseting stalled and exception jobs via console commands
  • Built in Event Dispatcher

Job-specific Features:

  • Auto-retry on failure, exception
    • If a job exits with a failure code, it can auto-retry
    • Same for Exception if desired
  • Priority
    • Jobs can have levels of priority so that higher priority jobs can get processed first even if they were added
    • to the queue later.
  • Future Jobs (ODM / ORM / Redis)
    • Jobs can be scheduled to run at some time in the future
  • Batch
    • Jobs can be "batched" so that only one job runs, even if multiple are queued of the same type
  • Expires
    • Jobs can have an "expires" time so that they wont run after a certain point
      • (useful if the queue gets backed up and a job is worthless after a certain time)
  • Stalls (ODM / ORM)
    • Jobs that crash the interpreter, or get terminated for some other reason can be detected
      • These can be re-queued to run in the future.

Installation

Symfony 2/3

see /Resources/doc/symfony2-3.md

Symfony 4/5

see /Resources/doc/symfony4-5.md

Troubleshooting

see /Resources/doc/troubleshooting.md

Usage

Create a worker class that will work on the background job.

Example:

  • src/Worker/Fibonacci.php: (symfony 4/5)
  • src/AppBundle/Worker/Fibonacci.php: (symfony 2/3)
<?php
namespace App\Worker; // for symfony 2/3, the namespace would typically be AppBundle\Worker

class Fibonacci
    extends \Dtc\QueueBundle\Model\Worker
{
    private $filename;
    public function __construct() {
        $this->filename = '/tmp/fib-result.txt';
    }

    public function fibonacciFile($n) {
        $fib = $this->fibonacci($n);
        file_put_contents($this->filename, "{$n}: {$fib}");
    }


    public function fibonacci($n)
    {
        if($n == 0)
            return 0; //F0
        elseif ($n == 1)
            return 1; //F1
        else
            return $this->fibonacci($n - 1) + $this->fibonacci($n - 2);
    }

    public function getName() {
        return 'fibonacci';
    }

    public function getFilename()
    {
        return $this->filename;
    }
}

Create a DI service for the job, and tag it as a background worker.

YAML:

Symfony 5, 4 and 3.3, 3.4:

services:
    # for symfony 3 the class name would likely be AppBundle\Worker\FibonacciWorker
    App\Worker\Fibonacci:
        # public: false is possible if you completely use DependencyInjection for access to the service
        public: true
        tags:
            - { name: "dtc_queue.worker" }

Symfony 2, and 3.0, 3.1, 3.2:

services:
    app.worker.fibonacci:
        class: AppBundle\Worker\Fibonacci:
        tags:
            - { name: "dtc_queue.worker" }
XML:
<services>
	<!-- ... -->
	<service id="fibonacci" class="Fibonacci">
	    <tag name="dtc_queue.worker" />
	</service>
	<!-- ... -->
</services>

Create a job

Simple examples:

Command line:
bin/console dtc:queue:create_job <worker> <method> <arg>

bin/console dtc:queue:create_job fibonacci fibonacci 3

bin/console dtc:queue:create_job fibonacci fibonacciFile 8
Code:
// Dependency inject the worker or fetch it from the container
$fibonacci = $container->get('App\Worker\Fibonacci');

// For Symfony 3.3, 3.4
//     $fibonacci = $container->get('AppBundle\Worker\Fibonacci');
//

// For Symfony 2, 3.0, 3.1, 3.2:
//     $fibonacci = $container->get('app.worker.fibonacci');


// Basic Examples
$fibonacci->later()->fibonacci(20);
$fibonacci->later()->fibonacciFile(20);

// Batch Example
$fibonacci->batchLater()->fibonacci(20); // Batch up runs into a single run

// Timed Example
$fibonacci->later(90)->fibonacci(20); // Run 90 seconds later

// Priority
//    Note: whether 1 == High or Low priority is configurable, but by default it is High
$fibonacci->later(0, 1); // As soon as possible, High priority
$fibonacci->later(0, 125); // Medium priority
$fibonacci->later(0, 255); // Low priority

// Advanced Usage Example:
//  (If the job is not processed by $expireTime, then don't execute it ever...)
$expireTime = time() + 3600;
$fibonacci->later()->setExpiresAt(new \DateTime("@$expireTime"))->fibonacci(20); // Must be run within the hour or not at all
Create Jobs - Additional Information

For further instructions on creating jobs, including how to create a job from the command line using json-encoded arguments, see:

/Resources/doc/create-job.md

Running Jobs

It's recommended that you background the following console commands

bin/console dtc:queue:run -d 120
# the -d parameter is the number of seconds during which to keep executing jobs before ending. 
#  For example you could put the above command into cron or a cron-like system to run every 2 minutes
#
# There are a number of other parameters that could be passed to dtc:queue:run run this for a full list:
bin/console dtc:queue:run --help

Pruning Jobs

For ODM and ORM based stores, the archive tables and the regular job table (queue) can require periodic pruning.

The regular job table is for waiting and running jobs. If a job throws an exception that can't be caught or the process segfaults, machine crashes, etc. then jobs which never finished can remain in the job queue in the "Running" state.

The archive table is where finished and errored jobs end up after execution; this table can grow indefinitely.

For Mongo in production, it may be prudent to use a capped collection or TTL Indexes

For Mysql you could create an event to delete data periodically.

Nevertheless there are also several commands that exist that do similarly (and could be put into a periodic cron job as well):

bin/console dtc:queue:prune old --older 1m
# (deletes jobs older than one month from the Archive table)

# Clear out stalled jobs from the regular job table:
bin/console dtc:queue:prune stalled

# If you're recording runs...this is recommended:
bin/console dtc:queue:prune stalled_runs

# If you're recording runs...another recommendation
bin/console dtc:queue:prune old_runs --older 1m

# If you're recording timings
bin/console dtc:queue:prune old_job_timings --older 1m

# You can tune 1m to a smaller interval such as 10d (10 days) or even 1800s (1/2 hour)
#  if you have too many jobs flowing through the system.
bin/console dtc:queue:prune --help # lists other prune commands

Debugging

These commands may help with debugging issues with the queue:

bin/console dtc:queue:count # some status about the queue if available (ODM/ORM only)
bin/console dtc:queue:reset # resets errored and/or stalled jobs

# This is really only good for ORM/ODM based stores.
bin/console dtc:queue:run --id={jobId}

# (jobId could be obtained from mongodb / or your database, if using an ORM / ODM solution)

Tracking Runs

Each job run can be tracked in a table in an ORM / ODM backed datastore.

Ways to configure: app/config/config.yml: (symfony 2/3) config/packages/dtc_queue.yaml: (symfony 4/5)

dtc_queue:
    manager:
        # run defaults to whatever job is set to (which defaults to "odm", i.e. mongodb)
        #   If you set the job to rabbit_mq, or beanstalkd or something else, you need to set run
        #   to an ORM / ODM run_manager (or a custom such one) in order to get the runs to save
        #
        run: orm # other possible option is "odm" (i.e. mongodb)
    #
    # (optionally define your own run manager with id: dtc_queue.manager.run.{some_name} and put {some_name} under run:

MongoDB DocumentManager

Change the document manage

View on GitHub
GitHub Stars120
CategoryData
Updated4mo ago
Forks38

Languages

PHP

Security Score

97/100

Audited on Nov 2, 2025

No findings