Jackdaw

Simple configuration and mocking of Kafka clients


Why Jackdaw?

While Kafka is an awesome message streaming platform, it's also a little on the heavy side, requiring a cluster of brokers at all times. This makes rapid development and component/integration testing harder than it should be. Firstly, you must have access to a broker. Secondly, connection establishment and topic consumer rebalancing times can be substantial, blowing out the build/test times. If only you could simulate the entire Kafka infrastructure in a JVM so that messages can be published and consumed without relying on Kafka... After all, you just want to know that your application components integrate correctly; you aren't trying to test Kafka.

Enter Jackdaw.

Jackdaw is focused on fixing the one area in which Kafka isn't so great — mocking. While it doesn't simulate Kafka in its entirety, it lets you do most things within the mock. It also provides a factory interface — Kafka — that lets you easily switch between a mock and a real Kafka client connection, playing well with dependency injection.

Jackdaw also makes the Java clients a little nicer to use, without obfuscating the native API. The traditional Consumer API requires continuous calls to poll() from a loop. Jackdaw introduces AsyncReceiver — a background polling thread that will invoke your nominated callback handler when messages are received. Speaking of threading, Jackdaw is also capable of pipelining message (de)serialisation, improving Kafka's performance on multi-core processors.

Bootstrapping is also simplified. Jackdaw offers YConf-ready wrappers for configuring most aspects of the client.

Dependencies

Add the following snippet to your build file, replacing x.y.z with the version of the latest Jackdaw release.

api "com.obsidiandynamics.jackdaw:jackdaw-core:x.y.z"
api "org.apache.kafka:kafka-clients:2.0.0"
testImplementation "com.obsidiandynamics.jackdaw:jackdaw-assurance:x.y.z"

Note: Although Jackdaw is compiled against Kafka client 2.0.0, no specific Kafka client library dependency has been bundled with Jackdaw. This allows you to use any (2.x.x) API-compatible Kafka client library in your application without being constrained by transitive dependencies.

Jackdaw is packaged as two separate modules:

  1. jackdaw-core — Configuration (YConf) support and other core Jackdaw elements. This module would typically be linked to your production build artifacts.
  2. jackdaw-assurance — Mocking components and test utilities. Normally, this module would only be used during testing and should be declared in the testImplementation configuration.

Scenarios

Configuring a real Kafka client connection

The following snippet publishes a message and consumes it using a real Kafka connection. You'll need an actual Kafka broker to run this code.

final Zlg zlg = Zlg.forDeclaringClass().get();
final Kafka<String, String> kafka = new KafkaCluster<>(new KafkaClusterConfig()
                                                      .withBootstrapServers("localhost:9092"));

final Properties producerProps = new PropsBuilder()
    .with("key.serializer", StringSerializer.class.getName())
    .with("value.serializer", StringSerializer.class.getName())
    .build();
final Producer<String, String> producer = kafka.getProducer(producerProps);

final Properties consumerProps = new PropsBuilder()
    .with("key.deserializer", StringDeserializer.class.getName())
    .with("value.deserializer", StringDeserializer.class.getName())
    .with("group.id", "group")
    .with("auto.offset.reset", "earliest")
    .with("enable.auto.commit", true)
    .build();
final Consumer<String, String> consumer = kafka.getConsumer(consumerProps);
consumer.subscribe(Collections.singleton("topic"));

zlg.i("Publishing record");
producer.send(new ProducerRecord<>("topic", "key", "value"));

for (;;) {
  final ConsumerRecords<String, String> records = consumer.poll(100);
  zlg.i("Got %d records", z -> z.arg(records::count));
}

Note: We use Zerolog within Jackdaw and also in our examples for low-overhead logging.

The code above closely resembles how you would normally acquire a Producer/Consumer pair. The only material difference is that we use the Kafka factory interface, which exposes getProducer(Properties) and getConsumer(Properties) methods. Because we're using real Kafka brokers, we have to supply a KafkaClusterConfig with bootstrapServers set.

Configuring a mock Kafka connection

So far there hasn't been much to write home about. That's about to change. Jackdaw's real power is unleashed when we swap KafkaCluster with MockKafka.

final Zlg zlg = Zlg.forDeclaringClass().get();
final Kafka<String, String> kafka = new MockKafka<>();

final Properties producerProps = new PropsBuilder()
    .with("key.serializer", StringSerializer.class.getName())
    .with("value.serializer", StringSerializer.class.getName())
    .build();
final Producer<String, String> producer = kafka.getProducer(producerProps);

final Properties consumerProps = new PropsBuilder()
    .with("group.id", "group")
    .build();
final Consumer<String, String> consumer = kafka.getConsumer(consumerProps);
consumer.subscribe(Collections.singleton("topic"));

zlg.i("Publishing record");
producer.send(new ProducerRecord<>("topic", "key", "value"));

for (;;) {
  final ConsumerRecords<String, String> records = consumer.poll(100);
  zlg.i("Got %d records", z -> z.arg(records::count));
}

The code is essentially the same. (We did strip out a bunch of unused properties.) It also behaves identically to the previous example. Only now there is no Kafka. (Oh and it's fast.)
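To see why a factory interface plays well with dependency injection, here is a minimal sketch of the pattern in plain Java, with no Jackdaw or Kafka classes involved. All names in it (Sender, SenderFactory, ConsoleSenderFactory, RecordingSenderFactory, OrderNotifier) are hypothetical stand-ins invented for illustration; they are not part of Jackdaw's API. The point is only that a component written against the factory interface does not change when the factory behind it is swapped.

```java
import java.util.ArrayList;
import java.util.List;

public class FactoryInjectionSketch {
  /** Hypothetical stand-in for a client handle (think Producer). */
  interface Sender {
    void send(String message);
  }

  /** Hypothetical stand-in for a client factory (think Kafka<K, V>). */
  interface SenderFactory {
    Sender getSender();
  }

  /** Plays the role of the real factory; a real one would connect to a broker. */
  static final class ConsoleSenderFactory implements SenderFactory {
    @Override
    public Sender getSender() {
      return message -> System.out.println("sending: " + message);
    }
  }

  /** Plays the role of the mock factory: records messages in memory for assertions. */
  static final class RecordingSenderFactory implements SenderFactory {
    final List<String> sent = new ArrayList<>();

    @Override
    public Sender getSender() {
      return sent::add;
    }
  }

  /** Application component that only knows about the factory interface. */
  static final class OrderNotifier {
    private final Sender sender;

    OrderNotifier(SenderFactory factory) {
      this.sender = factory.getSender();
    }

    void orderPlaced(String orderId) {
      sender.send("order-placed:" + orderId);
    }
  }

  public static void main(String[] args) {
    // production-style wiring
    new OrderNotifier(new ConsoleSenderFactory()).orderPlaced("42");

    // test-style wiring: same component, different factory
    final RecordingSenderFactory recording = new RecordingSenderFactory();
    new OrderNotifier(recording).orderPlaced("42");
    System.out.println(recording.sent); // prints [order-placed:42]
  }
}
```

With Jackdaw, the injected type would be the Kafka factory itself, with KafkaCluster supplied in production and MockKafka in tests.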

Asynchronous consumption

Jackdaw provides AsyncReceiver, a convenient background worker that continuously polls Kafka for messages and hands them to your callback. The example below wraps a consumer in an AsyncReceiver.

final Zlg zlg = Zlg.forDeclaringClass().get();
final Kafka<String, String> kafka = new MockKafka<>();

final Properties producerProps = new PropsBuilder()
    .with("key.serializer", StringSerializer.class.getName())
    .with("value.serializer", StringSerializer.class.getName())
    .build();
final Producer<String, String> producer = kafka.getProducer(producerProps);

final Properties consumerProps = new PropsBuilder()
    .with("group.id", "group")
    .build();

final Consumer<String, String> consumer = kafka.getConsumer(consumerProps);
consumer.subscribe(Collections.singleton("topic"));

// a callback for asynchronously handling records
final RecordHandler<String, String> recordHandler = records -> {
  zlg.i("Got %d records", z -> z.arg(records::count));
};

// a callback for handling exceptions
final ExceptionHandler exceptionHandler = ExceptionHandler.forPrintStream(System.err);

// wrap the consumer in an AsyncReceiver
final AsyncReceiver<?, ?> receiver = 
    new AsyncReceiver<>(consumer, 100, "AsyncReceiverThread", recordHandler, exceptionHandler);

zlg.i("Publishing record");
producer.send(new ProducerRecord<>("topic", "key", "value"));

// give it some time...
Threads.sleep(5_000);

// clean up
producer.close();
receiver.terminate();

The example above used a MockKafka factory; it could have just as easily used the real KafkaCluster. The AsyncReceiver class is completely independent of the underlying Kafka implementation.

Note: You could, of course, write your own polling loop and stick it into a daemon thread; however, AsyncReceiver already does everything you need and also manages the lifecycle of the underlying Consumer instance. When AsyncReceiver terminates, it automatically cleans up after itself and closes the Consumer. It also exposes the terminate() lifecycle method, allowing you to interrupt the receiver and end the poll loop.
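For a sense of what the hand-rolled alternative involves, here is a rough sketch of a polling loop on a daemon thread. It is not Jackdaw's implementation: PollingWorker is a hypothetical class written for this illustration, and a java.util.concurrent.BlockingQueue stands in for the Kafka Consumer so that the sketch runs without any dependencies. Exception handling and closing of the underlying resource are left out, which is part of what AsyncReceiver takes care of for you.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class PollingWorkerSketch {
  /** Hypothetical hand-rolled poller; the queue stands in for a Kafka Consumer. */
  static final class PollingWorker<T> {
    private final BlockingQueue<T> source;
    private final Consumer<T> handler;
    private final Thread thread;
    private volatile boolean running = true;

    PollingWorker(BlockingQueue<T> source, long pollTimeoutMillis, String threadName, Consumer<T> handler) {
      this.source = source;
      this.handler = handler;
      thread = new Thread(() -> run(pollTimeoutMillis), threadName);
      thread.setDaemon(true); // don't keep the JVM alive just for polling
      thread.start();
    }

    private void run(long pollTimeoutMillis) {
      while (running) {
        try {
          // block for up to the timeout, then re-check the running flag
          final T item = source.poll(pollTimeoutMillis, TimeUnit.MILLISECONDS);
          if (item != null) {
            handler.accept(item);
          }
        } catch (InterruptedException e) {
          return; // interrupted by terminate(): leave the loop
        }
      }
    }

    /** Stops the loop, interrupts a blocked poll and waits for the thread to exit. */
    void terminate() throws InterruptedException {
      running = false;
      thread.interrupt();
      thread.join();
    }

    boolean isAlive() {
      return thread.isAlive();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    final List<String> received = new CopyOnWriteArrayList<>();
    final PollingWorker<String> worker = new PollingWorker<>(queue, 100, "PollerThread", received::add);

    queue.put("value");

    // wait (bounded) for the background thread to pass the item to the callback
    final long deadline = System.currentTimeMillis() + 5_000;
    while (received.isEmpty() && System.currentTimeMillis() < deadline) {
      Thread.sleep(10);
    }

    worker.terminate();
    System.out.println("Received: " + received);
  }
}
```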

Configuration

Another common use case is configuration. In the following example we use YConf to bootstrap a Kafka client from a YAML file.

final Kafka<?, ?> kafka = new MappingContext()
    .withParser(new SnakeyamlParser())
    .fromStream(new FileInputStream("src/test/resources/kafka-cluster.conf"))
    .map(Kafka.class);

// override the properties from the config
final Properties overrides = new PropsBuilder()
    .with("max.in.flight.requests.per.connection", 1)
    .build();

final Producer<?, ?> producer = kafka.getProducer(overrides);
// do stuff with the producer
// ...
producer.close();

The configuration file is shown below.

type: com.obsidiandynamics.jackdaw.KafkaCluster
clusterConfig:
  common:
    bootstrap.servers: localhost:9092
  producer:
    acks: 1
    retries: 0
    batch.size: 16384
    linger.ms: 0
    buffer.memory: 33554432
    key.serializer: org.apache.kafka.common.serialization.StringSerializer
    value.serializer: org.apache.kafka.common.serialization.StringSerializer
  consumer:
    enable.auto.commit: false
    auto.commit.interval.ms: 0

If instead of a real Kafka
