Ziggurat
Ziggurat Golang
Consumer orchestration made easy. Consume events from Kafka without hassle. Ziggurat Golang is a library that aims to simplify consumer orchestration so that you can focus on your business logic. Define simple functions to handle events from Kafka.
- Ziggurat Golang
- Install the ziggurat CLI
- Features
- How to consume messages from Kafka
- Configuring the Ziggurat struct
- Ziggurat Handler interface
- Ziggurat Event struct
- Ziggurat MessageConsumer interface
- Using Kafka Consumer
- How to use the ziggurat Event Router
- Retries using RabbitMQ
- I have a lot of messages in my dead letter queue, how do I replay them
Install the ziggurat CLI
go install github.com/gojekfarm/ziggurat/v2/cmd/ziggurat@latest
Using with an existing application
go get github.com/gojekfarm/ziggurat/v2
Creating a new app using the CLI
ziggurat new <app_name>
go mod tidy -v # cleans up dependencies
Building from source
make lib.build
Running unit tests
make lib.test
Running integration tests
docker-compose up -d # starts up the RabbitMQ and Kafka containers
make lib.test-integration
[!NOTE] There are no integration tests for Kafka, only for RabbitMQ
Coverage Report in HTML
make lib.test-coverage-html
[!NOTE] For other make test targets, refer to the Makefile
Contribution guidelines
- Avoid exposing unwanted APIs publicly
- Make sure the APIs provide certain guarantees about what they return
- Leave concurrency to the caller: an API that uses concurrency internally must leave the decision to invoke it concurrently to the caller. Your APIs should be as simple as a function or method that takes arguments and returns a value. Document it if an API blocks forever, like the ziggurat.Run function.
- Keep configuration to a minimum and leave the decisions to the user
- Do not write an interface first and then implement it. Write a struct first and add your methods; discover interfaces, do not invent them. A premature abstraction is most likely going to be a leaky one.
- Last but not least, keep it simple, stupid.
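The "leave concurrency to the caller" guideline can be illustrated with a minimal sketch. Everything here is hypothetical and unrelated to Ziggurat's actual API: `drain` is a plain blocking function that starts no goroutines, and `countConcurrently` is a caller that chooses to run it in a goroutine.

```go
package main

import (
	"context"
	"fmt"
)

// drain is a hypothetical blocking API. It reads from in until the channel is
// closed or ctx is cancelled and returns how many items it saw.
// It starts no goroutines of its own.
func drain(ctx context.Context, in <-chan int) int {
	n := 0
	for {
		select {
		case <-ctx.Done():
			return n
		case _, ok := <-in:
			if !ok {
				return n
			}
			n++
		}
	}
}

// countConcurrently shows a caller opting in to concurrency around drain.
func countConcurrently(items []int) int {
	in := make(chan int)
	result := make(chan int, 1)
	go func() { result <- drain(context.Background(), in) }()
	for _, it := range items {
		in <- it
	}
	close(in)
	return <-result
}

func main() {
	fmt.Println(countConcurrently([]int{1, 2, 3}))
}
```

Because `drain` blocks and returns a value, a caller that does not need concurrency can call it directly, while one that does can wrap it as shown.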
Features
- Ziggurat Go enables you to orchestrate multiple message consumers by decoupling the consumer implementation from the orchestration
- A small and simple API footprint
- Ziggurat Go currently supports only Kafka as a message consumer implementation
- Ziggurat Go includes a regex based router to support complex routing patterns
- Ziggurat Go provides a RabbitMQ middleware for retrying messages
- Ziggurat Go provides a RabbitMQ message consumer implementation to consume "retried" messages from RabbitMQ
- Ziggurat Go also includes a Prometheus middleware and exposes a Prometheus exporter server for instrumentation
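To give a feel for what regex based routing means, here is a minimal standalone sketch built only on the standard `regexp` package. The `router` and `route` types are hypothetical and are not the library's router; the path shape (`<group id>/<topic>`) is inferred from the examples in this README, and the "first match wins" strategy is an assumption that may differ from what Ziggurat actually does.

```go
package main

import (
	"fmt"
	"regexp"
)

// route pairs a compiled pattern with a handler name (hypothetical structure).
type route struct {
	pattern *regexp.Regexp
	name    string
}

// router is a hypothetical regex router: the first matching route wins.
type router struct{ routes []route }

// add compiles pattern and registers it under name.
func (r *router) add(pattern, name string) {
	r.routes = append(r.routes, route{regexp.MustCompile(pattern), name})
}

// match returns the name of the first route whose pattern matches path.
func (r *router) match(path string) (string, bool) {
	for _, rt := range r.routes {
		if rt.pattern.MatchString(path) {
			return rt.name, true
		}
	}
	return "", false
}

func main() {
	r := &router{}
	r.add(`foo\.id/india-auth-log$`, "india")
	r.add(`foo\.id/.*-auth-log$`, "fallback")

	fmt.Println(r.match("foo.id/india-auth-log"))
	fmt.Println(r.match("foo.id/uk-auth-log"))
	fmt.Println(r.match("bar.id/orders"))
}
```

In this sketch the more specific pattern is registered first so that it takes precedence over the catch-all.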
How to consume messages from Kafka
package main

import (
	"context"

	"github.com/gojekfarm/ziggurat/v2"
	"github.com/gojekfarm/ziggurat/v2/kafka"
	"github.com/gojekfarm/ziggurat/v2/logger"
)

func main() {
	var zig ziggurat.Ziggurat
	router := ziggurat.NewRouter()

	ctx := context.Background()
	l := logger.NewLogger(logger.LevelInfo)

	kcg := kafka.ConsumerGroup{
		Logger: nil,
		GroupConfig: kafka.ConsumerConfig{
			BootstrapServers: "localhost:9092",
			GroupID:          "foo.id",
			Topics:           []string{"foo"},
		},
	}

	router.HandlerFunc("foo.id/*", func(ctx context.Context, event *ziggurat.Event) {
	})

	h := ziggurat.Use(router)

	if runErr := zig.Run(ctx, h, &kcg); runErr != nil {
		l.Error("error running consumers", runErr)
	}
}
Configuring the Ziggurat struct
type Ziggurat struct {
	Logger          StructuredLogger // a logger implementation of ziggurat.StructuredLogger
	ShutdownTimeout time.Duration    // how long to wait for consumers to shut down, default value: 6 seconds
	ErrorHandler    func(err error)  // a notifier for when one of the message consumers shuts down abruptly
}
[!NOTE] Some message consumer implementations might not honor the context timeout/cancellation, which can cause your application to hang when it is killed. Use the shutdown timeout value to exit the application even if the message consumers misbehave. The default value is 6 seconds.
[!NOTE] The zero value of ziggurat.Ziggurat is perfectly valid and can be used without any issues
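The idea behind a shutdown timeout can be sketched in isolation. This is not the library's implementation: `runAll`, `consumer`, `wellBehaved` and `stubborn` are hypothetical names, and the sketch only shows the general pattern of waiting for consumers for a bounded time after the context is cancelled.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// consumer is a hypothetical blocking consumer function.
type consumer func(ctx context.Context)

// runAll starts every consumer, waits for ctx to be cancelled, and then gives
// the consumers at most timeout to return. It reports whether all stopped in time.
func runAll(ctx context.Context, timeout time.Duration, consumers ...consumer) bool {
	var wg sync.WaitGroup
	for _, c := range consumers {
		wg.Add(1)
		go func(c consumer) {
			defer wg.Done()
			c(ctx)
		}(c)
	}

	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()

	<-ctx.Done()
	select {
	case <-done:
		return true
	case <-time.After(timeout):
		return false // give up on consumers that ignore cancellation
	}
}

// wellBehaved returns as soon as the context is cancelled.
func wellBehaved(ctx context.Context) { <-ctx.Done() }

// stubborn ignores cancellation and keeps blocking.
func stubborn(context.Context) { time.Sleep(time.Hour) }

// demo cancels the context shortly after start and reports both outcomes.
func demo() (bool, bool) {
	ctx, cancel := context.WithCancel(context.Background())
	time.AfterFunc(10*time.Millisecond, cancel)
	good := runAll(ctx, 50*time.Millisecond, wellBehaved)
	bad := runAll(ctx, 50*time.Millisecond, wellBehaved, stubborn)
	return good, bad
}

func main() {
	fmt.Println(demo())
}
```

With only well-behaved consumers the function returns as soon as they stop; with a stubborn one it returns after the timeout instead of hanging.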
Ziggurat Run method
The ziggurat.Run method is used to start the consumer orchestration. It takes in a context.Context implementation,
a ziggurat.Handler and a variable number of message consumer implementations.
ctx := context.Background()
h := ziggurat.HandlerFunc(func(context.Context, *ziggurat.Event) {...})
groupOne := kafka.ConsumerGroup{...}
groupTwo := kafka.ConsumerGroup{...}
if runErr := zig.Run(ctx, h, &groupOne, &groupTwo); runErr != nil {
	logger.Error("error running consumers", runErr)
}
[!NOTE] The Run method returns a ziggurat.ErrCleanShutdown in case of a clean shutdown
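Since a clean shutdown is also reported through the returned error, a caller may want to tell it apart from a real failure. The sketch below shows one way to do that with `errors.Is`. It is self-contained: `errCleanShutdown` is a local stand-in for `ziggurat.ErrCleanShutdown`, and `run` is a hypothetical stand-in for `zig.Run`. Whether the library wraps the sentinel is not stated here, so `errors.Is` is used because it handles both cases.

```go
package main

import (
	"errors"
	"fmt"
)

// errCleanShutdown is a local stand-in for ziggurat.ErrCleanShutdown.
var errCleanShutdown = errors.New("clean shutdown")

// run is a hypothetical stand-in for zig.Run. It returns the sentinel
// when clean is true and an unrelated error otherwise.
func run(clean bool) error {
	if clean {
		return errCleanShutdown
	}
	return fmt.Errorf("consumer crashed: %w", errors.New("broker unreachable"))
}

// exitCode maps the error returned by run to a process exit code.
func exitCode(err error) int {
	if err == nil || errors.Is(err, errCleanShutdown) {
		return 0
	}
	return 1
}

func main() {
	fmt.Println(exitCode(run(true)), exitCode(run(false)))
}
```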
Ziggurat Handler interface
The ziggurat.Handler is an interface for handling ziggurat events. An event is just something that happens in a finite
timeframe, and it can come from any source (Kafka, Redis, RabbitMQ). The handler's job is to handle the event, i.e., the
handler contains your application's business logic.
type Handler interface {
	Handle(ctx context.Context, event *Event)
}

type HandlerFunc func(ctx context.Context, event *Event) // serves as an adapter for normal functions to be used as handlers
Any function / struct which implements the above handler interface can be used in the ziggurat.Run method. The ziggurat.Router also implements the above interface.
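The adapter pattern described above can be tried out without the library. In this sketch, `Event`, `Handler` and `HandlerFunc` are local stand-ins that mirror the definitions shown (only a `Value` field is modeled on the event, which is a simplification), and `countingHandler` and `dispatch` are hypothetical helpers.

```go
package main

import (
	"context"
	"fmt"
)

// Event is a simplified stand-in for ziggurat.Event (only Value is modeled).
type Event struct {
	Value []byte
}

// Handler mirrors the interface shown above.
type Handler interface {
	Handle(ctx context.Context, event *Event)
}

// HandlerFunc adapts a plain function to the Handler interface.
type HandlerFunc func(ctx context.Context, event *Event)

// Handle calls the underlying function, which is what makes HandlerFunc a Handler.
func (f HandlerFunc) Handle(ctx context.Context, event *Event) { f(ctx, event) }

// countingHandler is a hypothetical struct-based handler that counts events.
type countingHandler struct{ seen int }

func (c *countingHandler) Handle(_ context.Context, _ *Event) { c.seen++ }

// dispatch accepts anything that satisfies Handler.
func dispatch(h Handler, payloads ...string) {
	for _, p := range payloads {
		h.Handle(context.Background(), &Event{Value: []byte(p)})
	}
}

func main() {
	var got []string
	fn := HandlerFunc(func(_ context.Context, e *Event) {
		got = append(got, string(e.Value))
	})
	counter := &countingHandler{}

	dispatch(fn, "a", "b")
	dispatch(counter, "a", "b", "c")

	fmt.Println(got, counter.seen)
}
```

Both the function and the struct are passed to `dispatch` in the same way, which is the property that lets a router or a plain function serve as the handler.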
Writing custom re-usable middlewares
Middlewares are a good way to run specific code before every handler is run. They provide a neat way to abstract common code which can be composed with other middlewares
Any function or method with the signature
type Middleware func(ziggurat.Handler) ziggurat.Handler
can be used as a middleware in the ziggurat.Use function to compose middlewares.
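To make the composition idea concrete, the following self-contained sketch chains two middlewares around a final handler and records the call order. The types are local stand-ins for the ziggurat ones, and `compose` is a hypothetical helper written for this example, not `ziggurat.Use` itself; the ordering it uses (first middleware listed runs outermost) is an assumption.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// Event, Handler and HandlerFunc are local stand-ins for the ziggurat types.
type Event struct{ Value []byte }

type Handler interface {
	Handle(ctx context.Context, event *Event)
}

type HandlerFunc func(ctx context.Context, event *Event)

func (f HandlerFunc) Handle(ctx context.Context, event *Event) { f(ctx, event) }

// Middleware wraps a Handler and returns a new Handler.
type Middleware func(Handler) Handler

// compose is a hypothetical helper that wraps h so that mws[0] runs outermost.
func compose(h Handler, mws ...Middleware) Handler {
	for i := len(mws) - 1; i >= 0; i-- {
		h = mws[i](h)
	}
	return h
}

// tag returns a middleware that records its name before and after calling next.
func tag(name string, trace *[]string) Middleware {
	return func(next Handler) Handler {
		return HandlerFunc(func(ctx context.Context, e *Event) {
			*trace = append(*trace, name+":before")
			next.Handle(ctx, e)
			*trace = append(*trace, name+":after")
		})
	}
}

// runChain builds a two-middleware chain and returns the recorded call order.
func runChain() string {
	var trace []string
	final := HandlerFunc(func(_ context.Context, e *Event) {
		trace = append(trace, "handler:"+string(e.Value))
	})
	h := compose(final, tag("log", &trace), tag("auth", &trace))
	h.Handle(context.Background(), &Event{Value: []byte("hello")})
	return strings.Join(trace, " ")
}

func main() {
	// Prints: log:before auth:before handler:hello auth:after log:after
	fmt.Println(runChain())
}
```

A middleware that returns without calling `next.Handle` stops the chain, which is how a check such as authentication can prevent the business logic from running.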
A practical example
I want to authenticate a user before running my handler, and execute my business logic only if the authentication succeeds.
Code snippet
type Auther interface {
	Authenticate(user string) bool
}

type GoogleAuth struct{}

func (g GoogleAuth) Authenticate(user string) bool {
	return user == "foo"
}

type AuthMiddleware struct {
	Authenticator Auther
}

func (a *AuthMiddleware) Authenticate(next ziggurat.Handler) ziggurat.Handler {
	return ziggurat.HandlerFunc(func(ctx context.Context, event *ziggurat.Event) {
		type user struct{ Username string }
		var u user
		if err := json.Unmarshal(event.Value, &u); err != nil {
			// handle error
			return // do not execute the next handler
		}
		if a.Authenticator.Authenticate(u.Username) {
			// if auth succeeds call the next handler
			next.Handle(ctx, event)
			return
		}
		// handle the failed auth
	})
}

func main() {
	var zig ziggurat.Ziggurat
	kcg := kafka.ConsumerGroup{
		Logger: logger.NewLogger(logger.LevelInfo),
		GroupConfig: kafka.ConsumerConfig{
			BootstrapServers: "localhost:9092",
			GroupID:          "foo.id",
			ConsumerCount:    1,
			Topics:           []string{"^.*auth-log"},
		},
	}

	authMW := &AuthMiddleware{Authenticator: &GoogleAuth{}}

	router := ziggurat.NewRouter()
	router.HandlerFunc("foo.id/india-auth-log$", func(ctx context.Context, e *ziggurat.Event) {...})
	router.HandlerFunc("foo.id/usa-auth-log$", func(ctx context.Context, e *ziggurat.Event) {...})
	router.HandlerFunc("foo.id/uk-auth-log$", func(ctx context.Context, e *ziggurat.Event) {...})
	router.HandlerFunc("foo.id/aus-auth-log$", func(ctx context.Context, e *ziggurat.Event) {...})
	router.HandlerFunc("foo.id/spain-auth-log$", func(ctx context.Context, e *ziggurat.Event) {...})
	// Authenticate u
