Gorchestra
Goroutine orchestration & observability toolkit for Go. Manage worker lifecycles, supervise restarts with backoff, pass messages over typed channels, and expose live metrics & a tiny dashboard — all with minimal dependencies.
Why? When you build concurrent systems, you quickly need the same primitives: a place to spin up/tear down goroutines cleanly, a way to restart crashed workers with backoff, a simple mailbox/bus, and basic health/metrics to see what's going on. gorchestra gives you these pieces without forcing a framework.
Table of Contents
- Features
- Install
- Quick Start
- Core Concepts
- API Reference
- Testing & Leak Safety
- Design Notes & Guarantees
- FAQ
- Version & Requirements
- License
Features
- 🧵 Managed goroutines with a central Orchestrator
- 💓 Heartbeat & idle-timeout guard (stop workers that go quiet)
- 🔁 Supervisor with restart policies and exponential backoff + jitter
- ✉️ Typed mailbox/channel with per-channel stats (len/cap/blocked/bytes)
- 🚌 Named bus to create topic channels on demand
- 📊 Prometheus metrics + /metrics endpoint
- 📈 Tiny HTML dashboard at /gorchestra (sortable table + live CPU/queues)
- 🧠 goleak-friendly tests (no goroutine leaks)
- 🧹 Graceful shutdown with bounded wait
- 🧰 No heavy framework; import what you need
Go’s runtime doesn’t expose per-goroutine CPU/mem. gorchestra approximates CPU/busyness via blocking/processing time it can observe (mailboxes; explicit AddBusy, etc.). Treat CPU% as a helpful, not exact, signal.
Install
go get github.com/alibertay/gorchestra
Add Prometheus only if you use metrics or the obs server:
go get github.com/prometheus/client_golang@v1
Quick Start
Run managed goroutines
package main
import (
"context"
"log"
"time"
g "github.com/alibertay/gorchestra"
)
func main() {
orch := g.New()
// Start a managed routine
r := orch.Go(func(ctx context.Context, self *g.Routine) error {
ticker := time.NewTicker(250 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
self.Beat() // heartbeat (resets idle timer)
self.AddBusy(10*time.Millisecond) // optional: record work time
// do work...
}
}
}, g.WithName("ticker-250ms"), g.WithIdleTimeout(5*time.Second), g.WithQueueCap(256))
// Wait a little, then shut down everything
time.Sleep(2 * time.Second)
r.Kill()
if err := orch.Shutdown(3 * time.Second); err != nil {
log.Printf("shutdown: %v", err)
}
}
Send messages with typed channels
// Channel is typed; optionally implement Sizer (SizeBytes() int) on your payload
ch := g.NewChannel[string](1024)
go func() {
_ = ch.Send(context.Background(), "hello")
}()
msg, err := ch.Recv(context.Background())
_ = msg; _ = err
stats := ch.Stats() // Len/Cap/Sends/Recvs/Blocked[Send|Recv]Ns/ApproxBytes
Or create a bus of named topics:
bus := g.NewBus[[]byte]()
topic := bus.Topic("price-feed", 4096)
_ = topic.Send(ctx, payload)
Supervise with restart backoff
r := orch.GoSupervised(func(ctx context.Context, self *g.Routine) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
// do risky work; return error to trigger RestartOnFailure/Always
return nil
}
}
},
g.WithSupName("super-worker"),
g.WithSupPolicy(g.RestartOnFailure),
g.WithSupBackoff(500*time.Millisecond, 30*time.Second, 2.0, 0.2), // initial, max, multiplier, jitter
g.WithSupIdleTimeout(10*time.Second),
g.WithSupQueueCap(128),
)
_ = r
Expose metrics & dashboard
package main

