<p align="center"> <a href="https://electric-sql.com" target="_blank"> <picture> <source media="(prefers-color-scheme: dark)" srcset="https://raw.githubusercontent.com/electric-sql/meta/main/identity/ElectricSQL-logo-next.svg" /> <source media="(prefers-color-scheme: light)" srcset="https://raw.githubusercontent.com/electric-sql/meta/main/identity/ElectricSQL-logo-black.svg" /> <img alt="ElectricSQL logo" src="https://raw.githubusercontent.com/electric-sql/meta/main/identity/ElectricSQL-logo-black.svg" /> </picture> </a> </p> <p align="center"> <a href="https://github.com/electric-sql/d2ts/actions"><img src="https://github.com/electric-sql/d2ts/actions/workflows/ci.yml/badge.svg"></a> <a href="https://github.com/electric-sql/d2ts/blob/main/LICENSE"><img src="https://img.shields.io/badge/license-Apache_2.0-green" alt="License - Apache 2.0"></a> <a href="https://github.com/electric-sql/d2ts/ ext/milestones"><img src="https://img.shields.io/badge/status-alpha-orange" alt="Status - Alpha"></a> <a href="https://discord.electric-sql.com"><img src="https://img.shields.io/discord/933657521581858818?color=5969EA&label=discord" alt="Chat - Discord"></a> <a href="https://x.com/ElectricSQL" target="_blank"><img src="https://img.shields.io/twitter/follow/ElectricSQL.svg?style=social&label=Follow @ElectricSQL"></a> </p>

D2TS - Differential Dataflow in TypeScript

D2TS is a TypeScript implementation of differential dataflow - a powerful data-parallel programming framework that enables incremental computations over changing input data.

You can use D2TS to build data pipelines that execute incrementally: you process data as it comes in and recompute only the parts that have changed. A pipeline can be as simple as remapping data, or as complex as a full join that combines two data sources, one of which is a computed aggregate.

D2TS can be used in conjunction with ElectricSQL to build data pipelines on top of ShapeStreams that can be executed incrementally.

A D2TS pipeline is also fully type safe, inferring the types at each step of the pipeline, and supports auto-complete in your IDE.

Key Features

  • Incremental Processing: Efficiently process changes to input data without recomputing everything
  • Rich Operators: Supports common operations with a pipeline API:
    • buffer: Buffer and emit versions when they are complete
    • concat: Concatenate two streams
    • consolidate: Consolidates the elements in the stream at each version
    • count: Count elements by key
    • distinct: Remove duplicates
    • filter: Filter elements based on predicates
    • iterate: Perform iterative computations
    • join: Join two streams
    • keyBy: Key a stream by a property
    • map: Transform elements in a stream
    • reduce: Aggregate values by key
    • rekey: Change the key of a keyed stream
    • unkey: Remove keys from a keyed stream
    • output: Output the messages of the stream
    • pipe: Build a pipeline of operators enabling reuse of combinations of operators
  • SQLite Integration: Optional SQLite backend for persisting operator state allowing for larger datasets and resumable pipelines
  • Type Safety: Full TypeScript type safety and inference through the pipeline API
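To make the consolidate operator more concrete, here is a minimal, self-contained sketch of what consolidation usually means in differential dataflow: multiplicities of equal elements are summed, and elements whose total is zero are dropped. This does not use the D2TS API. The `Change` type, the `consolidateChanges` function, and the use of `JSON.stringify` to compare values are all assumptions made for illustration.

```typescript
// Conceptual sketch only: this is not the D2TS implementation.
// A change pairs a value with a multiplicity (+1 insert, -1 delete).
type Change<T> = [T, number]

function consolidateChanges<T>(changes: Change<T>[]): Change<T>[] {
  const totals = new Map<string, Change<T>>()
  for (const [value, multiplicity] of changes) {
    // Assumption: values are compared structurally via their JSON encoding
    const key = JSON.stringify(value)
    const existing = totals.get(key)
    totals.set(key, [value, (existing ? existing[1] : 0) + multiplicity])
  }
  // Elements whose multiplicities cancel out disappear from the result
  return [...totals.values()].filter(([, multiplicity]) => multiplicity !== 0)
}

const consolidated = consolidateChanges<string>([
  ['a', 1],
  ['b', 1],
  ['a', 1],
  ['b', -1],
])
console.log(consolidated) // [ [ 'a', 2 ] ]
```

The insert and delete of 'b' cancel, so only 'a' with a multiplicity of 2 remains.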

Quick Start

Installation

npm install @electric-sql/d2ts

Basic Usage

Here's a simple example that demonstrates the core concepts:

import { D2, map, filter, debug, MultiSet, v } from '@electric-sql/d2ts'

// Create a new D2 graph with initial frontier
// The initial frontier is the lower bound of the version of the data that may
// come in future.
const graph = new D2({ initialFrontier: 0 })

// Create an input stream
// We can specify the type of the input stream, here we are using number.
const input = graph.newInput<number>()

// Build a simple pipeline that:
// 1. Takes numbers as input
// 2. Adds 5 to each number
// 3. Filters to keep only even numbers
// Pipelines can have multiple inputs and outputs.
const output = input.pipe(
  map((x) => x + 5),
  filter((x) => x % 2 === 0),
  debug('output'),
)

// Finalize the pipeline, after this point we can no longer add operators or
// inputs
graph.finalize()

// Send some data
// Data is sent as a MultiSet, which is a map of values to their multiplicity
// Here we are sending 3 numbers (1-3), each with a multiplicity of 1
// When you send data, you set the version number, here we are using 0
// The key thing to understand is that the MultiSet represents a *change* to
// the data, not the data itself. "Inserts" and "Deletes" are represented as
// an element with a multiplicity of 1 or -1 respectively.
input.sendData(
  0, // The version of the data
  new MultiSet([
    [1, 1],
    [2, 1],
    [3, 1],
  ]),
)

// Set the frontier to version 1
// The "frontier" is the lower bound of the version of the data that may come
// in future. By sending a frontier, you indicate that you are done sending
// data for any version less than the frontier, and therefore D2TS operators
// that require them can process that data and output the results.
input.sendFrontier(1)

// Process the data
graph.run()

// Output will show:
// 6 (from 1 + 5)
// 8 (from 3 + 5)
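The idea that a MultiSet represents a change rather than the data itself can be sketched without the library. The example below is a simplified model, not how D2TS is implemented: `Change`, `mapChanges`, `filterChanges`, and `pipeline` are hypothetical names. It runs the same "add 5, keep even" logic over a batch of inserts and then over a later batch that deletes one element, showing that a delete flows through the stateless operators as a multiplicity of -1.

```typescript
// Conceptual sketch only: plain arrays stand in for D2TS MultiSets.
type Change<T> = [T, number]

// Stateless operators transform the value and leave the multiplicity untouched
function mapChanges<T, U>(changes: Change<T>[], f: (x: T) => U): Change<U>[] {
  return changes.map(([value, m]): Change<U> => [f(value), m])
}

function filterChanges<T>(changes: Change<T>[], keep: (x: T) => boolean): Change<T>[] {
  return changes.filter(([value]) => keep(value))
}

// Same logic as the pipeline above: add 5, then keep only even numbers
function pipeline(changes: Change<number>[]): Change<number>[] {
  return filterChanges(
    mapChanges(changes, (x) => x + 5),
    (x) => x % 2 === 0,
  )
}

// First batch: insert 1, 2 and 3
const inserts = pipeline([[1, 1], [2, 1], [3, 1]])
console.log(inserts) // [ [ 6, 1 ], [ 8, 1 ] ]

// Later batch: delete 3 and insert 5. Only the change is processed.
const retraction = pipeline([[3, -1], [5, 1]])
console.log(retraction) // [ [ 8, -1 ], [ 10, 1 ] ]
```

The second batch never revisits 1 or 2. The downstream result is updated by retracting 8 and adding 10.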

Using with ElectricSQL

D2TS can be used in conjunction with ElectricSQL to build data pipelines on top of ShapeStreams that can be executed incrementally.

Here's an example of how to use D2TS with ElectricSQL:

import { D2, map, filter, output } from '@electric-sql/d2ts'
import { electricStreamToD2Input } from '@electric-sql/d2ts/electric'
import { ShapeStream } from '@electric-sql/client'

// Create D2 graph
const graph = new D2({ initialFrontier: 0 })

// Create D2 input
const input = graph.newInput<any>()

// Configure the pipeline
input
  .pipe(
    map(([key, data]) => data.value),
    filter(value => value > 10),
    // ... any other processing / joining
    // doSomething is a placeholder for your own message handler
    output((msg) => doSomething(msg))
  )

// Finalize graph
graph.finalize()

// Create Electric stream (example)
const electricStream = new ShapeStream({
  url: 'http://localhost:3000/v1/shape',
  params: {
    table: 'items',
    replica: 'full',  // <-- IMPORTANT!
  }
})

// Connect Electric stream to D2 input
electricStreamToD2Input(electricStream, input)

There is a complete example in the ./examples/electric directory.

Examples

There are a number of examples in the ./examples directory.

API

D2 graph construction

const graph = new D2({ initialFrontier: 0 })

The D2 constructor takes an optional options object with the following properties:

  • initialFrontier: The initial frontier of the graph, defaults to 0

An instance of a D2 graph is used to build a dataflow graph, and has the following main methods:

  • newInput<T>(): IStreamBuilder<T>: Create a new input stream
  • finalize(): void: Finalize the graph, after this point no more operators or inputs can be added
  • run(): void: Process all pending versions of the dataflow graph

Input Streams

Input streams are created using the newInput<T>() method, and have the following methods:

  • sendData(version: Version | number | number[], data: MultiSet<T>): void: Send data to the input stream
  • sendFrontier(version: Antichain | Version | number | number[]): void: Send a frontier to the input stream
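The interplay between sendData and sendFrontier can be illustrated with a small model that uses single-integer versions. This is a sketch under stated assumptions, not the D2TS internals: the `FrontierBuffer` class is hypothetical, and rejecting data for an already closed version is an assumption of this model rather than documented library behaviour.

```typescript
// Conceptual sketch only: a toy input that holds data until its version is closed.
type Change<T> = [T, number]

class FrontierBuffer<T> {
  private pending = new Map<number, Change<T>[]>()
  private frontier = 0

  sendData(version: number, changes: Change<T>[]): void {
    // Assumption of this model: versions below the frontier are closed
    if (version < this.frontier) {
      throw new Error(`version ${version} is already closed`)
    }
    const existing = this.pending.get(version) ?? []
    this.pending.set(version, [...existing, ...changes])
  }

  // Advancing the frontier closes every version below it and releases
  // the data for those versions in ascending version order
  sendFrontier(frontier: number): Array<[number, Change<T>[]]> {
    if (frontier < this.frontier) {
      throw new Error('the frontier cannot move backwards')
    }
    this.frontier = frontier
    const complete = [...this.pending.entries()]
      .filter(([version]) => version < frontier)
      .sort(([a], [b]) => a - b)
    for (const [version] of complete) this.pending.delete(version)
    return complete
  }
}

const buffer = new FrontierBuffer<string>()
buffer.sendData(1, [['b', 1]])
buffer.sendData(0, [['a', 1]])

const released = buffer.sendFrontier(1)
console.log(JSON.stringify(released)) // [[0,[["a",1]]]]

const releasedLater = buffer.sendFrontier(2)
console.log(JSON.stringify(releasedLater)) // [[1,[["b",1]]]]
```

Data for version 1 stays pending until the frontier moves past it, which is why operators that need complete versions only emit results after a frontier is sent.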

Versions and Frontiers

Versions represent the version of the data and form a lattice of integers. For most use cases you only need to provide a single integer version, and all APIs that take a version accept a single integer. More advanced use cases may require the lattice to track multidimensional versions.

Frontiers represent the lower bound of the version of the data that may come in future, and are an antichain of versions. Again, in most cases you can use a single integer version to represent the frontier.
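For multidimensional versions, "lower bound" refers to a partial order rather than a simple numeric comparison. The sketch below assumes the usual product order (one version is less than or equal to another only if every component is) and models a frontier as a list of mutually incomparable versions. The `VersionTuple` type and both functions are hypothetical and are not part of the D2TS API.

```typescript
// Conceptual sketch only: plain number arrays stand in for D2TS versions.
type VersionTuple = number[]

// Product partial order: a <= b only if every component of a is <= that of b
function lessEqual(a: VersionTuple, b: VersionTuple): boolean {
  return a.length === b.length && a.every((x, i) => x <= b[i]!)
}

// A version can still receive data if it is >= at least one frontier element
function mayStillReceiveData(frontier: VersionTuple[], version: VersionTuple): boolean {
  return frontier.some((f) => lessEqual(f, version))
}

// [1, 0] and [0, 2] are incomparable, so together they form an antichain
const frontier: VersionTuple[] = [[1, 0], [0, 2]]
console.log(lessEqual([1, 0], [0, 2])) // false
console.log(lessEqual([0, 2], [1, 0])) // false

console.log(mayStillReceiveData(frontier, [0, 1])) // false: this version is closed
console.log(mayStillReceiveData(frontier, [1, 1])) // true: [1, 0] <= [1, 1]
```

With single-integer versions the antichain has one element, and the check reduces to comparing the version against the frontier number.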

Version

There is a Version class that represents a version. The preferred way to create a version is with the v helper function, which ensures that you reuse the same object for the same version, making equality checks and comparisons more efficient:

const version = v(1)

Multidimensional versions are also supported, and are created using the v helper function:

const version = v([1, 2])

In most cases you will only need to use a single integer version to represent the version which can be passed directly to the sendData and sendFrontier methods:

input.sendData(1, new MultiSet([[1, 1]]))
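Why reusing the same object per version makes equality checks cheap can be shown with a generic interning pattern. This is a sketch of the general technique, not the actual v helper: `InternedVersion`, `internVersion`, and the string cache key are assumptions made for illustration.

```typescript
// Conceptual sketch only: a hypothetical stand-in for a version class.
class InternedVersion {
  readonly components: readonly number[]

  constructor(components: readonly number[]) {
    this.components = components
  }
}

// One shared object per distinct version, keyed by its components
const versionCache = new Map<string, InternedVersion>()

function internVersion(input: number | number[]): InternedVersion {
  const components = Array.isArray(input) ? input : [input]
  const key = components.join(',')
  let version = versionCache.get(key)
  if (!version) {
    version = new InternedVersion([...components])
    versionCache.set(key, version)
  }
  return version
}

// Interned versions can be compared by reference
const sameObject = internVersion([1, 2]) === internVersion([1, 2])
console.log(sameObject) // true

// Separately constructed objects are never reference-equal
const separateObjects = new InternedVersion([1, 2]) === new InternedVersion([1, 2])
console.log(separateObjects) // false
```

With interning, checking whether two versions are equal is a single reference comparison instead of a component-by-component comparison.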