Code and examples of how to write and deploy Apache Spark Plugins with Spark 3.x. Spark plugins allow runnig custom code on the executors as they are initialized. This also allows extending the Spark metrics systems with user-provided monitoring probes.

SparkPlugins

SparkPlugins CI Maven Central

This repository contains code and examples of how to use Apache Spark Plugins.
Spark plugins are part of Spark core since version 3.0 and provide an interface, and related configuration, for injecting custom code on executors as they are initialized. Spark plugins can also be used to implement custom extensions to the Spark metrics system.

Motivations

  • Instrumenting parts of the Spark workload with plugins provides additional flexibility compared to extending instrumentation in the Apache Spark code, as only users who want to activate it can do so, moreover they can play with configuration that may be customized for their environment, so not necessarily suitable for all possible uses of Apache Spark code.
  • One important use case is extending Spark instrumentation with custom metrics.
  • This repo provides code and examples of plugins applied to measuring Spark on K8S, Spark I/O from cloud Filesystems, OS metrics, and custom application metrics.
  • Note: The code in this repo is for Spark 3.x.
    For Spark 2.x, see instead Executor Plugins for Spark 2.4

Implementation Notes:

  • Spark plugins implement the org.apache.spark.api.Plugin interface, they can be written in Scala or Java and can be used to run custom code at the startup of Spark executors and driver.
  • Plugins basic configuration: --conf spark.plugins=<list of plugin classes>
  • Plugin JARs need to be made available to Spark executors
    • you can distribute the plugin code to the executors using --jars and --packages.
    • for K8S you can also consider making the jars available directly in the container image.
  • Most of the Plugins described in this repo are intended to extend the Spark Metrics System.
    • See the details on the Spark metrics system at Spark Monitoring documentation.
    • You can find the metrics generated by the plugins in the Spark metrics system stream under the namespace namespace=plugin.<Plugin Class Name>
  • See also: SPARK-29397, SPARK-28091, SPARK-32119.

Related Work and Spark Performance Dashboard

Author and contact: Luca.Canali@cern.ch


Getting Started

  • Deploy the jar from maven central
    • --packages ch.cern.sparkmeasure:spark-plugins_2.12:0.2
  • Build or download the SparkPlugin jar. For example:
    • Build from source with sbt +package
    • Or download the jar from the automatic build in github actions

Demo and Basic Plugins

  • DemoPlugin
    • --packages ch.cern.sparkmeasure:spark-plugins_2.12:0.2 --conf spark.plugins=ch.cern.DemoPlugin
    • Basic plugin, demonstrates how to write Spark plugins in Scala, for demo and testing.
  • DemoMetricsPlugin
    • --packages ch.cern.sparkmeasure:spark-plugins_2.12:0.2 --conf spark.plugins=ch.cern.DemoMetricsPlugin
    • Example plugin illustrating integration with the Spark metrics system.
    • Metrics implemented:
      • ch.cern.DemoMetricsPlugin.DriverTest42: a gauge reporting a constant integer value, for testing.
  • RunOSCommandPlugin
    • --conf spark.plugins=ch.cern.RunOSCommandPlugin
    • Example illustrating how to use plugins to run actions on the OS.
    • Action implemented: runs an OS command on the executors, by default it runs: /usr/bin/touch /tmp/plugin.txt
    • Configurable action: --conf spark.cernSparkPlugin.command="command or script you want to run"
    • Example:
      bin/spark-shell --master yarn \ 
        --packages ch.cern.sparkmeasure:spark-plugins_2.12:0.2 \
        --conf spark.plugins=ch.cern.RunOSCommandPlugin 
      
      • You can see if the plugin has run by checking that the file /tmp/plugin.txt has been created on the executor machines.

Plugins in this Repository

