DatasetLogger lets you inspect the content of Spark Datasets (and DataFrames) during job execution without cluttering the code or degrading performance. With a configuration provided at launch you can track specific records and inspect tables at different stages of your application.
> Customer with ID `10101` has value `12.34` instead of `43.21` in field `X`, and customer `98989` is not available in the output

that's the kind of message we all hate to hear. DatasetLogger was designed to address such investigation problems - it can track particular records in any Dataset/DataFrame in your Spark app without any extra development or deployment. All you need to do is launch your job with a configuration specifying which values should be logged during execution - e.g. all records with `customerId` in `[10101, 98989]`.
The `dataset-logger` package is published to Maven Central and cross-built for Scala 2.12 and 2.13. To add the package to your project, include it in the dependencies. For example, if your project is managed with SBT, add the following to `build.sbt`:

```scala
libraryDependencies += "io.datumo" %% "dataset-logger" % "0.1.0"
```

An example of a full `build.sbt` can be found here.
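For reference, a minimal `build.sbt` could look roughly like the sketch below; the Scala/Spark versions and the project name are illustrative placeholders, not requirements of the library:

```scala
// Minimal build.sbt sketch; versions and names are illustrative placeholders.
ThisBuild / scalaVersion := "2.13.12"

lazy val root = (project in file("."))
  .settings(
    name := "my-spark-job", // hypothetical project name
    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-sql"      % "3.5.0" % Provided, // assumed Spark dependency
      "io.datumo"        %% "dataset-logger" % "0.1.0"
    )
  )
```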
To use its features, extend your class with `DatasetLogger` or initialize an instance of it. `DatasetLogger` takes a single parameter - either a `String` or an `Option[String]` containing the logger's configuration in JSON format. The circe library is used for parsing.
The logger should be invoked on demand, with a specific configuration for each usage. You may pass the configuration as an optional `spark-submit` command-line argument or load it from a file, turning it into a Scala object (`String` or `Option[String]`) at the beginning of the application's execution. Afterwards it is used to initialize `DatasetLogger`.
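A minimal sketch of that flow, assuming the configuration JSON is passed as an optional application argument and that `DatasetLogger` is a class whose constructor accepts the `Option[String]` configuration (the import path `io.datumo.DatasetLogger` and the job class below are hypothetical):

```scala
import io.datumo.DatasetLogger // assumed import path for the library

// Hypothetical job class gaining logDataset by extending DatasetLogger with the JSON configuration.
class MySparkJob(loggerConfig: Option[String]) extends DatasetLogger(loggerConfig) {
  def run(): Unit = {
    // ... build Datasets/DataFrames and wrap logDataset calls in logger.debug ...
  }
}

object MySparkJob {
  def main(args: Array[String]): Unit = {
    // Treat the first command-line argument, if present, as the logger configuration;
    // when it is absent, DatasetLogger only prints the plain log messages.
    val loggerConfig: Option[String] = args.headOption

    new MySparkJob(loggerConfig).run()
  }
}
```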
The `logDataset` method is the entry point for the whole magic. It takes a Dataset/DataFrame as one of its arguments and filters it by the columns/values specified in the configuration. You may also specify a SQL query to be executed on your data, with `dataset` used as the table name.
It's recommended to call `logDataset` inside the `logger.debug()` method of your job's logger. Thanks to the Scala macros used by most logging libraries, debug messages are evaluated only when the application runs with the logging level set to DEBUG. Therefore, the expensive Spark actions needed by DatasetLogger to collect and present the data are not executed during a regular application run. The Scala macros also exclude these lines from test coverage statistics.
Check the example job for a reference JSON configuration and class usage.
| parameter | type | required | description |
|---|---|---|---|
| message | String | yes | log message printed at the beginning; it's also the only content produced when no configuration is passed. |
| dataset | Dataset[A] | yes | Spark Dataset/DataFrame to be inspected. |
| messageId | String | yes | identifies a specific `logDataset` call in your job, so you can specify extra actions for a particular step. |
| maxRowsNum | Int | no | how many records can be logged in the output. Default: 100 |
| cacheLevel | Option[StorageLevel] | no | specifies whether the Dataset should be persisted and, if so, which storage level should be used for caching the data. Default: Some(StorageLevel.MEMORY_AND_DISK) |
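As an illustration of the optional parameters, a call that limits the output for a given step to 10 matching records could look like the sketch below (the DataFrame and `messageId` are placeholders; the parameter name follows the table above):

```scala
val enrichedOrdersDf: DataFrame = ... // placeholder for a DataFrame built earlier in the job

// Log at most 10 matching records for this step.
logger.debug(
  logDataset("Orders after enrichment", enrichedOrdersDf, "orders_enriched", maxRowsNum = 10)
)
```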
The last, optional argument - `cacheLevel: Option[StorageLevel]` - specifies how the passed dataset should be cached. Since the computations performed within `logDataset` involve Spark actions (collecting matching records, counting them, running the extra query) and the dataset is most likely used by further transformations, the Dataset/DataFrame should be cached to avoid repeated computations. `cacheLevel` defines the storage level used for caching (`MEMORY_AND_DISK` by default). If for some reason you prefer not to cache the dataset, set this argument to `None`.
```scala
val notCachedDf: DataFrame = ...
logger.debug(
  logDataset("Constructed dataset that shouldn't be cached", notCachedDf, "not_cached", cacheLevel = None)
)

val diskOnlyCachedDf: DataFrame = ...
logger.debug(
  logDataset("Constructed dataset that should be cached on disk only", diskOnlyCachedDf, "disk_cached", cacheLevel = Some(DISK_ONLY))
)
```

The JSON with the DatasetLogger configuration should contain the following parameters (a combined example is shown after the list):
- `columnFiltersConfig` - a list of objects, each representing a column and the values used to filter the Dataset/DataFrame. Each object should contain two key/value pairs: `columnName` - a string specifying the column, and `columnValues` - a list of strings representing the values used to filter the table's content. Example:

  ```json
  "columnFiltersConfig": [
    {
      "columnName": "customerId",
      "columnValues": ["123", "456"]
    }
  ]
  ```

- `extraQuery` (optional) - a mapping/object that allows running a SQL query on the Dataset/DataFrame. `dataset` is the name used to denote your table. The query (the value in `extraQuery`) is executed during the `logDataset` call whose `messageId` was passed as the `extraQuery` key. Example of a query executed for `logDataset` with `messageId = "customerCountries"`:
"extraQuery": {
  "customerCountries": "SELECT customerType, COUNT(*) FROM dataset WHERE customerCountry IN ['PL', 'US'] GROUP BY customerType"
}logOnly(optional) - List ofmessageIdvalues that should be the only invokedlogDatasetcalls (for all the others onlymessageis displayed). When not specified all the method's call are fully evaluated.logOnlyis especially useful when you have a lot oflogDatasetcalls, but you are interested only in a specific step of the job, so executing all the others (each of them taking some time to run Spark actions) is redundant.
"logOnly": ["id1", "id14", "id18"]columnRenameMapping(optional) - Useful when schema of the Dataset/DataFrame evolves (e.g. after joining Datasets or performing some.select/.withColumnRenamedon a DataFrame) andcolumnNamespecified incolumnFiltersConfigis not valid on particular stage. Let's say that in thecolumnFiltersConfigyou configure to trackcustomerIdcolumn during your job. You use Dataset API and at some point you join two of them withval dataset3 = dataset1.joinWith(dataset2, ...). If you passdataset3tologDatasetthen you won't be able to findcustomerIdcolumn in it. However,dataset3has this value denoted as e.g._1.customerId. Similar case could happen when your job uses DataFrame API and you callval df2 = df1.select($"customerId" as "cId", ...)- to trackcustomerIdvalues fordf2logDatasetneeds to filter oncIdcolumn.columnRenameMappingis meant to specify such column name changes forlogDatasetspecified with particularmessageId. Example:
"columnRenameMapping": {
  "orders": {
    "customerId": "cId",
    "orderId": "oId"
  },
  "delivery": {
    "customerId": "cId",
    "orderId": "orderFinalId"
  }
}
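Putting it all together, a complete configuration could look roughly like the sketch below. It simply combines the fragments above into one JSON object; the `messageId` values referenced in `logOnly` and `columnRenameMapping` are illustrative and assume corresponding `logDataset` calls exist in the job:

```json
{
  "columnFiltersConfig": [
    {
      "columnName": "customerId",
      "columnValues": ["123", "456"]
    }
  ],
  "extraQuery": {
    "customerCountries": "SELECT customerType, COUNT(*) FROM dataset WHERE customerCountry IN ('PL', 'US') GROUP BY customerType"
  },
  "logOnly": ["customerCountries", "orders"],
  "columnRenameMapping": {
    "orders": {
      "customerId": "cId",
      "orderId": "oId"
    }
  }
}
```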