spark-commons
is a library offering commonly needed routines, classes and functionality. It consists of four modules.
- spark-commons-spark2.4
- spark-commons-spark3.2
- spark-commons-spark3.3
- spark-commons-test
The spark-commons-spark2.4, spark-commons-spark3.2 and spark-commons-spark3.3 modules all offer the same logic for their respective versions of Spark, addressing the usual needs of Spark applications.
spark-commons-test brings routines that help in testing Spark applications (and it is independent of the Spark version used).
| | spark-commons-spark2.4 | spark-commons-spark3.2 | spark-commons-spark3.3 | spark-commons-test |
|---|---|---|---|---|
| Scala 2.11 | | | | |
| Scala 2.12 | | | | |
NonFatalQueryExecutionListenerAdapter is a trait that, when mixed into another QueryExecutionListener implementation, makes sure the latter is not called with any fatal exception.
val myListener = new MyQueryExecutionListener with NonFatalQueryExecutionListenerAdapter
spark.listenerManager.register(myListener)
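The guarding behaviour can be pictured as a stackable trait. The following is a minimal sketch in plain Scala, not the library's code: `FailureListener`, `NonFatalOnly` and `RecordingListener` are hypothetical stand-ins with no Spark types involved, and the sketch assumes that "fatal" is decided by `scala.util.control.NonFatal`.

```scala
import scala.util.control.NonFatal

object NonFatalSketch {
  // Hypothetical stand-in for a listener's failure callback.
  trait FailureListener {
    def onFailure(error: Throwable): Unit
  }

  // Stackable trait: forwards only non-fatal errors to the wrapped listener.
  trait NonFatalOnly extends FailureListener {
    abstract override def onFailure(error: Throwable): Unit = {
      if (NonFatal(error)) super.onFailure(error)
    }
  }

  // Simple listener that remembers every error it was called with.
  class RecordingListener extends FailureListener {
    var seen: List[Throwable] = Nil
    def onFailure(error: Throwable): Unit = {
      seen = error :: seen
    }
  }

  // Sends one non-fatal and one fatal error through the guarded listener.
  def demo(): List[Throwable] = {
    val listener = new RecordingListener with NonFatalOnly
    listener.onFailure(new RuntimeException("recoverable"))
    listener.onFailure(new InterruptedException("treated as fatal by NonFatal"))
    listener.seen
  }
}
```

In this sketch only the `RuntimeException` reaches `RecordingListener`; the `InterruptedException` is dropped before the wrapped listener sees it.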
A trait that brings a Spark-version-independent implementation of the transform function.
SchemaUtils provides methods for working with schemas, their comparison and alignment.
-
Extracts the parent path of a field. Returns an empty string if a root level column name is provided.
SchemaUtils.getParentPath(columnName)
-
Extracts the field name of a fully qualified column name.
SchemaUtils.stripParentPath(columnName)
-
Get paths for all array subfields of this given datatype.
SchemaUtils.getAllArraySubPaths(other)
-
For a given list of field paths determines if any path pair is a subset of one another.
SchemaUtils.isCommonSubPath(paths)
-
Append a new attribute to path or empty string.
SchemaUtils.appendPath(path, fieldName)
-
Separates the field name components of a fully qualified column name as their hierarchy goes from root down to the deepest one.
SchemaUtils.splitPath(columnName, keepEmptyFields = true)
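To make the path helpers above more concrete, here is a minimal sketch of the described behaviour on dot-separated column names. It is written in plain Scala for illustration only and is not the SchemaUtils implementation: the object name `PathSketch` is hypothetical, and the `keepEmptyFields` parameter of splitPath is not modelled.

```scala
object PathSketch {
  // Parent path of a dot-separated column name; "" for a root-level column.
  def getParentPath(columnName: String): String = {
    val index = columnName.lastIndexOf('.')
    if (index > 0) columnName.substring(0, index) else ""
  }

  // Field name without its parent path.
  def stripParentPath(columnName: String): String = {
    val index = columnName.lastIndexOf('.')
    if (index < 0) columnName else columnName.substring(index + 1)
  }

  // Joins a path and a field name; an empty path yields just the field name.
  def appendPath(path: String, fieldName: String): String = {
    if (path.isEmpty) fieldName else s"$path.$fieldName"
  }

  // Components from the root down to the deepest field.
  def splitPath(columnName: String): List[String] = {
    columnName.split('.').toList
  }
}
```

For example, under these assumptions `PathSketch.getParentPath("person.address.city")` gives `"person.address"` and `PathSketch.stripParentPath("person.address.city")` gives `"city"`.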
JsonUtils provides methods for working with JSON, both on input and output.
-
Creates a Spark DataFrame from one or more JSON documents.
JsonUtils.getDataFrameFromJson(json)
JsonUtils.getDataFrameFromJson(json, schema)(implicit spark)
-
Creates a Spark schema from one or more JSON documents.
JsonUtils.getSchemaFromJson(json)
ColumnImplicits provides implicit methods for transforming Spark Columns.
-
Transforms the column into a boolean column, checking if values are negative or positive infinity
column.isInfinite()
-
Returns a column with the requested substring, using zero-based indexing in accordance with Scala/Java. If the provided starting position is negative, it is counted from the end.
column.zeroBasedSubstr(startPos)
-
Returns a column with the requested substring, using zero-based indexing in accordance with Scala/Java. If the provided starting position is negative, it is counted from the end. If the desired length is longer than the rest of the string, all the remaining characters are taken.
column.zeroBasedSubstr(startPos, length)
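The indexing rules described for zeroBasedSubstr can be illustrated on a plain String. This is only a sketch of the described semantics, not the Column-based implementation: `SubstrSketch` is a hypothetical name, and clamping an out-of-range negative start to the beginning of the string is an assumption of the sketch.

```scala
object SubstrSketch {
  // Zero-based substring; a negative startPos is counted from the end.
  // Assumption: a negative start beyond the string length is clamped to 0.
  def zeroBasedSubstr(value: String, startPos: Int, length: Int = Int.MaxValue): String = {
    val from = if (startPos >= 0) startPos else math.max(value.length + startPos, 0)
    // take() stops at the end of the string, so an overlong length is harmless.
    value.drop(from).take(length)
  }
}
```

With the input `"abcdef"`, a start of `2` yields `"cdef"`, a start of `-2` yields `"ef"`, and a start of `4` with length `10` yields `"ef"`.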
StructFieldImplicits provides implicit methods for working with StructField objects.
Of them, metadata methods are:
-
Gets the metadata Option[String] value given a key
structField.metadata.getOptString(key)
-
Gets the metadata value for a given key as a Char: if the value is a single-character String, it returns the char, otherwise None
structField.metadata.getOptChar(key)
-
Gets the metadata boolean value of a given key, given that it can be transformed into boolean
structField.metadata.getStringAsBoolean(key)
-
Checks whether the StructField's metadata has the provided key; returns a boolean
structField.metadata.hasKey(key)
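The metadata accessors above can be pictured with a plain `Map[String, String]` standing in for Spark's Metadata. This is a rough sketch rather than the library's code: `MetadataSketch` is a hypothetical name, and returning `Option[Boolean]` from getStringAsBoolean is an assumption about how "given that it can be transformed into boolean" is expressed.

```scala
import scala.util.Try

object MetadataSketch {
  // Some(char) only when the stored value is exactly one character long.
  def getOptChar(metadata: Map[String, String], key: String): Option[Char] = {
    metadata.get(key).collect { case value if value.length == 1 => value.head }
  }

  // Some(boolean) only when the stored value parses as "true" or "false".
  def getStringAsBoolean(metadata: Map[String, String], key: String): Option[Boolean] = {
    metadata.get(key).flatMap(value => Try(value.toBoolean).toOption)
  }
}
```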
ArrayTypeImplicits provides implicit methods for working with ArrayType objects.
-
Checks if the arraytype is equivalent to another
arrayType.isEquivalentArrayType(otherArrayType)
-
For an array of arrays, gets the final element type at the bottom of the nesting
arrayType.getDeepestArrayType()
DataTypeImplicits provides implicit methods for working with DataType objects.
-
Checks if the datatype is equivalent to another
dataType.isEquivalentDataType(otherDt)
-
Checks if a casting between types always succeeds
dataType.doesCastAlwaysSucceed(otherDt)
-
Checks if type is primitive
dataType.isPrimitive()
StructTypeImplicits provides implicit methods for working with StructType objects.
-
Get a field from a text path
structType.getField(path)
-
Get a type of a field from a text path
structType.getFieldType(path)
-
Checks if the specified path is an array of structs
structType.isColumnArrayOfStruct(path)
-
Get nullability of a field from a text path
structType.getFieldNullability(path)
-
Checks if a field specified by a path exists
structType.fieldExists(path)
-
Get paths for all array fields in the schema
structType.getAllArrayPaths()
-
Get a closest unique column name
structType.getClosestUniqueName(desiredName)
-
Checks if a field is the only field in a struct
structType.isOnlyField(columnName)
-
Checks if 2 structtypes are equivalent
structType.isEquivalent(other)
-
Returns a list of differences between one schema and the other
structType.diffSchema(otherSchema, parent)
-
Checks if a field is of the specified type
structType.isOfType[ArrayType](path)
-
Checks if a field is a subset of the specified type
structType.isSubset(other)
-
Returns a data selector that can be used to align the schema of a data frame.
structType.getDataFrameSelector()
-
Get first array column's path out of complete path
structType.getFirstArrayPath(path)
-
Get all array columns' paths out of complete path.
structType.getAllArraysInPath(path)
-
For a given list of field paths determines the deepest common array path
structType.getDeepestCommonArrayPath(fieldPaths)
-
For a field path determines the deepest array path
structType.getDeepestArrayPath(path)
-
Checks if a field is an array that is not nested in another array
structType.isNonNestedArray(path)
- Changes the fields structure of the DataFrame to adhere to the provided schema or selector. Data types remain intact
dataFrame.alignSchema
- Persist this Dataset with the default storage level, avoiding the warning in case the cache has happened already before
dataFrame.cacheIfNotCachedYet()
- Get the string representation of the data in the same format as Dataset.show() displays them
dataFrame.dataAsString()
- Adds a column to a dataframe if it does not exist
dataFrame.withColumnIfDoesNotExist(path)
- Casts all NullType fields of the DataFrame to their corresponding types in targetSchema.
dataFrame.enforceTypeOnNullTypeFields(targetSchema)
SparkVersionGuard is a class that checks whether the Spark job version is compatible with the Spark versions supported by the library.
Default mode checking
SparkVersionGuard.fromDefaultSparkCompatibilitySettings.ensureSparkVersionCompatibility(SPARK_VERSION)
Checking for 2.X versions
SparkVersionGuard.fromSpark2XCompatibilitySettings.ensureSparkVersionCompatibility(SPARK_VERSION)
Checking for 3.X versions
SparkVersionGuard.fromSpark3XCompatibilitySettings.ensureSparkVersionCompatibility(SPARK_VERSION)
An abstract class to help attach/register UDFs and similar objects only once to a Spark session.
Usage: extend this abstract class and implement the method register. On initialization, the register method gets executed only if the class + Spark session combination is unique.
This way we ensure only a single registration per Spark session.
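The once-per-session idea can be sketched without Spark. The following is an illustrative stand-in, not the library's class: `OncePerSession`, `MyUdfLibrary` and the `registrations` counter are hypothetical, and a String id plays the role of the Spark session.

```scala
import scala.collection.mutable

object OnceSketch {
  // Remembers which (class, session) pairs have already been registered.
  private val registered = mutable.Set.empty[(Class[_], String)]

  // add() returns true only the first time a pair is inserted.
  private def firstTime(owner: Class[_], sessionId: String): Boolean = {
    registered.synchronized { registered.add((owner, sessionId)) }
  }

  // Hypothetical stand-in for the abstract class described above.
  abstract class OncePerSession(sessionId: String) {
    protected def register(sessionId: String): Unit

    // Runs during construction, but only for a new (class, session) pair.
    if (firstTime(getClass, sessionId)) register(sessionId)
  }

  // Counts how many times the example registration actually ran.
  var registrations = 0

  class MyUdfLibrary(sessionId: String) extends OncePerSession(sessionId) {
    protected def register(sessionId: String): Unit = {
      registrations += 1
    }
  }
}
```

Creating `MyUdfLibrary` twice for the same session id runs `register` once; a different session id triggers it again.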
DataFrameImplicits provides methods for transformations on Dataframes
-
Gets the data of the DataFrame as a string, in a similar fashion as the show function presents it.
df.dataAsString()
df.dataAsString(truncate)
df.dataAsString(numRows, truncate)
df.dataAsString(numRows, truncateNumber)
df.dataAsString(numRows, truncate, vertical)
-
Adds a column to a dataframe if it does not exist. If it exists, it will apply the provided function
df.withColumnIfDoesNotExist((df: DataFrame, _) => df)(colName, colExpression)
-
Aligns the schema of a DataFrame to the selector for operations where schema order might be important (e.g. hashing the whole rows and using except)
df.alignSchema(structType)
df.alignSchema(listColumns)
-
Similarly to the col function, evaluates the column based on the provided column name. But here, it can be a full path even of nested fields. It also evaluates arrays and maps where the array index or map key is in brackets [].
def col_of_path(fullColName: String): Column
-
Provides a column of NULL values.
def nul_coll(): Column
-
Provides a column of NULL values, but the actual type is per specification
def nul_coll(dataType: DataType): Column
A trait and a set of supporting classes and other traits to enable error channeling between libraries and the application during Spark data processing.
-
It has an implicit dataFrame for easier usage of the methods provided by the error handler trait.
-
It provides four basic implementations
- ErrorHandlerErrorMessageIntoArray - An implementation of error handler trait that collects errors into columns of struct based on the za.co.absa.spark.commons.errorhandler.ErrorMessage case class.
- ErrorHandlerFilteringErrorRows - An implementation of error handler that implements the functionality of filtering rows that have some error (any of the error columns is not NULL).
- ErrorHandlerIgnoringErrors - An implementation of error handler trait that ignores the errors detected during the dataFrame error aggregation
- ErrorHandlerThrowingException - An implementation of error handler trait that throws an exception on error detected.
class MyTest extends SparkTestBase {
}
By default, it will instantiate a local Spark. There is also the possibility to use it in yarn mode:
class MyTest extends SparkTestBase {
override lazy val spark: SparkSession = initSpark(new YarnSparkConfiguration(confDir, distJarsDir))
}
sbt jacoco
Code coverage will be generated on path:
{project-root}/spark-commons/target/spark{spark_version}-jvm-{scala_version}/jacoco/report/html
{project-root}/spark-commons-test/target/jvm-{scala_version}/jacoco/report/html
Please see this file for more details.