SkillAgentSearch skills...

Wingfoil

graph based stream processing framework

Install / Use

/learn @wingfoil-io/Wingfoil

README

CI Crates.io Version Docs.rs PyPI - Version Documentation Status

Wingfoil

Wingfoil is a blazingly fast, highly scalable stream processing framework designed for latency-critical use cases such as electronic trading and real-time AI systems.

Wingfoil simplifies receiving, processing and distributing streaming data across your entire stack.

Features

  • Fast: Ultra-low latency and high throughput with an efficient DAG based execution engine.
  • Simple and obvious to use: Define your graph of calculations; Wingfoil manages its execution.
  • Multi-language: currently available as a Rust crate and as a beta release, python package with plans to add WASM/JavaScript/TypeScript support.
  • Backtesting: Replay historical data to backtest and optimise strategies.
  • Async/Tokio: seamless integration, allows you to leverage async at your graph edges.
  • Multi-threading: distribute graph execution across cores.
  • I/O Adapters: production-ready KDB+ integration for tick data, CSV, ZeroMQ pub/sub messaging (beta), etc.

Quick Start

In this example we build a simple, linear pipeline with all nodes ticking in lock-step.

use wingfoil::*;
use std::time::Duration;
fn main() {
    let period = Duration::from_secs(1);
    ticker(period)
        .count()
        .map(|i| format!("hello, world {:}", i))
        .print()
        .run(RunMode::RealTime, RunFor::Duration(period*3)
    );
}

This output is produced:

hello, world 1
hello, world 2
hello, world 3

Order Book Example

Wingfoil lets you easily wire up complex business logic, splitting and recombining streams, and altering the frequency of data. I/O adapters make it easy to plug in real data sources and sinks. In this example we load a CSV of AAPL limit orders, maintain an order book using the lobster crate, derive trades and two-way prices, and export back to CSV — all in a few lines:

let book = RefCell::new(lobster::OrderBook::default());
let get_time = |msg: &Message| NanoTime::new((msg.seconds * 1e9) as u64);
let (fills, prices) = csv_read_vec("aapl.csv", get_time, true)
    .map(move |chunk| process_orders(chunk, &book))
    .split();
let prices_export = prices
    .filter_value(|price: &Option<TwoWayPrice>| !price.is_none())
    .map(|price| price.unwrap())
    .distinct()
    .csv_write("prices.csv");
let fills_export = fills.csv_write_vec("fills.csv");
Graph::new(vec![prices_export, fills_export], RunMode::HistoricalFrom(NanoTime::ZERO), RunFor::Forever)
    .print()
    .run()
    .unwrap();

This output is produced:

<div align="center"> <img alt="diagram" src="https://raw.githubusercontent.com/wingfoil-io/wingfoil/refs/heads/main/wingfoil/diagrams/aapl.svg"/> </div>

Full example.

KDB+ Example

Define a typed struct, implement KdbDeserialize to map rows, and stream time-sliced queries directly into your graph:

#[derive(Debug, Clone, Default)]
struct Price {
    sym: Sym,
    mid: f64,
}

kdb_read::<Price, _>(
    conn,
    Duration::from_secs(10),
    |(t0, t1), _date, _iter| {
        format!(
            "select time, sym, mid from prices \
             where time >= (`timestamp$){}j, time < (`timestamp$){}j",
            t0.to_kdb_timestamp(),
            t1.to_kdb_timestamp(),
        )
    },
)
.logged("prices", Info)
.run(
    RunMode::HistoricalFrom(NanoTime::from_kdb_timestamp(0)),
    RunFor::Duration(Duration::from_secs(100)),
)?;

Full example.

Links

Get Involved!

We want to hear from you! Especially if you:

  • are interested in contributing
  • know of a project that wingfoil would be well-suited for
  • would like to request a feature or report a bug
  • have any feedback

Please do get in touch:

View on GitHub
GitHub Stars148
CategoryDevelopment
Updated6h ago
Forks19

Languages

Rust

Security Score

85/100

Audited on Mar 31, 2026

No findings