NOTICE

This product includes software developed by Klarna Bank AB (publ)

Brod - Apache Kafka Client for Erlang/Elixir

Brod is an Erlang implementation of the Apache Kafka protocol, providing support for both producers and consumers.

Why "brod"? http://en.wikipedia.org/wiki/Max_Brod

Features

  • Supports Apache Kafka v0.8+
  • Robust producer implementation supporting in-flight requests and asynchronous acknowledgements
  • Both consumer and producer handle leader re-election and other cluster disturbances internally
  • Opens at most one TCP connection to a broker per brod_client; create more clients if you need more connections
  • Producer: starts batching automatically when the number of unacknowledged (in-flight) requests exceeds a configurable maximum
  • Producer: re-sends buffered messages on common errors such as "Not a leader for partition"; such errors are resolved automatically by refreshing metadata
  • Simple consumer: the poller has a configurable "prefetch count" and keeps sending fetch requests as long as the total number of unprocessed messages (not message-sets) is less than the "prefetch count"
  • Group subscriber: Support for consumer groups with options to have Kafka as offset storage or a custom one
  • Topic subscriber: Subscribe to messages from all or selected topic partitions without using consumer groups
  • Picks the latest supported version when sending requests to Kafka
  • Direct APIs for message send/fetch and cluster inspection/management without having to start clients/producers/consumers
  • An escriptized command-line tool for message send/fetch and cluster inspection/management
  • Configurable compression library. By default, neither producers nor consumers support compression. For more compression options, see kafka_protocol/README
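
As a rough illustration of the "prefetch count" mentioned above, a consumer could be started with an explicit limit. This is a minimal sketch, assuming a client registered as client1 is already running, that a topic named test-topic exists, and that the consumer option is spelled prefetch_count; check the brod_consumer documentation for the exact option names and defaults.

```erlang
%% Sketch only: client1 and <<"test-topic">> are placeholders.
%% prefetch_count is assumed to be the option behind the "prefetch count"
%% feature: the poller stops fetching once this many messages are unprocessed.
ConsumerConfig = [{prefetch_count, 10}],
ok = brod:start_consumer(client1, <<"test-topic">>, ConsumerConfig).
```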

Building and testing

  • Min Erlang/OTP version 24
  • CMake 4, which is required to build the NIF for crc32cer

make compile
make test-env t # requires docker-compose in place

Working With Kafka 0.9.x or Earlier

Make sure {query_api_versions, false} exists in the client config. ApiVersionRequest was introduced in Kafka 0.10, and sending such a request to an older broker causes a connection failure.

e.g. in sys.config:

[{brod,
   [ { clients
     , [ { brod_client_1 %% registered name
         , [ { endpoints, [{"localhost", 9092}]}
           , { query_api_versions, false} %% <---------- here
           ]}]}]}]
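
For a client started on demand rather than from sys.config, the same option can presumably go into the client config passed to brod:start_client/3. A minimal sketch, assuming a broker at localhost:9092 and the client name brod_client_1 used elsewhere in this README:

```erlang
%% Sketch only: disables the ApiVersionRequest handshake for pre-0.10 brokers.
ClientConfig = [{query_api_versions, false}],
ok = brod:start_client([{"localhost", 9092}], brod_client_1, ClientConfig).
```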

Quick Demo

This demo assumes Kafka is running at localhost:9092 and that a topic named test-topic exists.

Start an Erlang shell with make compile; erl -pa _build/default/lib/*/ebin, then paste the lines below into the shell:

rr(brod),
{ok, _} = application:ensure_all_started(brod),
KafkaBootstrapEndpoints = [{"localhost", 9092}],
Topic = <<"test-topic">>,
Partition = 0,
ok = brod:start_client(KafkaBootstrapEndpoints, client1),
ok = brod:start_producer(client1, Topic, _ProducerConfig = []),
{ok, FirstOffset} = brod:produce_sync_offset(client1, Topic, Partition, <<"key1">>, <<"value1">>),
ok = brod:produce_sync(client1, Topic, Partition, <<"key2">>, <<"value2">>),
SubscriberCallbackFun = fun(Partition, Msg, ShellPid = CallbackState) -> ShellPid ! Msg, {ok, ack, CallbackState} end,
Receive = fun() -> receive Msg -> Msg after 1000 -> timeout end end,
brod_topic_subscriber:start_link(client1, Topic, Partitions=[Partition],
                                 _ConsumerConfig=[{begin_offset, FirstOffset}],
                                 _CommittedOffsets=[], message, SubscriberCallbackFun,
                                 _CallbackState=self()),
AckCb = fun(Partition, BaseOffset) -> io:format(user, "\nProduced to partition ~p at base-offset ~p\n", [Partition, BaseOffset]) end,
ok = brod:produce_cb(client1, Topic, Partition, <<>>, [{<<"key3">>, <<"value3">>}], AckCb).
Receive().
Receive().
{ok, {_, [Msg]}} = brod:fetch(KafkaBootstrapEndpoints, Topic, Partition, FirstOffset + 2), Msg.

Example outputs:

#kafka_message{offset = 0,key = <<"key1">>,
               value = <<"value1">>,ts_type = create,ts = 1531995555085,
               headers = []}
#kafka_message{offset = 1,key = <<"key2">>,
               value = <<"value2">>,ts_type = create,ts = 1531995555107,
               headers = []}
Produced to partition 0 at base-offset 406
#kafka_message{offset = 2,key = <<"key3">>,
               value = <<"value3">>,ts_type = create,ts = 1531995555129,
               headers = []}

Overview

Brod supervision (and process link) tree.

Clients

A brod_client in brod is a gen_server responsible for establishing and maintaining tcp sockets connecting to kafka brokers. It also manages per-topic-partition producer and consumer processes under two-level supervision trees.

To use producers or consumers, you have to start at least one client that will manage them.

Compression

Brod does not depend on any compression/decompression implementation by default. To enable compression, you must add the compression application as a dependency in your project's rebar.config.

For example:

{deps, [
    {snappyer, "1.2.9"}
]}.
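
With the dependency in place, compression still has to be requested per producer. The following is a minimal sketch, assuming the producer config accepts a compression option with the value snappy; verify the option name and accepted values against the brod_producer documentation before relying on it.

```erlang
%% Sketch only: {compression, snappy} is an assumed producer option;
%% brod_client_1 and the topic name are placeholders from this README.
ProducerConfig = [{compression, snappy}],
ok = brod:start_producer(brod_client_1, <<"brod-test-topic-1">>, ProducerConfig).
```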

Start clients by default

You may include client configs in sys.config to have them started by default (by the application controller).

Example of configuration (for sys.config):

[{brod,
   [ { clients
     , [ { brod_client_1 %% registered name
         , [ { endpoints, [{"localhost", 9092}]}
           , { reconnect_cool_down_seconds, 10} %% socket error recovery
           ]
         }
       ]
     }
     %% start another client for another kafka cluster
     %% or if you think it's necessary to start another set of tcp connections
   ]
}]

Example of configuration in Elixir (for config/dev.exs or config/prod.exs, etc.):

config :brod,
  clients: [
    # :brod_client_1 is the registered name of the client
    brod_client_1: [
      endpoints: [{"localhost", 9092}],
      reconnect_cool_down_seconds: 10
    ]
  ]

Start brod client on demand

You may also call brod:start_client/1,2,3 to start a client on demand. The client is added to the brod supervision tree.

ClientConfig = [{reconnect_cool_down_seconds, 10}],
ok = brod:start_client([{"localhost", 9092}], brod_client_1, ClientConfig).

Extra socket options can be passed as {extra_sock_opts, ExtraSockOpts}, e.g.

ExtraSockOpts = [{sndbuf, 1024*1024}],
ok = brod:start_client([{"localhost", 9092}], brod_client_1, [{extra_sock_opts, ExtraSockOpts}]).

Producers

A brod_producer is a gen_server that is responsible for producing messages to a given partition of a given topic.

Producers may be started either manually, by calling brod:start_producer, or automatically the first time you call brod:produce without having called brod:start_producer beforehand.

Auto start producer with default producer config

Add the configs below to the client config in sys.config or app env if you start the client statically:

{auto_start_producers, true}
{default_producer_config, []}

Or pass the {auto_start_producers, true} option to brod:start_client if you start the client dynamically.
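
A minimal sketch of the dynamic variant, assuming a broker at localhost:9092 and an existing topic named brod-test-topic-1. No brod:start_producer call is made; the producer is expected to be started by the first produce call.

```erlang
%% Sketch only: endpoint, client name and topic are placeholders.
ClientConfig = [ {auto_start_producers, true}
               , {default_producer_config, []}
               ],
ok = brod:start_client([{"localhost", 9092}], brod_client_1, ClientConfig),
%% The partition producer is auto-started with default_producer_config.
ok = brod:produce_sync(brod_client_1, <<"brod-test-topic-1">>, 0,
                       <<"some-key">>, <<"some-value">>).
```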

Start a Producer on Demand

brod:start_producer(_Client         = brod_client_1,
                    _Topic          = <<"brod-test-topic-1">>,
                    _ProducerConfig = []).

Supported Message Input Format

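Brod's produce APIs include:

  • brod:produce: asynchronous produce; returns {ok, CallRef}, and the acknowledgement is sent back to the caller as a message
  • brod:produce_cb: asynchronous produce; a callback is evaluated when the acknowledgement is received
  • brod:produce_sync: synchronous produce; returns ok
  • brod:produce_sync_offset: synchronous produce; returns {ok, BaseOffset}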

The Value arg in these APIs can be:

  • binary(): One single message
  • {brod:msg_ts(), binary()}: One single message with its create-time timestamp
  • #{ts => brod:msg_ts(), value => binary(), headers => [{_, _}]}: One single message. If this map does not have a key field, the Key argument is used.
  • [{K, V} | {T, K, V}]: A batch, where V could be a nested list of such representation.
  • [#{key => K, value => V, ts => T, headers => [{_, _}]}]: A batch.

When Value is a batch, the Key argument is used only as partitioner input, and all messages are written to the same partition. All messages are unified into a batch of the following spec: [#{key => K, value => V, ts => T, headers => [{_, _}]}]. The ts field is dropped for Kafka versions prior to 0.10 (produce API version 0, magic version 0). The headers field is dropped for Kafka versions prior to 0.11 (produce API versions 0-2, magic versions 0-1).
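
The input formats listed above could be exercised from the shell roughly as follows. This is a sketch, assuming a running client named brod_client_1 with a producer available for brod-test-topic-1 (started manually or via auto_start_producers), and assuming brod:msg_ts() is a millisecond epoch timestamp, as the ts values in the Quick Demo output suggest.

```erlang
%% Sketch only: client, topic and partition are placeholders.
Client = brod_client_1,
Topic  = <<"brod-test-topic-1">>,
Ts     = erlang:system_time(millisecond),
%% One single message as a plain binary
ok = brod:produce_sync(Client, Topic, 0, <<"k">>, <<"v">>),
%% One single message with its create-time timestamp
ok = brod:produce_sync(Client, Topic, 0, <<"k">>, {Ts, <<"v">>}),
%% One single message as a map; the Key argument is used since the map has no key
ok = brod:produce_sync(Client, Topic, 0, <<"k">>,
                       #{ts => Ts, value => <<"v">>, headers => [{<<"h">>, <<"1">>}]}),
%% A batch of {K, V} and {T, K, V} tuples; Key is only partitioner input here
ok = brod:produce_sync(Client, Topic, 0, <<>>,
                       [{<<"k1">>, <<"v1">>}, {Ts, <<"k2">>, <<"v2">>}]),
%% A batch of maps
ok = brod:produce_sync(Client, Topic, 0, <<>>,
                       [#{key => <<"k3">>, value => <<"v3">>, ts => Ts, headers => []}]).
```

Each call targets partition 0 explicitly, so the Key passed alongside a batch has no effect on placement in this sketch.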

Synchronized Produce API

brod:produce_sync(_Client    = brod_client_1,
                  _Topic     = <<"brod-test-topic-1">>,
                  _Partition = 0,
                  _Key       = <<"some-key">>,
                  _Value     = <<"some-value">>).

Or block the calling process until Kafka has confirmed the message:

{ok, CallRef} =
  brod:produce(_Client    = brod_client_1,
               _Topic     = <<"brod-test-topic-1">>,
               _Partition = 0,
               _Key       = <<"some-key">>,
               _Valu