RDKafka.jl
Wrapper for librdkafka
Install / Use
/learn @dfdx/RDKafka.jlREADME
RDKafka
Julia wrapper for librdkafka.
Build
using Pkg
Pkg.add("RDKafka")
Prebuilt binaries of the librdkafka native library is downloaded. The binaries are available for all supported Julia platforms.
Usage
Start Kafka server
If you don't have one already started, download Kafka server and run it according to the official QuickStart Guide. Here's a short version of that guide:
cd to the kafka folder and in run the following commands in 2 different terminals:
# start ZooKeeper server
bin/zookeeper-server-start.sh config/zookeeper.properties
# start Kafka broker
bin/kafka-server-start.sh config/server.properties
In yet another terminal create a topic:
# create topic
bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
# describe it
bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
Produce and consume some messages
Now in Julia console start polling using KafkaConsumer:
using RDKafka
c = KafkaConsumer("localhost:9092", "my-consumer-group")
parlist = [("quickstart-events", 0)]
subscribe(c, parlist)
timeout_ms = 1000
while true
msg = poll(String, String, c, timeout_ms)
@show(msg)
end
And produce a few messages using KafkaProducer
using RDKafka
import RDKafka.produce
p = KafkaProducer("localhost:9092")
partition = 0
produce(p, "quickstart-events", partition, "message key", "message payload")
In the consumer window you should see something like:
msg = nothing
msg = Message(message key: message payload)
msg = nothing
msg = nothing
where nothing means that there were no new messages in that polling interval while Message(...) is actual message we sent from producer.
Configuration
librdkafka is highly customizable, see CONFIGURATION.md for the list of supported properties. To set a particular property, pass a conf object to KafkaProducer or KafkaConsumer, e.g.:
conf = Dict("socket.timeout.ms" => 300000)
p = KafkaProducer("localhost:9092", conf)
Error handling
Add the err_cb argument to either KafkaConsumer or KafkaProducer.
c = KafkaConsumer("localhost:9092", "my-consumer-group", err_cb=(err::Int, reason::String) -> throw(error(reason)))
Delivery reports
Add the dr_cb argument to a KafkaProducer.
p = KafkaProducer("localhost:9092", dr_cb=msg -> if msg.err != 0 throw(error("Delivery failed") end))
Seeking
c = KafkaConsumer("localhost:9092", "my-consumer-group")
RDKafka.seek(c, timestamp_ms, timeout_ms)
Related Skills
node-connect
352.5kDiagnose OpenClaw node connection and pairing failures for Android, iOS, and macOS companion apps
frontend-design
111.3kCreate distinctive, production-grade frontend interfaces with high design quality. Use this skill when the user asks to build web components, pages, or applications. Generates creative, polished code that avoids generic AI aesthetics.
openai-whisper-api
352.5kTranscribe audio via OpenAI Audio Transcriptions API (Whisper).
qqbot-media
352.5kQQBot 富媒体收发能力。使用 <qqmedia> 标签,系统根据文件扩展名自动识别类型(图片/语音/视频/文件)。