OS metrics instrumentation with cgroups, for Spark on Kubernetes

  • CgroupMetrics

    • Configure with: --conf spark.plugins=ch.cern.CgroupMetrics

    • Optional configuration: --conf spark.cernSparkPlugin.registerOnDriver (default false)

    • Implemented using cgroup instrumentation of key system resource usage, intended mostly for Spark on Kubernetes

    • Collects metrics using CGroup stats from /sys/fs and from /proc filesystem for CPU, Memory and Network usage. See also kernel documentation Note: the metrics are reported for the entire cgroup to which the executor belongs to. This is mostly intended for Spark running on Kubernetes. In other cases, the metrics reported may not be easily correlated with executor's activity, as the cgroup metrics may include more processes, up to the entire system.

    • Metrics implemented (gauges), with prefix ch.cern.CgroupMetrics:

      • CPUTimeNanosec: reports the CPU time used by the processes in the cgroup.
      • MemoryRss: number of bytes of anonymous and swap cache memory.
      • MemorySwap: number of bytes of swap usage.
      • MemoryCache: number of bytes of page cache memory.
      • NetworkBytesIn: network traffic inbound.
      • NetworkBytesOut: network traffic outbound.
    • Example:

    bin/spark-shell --master k8s://https://<K8S URL>:6443 --driver-memory 1g \ 
      --num-executors 2 --executor-cores 2 --executor-memory 2g \
      --conf spark.kubernetes.container.image=<registry>/spark:v330 \
      --packages ch.cern.sparkmeasure:spark-plugins_2.12:0.2 \
      --conf spark.plugins=ch.cern.HDFSMetrics,ch.cern.CgroupMetrics \
      --conf "spark.metrics.conf.*.sink.graphite.class"="org.apache.spark.metrics.sink.GraphiteSink"   \
      --conf "spark.metrics.conf.*.sink.graphite.host"=mytestinstance \
      --conf "spark.metrics.conf.*.sink.graphite.port"=2003 \
      --conf "spark.metrics.conf.*.sink.graphite.period"=10 \
      --conf "spark.metrics.conf.*.sink.graphite.unit"=seconds \
      --conf "spark.metrics.conf.*.sink.graphite.prefix"="youridhere"
    
  • Visualize the metrics using the Spark dashboard, see Spark_Perf_Dashboard_v03_with_SparkPlugins

Plugins to collect I/O storage statistics for HDFS and Hadoop Compatible Filesystems

HDFS extended storage statistics

This Plugin measures HDFS extended statistics. In particular, it provides information on read locality and erasure coding usage (for HDFS 3.x).

  • HDFSMetrics

    • Configure with: --conf spark.plugins=ch.cern.HDFSMetrics

    • Optional configuration: --conf spark.cernSparkPlugin.registerOnDriver (default true)

    • Collects extended HDFS metrics using Hadoop's GlobalStorageStatistics implemented using Hadoop extended statistics metrics introduced in Hadoop 2.8.

    • Use this with Spark built with Hadoop 3.2 or higher (it does not work with Spark built with Hadoop 2.7).

    • Metrics (gauges) implemented have the prefix ch.cern.HDFSMetrics. List of metrics:

      • bytesRead
      • bytesWritten
      • readOps
      • writeOps
      • largeReadOps
      • bytesReadLocalHost
      • bytesReadDistanceOfOneOrTwo
      • bytesReadDistanceOfThreeOrFour
      • bytesReadDistanceOfFiveOrLarger
      • bytesReadErasureCoded
    • Example

    bin/spark-shell --master yarn \
      --packages ch.cern.sparkmeasure:spark-plugins_2.12:0.2 \
      --conf spark.plugins=ch.cern.HDFSMetrics \
      --conf "spark.metrics.conf.*.sink.graphite.class"="org.apache.spark.metrics.sink.GraphiteSink"   \
      --conf "spark.metrics.conf.*.sink.graphite.host"=mytestinstance \
      --conf "spark.metrics.conf.*.sink.graphite.port"=2003 \
      --conf "spark.metrics.conf.*.sink.graphite.period"=10 \
      --conf "spark.metrics.conf.*.sink.graphite.unit"=seconds \
      --conf "spark.metrics.conf.*.sink.graphite.prefix"="youridhere"
    
  • Visualize the metrics using the Spark dashboard, see Spark_Perf_Dashboard_v03_with_SparkPlugins

Cloud filesystem storage statistics for Hadoop Compatible Filesystems

CloudFSMetrics

