Railway Oriented Programming (ROP) for Go
Lightweight, composable primitives for building robust synchronous and concurrent processing pipelines in Go using the Railway Oriented Programming pattern.
Overview
Railway Oriented Programming is a functional programming pattern for handling errors in a clean and composable way. This Go library implements ROP principles using generics, allowing you to build robust error handling pipelines.
Install
go get github.com/ib-77/rop3
Import paths follow the module name from go.mod:
import (
	"github.com/ib-77/rop3/pkg/rop"
	"github.com/ib-77/rop3/pkg/rop/solo"
	"github.com/ib-77/rop3/pkg/rop/mass"
	"github.com/ib-77/rop3/pkg/rop/lite"
	"github.com/ib-77/rop3/pkg/rop/core"
	"github.com/ib-77/rop3/pkg/rop/chain"
	"github.com/ib-77/rop3/pkg/rop/tiny"
	"github.com/ib-77/rop3/pkg/rop/custom"
)
Packages
- rop: core Result[T], constructors, and helpers
- solo: synchronous single-value helpers for ROP
- chain: context-aware fluent chaining over Result[T]
- tiny: minimal synchronous fluent chaining with context
- lite: ergonomic channel-lifted helpers and worker utilities
- mass: lower-level channel primitives for concurrent pipelines
- custom: cancellation-aware wrappers and helpers
- core: channel I/O and worker orchestration utilities
Core concepts
Railway Oriented Programming visualizes program flow as a railway track:
- Success Track: When operations succeed, they continue along the main track.
- Failure Track: When operations fail, they switch to a parallel error track.
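The two tracks can be illustrated with nothing but the standard library. The sketch below is not this library's API: `step`, `runTrack`, `notEmpty`, and `doubled` are hypothetical names chosen for illustration, and plain `(value, error)` returns stand in for a result type.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
	"strings"
)

// step is one section of track: it returns a value (stay on the success
// track) or an error (switch to the failure track).
type step func(string) (string, error)

// runTrack applies steps in order and stops at the first error, so later
// steps never see a value that has already failed.
func runTrack(input string, steps ...step) (string, error) {
	current := input
	for _, s := range steps {
		next, err := s(current)
		if err != nil {
			return "", err
		}
		current = next
	}
	return current, nil
}

// notEmpty and doubled are hypothetical steps used only in this sketch.
func notEmpty(s string) (string, error) {
	if strings.TrimSpace(s) == "" {
		return "", errors.New("empty input")
	}
	return s, nil
}

func doubled(s string) (string, error) {
	n, err := strconv.Atoi(s)
	if err != nil {
		return "", fmt.Errorf("not a number: %q", s)
	}
	return strconv.Itoa(n * 2), nil
}

func main() {
	fmt.Println(runTrack("21", notEmpty, doubled)) // success track
	fmt.Println(runTrack("", notEmpty, doubled))   // failure track, doubled never runs
}
```

Once a step fails, every remaining step is skipped and the error travels to the end unchanged, which is the behaviour the failure track describes.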
This repository centers around a typed Result[T] that carries either a value, an error (fail), or a cancel signal. The package split is roughly:
- solo: single-value, synchronous composition helpers
- mass: channel-based building blocks that lift solo operations over channels
- lite: ergonomic wrappers around mass to simplify common patterns and set up multi-worker stages
- core: utilities for channel IO and worker orchestration
Common primitives and what they do
- Result[T]: a typed container that can be success, failure, or cancel.
  - rop.Success(value) / rop.Fail[T](err) / rop.Cancel[T](err)
  - Inspect with IsSuccess(), IsCancel(), Err(), Result()
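To make the three states concrete, here is a minimal standalone sketch of a result container. It is an assumption about the general shape only: `result`, `ok`, `fail`, `cancel`, and `describe` are hypothetical names, and the library's actual Result[T] may be laid out quite differently.

```go
package main

import (
	"errors"
	"fmt"
)

// state marks which track a result is on.
type state int

const (
	succeeded state = iota
	failed
	cancelled
)

// result is a hypothetical stand-in for a three-state result container.
type result[T any] struct {
	value T
	err   error
	state state
}

func ok[T any](v T) result[T]           { return result[T]{value: v, state: succeeded} }
func fail[T any](err error) result[T]   { return result[T]{err: err, state: failed} }
func cancel[T any](err error) result[T] { return result[T]{err: err, state: cancelled} }

func (r result[T]) isSuccess() bool { return r.state == succeeded }
func (r result[T]) isCancel() bool  { return r.state == cancelled }

// Sample errors used by the demo below.
var (
	errBadInput = errors.New("bad input")
	errStopped  = errors.New("context canceled")
)

// describe shows how a consumer might branch on the three states.
func describe[T any](r result[T]) string {
	switch {
	case r.isSuccess():
		return fmt.Sprintf("success: %v", r.value)
	case r.isCancel():
		return fmt.Sprintf("cancel: %v", r.err)
	default:
		return fmt.Sprintf("fail: %v", r.err)
	}
}

func main() {
	fmt.Println(describe(ok(42)))
	fmt.Println(describe(fail[int](errBadInput)))
	fmt.Println(describe(cancel[int](errStopped)))
}
```

Keeping cancel separate from fail lets the end of a pipeline distinguish "this input was bad" from "this input was never finished".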
Operations (short snippets)
Below are compact, single-line snippets showing common lite usage. Full runnable examples are in examples/lite_examples.
- Validate:
lite.Validate(func(ctx context.Context, s string) (bool, string) { return s!="", "empty" })
- Switch:
lite.Switch(func(ctx context.Context, s string) rop.Result[int] { return rop.Success(1) })
- Map:
lite.Map(func(ctx context.Context, n int) int { return n*2 })
- DoubleMap:
lite.DoubleMap(func(ctx context.Context, s string) string { return s }, func(ctx context.Context, err error) string { return "err" }, func(ctx context.Context, err error) string { return "cancel" })
- Tee / DoubleTee:
lite.Tee(func(ctx context.Context, r rop.Result[int]) { /* side effect */ })
lite.DoubleTee(func(ctx context.Context, v int) {}, func(ctx context.Context, err error) {}, func(ctx context.Context, err error) {})
- Try:
lite.Try(func(ctx context.Context, s string) (int, error) { return strconv.Atoi(s) })
- Finally:
lite.Finally(ctx, inCh, mass.FinallyHandlers[In, Out]{ OnSuccess: ... })
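The snippets above are channel-lifted; the underlying idea of each operation is easier to see on a single value. The following standalone sketch is a guess at the semantics and does not use the library: `res`, `validate`, `try`, `mapValue`, and `parseAndDouble` are hypothetical names, and cancel handling is omitted.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
)

// res is a hypothetical two-state result used only in this sketch.
type res[T any] struct {
	val T
	err error
}

// validate keeps a success only if check passes; otherwise it fails with msg.
func validate[T any](r res[T], check func(T) bool, msg string) res[T] {
	if r.err != nil || check(r.val) {
		return r
	}
	return res[T]{err: errors.New(msg)}
}

// try runs a fallible function and turns its error into a failure.
func try[T, U any](r res[T], f func(T) (U, error)) res[U] {
	if r.err != nil {
		return res[U]{err: r.err}
	}
	v, err := f(r.val)
	if err != nil {
		return res[U]{err: err}
	}
	return res[U]{val: v}
}

// mapValue applies an infallible function to a success and passes failures through.
func mapValue[T, U any](r res[T], f func(T) U) res[U] {
	if r.err != nil {
		return res[U]{err: r.err}
	}
	return res[U]{val: f(r.val)}
}

// parseAndDouble chains the three operations: validate, then try, then map.
func parseAndDouble(s string) res[int] {
	checked := validate(res[string]{val: s}, func(s string) bool { return s != "" }, "empty")
	parsed := try(checked, strconv.Atoi)
	return mapValue(parsed, func(n int) int { return n * 2 })
}

func main() {
	for _, in := range []string{"21", "", "abc"} {
		r := parseAndDouble(in)
		fmt.Printf("%q -> value=%d err=%v\n", in, r.val, r.err)
	}
}
```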
Workers and execution patterns
The lite package provides small helpers to run stages with worker pools. Here are the common worker patterns:
- Run: execute a stage over an input channel of Result[T] without changing the result type. Use multiple workers to parallelize processing.
out := core.FromChanMany(context.Background(),
	lite.Run(context.Background(), core.ToChanManyResults(context.Background(), []int{1, 2, 3}),
		lite.Tee(func(_ context.Context, n int) { fmt.Println("processing", n) }), 3), // 3 workers
)
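For readers unfamiliar with the worker-pool pattern behind a call like this, here is a standard-library sketch of a same-type stage with N workers. It illustrates the general technique under stated assumptions and is not how lite.Run is implemented; `runStage`, `fromSlice`, `collectSorted`, and `tenfold` are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
	"sort"
	"sync"
)

// runStage starts `workers` goroutines that read from in, apply f, and write
// to the returned channel. The output closes once every worker has finished.
// With more than one worker, output order is not guaranteed.
func runStage[T any](ctx context.Context, in <-chan T, f func(context.Context, T) T, workers int) <-chan T {
	out := make(chan T)
	var wg sync.WaitGroup
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer wg.Done()
			for v := range in {
				select {
				case out <- f(ctx, v):
				case <-ctx.Done():
					return
				}
			}
		}()
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// fromSlice loads items into a closed, buffered channel.
func fromSlice[T any](items []T) <-chan T {
	ch := make(chan T, len(items))
	for _, it := range items {
		ch <- it
	}
	close(ch)
	return ch
}

// collectSorted drains ch and sorts the values so the demo output is stable.
func collectSorted(ch <-chan int) []int {
	var got []int
	for v := range ch {
		got = append(got, v)
	}
	sort.Ints(got)
	return got
}

// tenfold multiplies three inputs by ten using three workers.
func tenfold() []int {
	ctx := context.Background()
	stage := func(_ context.Context, n int) int { return n * 10 }
	return collectSorted(runStage(ctx, fromSlice([]int{1, 2, 3}), stage, 3))
}

func main() {
	fmt.Println(tenfold()) // [10 20 30] after sorting
}
```

The results are sorted before printing because parallel workers can finish in any order; a single worker would preserve input order.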
- Turnout: execute a stage that changes the pipeline's underlying value type (e.g., from string to int).
out := core.FromChanMany(context.Background(),
	lite.Turnout(context.Background(),
		core.ToChanManyResults(context.Background(), []string{"1", "2"}),
		lite.Switch(toNumber), 2), // 2 workers; toNumber maps a string to rop.Result[int]
)
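A type-changing stage needs two type parameters, one for the input payload and one for the output. The sketch below shows that idea with the standard library only; it is not the library's Turnout, and `item`, `turnout`, and `parseAll` are hypothetical names.

```go
package main

import (
	"fmt"
	"strconv"
	"sync"
)

// item is a hypothetical result carrier: a value or an error.
type item[T any] struct {
	val T
	err error
}

// turnout converts a stream of item[In] into a stream of item[Out].
// Failed inputs skip f and are forwarded with their error intact.
func turnout[In, Out any](in <-chan item[In], f func(In) (Out, error), workers int) <-chan item[Out] {
	out := make(chan item[Out])
	var wg sync.WaitGroup
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer wg.Done()
			for it := range in {
				if it.err != nil {
					out <- item[Out]{err: it.err}
					continue
				}
				v, err := f(it.val)
				if err != nil {
					out <- item[Out]{err: err}
					continue
				}
				out <- item[Out]{val: v}
			}
		}()
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// parseAll sends strings through a string-to-int stage and reports the sum of
// the successes and the number of failures.
func parseAll(inputs []string) (sum int, failures int) {
	in := make(chan item[string], len(inputs))
	for _, s := range inputs {
		in <- item[string]{val: s}
	}
	close(in)
	for it := range turnout[string, int](in, strconv.Atoi, 2) {
		if it.err != nil {
			failures++
			continue
		}
		sum += it.val
	}
	return sum, failures
}

func main() {
	fmt.Println(parseAll([]string{"1", "2", "x"})) // 3 1
}
```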
Cancellation
The custom package provides helper wrappers around lite/mass that let you supply cancellation-aware handlers and utilities to "cancel" or drain remaining results when a context is cancelled.
Key points:
- custom.Run / custom.Turnout accept core.CancellationHandlers so you can control what happens on cancellation (unprocessed, processed, or whole-stream cancellation).
- The custom/cancels.go helpers like CancelRemainingResults, CancelRemainingValue, CancelResults and CancelRemainingValues make it easy to drain or convert remaining inputs when cancellation happens.
- Toggle processing remaining items with core.WithProcessOptions(ctx, true): when enabled, the cancel helpers will emit corresponding canceled outputs for remaining items.
Compact example (wire up cancel handlers):
ctx = core.WithProcessOptions(ctx, true) // enable draining remaining items on cancel
handlers := core.CancellationHandlers[string, string]{
	OnCancel: func(ctx context.Context, inCh <-chan rop.Result[string], outCh chan<- rop.Result[string]) {
		custom.CancelResults(ctx, inCh, outCh)
	},
	OnCancelUnprocessed: func(ctx context.Context, in rop.Result[string], outCh chan<- rop.Result[string]) {
		custom.CancelRemainingResult(ctx, in, outCh)
	},
}
out := custom.Run(ctx, core.ToChanManyResults(ctx, inputs), custom.Validate(validateFn, nil), handlers, nil, 2)
This pattern gives you fine-grained control over pipeline behavior when cancellation occurs (emit cancel markers, map to final values, or simply drain remaining inputs).
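The difference between draining and dropping remaining inputs can be shown with a small standard-library sketch. It does not reproduce the custom or core packages; `outcome`, `stage`, and `demo` are hypothetical names, and the `drain` flag only loosely mirrors the process-remaining toggle described above.

```go
package main

import (
	"context"
	"fmt"
)

// outcome is a hypothetical output record for this sketch.
type outcome struct {
	input     int
	cancelled bool
}

// stage processes inputs until ctx is cancelled. After cancellation it either
// emits a cancel marker for every remaining input (drain == true) or stops
// reading and drops them (drain == false).
func stage(ctx context.Context, in <-chan int, work func(int), drain bool) []outcome {
	var out []outcome
	for v := range in {
		if ctx.Err() != nil {
			if !drain {
				break
			}
			out = append(out, outcome{input: v, cancelled: true})
			continue
		}
		work(v)
		out = append(out, outcome{input: v})
	}
	return out
}

// demo feeds four inputs and cancels the context while handling the second,
// so inputs 3 and 4 are still unprocessed when cancellation is observed.
func demo(drain bool) []outcome {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	in := make(chan int, 4)
	for i := 1; i <= 4; i++ {
		in <- i
	}
	close(in)

	work := func(n int) {
		if n == 2 {
			cancel()
		}
	}
	return stage(ctx, in, work, drain)
}

func main() {
	fmt.Println("drain:", demo(true))
	fmt.Println("drop: ", demo(false))
}
```

With draining enabled, a consumer still receives one output per input and can tell which ones were cancelled; with draining disabled, the output is simply shorter.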
- Single-worker execution: run the pipeline with one worker using lite.Run(..., 1). This is useful for deterministic, ordered processing or when parallelism is not desired.
out := core.FromChanMany(context.Background(),
	lite.Run(context.Background(), core.ToChanManyResults(context.Background(), []int{1, 2, 3}),
		lite.Map(func(_ context.Context, n int) int { return n * 10 }), 1), // one worker
)
Synchronous chains
Use fluent chains when you don't need channels.
- Chain (context-aware):
ctx := context.Background()
out := chain.Start[int](ctx).
	Then(func(ctx context.Context, n int) (int, error) { return n * 2, nil }).
	ThenTry(func(ctx context.Context, n int) (int, error) {
		if n > 10 { return 0, fmt.Errorf("too big") }
		return n, nil
	}).
	Map(func(ctx context.Context, n int) string { return fmt.Sprintf("n=%d", n) }).
	Ensure(func(ctx context.Context, r rop.Result[string]) rop.Result[string] { return r }).
	Finally(func(ctx context.Context, r rop.Result[string]) string {
		if r.IsSuccess() { return r.Result() }
		return "invalid"
	}).
	Result(rop.Success(5)) // final string
// Alternatively, start with a value:
out2 := chain.FromValue(ctx, 7).
	Then(func(ctx context.Context, n int) (int, error) { return n + 1, nil }).
	Finally(func(ctx context.Context, r rop.Result[int]) int { return r.UnwrapOr(0) }).
	Result(rop.Success(7))
- Tiny (no context):
out := tiny.Start[int]().
	Then(func(n int) int { return n * 2 }).
	ThenTry(func(n int) (int, error) {
		if n%2 == 0 { return n, nil }
		return 0, fmt.Errorf("odd not allowed")
	}).
	Map(func(n int) string { return fmt.Sprintf("n=%d", n) }).
	Ensure(func(r rop.Result[string]) rop.Result[string] { return r }).
	Finally(func(r rop.Result[string]) string {
		if r.IsSuccess() { return r.Result() }
		return "invalid"
	}).
	Result(rop.Success(4))
Note: Then short-circuits on failures, ThenTry converts errors to Fail, Ensure enforces invariants, and Finally reduces to a final value.
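The short-circuit behaviour in the note can be sketched with a tiny hand-rolled chain. This is an illustration under simplifying assumptions (one fixed payload type, no context, no cancel state) and is not the implementation of the chain or tiny packages; `chain`, `start`, `then`, `thenTry`, `finally`, `evenOnly`, and `render` are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// chain is a hypothetical single-type fluent chain over int.
type chain struct {
	val int
	err error
}

func start(n int) chain { return chain{val: n} }

// then applies f only while the chain is still on the success track.
func (c chain) then(f func(int) int) chain {
	if c.err != nil {
		return c
	}
	return chain{val: f(c.val)}
}

// thenTry is like then, but f may fail and move the chain to the failure track.
func (c chain) thenTry(f func(int) (int, error)) chain {
	if c.err != nil {
		return c
	}
	v, err := f(c.val)
	if err != nil {
		return chain{err: err}
	}
	return chain{val: v}
}

// finally collapses both tracks into one plain value.
func (c chain) finally(onOK func(int) string, onErr func(error) string) string {
	if c.err != nil {
		return onErr(c.err)
	}
	return onOK(c.val)
}

var errOdd = errors.New("odd not allowed")

func evenOnly(n int) (int, error) {
	if n%2 == 0 {
		return n, nil
	}
	return 0, errOdd
}

// render triples n, rejects odd results, adds one, and formats the outcome.
func render(n int) string {
	return start(n).
		then(func(n int) int { return n * 3 }).
		thenTry(evenOnly).
		then(func(n int) int { return n + 1 }).
		finally(
			func(n int) string { return fmt.Sprintf("n=%d", n) },
			func(error) string { return "invalid" },
		)
}

func main() {
	fmt.Println(render(4)) // n=13
	fmt.Println(render(5)) // invalid
}
```

For input 5 the final `then` never runs: the chain is already on the failure track after `thenTry`, so only `finally` sees the error.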
Example: URL processing pipeline
The example below mirrors tests/pipeline_test.go and demonstrates building a concurrent pipeline that:
- Validates input URLs
- Tries to fetch a page title (mocked in tests)
- Switches to a title-length calculation
- Finally maps each result to a printable string, returning "invalid" on errors/cancels
package main

import (
	"context"
	"fmt"
	"strings"

	"github.com/ib-77/rop3/pkg/rop"
	"github.com/ib-77/rop3/pkg/rop/core"
	"github.com/ib-77/rop3/pkg/rop/lite"
	"github.com/ib-77/rop3/pkg/rop/mass"
)

func main() {
	urls := []string{
		"https://www.example.com",
		"invalid-url",
		"https://www.test.org",
	}
	out := processRequest(urls)
	for i, v := range out {
		fmt.Printf("%d: %s -> %s\n", i+1, urls[i], v)
	}
}

func processRequest(urls []string) []string {
	ctx := context.Background()
	finallyHandlers := mass.FinallyHandlers[int, string]{
		OnSuccess: func(ctx context.Context, r int) string { return fmt.Sprintf("title length: %d", r) },
		OnError:   func(ctx context.Context, err error) string { return "invalid" },
		OnCancel:  func(ctx context.Context, err error) string { return "invalid" },
	}
	return core.FromChanMany(ctx,
		lite.Finally(ctx,
			lite.Turnout(ctx,
				lite.Turnout(ctx,
					lite.Run(ctx,
						core.ToChanManyResults(ctx, urls),
						lite.Validate(validateURLTest), 2),
					lite.Try(mockFetchTitle), 2),
				lite.Switch(calculateTitleLength), 2),
			finallyHandlers,
		),
	)
}

// validateURLTest checks structure and scheme in this test/demo version
func vali
