Zkafka

An efficient and scalable library for stateless Kafka message processing written in Go


Install

go get -u github.com/zillow/zkafka/v2

About

zkafka is built to simplify message processing in Kafka. This library aims to minimize boilerplate code, allowing the developer to focus on writing the business logic for each Kafka message. zkafka takes care of various responsibilities, including:

  1. Reading from the worker's configured topics
  2. Managing message offsets reliably - Kafka offset management can be complex, but zkafka handles it. Developers only need to write code to process a single message and indicate whether or not it encountered an error.
  3. Distributing messages to virtual partitions (details will be explained later)
  4. Implementing dead lettering for failed messages
  5. Providing inspectable and customizable behavior through lifecycle functions (Callbacks) - Developers can add metrics or logging at specific points in the message processing lifecycle.

zkafka provides stateless message processing semantics (sometimes called lambda message processing). This is a churched-up way of saying, "You write code which executes on each message individually (without knowledge of other messages)". It is purpose-built with this type of usage in mind. Additionally, the worker implementation guarantees at-least-once processing (details of how that's achieved are shown in the Commit Strategy section).

NOTE: zkafka is built on top of confluent-kafka-go which is a CGO module. Therefore, so is zkafka. When building with zkafka, make sure to set CGO_ENABLED=1.

Features

The following subsections detail some useful features. To make them more accessible, there are runnable examples in the ./examples directory. The best way to learn is to experiment with the examples. Dive in!

Stateless Message Processing

zkafka makes stateless message processing easy. All you have to do is write a concrete processor implementation and wire it up (shown below).

type processor interface {
    Process(ctx context.Context, message *zkafka.Message) error
}
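
As a rough illustration, a concrete processor might look like the sketch below. To keep it runnable without the library, Message is a hypothetical stand-in for zkafka.Message (its Key and Value fields are assumptions, not the library's API), and the greeting payload is invented for the example. The point is only the shape: one method, one message in, one error out.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
)

// Message is a hypothetical stand-in for zkafka.Message so that this sketch
// compiles without the library. The field names are illustrative only.
type Message struct {
	Key   string
	Value []byte
}

// processor mirrors the interface shown above, using the stand-in Message.
type processor interface {
	Process(ctx context.Context, message *Message) error
}

// greeting is a hypothetical payload shape used only for this sketch.
type greeting struct {
	Name string `json:"name"`
}

// Processor holds no per-message state, so every call is independent.
type Processor struct{}

// Compile-time check that *Processor satisfies the interface.
var _ processor = (*Processor)(nil)

// Process decodes a single message and reports failure through its return value.
func (p *Processor) Process(_ context.Context, msg *Message) error {
	var g greeting
	if err := json.Unmarshal(msg.Value, &g); err != nil {
		return fmt.Errorf("decode message with key %q: %w", msg.Key, err)
	}
	if g.Name == "" {
		return fmt.Errorf("message with key %q has no name", msg.Key)
	}
	fmt.Printf("hello, %s\n", g.Name)
	return nil
}

func main() {
	p := &Processor{}
	msg := &Message{Key: "k1", Value: []byte(`{"name":"zkafka"}`)}
	if err := p.Process(context.Background(), msg); err != nil {
		fmt.Println("processing failed:", err)
	}
}
```

Returning a non-nil error is the only signal the processor needs to give; everything else (offsets, dead lettering) is left to the worker.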

If you want to skip ahead and see a working processor, check out the examples, specifically example/worker/main.go.

The anatomy of that example is described here:

First, create a zkafka.Client that can connect to the Kafka broker. Typically, authentication information must also be specified at this point (today that would include username/password).

    client := zkafka.NewClient(zkafka.Config{ BootstrapServers: []string{"localhost:29092"} })

Next, pass this client to create a zkafka.WorkFactory instance. The factory design used by this library adds a little boilerplate but allows default policies to be injected and propagated to all instantiated work instances. We find that useful at Zillow for transparently injecting the nuts and bolts of components that are necessary for our solutions to cross-cutting concerns (typically those revolving around telemetry).

    wf := zkafka.NewWorkFactory(client)

Next, we create the work instance. This is where the dots begin to connect. zkafka.Work objects are responsible for continually polling the topics they've been instructed to listen to (the set of which is specified in the config object) and executing the specified code, which is defined in the user-controlled processor and in lifecycle functions (not shown here).

    topicConfig := zkafka.ConsumerTopicConfig{Topic: "my-topic", GroupID: "mygroup", ClientID: "myclient"}
    // this implements the interface specified above and will be executed for each read message
    processor := &Processor{}
    work := wf.Create(topicConfig, processor)

All that's left now is to kick off the run loop. This connects to the Kafka broker, creates a Kafka consumer group, undergoes consumer group assignment, and, once assigned, begins polling for messages. The run loop executes a single reader (Kafka consumer), which reads messages and fans them out to N processors (N is the virtual partition pool size, described later). It's a processing pipeline with a reader at the front and processors at the back.

The run loop takes two arguments, both responsible for signaling that the run loop should exit.

  1. context.Context object. When this object is canceled, the internal work loop shuts down abruptly: the reader loop and the processor loops exit immediately.

  2. signal channel. Closing this channel tells zkafka to begin a graceful shutdown. Graceful shutdown means the reader stops reading new messages, and the processors attempt to finish their in-flight work.

At Zillow, we deploy to a Kubernetes cluster and use a strategy that combines both mechanisms. When k8s indicates shutdown is imminent, we close the shutdown channel. Graceful shutdown is time-boxed, and if the deadline is reached, the outer context object is canceled, signaling a more aggressive teardown. The example below passes in a nil shutdown signal (which is valid). That's done for brevity in the readme; production use cases should take advantage of the shutdown channel (see examples).

    err := work.Run(context.Background(), nil)
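
The two-stage shutdown described above (close the channel first, cancel the context if the grace period runs out) can be sketched with the standard library alone. This is not zkafka code: runFunc only assumes a run loop with the same two arguments as the call above, and fakeWorker, runWithGrace, and simulate are hypothetical names made up for this illustration.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// runFunc has the same shape as the run loop described above: it returns when
// ctx is canceled (abrupt) or some time after shutdown is closed (graceful).
type runFunc func(ctx context.Context, shutdown <-chan struct{}) error

// runWithGrace starts run, waits for stop, then requests a graceful shutdown.
// If run has not returned within grace, the context is canceled to force it out.
func runWithGrace(run runFunc, stop <-chan struct{}, grace time.Duration) error {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	shutdown := make(chan struct{})
	done := make(chan error, 1)
	go func() { done <- run(ctx, shutdown) }()

	select {
	case err := <-done:
		return err // the run loop exited on its own
	case <-stop:
		close(shutdown) // stage 1: ask for a graceful shutdown
	}

	select {
	case err := <-done:
		return err // in-flight work finished in time
	case <-time.After(grace):
		cancel() // stage 2: deadline reached, tear down abruptly
		return <-done
	}
}

// fakeWorker is a hypothetical stand-in for a worker's run loop. After a
// graceful shutdown is requested it needs drain to finish in-flight work.
func fakeWorker(drain time.Duration) runFunc {
	return func(ctx context.Context, shutdown <-chan struct{}) error {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-shutdown:
		}
		select {
		case <-ctx.Done():
			return ctx.Err() // forced out before draining finished
		case <-time.After(drain):
			return nil
		}
	}
}

// simulate requests shutdown immediately and returns the run loop's result.
func simulate(drainMillis, graceMillis int) error {
	stop := make(chan struct{})
	close(stop)
	drain := time.Duration(drainMillis) * time.Millisecond
	grace := time.Duration(graceMillis) * time.Millisecond
	return runWithGrace(fakeWorker(drain), stop, grace)
}

func main() {
	fmt.Println("fast drain:", simulate(10, 500))
	err := simulate(2000, 20)
	fmt.Println("slow drain forced out:", errors.Is(err, context.Canceled))
}
```

In a real service the stop channel would typically be closed when the process receives a termination signal, and the grace period would be chosen to fit inside the platform's own termination deadline.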

Hyper Scalability

zkafka.Work supports a concept called virtual partitions. This extends the Kafka partition concept. Message ordering is guaranteed within a Kafka partition, and the same holds true for a virtual partition. Every zkafka.Work object manages a pool of goroutines called processors (1 by default and controlled by the zkafka.Speedup(n int) option). Each processor reads from a Go channel called a virtual partition. When a message is read by the reader, it is assigned to one of the virtual partitions based on hash(message.Key) % virtual partition count. This follows the same mechanism used by Kafka. With this strategy, messages with the same key are always assigned to the same virtual partition.

This allows for another layer of scalability. To increase throughput and maintain the same message ordering guarantees, there is no longer a need to increase the Kafka partition count (which can be operationally challenging). Instead, you can use zkafka.Speedup() to increase the virtual partition count.

To run a local example of this:

# sets up Kafka broker locally
make setup
# terminal 1. Starts producing messages. To juice up the production rate, remove the time.Sleep() in the producer and turn acks off.
make example-producer
# terminal 2. Starts a worker with speedup=5.
make example-worker
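
The key-based assignment described in this section can be sketched in a few lines. This is an illustration only: the hash function is an assumption (FNV-1a is used here because it ships with the standard library; the text above does not say which hash zkafka uses), and virtualPartition is a hypothetical helper name.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// virtualPartition maps a message key to one of n virtual partitions using
// hash(key) % n. FNV-1a is an assumption made for this sketch, not
// necessarily the hash zkafka uses. n must be greater than zero.
func virtualPartition(key string, n int) int {
	h := fnv.New32a()
	h.Write([]byte(key)) // Write on a hash.Hash never returns an error
	return int(h.Sum32() % uint32(n))
}

func main() {
	const speedup = 5 // virtual partition count
	for _, key := range []string{"user-1", "user-2", "user-1", "user-3", "user-2"} {
		fmt.Printf("key=%s -> virtual partition %d\n", key, virtualPartition(key, speedup))
	}
}
```

Because the result depends only on the key and the partition count, repeated keys always land on the same virtual partition, which is what preserves per-key ordering while other keys are processed in parallel.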

Configurable Dead Letter Topics

A zkafka.Work instance can be configured to write to a Dead Letter Topic (DLT) when message processing fails. This can be accomplished with the zkafka.WithDeadLetterTopic() option or, more conveniently, by setting the DeadLetterTopicConfig field of zkafka.ConsumerTopicConfig to a non-nil value. At a minimum, the name of the dead letter topic must be specified. When the DLT is specified via configuration, no client ID needs to be specified, because the enclosing consumer topic config's client ID is used.

    zkafka.ConsumerTopicConfig{
        ...
        // When DeadLetterTopicConfig is specified a dead letter topic will be configured and written to
        // when a processing error occurs.
        DeadLetterTopicConfig: &zkafka.ProducerTopicConfig{
            Topic: "zkafka-example-deadletter-topic",
        },
    }

With the above configuration, a message whose processing fails is written to zkafka-example-deadletter-topic.

To execute a local example of this pattern:

# sets up Kafka broker locally
make setup
# terminal 1. Starts producing messages (1 per second)
make example-producer
# terminal 2. Starts a worker which fails processing and writes to a DLT. Log statements show when messages
# are written to a DLT
make example-deadletter-worker

The returned processor error determines whether a message is written to a dead letter topic. In some situations, you might not want to route an error to a DLT. An example might be malformed data.

You have control over this behavior by way of the zkafka.ProcessError.

    return zkafka.ProcessError{
        Err:             err,
        DisableDLTWrite: true,
    }

Returning the error above skips the write to the DLT.
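
To make the distinction concrete, here is a sketch of a handler that opts out of the DLT for malformed data but leaves it enabled for other failures. It is written against a local stand-in, not the library: ProcessError here only mirrors the two fields shown above, and handle, skipsDLT, and errDownstream are hypothetical names. How zkafka itself inspects the returned error is not shown in this document, so skipsDLT is just one plausible way to model that check.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// ProcessError is a local stand-in mirroring the two fields of
// zkafka.ProcessError shown above, so the sketch runs without the library.
type ProcessError struct {
	Err             error
	DisableDLTWrite bool
}

func (p ProcessError) Error() string { return p.Err.Error() }
func (p ProcessError) Unwrap() error { return p.Err }

// errDownstream is a hypothetical failure unrelated to the message contents.
var errDownstream = errors.New("downstream unavailable")

// handle processes one message value and chooses the kind of error to return.
func handle(value []byte, downstreamUp bool) error {
	var payload map[string]any
	if err := json.Unmarshal(value, &payload); err != nil {
		// Malformed data will not improve on a later attempt, so skip the DLT.
		return ProcessError{Err: err, DisableDLTWrite: true}
	}
	if !downstreamUp {
		// A plain error leaves the dead letter write enabled.
		return errDownstream
	}
	return nil
}

// skipsDLT reports whether err asks for the dead letter write to be skipped.
func skipsDLT(err error) bool {
	var pe ProcessError
	return errors.As(err, &pe) && pe.DisableDLTWrite
}

func main() {
	malformed := handle([]byte("{not json"), true)
	fmt.Println("malformed skips DLT:", skipsDLT(malformed))

	unavailable := handle([]byte(`{"id":1}`), false)
	fmt.Println("downstream failure skips DLT:", skipsDLT(unavailable))
}
```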

Process Delay Workers

Process Delay Workers can be an important piece of an automated retry policy. A simple example of this would be two workers daisy-chained together as follows:

    workerConfig1 := zkafka.ConsumerTopicConfig{
        ClientID: "svc1",
        GroupID:  "grp1",
        Topic:    "topicA",
        // When DeadLetterTopicConfig is specified a dead letter topic will be configured and written to
        // when a processing error occurs.
        DeadLetterTopicConfig: &zkafka.ProducerTopicConfig{
            Topic: "topicB",
        },
    }

    workerConfig2 := zkafka.ConsumerTopicConfig{
        ClientID: "svc1",
        GroupID:  "grp1",
        Topic:    "topicB",
        // When DeadLetterTopicConfig is specified a dead letter topic will be configured and written to
        // when a processing error occurs.
        DeadLetterTopicConfig: &zkafka.ProducerTopicConfig{
            Topic: "topicC",
        },
    }

Messages flow through the above worker configuration as follows:

  1. Worker1 reads from topicA.
  2. If message processing fails, the message is written to topicB.