gorchestra

Goroutine orchestration & observability toolkit for Go. Manage worker lifecycles, supervise restarts with backoff, pass messages over typed channels, and expose live metrics & a tiny dashboard — all with minimal dependencies.

Why? When you build concurrent systems, you quickly need the same primitives: a place to spin up/tear down goroutines cleanly, a way to restart crashed workers with backoff, a simple mailbox/bus, and basic health/metrics to see what's going on. gorchestra gives you these pieces without forcing a framework.


Features

  • 🧵 Managed goroutines with a central Orchestrator
  • 💓 Heartbeat & idle-timeout guard (stop workers that go quiet)
  • 🔁 Supervisor with restart policies and exponential backoff + jitter
  • ✉️ Typed mailbox/channel with per-channel stats (len/cap/blocked/bytes)
  • 🚌 Named bus to create topic channels on demand
  • 📊 Prometheus metrics + /metrics endpoint
  • 📈 Tiny HTML dashboard at /gorchestra (sortable table + live CPU/queues)
  • 🧠 goleak-friendly tests (no goroutine leaks)
  • 🧹 Graceful shutdown with bounded wait
  • 🧰 No heavy framework; import what you need

Note: Go’s runtime doesn’t expose per-goroutine CPU or memory usage. gorchestra approximates CPU/busyness from the blocking and processing time it can observe (mailbox waits, explicit AddBusy calls, etc.). Treat the CPU% figures as a helpful signal, not an exact measurement.


Install

go get github.com/alibertay/gorchestra

Add Prometheus only if you use metrics or the obs server:

go get github.com/prometheus/client_golang@v1

Quick Start

Run managed goroutines

package main

import (
    "context"
    "log"
    "time"

    g "github.com/alibertay/gorchestra"
)

func main() {
    orch := g.New()

    // Start a managed routine
    r := orch.Go(func(ctx context.Context, self *g.Routine) error {
        ticker := time.NewTicker(250 * time.Millisecond)
        defer ticker.Stop()

        for {
            select {
            case <-ctx.Done():
                return ctx.Err()
            case <-ticker.C:
                self.Beat()             // heartbeat (resets idle timer)
                self.AddBusy(10*time.Millisecond) // optional: record work time
                // do work...
            }
        }
    }, g.WithName("ticker-250ms"), g.WithIdleTimeout(5*time.Second), g.WithQueueCap(256))

    // Wait a little, then shut down everything
    time.Sleep(2 * time.Second)
    r.Kill()
    if err := orch.Shutdown(3 * time.Second); err != nil {
        log.Printf("shutdown: %v", err)
    }
}

Send messages with typed channels

// Channel is typed; optionally implement Sizer (SizeBytes() int) on your payload
ch := g.NewChannel[string](1024)

go func() {
    _ = ch.Send(context.Background(), "hello")
}()

msg, err := ch.Recv(context.Background())
_ = msg; _ = err

stats := ch.Stats() // Len/Cap/Sends/Recvs/Blocked[Send|Recv]Ns/ApproxBytes

Or create a bus of named topics:

bus := g.NewBus[[]byte]()
topic := bus.Topic("price-feed", 4096)
_ = topic.Send(ctx, payload)
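The bus-of-named-topics idea can be sketched in a few lines. This is a hypothetical standalone illustration (plain channels, a map, and a mutex), not gorchestra's implementation:

```go
package main

import (
	"fmt"
	"sync"
)

// bus is a minimal sketch of named topics created on demand,
// each backed by a buffered channel.
type bus[T any] struct {
	mu     sync.Mutex
	topics map[string]chan T
}

func newBus[T any]() *bus[T] {
	return &bus[T]{topics: make(map[string]chan T)}
}

// Topic returns the channel for name, creating it with the given
// capacity on first use (the capacity argument is ignored afterwards).
func (b *bus[T]) Topic(name string, capacity int) chan T {
	b.mu.Lock()
	defer b.mu.Unlock()
	ch, ok := b.topics[name]
	if !ok {
		ch = make(chan T, capacity)
		b.topics[name] = ch
	}
	return ch
}

func main() {
	b := newBus[[]byte]()
	t := b.Topic("price-feed", 4096)
	t <- []byte("tick")
	fmt.Println(string(<-t))
}
```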

Supervise with restart backoff

r := orch.GoSupervised(func(ctx context.Context, self *g.Routine) error {
    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        default:
            // do risky work; return error to trigger RestartOnFailure/Always
            return nil
        }
    }
},
    g.WithSupName("super-worker"),
    g.WithSupPolicy(g.RestartOnFailure),
    g.WithSupBackoff(500*time.Millisecond, 30*time.Second, 2.0, 0.2), // initial, max, multiplier, jitter
    g.WithSupIdleTimeout(10*time.Second),
    g.WithSupQueueCap(128),
)
_ = r

Expose metrics & dashboard

package main

import (
    "log"

    g "github.com/alibertay/gorchestra"
    obs "github.com/alibertay/gorchestra/obs"
)

func main() {
    orch := g.New()

    // Start HTTP server with /metrics, /gorchestra, and optional pprof
    s := obs.NewServer(orch,
        obs.WithAddr(":9090"),
        obs.WithPProf(true),
        obs.WithDashboard(true),
        obs.WithCPUSampleEvery(500*time.Millisecond),
        obs.WithTopicSampleEvery(1*time.Second),
    )
    go func() { log.Fatal(s.Start()) }()

    // ...
}
  • Dashboard: GET http://localhost:9090/gorchestra
  • Prometheus: GET http://localhost:9090/metrics
  • pprof: GET http://localhost:9090/debug/pprof/ (if enabled)

You can also hook directly into Prometheus without the server via metrics.NewPrometheusCollector(orch).

Graceful shutdown

orch.KillAll()
if err := orch.Shutdown(5 * time.Second); err != nil {
    log.Printf("shutdown timed out: %v", err)
}

Core Concepts

Orchestrator

Central registry of managed goroutines. Spawns routines, tracks their state, prints stats, shuts them down, and exposes snapshots for metrics/UI.

Routine

A managed goroutine with a unique ID & name. Provides a context, heartbeat (Beat()), busy-time recording (AddBusy), a mailbox (Mailbox()), restart counter (Restarts()), Kill() and Wait() helpers, and an idle watchdog (if WithIdleTimeout > 0).

Channel & Bus

A typed channel wrapper with additional stats and (optional) payload sizing via Sizer{ SizeBytes() int }. A Bus[T] lets you request or create named topic channels on demand.

Supervisor

A thin wrapper that restarts a routine based on a policy and backoff configuration. Useful for “always-on” workers that may fail transiently.

Observability Server

A small HTTP server that ships with gorchestra. It serves Prometheus metrics, a tiny dashboard, and optionally pprof. It also samples process CPU usage and exports gauges for topic lengths/bytes so you see backpressure build-ups.

Prometheus Collector

If you already have your own HTTP stack, you can register the standalone collector and expose it yourself.


API Reference

Package import paths (shortened below):

  • Core: github.com/alibertay/gorchestra
  • Observability: github.com/alibertay/gorchestra/obs
  • Prometheus collector: github.com/alibertay/gorchestra/metrics

Orchestrator API

// Create a new orchestrator
func New() *Orchestrator

// Run a managed routine once
func (o *Orchestrator) Go(
    fn func(ctx context.Context, self *Routine) error,
    opts ...RoutineOption,
) *Routine

// Run a supervised routine that can restart based on policy/backoff
func (o *Orchestrator) GoSupervised(
    fn func(ctx context.Context, self *Routine) error,
    opts ...SupervisorOption,
) *Routine

// Lookup & listing
func (o *Orchestrator) Get(id uint64) (*Routine, bool)
func (o *Orchestrator) List() []*Routine

// Printing current stats in a table
func (o *Orchestrator) PrintStats(w io.Writer)

// Stop all routines now (best-effort cancel)
func (o *Orchestrator) KillAll()

// Wait for drain with a bound (best-effort)
func (o *Orchestrator) Shutdown(d time.Duration) error

// Stable snapshot for metrics/UI
type PublicSnapshot struct {
    ID         uint64 `json:"id"`
    Name       string `json:"name"`
    State      string `json:"state"` // INIT/RUNNING/STOPPING/TIMED_OUT/PANICKED/STOPPED
    Health     Health `json:"health"` // OK/IDLE_TIMEOUT/STOPPING/...
    QueueLen   int    `json:"queueLen"`
    QueueBytes int64  `json:"queueBytes"`
    Restarts   uint64 `json:"restarts"`
}
func (o *Orchestrator) PublicSnapshots() []PublicSnapshot

Routine API

type RoutineState int32

const (
    StateInit RoutineState = iota
    StateRunning
    StateStopping
    StateTimedOut
    StatePanicked
    StateStopped
)

func (r *Routine) ID() uint64
func (r *Routine) Name() string
func (r *Routine) Context() context.Context
func (r *Routine) Mailbox() *Channel[any]

func (r *Routine) Restarts() uint64
func (r *Routine) Beat()                    // heartbeat (resets idle timer)
func (r *Routine) AddBusy(d time.Duration)  // add "busy" time (approx CPU)
func (r *Routine) Kill()
func (r *Routine) Wait() error

// Routine options
func WithName(n string) RoutineOption
func WithQueueCap(n int) RoutineOption
func WithIdleTimeout(d time.Duration) RoutineOption

Channel & Bus API

type Sizer interface{ SizeBytes() int }

type Channel[T any] struct{ /* ... */ }
func NewChannel[T any](capacity int) *Channel[T]
func (c *Channel[T]) Send(ctx context.Context, v T) error
func (c *Channel[T]) Recv(ctx context.Context) (T, error)
func (c *Channel[T]) Len() int
func (c *Channel[T]) Cap() int
type ChannelStats struct {
    Len, Cap      int
    Sends, Recvs  uint64
    BlockedSendNs int64
    BlockedRecvNs int64
    ApproxBytes   int64
}
func (c *Channel[T]) Stats() ChannelStats

type Bus[T any] struct{ /* ... */ }
func NewBus[T any]() *Bus[T]
func (b *Bus[T]) Topic(name string, capacity int) *Channel[T]

Supervisor options

type RestartPolicy int

const (
    RestartNever RestartPolicy = iota
    RestartOnFailure
    RestartAlways
)

// Supervisor options (signatures inferred from the Quick Start example above)
func WithSupName(n string) SupervisorOption
func WithSupPolicy(p RestartPolicy) SupervisorOption
func WithSupBackoff(initial, max time.Duration, multiplier, jitter float64) SupervisorOption
func WithSupIdleTimeout(d time.Duration) SupervisorOption
func WithSupQueueCap(n int) SupervisorOption