Kaffe


An opinionated, highly specific, Elixir wrapper around Brod: the Erlang Kafka client. :coffee:

NOTE: Although we're using this in production at Spreedly it is still under active development. The API may change and there may be serious bugs we've yet to encounter.


Installation

  1. Add kaffe to your list of dependencies in mix.exs:

    def deps do
      [{:kaffe, "~> 1.0"}]
    end
    
  2. Configure a Kaffe Consumer and/or Producer

Kaffe Consumer Usage

Consumers receive a list of messages and operate as part of the :brod_group_member behaviour. This has a few important benefits:

  1. Group members assign a "subscriber" to each partition in the topic. Because Kafka topics scale with partitions, having a worker per partition usually increases throughput.
  2. Group members correctly handle partition assignments across multiple clients in a consumer group. This means that this mode of operation will scale horizontally (e.g., multiple dynos on Heroku).
  3. Downstream processing that benefits from batching (like writing to another Kafka topic) is more easily supported.

There is also legacy support for single-message consumers, which process one message at a time using the :brod_group_subscriber behaviour. This was Kaffe's original mode of operation, but it is slow and does not scale, so it is considered deprecated.

Kaffe GroupMember - Batch Message Consumer

  1. Define a handle_messages/1 function in a module of yours that implements the Kaffe.MessageHandler behaviour.

    handle_messages/1 will be called with a list of messages, with each message as a map. Each message map will include the topic and partition in addition to the normal Kafka message metadata.

    defmodule MessageProcessor do
      @behaviour Kaffe.MessageHandler
    
      @impl Kaffe.MessageHandler
      def handle_messages(messages) do
        for %{key: key, value: value} = message <- messages do
          IO.inspect message
          IO.puts "#{key}: #{value}"
        end
        :ok # Important!
      end
    end
    
  2. The configuration options for the GroupMember consumer are a superset of those for Kaffe.Consumer. Additional options can be found in Kaffe.Config.Consumer.

    config :kaffe,
      consumers: [
        subscriber_1: [
          endpoints: [kafka: 9092],
          topics: ["interesting-topic"],
          consumer_group: "your-app-consumer-group",
          message_handler: MessageProcessor,
          offset_reset_policy: :reset_to_latest,
          max_bytes: 100_000,
          min_bytes: 10_000,
          max_wait_time: 1_000,
          worker_allocation_strategy: :worker_per_topic_partition,

          # optional
          sasl: %{
            mechanism: :plain,
            login: System.get_env("KAFFE_PRODUCER_USER"),
            password: System.get_env("KAFFE_PRODUCER_PASSWORD")
          }
        ],
        subscriber_2: [
          endpoints: [kafka: 9092],
          topics: ["topic-2"],
          consumer_group: "your-app-consumer-group",
          message_handler: AnotherMessageHandler,
          offset_reset_policy: :reset_to_latest,
          max_bytes: 50_000,
          worker_allocation_strategy: :worker_per_topic_partition
        ]
      ]
    
  3. Add Kaffe.GroupMemberSupervisor as a supervisor in your supervision tree.

    defmodule MyApp.Application do
      use Application
    
      def start(_type, _args) do
        children = [
          %{
            id: Kaffe.GroupMemberSupervisor.Subscriber1,
            start: {Kaffe.GroupMemberSupervisor, :start_link, [:subscriber_1]},
            type: :supervisor
          },
          %{
            id: Kaffe.GroupMemberSupervisor.Subscriber2,
            start: {Kaffe.GroupMemberSupervisor, :start_link, [:subscriber_2]},
            type: :supervisor
          }
        ]
    
        opts = [strategy: :one_for_one, name: MyApp.Application.Supervisor]
        Supervisor.start_link(children, opts)
      end
    end
    

Managing how offsets are committed

In some cases you may not want to commit back the most recent offset after processing a list of messages. For example, if you're batching messages to be sent elsewhere and want to ensure that a batch can be rebuilt should there be an error further downstream. In that example you might want to keep the offset of the first message in your batch so your consumer can restart back at that point to reprocess and rebatch the messages.

Your message handler can respond in the following ways to manage how offsets are committed back:

  • :ok - commit back the most recent offset and request more messages
  • {:ok, :no_commit} - do not commit back the most recent offset; request more messages starting from the offset of the last message
  • {:ok, offset} - commit back at the specified offset, then request messages from that point forward

Example:

defmodule MessageProcessor do
  @behaviour Kaffe.MessageHandler

  @impl Kaffe.MessageHandler
  def handle_messages(messages) do
    for %{key: key, value: value} = message <- messages do
      IO.inspect message
      IO.puts "#{key}: #{value}"
    end
    {:ok, :no_commit}
  end
end
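Building on the example above, a handler can also return {:ok, offset} to pin the commit to the start of a failed batch, as described in the batching scenario. A minimal sketch; deliver_batch/1 is a hypothetical downstream call, not part of Kaffe:

```elixir
defmodule BatchingMessageProcessor do
  @behaviour Kaffe.MessageHandler

  @impl Kaffe.MessageHandler
  def handle_messages(messages) do
    case deliver_batch(messages) do
      :ok ->
        # Batch delivered downstream: commit the most recent offset.
        :ok

      {:error, _reason} ->
        # Commit back at the first message's offset so the consumer
        # restarts at that point and the batch can be rebuilt.
        {:ok, hd(messages).offset}
    end
  end

  # Hypothetical delivery step, e.g. producing to another Kafka topic.
  defp deliver_batch(_messages), do: :ok
end
```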

Kaffe Consumer - Single Message Consumer (Deprecated)

For backward compatibility only! Kaffe.GroupMemberSupervisor is recommended instead!

  1. Add a handle_message/1 function to a local module (e.g. MessageProcessor). This function will be called with each Kafka message as a map. Each message map will include the topic and partition in addition to the normal Kafka message metadata.

    The module's handle_message/1 function must return :ok or Kaffe will throw an error. In normal (synchronous consumer) operation the Kaffe consumer will block until your handle_message/1 function returns :ok.

    Example

    defmodule MessageProcessor do
      def handle_message(%{key: key, value: value} = message) do
        IO.inspect message
        IO.puts "#{key}: #{value}"
        :ok # The handle_message function MUST return :ok
      end
    end
    

    Message Structure

    %{
      attributes: 0,
      crc: 1914336469,
      key: "kafka message key",
      magic_byte: 0,
      offset: 41,
      partition: 17,
      topic: "some-kafka-topic",
      value: "the actual kafka message value is here",
      ts: 1234567890123, # timestamp in milliseconds
      ts_type: :append  # timestamp type: :undefined | :create | :append
    }
    
  2. Configure your Kaffe Consumer in your mix config

    config :kaffe,
      consumers: [
        subscriber_1: [
          endpoints: [kafka: 9092], # that's [hostname: kafka_port]
          topics: ["interesting-topic"], # the topic(s) that will be consumed
          consumer_group: "your-app-consumer-group", # the consumer group for tracking offsets in Kafka
          message_handler: MessageProcessor, # the module from Step 1 that will process messages
    
          # optional
          async_message_ack: false, # see "async message acknowledgement" below
          start_with_earliest_message: true # default false
        ]
      ]
    

    The start_with_earliest_message field controls where your consumer group begins reading when it starts for the very first time. Once offsets have been committed to Kafka, they supersede this option. If omitted, your consumer group will start processing from the most recent messages in the topic instead of consuming all available messages.

    Heroku Configuration

    To configure a Kaffe Consumer for a Heroku Kafka compatible environment, including SSL, omit the endpoints and instead set heroku_kafka_env: true:

    config :kaffe,
      consumers: [
        subscriber_1: [
          heroku_kafka_env: true,
          topics: ["interesting-topic"],
          consumer_group: "your-app-consumer-group",
          message_handler: MessageProcessor
        ]
      ]
    

    With that setting in place Kaffe will automatically pull required info from the following ENV variables:

    • KAFKA_URL
    • KAFKA_CLIENT_CERT
    • KAFKA_CLIENT_CERT_KEY
    • KAFKA_TRUSTED_CERT (not used yet)
  3. Add Kaffe.Consumer as a worker in your supervision tree.
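As a sketch, using the legacy Supervisor.Spec helpers that match the era of this deprecated consumer (Kaffe.Consumer picks up its settings from the :kaffe config in step 2; in modern Elixir, Supervisor.Spec itself is deprecated in favor of map-based child specs):

```elixir
defmodule MyApp.Application do
  use Application
  import Supervisor.Spec

  def start(_type, _args) do
    children = [
      # Kaffe.Consumer reads its configuration from the :kaffe app env.
      worker(Kaffe.Consumer, [])
    ]

    Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
  end
end
```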
