flink-extended / flink-scala-api

Flink Scala API is a thin wrapper on top of the Flink Java API that supports Scala types for serialisation as well as the latest Scala versions.

Scala versions: 3.x 2.13 2.12

Scala 2.12/2.13/3.x API for Apache Flink

This project is a community-maintained fork of the official Apache Flink Scala API, cross-built for Scala 2.12, 2.13 and 3.x.


New magnolia-based serialization framework

The official Flink serialization framework has two important drawbacks that complicate the upgrade to Scala 2.13+:

  • it relies on a complicated TypeInformation derivation macro, which required a complete rewrite to work on Scala 3.
  • to serialize a Traversable[_], it serializes the actual Scala code of the corresponding CanBuildFrom[_] builder, which is then compiled and executed on deserialization. CanBuildFrom[_] no longer exists in Scala 2.13+, so there is no easy migration path.

This project relies on the Flink-ADT library to derive serializers for all types with the following perks:

  • ADT support: your sealed trait members won't fall back to the extremely slow Kryo serializer
  • case objects: no more problems with None
  • uses implicits (and typeclasses in Scala 3) to customize the serialization
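
As a rough illustration of the ADT support listed above, the sketch below derives a single TypeInformation for a sealed trait and round-trips one value through the resulting serializer. The Event hierarchy and the EventRoundTrip object are hypothetical names made up for this example; deriveTypeInformation comes from the Flink-ADT import used elsewhere in this README, and the serializer calls assume the Flink 1.16 API.

```scala
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.core.memory.{DataInputDeserializer, DataOutputSerializer}
import io.findify.flinkadt.api._

// Hypothetical ADT, used only for illustration.
sealed trait Event
case class Click(id: String) extends Event
case class Purchase(id: String, price: Double) extends Event
case object Heartbeat extends Event // case objects are part of the hierarchy too

object EventRoundTrip {
  // Derived at compile time for the whole sealed hierarchy.
  implicit lazy val eventInfo: TypeInformation[Event] = deriveTypeInformation[Event]

  // Serialize one value and read it back with the derived serializer.
  // Assumes the Flink 1.16 signature createSerializer(ExecutionConfig).
  def roundTrip(e: Event): Event = {
    val serializer = eventInfo.createSerializer(new ExecutionConfig())
    val out        = new DataOutputSerializer(64)
    serializer.serialize(e, out)
    serializer.deserialize(new DataInputDeserializer(out.getCopyOfBuffer))
  }
}
```

Calling EventRoundTrip.roundTrip(Purchase("a", 9.5)) should give back an equal Purchase value without any Kryo registration.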

But there are some drawbacks:

  • Savepoints written using Flink's official serialization API are not compatible, so you need to re-bootstrap your job from scratch.
  • As serializer derivation happens at compile time and uses zero runtime reflection, compile times are quite high for deeply nested, rich case classes.

See the Flink-ADT README for more details.

Using a POJO-only Flink serialization framework

If for some reason you don't want to use Flink-ADT for serialization, you can always fall back to Flink's POJO serializer by calling it explicitly:

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api._

// explicit call: Flink's own type extraction is used instead of Flink-ADT derivation
implicit val intInfo: TypeInformation[Int] = TypeInformation.of(classOf[Int])

val env = StreamExecutionEnvironment.getExecutionEnvironment

env
  .fromElements(1, 2, 3)
  .map(x => x + 1)

With this approach:

  • savepoints stay compatible between this API and the official Flink API
  • serialization is slower due to frequent Kryo fallback
  • larger savepoint size (again, due to Kryo)
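
To see where the Kryo fallback mentioned above comes from, the following sketch asks Flink's own type extraction about a plain case class. Point and PojoFallbackCheck are hypothetical names for this example. It assumes the usual POJO rules (a public no-argument constructor plus getters and setters), which an immutable case class does not satisfy, so Flink is expected to treat it as a generic type.

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.GenericTypeInfo

// Hypothetical case class, used only for illustration.
case class Point(x: Int, y: Int)

object PojoFallbackCheck {
  // Flink's own type extraction, bypassing Flink-ADT derivation.
  val pointInfo: TypeInformation[Point] = TypeInformation.of(classOf[Point])

  // A GenericTypeInfo means values of this type are handed to Kryo at runtime.
  val usesKryo: Boolean = pointInfo.isInstanceOf[GenericTypeInfo[_]]
}
```

A check like this can be handy in a unit test when you want to know which of your types would end up on the Kryo path.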

Closure cleaner from Spark 3.x

Flink historically used quite an old forked version of the ClosureCleaner for Scala lambdas, which has some minor compatibility issues with Java 17 and Scala 2.13+. This project uses a more recent version, hopefully with fewer compatibility issues.

No Legacy DataSet API

Sorry, but it's already deprecated and as a community project we have no resources to support it. If you need it, PRs are welcome.


flink-scala-api uses a different package name for all API-related classes like DataStream, so you can migrate a big project gradually and use both the upstream Scala API and this one in the same project.

The actual migration should be straightforward: replace the old imports with the new ones:

// original api import
import org.apache.flink.streaming.api.scala._

// flink-scala-api imports
import org.apache.flink.api._
import io.findify.flinkadt.api._
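
During a gradual migration, import aliases can keep the two DataStream types apart while both APIs are on the classpath. This is only a sketch: MigrationBoundary, legacyStage and migratedStage are hypothetical names, and it assumes the official flink-streaming-scala dependency is still present and that DataStream lives directly in the org.apache.flink.api package shown above.

```scala
// Upstream API, imported under an alias.
import org.apache.flink.streaming.api.scala.{DataStream => LegacyDataStream}
// flink-scala-api counterpart from the new package.
import org.apache.flink.api.{DataStream => NewDataStream}

object MigrationBoundary {
  // Not yet migrated: still typed against the upstream API.
  def legacyStage(in: LegacyDataStream[String]): LegacyDataStream[String] = in

  // Already migrated: typed against flink-scala-api.
  def migratedStage(in: NewDataStream[String]): NewDataStream[String] = in
}
```

With the aliases in place, the compiler rejects any accidental attempt to pass a stream from one API into a stage written for the other.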


flink-scala-api is released to Maven Central for Scala 2.12, 2.13 and 3. For SBT, add this snippet to build.sbt:

libraryDependencies += "org.flinkextended" %% "flink-scala-api" % ""

For Ammonite:

import $ivy.`org.flinkextended::flink-scala-api:`
// you might need flink-client too in order to run in the REPL
import $ivy.`org.apache.flink:flink-clients:1.16.1`

Flink version notes:

  • flink-scala-api includes the Flink version in its own version to help users find the right version for their Flink-based project.
  • The first three numbers correspond to the Flink version, for example 1.16.1.
  • The fourth and last number is an internal project build version. Just use the latest build number in your dependency configuration.
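
The version scheme described above can be sketched as a small parser. ApiVersion is a hypothetical helper that is not part of flink-scala-api, and the version string 1.16.1.4 used in the usage note is made up to illustrate the layout; check Maven Central for the versions that actually exist.

```scala
// Hypothetical helper, not part of flink-scala-api.
final case class ApiVersion(flinkVersion: String, build: Int)

object ApiVersion {
  // Splits "<flink major>.<flink minor>.<flink patch>.<build>" into its two parts.
  // Anything that does not have exactly four dot-separated parts is rejected.
  def parse(version: String): Option[ApiVersion] =
    version.split('.') match {
      case Array(major, minor, patch, build) if build.nonEmpty && build.forall(_.isDigit) =>
        Some(ApiVersion(s"$major.$minor.$patch", build.toInt))
      case _ =>
        None
    }
}
```

For example, ApiVersion.parse("1.16.1.4") yields a Flink version of 1.16.1 and a build number of 4, while a plain Flink version such as 1.16.1 is rejected because it has no build part.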

We suggest removing the official flink-scala and flink-streaming-scala dependencies altogether to simplify the migration and to avoid mixing two flavors of the API in the same project. Keeping both is technically possible, though, so removing them is not required.

Scala 3

Scala 3 support is highly experimental and not well-tested in production. The good thing is that most of the issues show up at compile time, so they are quite easy to reproduce. If you have issues with flink-adt not deriving TypeInformation[T] for the T you want, submit a bug report!

Compile times

They may be quite bad for rich nested case classes due to compile-time serializer derivation. Derivation happens each time flink-scala-api needs an instance of the TypeInformation[T] implicit/type class:

import org.apache.flink.api._
import io.findify.flinkadt.api._

case class Foo(x: Int) {
  def inc(a: Int) = copy(x = x + a)
}

val env = StreamExecutionEnvironment.getExecutionEnvironment

env
  .fromElements(Foo(1), Foo(2), Foo(3))
  .map(x => x.inc(1)) // here the TypeInformation[Foo] is generated
  .map(x => x.inc(2)) // generated one more time again

If you're using the same data structures in multiple jobs (or in multiple tests), consider caching the derived serializer in a separate compilation unit and just importing it when needed:

import org.apache.flink.api._
import org.apache.flink.api.common.typeinfo.TypeInformation
import io.findify.flinkadt.api._

// file FooTypeInfo.scala
object FooTypeInfo {
  // derived once here; marked implicit so that importing it satisfies TypeInformation[Foo]
  implicit lazy val fooTypeInfo: TypeInformation[Foo] = deriveTypeInformation[Foo]
}

// file SomeJob.scala
case class Foo(x: Int) {
  def inc(a: Int) = copy(x = x + a)
}

import FooTypeInfo._

val env = StreamExecutionEnvironment.getExecutionEnvironment

env
  .fromElements(Foo(1), Foo(2), Foo(3))
  .map(x => x.inc(1)) // taken as an implicit
  .map(x => x.inc(2)) // again, no re-derivation


Define two environment variables before starting the SBT shell:

export SONATYPE_USERNAME=<your user name for Sonatype>
export SONATYPE_PASSWORD=<your password for Sonatype>

Release a new version:

RELEASE_VERSION_BUMP=true sbt test 'release with-defaults'

Increment to the next SNAPSHOT version and push to the Git server:

RELEASE_PUBLISH=true sbt 'release with-defaults'


This project uses parts of the Apache Flink codebase, so the whole project is licensed under the Apache 2.0 software license.