import (
"log"
obs "github.com/alibertay/gorchestra/obs"
g "github.com/alibertay/gorchestra"
)
func main() {
orch := g.New()
// Start HTTP server with /metrics, /gorchestra, and optional pprof
s := obs.NewServer(orch,
obs.WithAddr(":9090"),
obs.WithPProf(true),
obs.WithDashboard(true),
obs.WithCPUSampleEvery(500*time.Millisecond),
obs.WithTopicSampleEvery(1*time.Second),
)
go func() { log.Fatal(s.Start()) }()
// ...
}
- Dashboard: GET http://localhost:9090/gorchestra
- Prometheus: GET http://localhost:9090/metrics
- pprof: GET http://localhost:9090/debug/pprof/ (if enabled)
You can also hook directly into Prometheus without the server via
metrics.NewPrometheusCollector(orch).
Graceful shutdown
orch.KillAll()
if err := orch.Shutdown(5 * time.Second); err != nil {
log.Printf("shutdown timed out: %v", err)
}
Core Concepts
Orchestrator
Central registry of managed goroutines. Spawns routines, tracks their state, prints stats, shuts them down, and exposes snapshots for metrics/UI.
Routine
A managed goroutine with a unique ID & name. Provides a context, heartbeat (Beat()), busy-time recording (AddBusy), a mailbox (Mailbox()), restart counter (Restarts()), Kill() and Wait() helpers, and an idle watchdog (if WithIdleTimeout > 0).
Channel & Bus
A typed channel wrapper with additional stats and (optional) payload sizing via Sizer{ SizeBytes() int }. A Bus[T] lets you request or create named topic channels on demand.
Supervisor
A thin wrapper that restarts a routine based on a policy and backoff configuration. Useful for “always-on” workers that may fail transiently.
Observability Server
A small HTTP server that ships with gorchestra. It serves Prometheus metrics, a tiny dashboard, and optionally pprof. It also samples process CPU usage and exports gauges for topic lengths/bytes so you see backpressure build-ups.
Prometheus Collector
If you already have your own HTTP stack, you can register the standalone collector and expose it yourself.
API Reference
Package import paths (shortened below):
- Core: github.com/alibertay/gorchestra
- Observability: github.com/alibertay/gorchestra/obs
- Prometheus collector: github.com/alibertay/gorchestra/metrics
Orchestrator API
// Create a new orchestrator
func New() *Orchestrator
// Run a managed routine once
func (o *Orchestrator) Go(
fn func(ctx context.Context, self *Routine) error,
opts ...RoutineOption,
) *Routine
// Run a supervised routine that can restart based on policy/backoff
func (o *Orchestrator) GoSupervised(
fn func(ctx context.Context, self *Routine) error,
opts ...SupervisorOption,
) *Routine
// Lookup & listing
func (o *Orchestrator) Get(id uint64) (*Routine, bool)
func (o *Orchestrator) List() []*Routine
// Printing current stats in a table
func (o *Orchestrator) PrintStats(w io.Writer)
// Stop all routines now (best-effort cancel)
func (o *Orchestrator) KillAll()
// Wait for drain with a bound (best-effort)
func (o *Orchestrator) Shutdown(d time.Duration) error
// Stable snapshot for metrics/UI
type PublicSnapshot struct {
ID uint64 `json:"id"`
Name string `json:"name"`
State string `json:"state"` // INIT/RUNNING/STOPPING/TIMED_OUT/PANICKED/STOPPED
Health Health `json:"health"` // OK/IDLE_TIMEOUT/STOPPING/...
QueueLen int `json:"queueLen"`
QueueBytes int64 `json:"queueBytes"`
Restarts uint64 `json:"restarts"`
}
func (o *Orchestrator) PublicSnapshots() []PublicSnapshot
Routine API
type RoutineState int32
const (
	StateInit RoutineState = iota
	StateRunning
	StateStopping
	StateTimedOut
	StatePanicked
	StateStopped
)
func (r *Routine) ID() uint64
func (r *Routine) Name() string
func (r *Routine) Context() context.Context
func (r *Routine) Mailbox() *Channel[any]
func (r *Routine) Restarts() uint64
func (r *Routine) Beat() // heartbeat (resets idle timer)
func (r *Routine) AddBusy(d time.Duration) // add "busy" time (approx CPU)
func (r *Routine) Kill()
func (r *Routine) Wait() error
// Routine options
func WithName(n string) RoutineOption
func WithQueueCap(n int) RoutineOption
func WithIdleTimeout(d time.Duration) RoutineOption
Channel & Bus API
type Sizer interface{ SizeBytes() int }
type Channel[T any] struct{ /* ... */ }
func NewChannel[T any](capacity int) *Channel[T]
func (c *Channel[T]) Send(ctx context.Context, v T) error
func (c *Channel[T]) Recv(ctx context.Context) (T, error)
func (c *Channel[T]) Len() int
func (c *Channel[T]) Cap() int
type ChannelStats struct {
Len, Cap int
Sends, Recvs uint64
BlockedSendNs int64
BlockedRecvNs int64
ApproxBytes int64
}
func (c *Channel[T]) Stats() ChannelStats
type Bus[T any] struct{ /* ... */ }
func NewBus[T any]() *Bus[T]
func (b *Bus[T]) Topic(name string, capacity int) *Channel[T]
Supervisor options
type RestartPolicy int
const (
	RestartNever RestartPolicy = iota
	RestartOnFailure
	RestartAlways
)
// Supervisor options (as used in the Quick Start example)
func WithSupName(n string) SupervisorOption
func WithSupPolicy(p RestartPolicy) SupervisorOption
func WithSupBackoff(initial, max time.Duration, multiplier, jitter float64) SupervisorOption
func WithSupIdleTimeout(d time.Duration) SupervisorOption
func WithSupQueueCap(n int) SupervisorOption
