Mop
MQTT on Pulsar implemented using Pulsar Protocol Handler
Install / Use
/learn @streamnative/MopREADME
MQTT on Pulsar (MoP)
MQTT-on-Pulsar (aka MoP) is developed to support MQTT protocol natively on Apache Pulsar.
Get started
Download or build MoP protocol handler
-
Clone the MoP project from GitHub to your local.
git clone https://github.com/streamnative/mop.git cd mop -
Build the project.
mvn clean install -DskipTests -
The NAR file can be found at this location.
./mqtt-broker/target/pulsar-protocol-handler-mqtt-${version}.nar
Install MoP protocol handler
Configure the Pulsar broker to run the MoP protocol handler as a plugin by adding configurations to the Pulsar configuration file, such as broker.conf or standalone.conf.
-
Set the configuration of the MoP protocol handler.
Add the following properties and set their values in the Pulsar configuration file, such as
conf/broker.conforconf/standalone.conf.| Property | Suggested value | Default value | |---|---|---
messagingProtocols| mqtt | nullprotocolHandlerDirectory| Location of MoP NAR file | ./protocolsExample
messagingProtocols=mqtt protocolHandlerDirectory=./protocols -
Set the MQTT server listeners.
Example
mqttListeners=mqtt://127.0.0.1:1883 advertisedAddress=127.0.0.1Note
The default hostname of
advertisedAddressis InetAddress.getLocalHost().getHostName(). If you'd like to config this, please keep the same as Pulsar broker'sadvertisedAddress.
Load MoP protocol handler
After you install the MoP protocol handler to Pulsar broker, you can restart the Pulsar brokers to load the MoP protocol handler.
How to use Proxy
To use the proxy, follow the following steps. For detailed steps, refer to Deploy a cluster on bare metal.
-
Prepare a ZooKeeper cluster.
-
Initialize the cluster metadata.
-
Prepare a BookKeeper cluster.
-
Copy the
pulsar-protocol-handler-mqtt-${version}.narto the$PULSAR_HOME/protocolsdirectory. -
Start the Pulsar broker.
Here is an example of the Pulsar broker configuration.
messagingProtocols=mqtt protocolHandlerDirectory=./protocols brokerServicePort=6651 mqttListeners=mqtt://127.0.0.1:1883 advertisedAddress=127.0.0.1 mqttProxyEnabled=true mqttProxyPort=5682
Verify MoP protocol handler
There are many MQTT client that can be used to verify the MoP protocol handler, such as MQTTBox, MQTT Toolbox. You can choose a CLI tool or interface tool to verify the MoP protocol handler.
The following example shows how to verify the MoP protocol handler with FuseSource MqttClient.
-
Add the dependency.
<dependency> <groupId>org.fusesource.mqtt-client</groupId> <artifactId>mqtt-client</artifactId> <version>1.16</version> </dependency> -
Publish messages and consume messages.
MQTT mqtt = new MQTT(); mqtt.setHost("127.0.0.1", 1883); BlockingConnection connection = mqtt.blockingConnection(); connection.connect(); Topic[] topics = { new Topic("persistent://public/default/my-topic", QoS.AT_LEAST_ONCE) }; connection.subscribe(topics); // publish message connection.publish("persistent://public/default/my-topic", "Hello MOP!".getBytes(), QoS.AT_LEAST_ONCE, false); // receive message Message received = connection.receive();
Security
Enabling Authentication
MoP currently supports basic and token authentication methods. The token authentication method works
with any of the token based Pulsar authentication providers such as the built-in JWT provider and external token
authentication providers like biscuit-pulsar.
To use authentication for MQTT connections your Pulsar cluster must already have authentication enabled with your chosen authentication provider(s) configured.
You can then enable MQTT authentication with the following configuration properties:
mqttAuthenticationEnabled=true
mqttAuthenticationMethods=token
mqttAuthenticationMethods can be set to a comma delimited list if you wish to enable multiple authentication providers.
MoP will attempt each in order when authenticating client connections.
With authentication enabled MoP will not allow anonymous connections currently.
Authenticating client connections
Basic Authentication
Set the MQTT username and password client settings.
Token Authentication
Set the MQTT password to the token body, currently username will be disregarded but MUST be set to some value as this is required by the MQTT specification.
Enabling Authorization
MoP currently supports authorization. When authorization enabled, MoP will check the authenticated role if it has the ability to pub/sub topics, eg: When sending messages, you need to have the produce permission of the topic. When subscribing to a topic, you need to have the consume permission of the topic. You can reference here to grant permissions.
You can then enable MQTT authorization with the following configuration properties:
mqttAuthorizationEnabled=true
If MoP proxy enabled, following configuration needs to be configured and brokerClientAuthenticationParameters should configure lookup permission at least:
brokerClientAuthenticationPlugin=org.apache.pulsar.client.impl.auth.AuthenticationBasic
brokerClientAuthenticationParameters={"userId":"superUser","password":"superPass"}
Enabling TLS
MoP currently supports TLS transport encryption.
Generate crt and key file :
openssl genrsa 2048 > server.key
chmod 400 server.key
openssl req -new -x509 -nodes -sha256 -days 365 -key server.key -out server.crt
TLS with broker
-
Config mqtt broker to load tls config.
mqttListeners=mqtt+ssl://127.0.0.1:8883 mqttTlsCertificateFilePath=/xxx/server.crt mqttTlsKeyFilePath=/xxx/server.key -
Config client to load tls config.
MQTT mqtt = new MQTT(); // default tls port mqtt.setHost(URI.create("ssl://127.0.0.1:8883")); File crtFile = new File("server.crt"); Certificate certificate = CertificateFactory.getInstance("X.509").generateCertificate(new FileInputStream(crtFile)); KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType()); keyStore.load(null, null); keyStore.setCertificateEntry("server", certificate); TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); trustManagerFactory.init(keyStore); SSLContext sslContext = SSLContext.getInstance("TLS"); sslContext.init(null, trustManagerFactory.getTrustManagers(), null); mqtt.setSslContext(sslContext); BlockingConnection connection = mqtt.blockingConnection(); connection.connect();
TLS with proxy
-
Config mqtt broker to load tls config.
mqttProxyEnable=true mqttProxyTlsEnabled=true mqttTlsCertificateFilePath=/xxx/server.crt mqttTlsKeyFilePath=/xxx/server.key -
Config client to load tls config.
MQTT mqtt = new MQTT(); // default proxy tls port mqtt.setHost(URI.create("ssl://127.0.0.1:5683")); File crtFile = new File("server.crt"); Certificate certificate = CertificateFactory.getInstance("X.509").generateCertificate(new FileInputStream(crtFile)); KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType()); keyStore.load(null, null); keyStore.setCertificateEntry("server", certificate); TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); trustManagerFactory.init(keyStore); SSLContext sslContext = SSLContext.getInstance("TLS"); sslContext.init(null, trustManagerFactory.getTrustManagers(), null); mqtt.setSslContext(sslContext); BlockingConnection connection = mqtt.blockingConnection(); connection.connect();
TLS PSK with broker
Please reference here to learn more about TLS-PSK.
-
Config mqtt broker to load tls psk config.
mqttTlsPskEnabled=true mqttListeners=mqtt+ssl+psk://127.0.0.1:8884 // any string can be specified mqttTlsPskIdentityHint=alpha // identity is semicolon list of string with identity:secret format mqttTlsPskIdentity=mqtt:mqtt123Optional configs
| Config key | Comment | |:----------------------:| -------- | | mqttTlsP