This Plugin provides I/O statistics for Cloud Filesystem metrics (for s3a, gs, wasbs, oci, root, and any other storage system exposed as a Hadoop Compatible Filesystem).

  • Configure with:
    • --conf spark.plugins=ch.cern.CloudFSMetrics

    • --conf spark.cernSparkPlugin.cloudFsName=<name of the filesystem> (example: "s3a", "gs", "wasbs", "root", "oci", etc.)

    • Optional configuration: --conf spark.cernSparkPlugin.registerOnDriver (default true)

    • Collects I/O metrics for Hadoop-compatible filesystems using Hadoop's GlobalStorageStatistics API.

      • Note: use this with Spark built with Hadoop 3.x (requires Hadoop client version 2.8 or higher).
      • Spark also allows to measure filesystem metrics using --conf spark.executor.metrics.fileSystemSchemes=<filesystems to measure> (default: file,hdfs) however in Spark (up to 3.1) this is done using Hadoop Filesystem getAllStatistics, deprecated in recent versions of Hadoop.
    • Metrics (gauges) implemented have the prefix ch.cern.S3AMetricsGSS. List of metrics:

      • bytesRead
      • bytesWritten
      • readOps
      • writeOps
    • Example:

      bin/spark-shell --master k8s://https://<K8S URL>:6443 --driver-memory 1g \ 
       --num-executors 2 --executor-cores 2 --executor-memory 2g \
       --conf spark.kubernetes.container.image=<registry>/spark:v311 \
       --packages org.apache.hadoop:hadoop-aws:3.3.2,ch.cern.sparkmeasure:spark-plugins_2.12:0.2 \
       --conf spark.plugins=ch.cern.CloudFSMetrics,ch.cern.CgroupMetrics \
       --conf spark.cernSparkPlugin.cloudFsName="s3a" \
       --conf spark.hadoop.fs.s3a.secret.key="<SECRET KEY HERE>" \
       --conf spark.hadoop.fs.s3a.access.key="<ACCESS KEY HERE>" \
       --conf spark.hadoop.fs.s3a.endpoint="https://<S3A URL HERE>" \
       --conf spark.hadoop.fs.s3a.impl="org.apache.hadoop.fs.s3a.S3AFileSystem" \
       --conf "spark.metrics.conf.*.sink.graphite.class"="org.apache.spark.metrics.sink.GraphiteSink"   \
       --conf "spark.metrics.conf.*.sink.graphite.host"=mytestinstance \
       --conf "spark.metrics.conf.*.sink.graphite.port"=2003 \
       --conf "spark.metrics.conf.*.sink.graphite.period"=10 \
       --conf "spark.metrics.conf.*.sink.graphite.unit"=seconds \
       --conf "spark.metrics.conf.*.sink.graphite.prefix"="youridhere"
      
    • Visualize the metrics using the Spark dashboard, see Spark_Perf_Dashboard_v03_with_SparkPlugins

CloudFSMetrics27

This Plugin provides I/O statistics for Cloud Filesystem metrics (for s3a, gs, wasbs, oci, root, and any other storage system exposed as a Hadoop Compatible Filesystem). Use this for Spark built using Hadoop 2.7.

  • Configure with:
    • --conf spark.plugins=ch.cern.CloudFSMetrics27
    • --conf spark.cernSparkPlugin.cloudFsName=<name of the filesystem> (example: "s3a", "oci", "gs", "root", etc.)
    • Optional configuration: --conf spark.cernSparkPlugin.registerOnDriver (default true)
    • Collects I/O metrics for Hadoop-compatible filesystem using Hadoop 2.7 API, use with Spark built with Hadoop 2.7
    • The metrics are the same as for CloudFSMetrics described above, with except for the prefix: ch.cern.CloudFSMetrics27.

Experimental Plugins for I/O Time Instrumentation

