Data quality tools for Big Data
- Scala 2.11.7
- JDK 8
- Spark 2.1.1
See this video for in-depth coverage of the project
centrifuge will be published to Maven Central soon, so when it is done you will be able to get centrifuge with adding this to your build.sbt
:
val sparkV = "2.1.1"
libraryDependencies ++= Seq(
"io.univalence" %% "centrifuge" % "0.1"
)
We import relevant libraries and define 3 case classes
package io.univalence.centrifuge.sql
import io.univalence.centrifuge.{Annotation, AnnotationSql, Result}
import org.apache.spark.sql.SparkSession
import io.univalence.centrifuge.implicits._
import ss.implicits._
case class Person(name: String, age: Int)
case class PersonWithAnnotations(name: String, age: Int, annotations: Seq[Annotation])
case class BetterPerson(name:String,age:Option[Int])
Here we create 3 instances of Person
val joe = Person("Joe", 14)
val timmy = Person("Timmy",8)
val invalid = Person("",-1)
Making a Dataset from the 3 Person we just created
val ds = ss.sparkContext.makeRDD(Seq(joe,timmy,invalid)).toDS()
ds.createOrReplaceTempView("personRaw")
bla
ss.registerTransformation[Int,Int]("checkAge", {
case i if i < 0 => Result.fromError("INVALID_AGE")
case i if i < 13 => Result.fromWarning(i, "UNDER_13")
case i if i > 140 => Result.fromError("OVER_140")
case i => Result.pure(i)
})
Basic select
ss.sql("select name, checkAge(age) as age, age as ageRaw from personRaw").show(false)
Result
+-----+----+------+
|name |age |ageRaw|
+-----+----+------+
|Joe |14 |14 |
|Timmy|8 |8 |
| |null|-1 |
+-----+----+------+
This time using BetterPerson (a Person with Option)
ss.sql("select name, checkAge(age) as age from personRaw").as[BetterPerson].collect().foreach(println)
As expected
BetterPerson(Joe,Some(14))
BetterPerson(Timmy,Some(8))
BetterPerson(,None)
Now we include Annotations
ss.sql("select name, checkAge(age) as age from personRaw").includeAnnotations.show(false)
Result
+-----+----+--------------------------------------------+
|name |age |annotations |
+-----+----+--------------------------------------------+
|Joe |14 |[] |
|Timmy|8 |[[UNDER_13,age,WrappedArray(age),false,1]] |
| |null|[[INVALID_AGE,age,WrappedArray(age),true,1]]|
+-----+----+--------------------------------------------+
We now have annotations for each Person in the DataSet.
Let's take a look at what an Annotation is:
Annotation(message: String,
onField: Option[String] = None,
fromFields: Vector[String] = Vector.empty,
isError: Boolean,
count: Long)
- message contains a relevant information
- onField is the name of the field the message is from
- fromFields is
- isError is true if the data is not within acceptable values else it's false
- count is the number of time the value has appeared for the same field
Integrate a reworked version of Excelsius in order to have a better DataFrame.show()
To go from
+-----+----+--------------------------------------------+
|name |age |annotations |
+-----+----+--------------------------------------------+
|Joe |14 |[] |
|Timmy|8 |[[UNDER_13,age,WrappedArray(age),false,1]] |
| |null|[[INVALID_AGE,age,WrappedArray(age),true,1]]|
+-----+----+--------------------------------------------+
To
++-----+----+------------------------------------------------+
+|name |age |annotations |
+| | |message |onField |fromFields |isError |count|
++-----+----+------------+--------+-----------+--------+-----+
+|Joe |14 | --- |
+|Timmy|8 |UNDER_13 |age |age |false |1 |
+| |null|INVALID_AGE |age |age |true |1 |
+| ------ |EMPTY_STRING|name |name1 |true |1 |
+| ------ | --------- |name2 | ---- |
++-----+----+------------+--------+-----------+--------+-----+
"-" is a placeholder and a better symbol/ascii art will be used
centrifuge is licensed under the Apache License, Version 2.0 (the “License”); you may not use this software except in compliance with the License.
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an “AS IS” BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.