OroMessageQueueBundle
[DEPRECATED] This repository is no longer maintained. Please use the main https://github.com/oroinc/platform/ repository.
OroMessageQueue Bundle
Note: This article is published in the Oro documentation library.
OroMessageQueueBundle incorporates the OroMessageQueue component into OroPlatform and thereby provides message queue processing capabilities for all application components.
Table of Contents
- Overview
- Usage
- Consumer options
- Supervisord
- Name prefix for the Message Queue
- Internals
- Unit and Functional tests
- Stale Jobs
- Consumer heartbeat
- Resetting Symfony Container in consumer
- Security Context in consumer
- Buffering Messages
Overview
The bundle integrates the OroMessageQueue component. It adds an easy-to-use configuration layer, registers the services and ties them together, and registers handy CLI commands.
Jobs
The bundle provides an entity and a web GUI for jobs. Jobs are stored in the database, and the web GUI lets you monitor job statuses and interrupt jobs.
Usage
First, configure a transport layer and set one transport as the default. Given the following configuration:
# config/config.yml
oro_message_queue:
    transport:
        default: '%message_queue_transport%'
        '%message_queue_transport%': '%message_queue_transport_config%'
    client: ~
you can select one of the supported transports via parameters:
DBAL transport
# config/parameters.yml
message_queue_transport: DBAL
message_queue_transport_config: ~
Once everything is configured, you can start producing messages:
<?php
/** @var Oro\Component\MessageQueue\Client\MessageProducer $messageProducer **/
$messageProducer = $container->get('oro_message_queue.message_producer');
$messageProducer->send('aFooTopic', 'Something has happened');
To consume messages you have to first create a message processor:
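<?php
use Oro\Component\MessageQueue\Client\TopicSubscriberInterface;
use Oro\Component\MessageQueue\Consumption\MessageProcessorInterface;
use Oro\Component\MessageQueue\Transport\MessageInterface;
use Oro\Component\MessageQueue\Transport\SessionInterface;

class FooMessageProcessor implements MessageProcessorInterface, TopicSubscriberInterface
{
    // Called once for every message delivered to this processor.
    public function process(MessageInterface $message, SessionInterface $session)
    {
        echo $message->getBody();

        return self::ACK;
        // return self::REJECT; // when the message is broken
        // return self::REQUEUE; // the message is fine but you want to postpone processing
    }

    // Topics this processor wants to receive.
    public static function getSubscribedTopics()
    {
        return ['aFooTopic'];
    }
}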
Register it as a container service and subscribe to the topic:
oro_channel.async.change_integration_status_processor:
    class: 'FooMessageProcessor'
    tags:
        - { name: 'oro_message_queue.client.message_processor' }
Now you can start consuming messages:
./bin/console oro:message-queue:consume
Note: Add -vvv to see what is going on while you are consuming messages. The output contains a lot of valuable debug information.
Consumer options
--message-limit=MESSAGE-LIMIT   Consume n messages and exit
--time-limit=TIME-LIMIT         Consume messages during this time
--memory-limit=MEMORY-LIMIT     Consume messages until process reaches this memory limit in MB
The --memory-limit option is recommended for normal consumer usage. If the option is set, the consumer checks
the amount of used memory after processing each message and terminates if the limit is exceeded. For example, if a consumer
is started with:
./bin/console oro:message-queue:consume --memory-limit=700
then:
- The consumer processes a message.
- The consumer checks the amount of used memory.
- If it exceeds the option value (e.g. 705 MB, 780 MB, or 1300 MB), the consumer terminates (and Supervisord re-runs it).
- Otherwise, it continues processing messages.
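The steps above can be sketched as a small loop. This is a minimal illustration, not the bundle's implementation: the function name consumeWithMemoryLimit and the injected memory reader are hypothetical, introduced only so the example can run with simulated readings.

```php
<?php
// Hypothetical sketch: none of these names come from OroMessageQueueBundle.

/**
 * Processes messages one by one and stops once used memory exceeds the limit.
 *
 * @param iterable $messages      messages to process
 * @param callable $process       handles a single message
 * @param int      $memoryLimitMb limit in MB, like the value of --memory-limit
 * @param callable $memoryUsage   returns the used memory in bytes
 *
 * @return int number of messages processed before the loop ended
 */
function consumeWithMemoryLimit(
    iterable $messages,
    callable $process,
    int $memoryLimitMb,
    callable $memoryUsage
): int {
    $processed = 0;
    foreach ($messages as $message) {
        $process($message);
        $processed++;

        // The check runs only after a message has been fully processed.
        if ($memoryUsage() / (1024 * 1024) > $memoryLimitMb) {
            break; // terminate; a supervisor is expected to restart the consumer
        }
    }

    return $processed;
}

// Simulated memory readings in MB, one per processed message.
$readings = [650, 690, 705, 710];
$i = 0;
$fakeUsage = function () use ($readings, &$i): int {
    return $readings[$i++] * 1024 * 1024;
};

$count = consumeWithMemoryLimit(['a', 'b', 'c', 'd'], function ($message): void {}, 700, $fakeUsage);
echo $count, PHP_EOL; // prints 3: the third reading (705 MB) exceeds 700 MB
```

In a real process the memory reader could be PHP's memory_get_usage(); the reader is injected here only to make the behaviour reproducible.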
We recommend always setting this option to a value 2-3 times lower than the PHP memory limit. This helps avoid PHP memory limit errors during message processing.
We recommend setting the --time-limit option to 5-10 minutes when using the DBAL transport to avoid database connection issues.
Consumer interruption
A consumer may normally stop processing messages for several reasons:
- Out of memory (if the option is set)
- Timeout (if the option is set)
- Message limit exceeded (if the option is set)
- Forcefully by an event:
  - If a cache was cleared
  - If a schema was updated
  - If a maintenance mode was turned off
A normal interruption occurs only after a message has been processed. If an event is fired while a message is being processed, the consumer finishes processing that message and only then stops.
A consumer also stops if an exception is thrown while a message is being processed.
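The deferred interruption described above can be illustrated with a flag that is set during processing and checked only between messages. This is a minimal sketch under that assumption. The class InterruptibleLoop and its methods are hypothetical and are not part of the bundle.

```php
<?php
// Hypothetical sketch: InterruptibleLoop is not an OroMessageQueueBundle class.

final class InterruptibleLoop
{
    private bool $interruptRequested = false;

    /** Called by an event (e.g. cache cleared) at any moment. */
    public function requestInterrupt(): void
    {
        $this->interruptRequested = true;
    }

    /**
     * @param string[] $messages
     * @param callable $process receives the message and this loop
     *
     * @return string[] messages that were fully processed
     */
    public function run(array $messages, callable $process): array
    {
        $handled = [];
        foreach ($messages as $message) {
            $process($message, $this);
            $handled[] = $message;

            // The flag is checked only after the current message is done.
            if ($this->interruptRequested) {
                break;
            }
        }

        return $handled;
    }
}

$loop = new InterruptibleLoop();
$handled = $loop->run(['m1', 'm2', 'm3'], function (string $message, InterruptibleLoop $loop): void {
    if ($message === 'm2') {
        // Simulates an event fired while 'm2' is being processed.
        $loop->requestInterrupt();
    }
});

echo implode(',', $handled), PHP_EOL; // prints m1,m2
```

The message being processed when the event fires ('m2') is still completed. Only the following message ('m3') is left for the next consumer run.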
Supervisord
As described above, a consumer may normally stop processing messages for several reasons.
In every such case the interrupted consumer should be restarted, so the
oro:message-queue:consume command must be kept running. We advise delegating this responsibility
to Supervisord. With the following program configuration, Supervisord keeps
four simultaneous instances of the oro:message-queue:consume command running and relaunches any instance
that dies for any reason. Note that the program name defined in [program:oro_message_consumer] must be unique among all instances deployed on the same Supervisord server, even if they are for staging purposes only. For example, use the program names [program:prod_oro_message_consumer] and [program:dev_oro_message_consumer].
[program:oro_message_consumer]
command=/path/to/bin/console --env=prod --no-debug oro:message-queue:consume
process_name=%(program_name)s_%(process_num)02d
numprocs=4
autostart=true
autorestart=true
startsecs=0
user=apache
redirect_stderr=true
Name prefix for the Message Queue
To use several independent message queues on a single RabbitMQ instance, configure a name prefix for the message queue. For example:
# config/config.yml
oro_message_queue:
    ...
    client:
        prefix: mq_oro_platform_test
        router_destination: queue_name
        default_destination: queue_name
In router_destination and default_destination, put the names of the queues specific to your environment.
In the prefix option, provide a string that should be prepended to the queue name.
Internals
Structure
You can skip this section if you only intend to use the component. The component is split into several layers:
- Transport - The transport API provides a common way for programs to create, send, receive and read messages. It is inspired by the Java Message Service.
- Router - An implementation of the RecipientList pattern.
- Consumption - This layer provides tools to simplify the consumption of messages: a CLI command, a queue consumer, a message processor, and ways to extend them.
- Client - Provides an easy-to-use, high-level abstraction for producing and processing messages. It also reduces the need to configure a broker.

Flow
The client's message producer sends a message to a router message processor. The router takes the message and looks for the real recipients that are interested in such a message. Then it sends a copy of the message to each of them. Each target message processor takes its copy of the message and processes it.
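The routing step of this flow can be sketched as a simple recipient list. This is an illustration of the pattern only, assuming plain callables as processors. The class RecipientListRouter is hypothetical and does not reflect the component's actual classes.

```php
<?php
// Hypothetical sketch of the RecipientList pattern; not the component's router.

final class RecipientListRouter
{
    /** @var array<string, callable[]> processors grouped by topic */
    private array $routes = [];

    public function subscribe(string $topic, callable $processor): void
    {
        $this->routes[$topic][] = $processor;
    }

    /**
     * Passes the message body to every processor subscribed to the topic.
     * PHP passes strings by value, so each processor gets its own copy.
     *
     * @return int number of recipients that received the message
     */
    public function route(string $topic, string $body): int
    {
        $delivered = 0;
        foreach ($this->routes[$topic] ?? [] as $processor) {
            $processor($body);
            $delivered++;
        }

        return $delivered;
    }
}

$log = [];
$router = new RecipientListRouter();
$router->subscribe('aFooTopic', function (string $body) use (&$log): void {
    $log[] = 'first: ' . $body;
});
$router->subscribe('aFooTopic', function (string $body) use (&$log): void {
    $log[] = 'second: ' . $body;
});

$delivered = $router->route('aFooTopic', 'Something has happened');
$ignored = $router->route('unknownTopic', 'Nobody listens');

echo $delivered, ' ', $ignored, PHP_EOL; // prints "2 0"
```

In this sketch a message for a topic without subscribers is simply dropped. How the real router treats such messages is not covered here.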

The message itself has headers and body and they change this way while traveling through the system:

Custom transport
If you need to implement a custom transport, take a look at the transport's interfaces. You have to provide an implementation for each of them.
Key Classes
- MessageProducer - The client's message producer. You will use it all the time to send messages.
- MessageProcessorInterface - Every class that processes messages has to implement this interface.
- TopicSubscriberInterface - Similar to EventSubscriberInterface. It allows you to keep the processing code and the topics it is subscribed to in one place.
- MessageConsumeCommand - The command you use to consume messages.
- QueueConsumer - The class that works inside the command. It watches for new messages and, once one arrives, passes it to a message processor.
Unit and Functional tests
To test that a message was sent in unit and functional tests, you can use the MessageQueueExtension trait. There are two implementations of this trait, one for unit tests and another for functional tests:
- Oro\Bundle\MessageQueueBundle\Test\Unit\MessageQueueExtension for unit tests
- Oro\Bundle\MessageQueueBundle\Test\Functional\MessageQueueExtension for functional tests
Also, in case if you need custom logic for manage sent messages, you
