benfradet / struct-type-encoder

Deriving Spark DataFrame schemas from case classes

GitHub

struct-type-encoder

Build Status Join the chat at https://gitter.im/struct-type-encoder/Lobby Maven Central Stories in Ready

Deriving Spark DataFrame schemas from case classes.

Installation

struct-type-encoder is available on maven central with the following coordinates:

"com.github.benfradet" %% "struct-type-encoder" % "0.5.0"

Motivation

When reading a DataFrame/Dataset from a data source the schema of the data has to be inferred. In practice, this translates into looking at every record of all the files and coming up with a schema that can satisfy every one of these records, as shown here for JSON.

As anyone can guess, this can be a very time-consuming task, especially if you know in advance the schema of your data. A common pattern is to do the following:

case class MyCaseClass(a: Int, b: String, c: Double)
val inferred = spark
  .read
  .json("/some/dir/*.json")
  .as[MyCaseClass]

In this case, there is no need to spend time inferring the schema as the DataFrame is directly converted to a Dataset of MyCaseClass. However, it can be a lot of boilerplate to bypass the inference by specifying your own schema.

import org.apache.spark.sql.types._
val schema = SructType(
  StructField("a", IntegerType) ::
  StructField("b", StringType) ::
  StructField("c", DoubleType) :: Nil
)
val specified = spark
  .read
  .schema(schema)
  .json("/some/dir/*.json")
  .as[MyCaseClass]

struct-type-encoder derives instances of StructType (how Spark represents a schema) from your case class automatically:

import ste.StructTypeEncoder
import ste.StructTypeEncoder._
val derived = spark
  .read
  .schema(StructTypeEncoder[MyCaseClass].encode)
  .json("/some/dir/*.json")
  .as[MyCaseClass]

No inference, no boilerplate!

Additional features

Spark Metadata support

It is possible to add Metada information to StructFields with the Meta annotation:

import org.apache.spark.sql.types._
import ste._

val metadata = new MetadataBuilder()
  .putLong("foo", 10)
  .putString("bar", "baz")
  .build()

case class Foo(a: String, @Meta(metadata) b: Int)

Flattening schemas

Using the ste.Flatten annotation we can eliminate repetitions from case class definitions. Take the following example:

import ste._
case class Foo(a: String, b: Int)
case class Bar(@Flatten(2) a: Seq[Foo], @Flatten(1, Seq("x", "y")) b: Map[String, Foo], @Flatten c: Foo)

StructTypeEncoder[Bar].encode

The derived schema is the following:

StructType(
  StructField("a.0.a", StringType, false) ::
  StructField("a.0.b", IntegerType, false) ::
  StructField("a.1.a", StringType, false) ::
  StructField("a.1.b", IntegerType, false) ::
  StructField("b.x.a", StringType, false) ::
  StructField("b.x.b", IntegerType, false) ::
  StructField("b.y.a", StringType, false) ::
  StructField("b.y.b", IntegerType, false) ::
  StructField("c.a", StringType, false) ::
  StructField("c.b", IntegerType, false) :: Nil
)

Now we want to read our data source with a flat schema:

import ste.StructTypeEncoder
import ste.StructTypeEncoder._
val df = spark
  .read
  .schema(StructTypeEncoder[Bar].encode)
  .csv("/some/dir/*.csv")

struct-type-encoder can derive the nested projection of a Dataframe and convert it to a Dataset by providing the class:

import StructTypeSelector._

val ds: Dataset[Bar] = df.asNested[Bar]

Benchmarks

This project includes JMH benchmarks to prove that inferring schemas and coming up with the schema satisfying all records is expensive. The benchmarks compare the average time spent parsing a thousand files each containing a hundred rows when the schema is inferred (by Spark, not user-specified) and derived (thanks to struct-type-encoder).

derived inferred
CSV 5.936 ± 0.035 s 6.494 ± 0.209 s
JSON 5.092 ± 0.048 s 6.019 ± 0.049 s

We see that when deriving the schemas we spend 16.7% less time reading JSON data and a 8.98% for CSV.