data-quality-monitoring

Data Quality Monitoring Tool for Big Data implemented using Spark

Build Status codecov.io License

Table of contents

Goals

  • Validate data using provided business rules
  • Log result
  • Send alerts

Getting started

Include dependency:

"com.github.piotr-kalanski" % "data-quality-monitoring_2.11" % "0.3.2"

or

<dependency>
    <groupId>com.github.piotr-kalanski</groupId>
    <artifactId>data-quality-monitoring_2.11</artifactId>
    <version>0.3.2</version>
</dependency>

Data quality monitoring process

Data quality monitoring process consists from below steps:

  • Load configuration with business rules
  • Run data validation
  • Log validation results
  • Send alerts

Load configuration

Configuration can be loaded from:

  • file
  • directory
  • RDBMS

Additionally there are plans to support:

  • Dynamo DB

Example configuration

tablesConfiguration = [
  {
    location = {type = Hive, table = clients}, // location of first table that should be validated
    rules = { // validation rules 
      rowRules = [ // validation rules working on single row level
        {
          field = client_id, // name of field that should be validated
          rules = [
            {type = NotNull}, // this field shouldn't be null
            {type = min, value = 0} // minimum value for this field is 0
          ]
        },
        {
          field = client_name,
          rules = [
            {type = NotNull} // this field shouldn't be null
          ]
        }
      ]
    }
  },
  {
    location = {type = Hive, table = companies}, // location of first table that should be validated
    rules = {
      rowRules = [
        {
          field = company_id, // name of field that should be validated
          rules = [
            {type = NotNull}, // this field shouldn't be null
            {type = max, value = 100} // maximum value for this field is 100
          ]
        },
        {
          field = company_name, // name of field that should be validated
          rules = [
            {type = NotNull} // this field shouldn't be null
          ]
        }
      ]
    }
  }
]

Load configuration from file

Use class: FileSingleTableConfigurationLoader or FileMultipleTablesConfigurationLoader.

Example:

import com.datawizards.dqm.configuration.loader.FileMultipleTablesConfigurationLoader
val configurationLoader = new FileMultipleTablesConfigurationLoader("configuration.conf")
configurationLoader.loadConfiguration()

Load configuration from directory

Use class: DirectoryConfigurationLoader.

One file should contain configuration for one table (TableConfiguration).

Load configuration from database

Use class: DatabaseConfigurationLoader.

One table row should contain configuration for one table (TableConfiguration).

Validation rules

Currently supported categories of data validation rules:

  • field rules - validating value of single field e.g.: not null, min value, max value
  • group rules - validating result of group by expression e.g.: expected groups (countries, types)
  • table trend rules - validating table trend rules e.g.: comparing current day row count vs previous day row count

Field rules

Field rules should be defined in section rules.rowRules:

tablesConfiguration = [
  {
    location = [...],
    rules = {
      rowRules = [
        {
          field = Field name,
          rules = [...]
        }
      ]
    }
  }
]

Supported field validation rules:

  • not null

    {type = NotNull}

  • dictionary

    {type = dict, values=[1,2,3]}

  • regex

    {type = regex, value = """\s.*"""}

  • min value

    {type = min, value = 0}

  • max value

    {type = max, value = 100}

Group rules

Group rules should be defined in section groups.rules:

tablesConfiguration = [
  {
    location = [...],
    rules = [...],
    groups = [
      {
        name = Group name,
        field = Group by field name,
        rules = [
          {
            type = NotEmptyGroups,
            expectedGroups = [c1,c2,c3,c4]
          }
        ]
      }
    ]
  }
]

Supported group validation rules:

  • not empty groups

    {type = NotEmptyGroups, expectedGroups = [c1,c2,c3,c4]}

Table trend rules

Table trend rules should be defined in section rules.tableTrendRules:

tablesConfiguration = [
  {
    location = [...],
    rules = {
      rowRules = [...],
      tableTrendRules = [
        {type = CurrentVsPreviousDayRowCountIncrease, tresholdPercentage = 20}
      ]
    }
  }
]

Supported table trends validation rules:

  • current vs previous day row count

    {type = CurrentVsPreviousDayRowCountIncrease, tresholdPercentage = 20}

Log validation results

Validation results can be logged into:

  • Elasticsearch using class ElasticsearchValidationResultLogger

    val logger = new ElasticsearchValidationResultLogger(
        esUrl = "http://localhost:9200", // Elasticsearch URL
        invalidRecordsIndexName = "invalid_records", // Index name where to store invalid records
        tableStatisticsIndexName = "table_statistics", // Index name where to store table statistics
        columnStatisticsIndexName = "column_statistics", // Index name where to store column statistics
        groupsStatisticsIndexName = "group_statistics", // Index name where to store group statistics
        invalidGroupsIndexName = "invalid_groups" // Index name where to store group statistics
    )
  • RDBMS using class DatabaseValidationResultLogger

    val logger = new DatabaseValidationResultLogger(
      driverClassName = "org.h2.Driver", // JDBC driver class name
      dbUrl = connectionString, // DB connection string
      connectionProperties = new Properties(), // JDBC connection properties, especially user and password
      invalidRecordsTableName = "INVALID_RECORDS", // name of table where to insert invalid records
      tableStatisticsTableName = "TABLE_STATISTICS", // name of table where to insert table statistics records
      columnStatisticsTableName = "COLUMN_STATISTICS", // name of table where to insert column statistics records
      groupsStatisticsTableName = "GROUP_STATISTICS", // name of table where to insert group by statistics records
      invalidGroupsTableName = "INVALID_GROUPS" // name of table where to insert invalid groups
    )

Send alerts

Alerts can be send to:

  • Slack using class SlackAlertSender

Additionally there are plans to support:

  • email

Full example

Example

import com.datawizards.dqm.configuration.loader.FileConfigurationLoader
import com.datawizards.dqm.logger.ElasticsearchValidationResultLogger
import com.datawizards.dqm.alert.SlackAlertSender
import com.datawizards.dqm.DataQualityMonitor

val configurationLoader = new FileConfigurationLoader("configuration.conf")
val esUrl = "http://localhost:9200"
val invalidRecordsIndexName = "invalid_records"
val tableStatisticsIndexName = "table_statistics"
val columnStatisticsIndexName = "column_statistics"
val groupsStatisticsIndexName = "group_statistics"
val invalidGroupsIndexName = "invalid_groups"
private val logger = new ElasticsearchValidationResultLogger(esUrl, invalidRecordsIndexName, tableStatisticsIndexName, columnStatisticsIndexName, groupsStatisticsIndexName, invalidGroupsIndexName)
val alertSender = new SlackAlertSender("webhook url", "Slack channel", "Slack user name")
val processingDate = new java.util.Date()
DataQualityMonitor.run(processingDate, configurationLoader, logger, alertSender)

configuration.conf:

tablesConfiguration = [
  {
    location = {type = Hive, table = clients},
    rules = {
      rowRules = [
        {
          field = client_id,
          rules = [
            {type = NotNull}
          ]
        }
      ]
    }
  }
]