Celeborn

Apache Celeborn is an elastic and high-performance service for shuffle and spilled data.

Apache Celeborn

Celeborn (/ˈkeləbɔ:n/) is dedicated to improving the efficiency and elasticity of different map-reduce engines. It provides an elastic, highly efficient management service for intermediate data, including shuffle data, spilled data, and result data. Currently, Celeborn focuses on shuffle data.

Internals

Architecture

Celeborn has three primary components: Master, Worker, and Client. Master manages all resources and syncs shared states based on Raft. Worker processes read-write requests and merges data for each reducer. On the Client side, LifecycleManager maintains the metadata of each shuffle and runs within the Spark driver.

Features

  1. Disaggregated compute and storage.
  2. Push-based shuffle write and merged shuffle read.
  3. High availability and high fault tolerance.

Shuffle Process

  1. Mappers lazily ask LifecycleManager to registerShuffle.
  2. LifecycleManager requests slots from Master.
  3. Workers reserve slots and create corresponding files.
  4. Mappers get worker locations from LifecycleManager.
  5. Mappers push data to specified workers.
  6. Workers merge data and replicate it to their peers.
  7. Workers flush to disk periodically.
  8. Mapper tasks complete and trigger a MapperEnd event.
  9. When all mapper tasks are complete, workers commit files.
  10. Reducers ask for file locations.
  11. Reducers read shuffle data.
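
The push and merged-read steps above can be pictured with a toy model. The sketch below is not Celeborn code: it uses local files in a temporary directory to stand in for workers, and the function names, the modulo partitioner, and the record format are all assumptions made for illustration. Each "mapper" appends records to one file per reducer partition, so each "reducer" later reads a single merged file.

```shell
#!/bin/sh
# Toy model of push-based shuffle write and merged shuffle read.
# Hypothetical names throughout; local files stand in for Celeborn workers.

work=$(mktemp -d)
trap 'rm -rf "$work"' EXIT   # remove the temporary directory when the script exits
num_partitions=2

# push <mapper_id> <integer_key>
# Routes the key to a partition (key modulo partition count, an assumed
# partitioner) and appends the record to that partition's file.
push() {
  part=$(( $2 % num_partitions ))
  echo "mapper$1:$2" >> "$work/partition-$part"
}

# read_partition <partition_id>
# A reducer reads one merged file instead of one file per mapper.
read_partition() {
  cat "$work/partition-$1"
}

# Two mappers push their records.
for key in 1 2 3; do push 0 "$key"; done
for key in 4 5; do push 1 "$key"; done

# Each reducer reads its merged partition.
for p in 0 1; do
  echo "reducer $p reads:"
  read_partition "$p"
done
```

The point of the model is only the data layout: records from different mappers end up in the same per-partition file, which is what lets a reducer fetch its input with one sequential read.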

Load Balance

We introduce slots to achieve load balance. By tracking slot usage, Celeborn distributes partitions evenly across all Celeborn workers. A slot is a logical concept in a Celeborn worker that represents how many partitions can be allocated to that worker. A worker's slot count is determined by total usable disk size / average shuffle file size. The count decreases when a partition is allocated and increases when a partition is freed.
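
As a rough illustration of the slot arithmetic described above, the sketch below computes a slot count and adjusts it as partitions are allocated and freed. The function names and the sample sizes are hypothetical and are not taken from Celeborn's implementation.

```shell
#!/bin/sh
# Hypothetical helpers illustrating the slot arithmetic; not Celeborn's code.

# slot_count <usable_disk_mib> <avg_shuffle_file_mib>
# Slots = total usable disk size / average shuffle file size (integer division).
slot_count() {
  echo $(( $1 / $2 ))
}

# allocate <free_slots>
# One partition is allocated, so one fewer slot is free.
allocate() {
  if [ "$1" -le 0 ]; then
    echo "no free slots" >&2
    return 1
  fi
  echo $(( $1 - 1 ))
}

# release <free_slots>
# One partition is freed, so one slot becomes available again.
release() {
  echo $(( $1 + 1 ))
}

# Example with made-up numbers: 1 TiB usable disk, 512 MiB average file.
slots=$(slot_count 1048576 512)
echo "initial slots: $slots"
slots=$(allocate "$slots")
echo "after one allocation: $slots"
slots=$(release "$slots")
echo "after one release: $slots"
```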

Build

  1. Celeborn supports Spark 2.4/3.0/3.1/3.2/3.3/3.4/3.5/4.0, Flink 1.16/1.17/1.18/1.19/1.20 and Hadoop MapReduce 2/3.
  2. Celeborn is tested under Scala 2.11/2.12/2.13 and Java 8/11/17 environments.

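Build Celeborn via make-distribution.sh, passing the profile that matches your engine version (the slashes in the command below separate the alternatives):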

./build/make-distribution.sh -Pspark-2.4/-Pspark-3.0/-Pspark-3.1/-Pspark-3.2/-Pspark-3.3/-Pspark-3.4/-Pspark-3.5/-Pspark-4.0/-Pspark-4.1/-Pflink-1.16/-Pflink-1.17/-Pflink-1.18/-Pflink-1.19/-Pflink-1.20/-Pflink-2.0/-Pflink-2.1/-Pflink-2.2/-Pmr

Package apache-celeborn-${project.version}-bin.tgz will be generated.

NOTE: The following table indicates the compatibility of Celeborn Spark and Flink clients with different versions of Spark and Flink for various Java and Scala versions.

| | Java 8/Scala 2.11 | Java 8/Scala 2.12 | Java 11/Scala 2.12 | Java 17/Scala 2.12 | Java 8/Scala 2.13 | Java 11/Scala 2.13 | Java 17/Scala 2.13 | |------------|-------------------|-------------------|--------------------|--------------------|-------------------|--------------------|--------------------| | Spark 2.4 | ✔ | ✔ | ❌ | ❌ | ❌ | ❌ | ❌ | | Spark 3.0 | ❌ | ✔ | ✔ | ❌ | ❌ | ❌ | ❌ | | Spark 3.1 | ❌ | ✔ | ✔ | ❌ | ❌ | ❌ | ❌ | | Spark 3.2 | ❌ | ✔ | ✔ | ❌ | ✔ | ✔ | ❌ | | Spark 3.3 | ❌ | ✔ | ✔ | ✔ | ✔ | ✔ | ✔ | | Spark 3.4 | ❌ | ✔ | ✔ | ✔ | ✔ | ✔ | ✔ | | Spark 3.5 | ❌ | ✔ | ✔ | ✔ | ✔ | ✔ | ✔ | | Spark 4.0 | ❌ | ❌ | ❌ | ❌ | ❌ | ❌ | ✔ | | Spark 4.1 | ❌ | ❌ | ❌ | ❌ | ❌ | ❌ | ✔ | | Flink 1.16 | ❌ | ✔ | ✔ | ❌ | ❌ | ❌ | ❌ | | Flink 1.17 | ❌ | ✔ | ✔ | ❌ | ❌ | ❌ | ❌ | | Flink 1.18 | ❌ | ✔ | ✔ | ❌ | ❌ | ❌ | ❌ | | Flink 1.19 | ❌ | ✔ | ✔ | ❌ | ❌ | ❌ | ❌ | | Flink 1.20 | ❌ | ✔ | ✔ | ❌ | ❌ | ❌ | ❌ | | Flink 2.0 | ❌ | ❌ | ✔ | ✔ | ❌ | ✔ | ✔ | | Flink 2.1 | ❌ | ❌ | ✔ | ✔ | ❌ | ✔ | ✔ | | Flink 2.2 | ❌ | ❌ | ✔ | ✔ | ❌ | ✔ | ✔ |
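
For scripted checks, the table can be transcribed into a small lookup. The sketch below is only a convenience wrapper around the rows above: the function names and the `engine-version` key format (for example `spark-3.2`) are assumptions, and the data must be kept in sync with the table by hand.

```shell
#!/bin/sh
# Hypothetical lookup transcribed from the compatibility table above.

# supported_combos <engine-version>
# Prints the Java/Scala pairs marked as compatible, e.g. "8/2.12 11/2.12".
supported_combos() {
  case "$1" in
    spark-2.4) echo "8/2.11 8/2.12" ;;
    spark-3.0|spark-3.1|flink-1.16|flink-1.17|flink-1.18|flink-1.19|flink-1.20)
      echo "8/2.12 11/2.12" ;;
    spark-3.2) echo "8/2.12 11/2.12 8/2.13 11/2.13" ;;
    spark-3.3|spark-3.4|spark-3.5)
      echo "8/2.12 11/2.12 17/2.12 8/2.13 11/2.13 17/2.13" ;;
    spark-4.0|spark-4.1) echo "17/2.13" ;;
    flink-2.0|flink-2.1|flink-2.2) echo "11/2.12 17/2.12 11/2.13 17/2.13" ;;
    *) return 1 ;;   # engine version not present in the table
  esac
}

# is_supported <engine-version> <java> <scala>
# Succeeds only if the Java/Scala pair is listed for that engine version.
is_supported() {
  for combo in $(supported_combos "$1"); do
    [ "$combo" = "$2/$3" ] && return 0
  done
  return 1
}

if is_supported spark-3.2 11 2.13; then
  echo "Spark 3.2 with Java 11 / Scala 2.13: compatible"
fi
if ! is_supported spark-3.2 17 2.12; then
  echo "Spark 3.2 with Java 17 / Scala 2.12: not compatible"
fi
```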

To compile the client for Spark 2.4 with Scala 2.12, please use the following command:

  • Scala 2.12.8/2.12.9/2.12.10
./build/make-distribution.sh -DskipTests -Pspark-2.4 -Dscala.version=${scala.version} -Dscala.binary.version=2.12 -Dmaven.plugin.scala.version=3.2.2 -Dmaven.plugin.silencer.version=1.6.0
  • Scala 2.12.13-2.12.18
./build/make-distribution.sh -DskipTests -Pspark-2.4 -Dscala.version=${scala.version} -Dscala.binary.version=2.12
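
The choice between the two commands above depends only on the Scala patch version, so it can be wrapped in a small helper. This is a sketch: the function name is hypothetical, the flags are copied from the commands above, and the helper prints the command rather than running it.

```shell
#!/bin/sh
# spark24_scala212_cmd <scala_version>
# Hypothetical helper: prints the Spark 2.4 / Scala 2.12 build command for the
# given Scala patch version, using only the flags listed above.
spark24_scala212_cmd() {
  base="./build/make-distribution.sh -DskipTests -Pspark-2.4 -Dscala.version=$1 -Dscala.binary.version=2.12"
  case "$1" in
    2.12.8|2.12.9|2.12.10)
      # Older patch versions need the pinned plugin versions.
      echo "$base -Dmaven.plugin.scala.version=3.2.2 -Dmaven.plugin.silencer.version=1.6.0" ;;
    2.12.13|2.12.14|2.12.15|2.12.16|2.12.17|2.12.18)
      echo "$base" ;;
    *)
      echo "Scala $1 is not covered by the commands above" >&2
      return 1 ;;
  esac
}

# Dry run: show the command for Scala 2.12.10.
spark24_scala212_cmd 2.12.10
```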

To compile for Spark 3.5 with Java 21, use either of the following commands (the second builds with SBT):

./build/make-distribution.sh -Pspark-3.5 -Pjdk-21
./build/make-distribution.sh --sbt-enabled -Pspark-3.5 -Pjdk-21

To compile for Spark 4.0 with Java 21, use either of the following commands (the second builds with SBT):

./build/make-distribution.sh -Pspark-4.0 -Pjdk-21
./build/make-distribution.sh --sbt-enabled -Pspark-4.0 -Pjdk-21

NOTE: Celeborn supports automatic builds on the Linux aarch64 platform via the aarch64 profile. The aarch64 profile requires glibc version 3.4.21. Other glibc versions, such as 2.x, may hit the problematic frame C [libc.so.6+0x8412a].

To build Celeborn with AWS S3 multipart upload (MPU) support, use the following command:

./build/make-distribution.sh --sbt-enabled -Pspark-3.4 -Pjdk-8 -Paws

To build Celeborn with Aliyun OSS multipart upload (MPU) support, use the following command:

./build/make-distribution.sh --sbt-enabled -Pspark-3.4 -Pjdk-8 -Paliyun

Package Details

The build procedure creates a compressed package.

General package layout:

    ├── RELEASE                         
    ├── bin                             
    ├── conf                            
    ├── jars           // common jars for master and worker                 
    ├── master-jars                     
    ├── worker-jars                  
    ├── cli-jars     
    ├── spark          // Spark client jars if spark profiles are activated
    ├── flink          // Flink client jars if flink profiles are activated
    ├── mr             // MapReduce client jars if mr profile is activated
    └── sbin

Compatibility

The Celeborn server is compatible with all clients inside the various engines. However, the Celeborn client must match the version of the engine it runs in. For example, if you are running Spark 2.4, you must compile the Celeborn client with -Pspark-2.4; if you are running Spark 3.2, you must compile it with -Pspark-3.2; if you are running Flink 1.16, you must compile it with -Pflink-1.16.
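
The rule above amounts to deriving the profile from the engine's major.minor version. The sketch below shows one way to do that; the function name is hypothetical, it does not check that the resulting profile actually exists in the build, and it prints the build command instead of executing it.

```shell
#!/bin/sh
# profile_flag <engine> [engine_version]
# Hypothetical helper: maps e.g. "spark 3.2.4" to "-Pspark-3.2".
# Only the major.minor part of the version is used. No check is made that
# the profile exists, so compare the result with the profile list in Build.
profile_flag() {
  engine=$1
  case "$engine" in
    mr) echo "-Pmr"; return 0 ;;   # the MapReduce profile carries no version
    spark|flink) ;;
    *) echo "unknown engine: $engine" >&2; return 1 ;;
  esac
  # Keep the first two dot-separated fields of the version string.
  major_minor=$(echo "$2" | cut -d. -f1,2)
  echo "-P${engine}-${major_minor}"
}

# Dry run: print the command for a Spark 3.2.4 installation.
echo "./build/make-distribution.sh $(profile_flag spark 3.2.4)"
```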

Usage

A Celeborn cluster is composed of Master and Worker nodes. The Master supports both single-node and HA (Raft-based) deployments.

Deploy Celeborn

Deploy on host

  1. Unzip the tarball to $CELEBORN_HOME.
  2. Modify environment variables in $CELEBORN_HOME/conf/celeborn-env.sh.

EXAMPLE:

#!/usr/bin/env bash
CELEBORN_MASTER_MEMORY=4g
CELEBORN_WORKER_MEMORY=2g
CELEBORN_WORKER_OFFHEAP_MEMORY=4g

  3. Modify configurations in $CELEBORN_HOME/conf/celeborn-defaults.conf.

EXAMPLE: single master cluster

# used by client and worker to connect to master
celeborn.master.endpoints clb-master:9097

# used by master to bootstrap
celeborn.master.host clb-master
celeborn.master.port 9097

celeborn.metrics.enabled true
celeborn.worker.flusher.buffer.size 256k

# If Celeborn workers have both local disks and HDFS, add both sets of configs below.
# If Celeborn workers have local disks, use the following config.
# Disk type is HDD by default.
celeborn.worker.storage.dirs /mnt/disk1:disktype=SSD,/mnt/disk2:disktype=SSD

# If Celeborn workers don't have local disks, you can use HDFS.
# Do not set `celeborn.worker.storage.dirs`; use the following configs instead.
celeborn.storage.availableTypes HDFS
celeborn.worker.sortPartition.threads 64
celeborn.worker.commitFiles.timeout 240s
celeborn.worker.commitFiles.threads 128
celeborn.master.slot.assign.policy roundrobin
celeborn.rpc.askTimeout 240s
celeborn.worker.flusher.hdfs.buffer.size 4m
celeborn.storage.hdfs.dir hdfs://<namenode>/
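
To make the `celeborn.worker.storage.dirs` value format in the example above concrete, the sketch below splits such a value into paths and disk types. It is an illustration only: the function name is hypothetical, and it handles just the `disktype` attribute shown in the example, falling back to HDD as the stated default.

```shell
#!/bin/sh
# list_storage_dirs <value of celeborn.worker.storage.dirs>
# Hypothetical helper: prints "<path> <disktype>" for each comma-separated entry.
# Entries without a disktype attribute are reported as HDD (the stated default).
list_storage_dirs() {
  echo "$1" | tr ',' '\n' | while IFS= read -r entry; do
    path=${entry%%:*}                   # text before the first colon
    case "$entry" in
      *:disktype=*)
        disktype=${entry##*disktype=}   # text after "disktype="
        disktype=${disktype%%:*}        # drop anything after a further colon
        ;;
      *) disktype=HDD ;;
    esac
    echo "$path $disktype"
  done
}

list_storage_dirs "/mnt/disk1:disktype=SSD,/mnt/disk2"
```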