SkillAgentSearch skills...

Ziggurat

No description available

Install / Use

/learn @gojekfarm/Ziggurat
About this skill

Quality Score

0/100

Supported Platforms

Universal

README

Ziggurat Golang

Consumer Orchestration made easy. Consume events from Kafka without hassles. Ziggurat Golang is a library that aims to simplify the consumer orchestration and lets you focus on your business logic. Define simple functions to handle events from Kafka.

<!-- TOC --> <!-- TOC -->

Install the ziggurat CLI

go install github.com/gojekfarm/ziggurat/v2/cmd/ziggurat@latest                                                                                                                                                    

Using with an existing application

go get github.com/gojekfarm/ziggurat/v2

Creating a new app using the CLI

ziggurat new <app_name>
go mod tidy -v  # cleans up dependencies

Building from source

make lib.build

Running unit tests

make lib.test

Running integration tests

docker-compose up -d # starts up the RabbitMQ and Kafka containers
make lib.test-integration

[!NOTE] There are no integration tests for Kafka, only for RabbitMQ

Coverage Report in HTML

lib.test-coverage-html

[!NOTE] For other make tests refer to the Makefile

Contribution guidelines

  • Avoid exposing unwanted APIs publicly
  • Make sure the APIs provide certain guarantees about what they return
  • Leave concurrency to the caller, an API if using concurrency internally must leave the decision to invoke it concurrently to the caller. Your APIs should be as simple as a function / method which takes in args and returns a value. Make sure to document it if an API blocks forever, like the ziggurat.Run function.
  • Keep configuration minimum and leave the decisions on the user
  • Do not write an interface first and then implement, write a struct first and add your methods, discover interfaces do not invent them. A premature abstraction is mostly going to be a leaky one.
  • Last but not the least, Keep it simple stupid.

Features

  • Ziggurat-Go enables you to orchestrate multiple message consumers by decoupling the consumer implementation from the orchestration
  • A small and simple API footprint
  • Ziggurat Go currently supports only Kafka as a message consumer implementation
  • Ziggurat Go includes a regex based router to support complex routing patterns
  • Ziggurat Go provides a RabbitMQ middleware for retrying messages
  • Ziggurat Go provides a RabbitMQ message consumer implementation to consume "retried" messages from RabbitMQ
  • Ziggurat Go also includes a Prometheus middleware and exposes a Prometheus exporter server for instrumentation

How to consume messages from Kafka

package main

import (
	"context"
	"github.com/gojekfarm/ziggurat/v2"
	"github.com/gojekfarm/ziggurat/v2/kafka"
	"github.com/gojekfarm/ziggurat/v2/logger"
)

func main() {
	var zig ziggurat.Ziggurat
	router := ziggurat.NewRouter()

	ctx := context.Background()
	l := logger.NewLogger(logger.LevelInfo)

	kcg := kafka.ConsumerGroup{
		Logger: nil,
		GroupConfig: kafka.ConsumerConfig{
			BootstrapServers: "localhost:9092",
			GroupID:          "foo.id",
			Topics:           []string{"foo"},
		},
	}

	router.HandlerFunc("foo.id/*", func(ctx context.Context, event *ziggurat.Event)  {
		
	})

	h := ziggurat.Use(router)

	if runErr := zig.Run(ctx, h, &kcg); runErr != nil {
		l.Error("error running consumers", runErr)
	}

}

Configuring the Ziggurat struct

ziggurat.Ziggurat{
    Logger            StructuredLogger  // a logger implementation of ziggurat.StructuredLogger
    ShutdownTimeout  time.Duration      // wait timeout when consumers are shutdown, default value: 6 seconds
    ErrorHandler     func(err error)    // a notifier for when one of the message consumers is shutdown abruptly
}

[!NOTE] Some message consumer implementations might not honor the context timeout/cancelation and this can cause your application to hangup when it is killed, use the shutdown timeout value to exit the application even if the message consunmers misbehave. The default value is 6 seconds.

