# Pulse
Pulse is a tiny real-time data streaming framework (mini Flink/Beam) in Rust. Async (Tokio), pluggable operators, state, and I/O. Fast, modular, local-first.
## Install / Use
| Crate       | crates.io | docs.rs |
|-------------|-----------|---------|
| pulse-core  |           |         |
| pulse-state |           |         |
| pulse-ops   |           |         |
| pulse-io    |           |         |
Pulse is a tiny, modular, event-time streaming framework (Flink/Beam-like) written in Rust. It focuses on clarity, testability, and a local-first workflow. It supports watermarks, windowing, pluggable state, Prometheus metrics, and a single-binary CLI that runs pipelines from a TOML file.
Goals:
- Local-first: zero external services required for the common path
- Stream processing with event-time and watermarks
- Configurable windowing (tumbling/sliding/session) also in CLI
- Pluggable state backends (in-memory and optional RocksDB)
- File and Parquet I/O, plus optional Kafka
- First-class observability (tracing/Prometheus)
## Local-first vision
- Single binary you can run on your laptop or inside a container without standing up a cluster or control plane.
- Files-in/files-out by default; opt into Kafka when needed.
- Deterministic replay support (EOF watermark) so you can iterate quickly and write golden tests.
- Ergonomics first: small, readable codebase; simple config; sensible defaults.
## Workspace structure

- `pulse-core`: core types/traits, `Executor`, `Record`, event-time `Watermark`, timers, metrics, and the config loader
- `pulse-ops`: operators (`Map`, `Filter`, `KeyBy`, `Aggregate`, `WindowedAggregate`), event-time window helpers
- `pulse-state`: state backends: `InMemoryState` (default) and `RocksDbState` (feature `rocksdb`)
- `pulse-io`: sources/sinks: `FileSource` (JSONL/CSV) with EOF watermark, `FileSink` (JSONL), `ParquetSink` (feature `parquet`) with date partitioning and rotation, `KafkaSource`/`KafkaSink` (feature `kafka`) with resume from persisted offsets
- `pulse-examples`: runnable examples and sample data
- `pulse-bin`: CLI (`pulse`) and the `/metrics` HTTP server
## Core features

### Event-time & watermarks

- `Record { event_time: chrono::DateTime<Utc>, value: serde_json::Value }`
- `Watermark(EventTime)` propagated through the pipeline
- The executor drives `on_watermark` for operators and informs sinks
- Lag metric (`pulse_watermark_lag_ms`) = now - watermark
Semantics overview:
- Event-time is derived from your data (RFC3339 string or epoch ms).
- Sources advance a low watermark to signal “no more records ≤ t expected”.
- Operators emit window results when `watermark >= window.end`.
- `FileSource` emits a final EOF watermark far in the future to flush all windows in batch-like runs.
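The firing rule above can be sketched in a few lines. This is a minimal illustration, not the `pulse-ops` API; the `Window`, `tumbling_window`, and `should_fire` names are hypothetical.

```rust
// Hypothetical sketch: a window may fire once the watermark passes its end,
// mirroring the `watermark >= window.end` rule described above.
#[derive(Debug, PartialEq)]
struct Window {
    start: i64, // event-time bounds in epoch ms
    end: i64,
}

/// Assign an event timestamp to its tumbling window of `size_ms`.
fn tumbling_window(event_time_ms: i64, size_ms: i64) -> Window {
    let start = (event_time_ms / size_ms) * size_ms;
    Window { start, end: start + size_ms }
}

/// A window's results may be emitted once the watermark reaches its end.
fn should_fire(window: &Window, watermark_ms: i64) -> bool {
    watermark_ms >= window.end
}

fn main() {
    let w = tumbling_window(65_000, 60_000); // event at t=65s, 60s windows
    assert_eq!(w, Window { start: 60_000, end: 120_000 });
    assert!(!should_fire(&w, 100_000)); // watermark still inside the window
    assert!(should_fire(&w, 120_000)); // watermark reached window.end
    // An EOF watermark "far in the future" fires every open window:
    assert!(should_fire(&w, i64::MAX));
    println!("ok");
}
```

This is also why the EOF watermark gives deterministic batch-like runs: it satisfies the firing condition for every window still open.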
### Windowing

- Operators: `WindowedAggregate` supports tumbling/sliding/session windows
- The CLI supports tumbling, sliding, and session with aggregations: `count`, `sum`, `avg`, `distinct`
- The EOF watermark emitted by `FileSource` flushes windows
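Unlike tumbling windows, a sliding window assigns one event to several overlapping windows. A minimal sketch of that assignment (illustrative only, not the `pulse-ops` implementation):

```rust
// Hypothetical sketch: sliding windows of `size_ms`, advancing every
// `slide_ms`, assign one event to size/slide overlapping windows.
fn sliding_windows(event_ms: i64, size_ms: i64, slide_ms: i64) -> Vec<(i64, i64)> {
    let mut out = Vec::new();
    // Latest window start that still contains the event:
    let mut start = (event_ms / slide_ms) * slide_ms;
    while start > event_ms - size_ms && start >= 0 {
        out.push((start, start + size_ms));
        start -= slide_ms;
    }
    out.reverse(); // earliest window first
    out
}

fn main() {
    // size=60s, slide=15s: an event at t=70s lands in 4 windows.
    let ws = sliding_windows(70_000, 60_000, 15_000);
    assert_eq!(
        ws,
        vec![(15_000, 75_000), (30_000, 90_000), (45_000, 105_000), (60_000, 120_000)]
    );
    println!("ok");
}
```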
### State & snapshots

- `KvState` trait: `get`/`put`/`delete`/`iter_prefix`/`snapshot`/`restore`
- `pulse-state::InMemoryState` implements the full API (snapshots kept in memory)
- `pulse-state::RocksDbState` (feature `rocksdb`): prefix iteration and checkpoint directory creation via RocksDB Checkpoint
- `WindowOperator` can persist per-window state via a backend and restore it after restart (optional hook)
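To make the shape of that API concrete, here is a toy in-memory backend with the same six operations. It is a sketch under assumptions, not the `pulse-state` code: the `InMemoryKv` name and key layout are invented, and a `BTreeMap` is used so prefix iteration falls out of ordered keys.

```rust
use std::collections::BTreeMap;

// Hypothetical KvState-style backend; snapshots are full in-memory copies,
// matching the MVP behavior described above.
#[derive(Default)]
struct InMemoryKv {
    map: BTreeMap<String, Vec<u8>>,
    snapshots: Vec<BTreeMap<String, Vec<u8>>>,
}

impl InMemoryKv {
    fn put(&mut self, k: &str, v: &[u8]) { self.map.insert(k.to_string(), v.to_vec()); }
    fn get(&self, k: &str) -> Option<&Vec<u8>> { self.map.get(k) }
    fn delete(&mut self, k: &str) { self.map.remove(k); }
    /// Ordered keys make prefix scans a range + take_while.
    fn iter_prefix<'a>(&'a self, prefix: &'a str) -> impl Iterator<Item = (&'a String, &'a Vec<u8>)> {
        self.map.range(prefix.to_string()..).take_while(move |(k, _)| k.starts_with(prefix))
    }
    /// Snapshot keeps a full copy in memory and returns its id.
    fn snapshot(&mut self) -> usize { self.snapshots.push(self.map.clone()); self.snapshots.len() - 1 }
    fn restore(&mut self, id: usize) { self.map = self.snapshots[id].clone(); }
}

fn main() {
    let mut kv = InMemoryKv::default();
    kv.put("win/60/word:a", b"3");
    kv.put("win/60/word:b", b"1");
    kv.put("other", b"x");
    assert_eq!(kv.iter_prefix("win/60/").count(), 2); // prefix scan skips "other"
    let snap = kv.snapshot();
    kv.delete("win/60/word:a");
    assert!(kv.get("win/60/word:a").is_none());
    kv.restore(snap); // state is back as of the snapshot
    assert_eq!(kv.get("win/60/word:a").map(|v| v.as_slice()), Some(&b"3"[..]));
    println!("ok");
}
```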
### Guarantees (MVP)
- Single-node at-least-once: each record is processed at least once; exactly-once is not guaranteed.
- FileSource is deterministic (EOF watermark). KafkaSource commits offsets periodically (configurable); on restart, offsets are available in KvState for recovery logic.
- Operator state updates and sink writes are not transactional as a unit (no two-phase commit in MVP).
### Limitations (MVP)
- No cluster/distributed runtime yet (single process, single binary).
- No SQL/DSL planner; define pipelines in Rust or via TOML.
- Checkpoint/resume orchestration is minimal: offsets/snapshots exist, but full CLI-driven recovery is a follow-up.
- Kafka is optional and depends on native `librdkafka`.
## I/O

- `FileSource` (JSONL/CSV): parses `event_time` from RFC3339 or epoch ms; emits a final watermark at EOF
- `FileSink`: writes JSON lines to stdout/file
- `ParquetSink` (feature `parquet`):
  - Schema: `event_time: timestamp(ms)`, `payload: utf8` (full JSON)
  - Partitioning:
    - By date (default): `out_dir/dt=YYYY-MM-DD/part-*.parquet` (format configurable via `partition_format`)
    - By field: `out_dir/<field>=<value>/part-*.parquet` (set `partition_field`)
  - Rotation: by row count, time, and optionally bytes (`max_bytes`)
  - Compression: `snappy` (default), `zstd`, or `none`
  - Tested: writes files, then reads them back via the Arrow reader, asserting row counts
- `KafkaSource`/`KafkaSink` (feature `kafka`): integration with `rdkafka`, resuming offsets from persisted state
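The rotation rule above (row count, age, optional bytes) is just a threshold check per write. A minimal sketch, with invented names (`RotationPolicy`, `should_rotate`) rather than the real `ParquetSink` internals:

```rust
use std::time::Duration;

// Hypothetical sketch of ParquetSink-style rotation: close the current
// file and start a new part when any configured threshold is crossed.
struct RotationPolicy {
    max_rows: u64,
    max_age: Duration,
    max_bytes: Option<u64>, // corresponds to the optional `max_bytes` setting
}

fn should_rotate(p: &RotationPolicy, rows: u64, age: Duration, bytes: u64) -> bool {
    rows >= p.max_rows
        || age >= p.max_age
        || p.max_bytes.map_or(false, |b| bytes >= b)
}

fn main() {
    let p = RotationPolicy {
        max_rows: 1_000_000,
        max_age: Duration::from_secs(300),
        max_bytes: Some(100 * 1024 * 1024), // ~100MB
    };
    assert!(!should_rotate(&p, 10, Duration::from_secs(5), 1024));
    assert!(should_rotate(&p, 1_000_000, Duration::from_secs(5), 1024)); // row cap
    assert!(should_rotate(&p, 10, Duration::from_secs(301), 1024)); // age cap
    assert!(should_rotate(&p, 10, Duration::from_secs(5), 200 * 1024 * 1024)); // byte cap
    println!("ok");
}
```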
## Observability

- Tracing spans on operators (receive/emit) using `tracing`
- Prometheus metrics (via `pulse-core::metrics`):
  - `pulse_operator_records_total{operator,stage=receive|emit}`
  - `pulse_watermark_lag_ms` (gauge)
  - `pulse_bytes_written_total{sink}`
  - `pulse_state_size{operator}`
  - `pulse_operator_process_latency_ms` (histogram)
  - `pulse_sink_process_latency_ms` (histogram)
  - `pulse_queue_depth` (gauge)
  - `pulse_dropped_records_total{reason}` (counter)
- `/metrics` HTTP endpoint served by `pulse-bin` (axum 0.7)
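What the `/metrics` endpoint serves is plain Prometheus text exposition: one `name{labels} value` line per sample. A toy renderer (the `render` function and counter map are illustrative, not `pulse-core::metrics`):

```rust
use std::collections::BTreeMap;

// Hypothetical sketch: render counters/gauges in the Prometheus text
// exposition format; metric names match the ones listed above.
fn render(samples: &BTreeMap<String, u64>) -> String {
    let mut out = String::new();
    for (name_and_labels, value) in samples {
        out.push_str(&format!("{name_and_labels} {value}\n"));
    }
    out
}

fn main() {
    let mut s = BTreeMap::new();
    s.insert(
        r#"pulse_operator_records_total{operator="key_by",stage="emit"}"#.to_string(),
        42,
    );
    s.insert("pulse_watermark_lag_ms".to_string(), 120);
    let body = render(&s);
    assert!(body.contains(r#"pulse_operator_records_total{operator="key_by",stage="emit"} 42"#));
    assert!(body.contains("pulse_watermark_lag_ms 120"));
    println!("ok");
}
```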
## CLI: `pulse`

Binary crate: `pulse-bin`. Subcommands:

- `pulse serve --port 9898`
  - Serves `/metrics` in Prometheus format.
- `pulse run --config pipeline.toml [--http-port 9898]`
  - Loads a TOML config, validates it, builds the pipeline, and runs until EOF (or Ctrl-C if you wire a streaming source).
  - If `--http-port` is provided, starts `/metrics` on that port.
  - Optional backpressure (soft bound) via the environment: set `PULSE_CHANNEL_BOUND` (e.g., `PULSE_CHANNEL_BOUND=10000`) to drop new records when the in-flight depth reaches the bound. Watermarks are never dropped.
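The "drop records, never watermarks" policy can be sketched as a small admission check. This models the behavior described above with invented names (`Msg`, `admit`); it is not the `pulse-bin` channel code:

```rust
// Hypothetical sketch of the soft-bound backpressure: once the in-flight
// depth reaches the bound, new records are dropped (and counted), but
// watermarks are always admitted so windows still close.
#[derive(Debug, PartialEq)]
enum Msg {
    Record(u64),
    Watermark(i64),
}

fn admit(queue: &mut Vec<Msg>, bound: usize, msg: Msg, dropped: &mut u64) {
    match msg {
        Msg::Watermark(_) => queue.push(msg), // never dropped
        Msg::Record(_) if queue.len() >= bound => *dropped += 1,
        Msg::Record(_) => queue.push(msg),
    }
}

fn main() {
    let (mut q, mut dropped) = (Vec::new(), 0u64);
    let bound = 2; // e.g. PULSE_CHANNEL_BOUND=2
    admit(&mut q, bound, Msg::Record(1), &mut dropped);
    admit(&mut q, bound, Msg::Record(2), &mut dropped);
    admit(&mut q, bound, Msg::Record(3), &mut dropped); // over bound: dropped
    admit(&mut q, bound, Msg::Watermark(60_000), &mut dropped); // still admitted
    assert_eq!(dropped, 1);
    assert_eq!(q.len(), 3);
    assert_eq!(q.last(), Some(&Msg::Watermark(60_000)));
    println!("ok");
}
```

Dropped records would show up in `pulse_dropped_records_total{reason}`.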
## Config format (`pulse-core::config`)
```toml
[source]
kind = "file"
path = "pulse-examples/examples/sliding_avg.jsonl"
time_field = "event_time"

[time]
allowed_lateness = "10s"

[window]
# supported: tumbling|sliding|session
type = "sliding"
size = "60s"
slide = "15s"  # for sliding; for session, use: gap = "30s"

[ops]
# aggregation over a key; supported: count (default), sum, avg, distinct
count_by = "word"
# agg = "count"        # default
# agg_field = "value"  # required for sum|avg|distinct

[sink]
kind = "parquet"
out_dir = "outputs"

## Optional Parquet settings
# compression = "snappy"       # one of: snappy (default) | zstd | none
# max_bytes = 104857600        # rotate file when ~bytes reached (e.g. 100MB)
# partition_field = "user_id"  # partition by a payload field value
# partition_format = "%Y-%m"   # date partition format when partitioning by event_time
```
Validation rules:
- `source.kind` must be `file` (or `kafka`)
- `sink.kind` must be `parquet`/`file` (or `kafka`)
- `ops.count_by` must be present
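Those three rules amount to a small check after deserialization. A sketch with invented field names and error strings (the real loader lives in `pulse-core::config`):

```rust
// Hypothetical sketch of the validation rules above; the Config struct
// and error messages are illustrative, not pulse-core::config.
struct Config {
    source_kind: String,
    sink_kind: String,
    count_by: Option<String>,
}

fn validate(c: &Config) -> Result<(), String> {
    match c.source_kind.as_str() {
        "file" | "kafka" => {}
        other => return Err(format!("source.kind must be file or kafka, got {other}")),
    }
    match c.sink_kind.as_str() {
        "parquet" | "file" | "kafka" => {}
        other => return Err(format!("sink.kind must be parquet, file, or kafka, got {other}")),
    }
    if c.count_by.is_none() {
        return Err("ops.count_by must be present".into());
    }
    Ok(())
}

fn main() {
    let good = Config {
        source_kind: "file".into(),
        sink_kind: "parquet".into(),
        count_by: Some("word".into()),
    };
    assert!(validate(&good).is_ok());
    let bad = Config { source_kind: "s3".into(), ..good };
    assert!(validate(&bad).is_err()); // unsupported source kind is rejected
    println!("ok");
}
```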
## Example: run from config
```sh
# Build
cargo build

# Run the pipeline and export metrics
cargo run -p pulse-bin -- run --config examples/pipeline.toml --http-port 9898

# Scrape metrics
curl http://127.0.0.1:9898/metrics
```
Expected output:
- Parquet files created under `outputs/dt=YYYY-MM-DD/part-*.parquet`.
## Example: CSV → Parquet

The CLI supports JSONL and CSV via `source.format`.
- Direct CSV in the CLI:
```toml
[source]
kind = "file"
format = "csv"  # jsonl | csv
path = "input.csv"
time_field = "event_time"  # epoch ms in CSV

[time]
allowed_lateness = "10s"

[window]
type = "tumbling"
size = "60s"

[ops]
count_by = "word"

[sink]
kind = "parquet"
out_dir = "outputs"
```
- Use the Rust API directly:
```rust
use pulse_core::Executor;
use pulse_io::{FileSource, ParquetSink, ParquetSinkConfig, PartitionSpec};
use pulse_ops::{KeyBy, WindowedAggregate};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // CSV source: the header must include event_time (epoch ms)
    let src = FileSource {
        path: "input.csv".into(),
        format: pulse_io::FileFormat::Csv,
        event_time_field: "event_time".into(),
        text_field: None,
    };
    let mut exec = Executor::new();
    exec.source(src)
        .operator(KeyBy::new("word"))
        .operator(WindowedAggregate::tumbling_count("key", 60_000))
        .sink(ParquetSink::new(ParquetSinkConfig {
            out_dir: "outputs".into(),
            partition_by: PartitionSpec::ByDate { field: "event_time".into(), fmt: "%Y-%m-%d".into() },
            max_rows: 1_000_000,
            max_age: std::time::Duration::from_secs(300),
        }));
    exec.run().await?;
    Ok(())
}
```
## Examples: other aggregations in the CLI
```toml
[source]
kind = "file"
path = "pulse-examples/examples/sliding_avg.jsonl"
time_field = "event_time"

[time]
allowed_lateness = "10s"

[window]
type = "tumbling"
size = "60s"

[ops]
count_by =
```
