A tiny library that improves Spark's Dataset join API by letting you specify join columns with lambdas instead of strings, ensuring type safety and enabling your IDE's autocomplete. It also improves the unit-testing experience for (some) Spark transformations.
Add the following to your `build.sbt`:

```scala
scalacOptions += "-Ydelambdafy:inline"
libraryDependencies += "io.github.salamahin" %% "joinwiz_core" % joinwiz_version
```
```scala
def doJoin(as: Dataset[A], bs: Dataset[B]): Dataset[(A, Option[B])] = {
  import joinwiz.syntax._
  import joinwiz.spark._

  as.leftJoin(bs) {
    case (left, right) => left(_.field) =:= right(_.field)
  }
}
```
Note that the result has the type `Dataset[(A, Option[B])]`, which means you won't get an NPE when you map it to a different type. In addition, the library checks that the left and right columns can actually be used together in the joining expression, i.e. that their types are comparable.
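For example, a condition that compares fields of incompatible types is rejected at compile time (a hypothetical sketch; the `id` and `name` fields are illustrative and not part of the examples above):

```scala
as.leftJoin(bs) {
  // assuming A.id is a Long and B.name is a String (illustrative fields):
  // this condition does not compile, because Long and String are not comparable
  case (left, right) => left(_.id) =:= right(_.name)
}
```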
You are not limited to equi-joins only: `>`, `<`, `&&`, constants and more can be used in the joining expression.
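For instance, several predicates can be combined into a single condition (a sketch; `amount` and `threshold` are illustrative numeric fields assumed for this example):

```scala
as.leftJoin(bs) {
  case (left, right) =>
    // equality on `field` combined with an illustrative range predicate
    left(_.field) =:= right(_.field) && left(_.amount) > right(_.threshold)
}
```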
`ComputationEngine` abstracts over the exact container kind, which makes it possible to run the same code in two modes: with and without Spark:
```scala
def foo[F[_]: ComputationEngine](as: F[A], bs: F[B]): F[C] = {
  import joinwiz.syntax._

  as
    .innerJoin(bs) {
      case (a, b) => a(_.field) =:= b(_.field)
    }
    .map {
      case (a, b) => C(a, b)
    }
}
```
```scala
def runWithSpark(as: Dataset[A], bs: Dataset[B]): Dataset[C] = {
  import joinwiz.spark._
  foo(as, bs)
}
```
```scala
// can be used in unit testing
def runWithoutSpark(as: Seq[A], bs: Seq[B]): Seq[C] = {
  import joinwiz.testkit._
  foo(as, bs)
}
```
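A unit test can then exercise the join logic on plain sequences, with no SparkSession involved (a sketch, assuming `A`, `B` and `C` are simple case classes with the `field` member used above):

```scala
val as = Seq(A(field = 1), A(field = 2))
val bs = Seq(B(field = 1))

// only the matching pair survives the inner join inside foo
assert(runWithoutSpark(as, bs) == Seq(C(A(field = 1), B(field = 1))))
```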
When several joins are chained one after another, it can be tricky to reference the exact column with a string identifier: you usually end up with something like `_1._1._1.field` on the left or right side. With the help of `wiz` unapplication you can turn that into nice lambdas:
```scala
def doSequentialJoin(as: Dataset[A],
                     bs: Dataset[B],
                     cs: Dataset[C],
                     ds: Dataset[D]): Dataset[(((A, Option[B]), Option[C]), Option[D])] = {
  import joinwiz.syntax._
  import joinwiz.spark._

  as
    .leftJoin(bs) {
      case (a, b) => a(_.field) =:= b(_.field)
    }
    .leftJoin(cs) {
      case (_ wiz b, c) => b(_.field) =:= c(_.field)
    }
    .leftJoin(ds) {
      case (_ wiz _ wiz c, d) => c(_.field) =:= d(_.field)
    }
}
```
Unapply can be used to extract members from a product type, even if the type is of `Option` kind. If your case class contains nested structs, you can still use joinwiz to extract the necessary column:
```scala
def doJoin[F[_]: ComputationEngine](as: F[A], bs: F[B]): F[(A, Option[B])] = {
  import joinwiz.syntax._

  as.leftJoin(bs) {
    // the right-hand column is extracted from `right`, not from the dataset `bs` itself
    case (left, right) => left >> (_.innerStruct) >> (_.field) =:= right >> (_.field)
  }
}
```
The `>>` operation is an alias for `apply`.
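For a flat column the two forms are therefore interchangeable:

```scala
left(_.field)     // apply syntax
left >> (_.field) // equivalent >> syntax
```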
UDFs can be used as joining expressions:
```scala
def doJoin[F[_]: ComputationEngine](as: F[A], bs: F[B]): F[(A, Option[B])] = {
  import joinwiz.syntax._

  as.leftJoin(bs) {
    case (left, right) =>
      udf(
        left(_.field),
        right(_.field)
      )(_ + _ < 3)
  }
}
```
To add a new window function, inherit from `joinwiz.window.WindowFunction`. After that it can be used like the following:
```scala
def addRowNumber[F[_]: ComputationEngine](as: F[A]): F[(A, Int)] = {
  import joinwiz.syntax._

  as.withWindow { window =>
    window
      .partitionBy(_.field1)
      .partitionBy(_.field2)
      .orderByAsc(_.field3)
      .call(row_number)
  }
}
```
`ComputationEngine` provides syntax for generic operations (a combined sketch follows the list), such as:
- inner/left outer/left anti joins
- map
- flatMap
- distinct
- groupByKey + mapGroups, reduceGroups, count, cogroup
- filter
- collect
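For example, `filter` and `map` compose in the same engine-agnostic style as the joins above (a sketch assuming `field` is an `Int`; `map` appears in the `foo` example earlier and `filter` is among the listed operations):

```scala
def positiveFields[F[_]: ComputationEngine](as: F[A]): F[Int] = {
  import joinwiz.syntax._

  as
    .filter(_.field > 0) // keep rows with a positive field
    .map(_.field)        // project each row to the field itself
}
```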
You can find more usage examples in the corresponding tests.