This section details a few experimental Spark plugins used to expose metrics for I/O-time instrumentation of Spark workloads using Hadoop-compliant filesystems.
These plugins use instrumented experimental/custom versions of the Hadoop client API for HDFS and other Hadoop-Compliant File Systems.

  • S3A Time Instrumentation

    • Instruments the Hadoop S3A client.

    • Note: this requires custom S3A client implementation, see experimental code at: HDFS and S3A custom instrumentation

    • Spark config:

      • Use this with Spark 3.1.x (which uses hadoop version 3.2.0)
      • --conf spark.plugins=ch.cern.experimental.S3ATimeInstrumentation
      • Custom jar needed: --jars hadoop-aws-3.2.0.jar
    • Metrics implemented (gauges), with prefix ch.cern.experimental.S3ATimeInstrumentation:

      • S3AReadTimeMuSec
      • S3ASeekTimeMuSec
      • S3ACPUTimeDuringReadMuSec
      • S3ACPUTimeDuringSeekMuSec
      • S3AReadTimeMinusCPUMuSec
      • S3ASeekTimeMinusCPUMuSec
      • S3ABytesRead
      • S3AGetObjectMetadataMuSec
      • S3AGetObjectMetadataMinusCPUMuSec
    • Example:

      bin/spark-shell --master k8s://https://<K8S URL>:6443 --driver-memory 1g \ 
       --num-executors 2 --executor-cores 2 --executor-memory 2g \
       --conf spark.kubernetes.container.image=<registry>/spark:v311 \
       --jars <PATH>/hadoop-aws-3.2.0.jar
       --packages com.amazonaws:aws-java-sdk-bundle:1.11.880,ch.cern.sparkmeasure:spark-plugins_2.12:0.1 \
       --conf spark.hadoop.fs.s3a.secret.key="<SECRET KEY HERE>" \
       --conf spark.hadoop.fs.s3a.access.key="<ACCESS KEY HERE>" \
       --conf spark.hadoop.fs.s3a.endpoint="https://<URL HERE>" \
       --conf spark.hadoop.fs.s3a.impl="org.apache.hadoop.fs.s3a.S3AFileSystem" \
       --conf "spark.metrics.conf.*.sink.graphite.class"="org.apache.spark.metrics.sink.GraphiteSink"   \
       --conf "spark.metrics.conf.*.sink.graphite.host"=mytestinstance \
       --conf "spark.metrics.conf.*.sink.graphite.port"=2003 \
       --conf "spark.metrics.conf.*.sink.graphite.period"=10 \
       --conf "spark.metrics.conf.*.sink.graphite.unit"=seconds \
       --conf "spark.metrics.conf.*.sink.graphite.prefix"="youridhere"
      
  • Visualize the metrics using the Spark dashboard, see Spark_Perf_Dashboard_v03_with_SparkPlugins_Experimental

  • HDFS Time Instrumentation

    • Instruments the Hadoop HDFS client.

    • Note: this requires custom HDFS client implementation, see experimental code at: HDFS and S3A custom instrumentation

    • Spark config:

      • Use this with Spark 3.1.x (which uses hadoop version 3.2.0)
      • --conf spark.plugins=ch.cern.experimental.HDFSTimeInstrumentation
      • --packages ch.cern.sparkmeasure:spark-plugins_2.12:0.1
      • Non-standard configuration required for using this instrumentation:
    • Metrics implemented (gauges), with prefix ch.cern.experimental.HDFSTimeInstrumentation:

      • HDFSReadTimeMuSec
      • HDFSCPUTimeDuringReadMuSec
      • HDFSReadTimeMinusCPUMuSec
      • HDFSBytesRead
      • HDFSReadCalls
    • Example:

    bin/spark-shell --master yarn --num-executors 2 --executor-cores 2 \
     --jars <PATH>/sparkplugins_2.12-0.1.jar \
     --conf spark.plugins=ch.cern.experimental.HDFSTimeInstrumentation 
     ...NOTE: ADD here spark.metrics.conf parameters or configure metrics.conf> 
    
    • Visualize the metrics with the Spark dashboard spark_perf_dashboard_spark3-0_v02_with_sparkplugins_experimental
  • Hadoop-XRootD Time Instrumentation

    • Collects metrics for the Hadoop-XRootD connector
    • Intended use is when using Spark with the CERN EOS (and CERN Box) storage system.
    • Additional details: Hadoop-XRootD connector instrumentation
    • Spark config:
      --conf spark.plugins=ch.cern.experimental.ROOTTimeInstrumentation \
      --packages ch.cern.sparkmeasure:spark-plugins_2.12:0.1 \
      --conf spark.driver.extraClassPath=<path>/hadoop-xrootd-1.0.5.jar \
      --conf spark.executor.extraClassPath=<path>/hadoop-xrootd-1.0.5.jar \
      --files <path_to_krbtgt>#krbcache \
      --conf spark.executorEnv.KRB5CCNAME="FILE:krbcache"
      
    • Metrics implemented (gauges), with prefix ch.cern.experimental.ROOTTimeInstrumentation:
      • ROOTBytesRead
      • ROOTReadOps
      • ROOTReadTimeMuSec
  • Visualize the metrics using the Spark dashboard, see Spark_Perf_Dashboard_v03_with_SparkPlugins_Experimental

  • OCITimeInstrumentation - instruments the HDFS connector to Oracle OCI storage. - Note: this requires a custom hdfs-oci connector implementation, see experimental code at: OCI-Hadoop connector instrumentation - Spark config:

    • --conf spark.plugins=ch.cern.experimental.OCITimeInstrumentation
    • Hack required for testing using Spark on Kubernetes: copy oci-hdfs-connector-2.9.2.6.jar built from this fork into $SPARK_HOME/jars copy and install the relevant [oracle/oci-java-sdk release jars](oci-java-sdk release jars): version 1.17.5: oci-java-sdk-full-1.17.5.jar and related third party jars.
    --packages ch.cern.sparkmeasure:spark-plugins_2.12:0.1 \ # Note for K8S rather add this to the container \
    --conf spark.hadoop.fs.oci.client.auth.pemfilepath="<PATH>/oci_api_key.pem" \
    --conf spark.hadoop.fs.oci.client.auth.tenantId=ocid1.tenancy.oc1..TENNANT_KEY \
    --conf spark.hadoop.fs.oci.client.auth.userId=ocid1.user.oc1.USER_KEY \
    --conf spark.hadoop.fs.oci.client.auth.fingerprint=<fingerprint_here> \
    --conf spark.hadoop.fs.oci.client.hostname="https://objectstorage.REGION_NAME.oraclecloud.com" \
    
    • Metrics implemented (gauges), with prefix ch.cern.experimental.OCITimeInstrumentation:
      • OCIReadTimeMuSec
      • OCISeekTimeMuSec
      • OCICPUTimeDuringReadMuSec
      • OCICPUTimeDuringSeekMuSec
      • OCIReadTimeMinusCPUMuSec
      • OCISeekTimeMinusCPUMuSec
      • OCIBytesRead