FlinkRunner 4 is available on Maven Central, built against Flink 1.19 with Scala 2.12 and JDK 11. You can add it to your sbt project:
libraryDependencies += "io.epiphanous" %% "flinkrunner" % <flinkrunner-version>
replacing <flinkrunner-version> with the currently released version of FlinkRunner on Maven Central.
FlinkRunner is built to support many common data sources and sinks, including:
| Connector | Dependency | Source | Sink |
|---|---|---|---|
| file system | flink-connector-files | yes | yes |
| kafka | flink-connector-kafka | yes | yes |
| kinesis streams source | flink-connector-kinesis | yes | no |
| kinesis streams sink | flink-connector-aws-kinesis-streams | no | yes |
| kinesis firehose | flink-connector-aws-kinesis-firehose | no | yes |
| iceberg | iceberg-flink-runtime-<flink.minor.version> | yes | yes |
| cassandra | flink-connector-cassandra | no | yes |
| elasticsearch | flink-connector-elasticsearch7 | no | yes |
| jdbc | flink-connector-jdbc | no | yes |
| rabbit mq | flink-connector-rabbitmq | yes | yes |
You can make a connector available to your jobs by dropping its library into Flink's lib
directory when you deploy your jobs. Make sure the library's version matches the Flink
and Scala versions FlinkRunner is compiled against.
To run tests locally in your IDE, add a connector library to your dependencies in provided scope, like this:
"org.apache.flink" % "flink-connector-kafka" % <flink-version> % Provided
replacing <flink-version> with the version of flink used in FlinkRunner.
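For example, a sketch with placeholder versions (artifact names taken from the table above), if your tests exercise both the kafka and jdbc connectors:
libraryDependencies ++= Seq(
  "org.apache.flink" % "flink-connector-kafka" % <flink-version> % Provided,
  "org.apache.flink" % "flink-connector-jdbc" % <flink-version> % Provided
)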
S3 configuration is important for most flink usage scenarios. Flink has two different
implementations to support S3: flink-s3-fs-presto and flink-s3-fs-hadoop.
- flink-s3-fs-presto is registered under the schemes s3:// and s3p:// and is preferred for checkpointing to s3.
- flink-s3-fs-hadoop is registered under the schemes s3:// and s3a:// and is required for using the streaming file sink.
During deployment, you should copy both s3 dependencies from flink's opt directory into
the plugins directory:
cd $FLINK_DIR
mkdir -p ./plugins/s3-fs-presto
cp ./opt/flink-s3-fs-presto-<flink-version>.jar ./plugins/s3-fs-presto
mkdir -p ./plugins/s3-fs-hadoop
cp ./opt/flink-s3-fs-hadoop-<flink-version>.jar ./plugins/s3-fs-hadoop
replacing <flink-version> with the version of flink used in FlinkRunner.
NOTE: Do not copy them into Flink's lib directory, as this will not work! They need to be in their own, individual subdirectories of Flink's deployed plugins directory.
NOTE: You will also need to configure access from your job to AWS S3. That is outside the scope of this readme.
FlinkRunner supports reading and writing avro messages with kafka and file systems. For kafka sources and sinks, FlinkRunner uses binary encoding with support for either Confluent or AWS Glue schema registry.
For file sources and sinks, you can select either standard or parquet avro encoding.
Add the following dependencies if you need Avro and schema registry support:
"org.apache.flink" % "flink-avro" % <flink-version>,
"org.apache.flink" % "flink-avro-confluent-registry" % <flink-version>,
"io.confluent" % "kafka-avro-serializer" % <confluent-version>
"software.amazon.glue" % "schema-registry-flink-serde" % <aws-glue-serde-version>,
Note that kafka-avro-serializer requires you to add an sbt resolver to your
project's build.sbt file, pointing at Confluent's maven repository:
resolvers += "Confluent Repository" at "https://packages.confluent.io/maven/"
FlinkRunner provides automatic support for serializing and deserializing in and out of kafka using either Confluent Avro schema registry or AWS Glue Avro schema registry.
To make use of this support, you need to do the following:
- Include the appropriate serde library (kafka-avro-serializer for confluent or schema-registry-flink-serde for glue) in your project (as described above).
- Configure your kafka sources and sinks with a schema-registry block like so:
  sinks {
    kafka-sink {
      bootstrap.servers = "kafka:9092"
      topic = my-topic
      schema-registry {
        // for confluent
        url = "http://schema-registry:8082" // required
        // other settings are possible
        //cache.capacity = 500 // (1000 by default)
        //use.logical.type.converters = false // (true by default)
        //specific.avro.reader = false // (true by default)

        // for glue (one or both of region or endpoint is required)
        aws.region = "us-east-1"
        //aws.endpoint = "localhost:4566"
        //other configs in AWSSchemaRegistryConstants
      }
    }
  }
- Mix in the EmbeddedAvroRecord and EmbeddedAvroRecordFactory traits for your event types that need to decode/encode schema registry compatible avro records in Kafka.
- If your output event type is an avro record, use the AvroStreamJob base class for your flink jobs instead of StreamJob. Note that both AvroStreamJob and StreamJob can read from avro source streams, but AvroStreamJob uses an avro sink. This is described in more detail below.
To enable file sinks to write in parquet format, add the following dependency to your build:
"org.apache.parquet" % "parquet-avro" % 1.14.2
Then use the format = parquet directive in your file sink configuration (more details below).
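For instance, a minimal sketch of a parquet file sink configuration (the sink name and bucket are hypothetical; it follows the same path/format layout as the configuration example later in this readme):
sinks {
  parquet-file-sink {
    path = "s3://my-events-parquet-files"
    format = parquet
  }
}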
The iceberg source and sink also support parquet output by default.
FlinkRunner uses scala-logging for logging
on top of slf4j. In your implementation, you must provide a logging backend compatible
with slf4j, such as logback:
libraryDependencies += "ch.qos.logback" % "logback-classic" % <logback-version>
If you want to use the complex event processing library, add this dependency:
"org.apache.flink" % "flink-cep" % <flink-version>
FlinkRunner helps you think about your DataStream API flink jobs at a high level, so you
can focus on the event pipeline, not the plumbing. It is not as high-level as Flink SQL,
but when you need to write multiple related DataStream API jobs, FlinkRunner helps you
avoid repetitive boilerplate code, simplifies configuration, and simplifies the use of many
common flink streaming patterns.
You have a set of related flink jobs that deal in a shared set of data event
types. FlinkRunner helps you build one application to run those related jobs and
coordinate the types. It also simplifies setting up common sources and sinks, so you can
control them purely with configuration, not code. FlinkRunner supports a variety of
sources and sinks out of the box, including kafka, kinesis, jdbc
, elasticsearch 7+ (sink only), cassandra (sink only),
filesystems (including s3 using parquet as well as delimited and json text
files), and sockets. It also has many common operators to help you write your own
transformation logic. Finally, FlinkRunner
makes it easy to test your transformation logic with property-based testing.
First, you need to define an algebraic data
type (ADT) containing
the types that will be used in your flink jobs. Your top level type should inherit
the FlinkEvent trait. This requires your types to implement the following members:
- $id: String - a unique id for the event
- $timestamp: Long - the event time (millis since unix epoch)
- $key: String - a key to group events
Additionally, a FlinkEvent has three optional members that you can override
to enable and configure various features:
- $bucketId: String - for bucketing files written to a file sink
- $dedupId: String - for use with FlinkRunner's deduplication filter
- $active: Boolean - for use with FlinkRunner's filterByControl source stream algorithm
sealed trait MyEventADT extends FlinkEvent
final case class MyEventA(a:Int, $id:String, $key:String, $timestamp:Long)
extends MyEventADT
final case class MyEventB(b:Int, $id:String, $key:String, $timestamp:Long)
extends MyEventADT
Next, you should create your own runner subclass of the abstract FlinkRunner base class.
import io.epiphanous.flinkrunner.model.FlinkConfig
import io.epiphanous.flinkrunner.FlinkRunner
class MyRunner(config:FlinkConfig) extends FlinkRunner[MyEventADT](config) {
  override def invoke(jobName:String):Unit = jobName match {
case "MyJob1" => new MyJob1(this).run()
case "MyJob2" => new MyJob2(this).run()
case _ => throw new RuntimeException(s"Unknown job $jobName")
}
}
Next, write some jobs! This is the fun part.
class MyJob1(runner:FlinkRunner[MyEventADT]) extends StreamJob[MyEventA](runner) {
override def transform:DataStream[MyEventA] =
singleSource[MyEventA]()
}
This job takes MyEventA type input events and passes them through unchanged to the sink.
While this sounds useless at first blush, and your transform method will usually do much
more interesting things, identity transforms like this can be useful to copy data from one
storage system to another.
You could do something a little more exciting, say transform a stream of MyEventA
events into a stream of MyEventB events:
class MyJob2(runner:FlinkRunner[MyEventADT]) extends StreamJob[MyEventB](runner) {
override def transform:DataStream[MyEventB] =
    singleSource[MyEventA]().map { a: MyEventA =>
      MyEventB(b = a.a * 2, $id = a.$id, $key = a.$key, $timestamp = a.$timestamp)
    }
}
Next, you need to configure your job. The following configuration defines a file source that reads csv files from one s3 bucket and a file sink that writes json files to a different s3 bucket.
jobs {
MyJob1 {
sources {
csv-file-source {
path = "s3://my-events-csv-files"
format = csv
}
}
sinks {
json-file-sink {
path = "s3://my-events-json-files"
format = json
}
}
}
}
Note that this configuration would work for either MyJob1 or MyJob2.
Next, wire up your runner to a main method.
object Main {
  def main(args: Array[String]): Unit =
    new MyRunner(new FlinkConfig(args)).process()
}
Finally, assemble an uber jar with all your dependencies, deploy it to your flink cluster, and run a job:
flink run myRunner.jar MyJob1
FlinkRunner provides a StreamJob base class from which you can build and run your
flink jobs. If your output event types require avro support, you should instead use the
AvroStreamJob base class to build and run your flink job.
class StreamJob[
OUT <: ADT,
ADT <: FlinkEvent](runner:FlinkRunner[ADT])
A StreamJob must specify the output stream event type in its definition. That output
stream event type must be a subclass of your FlinkRunner algebraic data type (ADT). Your
job class will be passed an instance of your FlinkRunner type, from which you can access:
- runner.config: FlinkRunner configuration (instance of FlinkConfig).
- runner.env: Flink's stream execution environment (instance of StreamExecutionEnvironment), for interfacing with the DataStream API (see the sketch after this list).
- runner.tableEnv: Flink's stream table environment (instance of StreamTableEnvironment), for interfacing with the Table API.
- runner.mockEdges: A boolean indicating if your runner instance will mock interaction with sources and sinks and redirect transformed output to your specified CheckResults.checkOutputEvents method.
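As a quick sketch (MyJob3 is hypothetical, not part of FlinkRunner), a job can use these members directly inside its transform method; here only runner.env and runner.mockEdges from the list above are touched:
class MyJob3(runner: FlinkRunner[MyEventADT])
    extends StreamJob[MyEventA](runner) {
  override def transform: DataStream[MyEventA] = {
    // use a single task when edges are mocked in tests, otherwise fan out;
    // setParallelism is a standard StreamExecutionEnvironment method
    runner.env.setParallelism(if (runner.mockEdges) 1 else 4)
    singleSource[MyEventA]()
  }
}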
Your StreamJob must provide a transform method that defines and transforms your
sources into an output event stream. StreamJob provides several factory methods to
create source streams from your configuration:
- singleSource[IN <: ADT](name:String): produces a single input source stream, configured under the provided name, of type DataStream[IN], where IN is an event type within your runner's ADT. The configured name parameter defaults to the first configured source for convenience.
- connectedSource[IN1 <: ADT, IN2 <: ADT, KEY](source1:DataStream[IN1], source2:DataStream[IN2], key1:IN1=>KEY, key2:IN2=>KEY): ConnectedStreams[IN1,IN2]: connects the two provided source streams, producing a single stream of type ConnectedStreams[IN1,IN2]. A connected stream combines the two streams: an event on the connected stream is either of type IN1 or IN2. The key functions are used for any keyBy operations done on the connected stream. It is important to realize a connected stream is NOT a join between the two streams, and the keys are not used to perform a join. The use case for a connected stream is where the data on one stream dynamically impacts how you want to process the other stream.
- filterByControlSource[CONTROL <: ADT, DATA <: ADT, KEY](control:DataStream[CONTROL], data:DataStream[DATA], key1:CONTROL=>KEY, key2:DATA=>KEY): DataStream[DATA]: a special instance of a connected source, where the first source is a control stream and the second source is a data stream. The control events indicate when the data event stream should be considered active, meaning any data events seen should be emitted. When the control type's $active method returns true, the data stream is considered active, and any data events seen on the connected stream are emitted to the output. Conversely, when the control type's $active method returns false, the data stream is considered inactive, and data events seen on the connected stream are not emitted to the output (see the sketch after this list).
- broadcastConnectedSource[IN <: ADT: TypeInformation, BC <: ADT: TypeInformation, KEY: TypeInformation](keyedSource: DataStream[IN], broadcastSource: DataStream[BC], keyedSourceGetKeyFunc: IN => KEY): BroadcastConnectedStream[IN, BC]: another special connected source that implements Flink's [Broadcast State Pattern](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/broadcast_state/). This StreamJob method keys and connects a regular data stream with a so-called broadcast data stream. A broadcast stream sends copies of all of its elements to all downstream tasks, so in this case we key the source data stream and effectively send a connected stream of the data and broadcast elements to each keyed task. A common use case broadcasts a stream of rule changes that impact how the data stream should be processed. The BroadcastConnectedStream[IN,BC] should be processed with a special type of CoProcessFunction called a KeyedBroadcastProcessFunction[KEY, IN, BC, OUT], which produces your transformed output data stream of type DataStream[OUT].
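For example, a hypothetical filterByControlSource job, assuming the ADT also contains a MyControlEvent type (not defined above) whose $active flag gates the data stream, and two configured sources with hypothetical names:
class MyFilteredJob(runner: FlinkRunner[MyEventADT])
    extends StreamJob[MyEventA](runner) {
  override def transform: DataStream[MyEventA] =
    filterByControlSource[MyControlEvent, MyEventA, String](
      singleSource[MyControlEvent]("control-source"), // control stream
      singleSource[MyEventA]("data-source"),          // data stream
      _.$key, // key the control events
      _.$key  // key the data events
    )
}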
StreamJob also provides avro versions of all these source factory methods. If your
sources are Confluent schema-registry aware avro encoded Kafka streams, you should use the
avro aware versions of these factory methods. For instance, the singleAvroSource()
method can be used to produce such an input datastream. The signatures of these methods are
more complicated and rely on you to use FlinkRunner's EmbeddedAvroRecord and
EmbeddedAvroRecordFactory traits when implementing your event types.
singleAvroSource[
IN <: ADT with EmbeddedAvroRecord[INA],
INA <: GenericRecord](
name: String)(implicit
fromKV: EmbeddedAvroRecordInfo[INA] => IN): DataStream[IN]
Besides source factory methods, StreamJob also provides a method to easily perform
[windowed aggregations](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-agg/).
def windowedAggregation[
E <: ADT: TypeInformation,
KEY: TypeInformation,
WINDOW <: Window: TypeInformation,
AGG <: Aggregate: TypeInformation,
QUANTITY <: Quantity[QUANTITY]: TypeInformation,
PWF_OUT <: ADT: TypeInformation](
source: KeyedStream[E, KEY],
initializer: WindowedAggregationInitializer[
E,
KEY,
WINDOW,
AGG,
QUANTITY,
PWF_OUT,
ADT
]): DataStream[PWF_OUT]
Finally, StreamJob also provides a run() method that builds and executes the flink job
graph defined by your transform method.
class AvroStreamJob[
OUT <: ADT with EmbeddedAvroRecord[A],
  A <: GenericRecord,
ADT <: FlinkEvent](runner:FlinkRunner[ADT])
An AvroStreamJob is a specialized StreamJob class to support outputting to an avro
encoded sink (kafka or parquet-avro files).
trait EmbeddedAvroRecord[A <: GenericRecord] {
def $recordKey: Option[String] = None
def $record: A
def $recordHeaders: Map[String, String] = Map.empty
def toKV: EmbeddedAvroRecordInfo[A] =
EmbeddedAvroRecordInfo($record, $recordKey, $recordHeaders)
}
trait EmbeddedAvroRecordFactory[
E <: FlinkEvent with EmbeddedAvroRecord[A],
A <: GenericRecord] {
implicit def fromKV(recordInfo: EmbeddedAvroRecordInfo[A]): E
implicit def avroParquetRecordFormat: StreamFormat[A] = ???
}
case class EmbeddedAvroRecordInfo[A <: GenericRecord](
record: A,
keyOpt: Option[String] = None,
headers: Map[String, String] = Map.empty)
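Tying these together, here is a hypothetical sketch (none of these names are part of FlinkRunner) that wraps a generated avro record class, MyAvroRecordA, in an ADT event type and uses it in an AvroStreamJob. It assumes MyAvroRecordA extends GenericRecord (as avro-generated specific records do), exposes id, key, and timestamp fields, and that a source named avro-kafka-source is configured:
// MyAvroRecordA is assumed to be an avro-generated class extending GenericRecord
final case class MyAvroEventA(record: MyAvroRecordA)
    extends MyEventADT
    with EmbeddedAvroRecord[MyAvroRecordA] {
  override val $id: String            = record.id
  override val $key: String           = record.key
  override val $timestamp: Long       = record.timestamp
  override val $record: MyAvroRecordA = record
}

object MyAvroEventA
    extends EmbeddedAvroRecordFactory[MyAvroEventA, MyAvroRecordA] {
  // build the wrapper event from the avro record (plus optional key/headers)
  // deserialized from kafka
  override implicit def fromKV(
      info: EmbeddedAvroRecordInfo[MyAvroRecordA]): MyAvroEventA =
    MyAvroEventA(info.record)
}

class MyAvroJob(runner: FlinkRunner[MyEventADT])
    extends AvroStreamJob[MyAvroEventA, MyAvroRecordA, MyEventADT](runner) {
  import MyAvroEventA.fromKV // implicit used by the avro source factory
  override def transform: DataStream[MyAvroEventA] =
    singleAvroSource[MyAvroEventA, MyAvroRecordA]("avro-kafka-source")
}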
FlinkRunner uses lightbend config for its
configuration, integrating environment variables and command line arguments to provide
easy, environment specific, 12-factor style configuration. FlinkRunner makes this
configuration accessible through a FlinkConfig object that you construct and pass to
your runner.
trait SourceConfig[ADT <: FlinkEvent]
trait SinkConfig[ADT <: FlinkEvent]
trait Aggregate {
def name:String
// the kind of measurement being aggregated
def dimension: String
// the preferred unit of measurements being aggregated
def unit: String
// the current aggregated value
def value: Double
// the current count of measurements included in this aggregate
def count: BigInt
// the timestamp of the most recent aggregated event
def aggregatedLastUpdated: Instant
// the timestamp when this aggregate was last updated
def lastUpdated: Instant
// other aggregations this aggregation depends on
def dependentAggregations: Map[String, Aggregate]
// configuration parameters
def params: Map[String, String]
// a method to update the aggregate with a new value
def update(value: Double, unit: String,
aggLU: Instant): Try[Aggregate]
}
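As an illustrative sketch only (FlinkRunner ships its own Aggregate implementations; SimpleMean here is hypothetical and skips unit conversion), a running mean built on the trait above might look like this:
import java.time.Instant
import scala.util.{Success, Try}

case class SimpleMean(
    dimension: String,
    unit: String,
    value: Double = 0d,
    count: BigInt = BigInt(0),
    aggregatedLastUpdated: Instant = Instant.EPOCH,
    lastUpdated: Instant = Instant.now(),
    dependentAggregations: Map[String, Aggregate] = Map.empty,
    params: Map[String, String] = Map.empty)
    extends Aggregate {

  override def name: String = "SimpleMean"

  // fold a new measurement into the running mean (unit conversion ignored here)
  override def update(value: Double, unit: String, aggLU: Instant): Try[Aggregate] =
    Success(
      copy(
        value = (this.value * count.toDouble + value) / (count.toDouble + 1),
        count = count + 1,
        aggregatedLastUpdated = aggLU,
        lastUpdated = Instant.now()
      )
    )
}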
Some useful utilities.