SparkMeasure - a performance tool for Apache Spark


SparkMeasure is a tool and a library designed to ease performance measurement and troubleshooting of Apache Spark jobs. It focuses on easing the collection and analysis of Spark metrics, making it a practical choice for both developers and data engineers. With sparkMeasure, users gain a deeper understanding of their Spark job performance, enabling faster and more reliable data processing workflows.

✨ Highlights

  • Interactive Troubleshooting: Ideal for real-time analysis of Spark workloads in notebooks and spark-shell/pyspark environments.
  • Development & CI/CD Integration: Facilitates testing, measuring, and comparing execution metrics of Spark jobs under various configurations or code changes.
  • Batch Job Analysis: With Flight Recorder mode sparkMeasure transparently records batch job metrics for later analysis.
  • Monitoring Capabilities: Integrates with external systems like Apache Kafka, Prometheus Push Gateway, Prometheus JMX Exporter, and InfluxDB for extensive monitoring.
  • Educational Tool: Serves as a practical example of implementing Spark Listeners for the collection of detailed Spark task metrics.
  • Language Compatibility: Fully supports Scala, Java, and Python, making it versatile for a wide range of Spark applications.
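The listener-based metrics collection mentioned in the highlights can be sketched in plain Python. This is an illustrative mock, not sparkMeasure's actual API: a listener-style callback receives per-task metric records and the collector aggregates them into the totals that appear in a report.

```python
# Illustrative sketch (NOT sparkMeasure's real classes): how a listener-style
# collector can aggregate per-task metrics into report-style totals.
from collections import defaultdict

class TaskMetricsAggregator:
    """Accumulates task-level metrics, as a Spark listener would on task end."""
    def __init__(self):
        self.totals = defaultdict(int)
        self.num_tasks = 0

    def on_task_end(self, metrics):
        # metrics: dict of metric name -> value for one finished task
        self.num_tasks += 1
        for name, value in metrics.items():
            self.totals[name] += value

    def report(self):
        return {"numTasks": self.num_tasks, **self.totals}

agg = TaskMetricsAggregator()
agg.on_task_end({"executorRunTime": 120, "recordsRead": 1000})
agg.on_task_end({"executorRunTime": 80, "recordsRead": 1000})
print(agg.report())
# {'numTasks': 2, 'executorRunTime': 200, 'recordsRead': 2000}
```

The real implementation attaches Scala listeners to the SparkContext and aggregates the task metrics exposed by Spark's listener interface; the shape of the aggregation is the same.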


Main author and contact: Luca.Canali@cern.ch


🚀 Quick start

Watch sparkMeasure's getting-started demo video tutorial

Examples of sparkMeasure on notebooks

Examples of sparkMeasure on the CLI

  • Run locally or on hosted resources
    • Open in GitHub Codespaces

Python CLI

# Python CLI
# pip install pyspark
pip install sparkmeasure
pyspark --packages ch.cern.sparkmeasure:spark-measure_2.13:0.27

# Import sparkMeasure
from sparkmeasure import StageMetrics
stagemetrics = StageMetrics(spark)
# Simple one-liner to run a Spark SQL query and measure its performance
stagemetrics.runandmeasure(globals(), 'spark.sql("select count(*) from range(1000) cross join range(1000) cross join range(1000)").show()')

# Alternatively, you can use the begin() and end() methods to measure performance
# Start measuring
stagemetrics.begin()

spark.sql("select count(*) from range(1000) cross join range(1000) cross join range(1000)").show()

# Set a stop point for measuring metrics delta values
stagemetrics.end()
# Print the metrics report
stagemetrics.print_report()
stagemetrics.print_memory_report()

# get metrics as a dictionary
metrics = stagemetrics.aggregate_stage_metrics()

Note: for Spark 3.x with Scala 2.12, use --packages ch.cern.sparkmeasure:spark-measure_2.12:0.27 instead of --packages ch.cern.sparkmeasure:spark-measure_2.13:0.27
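The dictionary returned by aggregate_stage_metrics() can also feed derived indicators. A minimal sketch, using values copied from the sample report below (the metric names match the report; the derived ratio is our own illustration, not a sparkMeasure function):

```python
# Sketch: deriving a simple indicator from an aggregated-metrics dictionary,
# such as the one returned by stagemetrics.aggregate_stage_metrics().
# The values below are taken from the sample report in this README.
metrics = {
    "elapsedTime": 1112,        # ms, wall-clock
    "executorRunTime": 3358,    # ms, summed over all tasks
    "executorCpuTime": 2168,    # ms, summed over all tasks
    "jvmGCTime": 0,             # ms
}

# Fraction of executor run time actually spent on CPU: low values can point
# to time lost in shuffle waits, I/O, or garbage collection.
cpu_fraction = metrics["executorCpuTime"] / metrics["executorRunTime"]
print(f"CPU fraction of run time: {cpu_fraction:.2f}")
```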

Scala CLI

spark-shell --packages ch.cern.sparkmeasure:spark-measure_2.13:0.27

val stageMetrics = ch.cern.sparkmeasure.StageMetrics(spark)
stageMetrics.runAndMeasure(spark.sql("select count(*) from range(1000) cross join range(1000) cross join range(1000)").show())

The output should look like this:

+----------+
|  count(1)|
+----------+
|1000000000|
+----------+

Time taken: 3833 ms

Scheduling mode = FIFO
Spark Context default degree of parallelism = 8

Aggregated Spark stage metrics:
numStages => 3
numTasks => 17
elapsedTime => 1112 (1 s)
stageDuration => 864 (0.9 s)
executorRunTime => 3358 (3 s)
executorCpuTime => 2168 (2 s)
executorDeserializeTime => 892 (0.9 s)
executorDeserializeCpuTime => 251 (0.3 s)
resultSerializationTime => 72 (72 ms)
jvmGCTime => 0 (0 ms)
shuffleFetchWaitTime => 0 (0 ms)
shuffleWriteTime => 36 (36 ms)
resultSize => 16295 (15.9 KB)
diskBytesSpilled => 0 (0 Bytes)
memoryBytesSpilled => 0 (0 Bytes)
peakExecutionMemory => 0
recordsRead => 2000
bytesRead => 0 (0 Bytes)
recordsWritten => 0
bytesWritten => 0 (0 Bytes)
shuffleRecordsRead => 8
shuffleTotalBlocksFetched => 8
shuffleLocalBlocksFetched => 8
shuffleRemoteBlocksFetched => 0
shuffleTotalBytesRead => 472 (472 Bytes)
shuffleLocalBytesRead => 472 (472 Bytes)
shuffleRemoteBytesRead => 0 (0 Bytes)
shuffleRemoteBytesReadToDisk => 0 (0 Bytes)
shuffleBytesWritten => 472 (472 Bytes)
shuffleRecordsWritten => 8

Average number of active tasks => 3.0

Stages and their duration:
Stage 0 duration => 355 (0.4 s)
Stage 1 duration => 411 (0.4 s)
Stage 3 duration => 98 (98 ms)
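One way to read the "Average number of active tasks" line: it is consistent with dividing the total executor run time by the elapsed wall-clock time (a standard back-of-the-envelope check; the exact formula used internally is an assumption here). A quick cross-check against the numbers in the report above:

```python
# Cross-check of the sample report: average active tasks is approximately
# total executor run time divided by elapsed wall-clock time.
executor_run_time_ms = 3358  # from the report
elapsed_time_ms = 1112       # from the report

avg_active_tasks = executor_run_time_ms / elapsed_time_ms
print(round(avg_active_tasks, 1))
```

This agrees with the reported value of 3.0, meaning on average about 3 of the 8 available task slots were busy.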

Memory report

Stage metrics collection mode has an optional memory report command:

(scala)> stageMetrics.printMemoryReport
(python)> stagemetrics.print_memory_report()

Additional stage-level executor metrics (memory usage info updated at each heartbeat):

Stage 0 JVMHeapMemory maxVal bytes => 322888344 (307.9 MB)
Stage 0 OnHeapExecutionMemory maxVal bytes => 0 (0 Bytes)
Stage 1 JVMHeapMemory maxVal bytes => 322888344 (307.9 MB)
Stage 1 OnHeapExecutionMemory maxVal bytes => 0 (0 Bytes)
Stage 3 JVMHeapMemory maxVal bytes => 322888344 (307.9 MB)
Stage 3 OnHeapExecutionMemory maxVal bytes => 0 (0 Bytes)

Notes:

  • this report uses per-stage executor memory metrics, which each executor sends to the driver with its heartbeat; as a consequence, there can be a small delay (on the order of a few seconds) between the end of the job and the arrival of the final metrics values.
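The per-stage "maxVal" figures above can be understood as a running maximum over the heartbeat samples received for each stage. A minimal sketch of that aggregation (stage ids and byte values are made up for the example; this is not sparkMeasure's internal code):

```python
# Illustrative sketch: derive per-stage peaks like "JVMHeapMemory maxVal" by
# keeping the maximum over the heartbeat samples seen for each stage.
from collections import defaultdict

heartbeat_samples = [  # (stage_id, metric_name, value_bytes) -- example data
    (0, "JVMHeapMemory", 200_000_000),
    (0, "JVMHeapMemory", 322_888_344),
    (1, "JVMHeapMemory", 310_000_000),
]

max_per_stage = defaultdict(int)
for stage_id, metric, value in heartbeat_samples:
    key = (stage_id, metric)
    max_per_stage[key] = max(max_per_stage[key], value)

for (stage_id, metric), value in sorted(max_per_stage.items()):
    print(f"Stage {stage_id} {metric} maxVal bytes => {value}")
```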