[!NOTE]
The zero value of ziggurat.Ziggurat is perfectly valid and can be used without any issues

Ziggurat Run method

The ziggurat.Run method is used to start the consumer orchestration. It takes in a context.Context implementation, a ziggurat.Handler and a variable number of message consumer implementations.

ctx := context.Background()
h := ziggurat.HandlerFunc(func (context.Context, *ziggurat.Event)  {...})
groupOne := kafka.ConsumerGroup{...}
groupTwo := kafka.ConsumerGroup{...}
if runErr := zig.Run(ctx, h, &groupOne, &groupTwo); runErr != nil {
    logger.Error("error running consumers", runErr)
}

[!NOTE] The Run method returns a ziggurat.ErrCleanShutdown incase of a clean shutdown

Ziggurat Handler interface

The ziggurat.Handler is an interface for handling ziggurat events, an event is just something that happens in a finite timeframe. This event can come from any source (kafka,redis,rabbitmq). The handler's job is to handle the event, i.e... the handler contains your application's business logic

type Handler interface {
    Handle(ctx context.Context, event *Event) 
}
type HandlerFunc func (ctx context.Context, event *Event)  // serves as an adapter for normal functions to be used as handlers

Any function / struct which implements the above handler interface can be used in the ziggurat.Run method. The ziggurat.Router also implements the above interface.

Writing custom re-usable middlewares

Middlewares are a good way to run specific code before every handler is run. They provide a neat way to abstract common code which can be composed with other middlewares

Any function/Method of the signature

type Middelware func(ziggurat.Handler) ziggurat.Handler

Can be used as a middleware in the ziggurat.Use function to compose middlewares

A practical example

I want to authenticate a certain user before I run my handler, if the auth succeeds only then I want to execute my business logic

Code snippet

type Auther interface {
	Authenticate(user string) bool
}

type GoogleAuth struct{}

func (g GoogleAuth) Authenticate(user string) bool {
	return user == "foo"
}

type AuthMiddleware struct {
	Authenticator Auther
}

func (a *AuthMiddleware) Authenticate(next ziggurat.Handler) ziggurat.Handler {
	return ziggurat.HandlerFunc(func(ctx context.Context, event *ziggurat.Event) {
		type user struct{ Username string }
		var u user
		if err := json.Unmarshal(event.Value, &u); err != nil {
			// handle error
			return // do not execute the next handler
		}
		if a.Authenticator.Authenticate(u.Username) {
			// if auth succeeds call the next handler
			next.Handle(ctx, event)
			return
		}
		// handle the failed auth
	})
}

func main() {
	var zig Ziggurat
	kcg := kafka.ConsumerGroup{
		Logger: logger.NewLogger(logger.LevelInfo),
		GroupConfig: kafka.ConsumerConfig{
			BootstrapServers: "localhost:9092",
			GroupID:          "foo.id",
			ConsumerCount:    1,
			Topics:           []string{"^.*auth-log"},
		},
	}
	
	authMW := &AuthMiddleware{Authenticator: &GoogleAuth{}}
	router := ziggurat.NewRouter()
	router.HandlerFunc("foo.id/india-auth-log$",func(ctx context.Context,e *ziggurat.Event){...})
	router.HandlerFunc("foo.id/usa-auth-log$",func(ctx context.Context,e *ziggurat.Event){...})
	router.HandlerFunc("foo.id/uk-auth-log$",func(ctx context.Context,e *ziggurat.Event){...})
	router.HandlerFunc("foo.id/aus-auth-log$",func(ctx context.Context,e *ziggurat.Event){...})
	router.HandlerFunc("foo.id/spain-auth-log$",func(ctx context.Context,e *ziggurat.Event){...})
	
	// Authenticate u
View on GitHub
GitHub Stars38
CategoryDevelopment
Updated7mo ago
Forks12

Languages

Go

Security Score

77/100

Audited on Aug 7, 2025

No findings