Pulsar4s
Idiomatic, typesafe, and reactive Scala client for Apache Pulsar
pulsar4s - Apache Pulsar Scala Client
pulsar4s is a concise, idiomatic, reactive, type-safe Scala client for Apache Pulsar. Because it is a thin wrapper over the Java client, it benefits from the reliability and performance of that client while integrating better with the Scala ecosystem and its idioms.
- Supports different effects - scala.concurrent.Future, monix.eval.Task, cats.effect.IO, scalaz.concurrent.Task
- Uses scala.concurrent.duration.Duration
- Provides case classes rather than Java beans
- Akka Streams source and sink
- FS2 Reader and Writer
- Circe, Spray Json, Play Json, Json4s and Jackson implementations of the Schema typeclass
Warning
(This disclaimer was written on 2023-01-05.)
Starting with version 2.9.0, pulsar4s supports Scala 3. This required some aggressive dependency upgrades.
Libraries upgraded for all Scala versions:
- play-json 2.10 (currently in RC7)
- cats-effect 3.3 (was 2.x)
- ZIO 2.0 (was 1.x) and zio-interop-cats 23.0.0
Libraries whose version depends on the Scala version:
- avro4s
  - for Scala 3: 5.0+
  - for Scala 2: 4.1+
- scala-java8-compat
  - for Scala ≥ 2.13: 1.0.2
  - for Scala 2.12: 0.8.0 (already the case before Scala 3 support)
Check carefully that upgrading pulsar4s will not break your build, especially if you use cats-effect!
Using the client
The first step is to create a client attached to the Pulsar cluster, providing the service URL.
val client = PulsarClient("pulsar://localhost:6650")
Alternatively, you can use an instance of PulsarClientConfig if you need to set further configuration options such as authentication, TLS, timeouts and so on.
val config = PulsarClientConfig("pulsar://localhost:6650", ...)
val client = PulsarClient(config)
Then we can create either a producer or a consumer from the client. We need an implicit schema in scope - more on that later.
To create a producer, we need the topic, and an instance of ProducerConfig.
We can set further options on the config object, such as max pending messages, router mode, producer name and so on.
implicit val schema: Schema[String] = Schema.STRING
val topic = Topic("persistent://sample/standalone/ns1/b")
val producerConfig = ProducerConfig(topic, ...)
val producer = client.producer[String](producerConfig)
To create a consumer, we need one or more topics to subscribe to, the subscription name, and an instance of ConsumerConfig.
We can set further options on the config object, such as subscription type, consumer name, queue size and so on.
implicit val schema: Schema[String] = Schema.STRING
val topic = Topic("persistent://sample/standalone/ns1/b")
val consumerConfig = ConsumerConfig(subscriptionName = Subscription("mysub"), topics = Seq(topic), ...)
val consumer = client.consumer[String](consumerConfig)
Note: Call close() on the client, producer, and consumer once you are finished. The client and producer also implement AutoCloseable and Closeable.
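Putting these pieces together, a minimal round trip might look like the sketch below. It assumes a broker reachable at `pulsar://localhost:6650` and an otherwise idle topic, that `ConsumerConfig` accepts the named arguments `subscriptionName` and `topics`, and that `send` and `receive` return a `scala.util.Try`; `RoundTrip` is a hypothetical name used only for illustration.

```scala
import com.sksamuel.pulsar4s._
import org.apache.pulsar.client.api.Schema

import java.util.UUID
import scala.util.Try

// Hypothetical helper: sends one value and reads it back.
// Assumes a Pulsar broker at pulsar://localhost:6650 and no other producers on the topic.
object RoundTrip {
  implicit val schema: Schema[String] = Schema.STRING

  def roundTrip(value: String): Try[String] = {
    val client = PulsarClient("pulsar://localhost:6650")
    val topic = Topic("persistent://sample/standalone/ns1/b")
    try {
      // Subscribe before producing: a new subscription starts at the latest
      // message by default, so anything sent earlier would not be delivered to it.
      val consumer = client.consumer[String](
        ConsumerConfig(subscriptionName = Subscription(s"roundtrip-${UUID.randomUUID()}"), topics = Seq(topic))
      )
      val producer = client.producer[String](ProducerConfig(topic))
      try {
        for {
          _   <- producer.send(value) // Try[MessageId]
          msg <- consumer.receive     // blocks until a message arrives
        } yield {
          consumer.acknowledge(msg.messageId)
          msg.value
        }
      } finally {
        producer.close()
        consumer.close()
      }
    } finally {
      client.close()
    }
  }
}
```

Closing in `finally` blocks follows the note above, so the consumer, producer and client are released even when a send or receive fails.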
Schemas
A message must be the correct type for the producer or consumer. When a producer or consumer is created,
an implicit Schema typeclass must be available. In the earlier examples, you saw that we added an implicit schema for String using implicit val schema: Schema[String] = Schema.STRING.
There are built-in schemas for bytes and strings, but other complex types require a custom schema. Some people prefer to write custom typeclasses manually for the types they need to support. Other people like to just have it done automagically. For those people, pulsar4s provides extensions for the well-known Scala JSON libraries, which generate schemas whose message body is a JSON representation of the class.
An example of creating a producer for a complex type using the circe json library to generate the schema:
import io.circe.generic.auto._
import com.sksamuel.pulsar4s.circe._
case class Food(name: String, toppings: String) // example type for this snippet; field names are illustrative
val topic = Topic("persistent://sample/standalone/ns1/b")
val producer = client.producer[Food](ProducerConfig(topic))
producer.send(Food("pizza", "ham and pineapple"))
Note: The imports bring into scope a method that will generate an implicit schema when required.
The following extension modules can be used for automatic schemas:
| Library | Module | Import |
|---------|--------|--------|
| Circe | pulsar4s-circe | import io.circe.generic.auto._ and import com.sksamuel.pulsar4s.circe._ |
| Jackson | pulsar4s-jackson | import com.sksamuel.pulsar4s.jackson._ |
| Json4s | pulsar4s-json4s | import com.sksamuel.pulsar4s.json4s._ |
| Spray Json | pulsar4s-spray-json | import com.sksamuel.pulsar4s.sprayjson._ |
| Play Json | pulsar4s-play-json | import com.sksamuel.pulsar4s.playjson._ |
Producing
There are two ways to send a message - either with a plain value, or with an instance of ProducerMessage.
If you do not need to specify extra options on the message - such as key, event time, headers, etc - then you can just send
a plain value, and the client will wrap the value in a pulsar message. Alternatively, you can create an instance of ProducerMessage
to specify extra options.
Each way of sending comes in a synchronous and an asynchronous form. The asynchronous methods return a scala.concurrent.Future.
If you are using another effect library, such as cats, scalaz or monix, then pulsar4s
also supports those effects. See the section on effects.
If the send succeeds, you receive the MessageId of the generated message. If it fails, the synchronous methods return a Failure containing the exception, while in the asynchronous
methods the exception surfaces as a failed Future.
To send a plain value, we just invoke send with the value:
producer.send("wibble")
Or to send a message, we first create an instance of ProducerMessage.
val message = DefaultProducerMessage(Some("mykey"), "wibble", eventTime = Some(EventTime(System.currentTimeMillis)))
producer.send(message)
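To act on the result, a caller can pattern match on the `Try` from the synchronous send, or map over the `Future` from the asynchronous one. The sketch assumes the asynchronous method is named `sendAsync` and that an implicit `ExecutionContext` in scope is enough for pulsar4s to provide its `Future` support; `SendResults` and its methods are hypothetical names.

```scala
import com.sksamuel.pulsar4s._

import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

// Hypothetical helpers showing how send results surface.
object SendResults {
  // Synchronous: send returns a Try, so an error comes back as a Failure.
  def sendSync(producer: Producer[String], value: String): String =
    producer.send(value) match {
      case Success(id)    => s"sent $value as $id"
      case Failure(error) => s"could not send $value: ${error.getMessage}"
    }

  // Asynchronous: an error surfaces as a failed Future, handled here with recover.
  // The expected type selects Future as the effect (assumed pulsar4s behaviour).
  def sendAsync(producer: Producer[String], value: String)(implicit ec: ExecutionContext): Future[String] = {
    val sent: Future[MessageId] = producer.sendAsync(value)
    sent
      .map(id => s"sent $value as $id")
      .recover { case error => s"could not send $value: ${error.getMessage}" }
  }
}
```

Turning both outcomes into a value keeps the example small; real code would more often log the failure or retry.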
Consuming
To receive a message, create a consumer and invoke receive, receive(Duration) or receiveAsync.
The first two are synchronous and block if necessary (receive(Duration) gives up once the timeout elapses), while receiveAsync is asynchronous, returning
a Future (or other effect) that completes with the ConsumerMessage once it is ready.
val message: ConsumerMessage[String] = consumer.receive.get // receive returns a Try
or
val message: Future[ConsumerMessage[String]] = consumer.receiveAsync
Once a message has been consumed, it is important to acknowledge it by passing its message id to one of the acknowledge methods.
consumer.acknowledge(message.messageId)
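When a consumer should stop once the topic goes quiet, the timed receive(Duration) variant fits better than a plain blocking receive. This sketch assumes `receive(duration)` returns `Try[Option[ConsumerMessage[T]]]`, with `None` meaning the timeout elapsed; `PollLoop.drain` is a hypothetical helper.

```scala
import com.sksamuel.pulsar4s._

import scala.annotation.tailrec
import scala.concurrent.duration._
import scala.util.{Failure, Success}

// Hypothetical helper: reads and acknowledges up to `max` values,
// stopping early when no message arrives within `timeout`.
object PollLoop {
  def drain[T](consumer: Consumer[T], max: Int, timeout: FiniteDuration = 1.second): List[T] = {
    @tailrec
    def loop(remaining: Int, acc: List[T]): List[T] =
      if (remaining <= 0) acc.reverse
      else
        consumer.receive(timeout) match {
          case Success(Some(msg)) =>
            consumer.acknowledge(msg.messageId) // mark it as processed for this subscription
            loop(remaining - 1, msg.value :: acc)
          case Success(None)  => acc.reverse    // timed out: nothing more for now
          case Failure(error) => throw error    // surface broker or decoding errors
        }
    loop(max, Nil)
  }
}
```

Acknowledging each message as soon as it has been handled keeps the loop simple; batching acknowledgements is a possible refinement.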
Akka Streams
Pulsar4s integrates with the outstanding akka-streams library - it provides both a source and a sink.
To use this, you need to add a dependency on the pulsar4s-akka-streams module.
Sources
To create a source, all that is required is a function that creates a consumer on demand, and the message id to seek to.
The function must return a fresh consumer each time it is invoked.
The consumer is just a regular pulsar4s Consumer and can be created in the normal way, for example:
val topic = Topic("persistent://sample/standalone/ns1/b")
val subscription = Subscription("mysub")
val consumerFn = () => client.consumer[String](ConsumerConfig(subscriptionName = subscription, topics = Seq(topic)))
We pass that function into the source method, along with the message id to seek to. Note the import.
import com.sksamuel.pulsar4s.akka.streams._
val pulsarSource = source(consumerFn, Some(MessageId.earliest))
The materialized value of the source is an instance of Control which provides a method called 'close' which can be used to stop consuming messages.
Once the akka streams source is completed (or fails) the consumer will be automatically closed.
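To run the source, connect it to any Akka Streams sink and keep the materialized Control so the stream can be stopped later. The sketch assumes Akka 2.6 or later (where an implicit `ActorSystem` supplies the materializer) and that the source emits `ConsumerMessage` values; `SourceExample.printAll` is a hypothetical helper.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Keep, Sink}
import com.sksamuel.pulsar4s._
import com.sksamuel.pulsar4s.akka.streams._

// Hypothetical helper: prints every message value, starting from the earliest message.
object SourceExample {
  def printAll(consumerFn: () => Consumer[String])(implicit system: ActorSystem) =
    // Keep.both returns the source's Control (to stop consuming)
    // together with the Future that completes when the stream ends.
    source(consumerFn, Some(MessageId.earliest))
      .map(_.value)
      .toMat(Sink.foreach[String](println))(Keep.both)
      .run()
}
```

Calling `close()` on the returned Control stops consumption; as described above, the consumer is then closed automatically once the stream completes.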
Sinks
To create a sink, we need a producer function similar to the source's consumer function.
Again, the producer used is just a regular pulsar4s Producer.
The function must return a fresh producer each time it is invoked.
val topic = Topic("persistent://sample/standalone/ns1/b")
val producerFn = () => client.producer[String](ProducerConfig(topic))
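A sink built from a function like this accepts producer messages. The sketch assumes the akka-streams module exposes `sink(producerFn)`, that it consumes `ProducerMessage` values, and that its materialized value is a Future that completes when the stream finishes; each value is wrapped with `DefaultProducerMessage` as in the Producing section. `SinkExample.publish` is a hypothetical helper.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.Source
import com.sksamuel.pulsar4s._
import com.sksamuel.pulsar4s.akka.streams._

// Hypothetical helper: publishes each value, without a key, through the pulsar4s sink.
object SinkExample {
  def publish(values: List[String], producerFn: () => Producer[String])(implicit system: ActorSystem) =
    Source(values)
      .map(value => DefaultProducerMessage[String](None, value)) // same constructor as in the Producing section
      .runWith(sink(producerFn))                                // assumed: materializes a completion Future
}
```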
