nickburkard / flink-type-info   0.1.1

Derive Flink type information with Shapeless

Scala versions: 2.12 2.11

Flink Type Info

flink-type-info replaces the default macro-based implicit provider for TypeInformation[A] in Apache Flink's Scala API with automatic type class derivation based on Shapeless. It is a fork of flink-shapeless, updated for recent Flink and Scala versions.

Versions

Flink: 1.15

Scala: 2.13

Note

Flink 1.15 is not out yet, but it will support arbitrary Scala versions. This project now aims to support Scala 2.13 and 3 on Flink; older Scala versions are not targeted.

This will not work on older versions of Flink, since Flink 1.14 and below force a single Scala version (2.11 or 2.12) per cluster.

Usage

The primary use case of flink-type-info is to let custom implicit TypeInformation instances in scope override the default ones.

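// TypeInformation auto-derivation.
import io.burkard.flink.derived.auto._

import org.apache.flink.api.common.typeinfo.TypeInformation
// Import only what is needed: `org.apache.flink.api.scala._` would also bring
// the default macro-based `createTypeInformation` into scope.
import org.apache.flink.api.scala.ExecutionEnvironment

// Override TypeInformation[String].
// MyASCIIStringTypeInfo is a placeholder for your own TypeInformation[String].
implicit val strTypeInfo: TypeInformation[String] = MyASCIIStringTypeInfo

// Strings below are serialized with ASCII encoding,
// even when nested in tuples, data structures, etc.
val env = ExecutionEnvironment.getExecutionEnvironment
val text = env.readTextFile("/path/to/file")
val counts = text
  .flatMap(_.toLowerCase.split("\\W+"))
  .filter(_.nonEmpty)
  .map(_ -> 1)
  .groupBy(0)
  .sum(1)

// Add a sink so the job actually runs.
counts.print()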

Features

There are a couple of advantages to automatic type class derivation over the default macro-based approach.

Customizability

Automatic derivation uses a modified version of Scala's implicit resolution mechanism, with the lowest priority. It can therefore be overridden for specific types by providing an implicit instance anywhere in scope, including in a companion object, as is idiomatic in Scala.

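import io.burkard.flink.derived.auto._
import org.apache.flink.api.common.typeinfo.TypeInformation

final case class Foo(x: Int)

object Foo {
  // MyOptimizedFooTypeInfo is a placeholder for your own TypeInformation[Foo].
  implicit val info: TypeInformation[Foo] = MyOptimizedFooTypeInfo
}

final case class Bar(foo: Foo, y: Double)

// All instances below use the optimized version.
implicitly[TypeInformation[Foo]]
implicitly[TypeInformation[List[Foo]]]
implicitly[TypeInformation[(Foo, Long)]]
implicitly[TypeInformation[Bar]]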
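To make the priority rule concrete without pulling in Flink or Shapeless, here is a plain Scala 2.13 sketch of the same pattern. `Describe` is a hypothetical toy type class standing in for `TypeInformation`, and the catch-all `derived` instance in a low-priority parent trait stands in for automatic derivation; neither is part of flink-type-info. The block is self-contained, so it redefines its own `Foo`. An instance in `Foo`'s companion object wins over the fallback, and it is also picked up when `Foo` is nested in a `List` or a tuple.

```scala
// Hypothetical toy type class standing in for TypeInformation (illustration only).
trait Describe[A] {
  def describe: String
}

// Lowest-priority catch-all, standing in for automatic (Shapeless-based) derivation.
trait LowPriorityDescribe {
  implicit def derived[A]: Describe[A] = Describe.instance("derived")
}

object Describe extends LowPriorityDescribe {
  def apply[A](implicit d: Describe[A]): Describe[A] = d

  def instance[A](s: String): Describe[A] = new Describe[A] {
    def describe: String = s
  }

  implicit val longDescribe: Describe[Long] = instance("Long")

  // Container instances look up the element instance, so overrides propagate.
  implicit def listDescribe[A](implicit a: Describe[A]): Describe[List[A]] =
    instance(s"List[${a.describe}]")

  implicit def tuple2Describe[A, B](implicit a: Describe[A], b: Describe[B]): Describe[(A, B)] =
    instance(s"(${a.describe}, ${b.describe})")
}

final case class Foo(x: Int)

object Foo {
  // Companion-object instance: overrides the fallback wherever Describe[Foo] is needed.
  implicit val info: Describe[Foo] = Describe.instance("OptimizedFoo")
}

object CustomizabilityDemo extends App {
  println(Describe[Foo].describe)         // OptimizedFoo
  println(Describe[List[Foo]].describe)   // List[OptimizedFoo]
  println(Describe[(Foo, Long)].describe) // (OptimizedFoo, Long)
  println(Describe[String].describe)      // derived
}
```

The parent-trait trick is a common way to rank implicits in Scala 2: when both match, an instance defined in `object Describe` beats one inherited from `LowPriorityDescribe`, while a concrete instance such as `Foo.info` beats the polymorphic fallback because it is more specific.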

Data Type Mappings

Creating custom serializers from scratch is usually not what you want to do. Most often, you want to map your custom data type to one that already has a serializer. This is where the Invariant[A, B] type class comes in. It represents an invariant relationship between two types: given F[A], A => B and B => A, you can derive F[B].

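import scala.reflect.ClassTag

import breeze.linalg.Vector

// Map breeze's Vector[A] to Array[A], which already has a serializer.
// Vector#toArray needs a ClassTag for the element type.
implicit def invariantVector[A: ClassTag]: Invariant[Array[A], Vector[A]] =
  Invariant(Vector(_), _.toArray)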
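The idea behind `Invariant` can be sketched in plain Scala 2.13 with a toy type class. `Codec`, `Iso`, `viaIso` and `Meters` below are hypothetical names chosen for illustration, not flink-type-info's API: `Iso[A, B]` plays the role of `Invariant[A, B]`, and `viaIso` derives a `Codec[B]` from a `Codec[A]` plus the two conversions, which is the F[A], A => B, B => A to F[B] step described above.

```scala
// Hypothetical toy type class standing in for TypeInformation; not flink-type-info's API.
trait Codec[A] {
  def encode(a: A): String
  def decode(s: String): A
}

// Hypothetical stand-in for Invariant[A, B]: conversions in both directions.
final case class Iso[A, B](to: A => B, from: B => A)

object Codec {
  def apply[A](implicit c: Codec[A]): Codec[A] = c

  implicit val intCodec: Codec[Int] = new Codec[Int] {
    def encode(a: Int): String = a.toString
    def decode(s: String): Int = s.toInt
  }

  // Given Iso[A, B] and Codec[A], derive Codec[B] by converting at the boundary.
  implicit def viaIso[A, B](implicit iso: Iso[A, B], codec: Codec[A]): Codec[B] =
    new Codec[B] {
      def encode(b: B): String = codec.encode(iso.from(b))
      def decode(s: String): B = iso.to(codec.decode(s))
    }
}

final case class Meters(value: Int)

object Meters {
  // Only the mapping is provided; Codec[Meters] is derived from Codec[Int].
  implicit val iso: Iso[Int, Meters] = Iso[Int, Meters](Meters(_), _.value)
}

object InvariantDemo extends App {
  val codec = Codec[Meters]
  println(codec.encode(Meters(42))) // 42
  println(codec.decode("7"))        // Meters(7)
}
```

Requesting the `Iso` before the `Codec[A]` lets the compiler fix `A` from the mapping first, so the search for `Codec[A]` runs against a concrete type.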

Recursive ADTs

The default macro-based implementation cannot handle recursive data types or coproducts without falling back to reflection-based serializers such as Kryo. Only product types (tuples and case classes) are handled natively.

flink-type-info extends native Flink support to arbitrary algebraic data types (ADTs) and fails at compile time rather than falling back to runtime reflection. In Scala, ADTs are encoded as sealed traits and case classes.

// Example: Recursive product
final case class NTree[+A](v: A, children: List[NTree[A]])

// Example: Recursive coproduct
sealed trait BTree[+A]
case object BLeaf extends BTree[Nothing]
final case class BNode[+A](l: BTree[A], v: A, r: BTree[A]) extends BTree[A]
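To show what native support for such types involves, here is a hand-written serializer for `BTree[Int]` in plain Scala, built on `java.io.DataOutputStream` and `DataInputStream`. It is only a sketch of the general encoding, not the code flink-type-info generates: the coproduct writes a one-byte tag identifying the case (the tag values are an assumption made here), the product writes its fields in order, and both recurse into subtrees. The `BTree` definitions are repeated from above so the block compiles on its own.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

sealed trait BTree[+A]
case object BLeaf extends BTree[Nothing]
final case class BNode[+A](l: BTree[A], v: A, r: BTree[A]) extends BTree[A]

// Hand-written for illustration; a derived serializer would be generated instead.
object BTreeIntSerializer {
  // Assumed one-byte tags for the two cases of the coproduct.
  private val LeafTag: Byte = 0
  private val NodeTag: Byte = 1

  def serialize(tree: BTree[Int], out: DataOutputStream): Unit = tree match {
    case BLeaf =>
      out.writeByte(LeafTag) // coproduct: record which case this is
    case BNode(l, v, r) =>
      out.writeByte(NodeTag)
      serialize(l, out)      // product: fields in order, recursing on subtrees
      out.writeInt(v)
      serialize(r, out)
  }

  def deserialize(in: DataInputStream): BTree[Int] = in.readByte() match {
    case LeafTag => BLeaf
    case NodeTag =>
      val l = deserialize(in)
      val v = in.readInt()
      val r = deserialize(in)
      BNode(l, v, r)
    case other => throw new IllegalArgumentException(s"Unknown tag: $other")
  }
}

object BTreeDemo extends App {
  val tree: BTree[Int] = BNode(BNode(BLeaf, 1, BLeaf), 2, BLeaf)
  val bytes = new ByteArrayOutputStream()
  val out = new DataOutputStream(bytes)
  BTreeIntSerializer.serialize(tree, out)
  out.flush()
  val in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray))
  println(BTreeIntSerializer.deserialize(in) == tree) // true
}
```

Because every case is known statically, nothing here needs runtime reflection; a missing instance for a field type is the kind of gap that surfaces as a compile error under derivation.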

Limitations

There are a few well-known limitations of automatic type class derivation with Shapeless.

  • Long compile times for large case classes and sealed trait hierarchies. Your mileage may vary.