<img src="https://raw.githubusercontent.com/wiki/obsidiandynamics/goneli/images/goneli-logo.png" width="90px" alt="logo"/> goNELI
Implementation of the NELI leader election protocol for Go and Kafka. goNELI encapsulates the 'fast' variation of the protocol, running in exclusive mode over a group of contending processes.
Concept
Leader election is a straightforward concept, long established in the literature on distributed computing. Given a set of competing processes, select one process as the notional leader. At most one process may bear leader status at any point in time, and that status must be unanimously agreed upon by the remaining processes. Conceptually, leader election is depicted below.
<img src="https://raw.githubusercontent.com/wiki/obsidiandynamics/goneli/images/figure-concept.png" width="100%" alt="Leader Election Concept"/> <br/>
As straightforward as the concept might appear, the implementation is considerably more nuanced. A myriad of edge cases must be accounted for, such as spurious process failures and network partitions. Leader election also requires additional infrastructure to provide centralised coordination, which increases the complexity of the overall architecture.
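The two properties in the definition can be pinned down with a deliberately naive, single-process sketch. The first is "at most one leader"; the second is "everyone agrees who it is". This is not NELI and not part of goNELI. It assumes a shared in-memory slot updated with `sync/atomic` compare-and-swap, which sidesteps every distributed edge case mentioned above. The names `election`, `tryAcquire`, `resign` and `contend` are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// noLeader marks the election slot as vacant.
const noLeader int32 = -1

// election is a toy, in-memory stand-in for a coordinator: a single slot
// that at most one contender can occupy at any time.
type election struct {
	leader atomic.Int32
}

func newElection() *election {
	e := &election{}
	e.leader.Store(noLeader)
	return e
}

// tryAcquire atomically claims the slot for id, but only if it is vacant.
func (e *election) tryAcquire(id int32) bool {
	return e.leader.CompareAndSwap(noLeader, id)
}

// resign vacates the slot, but only if id currently holds it.
func (e *election) resign(id int32) bool {
	return e.leader.CompareAndSwap(id, noLeader)
}

// current is the single value every participant reads, so all agree on the leader.
func (e *election) current() int32 {
	return e.leader.Load()
}

// contend races n Goroutines for leadership once and returns how many won.
func contend(e *election, n int) int {
	var wins atomic.Int32
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(id int32) {
			defer wg.Done()
			if e.tryAcquire(id) {
				wins.Add(1)
			}
		}(int32(i))
	}
	wg.Wait()
	return int(wins.Load())
}

func main() {
	e := newElection()
	fmt.Println("winners:", contend(e, 8)) // always 1
	leader := e.current()
	fmt.Println("leader:", leader, "resigned:", e.resign(leader))
}
```

A shared atomic slot makes the invariant trivial within one process. The hard part, which NELI addresses, is getting the same guarantee across machines that can crash or be partitioned.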
Meanwhile, many event-driven microservices have come to rely upon Apache Kafka for their messaging needs, and Kafka has an internal leader election mechanism for assigning the cluster controller as well as the group and transaction coordinators. Wouldn't it be nice if we could somehow 'borrow' this coveted feature for our own leader election needs?
This is where NELI comes in. Rather than dragging in additional infrastructure components and dependencies, NELI makes do with what's already there.
Getting started
Add the dependency
```shell
go get -u github.com/obsidiandynamics/goneli
```
Go import
```go
import "github.com/obsidiandynamics/goneli"
```
Basic leader election
This is the easiest way of getting started with leader election. A task will be continuously invoked in the background while the Neli instance is the leader of its group.
```go
package main

import (
	"log"
	"time"

	"github.com/obsidiandynamics/goneli"
)

func main() {
	// Create a new Neli curator.
	neli, err := goneli.New(goneli.Config{
		KafkaConfig: goneli.KafkaConfigMap{
			"bootstrap.servers": "localhost:9092",
		},
		LeaderGroupID: "my-app-name.group",
		LeaderTopic:   "my-app-name.topic",
	})
	if err != nil {
		panic(err)
	}

	// Starts a pulser Goroutine in the background, which will automatically terminate when Neli is closed.
	p, err := neli.Background(func() {
		// An activity performed by the client application if it is the elected leader. This task should
		// perform a small amount of work that is exclusively attributable to a leader, and return immediately.
		// For as long as the associated Neli instance is the leader, this task will be invoked repeatedly;
		// therefore, it should break down any long-running work into bite-sized chunks that can be safely
		// performed without causing excessive blocking.
		log.Printf("Do important leader stuff")
		time.Sleep(100 * time.Millisecond)
	})
	if err != nil {
		panic(err)
	}

	// Blocks until Neli is closed or an unrecoverable error occurs.
	if err := p.Await(); err != nil {
		panic(err)
	}
}
```
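The task comment asks for long-running work to be broken into bite-sized chunks. One way to do that is to keep a cursor between invocations, so each call does a bounded batch and returns. The following is a stdlib-only sketch; `chunkedTask`, `Run` and `Done` are hypothetical names, not goNELI API. The mutex is there so the sketch does not depend on which Goroutine invokes the task.

```go
package main

import (
	"fmt"
	"sync"
)

// chunkedTask wraps a long-running job so that each invocation performs at
// most batchSize units of work and then returns. The cursor persists between
// invocations, so work resumes where it left off on the next call.
type chunkedTask struct {
	mu        sync.Mutex
	items     []int
	cursor    int
	batchSize int
	process   func(int)
}

// Run processes the next batch. Its signature is func(), so the method value
// t.Run can be handed to anything that repeatedly invokes a func().
func (t *chunkedTask) Run() {
	t.mu.Lock()
	defer t.mu.Unlock()
	end := t.cursor + t.batchSize
	if end > len(t.items) {
		end = len(t.items)
	}
	for _, item := range t.items[t.cursor:end] {
		t.process(item)
	}
	t.cursor = end
}

// Done reports whether every item has been processed.
func (t *chunkedTask) Done() bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	return t.cursor >= len(t.items)
}

func main() {
	sum := 0
	task := &chunkedTask{
		items:     []int{1, 2, 3, 4, 5, 6, 7},
		batchSize: 3,
		process:   func(n int) { sum += n },
	}
	calls := 0
	for !task.Done() {
		task.Run() // stands in for the pulser invoking the leader task
		calls++
	}
	fmt.Println(sum, calls) // 28 3
}
```

With goNELI, one would pass `task.Run` to `neli.Background(...)` in place of the inline closure. Note that the cursor lives in process memory. A leader elected in a different process would need its own way of knowing where to resume.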
Full control
Sometimes more control is needed, for example:
- Configuring a custom logger, based on your application's needs.
- Setting up a barrier to synchronise leadership transition, so that the new leader does not step in until the outgoing leader has completed all of its backlogged work.
- Pulsing the `Neli` instance from your own Goroutine.
```go
package main

import (
	"time"

	"github.com/obsidiandynamics/goneli"

	// Logger and Scribe bindings.
	"github.com/obsidiandynamics/libstdgo/scribe"
	scribelogrus "github.com/obsidiandynamics/libstdgo/scribe/logrus"
	logrus "github.com/sirupsen/logrus"
)

func main() {
	// Bootstrap a custom logger.
	log := logrus.StandardLogger()
	log.SetLevel(logrus.TraceLevel)

	// Configure Neli.
	config := goneli.Config{
		KafkaConfig: goneli.KafkaConfigMap{
			"bootstrap.servers": "localhost:9092",
		},
		Scribe:        scribe.New(scribelogrus.Bind()),
		LeaderGroupID: "my-app-name.group",
		LeaderTopic:   "my-app-name.topic",
	}

	// Handler of leader status updates. Used to initialise state upon leader acquisition, and to
	// wrap up in-flight work upon loss of leader status.
	barrier := func(e goneli.Event) {
		switch e.(type) {
		case goneli.LeaderAcquired:
			// The application may initialise any state necessary to perform work as a leader.
			log.Infof("Received event: leader elected")
		case goneli.LeaderRevoked:
			// The application may block the Barrier callback until it wraps up any in-flight
			// activity. A new leader will be elected only after the callback returns.
			log.Infof("Received event: leader revoked")
		case goneli.LeaderFenced:
			// The application must immediately terminate any ongoing activity, on the assumption
			// that another leader may be imminently elected. Unlike the handling of LeaderRevoked,
			// blocking in the Barrier callback will not prevent a new leader from being elected.
			log.Infof("Received event: leader fenced")
		}
	}

	// Create a new Neli curator, supplying the barrier as an optional argument.
	neli, err := goneli.New(config, barrier)
	if err != nil {
		panic(err)
	}

	// Pulsing is done in a separate Goroutine. (We don't have to, but it's often practical to do so.)
	go func() {
		defer neli.Close()
		for {
			// Pulse our presence, allowing for some time to acquire leader status.
			// Will return instantly if already leader.
			isLeader, err := neli.Pulse(10 * time.Millisecond)
			if err != nil {
				// Only fatal errors are returned from Pulse().
				panic(err)
			}

			if isLeader {
				// We hold leader status... can safely do some work.
				// Avoid blocking for too long, otherwise we may miss a poll and lose leader status.
				log.Infof("Do important leader stuff")
				time.Sleep(100 * time.Millisecond)
			}
		}
	}()

	// Blocks until Neli is closed.
	neli.Await()
}
```
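The pulsing loop in the example can only exit by panicking. A variant that stops when a channel is closed and returns the first `Pulse` error to its caller may be easier to embed in an application. This is a sketch under stated assumptions. `pulser` is a hypothetical one-method interface inferred from the README's `neli.Pulse(10 * time.Millisecond)` call, and `fakePulser` is a hypothetical test double, so the loop can run without Kafka.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// pulser is a hypothetical interface matching how the README calls neli.Pulse.
type pulser interface {
	Pulse(timeout time.Duration) (bool, error)
}

// pulseLoop pulses until stop is closed or Pulse reports an error, invoking
// work once per pulse while leader status is held. Unlike the README loop,
// it returns the error to the caller instead of panicking.
func pulseLoop(p pulser, stop <-chan struct{}, work func()) error {
	for {
		select {
		case <-stop:
			return nil
		default:
		}
		isLeader, err := p.Pulse(10 * time.Millisecond)
		if err != nil {
			return err
		}
		if isLeader {
			work()
		}
	}
}

// fakePulser becomes leader from call number leaderFrom onwards and fails on
// call number failAt (0 means never).
type fakePulser struct {
	calls, leaderFrom, failAt int
}

func (f *fakePulser) Pulse(time.Duration) (bool, error) {
	f.calls++
	if f.failAt > 0 && f.calls == f.failAt {
		return false, errors.New("fake fatal error")
	}
	return f.calls >= f.leaderFrom, nil
}

func main() {
	f := &fakePulser{leaderFrom: 3, failAt: 6}
	worked := 0
	err := pulseLoop(f, make(chan struct{}), func() { worked++ })
	fmt.Println(worked, err) // 3 fake fatal error
}
```

The stop channel is checked only between pulses, so shutdown latency is bounded by one `Pulse` timeout plus one unit of work. That is another reason to keep each unit of leader work short. Assuming `Neli.Pulse` has the signature implied by the README, a `Neli` value could be passed directly as `p`, with the caller deferring `neli.Close()`.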
Configuration
There are a handful of parameters that control goNELI's behaviour, assigned via the `Config` struct:
Design
Motivation
Traditionally, leader election is performed using an *Atomi
