Wingfoil
Graph-based stream processing framework
Wingfoil is a blazingly fast, highly scalable stream processing framework designed for latency-critical use cases such as electronic trading and real-time AI systems.
Wingfoil simplifies receiving, processing and distributing streaming data across your entire stack.
Features
- Fast: Ultra-low latency and high throughput with an efficient DAG-based execution engine.
- Simple and obvious to use: Define your graph of calculations; Wingfoil manages its execution.
- Multi-language: currently available as a Rust crate and, in beta, as a Python package, with plans to add WASM/JavaScript/TypeScript support.
- Backtesting: Replay historical data to backtest and optimise strategies.
- Async/Tokio: seamless integration lets you use async at your graph edges.
- Multi-threading: distribute graph execution across cores.
- I/O Adapters: production-ready KDB+ integration for tick data, CSV, ZeroMQ pub/sub messaging (beta), etc.
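To make the "define your graph, let the engine run it" idea concrete, here is a minimal sketch of DAG evaluation in plain Rust using only the standard library. It is not Wingfoil's engine and does not use its API: `Node`, `topological_order`, `evaluate` and `example_graph` are hypothetical names invented for this illustration, and the ordering technique shown is Kahn's algorithm.

```rust
use std::collections::VecDeque;

/// Hypothetical node: combines the current values of its upstream nodes.
struct Node {
    inputs: Vec<usize>,
    compute: fn(&[f64]) -> f64,
}

/// Kahn's algorithm: order nodes so that each one comes after all its inputs.
/// Returns None if the graph contains a cycle.
fn topological_order(nodes: &[Node]) -> Option<Vec<usize>> {
    let mut pending: Vec<usize> = nodes.iter().map(|n| n.inputs.len()).collect();
    let mut downstream: Vec<Vec<usize>> = vec![Vec::new(); nodes.len()];
    for (id, node) in nodes.iter().enumerate() {
        for &input in &node.inputs {
            downstream[input].push(id);
        }
    }
    let mut ready: VecDeque<usize> =
        (0..nodes.len()).filter(|&id| pending[id] == 0).collect();
    let mut order = Vec::with_capacity(nodes.len());
    while let Some(id) = ready.pop_front() {
        order.push(id);
        for &next in &downstream[id] {
            pending[next] -= 1;
            if pending[next] == 0 {
                ready.push_back(next);
            }
        }
    }
    if order.len() == nodes.len() { Some(order) } else { None }
}

/// Evaluate every node exactly once, in dependency order.
fn evaluate(nodes: &[Node]) -> Option<Vec<f64>> {
    let order = topological_order(nodes)?;
    let mut values = vec![0.0; nodes.len()];
    for id in order {
        let args: Vec<f64> = nodes[id].inputs.iter().map(|&i| values[i]).collect();
        values[id] = (nodes[id].compute)(&args);
    }
    Some(values)
}

/// Two sources feeding two derived values that are then recombined.
fn example_graph() -> Vec<Node> {
    vec![
        Node { inputs: vec![], compute: |_| 100.0 },                   // 0: bid
        Node { inputs: vec![], compute: |_| 102.0 },                   // 1: ask
        Node { inputs: vec![0, 1], compute: |v| (v[0] + v[1]) / 2.0 }, // 2: mid
        Node { inputs: vec![0, 1], compute: |v| v[1] - v[0] },         // 3: spread
        Node { inputs: vec![2, 3], compute: |v| v[1] / v[0] },         // 4: spread / mid
    ]
}

fn main() {
    let values = evaluate(&example_graph()).expect("graph has a cycle");
    println!("mid = {}, spread = {}", values[2], values[3]);
}
```

The caller only declares which nodes feed which; the evaluation order falls out of the dependencies, so splitting and recombining streams needs no manual sequencing.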
Quick Start
In this example we build a simple, linear pipeline with all nodes ticking in lock-step.
use wingfoil::*;
use std::time::Duration;

fn main() {
    let period = Duration::from_secs(1);
    ticker(period)
        .count()
        .map(|i| format!("hello, world {}", i))
        .print()
        .run(RunMode::RealTime, RunFor::Duration(period * 3));
}
This output is produced:
hello, world 1
hello, world 2
hello, world 3
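To show what each stage of this pipeline contributes, here is a rough model of it built from standard-library iterators. It is only a sketch: there is no clock and no Wingfoil involved, and `hello_pipeline` is a hypothetical helper that treats every input item as one tick passing through a counting stage and then a mapping stage.

```rust
/// Plain-iterator model of the pipeline: one input tick yields one output line.
fn hello_pipeline(ticks: usize) -> Vec<String> {
    (0..ticks)
        // Counting stage: emit a running count of the ticks seen so far.
        .scan(0u64, |count, _tick| {
            *count += 1;
            Some(*count)
        })
        // Mapping stage: turn each count into a message.
        .map(|i| format!("hello, world {}", i))
        .collect()
}

fn main() {
    // Printing stage: write each message on its own line.
    for line in hello_pipeline(3) {
        println!("{}", line);
    }
}
```

Because each stage consumes exactly one value per tick and emits exactly one, the stages advance together, which is the lock-step behaviour described above.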
Order Book Example
Wingfoil lets you easily wire up complex business logic, splitting and recombining streams, and altering the frequency of data. I/O adapters make it easy to plug in real data sources and sinks. In this example we load a CSV of AAPL limit orders, maintain an order book using the lobster crate, derive trades and two-way prices, and export back to CSV — all in a few lines:
let book = RefCell::new(lobster::OrderBook::default());
let get_time = |msg: &Message| NanoTime::new((msg.seconds * 1e9) as u64);
let (fills, prices) = csv_read_vec("aapl.csv", get_time, true)
    .map(move |chunk| process_orders(chunk, &book))
    .split();
let prices_export = prices
    .filter_value(|price: &Option<TwoWayPrice>| price.is_some())
    .map(|price| price.unwrap())
    .distinct()
    .csv_write("prices.csv");
let fills_export = fills.csv_write_vec("fills.csv");
Graph::new(
    vec![prices_export, fills_export],
    RunMode::HistoricalFrom(NanoTime::ZERO),
    RunFor::Forever,
)
.print()
.run()
.unwrap();
This output is produced:
<div align="center"> <img alt="diagram" src="https://raw.githubusercontent.com/wingfoil-io/wingfoil/refs/heads/main/wingfoil/diagrams/aapl.svg"/> </div>
KDB+ Example
Define a typed struct, implement KdbDeserialize to map rows, and stream time-sliced queries directly into your graph:
#[derive(Debug, Clone, Default)]
struct Price {
    sym: Sym,
    mid: f64,
}

kdb_read::<Price, _>(
    conn,
    Duration::from_secs(10),
    |(t0, t1), _date, _iter| {
        format!(
            "select time, sym, mid from prices \
             where time >= (`timestamp$){}j, time < (`timestamp$){}j",
            t0.to_kdb_timestamp(),
            t1.to_kdb_timestamp(),
        )
    },
)
.logged("prices", Info)
.run(
    RunMode::HistoricalFrom(NanoTime::from_kdb_timestamp(0)),
    RunFor::Duration(Duration::from_secs(100)),
)?;
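The time-slicing idea behind the query closure can be sketched in plain Rust. This is an illustration under assumptions, not a description of how `kdb_read` is implemented: `time_slices` and `slice_query` are hypothetical helpers, timestamps are treated as plain nanosecond counts, and clamping the final slice to the end of the range is a choice made for this sketch. The half-open `[t0, t1)` windows follow the `time >= ... , time < ...` filter in the query above, which keeps a row on a slice boundary from being read twice.

```rust
/// Split the half-open range [start, end) into consecutive half-open slices
/// of at most `slice` nanoseconds each.
fn time_slices(start: i64, end: i64, slice: i64) -> Vec<(i64, i64)> {
    assert!(slice > 0, "slice length must be positive");
    let mut slices = Vec::new();
    let mut t0 = start;
    while t0 < end {
        // Clamp the last slice so it never runs past the end of the range.
        let t1 = t0.saturating_add(slice).min(end);
        slices.push((t0, t1));
        t0 = t1;
    }
    slices
}

/// Build the q query text for one slice, mirroring the format string above.
fn slice_query(t0: i64, t1: i64) -> String {
    format!(
        "select time, sym, mid from prices \
         where time >= (`timestamp$){}j, time < (`timestamp$){}j",
        t0, t1
    )
}

fn main() {
    let second = 1_000_000_000_i64; // nanoseconds per second
    // 25 seconds of data read in 10-second slices.
    for (t0, t1) in time_slices(0, 25 * second, 10 * second) {
        println!("{}", slice_query(t0, t1));
    }
}
```

Each slice maps to one bounded query, so memory use depends on the slice length rather than on the size of the whole table.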
Links
- Check out the examples
- Download from crates.io
- Read the documentation
- Review the benchmarks
- Download the wingfoil Python module from pypi.org
Get Involved!
We want to hear from you! Especially if you:
- are interested in contributing
- know of a project that wingfoil would be well-suited for
- would like to request a feature or report a bug
- have any feedback
Please do get in touch:
- ping us on Discord
- email us at hello@wingfoil.io
- submit an issue
- get involved in the discussion
