# SparkPlugins
Code and examples of how to write and deploy Apache Spark plugins. Spark plugins allow running custom code on the executors as they are initialized. They also allow extending the Spark metrics system with user-provided monitoring probes.
Spark Plugins are an Apache Spark feature for extending Spark with custom metrics and actions. This repository provides ready-to-use examples for deploying Spark plugins across various use cases.
## Key Features
- Custom Metrics: Extend Spark's instrumentation with user-defined metrics.
- Executor Actions: Trigger custom actions upon executor startup, useful for integrations (e.g., monitoring systems).
- Resource Monitoring: Measure Spark’s usage of cluster resources (YARN, K8S, Standalone).
- I/O Metrics: Monitor I/O performance from cloud filesystems, OS metrics, and custom application metrics.
- External Integrations: Connect with external systems like Pyroscope for performance insights.
## Contents
- Getting started
- Demo and basic plugins
- Implementation notes
- Plugin for integrating Pyroscope with Spark
- Plugin for OS metrics instrumentation with Cgroups for Spark on Kubernetes
- Plugin to collect I/O storage statistics for HDFS and Hadoop-compatible filesystems
- Plugin for Cloud filesystem storage statistics
- Experimental plugins
## Resources
- Spark Performance Dashboard - a solution to ingest and visualize Spark metrics
  - See the companion repository on how to deploy a Spark Performance Dashboard using Spark metrics
- DATA+AI summit 2020 talk What is New with Apache Spark Performance Monitoring in Spark 3.0
- DATA+AI summit 2021 talk Monitor Apache Spark 3 on Kubernetes using Metrics and Plugins
Author and contact: Luca.Canali@cern.ch
## Getting Started - Your First Spark Plugins
To begin using Spark plugins from this repository, follow these steps:
### 1. Deploying Spark Plugins
You can deploy the Spark plugins directly from Maven Central.
For Scala 2.12:
```
spark-shell --packages ch.cern.sparkmeasure:spark-plugins_2.12:0.4
```
For Scala 2.13:
```
spark-shell --packages ch.cern.sparkmeasure:spark-plugins_2.13:0.4
```
### 2. Building or Downloading the SparkPlugin JAR
You can either build the JAR from source or download it directly:

Option 1: Build from source
- Make sure you have SBT (the Scala Build Tool) installed.
- Run the following command:
```
sbt +package
```

Option 2: Download from GitHub
- Download the JARs attached to release version 0.4.
- Or visit the GitHub Actions page for this repository, locate the latest successful build, and download the JAR file.
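Putting the two steps together, a minimal first run could look like the following (this uses the DemoPlugin described in the next section; adjust the Scala version and release number as needed):

```
spark-shell --packages ch.cern.sparkmeasure:spark-plugins_2.12:0.4 \
  --conf spark.plugins=ch.cern.DemoPlugin
```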
## Demo and Basic Plugins
- DemoPlugin
  - Basic plugin, demonstrates how to write Spark plugins in Scala, for demo and testing.
  - Configure with:
    ```
    --packages ch.cern.sparkmeasure:spark-plugins_2.12:0.4 --conf spark.plugins=ch.cern.DemoPlugin
    ```
- DemoMetricsPlugin
  - Example plugin illustrating integration with the Spark metrics system.
  - Configure with:
    ```
    --packages ch.cern.sparkmeasure:spark-plugins_2.12:0.4 --conf spark.plugins=ch.cern.DemoMetricsPlugin
    ```
  - Metrics implemented:
    - `ch.cern.DemoMetricsPlugin.DriverTest42`: a gauge reporting a constant integer value, for testing.
- RunOSCommandPlugin
  - Example illustrating how to use plugins to run actions on the OS.
  - Configure with: `--conf spark.plugins=ch.cern.RunOSCommandPlugin`
  - Action implemented: runs an OS command on the executors; by default it runs `/usr/bin/touch /tmp/plugin.txt`
  - Configurable action: `--conf spark.cernSparkPlugin.command="command or script you want to run"`
  - Example:
    ```
    bin/spark-shell --master yarn \
      --packages ch.cern.sparkmeasure:spark-plugins_2.12:0.4 \
      --conf spark.plugins=ch.cern.RunOSCommandPlugin
    ```
  - You can check whether the plugin has run by verifying that the file `/tmp/plugin.txt` has been created on the executor machines.
## Implementation Notes

- Spark plugins implement the `org.apache.spark.api.plugin.SparkPlugin` interface. They can be written in Scala or Java and can be used to run custom code at the startup of the Spark executors and driver.
- Basic plugin configuration: `--conf spark.plugins=<list of plugin classes>`
- Plugin JARs need to be made available to the Spark executors:
  - you can distribute the plugin code to the executors using `--jars` and `--packages`
  - for K8S, you can also consider making the JARs available directly in the container image
- Most of the plugins described in this repo are intended to extend the Spark metrics system.
  - See the details of the Spark metrics system in the Spark monitoring documentation.
  - You can find the metrics generated by the plugins in the Spark metrics system stream under the namespace `plugin.<Plugin Class Name>`
  - See also: SPARK-29397, SPARK-28091, SPARK-32119.
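As a sketch of what implementing this interface looks like, the following Scala code registers a test gauge from the driver at startup. The class name `MySketchPlugin` and the gauge name `ConstantGauge42` are illustrative, not names shipped in this repository:

```scala
import java.util.{Collections, Map => JMap}

import com.codahale.metrics.Gauge
import org.apache.spark.SparkContext
import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin}

// Minimal sketch of a Spark plugin (illustrative names)
class MySketchPlugin extends SparkPlugin {

  // Code to run on the driver at application startup
  override def driverPlugin(): DriverPlugin = new DriverPlugin {
    override def init(sc: SparkContext, ctx: PluginContext): JMap[String, String] = {
      // Register a test gauge; it appears in the metrics stream under
      // the namespace plugin.MySketchPlugin
      ctx.metricRegistry.register("ConstantGauge42", new Gauge[Int] {
        override def getValue: Int = 42
      })
      Collections.emptyMap[String, String] // extra conf passed to the executor plugins
    }
  }

  // Code to run on each executor as it is initialized
  override def executorPlugin(): ExecutorPlugin = new ExecutorPlugin {
    override def init(ctx: PluginContext, extraConf: JMap[String, String]): Unit = {
      // custom startup actions or executor-side metric registration go here
    }
  }
}
```

Package the class in a JAR, distribute it with `--jars` or `--packages`, and enable it with `--conf spark.plugins=<your plugin class>`.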
## Plugins in this Repository

### Plugin for Integrating with Pyroscope

Grafana Pyroscope is a tool for continuous profiling and flame graph visualization. This plugin makes it possible to integrate Apache Spark with Pyroscope.
For details see:
How to profile Spark with Pyroscope
The following example shows how to put the configuration together and start Spark on a cluster with Pyroscope flame graph continuous monitoring:
- Start Pyroscope
  - Download from https://github.com/grafana/pyroscope/releases
  - CLI start:
    ```
    ./pyroscope -server.http-listen-port 5040
    ```
  - Or use Docker:
    ```
    docker run -it -p 5040:4040 grafana/pyroscope
    ```
- Start Spark (spark-shell, PySpark, or spark-submit). Example:
  ```
  # update the package versions to the latest; spark.pyroscope.server must match
  # the server name and port used when starting Pyroscope
  bin/spark-shell --master yarn \
    --packages ch.cern.sparkmeasure:spark-plugins_2.12:0.4,io.pyroscope:agent:2.1.2 \
    --conf spark.plugins=ch.cern.PyroscopePlugin \
    --conf spark.pyroscope.server="http://<myhostname>:5040"
  ```
Spark configurations:
This plugin adds the following configurations:
- `spark.pyroscope.server` -> default `"http://localhost:4040"`; update to match the server name and port used by Pyroscope
- `spark.pyroscope.applicationName` -> default `spark.conf.get("spark.app.id")`
- `spark.pyroscope.eventType` -> default `ITIMER`; possible values: `ITIMER`, `CPU`, `WALL`, `ALLOC`, `LOCK`
Example:
This is an example of how to use the configuration programmatically (using PySpark):
```python
from pyspark.sql import SparkSession

# Get the Spark session
spark = (SparkSession.builder
         .appName("Instrumented app").master("yarn")
         .config("spark.executor.memory", "16g")
         .config("spark.executor.cores", "4")
         .config("spark.executor.instances", 2)
         .config("spark.jars.packages", "ch.cern.sparkmeasure:spark-plugins_2.12:0.4,io.pyroscope:agent:0.13.0")
         .config("spark.plugins", "ch.cern.PyroscopePlugin")
         .config("spark.pyroscope.server", "http://<myhostname>:5040")
         .getOrCreate()
         )
```
### OS Metrics Instrumentation with cgroups, for Spark on Kubernetes
- CgroupMetrics
  - Configure with: `--conf spark.plugins=ch.cern.CgroupMetrics`
  - Optional configuration: `--conf spark.cernSparkPlugin.registerOnDriver` (default: false)
  - Implemented using cgroup instrumentation of key system resource usage, intended mostly for Spark on Kubernetes.
  - Collects metrics using cgroup stats from the `/sys/fs` and `/proc` filesystems for CPU, memory, and network usage. See also the kernel documentation. Note: the metrics are reported for the entire cgroup to which the executor belongs. This is mostly intended for Spark running on Kubernetes. In other cases, the reported metrics may not be easily correlated with the executor's activity, as the cgroup metrics may include more processes, up to the entire system.
  - Metrics implemented (gauges), with prefix `ch.cern.CgroupMetrics`:
    - `CPUTimeNanosec`: reports the CPU time used by the processes in the cgroup.
    - `MemoryRss`: number of bytes of anonymous and swap cache memory.
    - `MemorySwap`: number of bytes of swap usage.
    - `MemoryCache`: number of bytes of page cache memory.
    - `NetworkBytesIn`: network traffic inbound.
    - `NetworkBytesOut`: network traffic outbound.
  - Example:
    ```
    bin/spark-shell --master k8s://https://<K8S URL>:6443 --driver-memory 1g \
      --num-executors 2 --executor-cores 2 --executor-memory 2g \
      --packages ch.cern.sparkmeasure:spark-plugins_2.12:0.4 \
      --conf spark.plugins=ch.cern.CgroupMetrics
    ```
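To illustrate where memory values like these come from, here is a standalone Python sketch that parses a cgroup v1-style accounting file. The sample content and the helper function are illustrative only; the actual plugin is implemented in Scala and reads the files under `/sys/fs` and `/proc` directly:

```python
def parse_cgroup_stat(stat_text):
    """Parse 'key value' pairs as found in files like /sys/fs/cgroup/memory/memory.stat."""
    metrics = {}
    for line in stat_text.splitlines():
        key, _, value = line.partition(" ")
        if value.strip().isdigit():
            metrics[key] = int(value.strip())
    return metrics

# Illustrative sample in the format of a cgroup v1 memory.stat file
sample = """cache 1048576
rss 20971520
swap 0"""

stats = parse_cgroup_stat(sample)
print(stats["rss"])  # bytes of anonymous and swap cache memory -> 20971520
```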
