Fork Union 🍴

Fork Union is arguably the lowest-latency OpenMP-style NUMA-aware minimalistic scoped thread-pool designed for 'Fork-Join' parallelism in C++, C, and Rust, avoiding × mutexes & system calls, × dynamic memory allocations, × CAS-primitives, and × false-sharing of CPU cache-lines on the hot path 🍴

Motivation

Most "thread-pools" are not, in fact, thread-pools, but rather "task-queues" that are designed to synchronize a concurrent, dynamically growing list of heap-allocated, globally accessible shared objects. In C++ terms, think of it as a std::queue<std::function<void()>> protected by a std::mutex, where each thread waits for the next task to be available and then executes it on some random core chosen by the OS scheduler. All of that is slow, and the same holds across C++, C, and Rust projects. Short of OpenMP, practically every other solution has high dispatch latency and noticeable memory overhead. OpenMP, however, is not ideal for fine-grained parallelism and is less portable than the C++ and Rust standard libraries.
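To make the criticized design concrete, here is a minimal sketch of such a mutex-guarded task queue, using only the C++ standard library. The name naive_task_queue is hypothetical and exists only for illustration; it is not part of Fork Union and is not how Fork Union works.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical `naive_task_queue`: the mutex-guarded design described above,
// shown for contrast only. It is not part of Fork Union.
class naive_task_queue {
    std::queue<std::function<void()>> tasks_; // each push may heap-allocate
    std::mutex mutex_;
    std::condition_variable ready_;
    std::vector<std::thread> workers_;
    bool stop_ = false;

    void worker_loop() {
        for (;;) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                ready_.wait(lock, [this] { return stop_ || !tasks_.empty(); });
                if (tasks_.empty()) return; // stop requested and queue drained
                task = std::move(tasks_.front());
                tasks_.pop();
            }
            task(); // runs on whichever core the OS scheduler picked
        }
    }

  public:
    explicit naive_task_queue(std::size_t threads) {
        assert(threads > 0);
        for (std::size_t i = 0; i < threads; ++i)
            workers_.emplace_back([this] { worker_loop(); });
    }

    // Every submission takes the lock and wakes one sleeping worker.
    void push(std::function<void()> task) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            tasks_.push(std::move(task));
        }
        ready_.notify_one();
    }

    // Lets the workers drain the remaining tasks, then joins them.
    ~naive_task_queue() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stop_ = true;
        }
        ready_.notify_all();
        for (auto &worker : workers_) worker.join();
    }
};
```

Every submitted task pays for a lock acquisition, a possible allocation inside std::function, and a condition-variable wake-up, which is the overhead the paragraph above refers to.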

This is where fork_union comes in. It's a C++17 library with C99 and Rust bindings (in v1, the Rust implementation was standalone). It supports pinning threads to specific NUMA nodes or individual CPU cores, making it much easier to ensure data locality and halving the latency of individual loads in Big Data applications.

Basic Usage

Fork Union is dead-simple to use! There is no nested parallelism, exception handling, or "future promises"; they are banned. The thread pool itself has a few core operations:

  • try_spawn to initialize worker threads, and
  • for_threads to launch a blocking callback on all threads.

Higher-level APIs for index-addressable tasks are also available:

  • for_n - for individual evenly-sized tasks,
  • for_n_dynamic - for individual unevenly-sized tasks,
  • for_slices - for slices of evenly-sized tasks.
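The difference between evenly-sized (static) and unevenly-sized (dynamic) scheduling can be sketched with the standard library alone. The helpers below (slice_for_thread, sketch_for_n, sketch_for_n_dynamic) are hypothetical names for illustration, not the library's API, and they spawn fresh threads on every call, which a real pool avoids. They only show how indices might be assigned, under the assumption that static scheduling hands each thread one contiguous slice and dynamic scheduling lets threads claim indices from a shared counter.

```cpp
#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstddef>
#include <thread>
#include <vector>

// Hypothetical helper, not the library's API: a contiguous range of tasks.
struct slice_t {
    std::size_t first;
    std::size_t count;
};

// Splits `n` tasks into `threads` contiguous slices whose sizes differ by at most one.
inline slice_t slice_for_thread(std::size_t n, std::size_t threads, std::size_t thread_index) {
    assert(threads > 0 && thread_index < threads);
    std::size_t const quotient = n / threads;
    std::size_t const remainder = n % threads;
    // The first `remainder` threads take one extra task each.
    std::size_t const first = thread_index * quotient + std::min(thread_index, remainder);
    std::size_t const count = quotient + (thread_index < remainder ? 1 : 0);
    return {first, count};
}

// Static scheduling: each thread walks its own precomputed slice.
template <typename function_type>
void sketch_for_n(std::size_t n, std::size_t threads, function_type const &function) {
    std::vector<std::thread> workers;
    for (std::size_t t = 0; t < threads; ++t)
        workers.emplace_back([&function, n, threads, t] {
            slice_t const slice = slice_for_thread(n, threads, t);
            for (std::size_t i = slice.first; i < slice.first + slice.count; ++i) function(i);
        });
    for (auto &worker : workers) worker.join();
}

// Dynamic scheduling: threads claim the next index from a shared atomic counter.
template <typename function_type>
void sketch_for_n_dynamic(std::size_t n, std::size_t threads, function_type const &function) {
    std::atomic<std::size_t> next{0};
    std::vector<std::thread> workers;
    for (std::size_t t = 0; t < threads; ++t)
        workers.emplace_back([&function, &next, n] {
            for (std::size_t i = next.fetch_add(1, std::memory_order_relaxed); i < n;
                 i = next.fetch_add(1, std::memory_order_relaxed))
                function(i);
        });
    for (auto &worker : workers) worker.join();
}
```

The static variant touches no shared state while running, so it suits tasks of similar cost. The dynamic variant pays one atomic increment per task in exchange for balancing uneven workloads.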

For additional flow control and tuning, the following helpers are available:

  • sleep(microseconds) - for longer naps,
  • terminate - to kill the threads before the destructor is called,
  • unsafe_for_threads - to broadcast a callback without blocking,
  • unsafe_join - to block until the completion of the current broadcast.

On Linux, given the maturity and flexibility of the HPC ecosystem, the C++ library also provides NUMA extensions. These include linux_colocated_pool, an analog of basic_pool, and linux_numa_allocator, for allocating memory on a specific NUMA node. Both are compatible with the higher-level APIs out of the box. Most interestingly for Big Data applications, a higher-level distributed_pool class addresses and balances the work across all NUMA nodes.

Intro in Rust

To integrate into your Rust project, add the following lines to Cargo.toml:

[dependencies]
fork_union = "2.3.1"                                      # default
# fork_union = { version = "2.3.1", features = ["numa"] } # or, with NUMA support on Linux

Or for the preview development version:

[dependencies]
fork_union = { git = "https://github.com/ashvardanian/fork_union.git", branch = "main-dev" }

A minimal example may look like this:

use fork_union as fu;
let mut pool = fu::spawn(2);
pool.for_threads(|thread_index, colocation_index| {
    println!("Hello from thread # {} on colocation # {}", thread_index + 1, colocation_index + 1);
});

Higher-level APIs distribute index-addressable tasks across the threads in the pool:

pool.for_n(100, |prong| {
    println!("Running task {} on thread # {}",
        prong.task_index + 1, prong.thread_index + 1);
});
pool.for_slices(100, |prong, count| {
    println!("Running slice [{}, {}) on thread # {}",
        prong.task_index, prong.task_index + count, prong.thread_index + 1);
});
pool.for_n_dynamic(100, |prong| {
    println!("Running task {} on thread # {}",
        prong.task_index + 1, prong.thread_index + 1);
});

A more realistic example with named threads and error handling may look like this:

use std::error::Error;
use fork_union as fu;

fn heavy_math(_: usize) {}

fn main() -> Result<(), Box<dyn Error>> {
    // Use `fu::ThreadPool::try_spawn(4)?` instead if the threads need no name.
    let mut pool = fu::ThreadPool::try_named_spawn("heavy-math", 4)?;
    pool.for_n_dynamic(400, |prong| {
        heavy_math(prong.task_index);
    });
    Ok(())
}

For advanced usage, refer to the NUMA section below. For convenient Rayon-style parallel iterators, pull in the prelude module and check out the related examples.

Intro in C++

To integrate into your C++ project, either copy the include/fork_union.hpp file into your project, add a Git submodule, or use CMake. For a Git submodule, run:

git submodule add https://github.com/ashvardanian/fork_union.git extern/fork_union

Alternatively, using CMake:

include(FetchContent)
FetchContent_Declare(
    fork_union
    GIT_REPOSITORY https://github.com/ashvardanian/fork_union
    GIT_TAG v2.3.1
)
FetchContent_MakeAvailable(fork_union)
target_link_libraries(your_target PRIVATE fork_union::fork_union)

Then, include the header in your C++ code:

#include <fork_union.hpp>   // `basic_pool_t`
#include <cstdio>           // `stderr`, `std::printf`
#include <cstdlib>          // `EXIT_SUCCESS`, `EXIT_FAILURE`
#include <thread>           // `std::thread::hardware_concurrency`

namespace fu = ashvardanian::fork_union;

int main() {
    alignas(fu::default_alignment_k) fu::basic_pool_t pool;
    if (!pool.try_spawn(std::thread::hardware_concurrency())) {
        std::fprintf(stderr, "Failed to fork the threads\n");
        return EXIT_FAILURE;
    }

    // Dispatch a callback to each thread in the pool
    pool.for_threads([&](std::size_t thread_index) noexcept {
        std::printf("Hello from thread # %zu (of %zu)\n", thread_index + 1, pool.threads_count());
    });

    // Execute 1000 tasks in parallel, expecting them to have comparable runtimes
    // and mostly co-locating subsequent tasks on the same thread. Analogous to:
    //
    //      #pragma omp parallel for schedule(static)
    //      for (int i = 0; i < 1000; ++i) { ... }
    //
    // You can also think about it as a shortcut for the `for_slices` + `for`.
    pool.for_n(1000, [](std::size_t task_index) noexcept {
        std::printf("Running task %zu of 1000\n", task_index + 1);
    });
    pool.for_slices(1000, [](std::size_t first_index, std::size_t count) noexcept {
        std::printf("Running slice [%zu, %zu)\n", first_index, first_index + count);
    });

    // Like `for_n`, but each thread greedily steals tasks, without waiting for
    // the others or expecting individual tasks to have the same runtimes. Analogous to:
    //
    //      #pragma omp parallel for schedule(dynamic, 1)
    //      for (int i = 0; i < 3; ++i) { ... }
    pool.for_n_dynamic(3, [](std::size_t task_index) noexcept {
        std::printf("Running dynamic task %zu of 3\n", task_index + 1);
    });
    return EXIT_SUCCESS;
}

For advanced usage, refer to the NUMA section below. NUMA detection on Linux defaults to AUTO. Override with -D FORK_UNION_ENABLE_NUMA=ON or OFF.

Alternatives & Differences

Many other thread-pool implementations are more feature-rich but have different limitations and design goals.

Those are not designed for the same OpenMP-like use cases as fork_union. Instead, they primarily focus on task queuing, which requires significantly more work.

Locks and Mutexes

Unlike std::atomic, std::mutex may need a system call, which makes it expensive to acquire and release under contention. Its implementations generally have 2 executable paths:

  • the fast path, where the mutex is not contended: it first tries to grab the mutex via a compare-and-swap operation and, if that succeeds, returns immediately.
  • the slow path, where the mutex is contended: it has to go through the kernel to block the thread until the mutex is available.

On Linux, the latter translates to futex system calls, which are expensive.
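The two paths can be illustrated with a toy lock. This is a sketch under stated assumptions, not how any standard library implements std::mutex: the hypothetical two_path_lock performs the compare-and-swap fast path for real, but stands in for the kernel-assisted slow path with std::this_thread::yield, since a portable example cannot issue a futex wait.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <thread>
#include <vector>

// Hypothetical `two_path_lock`: a toy lock mirroring the two paths above.
// Real mutexes park the thread in the kernel on the slow path (a futex wait
// on Linux); this sketch only yields to the scheduler instead.
class two_path_lock {
    std::atomic<bool> locked_{false};

  public:
    bool try_lock() noexcept {
        bool expected = false;
        // Fast path: a single compare-and-swap, no kernel involvement.
        return locked_.compare_exchange_strong(expected, true, std::memory_order_acquire,
                                               std::memory_order_relaxed);
    }

    void lock() noexcept {
        // Slow path stand-in: keep retrying, giving up the time slice in between.
        while (!try_lock()) std::this_thread::yield();
    }

    void unlock() noexcept { locked_.store(false, std::memory_order_release); }
};

// Usage: several threads bump a plain counter under the toy lock.
inline std::size_t count_with_lock(std::size_t threads, std::size_t increments) {
    assert(threads > 0);
    two_path_lock lock;
    std::size_t counter = 0;
    std::vector<std::thread> workers;
    for (std::size_t t = 0; t < threads; ++t)
        workers.emplace_back([&lock, &counter, increments] {
            for (std::size_t i = 0; i < increments; ++i) {
                lock.lock();
                ++counter;
                lock.unlock();
            }
        });
    for (auto &worker : workers) worker.join();
    return counter;
}
```

Even the uncontended path costs a compare-and-swap on a shared cache line per acquisition, which is why a design that needs no lock on the hot path has less work to do.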

Memory Allocations

C++ has rich functionality for concurrent applications, like std::future, std::packaged_task, std::function, std::queue, std::condition_variable, and so on. Most of those, I believe, are unusable in Big-Data applications, where you always operate in memory-constrained environments:

  • The idea of raising a std::bad_alloc exception when there is no memory left and just hoping that someone up the call stack will catch it is not a great design idea for any Systems Engineering.
  • The threat of having to synchronize ~200 physical CPU cores across 2-8 sockets and potentially dozens of NUMA nodes around a shared global memory allocator practically means you can't have predictable performance.
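One common way to dispatch callbacks without touching the allocator is to pass a non-owning reference to the callable instead of copying it into a std::function. The sketch below assumes that approach; callback_ref and run_n are hypothetical names introduced for illustration and are not claimed to be Fork Union's internals.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical `callback_ref`: a non-owning, non-allocating view of a callable.
// Unlike `std::function`, it never copies the callable to the heap, so the
// callable must outlive the view.
class callback_ref {
    void *context_ = nullptr;
    void (*trampoline_)(void *, std::size_t) = nullptr;

  public:
    template <typename function_type>
    explicit callback_ref(function_type &function) noexcept
        : context_(const_cast<void *>(static_cast<void const *>(&function))),
          // A captureless lambda converts to a plain function pointer that
          // restores the original type before invoking the callable.
          trampoline_([](void *context, std::size_t index) {
              (*static_cast<function_type *>(context))(index);
          }) {}

    void operator()(std::size_t index) const {
        assert(trampoline_ != nullptr);
        trampoline_(context_, index);
    }
};

// Invokes the callback for every index in [0, n); two pointers are all that is copied.
inline void run_n(std::size_t n, callback_ref callback) {
    for (std::size_t i = 0; i < n; ++i) callback(i);
}
```

Because the view is just two pointers, handing it to worker threads requires no shared heap state and cannot raise std::bad_alloc. The trade-off is that the caller must keep the callable alive until the work completes, which a blocking fork-join call does naturally.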

As we focus on a simpler ~~concurrency~~ parallelism model, we can avoid the complexity of allocating shared states, wrapping callbacks into some heap-all
