Conduit

C++ library that wraps intra-thread, inter-thread, and inter-process communication in a uniform, modular, object-oriented interface, with a focus on asynchronous high-performance computing applications.

Design

The driving objective behind this library is to provide a performant, uniform, convenient interface for communication between simulation elements, whether those simulation elements reside on the same thread, different threads, or entirely different processes.

Inlet and Outlet holding a shared Duct with intra-thread implementation active

The conduit model consists of:

  • Inlets, which accept inputs T through a non-blocking call,
  • Ducts, which handle transmission logistics,
  • Outlets, which provide the latest T or next T through a non-blocking call.
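
To make the non-blocking semantics concrete, here's a minimal single-threaded sketch. It is not Conduit's implementation: ToyBuffer and StepGet are hypothetical names invented for illustration, and only TryPut and JumpGet borrow their names from the library's interface. The sketch assumes a bounded buffer where a put fails rather than waits when the buffer is full, and where a get returns the last-seen value when nothing new is pending.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>

// Hypothetical stand-in for a Duct: a bounded FIFO buffer.
// T must be default-constructible in this sketch.
template <typename T>
class ToyBuffer {
  std::deque<T> pending_;   // messages not yet seen by the outlet side
  std::size_t capacity_;    // maximum number of pending messages
  T current_{};             // last value handed to the outlet side

public:
  explicit ToyBuffer(const std::size_t capacity) : capacity_(capacity) {
    assert(capacity > 0);
  }

  // Inlet side: never waits; reports failure if the buffer is full.
  bool TryPut(const T& value) {
    if (pending_.size() >= capacity_) return false;
    pending_.push_back(value);
    return true;
  }

  // Outlet side: advance to the next pending value, if there is one.
  const T& StepGet() {
    if (!pending_.empty()) {
      current_ = pending_.front();
      pending_.pop_front();
    }
    return current_;
  }

  // Outlet side: skip ahead to the latest pending value, if there is one.
  const T& JumpGet() {
    if (!pending_.empty()) {
      current_ = pending_.back();
      pending_.clear();
    }
    return current_;
  }
};
```

Neither side ever waits on the other: a full buffer makes TryPut return false, and an empty buffer makes both getters return the value they returned last time.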

Inlet and Outlet objects both hold a std::shared_ptr to a Duct object. The Duct object is implemented as a std::variant of three implementation types:

  • IntraDuct type for intra-thread communication (default),
  • ThreadDuct type for inter-thread communication, and
  • ProcDuct type for inter-process communication.

The Duct's active implementation can be switched at run-time by calling EmplaceDuct<Type> from either the Inlet or the Outlet. All calls to a Duct at run-time are forwarded to its active implementation. For example, emplacing a ThreadDuct might yield the following.

Inlet and Outlet holding a shared Duct with inter-thread implementation active
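
The variant-plus-forwarding arrangement described above can be sketched with the standard library alone. Everything prefixed Toy here is a hypothetical stand-in, not the library's code, and the real EmplaceDuct<Type> may take arguments this sketch omits. It only shows the mechanism: two endpoints share one duct, the duct holds a std::variant, and std::visit forwards each call to whichever alternative is active.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <variant>

// Hypothetical stand-ins for the three implementation types.
struct ToyIntraDuct  { std::string Name() const { return "intra-thread"; } };
struct ToyThreadDuct { std::string Name() const { return "inter-thread"; } };
struct ToyProcDuct   { std::string Name() const { return "inter-process"; } };

class ToyDuct {
  // the first alternative is the one a default-constructed variant holds
  std::variant<ToyIntraDuct, ToyThreadDuct, ToyProcDuct> impl_;

public:
  // switch the active implementation at run-time
  template <typename Impl>
  void Emplace() { impl_.emplace<Impl>(); }

  // forward the call to whichever implementation is active
  std::string Name() const {
    return std::visit([](const auto& impl) { return impl.Name(); }, impl_);
  }
};

// Hypothetical stand-in for an Inlet or Outlet: it just holds the shared duct.
struct ToyEndpoint {
  std::shared_ptr<ToyDuct> duct;

  template <typename Impl>
  void EmplaceDuct() {
    assert(duct != nullptr);
    duct->Emplace<Impl>();
  }
};
```

Because both endpoints point at the same ToyDuct, emplacing a ToyThreadDuct through one of them changes what the other sees, without either endpoint being rebuilt.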

Calling SplitDuct<Type> on either the Inlet or the Outlet will drop that object's std::shared_ptr to the existing Duct in favor of a std::shared_ptr to a newly-constructed Duct with the specified implementation type active. (This operation is useful for inter-process communication, where coupled Inlets and Outlets do not reside in a common memory space.) For example, calling SplitDuct<ProcDuct> on an Inlet might yield the following.

Inlet and Outlet holding separate Ducts
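
In ownership terms, a split amounts to re-pointing one endpoint's std::shared_ptr. Here's a rough sketch under that assumption, using hypothetical Toy types rather than the library's own classes. An enum stands in for the active implementation to keep the example short.

```cpp
#include <cassert>
#include <memory>

// Hypothetical tag for which implementation a duct has active.
enum class ToyImpl { intra, thread, proc };

struct ToyDuct {
  ToyImpl active;
  explicit ToyDuct(const ToyImpl impl = ToyImpl::intra) : active(impl) {}
};

// Hypothetical stand-in for an Inlet or Outlet.
struct ToyEndpoint {
  std::shared_ptr<ToyDuct> duct;

  // Drop this endpoint's reference to the shared duct and take sole
  // ownership of a freshly constructed one.
  void SplitDuct(const ToyImpl impl) {
    assert(duct != nullptr);
    duct = std::make_shared<ToyDuct>(impl);
  }
};
```

After the split, the other endpoint still holds the original duct, untouched, and is now its only owner.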

Inlet and Outlet are entirely interchangeable no matter which Duct implementation is currently active. Once Ducts are configured, Inlet and Outlet objects can be used without any concern for the underlying implementation. This abstraction ensures a uniform API whether underlying communication is intra-thread, inter-thread, or inter-process. Furthermore, a Duct implementation can be re-configured or even re-directed at run time without any interaction with an Inlet or Outlet it's tied to.

Low-Level Interface: uit

Conduit provides three helper construction interfaces:

  • Conduit, which constructs an Inlet and Outlet with a shared Duct,
  • Sink, which constructs an Inlet with sole holdership of a Duct,
  • Source, which constructs an Outlet with sole holdership of a Duct.

comparison of Conduit, Sink, and Source

After constructing a Conduit, Sink, or Source, users can use structured binding or an accessor method to retrieve Inlets or Outlets.

Here's an example of how this works in code.

conduit/low.cpp:

#include <cstddef>
#include <iostream>
#include <ratio>
#include <utility>

#include "uit/fixtures/Conduit.hpp"
#include "uitsl/parallel/ThreadTeam.hpp"
#include "uit/setup/ImplSpec.hpp"

// use int as message type
using Spec = uit::ImplSpec<int>;

int main() {

  // construct conduit with thread-safe implementation active
  uit::Conduit<Spec> conduit{
    std::in_place_type_t<Spec::ThreadDuct>{}
  };

  auto& [inlet, outlet] = conduit;

  uitsl::ThreadTeam team;

  // start a producer thread
  team.Add( [&inlet](){
    for (int i = 0; i < std::mega::num; ++i) inlet.TryPut(i);
  } );

  // start a consumer thread
  team.Add( [&outlet](){
    int prev{ outlet.JumpGet() };
    size_t update_counter{};
    for (size_t i = 0; i < std::mega::num; ++i) {
      // read the latest value once, then count it only if it changed
      const int cur{ outlet.JumpGet() };
      update_counter += (cur != prev);
      prev = cur;
    }
    std::cout << update_counter << " updates detected" << '\n';
  } );

  // wait for threads to complete
  team.Join();

  return 0;

}

Navigate to the conduit directory. Then, to compile and run,

mpicxx --std=c++17 -O3 -DNDEBUG -Iinclude/ low.cpp -lpthread
./a.out

‼️ You'll need an MPI compiler and runtime library for the code examples here. If you don't have those on hand, grab a copy of our pre-built Docker container and hop inside there.

sudo docker run -it mmore500/conduit:latest

If you're on a cluster without root access, you can try using Singularity.

singularity shell docker://mmore500/conduit

High-Level Interface: netuit

The conduit library provides a Mesh interface to streamline construction of complex, potentially irregular, conduit networks. These networks are conceived as a directed graph, with edges representing conduits and nodes representing actors that each hold a set of Inlets and/or Outlets.

Meshes are constructed through two independently-specified components:

  1. topology: how should nodes be connected?
  2. delegation: how should nodes be assigned to threads and processes?

Here's an example topology, with each node connected to a successor in a one-dimensional ring.

unidirectional ring graph
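
As a rough illustration of what a ring topology amounts to, here's a standalone sketch that lists its edges. The function make_ring_edges is a hypothetical helper, not part of the library; the library's own topology factories may represent topologies differently.

```cpp
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Hypothetical helper: directed edges (source, destination) of a
// one-dimensional ring, where node i sends to node (i + 1) % num_nodes.
std::vector<std::pair<std::size_t, std::size_t>> make_ring_edges(
  const std::size_t num_nodes
) {
  assert(num_nodes > 0);
  std::vector<std::pair<std::size_t, std::size_t>> edges;
  edges.reserve(num_nodes);
  for (std::size_t i = 0; i < num_nodes; ++i) {
    edges.emplace_back(i, (i + 1) % num_nodes);
  }
  return edges;
}
```

Each node appears exactly once as a source and once as a destination, and the last node wraps around to node 0.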

We might choose to delegate contiguous subsets of nodes to threads and processes. For example, to distribute 24 nodes over four double-threaded processes, we might perform the following assignment:

  • node 0 → thread 0, process 0
  • node 1 → thread 0, process 0
  • node 2 → thread 0, process 0
  • node 3 → thread 1, process 0
  • node 4 → thread 1, process 0
  • node 5 → thread 1, process 0
  • node 6 → thread 0, process 1
  • node 7 → thread 0, process 1
  • node 8 → thread 0, process 1
  • node 9 → thread 1, process 1
  • etc.

graph nodes assigned to threads and processes
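
The contiguous assignment listed above reduces to two integer divisions. Here's a small sketch of that arithmetic. The names assign_contiguous and ToyAssignment are hypothetical, and the library's own delegation functors may be shaped differently.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical result type: where a node lives.
struct ToyAssignment {
  std::size_t thread;   // thread index within its process
  std::size_t process;  // process index
};

// Hypothetical helper: hand out contiguous runs of nodes to threads,
// and contiguous runs of threads to processes.
ToyAssignment assign_contiguous(
  const std::size_t node,
  const std::size_t nodes_per_thread,
  const std::size_t threads_per_process
) {
  assert(nodes_per_thread > 0 && threads_per_process > 0);
  // index of the thread if threads were numbered across all processes
  const std::size_t global_thread = node / nodes_per_thread;
  return ToyAssignment{
    global_thread % threads_per_process,
    global_thread / threads_per_process
  };
}
```

For 24 nodes over four double-threaded processes, each thread gets three nodes, so assign_contiguous(node, 3, 2) reproduces the assignment in the list: node 3 lands on thread 1 of process 0, and node 6 on thread 0 of process 1.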

Arbitrary topologies can be specified, with pre-built factories available to construct the most common configurations. For example, here is a two-dimensional lattice grid.

grid lattice graph

We can use a specialized delegation function to distribute nodes.

graph nodes assigned to threads and processes

When a Mesh is constructed from a topology and a delegation function, edges between nodes are instantiated in terms of Inlets and Outlets. During Mesh construction, thread-safe Duct implementations are emplaced on conduits that span between nodes assigned to different threads, and inter-process Duct implementations are emplaced on conduits that span between nodes assigned to different processes.
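
The selection rule in the paragraph above can be written as a short decision function. This is a sketch of the rule as described, not the Mesh's actual code; ToyAssignment, ToyDuctKind, and choose_duct are hypothetical names.

```cpp
#include <cstddef>

// Hypothetical record of where a node was delegated.
struct ToyAssignment {
  std::size_t thread;
  std::size_t process;
};

// Hypothetical tag for the kind of Duct implementation an edge needs.
enum class ToyDuctKind { intra, thread, proc };

// Pick an implementation for the edge from src to dst.
ToyDuctKind choose_duct(const ToyAssignment& src, const ToyAssignment& dst) {
  // different processes share no memory, so check this first
  if (src.process != dst.process) return ToyDuctKind::proc;
  // same process but different threads needs a thread-safe duct
  if (src.thread != dst.thread) return ToyDuctKind::thread;
  // same thread on the same process can use the default
  return ToyDuctKind::intra;
}
```

The process check comes first because thread indices are only meaningful within a process: thread 0 on process 0 and thread 0 on process 1 are different threads.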

Once the Mesh is constructed, GetSubmesh() returns the network components that are assigned to a particular thread or process.

nodes within a particular thread on a particular process

The GetSubmesh() call returns an emp::vector of MeshNodes. Each MeshNode consists of an "input" vector of Outlets and an "output" vector of Inlets.

mesh node with constituent input outlets and output inlets

Here's what the entire process looks like in code.

conduit/high.cpp:

#include <iostream>
#include <tuple>
#include <sstream>

#include "uitsl/mpi/MpiGuard.hpp"
#include "uitsl/parallel/ThreadTeam.hpp"

#include "uit/setup/ImplSpec.hpp"

#include "netuit/arrange/RingTopologyFactory.hpp"
#include "netuit/mesh/Mesh.hpp"

const size_t num_nodes = 5; // five nodes in our topology
const size_t num_procs = 2; // two MPI processes
const size_t num_threads = 2; // two threads per process

// m