RabbitMQ Stream Python Client

A Python asyncio-based client for RabbitMQ Streams

The RabbitMQ stream plug-in is required. See the documentation for enabling it.

Installation

The client is distributed via PyPI and can be installed with pip:

 pip install rstream

Examples

Here you can find different examples.

Client Codecs

Before you start using the client, it is important to read this section. The client supports two codecs for storing messages on the server:

  • AMQP 1.0
  • Binary

By default, use the AMQP 1.0 codec:

    amqp_message = AMQPMessage(
        body=bytes("hello: {}".format(i), "utf-8"),
    )

AMQP 1.0 codec vs Binary

Use the AMQP 1.0 codec to exchange messages with other stream clients (Java, .NET, Rust, Go) or with AMQP 0.9.1 clients.

Use the Binary codec only when messages are exchanged from Python to Python.

Note: Messages stored in Binary are not compatible with the other clients or with AMQP 0.9.1 clients. Once messages are stored on the server, you can't change them.

See also the Client Performances section.

Publishing messages

You can publish messages with four different methods:

  • send: asynchronous, messages are automatically buffered internally and sent at once after a timeout expires.
  • send_batch: synchronous, the user buffers the messages and sends them. This is the fastest publishing method.
  • send_wait: synchronous, the caller waits until the message is confirmed. This is the slowest publishing method.
  • send_sub_entry: asynchronous. See Sub-entry batching and compression.
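The difference between the first three methods can be sketched as follows. This is a minimal illustration, not the project's own example: the keyword names `message` and `batch` are assumptions about the method signatures, and `RecordingProducer` is a hypothetical stand-in that only records calls so the sketch runs without a broker.

```python
import asyncio


async def publish_three_ways(producer, stream, bodies):
    """Publish the same payloads with send, send_batch and send_wait."""
    # send: the client buffers internally and flushes in the background.
    for body in bodies:
        await producer.send(stream, message=body)

    # send_batch: the caller builds the batch and hands it over in one call.
    await producer.send_batch(stream, batch=list(bodies))

    # send_wait: one message per call, each call waits for the confirmation.
    for body in bodies:
        await producer.send_wait(stream, message=body)


class RecordingProducer:
    """Hypothetical stand-in for a producer; it only records the calls."""

    def __init__(self):
        self.calls = []

    async def send(self, stream, message):
        self.calls.append(("send", stream, message))

    async def send_batch(self, stream, batch):
        self.calls.append(("send_batch", stream, tuple(batch)))

    async def send_wait(self, stream, message):
        self.calls.append(("send_wait", stream, message))


recorder = RecordingProducer()
asyncio.run(publish_three_ways(recorder, "mystream", [b"a", b"b"]))
```

With a real producer in place of the stand-in, the same function would exercise all three publishing paths against a stream.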

In the examples directory you can find different ways to send messages.

Publishing with confirmation

The send method takes a handler function as a parameter. The handler is called asynchronously when the server notifies the client that the message has been published.

Example:

send_wait, by contrast, waits until the confirmation from the server is received.
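A confirmation handler might look like the sketch below. The attribute names `is_confirmed` and `message_id` are assumptions about the object passed to the handler, and the notifications are simulated with `SimpleNamespace` so the snippet runs without a broker.

```python
import asyncio
from types import SimpleNamespace

confirmed_ids = []
failed_ids = []


async def on_publish_confirm(confirmation):
    """Handler invoked once per message when the server reports the outcome."""
    # `is_confirmed` and `message_id` are assumed attribute names.
    if confirmation.is_confirmed:
        confirmed_ids.append(confirmation.message_id)
    else:
        failed_ids.append(confirmation.message_id)


async def demo():
    # Simulated notifications; a real producer would call the handler itself.
    await on_publish_confirm(SimpleNamespace(message_id=1, is_confirmed=True))
    await on_publish_confirm(SimpleNamespace(message_id=2, is_confirmed=False))


asyncio.run(demo())
```

Collecting the failed IDs separately lets the application decide whether to retry those messages.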

Sub-Entry Batching and Compression

RabbitMQ Stream provides a special mode to publish, store, and dispatch messages: sub-entry batching. This mode increases throughput at the cost of increased latency and potential duplicated messages even when deduplication is enabled. It also allows using compression to reduce bandwidth and storage if messages are reasonably similar, at the cost of increasing CPU usage on the client side.

Sub-entry batching consists of squeezing several messages (a batch) into the slot that is usually used for one message. This means outbound messages are batched not only in publishing frames but in sub-entries as well.


    # sending with compression
    await producer.send_sub_entry(
        STREAM, compression_type=CompressionType.Gzip, sub_entry_messages=messages
    )

Full example producer using sub-entry batch

The consumer side is automatic, so no configuration is needed.

The client ships with No Compression (CompressionType.No) and Gzip Compression (CompressionType.Gzip). The other compression types (Snappy, Lz4, Zstd) can be used by implementing the ICompressionCodec class.
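To get a feel for why compression pays off when messages are similar, the following standard-library sketch packs a batch into one blob and gzips it. The length-prefixed layout is an assumption made for illustration; it is not taken from the client's source, and no client API is used here.

```python
import gzip
import struct


def pack_sub_entry(messages):
    """Concatenate messages, each prefixed with its 4-byte big-endian length."""
    return b"".join(struct.pack(">I", len(m)) + m for m in messages)


def unpack_sub_entry(blob):
    """Inverse of pack_sub_entry: split the blob back into messages."""
    messages, pos = [], 0
    while pos < len(blob):
        (size,) = struct.unpack_from(">I", blob, pos)
        pos += 4
        messages.append(blob[pos:pos + size])
        pos += size
    return messages


# 1000 very similar messages, as in the "hello: {i}" example above.
messages = [f"hello: {i}".encode("utf-8") for i in range(1000)]
raw = pack_sub_entry(messages)
compressed = gzip.compress(raw)
restored = unpack_sub_entry(gzip.decompress(compressed))

print(len(raw), len(compressed))
```

The compressed blob is smaller than the raw one for this kind of repetitive payload; the cost is the extra CPU time spent in `gzip.compress` on the publishing side.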

Deduplication

RabbitMQ Stream can detect and filter out duplicated messages based on two client-side elements: the producer name and the message publishing ID. All the producer methods that send messages (send, send_batch, send_wait) take a publisher_name parameter, while the message publishing ID can be set in the AMQP message.

Example:
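As a mental model of the rule (not the client API), the toy class below keeps the highest publishing ID seen per producer name and drops anything that is not strictly higher. `DedupLog` and the producer names are hypothetical and exist only for this illustration.

```python
class DedupLog:
    """Toy model: tracks the highest publishing ID seen per producer name."""

    def __init__(self):
        self.last_id = {}
        self.entries = []

    def publish(self, producer_name, publishing_id, body):
        """Store the message unless its ID was already seen; return True if stored."""
        if publishing_id <= self.last_id.get(producer_name, -1):
            return False  # duplicate (or older) ID: filtered out
        self.last_id[producer_name] = publishing_id
        self.entries.append(body)
        return True


log = DedupLog()
results = [
    log.publish("producer-1", 0, b"a"),
    log.publish("producer-1", 1, b"b"),
    log.publish("producer-1", 1, b"b"),  # retry of the same message
    log.publish("producer-2", 1, b"c"),  # other producer name, tracked separately
]
```

This is why publishing IDs need to increase strictly for a given producer name: a message that reuses an ID is treated as a duplicate.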

Consuming messages

See consumer examples for basic consumer and consumers with different offsets.

Server-side offset tracking

RabbitMQ Streams provides server-side offset tracking for consumers. This feature allows a consuming application to restart consuming where it left off in a previous run. You can use the store_offset (to store an offset in the server) and query_offset (to query it) methods of the consumer class, as in this example:
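A restart helper built on these two methods could look like the sketch below. The keyword names (`stream`, `offset`, `subscriber_name`) and the assumption that a missing offset surfaces as an exception are not confirmed by this document; `InMemoryConsumer` is a hypothetical stand-in so the snippet runs without a broker.

```python
import asyncio


async def resume_offset(consumer, stream, subscriber_name, default=0):
    """Return the offset to restart from: stored offset + 1, or a default."""
    try:
        stored = await consumer.query_offset(
            stream=stream, subscriber_name=subscriber_name
        )
    except Exception:
        # Assumption: no stored offset is reported as an exception.
        return default
    # Design choice of this sketch: the stored value is the last processed
    # message, so consumption restarts right after it.
    return stored + 1


class InMemoryConsumer:
    """Hypothetical stand-in that keeps offsets in a dict."""

    def __init__(self):
        self._offsets = {}

    async def store_offset(self, stream, offset, subscriber_name):
        self._offsets[(stream, subscriber_name)] = offset

    async def query_offset(self, stream, subscriber_name):
        return self._offsets[(stream, subscriber_name)]  # KeyError if absent


async def demo():
    consumer = InMemoryConsumer()
    first = await resume_offset(consumer, "mystream", "subscriber_1")
    await consumer.store_offset(
        stream="mystream", offset=41, subscriber_name="subscriber_1"
    )
    second = await resume_offset(consumer, "mystream", "subscriber_1")
    return first, second


first_run, second_run = asyncio.run(demo())
```

In a real application it is usually enough to store the offset every N messages rather than after each one, since every store is a round trip to the server.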

Superstreams

A super stream is a logical stream made of individual, regular streams. It is a way to scale out publishing and consuming with RabbitMQ Streams: a large logical stream is divided into partition streams, splitting up the storage and the traffic on several cluster nodes.
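The partitioning idea can be sketched as hashing a routing key and taking it modulo the number of partition streams. `zlib.crc32` is a stand-in chosen because it is in the standard library; it is not necessarily the hash the client uses, and the partition names are hypothetical.

```python
import zlib


def route(routing_key, partitions):
    """Pick the partition stream for a routing key: hash modulo partition count."""
    # crc32 is a stand-in hash for illustration only.
    index = zlib.crc32(routing_key.encode("utf-8")) % len(partitions)
    return partitions[index]


partitions = ["invoices-0", "invoices-1", "invoices-2"]
assignments = {
    key: route(key, partitions)
    for key in ("customer-1", "customer-2", "customer-3")
}
```

Because the mapping is deterministic, all messages with the same routing key land in the same partition, which preserves their relative order.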

See the blog post for more info.

You can use the superstream_producer and superstream_consumer classes, which internally use producers and consumers to operate on the component streams.

See the Super Stream example

Single Active Consumer

Single active consumer provides exclusive consumption and consumption continuity on a stream. See the blog post for more info.

See the single active consumer example

Filtering

Filtering is a streaming feature available from RabbitMQ 3.13, based on a Bloom filter. RabbitMQ Stream provides a server-side filtering feature that avoids reading all the messages of a stream and filtering only on the client side. This helps to save network bandwidth when a consuming application needs only a subset of messages.

https://rabbitmq.github.io/rabbitmq-stream-java-client/stable/htmlsingle/#filtering

See the filtering examples
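Because a Bloom filter can report false positives and filtering works on groups of messages, a consumer still has to check each message it receives. The standard-library sketch below models that two-stage idea; the `BloomFilter` class, the chunk layout, and the filter values are hypothetical and only illustrate the principle, not the broker's implementation.

```python
import hashlib


class BloomFilter:
    """Tiny Bloom filter: no false negatives, occasional false positives."""

    def __init__(self, size_bits=64, num_hashes=2):
        self.size_bits = size_bits
        self.num_hashes = num_hashes
        self.bits = 0

    def _positions(self, value):
        for seed in range(self.num_hashes):
            digest = hashlib.sha256(f"{seed}:{value}".encode("utf-8")).digest()
            yield int.from_bytes(digest[:8], "big") % self.size_bits

    def add(self, value):
        for pos in self._positions(value):
            self.bits |= 1 << pos

    def might_contain(self, value):
        return all((self.bits >> pos) & 1 for pos in self._positions(value))


def make_chunk(messages):
    """Group messages with a filter built from their filter values."""
    bloom = BloomFilter()
    for filter_value, _ in messages:
        bloom.add(filter_value)
    return bloom, messages


chunks = [
    make_chunk([("emea", b"order-1"), ("emea", b"order-2")]),
    make_chunk([("apac", b"order-3")]),
    make_chunk([("emea", b"order-4"), ("amer", b"order-5")]),
]

wanted = "emea"
received = []
for bloom, messages in chunks:
    if not bloom.might_contain(wanted):
        continue  # "server side": the whole chunk is skipped
    # "client side": the chunk may hold other values, so filter again
    received.extend(body for value, body in messages if value == wanted)
```

The first stage saves bandwidth by skipping whole groups; the second stage guarantees that only matching messages reach the application.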

Connecting with SSL

You can enable SSL/TLS. See the TLS example.

SASL Mechanisms

You can use the following SASL mechanisms:

  • PLAIN
  • EXTERNAL

The client uses the PLAIN mechanism by default.

The EXTERNAL mechanism is used to authenticate a user based on a certificate presented by the client. Example:

    ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    # load the root certificate of the CA
    ssl_context.load_verify_locations("certs/ca_certificate.pem")
    # load the client certificate and key presented to the server
    ssl_context.load_cert_chain(
        "certs/client_HOSTNAME_certificate.pem",
        "certs/client_HOSTNAME_key.pem",
    )

    async with Producer(
        "HOSTNAME",
        username="not_important",
        password="not_important",
        port=5551,
        ssl_context=ssl_context,
        # use the EXTERNAL mechanism instead of the default PLAIN
        sasl_configuration_mechanism=SlasMechanism.MechanismExternal,
    ) as producer:
        ...  # publish with `producer` here

The plugin rabbitmq_auth_mechanism_ssl needs to be enabled on the server side, and ssl_options.fail_if_no_peer_cert needs to be set to true. Config example:

auth_mechanisms.3 = PLAIN
auth_mechanisms.2 = AMQPLAIN
auth_mechanisms.1 = EXTERNAL

ssl_options.cacertfile = certs/ca_certificate.pem
ssl_options.certfile = certs/server_certificate.pem
ssl_options.keyfile = certs/server_key.pem
listeners.ssl.default = 5671
stream.listeners.ssl.default = 5551
ssl_options.verify           
