Rmqcpp
A batteries included C++ RabbitMQ Client Library/API.
Install / Use
/learn @bloomberg/RmqcppREADME
rmqcpp - A C++ library for RabbitMQ
Menu
- Rationale
- Quick Start
- Basic Usage
- Documentation
- Building
- Installation
- Contributions
- License
- Code of Conduct
- Security Vulnerability Reporting
Rationale
rmqcpp provides a testable, data-safety-focused and async-capable API that strives to be easy to use.
Out of the box, rmqcpp will never silently drop messages, and will always reconnect.
Features
This library has been built from experience learned while supporting other RabbitMQ libraries for many years. Using rmqcpp straightaway ticks a lot of boxes in our internal RabbitMQ best practices, as well as those suggested by CloudAMQP, and AWS.
- ❤ Heartbeats are always enabled, and they are handled in the background.
- Heartbeats catch many network faults, yet some applications avoid them because existing libraries make heartbeats hard to implement.
rmqcpphas these implemented out of the box.
- Heartbeats catch many network faults, yet some applications avoid them because existing libraries make heartbeats hard to implement.
- 🔁 Out-of-the-box reconnection logic
- Clients using
rmqcppdo not need to implement any retry logic. The library will attempt to reconnect in the background forever. This aims to improve recovery times after network outages.
- Clients using
- 🏔 Topology declaration as part of Connection
rmqcppalways declares topology when creating consumers & producers, as per RabbitMQ best practices.
- ✉ Reliable Message Delivery 'on' by default
- Publisher confirmations. This ensures clients are aware when messages are owned by RabbitMQ, and avoids messages being silently black holed.
- Consumer acknowledgements. Switching these on manually helps avoid messages being silently dropped during restart/outages, as would be the case with 'autoack'.
- Durable queues and persistent delivery mode ensure messages always persist during broker restarts and total datacenter shutdowns.
- Mandatory flag is defaulted to 'true' for all messages to ensure none are silently dropped due to missed bindings.
- All of the above properties are used by default. Publisher confirms and consumer acknowledgements are required.
- 🚦 Publishing and Consuming happens on different connections
- A common application pitfall is to consume & produce on the same connection. This can cause slow-downs in consumption rate, as RabbitMQ applies backpressure to fast publishers - depending on the exact queues being consumed/published from this can cause a vicious cycle.
- ✂ Reconnect cancelled consumers
- Very occasionally, RabbitMQ consumers are cancelled by the broker. This case is rarely implemented properly when using other libraries.
rmqcppwill redeclare any consumers that have been cancelled.
- Very occasionally, RabbitMQ consumers are cancelled by the broker. This case is rarely implemented properly when using other libraries.
Library structure
rmqcpp is made up of a library which builds to librmq.a. Internally, this library contains a hierarchy of packages, of which rmqa, rmqt, and rmqp are the public-facing packages.
Internal packages
graph TD;
rmqamqpt-->rmqt;
rmqamqp-->rmqamqpt;
rmqamqp-->rmqio;
rmqamqp-->rmqt;
rmqio-->rmqamqpt;
rmqa-->rmqamqp;
rmqa-->rmqt;
rmqa-->rmqp;
rmqp-->rmqt;
Library | Purpose | Examples
:--- | :---- | :----
rmqp | RabbitMQ library interface (protocol) | Interfaces used to allow testing/mocking of rmqcpp applications
rmqa | RabbitMQ Library interface implementation | The main concrete objects used by applications
rmqt | Data types | The currency(data types used) between rmqa/b and rmqamqp layers
rmqtestmocks | gmock types | Objects useful for testing applications using rmqcpp
rmqamqp | AMQP abstraction layer | All logic relating to amqp communication, framing, classes and state machines - connection, channel queue
rmqamqpt | Low-level AMQP data types (from 0-9-1 spec) | Primitives of the AMQP standard, methods, frame, amqp types to C++ types mapping
rmqio | IO abstraction layer | Raw socket connections management, reads/writes AMQP frames from/to the wire. Contains a set of async io interfaces, and an implementation using boost::asio
Quick Start
The quickest way to get started is to take a look at our integration tests and sample 'hello world' program, which is possible by following the Docker Build steps and then: from the interactive shell window running ./build/examples/helloworld/rmqhelloworld_producer
Usage
// Holds required threads, and other resources.
// The context must live longer than objects created from it.
rmqa::RabbitContext rabbit;
// Create topology to be declared on every reconnection
rmqa::Topology topology;
rmqt::QueueHandle q1 = topology.addQueue("queue-name");
rmqt::ExchangeHandle e1 = topology.addExchange("exch-name");
// Bind e1 and q1 using binding key 'key'
topology.bind(e1, q1, "key");
// To create an auto-generated queue
rmqt::QueueHandle q2 = topology.addQueue();
topology.bind(e1, q2, "key2");
// Each `Producer`/`Consumer` has a reference to a `Topology` object
// which can be updated with updateTopology.
bsl::shared_ptr<rmqa::VHost> vhost = rabbit.createVHostConnection(
"my-connection",
bsl::make_shared<rmqt::SimpleEndpoint>(
"localhost", "rmqcpp", 5762),
bsl::make_shared<rmqt::PlainCredentials>(
"guest", "guest")); // returns immediately
// Get a producer
// How many messages can be awaiting confirmation before `send` blocks
const uint16_t maxOutstandingConfirms = 10;
rmqt::Result<rmqa::Producer> producerResult = vhost->createProducer(topology, e1, maxOutstandingConfirms);
if (!producerResult) {
// handle errors.
bsl::cout << "Error creating connection: " << producerResult.error();
return -1;
}
bsl::shared_ptr<rmqa::Producer> producer = producerResult.value();
void receiveConfirmation(const rmqt::Message& message,
const bsl::string& routingKey,
const rmqt::ConfirmResponse& response)
{
if (response.status() == rmqt::ConfirmResponse::ACK) {
// Message is now guaranteed to be safe with the broker
}
else {
// REJECT / RETURN indicate problem with the send request (bad routing
// key?)
}
}
bsl::string json = "[5, 3, 1]";
rmqt::Message message(
bsl::make_shared<bsl::vector<uint8_t> >(json.cbegin(), json.cend()));
// `send` returns immediately unless there are `maxOutstandingConfirms`
// oustanding messages already. In which case it waits until at least one
// confirm comes back.
// User must wait until the confirm callback is executed before considering
// the send to be committed.
const rmqp::Producer::SendStatus sendResult =
producer->send(message, "key", &receiveConfirmation);
if (sendResult != rmqp::Producer::SENDING) {
// Unable to enqueue this send
return -1;
}
// Consumer callback
class MessageConsumer {
private:
bool processMessage(const rmqt::Message& message)
{
// process Message here
return true;
}
public:
void operator()(rmqp::MessageGuard& messageGuard)
{
if (processMessage(messageGuard.message())) {
messageGuard.ack();
}
else {
messageGuard.nack();
// Would automatically nack if it goes out of scope
}
}
};
rmqt::Result<rmqa::Consumer> consumerResult =
vhost->createConsumer(
topology, // topology
q1, // queue
MessageConsumer(), // Consumer callback invoked on each message
"my consumer label", // Consumer Label (shows in Management UI)
500 // prefetch count
);
if (!consumerResult) {
// An argument passed to the consumer was bad, retrying will have no effect
return -1;
}
bsl::shared_ptr<rmqa::Consumer> consumer = consumerResult.value();
// Shutdown
// Blocks until `timeout` expires or all confirmations have been received
// Note this could block forever if a separate thread continues publishing
if (!producer->waitForConfirms(/* timeout */)) {
// Timeout expired
}
consumer->cancelAndDrain();
Documentation
Doxygen generated API documentation can be found here
Building
Prerequisites
These prerequisites can be skipped when using the docker build environment. Otherwise it is important to set these up.
vcpkg is the primary prerequisite for building rmqcpp. Follow the instructions here and set the environment variable VCPKG_ROOT to the install location, i.e. export VCPKG_ROOT=/build/vcpkg.
There are build configuration options which can be specified using the environment variable CMAKE_PRESET (choose from configurations in CMakePresets.json) - eg. export CMAKE_PRESET=macos-arm64-vcpkg.
zstd compression support is enabled by default, and requires the zstd library to be installed. To disable this, pass the -DENABLE_COMPRESSION=OFF option to cmake.
Build Steps
Once the prerequisites are configured:
Build the project and run the tests from your source root directory using the following commands:
make init- Initialize your build environment using the configured cmake and vcpkg setup - see above.make- Incremental build and run the tests.make build- Incremental build.make unit- Run built unit tests.
Docker Build
We also provide Dockerfiles for building and running this in an isolated environment. If you don't wish to get vcpkg set up on your build machine, this can be an alte
