Graphbolt
GraphBolt: Dependency-Driven Synchronous Processing of Streaming Graphs
1. What is it?
GraphBolt is an efficient streaming graph processing system that provides Bulk Synchronous Parallel (BSP) guarantees. GraphBolt performs dependency-driven incremental processing which quickly reacts to graph changes, and provides low latency & high throughput processing. [Read more]
GraphBolt now incorporates the DZiG runtime in order to perform sparsity-aware incremental processing, thereby pushing the boundary of dependency-driven processing of streaming graphs. [Read more]
For asynchronous algorithms, GraphBolt incorporates KickStarter's light-weight dependency tracking and trimming strategy. [Read more]
2. Getting Started
2.1 Core Organization
The core/graphBolt/ folder contains the GraphBolt Engine, the KickStarter Engine, and our Stream Ingestor module. The application/benchmark codes (e.g., PageRank, SSSP, etc.) can be found in the apps/ directory. Useful helper files for generating the stream of changes (tools/generators/streamGenerator.C), creating the graph inputs in the correct format (tools/converters/SNAPtoAdjConverter.C - from Ligra's codebase), and comparing the output of the algorithms (tools/output_comparators/) are also provided.
2.2 Requirements
- g++ >= 5.3.0 with support for Cilk Plus.
- Mimalloc - A fast general purpose memory allocator from Microsoft (version >= 1.6).
  - Use the helper script install_mimalloc.sh to install mimalloc.
  - Update the LD_PRELOAD environment variable as specified by the install_mimalloc.sh script.
Important: GraphBolt requires mimalloc to function correctly and efficiently.
Note: gcc-5 and gcc-7 come with Cilk support by default. You can maintain multiple versions of gcc using the update-alternatives tool. If you currently have gcc-9, you can install gcc-5 and switch to it as follows:
$ # Install gcc-5
$ sudo apt install gcc-5
$ # Set the path for all gcc versions
$ sudo update-alternatives --install /usr/bin/gcc gcc /usr/bin/gcc-5 50
$ # gcc-9 version
$ sudo update-alternatives --install /usr/bin/gcc gcc /usr/bin/gcc-9 60
$ # Configure gcc to use gcc-5
$ sudo update-alternatives --config gcc
$ # Verify gcc version
$ gcc --version
2.3 Compiling and Running the Application
Compilation is done from within the apps directory. To compile, run:
$ cd apps
$ make -j
The executable takes the following command-line parameters:
- -s: Optional parameter to indicate that a symmetric (undirected) graph is used.
- -streamPath: Path to the input stream file or pipe (more information on the input format can be found in Section 2.4).
- -numberOfUpdateBatches: Optional parameter to specify the number of update batches to be processed. Default is 1.
- -nEdges: Number of edge operations to be processed in a given update batch.
- -outputFile: Optional parameter to print the output of a given algorithm.
- Input graph file path (more information on the input format can be found in Section 2.4).
For example,
$ # Ensure that LD_PRELOAD is set as specified by the install_mimalloc.sh
$ ./PageRank -numberOfUpdateBatches 2 -nEdges 1000 -streamPath ../inputs/sample_edge_operations.txt -outputFile /tmp/output/pr_output ../inputs/sample_graph.adj
$ ./LabelPropagation -numberOfUpdateBatches 3 -nEdges 2000 -streamPath ../inputs/sample_edge_operations.txt -seedsFile ../inputs/sample_seeds_file -outputFile /tmp/output/lp_output ../inputs/sample_graph.adj
$ ./COEM -s -numberOfUpdateBatches 3 -nEdges 2000 -streamPath ../inputs/sample_edge_operations.txt -seedsFile ../inputs/sample_seeds_file -partitionsFile ../inputs/sample_partitions_file -outputFile /tmp/output/coem_output ../inputs/sample_graph.adj
$ ./CF -s -numberOfUpdateBatches 2 -nEdges 10000 -streamPath ../inputs/sample_edge_operations.txt -partitionsFile ../inputs/sample_partitions_file -outputFile /tmp/output/cf_output ../inputs/sample_graph.adj.un
$ ./SSSP -source 0 -numberOfUpdateBatches 1 -nEdges 500 -streamPath ../inputs/sample_edge_operations.txt -outputFile /tmp/output/sssp_output ../inputs/sample_graph.adj
$ ./BFS -source 0 -numberOfUpdateBatches 1 -nEdges 50000 -streamPath ../inputs/sample_edge_operations.txt -outputFile /tmp/output/bfs_output ../inputs/sample_graph.adj
Additional parameters may be required depending on the algorithm. Refer to the Compute() function in the application code (apps/PageRank.C, apps/SSSP.C, etc.) for the supported arguments. Additional configurations for the graph ingestor and the graph can be found in Section 5.
2.4 Graph Input and Stream Input Format
The initial input graph should be in the adjacency graph format. For example, the SNAP format (edgelist) and the adjacency graph format for a sample graph are shown below.
SNAP format:
0 1
0 2
2 0
2 1
Adjacency Graph format:
AdjacencyGraph
3
4
0
2
2
1
2
0
1
You can use tools/converters/SNAPtoAdjConverter to convert an input graph in Edgelist format (SNAP format) to the adjacency graph format, as follows:
$ ./SNAPtoAdjConverter inputGraph.snap inputGraph.adj
$ # for undirected (symmetric) graphs, use the -s flag
$ ./SNAPtoAdjConverter -s inputGraph.snap inputGraphUndirected.adj
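To make the adjacency graph layout concrete, here is a minimal sketch of the conversion for a small in-memory edge list. It reads the sample above as: a header line, the vertex count, the edge count, one offset per vertex, then the destination of every edge. The function name toAdjacencyGraph is hypothetical and is not part of GraphBolt or Ligra. The sketch assumes a directed graph whose vertex ids lie in [0, numVertices).

```cpp
#include <cstddef>
#include <sstream>
#include <string>
#include <utility>
#include <vector>

// Hypothetical helper for illustration (not part of GraphBolt): builds the
// AdjacencyGraph text for a directed edge list.
// Assumes every vertex id is in [0, numVertices).
std::string toAdjacencyGraph(
    std::size_t numVertices,
    const std::vector<std::pair<std::size_t, std::size_t>>& edges) {
  // Count the out-degree of every vertex.
  std::vector<std::size_t> degree(numVertices, 0);
  for (const auto& e : edges) degree[e.first]++;

  // offsets[v] is the index of v's first out-edge in the destination list.
  std::vector<std::size_t> offsets(numVertices, 0);
  for (std::size_t v = 1; v < numVertices; ++v)
    offsets[v] = offsets[v - 1] + degree[v - 1];

  // Place each destination into its source's range, preserving input order.
  std::vector<std::size_t> targets(edges.size());
  std::vector<std::size_t> next = offsets;
  for (const auto& e : edges) targets[next[e.first]++] = e.second;

  std::ostringstream out;
  out << "AdjacencyGraph\n" << numVertices << "\n" << edges.size() << "\n";
  for (std::size_t o : offsets) out << o << "\n";
  for (std::size_t t : targets) out << t << "\n";
  return out.str();
}
```

Called with 3 vertices and the four SNAP edges shown earlier (0 1, 0 2, 2 0, 2 1), the sketch produces the same ten lines as the Adjacency Graph sample. For real inputs, prefer the provided SNAPtoAdjConverter.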
The streaming input file should have one edge operation (addition or deletion) per line. Each edge operation has the format [d/a] source destination, where d indicates edge deletion and a indicates edge addition. Example streaming input file:
a 1 2
d 2 3
a 4 5
...
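As an illustration of this line format, the following sketch parses edge operations from any input stream. The EdgeOperation struct and the parseEdgeOperations function are hypothetical names introduced here; GraphBolt's own Stream Ingestor has its own types and logic. Skipping lines that do not match the format is a choice made for this sketch, not documented GraphBolt behavior.

```cpp
#include <istream>
#include <sstream>
#include <string>
#include <vector>

// Hypothetical record type for illustration only.
struct EdgeOperation {
  bool isAddition;  // true for 'a', false for 'd'
  long source;
  long destination;
};

// Reads "[d/a] source destination" lines from `in`.
// Lines that do not match the format are skipped (a choice of this sketch).
std::vector<EdgeOperation> parseEdgeOperations(std::istream& in) {
  std::vector<EdgeOperation> ops;
  std::string line;
  while (std::getline(in, line)) {
    std::istringstream fields(line);
    char kind;
    long src, dst;
    if (!(fields >> kind >> src >> dst)) continue;  // blank or malformed line
    if (kind != 'a' && kind != 'd') continue;       // unknown operation
    ops.push_back({kind == 'a', src, dst});
  }
  return ops;
}
```

The function accepts a std::istream, so the same code can read from a std::ifstream opened on a file or from a std::istringstream in a test.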
Edge operations can be streamed through a pipe using tools/generators/streamGenerator.C. It takes in the following command-line parameters:
- -edgeOperationsFile: Input file containing the edge operations in the format mentioned above.
- -outputPipe: Path of the output pipe where the edges are streamed to.
$ cd tools/generators
$ make streamGenerator
$ ./streamGenerator -edgeOperationsFile ../inputs/sample_edge_operations.txt -outputPipe ../inputs/sample_edge_operations.pipe
More details regarding the ingestor can be found in Section 5. Information regarding weighted graphs can be found in Section 6.
3. GraphBolt Engine
The GraphBolt engine provides Bulk Synchronous Parallel (BSP) guarantees while incrementally processing streaming graphs.
3.1 Creating Applications using the GraphBolt Engine
A key design decision of the GraphBolt framework is to ensure that the application code remains oblivious to GraphBolt's internal subtleties while still providing fast performance.
So, the application code only needs to express its computation using the following functions. More details regarding these functions can be found in the inline comments of GraphBoltEngine.h.
AggregateValue and VertexValue initialization:
- initializeAggregationValue()
- initializeVertexValue()
- aggregationValueIdentity()
- vertexValueIdentity()
GraphBolt stores information for each vertex in the form of aggregation values. So, first, the user should identify the aggregation value and the vertex value for the algorithm. For example, in PageRank, the vertex value is the vertex's PageRank (PR) and the aggregation value is the sum of the PR[u]/out_degree[u] values from all its inNeighbors.
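The PageRank aggregation value described above can be sketched in plain C++ as follows. This is a sequential illustration of the definition only; it does not use the GraphBolt API, and the function name pageRankAggregation and the nested-vector graph representation are assumptions made for the example.

```cpp
#include <cstddef>
#include <vector>

// Illustrative only: for every vertex v, computes the sum of
// PR[u] / out_degree[u] over all inNeighbors u of v.
// outNeighbors[u] lists the destinations of u's outEdges.
std::vector<double> pageRankAggregation(
    const std::vector<std::vector<std::size_t>>& outNeighbors,
    const std::vector<double>& pr) {
  // 0.0 is the identity value for a sum aggregation.
  std::vector<double> aggregation(outNeighbors.size(), 0.0);
  for (std::size_t u = 0; u < outNeighbors.size(); ++u) {
    if (outNeighbors[u].empty()) continue;  // no outEdges, nothing to push
    // The contribution depends only on the source, so compute it once.
    const double contribution = pr[u] / outNeighbors[u].size();
    for (std::size_t v : outNeighbors[u]) aggregation[v] += contribution;
  }
  return aggregation;
}
```

In this sketch the vertex value is the entry in pr and the aggregation value is the entry in the returned vector; a PageRank step would then derive the new vertex value from the aggregation value.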
Activate vertex / Compute vertex for a given iteration:
- forceActivateVertexForIteration()
- forceComputeVertexForIteration()
- shouldUseDelta()
In iterative graph algorithms, at a given iteration i, a set of vertices will push some value to their outNeighbors. These are the active vertices for that iteration. The outNeighbors which receive these values will then compute their updated values. The functions listed above can be used to force a vertex to be active or to compute its value at a given iteration. For example, in Label Propagation, all the vertices should compute their values at each iteration irrespective of whether they receive any new changes from their inNeighbors at that iteration (refer to apps/LabelPropagation.C).
Add to or remove from aggregation:
- addToAggregation()
- addToAggregationAtomic()
- removeFromAggregation()
- removeFromAggregationAtomic()
These functions add a value to, or remove a value from, the aggregation value. For a sum aggregation, this simply means adding the value to, or subtracting it from, the aggregation value passed. Note that addToAggregationAtomic() and removeFromAggregationAtomic() will be called by multiple threads on the same aggregation value, so the update should be performed atomically using compare-and-swap (CAS).
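A CAS-based update for a sum aggregation can be sketched as a retry loop. The example below uses std::atomic<double> from the C++ standard library as a stand-in; it is not GraphBolt's implementation, and the function names atomicAdd and atomicRemove are hypothetical. The actual signatures of addToAggregationAtomic() and removeFromAggregationAtomic() are documented in GraphBoltEngine.h.

```cpp
#include <atomic>

// Illustrative sketch, not GraphBolt's implementation: atomically adds
// `value` to a shared sum using a compare-and-swap retry loop.
void atomicAdd(std::atomic<double>& aggregation, double value) {
  double observed = aggregation.load(std::memory_order_relaxed);
  // On failure, compare_exchange_weak stores the current value into
  // `observed`, so every retry recomputes the sum from fresh data.
  while (!aggregation.compare_exchange_weak(observed, observed + value)) {
  }
}

// For a sum aggregation, removal is the same loop with the sign flipped.
void atomicRemove(std::atomic<double>& aggregation, double value) {
  atomicAdd(aggregation, -value);
}
```

The loop only publishes a new sum if no other thread changed the aggregation value between the read and the write, which is what keeps concurrent updates from being lost.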
Edge functions:
- sourceChangeInContribution()
- edgeFunction()
- edgeFunctionDelta()
The edge operation is split into 3 phases:
- Determine the source contribution - The computations for a given vertex which are dependent only on the source values are performed here. For example, in PageRank, a vertex u adds the value PR[u]/out_degree[u] to the aggregation value of all its outNeighbors. Since this computation of PR[u]/out_degree[u] is common for processing all the outEdges of u, we can compute this value (contribution of the source vertex) only once and perform the addition for all outEdges.
- Transform the contribution depending on the edge data - In this step, the source vertex cont
