SkillAgentSearch skills...

RabbitMqBundle

RabbitMQ Bundle for the Symfony web framework

Install / Use

/learn @php-amqplib/RabbitMqBundle
About this skill

Quality Score

0/100

Supported Platforms

Universal

README

RabbitMqBundle

Latest Version Test Scrutinizer Code Quality Code Coverage PHPStan Join the chat at https://gitter.im/php-amqplib/RabbitMqBundle

About

The RabbitMqBundle incorporates messaging in your application via RabbitMQ using the php-amqplib library.

The bundle implements several messaging patterns as seen on the Thumper library. Therefore publishing messages to RabbitMQ from a Symfony controller is as easy as:

$msg = array('user_id' => 1235, 'image_path' => '/path/to/new/pic.png');
$this->get('old_sound_rabbit_mq.upload_picture_producer')->publish(serialize($msg));

Later when you want to consume 50 messages out of the upload_pictures queue, you just run on the CLI:

$ ./app/console rabbitmq:consumer -m 50 upload_picture

All the examples expect a running RabbitMQ server.

This bundle was presented at Symfony Live Paris 2011 conference. See the slides here.

Version 2

Due to the breaking changes happened caused by Symfony >=4.4, a new tag was released, making the bundle compatible with Symfony >=4.4.

Installation

For Symfony Framework >= 4.4

Require the bundle and its dependencies with composer:

$ composer require php-amqplib/rabbitmq-bundle

Register the bundle:

// app/AppKernel.php

public function registerBundles()
{
    $bundles = array(
        new OldSound\RabbitMqBundle\OldSoundRabbitMqBundle(),
    );
}

Enjoy !

For a console application that uses Symfony Console, Dependency Injection and Config components

If you have a console application used to run RabbitMQ consumers, you do not need Symfony HttpKernel and FrameworkBundle. From version 1.6, you can use the Dependency Injection component to load this bundle configuration and services, and then use the consumer command.

Require the bundle in your composer.json file:

{
    "require": {
        "php-amqplib/rabbitmq-bundle": "^2.0",
    }
}

Register the extension and the compiler pass:

use OldSound\RabbitMqBundle\DependencyInjection\OldSoundRabbitMqExtension;
use OldSound\RabbitMqBundle\DependencyInjection\Compiler\RegisterPartsPass;

// ...

$containerBuilder->registerExtension(new OldSoundRabbitMqExtension());
$containerBuilder->addCompilerPass(new RegisterPartsPass());

Warning - BC Breaking Changes

  • Since 2012-06-04 Some default options for exchanges declared in the "producers" config section have changed to match the defaults of exchanges declared in the "consumers" section. The affected settings are:

    • durable was changed from false to true,
    • auto_delete was changed from true to false.

    Your configuration must be updated if you were relying on the previous default values.

  • Since 2012-04-24 The ConsumerInterface::execute method signature has changed

  • Since 2012-01-03 the consumers execute method gets the whole AMQP message object and not just the body. See the CHANGELOG file for more details.

Usage

Add the old_sound_rabbit_mq section in your configuration file:

