sparkMeasure


SparkMeasure is a tool for performance troubleshooting of Apache Spark workloads.

It simplifies the collection and analysis of Spark performance metrics. It is also intended as a working example of how to use Spark listeners for collecting and processing Spark executor task metrics data.

Use for interactive and batch workloads

  • Interactive: measure and analyze performance from a shell or notebook, using spark-shell (Scala), pyspark (Python) or Jupyter notebooks.
  • Code instrumentation: add calls in your code to deploy sparkMeasure custom Spark listeners and/or use the classes StageMetrics/TaskMetrics and the related APIs for collecting, analyzing and saving metrics data (see the sketch after this list).
  • "Flight Recorder" mode: records all performance metrics automatically and saves the data for later processing.

Documentation with examples

Architecture diagram

[TODO]

Main concepts underlying sparkMeasure

  • The tool is based on the Spark listener interface. Listeners transport Spark executor task metrics data from the executors to the driver. They are a standard part of the Spark instrumentation, used for example by the Spark Web UI. A minimal listener sketch follows this list.
  • Metrics can be collected using sparkMeasure at the granularity of stage completion and/or task completion (configurable).
  • Metrics are flattened and collected into local memory structures in the driver (a ListBuffer of a custom case class).
  • Metrics data are transformed into a Spark DataFrame for archival and to generate reports.
  • Metrics data and reports can be saved for offline analysis.
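
For illustration only (this is not sparkMeasure's actual listener implementation), a minimal sketch of a custom Spark listener that flattens a few task metrics into a driver-side ListBuffer could look like this; the case class name and the selection of fields are arbitrary:

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import scala.collection.mutable.ListBuffer

// Illustrative case class: a flattened subset of the task metrics
case class SimpleTaskMetrics(stageId: Int, duration: Long, executorRunTime: Long, executorCpuTime: Long)

class SimpleMetricsListener extends SparkListener {
  val taskData = ListBuffer.empty[SimpleTaskMetrics]
  // Called on the driver each time a task ends; taskMetrics carries the executor-side measurements
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val tm = taskEnd.taskMetrics
    taskData += SimpleTaskMetrics(taskEnd.stageId, taskEnd.taskInfo.duration,
      tm.executorRunTime, tm.executorCpuTime)
  }
}

// Register the listener, run a workload, then turn the buffer into a DataFrame for reporting
val simpleListener = new SimpleMetricsListener
spark.sparkContext.addSparkListener(simpleListener)
// ... run a Spark job here ...
// import spark.implicits._
// val metricsDF = simpleListener.taskData.toDF()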

FAQ:

  • Why measure performance with workload metrics instrumentation rather than just measuring elapsed time?
    • Measuring elapsed time treats your workload as a "black box" and most often does not allow you to understand the root causes of performance issues. With workload metrics you can (attempt to) go further with understanding, root cause analysis, bottleneck identification and resource usage measurement.
  • What are Apache Spark task metrics and what can I use them for?
    • Apache Spark measures several details of each task execution, including run time, CPU time, garbage collection time, shuffle metrics and task I/O. See the taskMetrics class
  • What are accumulables?
    • Metrics are first collected into accumulators; some metric values are only available in these structures (notably SQL metrics, such as "scan time")
  • What are known limitations and gotchas?
    • The currently available metrics are quite useful, but they do not allow you to fully perform workload time profiling and, in general, do not yet provide a full monitoring platform. Metrics are collected on the driver, which can quickly become a bottleneck.
  • When should I use stage metrics and when should I use task metrics?
    • Use stage metrics whenever possible as they are much more lightweight. Collect metrics at the task granularity if you need the extra information, for example if you want to study effects of skew, long tails and task stragglers.
  • How can I save/sink the collected metrics?
    • You can print metrics data and reports to standard output or save them to files, on a local filesystem or HDFS (see the sketch after this FAQ). Additionally, you can sink metrics to external systems (such as Prometheus)
  • How can I process metrics data?
    • You can use Spark to read the saved metrics data and perform further post-processing and analysis. See the examples directory.
  • Can I contribute to sparkMeasure?
    • SparkMeasure has already profited from PR contributions. See the TODO_and_issues list if you need ideas on what to contribute.
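
As a complement to the FAQ answers on saving and processing metrics, here is a sketch of one possible workflow in Scala. The createStageMetricsDF() helper, the output path and the column names used in the aggregation are taken here as illustrative assumptions; check the API documentation for the exact names in your version:

// Collect metrics for a workload (as in the examples below), then export and save them
val stageMetrics = ch.cern.sparkmeasure.StageMetrics(spark)
stageMetrics.runAndMeasure(spark.sql("select count(*) from range(1000) cross join range(1000)").show())

val metricsDF = stageMetrics.createStageMetricsDF()           // assumed helper name, see the API docs
metricsDF.write.mode("overwrite").json("/tmp/stagemetrics")   // illustrative path, plain Spark writer

// Later, or in another job: read the saved metrics back and post-process with Spark
val savedMetrics = spark.read.json("/tmp/stagemetrics")
savedMetrics.groupBy("stageId")                               // column names are illustrative
  .sum("executorRunTime", "executorCpuTime")
  .orderBy("stageId")
  .show()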

Two simple examples of sparkMeasure usage

  1. Link to an example Jupyter Notebook

  2. An example using Scala REPL:

bin/spark-shell --packages ch.cern.sparkmeasure:spark-measure_2.11:0.13

val stageMetrics = ch.cern.sparkmeasure.StageMetrics(spark) 
stageMetrics.runAndMeasure(spark.sql("select count(*) from range(1000) cross join range(1000) cross join range(1000)").show())

The output should look like this:

Scheduling mode = FIFO
Spark Context default degree of parallelism = 8
Aggregated Spark stage metrics:
numStages => 3
sum(numTasks) => 17
elapsedTime => 9103 (9 s)
sum(stageDuration) => 9027 (9 s)
sum(executorRunTime) => 69238 (1.2 min)
sum(executorCpuTime) => 68004 (1.1 min)
sum(executorDeserializeTime) => 1031 (1 s)
sum(executorDeserializeCpuTime) => 151 (0.2 s)
sum(resultSerializationTime) => 5 (5 ms)
sum(jvmGCTime) => 64 (64 ms)
sum(shuffleFetchWaitTime) => 0 (0 ms)
sum(shuffleWriteTime) => 26 (26 ms)
max(resultSize) => 17934 (17.0 KB)
sum(numUpdatedBlockStatuses) => 0
sum(diskBytesSpilled) => 0 (0 Bytes)
sum(memoryBytesSpilled) => 0 (0 Bytes)
max(peakExecutionMemory) => 0
sum(recordsRead) => 2000
sum(bytesRead) => 0 (0 Bytes)
sum(recordsWritten) => 0
sum(bytesWritten) => 0 (0 Bytes)
sum(shuffleTotalBytesRead) => 472 (472 Bytes)
sum(shuffleTotalBlocksFetched) => 8
sum(shuffleLocalBlocksFetched) => 8
sum(shuffleRemoteBlocksFetched) => 0
sum(shuffleBytesWritten) => 472 (472 Bytes)
sum(shuffleRecordsWritten) => 8
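
The same pattern works at task granularity with the TaskMetrics class (see the FAQ above on stage vs. task metrics); a minimal sketch, assuming the same spark-shell session:

// Minimal sketch: collect metrics at task granularity with TaskMetrics
val taskMetrics = ch.cern.sparkmeasure.TaskMetrics(spark)
taskMetrics.runAndMeasure(spark.sql("select count(*) from range(1000) cross join range(1000)").show())

This prints a report similar to the one above, aggregated from task-level data.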