Brod
Apache Kafka client library for Erlang/Elixir
NOTICE
This product includes software developed by Klarna Bank AB (publ)
Brod - Apache Kafka Client for Erlang/Elixir
Brod is an Erlang implementation of the Apache Kafka protocol, providing support for both producers and consumers.
Why "brod"? http://en.wikipedia.org/wiki/Max_Brod
Features
- Supports Apache Kafka v0.8+
- Robust producer implementation supporting in-flight requests and asynchronous acknowledgements
- Both consumer and producer handle leader re-election and other cluster disturbances internally
- Opens max 1 tcp connection to a broker per brod_client, one can create more clients if needed
- Producer: will start to batch automatically when number of unacknowledged (in flight) requests exceeds configurable maximum
- Producer: will try to re-send buffered messages on common errors like "Not a leader for partition"; such errors are resolved automatically by refreshing metadata
- Simple consumer: the poller has a configurable "prefetch count"; it keeps sending fetch requests as long as the total number of unprocessed messages (not message-sets) is less than the "prefetch count"
- Group subscriber: Support for consumer groups with options to have Kafka as offset storage or a custom one
- Topic subscriber: Subscribe on messages from all or selected topic partitions without using consumer groups
- Pick latest supported version when sending requests to kafka.
- Direct APIs for message send/fetch and cluster inspection/management without having to start clients/producers/consumers.
- An escriptized command-line tool for message send/fetch and cluster inspection/management.
- Configurable compression library. No compression library is included by default for either producers or consumers. For more compression options, see kafka_protocol/README
Building and testing
- Min Erlang/OTP version 24
- CMake 4, which is required to build the NIF for crc32cer
make compile
make test-env t # requires docker-compose in place
Working With Kafka 0.9.x or Earlier
Make sure {query_api_versions, false} is set in the client config.
ApiVersionRequest was introduced in Kafka 0.10, so sending this request
to brokers older than that causes a connection failure.
e.g. in sys.config:
[{brod,
  [ { clients
    , [ { brod_client_1 %% registered name
        , [ { endpoints, [{"localhost", 9092}]}
          , { query_api_versions, false} %% <---------- here
          ]}]}]}].
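The same option can presumably be passed when the client is started on demand rather than from sys.config. The sketch below reuses the brod:start_client/3 call shape shown elsewhere in this README; the endpoint and the client name brod_client_1 are placeholders.

```erlang
%% Sketch: start a client against a pre-0.10 broker without API version probing.
%% Endpoint and client name are placeholders; adjust to your cluster.
ClientConfig = [{query_api_versions, false}],
ok = brod:start_client([{"localhost", 9092}], brod_client_1, ClientConfig).
```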
Quick Demo
Assuming Kafka is running at localhost:9092 and there is a topic named test-topic.
Start an Erlang shell with make compile; erl -pa _build/default/lib/*/ebin, then paste the lines below into the shell:
rr(brod),
{ok, _} = application:ensure_all_started(brod),
KafkaBootstrapEndpoints = [{"localhost", 9092}],
Topic = <<"test-topic">>,
Partition = 0,
ok = brod:start_client(KafkaBootstrapEndpoints, client1),
ok = brod:start_producer(client1, Topic, _ProducerConfig = []),
{ok, FirstOffset} = brod:produce_sync_offset(client1, Topic, Partition, <<"key1">>, <<"value1">>),
ok = brod:produce_sync(client1, Topic, Partition, <<"key2">>, <<"value2">>),
SubscriberCallbackFun = fun(Partition, Msg, ShellPid = CallbackState) -> ShellPid ! Msg, {ok, ack, CallbackState} end,
Receive = fun() -> receive Msg -> Msg after 1000 -> timeout end end,
brod_topic_subscriber:start_link(client1, Topic, Partitions=[Partition],
_ConsumerConfig=[{begin_offset, FirstOffset}],
_CommittedOffsets=[], message, SubscriberCallbackFun,
_CallbackState=self()),
AckCb = fun(Partition, BaseOffset) -> io:format(user, "\nProduced to partition ~p at base-offset ~p\n", [Partition, BaseOffset]) end,
ok = brod:produce_cb(client1, Topic, Partition, <<>>, [{<<"key3">>, <<"value3">>}], AckCb).
Receive().
Receive().
{ok, {_, [Msg]}} = brod:fetch(KafkaBootstrapEndpoints, Topic, Partition, FirstOffset + 2), Msg.
Example outputs:
#kafka_message{offset = 0,key = <<"key1">>,
value = <<"value1">>,ts_type = create,ts = 1531995555085,
headers = []}
#kafka_message{offset = 1,key = <<"key2">>,
value = <<"value2">>,ts_type = create,ts = 1531995555107,
headers = []}
Produced to partition 0 at base-offset 406
#kafka_message{offset = 2,key = <<"key3">>,
value = <<"value3">>,ts_type = create,ts = 1531995555129,
headers = []}
Overview
Brod supervision (and process link) tree.

Clients
A brod_client in brod is a gen_server responsible for establishing and
maintaining tcp sockets connecting to kafka brokers.
It also manages per-topic-partition producer and consumer processes under
two-level supervision trees.
To use producers or consumers, you have to start at least one client that will manage them.
Compression
Brod does not depend on any compression/decompression implementation by default. To enable compression, you must add the compression application as a dependency in your project's rebar.config.
For example:
{deps, [
{snappyer, "1.2.9"}
]}.
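Once the compression application is available, a producer still has to be told to compress its batches. The following is a minimal sketch, assuming the producer config accepts a compression option with the value snappy (the option name is taken from the brod_producer documentation as recalled and should be checked against the brod version in use). The client name and topic are placeholders.

```erlang
%% Sketch under assumptions: {compression, snappy} is assumed to be a valid
%% producer config entry; verify against your brod version.
ProducerConfig = [{compression, snappy}],
ok = brod:start_producer(brod_client_1, <<"brod-test-topic-1">>, ProducerConfig).
```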
Start clients by default
You may include client configs in sys.config to have them started by default
(by the application controller).
Example of configuration (for sys.config):
[{brod,
  [ { clients
    , [ { brod_client_1 %% registered name
        , [ { endpoints, [{"localhost", 9092}]}
          , { reconnect_cool_down_seconds, 10} %% socket error recovery
          ]
        }
      ]
    }
    %% start another client for another kafka cluster
    %% or if you think it's necessary to start another set of tcp connections
  ]
}].
Example of configuration in Elixir (for config/dev.exs or config/prod.exs, etc.):
config :brod,
  clients: [
    # :brod_client_1 is the registered name of the client
    brod_client_1: [
      endpoints: [{"localhost", 9092}],
      reconnect_cool_down_seconds: 10
    ]
  ]
Start brod client on demand
You may also call brod:start_client/1,2,3 to start a client on demand;
the client is added to the brod supervision tree.
ClientConfig = [{reconnect_cool_down_seconds, 10}],
ok = brod:start_client([{"localhost", 9092}], brod_client_1, ClientConfig).
Extra socket options can be passed as {extra_sock_opts, ExtraSockOpts}, e.g.
ExtraSockOpts = [{sndbuf, 1024*1024}],
ok = brod:start_client([{"localhost", 9092}], brod_client_1, [{extra_sock_opts, ExtraSockOpts}]).
Producers
A brod_producer is a gen_server that is responsible for producing messages to a given
partition of a given topic.
Producers may be started either manually, or automatically at the moment you call brod:produce
without having called brod:start_producer beforehand.
Auto start producer with default producer config
Add the configs below to the client config in sys.config or app env if you start the client statically:
{auto_start_producers, true}
{default_producer_config, []}
Or pass the {auto_start_producers, true} option to brod:start_client if you start the client
dynamically.
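For the dynamic case, a minimal sketch might look like the following. It assumes a broker at localhost:9092 and an existing topic; the client name and topic name are placeholders. With auto start enabled, the first produce call is expected to start the partition producer without an explicit brod:start_producer call.

```erlang
%% Sketch: dynamic client with producers started automatically on first use.
ClientConfig = [ {auto_start_producers, true}
               , {default_producer_config, []}
               ],
ok = brod:start_client([{"localhost", 9092}], brod_client_1, ClientConfig),
%% No brod:start_producer call here; the producer is started on demand.
ok = brod:produce_sync(brod_client_1, <<"brod-test-topic-1">>, 0,
                       <<"some-key">>, <<"some-value">>).
```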
Start a Producer on Demand
brod:start_producer(_Client = brod_client_1,
_Topic = <<"brod-test-topic-1">>,
_ProducerConfig = []).
Supported Message Input Format
Brod supports below produce APIs:
- brod:produce: Async produce with ack message sent back to caller.
- brod:produce_cb: Async produce with a callback evaluated when ack is received.
- brod:produce_sync: Sync produce that returns ok.
- brod:produce_sync_offset: Sync produce that returns {ok, BaseOffset}.
- brod:produce_no_ack: Async produce without backpressure (use with care!).
The Value arg in these APIs can be:
- binary(): One single message
- {brod:msg_ts(), binary()}: One single message with its create-time timestamp
- #{ts => brod:msg_ts(), value => binary(), headers => [{_, _}]}: One single message. If this map does not have a key field, the Key argument is used.
- [{K, V} | {T, K, V}]: A batch, where V could be a nested list of such representation.
- [#{key => K, value => V, ts => T, headers => [{_, _}]}]: A batch.
When Value is a batch, the Key argument is only used as partitioner input and all messages are written to the same partition.
All messages are unified into a batch with the following spec:
[#{key => K, value => V, ts => T, headers => [{_, _}]}].
The ts field is dropped for Kafka versions prior to 0.10 (produce API version 0, magic version 0).
The headers field is dropped for Kafka versions prior to 0.11 (produce API version 0-2, magic version 0-1).
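To make the accepted Value shapes concrete, the sketch below passes each of them to brod:produce_sync. It assumes a started client brod_client_1 with a producer for the topic; the names, keys, values, and header contents are placeholders, and the timestamp is taken as milliseconds since epoch.

```erlang
%% Sketch: one produce call per supported Value format.
Client = brod_client_1,
Topic = <<"brod-test-topic-1">>,
Partition = 0,
Ts = erlang:system_time(millisecond),
%% 1. A single binary value.
ok = brod:produce_sync(Client, Topic, Partition, <<"k1">>, <<"v1">>),
%% 2. A single value with its create-time timestamp.
ok = brod:produce_sync(Client, Topic, Partition, <<"k2">>, {Ts, <<"v2">>}),
%% 3. A single message as a map; it has no key field, so <<"k3">> is used.
ok = brod:produce_sync(Client, Topic, Partition, <<"k3">>,
                       #{ts => Ts, value => <<"v3">>,
                         headers => [{<<"h1">>, <<"x">>}]}),
%% 4. A batch of {K, V} and {T, K, V} tuples; the Key argument is only
%%    partitioner input here.
ok = brod:produce_sync(Client, Topic, Partition, <<>>,
                       [{<<"k4">>, <<"v4">>}, {Ts, <<"k5">>, <<"v5">>}]),
%% 5. A batch of maps.
ok = brod:produce_sync(Client, Topic, Partition, <<>>,
                       [#{key => <<"k6">>, value => <<"v6">>,
                          ts => Ts, headers => []}]).
```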
Synchronized Produce API
brod:produce_sync(_Client = brod_client_1,
_Topic = <<"brod-test-topic-1">>,
_Partition = 0,
_Key = <<"some-key">>,
_Value = <<"some-value">>).
Or block the calling process until Kafka has confirmed the message:
{ok, CallRef} =
brod:produce(_Client = brod_client_1,
_Topic = <<"brod-test-topic-1">>,
_Partition = 0,
_Key = <<"some-key">>,
_Valu
