Apache Uniffle
Uniffle is a high performance, general purpose remote shuffle service for distributed computing engines. It provides the ability to push shuffle data into a centralized storage service, changing the shuffle style from a "local file pull-like style" to a "remote block push-like style". This brings several advantages, such as support for disaggregated storage deployment, super large shuffle jobs, and high elasticity. Currently it supports Apache Spark, Apache Hadoop MapReduce, and Apache Tez.
Architecture
A Uniffle cluster consists of three components: a coordinator cluster, a shuffle server cluster, and an optional remote storage (e.g., HDFS).
The coordinator collects the status of shuffle servers and assigns jobs based on configurable strategies.
Shuffle servers receive the shuffle data, merge it, and write it to storage.
Depending on the situation, Uniffle supports Memory & Local, Memory & Remote Storage (e.g., HDFS), and Memory & Local & Remote Storage (recommended for production environments).
Shuffle Process with Uniffle
- The Spark driver asks the coordinator to assign shuffle servers for the shuffle process
- Spark tasks write shuffle data to the shuffle server with the following steps:

  - Send KV data to a buffer
  - Flush the buffer to a queue when the buffer is full or the buffer manager is full
  - A thread pool takes data from the queue
  - Request memory from the shuffle server first, then send the shuffle data
  - The shuffle server caches data in memory first and flushes it to a queue when its buffer manager is full
  - A thread pool takes data from the queue
  - Write the data to storage as an index file and a data file
  - After writing the data, the task reports all blockIds to the shuffle server; this step is used for data validation later
  - Store the taskAttemptId in MapStatus to support Spark speculation

- Depending on the storage type, Spark tasks read shuffle data from the shuffle server, from remote storage, or from both.
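The task-side write path above can be sketched as a small producer/consumer pipeline. This is an illustrative sketch only; the class and method names are hypothetical and not Uniffle's actual API. It mirrors the described flow: buffer KV records, flush full buffers to a queue, and let a thread pool drain the queue and send blocks (here, "sending" just counts blocks; real code would first request memory from the shuffle server, then push the data).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the task-side write path, not Uniffle's real classes.
public class WritePathSketch {
    static final int BUFFER_LIMIT = 4;  // flush after 4 records (toy value)
    final List<String> buffer = new ArrayList<>();
    final BlockingQueue<List<String>> flushQueue = new LinkedBlockingQueue<>();
    final AtomicInteger blocksSent = new AtomicInteger();

    // Steps 1-2: send KV data to the buffer; flush it to the queue when full.
    void write(String record) {
        buffer.add(record);
        if (buffer.size() >= BUFFER_LIMIT) {
            flushQueue.add(new ArrayList<>(buffer));
            buffer.clear();
        }
    }

    // Steps 3-4: a thread pool takes blocks off the queue and "sends" them.
    void drain() throws InterruptedException {
        if (!buffer.isEmpty()) {  // flush the remaining tail of the buffer
            flushQueue.add(new ArrayList<>(buffer));
            buffer.clear();
        }
        ExecutorService pool = Executors.newFixedThreadPool(2);
        List<String> block;
        while ((block = flushQueue.poll()) != null) {
            final List<String> b = block;
            // pretend-send: a real client would push block b to the shuffle server
            pool.execute(() -> blocksSent.addAndGet(b.isEmpty() ? 0 : 1));
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        WritePathSketch w = new WritePathSketch();
        for (int i = 0; i < 10; i++) w.write("k" + i + "=v" + i);
        w.drain();
        // 10 records with a buffer limit of 4 -> blocks of 4, 4, and 2
        System.out.println("blocks sent: " + w.blocksSent.get()); // prints "blocks sent: 3"
    }
}
```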
Shuffle file format
The shuffle data is stored as an index file and a data file. The data file contains all blocks for a specific partition, and the index file holds metadata for every block.
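The index/data file pairing can be illustrated with a toy example. This sketch is simplified: a real Uniffle index entry carries more fields than shown here (such as a checksum and block id), but the principle is the same -- the data file is a concatenation of blocks, and each fixed-size index entry records where one block lives.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;

// Simplified illustration of an index file + data file pair.
public class IndexFileSketch {
    // Builds a toy data file and index file, then reads block 1 back via the index.
    static String demo() throws IOException {
        byte[][] blocks = { "block-A".getBytes(), "block-BB".getBytes() };

        ByteArrayOutputStream dataFile = new ByteArrayOutputStream();
        ByteArrayOutputStream indexFile = new ByteArrayOutputStream();
        DataOutputStream index = new DataOutputStream(indexFile);

        long offset = 0;
        for (byte[] block : blocks) {  // append each block to the data file,
            dataFile.write(block);     // recording (offset, length) in the index
            index.writeLong(offset);
            index.writeInt(block.length);
            offset += block.length;
        }

        // To read block 1: parse its index entry, then slice the data file.
        DataInputStream in =
            new DataInputStream(new ByteArrayInputStream(indexFile.toByteArray()));
        in.skipBytes(12);  // skip entry 0 (8-byte offset + 4-byte length)
        long off = in.readLong();
        int len = in.readInt();
        byte[] data = dataFile.toByteArray();
        return new String(Arrays.copyOfRange(data, (int) off, (int) off + len));
    }

    public static void main(String[] args) throws IOException {
        System.out.println(demo()); // prints "block-BB"
    }
}
```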

Supported Spark Version
Currently supports Spark 2.3.x, Spark 2.4.x, Spark 3.0.x, Spark 3.1.x, Spark 3.2.x, Spark 3.3.x, Spark 3.4.x, Spark 3.5.x
Note: to support dynamic allocation, the patch (included in the patch/spark folder) should be applied to Spark
Supported MapReduce Version
Currently supports the MapReduce framework of Hadoop 2.8.5 and Hadoop 3.2.1
Building Uniffle
note: currently Uniffle requires JDK 1.8 to build; support for later JDKs is on our roadmap.
Uniffle is built using Apache Maven. To build it, run:
./mvnw -DskipTests clean package
To fix code style issues, run:
./mvnw spotless:apply -Pspark3 -Pspark2 -Ptez -Pmr -Phadoop2.8 -Pdashboard
Build against profile Spark 2 (2.4.6)
./mvnw -DskipTests clean package -Pspark2
Build against profile Spark 3 (3.1.2)
./mvnw -DskipTests clean package -Pspark3
Build against Spark 3.2.x, except 3.2.0
./mvnw -DskipTests clean package -Pspark3.2
Build against Spark 3.2.0
./mvnw -DskipTests clean package -Pspark3.2.0
Build against Hadoop MapReduce 2.8.5
./mvnw -DskipTests clean package -Pmr,hadoop2.8
Build against Hadoop MapReduce 3.2.1
./mvnw -DskipTests clean package -Pmr,hadoop3.2
Build against Tez 0.9.1
./mvnw -DskipTests clean package -Ptez
Build against Tez 0.9.1 and Hadoop 3.2.1
./mvnw -DskipTests clean package -Ptez,hadoop3.2
Build with dashboard
./mvnw -DskipTests clean package -Pdashboard
note: currently Uniffle builds the project against Java 8. If you want to compile it against another Java version, build the code with
-Dmaven.compiler.release=${release-version}.
To package the Uniffle, run:
./build_distribution.sh
Package against Spark 3.2.x, except 3.2.0, run:
./build_distribution.sh --spark3-profile 'spark3.2'
Package against Spark 3.2.0, run:
./build_distribution.sh --spark3-profile 'spark3.2.0'
The package is built against Hadoop 2.8.5 by default. If you want to build the package against Hadoop 3.2.1, run:
./build_distribution.sh --hadoop-profile 'hadoop3.2'
To package with Hadoop jars included, e.g., against Hadoop 3.2.1, run:
./build_distribution.sh --hadoop-profile 'hadoop3.2' -Phadoop-dependencies-included
A rss-xxx.tgz archive will be generated for deployment
Deploy
If you have packaged the tgz with Hadoop jars, the HADOOP_HOME environment variable need not be specified in rss-env.sh.
Deploy Coordinator
- unzip package to RSS_HOME
- update RSS_HOME/conf/rss-env.sh, e.g.,
JAVA_HOME=<java_home>
HADOOP_HOME=<hadoop home>
COORDINATOR_XMX_SIZE="16g"
# You can set the coordinator memory size by `XMX_SIZE` too, but it affects all components.
# XMX_SIZE="16g"
- update RSS_HOME/conf/coordinator.conf, e.g.,
rss.rpc.server.port 19999
rss.jetty.http.port 19998
rss.coordinator.server.heartbeat.timeout 30000
rss.coordinator.app.expired 60000
rss.coordinator.shuffle.nodes.max 5
# enable dynamicClientConf, and coordinator will be responsible for most of client conf
rss.coordinator.dynamicClientConf.enabled true
# config the path of client conf
rss.coordinator.dynamicClientConf.path <RSS_HOME>/conf/dynamic_client.conf
# config the path of excluded shuffle server
rss.coordinator.exclude.nodes.file.path <RSS_HOME>/conf/exclude_nodes
- update <RSS_HOME>/conf/dynamic_client.conf, the rss client will get default conf from the coordinator, e.g.,
# MEMORY_LOCALFILE_HDFS is recommended for production environment
rss.storage.type MEMORY_LOCALFILE_HDFS
# multiple remote storages are supported, and client will get assignment from coordinator
rss.coordinator.remote.storage.path hdfs://cluster1/path,hdfs://cluster2/path
rss.writer.require.memory.retryMax 1200
rss.client.retry.max 50
rss.client.send.check.timeout.ms 600000
rss.client.read.buffer.size 14m
- start Coordinator
bash RSS_HOME/bin/start-coordinator.sh
Deploy Shuffle Server
We recommend using JDK 11+ for better performance when deploying the shuffle server. Benchmark results across different JDKs are shown below (using Spark to write shuffle data with 20 executors; each executor writes 1G in total, 14M per write; the shuffle server uses gRPC to transfer data):
| Java version | ShuffleServer GC | Max pause time | Throughput |
| ------------- | ------------- | ------------- | ------------- |
| 8 | G1 | 30s | 0.3 |
| 11 | G1 | 2.5s | 0.8 |
| 18 | G1 | 2.5s | 0.8 |
| 18 | ZGC | 0.2ms | 0.99997 |
Deploy Steps:
- unzip package to RSS_HOME
- update RSS_HOME/conf/rss-env.sh, e.g.,
JAVA_HOME=<java_home>
HADOOP_HOME=<hadoop home>
SHUFFLE_SERVER_XMX_SIZE="80g"
# You can set the shuffle server memory size by `XMX_SIZE` too, but it affects all components.
# XMX_SIZE="80g"
- update RSS_HOME/conf/server.conf, e.g.,
rss.rpc.server.port 19999
rss.jetty.http.port 19998
rss.rpc.executor.size 2000
# it should be configured the same as in coordinator
rss.storage.type MEMORY_LOCALFILE_HDFS
rss.coordinator.quorum <coordinatorIp1>:19999,<coordinatorIp2>:19999
# local storage path for shuffle server
rss.storage.basePath /data1/rssdata,/data2/rssdata....
# it's better to config thread num according to local disk num
rss.server.flush.thread.alive 5
rss.server.flush.localfile.threadPool.size 10
rss.server.flush.hadoop.threadPool.size 60
rss.server.buffer.capacity 40g
rss.server.read.buffer.capacity 20g
rss.server.heartbeat.interval 10000
rss.rpc.message.max.size 1073741824
rss.server.preAllocation.expired 120000
rss.server.commit.timeout 600000
rss.server.app.expired.withoutHeartbeat 120000
# note: the default value of rss.server.flush.cold.storage.threshold.size is 64m
# there will be no data written to DFS if set it as 100g even rss.storage.type=MEMORY_LOCALFILE_HDFS
# please set a proper v
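The cold storage threshold noted in the config comments can be sketched as a simple routing decision. This is an illustrative sketch under the assumption (supported by the comments above) that with MEMORY_LOCALFILE_HDFS, flush events at or above the threshold go to remote storage (HDFS) while smaller ones stay on local disk; the method names are hypothetical, not Uniffle's API.

```java
// Hypothetical sketch of how rss.server.flush.cold.storage.threshold.size routes data.
public class ColdStorageSketch {
    static final long DEFAULT_THRESHOLD = 64L * 1024 * 1024;  // 64m default noted above

    // Flushes at or above the threshold go remote; smaller ones go to local disk.
    static String chooseStorage(long flushSizeBytes, long thresholdBytes) {
        return flushSizeBytes >= thresholdBytes ? "HDFS" : "LOCALFILE";
    }

    public static void main(String[] args) {
        // With the default 64m threshold, a 100m flush goes remote.
        System.out.println(chooseStorage(100L * 1024 * 1024, DEFAULT_THRESHOLD)); // prints "HDFS"
        // With the threshold raised to 100g, nothing realistic reaches HDFS --
        // which is why the config comment above warns against such a setting.
        System.out.println(chooseStorage(100L * 1024 * 1024,
                100L * 1024 * 1024 * 1024)); // prints "LOCALFILE"
    }
}
```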
