Gleam
Fast, efficient, and scalable distributed map/reduce system with DAG execution, in memory or on disk, written in pure Go. It runs standalone or in distributed mode.
Gleam is a high-performance, efficient distributed execution system that is also simple, generic, flexible, and easy to customize.
Gleam is built in Go, and user-defined computations can be written in Go, with Unix pipe tools, or as any streaming program.
High Performance
- Pure Go mappers and reducers have high performance and concurrency.
- Data flows through memory, optionally to disk.
- Multiple map reduce steps are merged together for better performance.
Memory Efficient
- Gleam does not have the common GC problems that plague other languages. Each executor runs in a separate OS process, and its memory is managed by the OS, so one machine can host many more executors.
- Gleam master and agent servers are memory efficient, consuming about 10 MB of memory.
- Gleam tries to automatically adjust the required memory size based on data size hints, avoiding trial-and-error manual memory tuning.
Flexible
- The Gleam flow can run standalone or distributed.
- Data can stay in memory or be written to disk (OnDisk mode).
Easy to Customize
- Go code is much simpler to read than Scala, Java, or C++.
One Flow, Multiple ways to execute
Gleam code defines the flow, specifying each dataset (vertex) and computation step (edge), and builds up a directed acyclic graph (DAG). There are multiple ways to execute the DAG.
The default way is to run locally. This works in most cases.
Here we mostly talk about the distributed mode.
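Whatever the execution mode, a DAG of datasets and steps has to be run in dependency order: a step can only start once the steps producing its inputs have run. A minimal sketch of that ordering using Kahn's algorithm, assuming a hypothetical `Step` type with named input and output datasets (not Gleam's own data structures):

```go
package main

import "fmt"

// Step is a hypothetical DAG edge: it reads input datasets and produces one output dataset.
type Step struct {
	Name   string
	Inputs []string
	Output string
}

// TopoOrder returns steps so that each one comes after the steps producing its inputs
// (Kahn's algorithm). Inputs no step produces are treated as external sources.
func TopoOrder(steps []Step) ([]Step, error) {
	producer := map[string]int{} // dataset name -> index of the step that produces it
	for i, s := range steps {
		producer[s.Output] = i
	}
	indegree := make([]int, len(steps))
	dependents := make([][]int, len(steps))
	for i, s := range steps {
		for _, in := range s.Inputs {
			if p, ok := producer[in]; ok {
				indegree[i]++
				dependents[p] = append(dependents[p], i)
			}
		}
	}
	var queue []int
	for i, d := range indegree {
		if d == 0 {
			queue = append(queue, i)
		}
	}
	var order []Step
	for len(queue) > 0 {
		i := queue[0]
		queue = queue[1:]
		order = append(order, steps[i])
		for _, j := range dependents[i] {
			indegree[j]--
			if indegree[j] == 0 {
				queue = append(queue, j)
			}
		}
	}
	if len(order) != len(steps) {
		return nil, fmt.Errorf("flow is not acyclic")
	}
	return order, nil
}

func main() {
	order, err := TopoOrder([]Step{
		{Name: "join", Inputs: []string{"a2", "b2"}, Output: "joined"},
		{Name: "selectA", Inputs: []string{"a.csv"}, Output: "a2"},
		{Name: "selectB", Inputs: []string{"b.csv"}, Output: "b2"},
	})
	if err != nil {
		panic(err)
	}
	for _, s := range order {
		fmt.Println(s.Name)
	}
}
```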
Distributed Mode
The distributed mode involves four components: Master, Agent, Executor, and Driver.
Gleam Driver
- The Driver is the program users write. It defines the flow and talks to the Master, Agents, and Executors.
Gleam Master
- The Master is a single server that collects resource information from Agents.
- It stores transient resource information and can be restarted.
- When the Driver program starts, it asks the Master for available Executors on Agents.
Gleam Agent
- Agents run on any machine that can run computations.
- Agents periodically send resource usage updates to the Master.
- When the Driver program has Executors assigned, it talks to the Agents to start them.
- Agents also manage the datasets generated by each Executor.
Gleam Executor
- Executors are started by Agents. They read inputs from external sources or previous datasets, process them, and write the output to a new dataset.
Dataset
- Datasets are managed by Agents. By default, data flows only through memory and the network, never touching the slow disk.
- Optionally, the data can be persisted to disk.
Keeping data in memory lets the flow apply back pressure and naturally supports stream computation.
Documentation
Standalone Example
Word Count
First, register the Go functions. Registration returns a mapper or reducer function ID, which you then pass to the flow.
package main

import (
	"flag"
	"strings"

	"github.com/chrislusf/gleam/distributed"
	"github.com/chrislusf/gleam/flow"
	"github.com/chrislusf/gleam/gio"
	"github.com/chrislusf/gleam/plugins/file"
)

var (
	isDistributed = flag.Bool("distributed", false, "run in distributed or not")

	Tokenize  = gio.RegisterMapper(tokenize)
	AppendOne = gio.RegisterMapper(appendOne)
	Sum       = gio.RegisterReducer(sum)
)

func main() {
	gio.Init()   // If the command line invokes the mapper or reducer, execute it and exit.
	flag.Parse() // Optional, since gio.Init() also calls it.

	f := flow.New("top5 words in passwd").
		Read(file.Txt("/etc/passwd", 2)). // read a text file, partitioned into 2 shards
		Map("tokenize", Tokenize).         // invoke the registered "tokenize" mapper function
		Map("appendOne", AppendOne).       // invoke the registered "appendOne" mapper function
		ReduceByKey("sum", Sum).           // invoke the registered "sum" reducer function
		Sort("sortBySum", flow.OrderBy(2, true)).
		Top("top5", 5, flow.OrderBy(2, false)).
		Printlnf("%s\t%d")

	if *isDistributed {
		f.Run(distributed.Option())
	} else {
		f.Run()
	}
}

// tokenize emits every run of ASCII letters and digits in the line as a word.
func tokenize(row []interface{}) error {
	line := gio.ToString(row[0])
	for _, s := range strings.FieldsFunc(line, func(r rune) bool {
		return !('A' <= r && r <= 'Z' || 'a' <= r && r <= 'z' || '0' <= r && r <= '9')
	}) {
		gio.Emit(s)
	}
	return nil
}

// appendOne turns each word into a (word, 1) row.
func appendOne(row []interface{}) error {
	row = append(row, 1)
	gio.Emit(row...)
	return nil
}

// sum adds the counts of two rows with the same key.
func sum(x, y interface{}) (interface{}, error) {
	return gio.ToInt64(x) + gio.ToInt64(y), nil
}
Now you can execute the binary directly, or with the "-distributed" flag to run in distributed mode. The distributed mode needs a simple setup, described later.
A more detailed example, using the predefined mappers and reducers, is here: https://github.com/chrislusf/gleam/blob/master/examples/word_count_in_go/word_count_in_go.go
Word Count by Unix Pipe Tools
Here is another way to do something similar with Unix pipe tools.
Unix pipes are easy for sequential pipelines, but limited for fan-out and even more limited for fan-in.
With Gleam, fan-in and fan-out parallel pipes become very easy.
package main

import (
	"fmt"

	"github.com/chrislusf/gleam/flow"
	"github.com/chrislusf/gleam/gio"
	"github.com/chrislusf/gleam/gio/mapper"
	"github.com/chrislusf/gleam/plugins/file"
	"github.com/chrislusf/gleam/util"
)

func main() {
	gio.Init()

	flow.New("word count by unix pipes").
		Read(file.Txt("/etc/passwd", 2)).
		Map("tokenize", mapper.Tokenize).
		Pipe("lowercase", "tr 'A-Z' 'a-z'").
		Pipe("sort", "sort").
		Pipe("uniq", "uniq -c").
		OutputRow(func(row *util.Row) error {
			fmt.Printf("%s\n", gio.ToString(row.K[0]))
			return nil
		}).Run()
}
This example uses OutputRow() to process each output row directly.
Join two CSV files.
Assume file "a.csv" has fields "a1, a2, a3, a4, a5" and file "b.csv" has fields "b1, b2, b3". We want to join the rows where a1 = b2, and the output format should be "a1, a4, b3".
package main

import (
	. "github.com/chrislusf/gleam/flow"
	"github.com/chrislusf/gleam/gio"
	"github.com/chrislusf/gleam/plugins/file"
)

func main() {
	gio.Init()

	f := New("join a.csv and b.csv by a1=b2")
	a := f.Read(file.Csv("a.csv", 1)).Select("select", Field(1, 4)) // a1, a4
	b := f.Read(file.Csv("b.csv", 1)).Select("select", Field(2, 3)) // b2, b3

	a.Join("joinByKey", b).Printlnf("%s,%s,%s").Run() // a1, a4, b3
}
Distributed Computing
Setup Gleam Cluster Locally
Start a gleam master and several gleam agents
// start "gleam master" on a server
> go get github.com/chrislusf/gleam/distributed/gleam
> gleam master --address=":45326"
// start up "gleam agent" on some different servers or ports
> gleam agent --dir=2 --port 45327 --host=127.0.0.1
> gleam agent --dir=3 --port 45328 --host=127.0.0.1
Setup Gleam Cluster on Kubernetes
Install Kubernetes Tools
At the very least, you will need a local K8s cluster, Docker, and kubectl. Docker Desktop provides all of this out of the box.
Install Skaffold
Choose the appropriate binary for your platform. For example, for macOS on ARM64:
curl -Lo skaffold https://storage.googleapis.com/skaffold/releases/latest/skaffold-darwin-arm64 && \
sudo install skaffold /usr/local/bin/
Run Latest Version
cd ./k8s
skaffold run --profile base
Use skaffold delete --profile base to bring the cluster down.
Alternatively, Build & Run a Local Version
You can build a local copy of gleam for development with hot reloading:
cd ./k8s
skaffold dev --profile dev
Change Execution Mode.
After the flow is defined, the Run() function can be executed in local mode or distributed mode.
import "github.com/chrislusf/gleam/distributed" // needed only for distributed mode

f := flow.New("")
...
// 1. local mode
f.Run()
// 2. distributed mode
f.Run(distributed.Option())
// or point it at a specific master
f.Run(distributed.Option().SetMaster("master_ip:45326"))
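The call shape above, where f.Run() with no argument runs locally and f.Run(option) runs on a cluster, is the variadic-option pattern. A hypothetical sketch of that pattern follows; `Executor`, `localExecutor`, `distributedExecutor`, and `Run` are illustrative names, not Gleam's actual types.

```go
package main

import "fmt"

// Executor is a hypothetical stand-in for whatever actually executes the DAG.
type Executor interface {
	Execute(flowName string) string
}

type localExecutor struct{}

func (localExecutor) Execute(name string) string { return "local: " + name }

type distributedExecutor struct{ master string }

func (d distributedExecutor) Execute(name string) string {
	return fmt.Sprintf("distributed via %s: %s", d.master, name)
}

// Run mirrors the f.Run() / f.Run(option) shape: no argument means local execution.
func Run(name string, opts ...Executor) string {
	if len(opts) == 0 {
		return localExecutor{}.Execute(name)
	}
	return opts[0].Execute(name)
}

func main() {
	fmt.Println(Run("word count"))
	fmt.Println(Run("word count", distributedExecutor{master: "master_ip:45326"}))
}
```

The appeal of this shape is that the flow definition never changes; only the argument to Run decides where it executes.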
Important Features
