spark-commons

Build

spark-commons is a library offering commonly needed routines, classes and functionality. It consists of three modules.

  • spark2-commons
  • spark3-commons
  • spark-commons-test

spark2-commons and spark3-commons both offer the same logic for the respective major versions of Spark, addressing the usual needs of Spark applications.

spark-commons-test then brings routines to help with testing Spark applications (and it is independent of the Spark version used).

               spark2-commons   spark3-commons   spark-commons-test
Scala 2.11     Maven Central    –                Maven Central
Scala 2.12     Maven Central    Maven Central    Maven Central

SparkX-Commons

NonFatalQueryExecutionListenerAdapter

A trait that, when mixed into another QueryExecutionListener implementation, ensures the latter is never called with a fatal exception.

See AbsaOSS/commons#50

val myListener = new MyQueryExecutionListener with NonFatalQueryExecutionListenerAdapter
spark.listenerManager.register(myListener)
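For illustration, a minimal sketch of what such a listener could look like. The MyQueryExecutionListener class below is purely illustrative and the import path of the adapter is an assumption; check the module you depend on for the exact package:

    import org.apache.spark.sql.execution.QueryExecution
    import org.apache.spark.sql.util.QueryExecutionListener
    // assumed package of the adapter trait
    import za.co.absa.spark.commons.adapters.NonFatalQueryExecutionListenerAdapter

    // purely illustrative listener implementation
    class MyQueryExecutionListener extends QueryExecutionListener {
      override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit =
        println(s"$funcName finished in ${durationNs / 1000000} ms")

      override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit =
        println(s"$funcName failed: ${exception.getMessage}")
    }

    val myListener = new MyQueryExecutionListener with NonFatalQueryExecutionListenerAdapter
    spark.listenerManager.register(myListener)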

TransformAdapter

A trait that provides a Spark-version-independent implementation of the transform function.
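A hedged usage sketch: it assumes the trait lives in an adapters package and that its transform mirrors the signature of Spark 3's org.apache.spark.sql.functions.transform (an array column plus a Column => Column function); the column names are illustrative:

    import org.apache.spark.sql.{Column, DataFrame}
    // assumed package; check the spark2/spark3 module you depend on
    import za.co.absa.spark.commons.adapters.TransformAdapter

    object ArrayMath extends TransformAdapter {
      // increments every element of the array column "numbers";
      // the transform signature is assumed to mirror Spark 3's functions.transform
      def addOne(df: DataFrame): DataFrame =
        df.withColumn("numbersPlusOne", transform(df("numbers"), (x: Column) => x + 1))
    }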

SchemaUtils

SchemaUtils provides methods for working with schemas, their comparison and alignment.

  1. Returns the parent path of a field. Returns an empty string if a root level field name is provided.

      SchemaUtils.getParentPath(columnName)
  2. Get paths for all array subfields of the given data type

      SchemaUtils.getAllArraySubPaths(other)
  3. For a given list of field paths determines if any path pair is a subset of one another.

      SchemaUtils.isCommonSubPath(paths)
  4. Appends a new attribute to a path (the path may be an empty string).

      SchemaUtils.appendPath(path, fieldName)
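A short usage sketch based on the descriptions above; the package of SchemaUtils, the example column paths and the results noted in the comments are assumptions:

    import za.co.absa.spark.commons.utils.SchemaUtils  // assumed package

    SchemaUtils.getParentPath("person.address.city")    // "person.address"
    SchemaUtils.getParentPath("id")                      // "" – root-level field
    SchemaUtils.appendPath("person.address", "street")  // "person.address.street"
    SchemaUtils.appendPath("", "id")                     // "id"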

JsonUtils

JsonUtils provides methods for working with JSON, both on input and output.

  1. Creates a Spark DataFrame from one or more JSON documents.

      JsonUtils.getDataFrameFromJson(json)
      JsonUtils.getDataFrameFromJson(json, schema)(implicit spark)
  2. Creates a Spark schema from one or more JSON documents.

      JsonUtils.getSchemaFromJson(json)
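A minimal sketch, assuming getDataFrameFromJson accepts a sequence of JSON strings and an implicit SparkSession (as the second signature above suggests); the package and the sample data are assumptions:

    import org.apache.spark.sql.SparkSession
    import za.co.absa.spark.commons.utils.JsonUtils  // assumed package

    implicit val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate()

    val json = Seq("""{"id": 1, "name": "Alice"}""", """{"id": 2, "name": "Bob"}""")

    val df = JsonUtils.getDataFrameFromJson(json)  // builds a DataFrame from the documents
    df.show()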

ColumnImplicits

ColumnImplicits provides implicit methods for transforming Spark Columns.

  1. Transforms the column into a boolean column, checking whether values are negative or positive infinity

      column.isInfinite()
  2. Returns a column with the requested substring, shifting the substring indexation to be zero-based, in accordance with Scala/Java. If the provided starting position is negative, it is counted from the end.

      column.zeroBasedSubstr(startPos)
  3. Returns a column with the requested substring, shifting the substring indexation to be zero-based, in accordance with Scala/Java. If the provided starting position is negative, it is counted from the end. If the requested length is longer than the rest of the string, all the remaining characters are taken.

      column.zeroBasedSubstr(startPos, length)
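A usage sketch; the import that brings the implicit methods into scope and the example column names are assumptions:

    import org.apache.spark.sql.functions.col
    // assumed import of the implicit class
    import za.co.absa.spark.commons.implicits.ColumnImplicits.ColumnEnhancements

    val isInf   = col("measurement").isInfinite()     // true for +/- infinity values
    val slice   = col("text").zeroBasedSubstr(2)      // from index 2 (zero-based) to the end
    val fromEnd = col("text").zeroBasedSubstr(-3, 2)  // start counted from the end, 2 characters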

StructFieldImplicits

StructFieldImplicits provides implicit methods for working with StructField objects.

Among them, the metadata methods are:

  1. Gets the metadata String value of a given key as an Option[String]

      structField.metadata.getOptString(key)
  2. Gets the metadata Char value of a given key: if the value is a single-character String, it returns the character, otherwise None

      structField.metadata.getOptChar(key)
  3. Gets the metadata boolean value of a given key, provided it can be converted to a boolean

      structField.metadata.getStringAsBoolean(key)
  4. Checks whether the StructField metadata contains the provided key, returning a boolean

      structField.metadata.hasKey(key)
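A usage sketch; the import of the implicit class and the return values noted in the comments are assumptions based on the descriptions above:

    import org.apache.spark.sql.types.{MetadataBuilder, StringType, StructField}
    // assumed import of the implicit metadata methods
    import za.co.absa.spark.commons.implicits.StructFieldImplicits.StructFieldMetadataEnhancements

    val field = StructField("code", StringType, nullable = true,
      new MetadataBuilder()
        .putString("default", "N")
        .putString("enabled", "true")
        .build())

    field.metadata.getOptString("default")        // Some("N")
    field.metadata.getOptChar("default")          // Some('N') – the value is a single-character String
    field.metadata.getStringAsBoolean("enabled")  // boolean interpretation of "true"
    field.metadata.hasKey("missing")              // false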

ArrayTypeImplicits

ArrayTypeImplicits provides implicit methods for working with ArrayType objects.

  1. Checks if the ArrayType is equivalent to another

      arrayType.isEquivalentArrayType(otherArrayType)
  2. For an array of arrays, get the final element type at the bottom of the array

      arrayType.getDeepestArrayType()
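A usage sketch; the import of the implicit class is an assumption:

    import org.apache.spark.sql.types.{ArrayType, IntegerType, StringType}
    // assumed import of the implicit methods
    import za.co.absa.spark.commons.implicits.ArrayTypeImplicits.ArrayTypeEnhancements

    val nested = ArrayType(ArrayType(IntegerType))

    nested.isEquivalentArrayType(ArrayType(ArrayType(IntegerType)))  // true
    nested.isEquivalentArrayType(ArrayType(StringType))              // false
    nested.getDeepestArrayType()                                     // element type at the bottom, here IntegerType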

DataTypeImplicits

DataTypeImplicits provides implicit methods for working with DataType objects.

  1. Checks if the datatype is equivalent to another

      dataType.isEquivalentDataType(otherDt)
  2. Checks if a casting between types always succeeds

      dataType.doesCastAlwaysSucceed(otherDt)
  3. Checks if type is primitive

      dataType.isPrimitive()
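A usage sketch; the import of the implicit class and the results noted in the comments are assumptions based on the descriptions above:

    import org.apache.spark.sql.types.{IntegerType, LongType, StructType}
    // assumed import of the implicit methods
    import za.co.absa.spark.commons.implicits.DataTypeImplicits.DataTypeEnhancements

    IntegerType.isEquivalentDataType(IntegerType)  // true
    IntegerType.doesCastAlwaysSucceed(LongType)    // a widening cast is expected to always succeed
    IntegerType.isPrimitive()                      // true
    new StructType().isPrimitive()                 // false – a struct is not a primitive type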

StructTypeImplicits

StructTypeImplicits provides implicit methods for working with StructType objects.

  1. Get a field from a text path

      structType.getField(path)
  2. Get a type of a field from a text path

      structType.getFieldType(path)
  3. Checks if the specified path is an array of structs

      structType.isColumnArrayOfStruct(path)
  4. Get nullability of a field from a text path

      structType.getFieldNullability(path)
  5. Checks if a field specified by a path exists

      structType.fieldExists(path)
  6. Get paths for all array fields in the schema

      structType.getAllArrayPaths()
  7. Gets the closest unique column name to the desired one

      structType.getClosestUniqueName(desiredName)
  8. Checks if a field is the only field in a struct

      structType.isOnlyField(columnName)
  9. Checks if two StructTypes are equivalent

      structType.isEquivalent(other)
  10. Returns a list of differences between this schema and the other

      structType.diffSchema(otherSchema, parent)
  11. Checks if a field is of the specified type

      structType.isOfType[ArrayType](path)
  12. Checks if a field is a subset of the specified type

      structType.isSubset(other)
  13. Returns a data selector that can be used to align the schema of a DataFrame.

      structType.getDataFrameSelector()
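A usage sketch covering a few of the methods above; the import of the implicit class and the exact return values are assumptions:

    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
    // assumed import of the implicit methods
    import za.co.absa.spark.commons.implicits.StructTypeImplicits.StructTypeEnhancements

    val schema = StructType(Seq(
      StructField("id", IntegerType),
      StructField("address", StructType(Seq(
        StructField("city", StringType)
      )))
    ))

    schema.fieldExists("address.city")  // true
    schema.getFieldType("address.city") // the type of the nested field (StringType)
    schema.isOnlyField("address.city")  // true – "city" is the only field in its struct
    schema.getClosestUniqueName("id")   // a name not yet present in the schema, e.g. "id_1"
    schema.isEquivalent(schema)         // true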

StructTypeArrayImplicits

  1. Get first array column's path out of complete path

      structType.getFirstArrayPath(path)
  2. Get all array columns' paths out of complete path.

      structType.getAllArraysInPath(path)
  3. For a given list of field paths determines the deepest common array path

      structType.getDeepestCommonArrayPath(fieldPaths)
  4. For a field path determines the deepest array path

      structType.getDeepestArrayPath(path)
  5. Checks if a field is an array that is not nested in another array

      structType.isNonNestedArray(path)
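A usage sketch; the import of the implicit class and the exact return types (plain values vs. Option) are assumptions:

    import org.apache.spark.sql.types.{ArrayType, IntegerType, StructField, StructType}
    // assumed import of the implicit array methods
    import za.co.absa.spark.commons.implicits.StructTypeImplicits.StructTypeEnhancementsArrays

    val schema = StructType(Seq(
      StructField("orders", ArrayType(StructType(Seq(
        StructField("items", ArrayType(StructType(Seq(
          StructField("price", IntegerType)
        ))))
      ))))
    ))

    schema.getFirstArrayPath("orders.items.price")   // "orders"
    schema.getAllArraysInPath("orders.items.price")  // paths "orders" and "orders.items"
    schema.getDeepestArrayPath("orders.items.price") // "orders.items"
    schema.isNonNestedArray("orders")                // true
    schema.isNonNestedArray("orders.items")          // false – nested inside another array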

DataFrameImplicits

  1. Changes the field structure of the DataFrame to adhere to the provided schema or selector; data types remain intact

      dataFrame.alignSchema
  2. Persists the Dataset with the default storage level, avoiding the warning in case it has already been cached

      dataFrame.cacheIfNotCachedYet()
  3. Gets the string representation of the data in the same format as Dataset.show() displays them

      dataFrame.dataAsString()
  4. Adds a column to a DataFrame if it does not exist

      dataFrame.withColumnIfDoesNotExist(path)

Spark Version Guard

A class which checks if the Spark job version is compatible with the Spark versions supported by the library.

Default mode checking

SparkVersionGuard.fromDefaultSparkCompatibilitySettings.ensureSparkVersionCompatibility(SPARK_VERSION)

Checking for 2.X versions

SparkVersionGuard.fromSpark2XCompatibilitySettings.ensureSparkVersionCompatibility(SPARK_VERSION)

Checking for 3.X versions

SparkVersionGuard.fromSpark3XCompatibilitySettings.ensureSparkVersionCompatibility(SPARK_VERSION)

DataFrameImplicits

DataFrameImplicits provides methods for transformations on DataFrames.

  1. Gets the string representation of the DataFrame's data, in a similar fashion as the show function presents it.

       df.dataAsString()

       df.dataAsString(truncate)

       df.dataAsString(numRows, truncate)

       df.dataAsString(numRows, truncateNumber)

       df.dataAsString(numRows, truncate, vertical)
  2. Adds a column to a DataFrame if it does not exist; if it already exists, the provided function is applied instead

       df.withColumnIfDoesNotExist((df: DataFrame, _) => df)(colName, colExpression)
  3. Aligns the schema of a DataFrame to the selector, for operations where column order might be important (e.g. hashing whole rows and using except)

       df.alignSchema(structType)
       df.alignSchema(listColumns)
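A usage sketch tying the methods together; the import of the implicit class is an assumption and the withColumnIfDoesNotExist call follows the signature shown above:

    import org.apache.spark.sql.{DataFrame, SparkSession}
    import org.apache.spark.sql.functions.lit
    // assumed import of the implicit methods
    import za.co.absa.spark.commons.implicits.DataFrameImplicits.DataFrameEnhancements

    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    val df = Seq((1, "Alice"), (2, "Bob")).toDF("id", "name")

    println(df.dataAsString())       // same text that df.show() would print
    println(df.dataAsString(false))  // without truncating long values

    // add "status" only when missing; otherwise the provided function decides what to do
    val withStatus = df.withColumnIfDoesNotExist((d: DataFrame, _) => d)("status", lit("new"))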

Spark Commons Test

Usage:

class MyTest extends SparkTestBase {
}

By default, it will instantiate a local Spark session. There is also the possibility to use it in YARN mode:

class MyTest extends SparkTestBase {
  override lazy val spark: SparkSession = initSpark(new YarnSparkConfiguration(confDir, distJarsDir))
}
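A minimal test sketch, assuming ScalaTest is on the classpath, that SparkTestBase can be mixed into a suite as a trait, and that it exposes the SparkSession as spark:

    import org.scalatest.funsuite.AnyFunSuite
    import za.co.absa.spark.commons.test.SparkTestBase  // assumed package

    class WordCountTest extends AnyFunSuite with SparkTestBase {
      import spark.implicits._

      test("counts the rows of a small DataFrame") {
        val df = Seq("a", "b", "a").toDF("letter")
        assert(df.count() == 3)
      }
    }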