<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. -->

Notes

The AoP plugin supports only the AMQP 0-9-1 protocol with basic produce and consume functionality; it does not include advanced features such as transactions. It is available as an open-source plugin and is offered only as an experimental private preview feature in the Private Cloud distribution. It is not available on StreamNative Cloud. Please use it with caution.

AMQP on Pulsar (AoP)

AoP stands for AMQP on Pulsar. The AoP broker supports the AMQP 0-9-1 protocol and is backed by Pulsar.

AoP is implemented as a Pulsar ProtocolHandler with the protocol name "amqp". The ProtocolHandler is built as a nar file and is loaded when the Pulsar broker starts.

Limitations

AoP is built on Pulsar features, but Pulsar and AMQP are used in different ways, so AoP has the following limitations.

  • Currently, the AoP protocol handler supports the AMQP 0-9-1 protocol, with durable exchanges and durable queues only.
  • A Vhost is backed by a namespace that can have only one bundle. You need to create the namespace for the Vhost in advance.
  • AoP is supported on Pulsar 2.6.1 or later releases.
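
The Vhost-to-namespace rule above can be sketched in a few lines of Java. This is only an illustration, not AoP's own code: the class and method names are hypothetical, and the tenant/vhost naming and the `-b 1` (single bundle) flag are taken from the `public/vhost1` example used in the standalone walkthrough of this README.

```java
public class VhostNamespace {

    /** Builds the namespace name that backs a Vhost: <tenant>/<vhost>. */
    static String namespaceFor(String tenant, String vhost) {
        return tenant + "/" + vhost;
    }

    /** Builds the pulsar-admin command that creates the namespace with one bundle. */
    static String createCommand(String tenant, String vhost) {
        return "bin/pulsar-admin namespaces create -b 1 " + namespaceFor(tenant, vhost);
    }

    public static void main(String[] args) {
        // "public" is the default amqpTenant; "vhost1" is an example Vhost name.
        System.out.println(createCommand("public", "vhost1"));
    }
}
```

Run the printed command against the broker before an AMQP client connects to that Vhost.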

Get started

In this guide, you will learn how to use the Pulsar broker to serve requests from AMQP clients.

Download Pulsar

Download the Pulsar 2.6.1 binary package apache-pulsar-2.6.1-bin.tar.gz and extract it.

Download and Build AoP Plugin

You can download the AoP nar file from the AoP releases.

To build from code, complete the following steps:

  1. Clone the project from GitHub to your local machine.

    git clone https://github.com/streamnative/aop.git
    cd aop

  2. Build the project.

    mvn clean install -DskipTests

You can find the nar file in the following directory.

./amqp-impl/target/pulsar-protocol-handler-amqp-${version}.nar

Configuration

|Name|Description|Default|
|---|---|---|
|amqpTenant|AMQP on Pulsar broker tenant|public|
|amqpListeners|AMQP service port|amqp://127.0.0.1:5672|
|amqpMaxNoOfChannels|The maximum number of channels which can exist concurrently on a connection|64|
|amqpMaxFrameSize|The maximum frame size on a connection|4194304 (4MB)|
|amqpHeartBeat|The default heartbeat timeout of AoP connection|60 (s)|
|amqpProxyPort|The AMQP proxy service port|5682|
|amqpProxyEnable|Whether to start proxy service|false|

Configure Pulsar broker to run AoP protocol handler as Plugin

As mentioned above, the AoP module is loaded with the Pulsar broker. You need to add the AoP settings to Pulsar's configuration file, such as broker.conf or standalone.conf.

  1. Protocol handler configuration

Add messagingProtocols (the default value is null) and protocolHandlerDirectory (the default value is "./protocols") to the Pulsar configuration file, such as broker.conf or standalone.conf. For AoP, the value of messagingProtocols is amqp, and the value of protocolHandlerDirectory is the directory that contains the AoP nar file.

The following is an example.

messagingProtocols=amqp
protocolHandlerDirectory=./protocols
  2. Set AMQP service listeners

The hostname in amqpListeners must be the same as the Pulsar broker's advertisedAddress.

The following is an example.

amqpListeners=amqp://127.0.0.1:5672
advertisedAddress=127.0.0.1
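
A mismatch between the listener hostname and advertisedAddress is easy to overlook, so a small pre-flight check can help. The following Java sketch is not part of AoP; the class and method names are hypothetical, and it assumes amqpListeners holds a single listener URI as in the example above.

```java
import java.net.URI;

public class ListenerCheck {

    /**
     * Returns true when the host part of an amqpListeners value equals
     * the broker's advertisedAddress. Assumes a single listener URI.
     */
    static boolean hostMatches(String amqpListener, String advertisedAddress) {
        String host = URI.create(amqpListener.trim()).getHost();
        return host != null && host.equals(advertisedAddress);
    }

    public static void main(String[] args) {
        // Values copied from the example configuration above.
        System.out.println(hostMatches("amqp://127.0.0.1:5672", "127.0.0.1"));
        // A mismatch that this check is meant to catch.
        System.out.println(hostMatches("amqp://127.0.0.1:5672", "10.0.0.5"));
    }
}
```

The first call prints true and the second prints false.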

Run Pulsar broker

With the above configuration, you can start your Pulsar broker. For details, refer to the Pulsar getting-started guides.

cd apache-pulsar-2.6.1
bin/pulsar standalone

Run AMQP Client to verify

Log level configuration

You can set the AoP log level in Pulsar's log4j2.yaml configuration file.

The following is an example.

    Logger:
      - name: io.streamnative.pulsar.handlers.amqp
        level: debug
        additivity: false
        AppenderRef:
          - ref: Console

AoP configuration

Other AoP settings, such as those listed in the Configuration table, can also be changed in the Pulsar broker configuration file.

Contribute

Prerequisite

If you want to contribute to AMQP on Pulsar, complete the following steps.

  1. Install system dependency.

From version 2.11.0, AoP requires JDK 17.

|Dependency|Installation guide|
|---|---|
|Java 17|https://openjdk.java.net/install/|
|Maven|https://maven.apache.org/|

  2. Clone the code to your machine.

    git clone git@github.com:streamnative/aop.git
    
  3. Build the project.

    mvn install -DskipTests
    

Contribution workflow

Step 1: Fork

  1. Visit https://github.com/streamnative/aop
  2. Click the Fork button (top right) to establish a cloud-based fork.

Step 2: Clone fork to local machine

Create your clone.

$ cd $working_dir
$ git clone https://github.com/$user/aop

Set your clone to track upstream repository.

$ cd $working_dir/aop
$ git remote add upstream https://github.com/streamnative/aop.git

Run the git remote -v command. The output looks as follows:

origin    https://github.com/$user/aop.git (fetch)
origin    https://github.com/$user/aop.git (push)
upstream  https://github.com/streamnative/aop (fetch)
upstream  https://github.com/streamnative/aop (push)

Step 3: Keep your branch in sync

Get your local master up to date.

$ cd $working_dir/aop
$ git checkout master
$ git fetch upstream
$ git rebase upstream/master
$ git push origin master 

Step 4: Create your branch

Branch from master.

$ git checkout -b myfeature

Step 5: Edit the code

You can now edit the code on the myfeature branch.

Step 6: Commit

Commit your changes.

$ git add <filename>
$ git commit -m "$add a comment"

You will likely go through several edit-build-test cycles.

The following commands might be helpful for you.

$ git add <filename> (add one file)
$ git add -A (add all changes, including new, modified, and deleted files)
$ git commit -a -m "$add a comment" (stage and commit modified and deleted files, not including new files)
$ git add -u (add modified and deleted files, not including new files)
$ git add . (add new, modified, and deleted files under the current directory; in Git versions before 2.0, deleted files were not included)

Step 7: Push

When your commit is ready for review (or just to establish an offsite backup of your work), push your branch to your fork on github.com:

$ git push origin myfeature

Step 8: Create a pull request

  1. Visit your fork at https://github.com/$user/aop (replace $user obviously).
  2. Click the Compare & pull request button next to your myfeature branch.

Step 9: Get a code review

Once you open your pull request, at least two reviewers will participate in reviewing. Those reviewers will conduct a thorough code review, looking for correctness, bugs, opportunities for improvement, documentation and comments, and style.

Commit changes made in response to review comments to the same branch on your fork.

Very small PRs are easy to review. Very large PRs are very difficult to review.

How to use Pulsar standalone

  1. Clone this project from GitHub to your local machine.

    git clone https://github.com/streamnative/aop.git
    cd aop
    
  2. Build the project.

    mvn clean install -DskipTests
    
  3. Copy the nar package to the Pulsar protocols directory.

    cp ./amqp-impl/target/pulsar-protocol-handler-amqp-${version}.nar $PULSAR_HOME/protocols/pulsar-protocol-handler-amqp-${version}.nar
    
  4. Modify the Pulsar standalone configuration.

    # conf file: $PULSAR_HOME/conf/standalone.conf
    
    # add amqp configs
    messagingProtocols=amqp
    protocolHandlerDirectory=./protocols
    
    amqpListeners=amqp://127.0.0.1:5672
    advertisedAddress=127.0.0.1
    
  5. Start Pulsar in standalone mode.

    $PULSAR_HOME/bin/pulsar standalone
    
  6. Add a namespace for the vhost.

    # for example, the vhost name is `vhost1`
    bin/pulsar-admin namespaces create -b 1 public/vhost1
    # set retention for the namespace
    bin/pulsar-admin namespaces set-retention -s 100M -t 2d public/vhost1
    
  7. Test with the RabbitMQ client.

    # add RabbitMQ client dependency in your project
    <dependency>
      <groupId>com.rabbitmq</groupId>
      <artifactId>amqp-client</artifactId>
      <version>5.8.0</version>
    </dependency>
    
    // Java Code
    
    // create connection
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setVirtualHost("vhost1");
    connectionFactory.setHost("127.0.0.1");
    connectionFactory.setPort(5672);
    Connection connection = connectionFactory.newConnection();
    Channel channel = connection.createChannel();
    
    String exchan
    