old_sound_rabbit_mq:
    connections:
        default:
            host:     'localhost'
            port:     5672
            user:     'guest'
            password: 'guest'
            vhost:    '/'
            lazy:     false
            connection_timeout: 3
            read_write_timeout: 3
            
            # the timeout when waiting for a response from rabbitMQ (0.0 means waits forever)
            channel_rpc_timeout: 0.0

            # requires php-amqplib v2.4.1+ and PHP5.4+
            keepalive: false

            # requires php-amqplib v2.4.1+
            heartbeat: 0

            #requires php_sockets.dll
            use_socket: true # default false
            
            login_method: 'AMQPLAIN' # default 'AMQPLAIN', can be 'EXTERNAL' or 'PLAIN', see https://www.rabbitmq.com/docs/access-control#mechanisms
            
        another:
            # A different (unused) connection defined by an URL. One can omit all parts,
            # except the scheme (amqp:). If both segment in the URL and a key value (see above)
            # are given the value from the URL takes precedence.
            # See https://www.rabbitmq.com/uri-spec.html on how to encode values.
            url: 'amqp://guest:password@localhost:5672/vhost?lazy=1&connection_timeout=6'
    producers:
        upload_picture:
            connection:            default
            exchange_options:      {name: 'upload-picture', type: direct}
            service_alias:         my_app_service # no alias by default
            default_routing_key:   'optional.routing.key' # defaults to '' if not set
            default_content_type:  'content/type' # defaults to 'text/plain'
            default_delivery_mode: 2 # optional. 1 means non-persistent, 2 means persistent. Defaults to "2".
    consumers:
        upload_picture:
            connection:       default
            exchange_options: {name: 'upload-picture', type: direct}
            queue_options:    {name: 'upload-picture'}
            callback:         upload_picture_service
            options:
                no_ack:       false # optional. If set to "true", automatic acknowledgement mode will be used by this consumer. Default "false". See https://www.rabbitmq.com/confirms.html for details.

Here we configure the connection service and the message endpoints that our application will have. In this example your service container will contain the service old_sound_rabbit_mq.upload_picture_producer and old_sound_rabbit_mq.upload_picture_consumer. The later expects that there's a service called upload_picture_service.

If you don't specify a connection for the client, the client will look for a connection with the same alias. So for our upload_picture the service container will look for an upload_picture connection.

If you need to add optional queue arguments, then your queue options can be something like this:

queue_options: {name: 'upload-picture', arguments: {'x-ha-policy': ['S', 'all']}}

another example with message TTL of 20 seconds:

queue_options: {name: 'upload-picture', arguments: {'x-message-ttl': ['I', 20000]}}

The argument value must be a list of datatype and value. Valid datatypes are:

  • S - String
  • I - Integer
  • D - Decimal
  • T - Timestamps
  • F - Table
  • A - Array
  • t - Bool

Adapt the arguments according to your needs.

If you want to bind queue with specific routing keys you can declare it in producer or consumer config:

queue_options:
    name: "upload-picture"
    routing_keys:
      - 'android.#.upload'
      - 'iphone.upload'

Important notice - Lazy Connections

In a Symfony environment all services are fully bootstrapped for each request, from version >= 4.3 you can declare a service as lazy (Lazy Services). This bundle still doesn't support new Lazy Services feature but you can set lazy: true in your connection configuration to avoid unnecessary connections to your message broker in every request. It's extremely recommended to use lazy connections because performance reasons, nevertheless lazy option is disabled by default to avoid possible breaks in applications already using this bundle.

Important notice - Heartbeats

It's a good idea to set the read_write_timeout to 2x the heartbeat so your socket will be open. If you don't do this, or use a different multiplier, there's a risk the consumer socket will timeout.

Please bear in mind, that you can expect problems, if your tasks are generally running longer than the heartbeat period, to which there are no good solutions (link). Consider using either a big value for the heartbeat or leave the heartbeat disabled in favour of the tcp's keepalive (both on the client and server side) and the graceful_max_execution_timeout feature.

Multiple Hosts

You can provide multiple hosts for a connection. This will allow you to use RabbitMQ cluster with multiple nodes.

  old_sound_rabbit_mq:
      connections:
          default:
              hosts:
                - host: host1
                  port: 3672
                  user: user1
                  password: password1
                  vhost: vhost1
                - url: 'amqp://guest:password@localhost:5672/vhost'
              connection_timeout: 3
              read_write_timeout: 3

Pay attention that you can not specify

  connection_timeout 
  read_write_timeout
  use_socket
  ssl_context
  keepalive
  heartbeat
  connection_parameters_provider 

parameters to each host separately.

Dynamic Connection Param

View on GitHub
GitHub Stars1.2k
CategoryDevelopment
Updated2d ago
Forks470

Languages

PHP

Security Score

100/100

Audited on Mar 22, 2026

No findings