Scala wrapper around AWS Kinesis Client Library.
A fork and new home of the now unmaintained Gilt Foundation Classes (com.gilt.gfc), now called the GFC Collective, maintained by some of the original authors.
The latest version is 1.0.0, released on 21/Jan/2020 and cross-built against Scala 2.12.x and 2.13.x.
If you're using SBT, add the following line to your build file:
libraryDependencies += "org.gfccollective" %% "gfc-aws-kinesis" % "1.0.0"
For the Akka Streams (2.5.x) integration, add this SBT dependency:
libraryDependencies += "org.gfccollective" %% "gfc-aws-kinesis-akka" % "1.0.0"
For Maven and other build tools, you can visit search.maven.org. (This search will also list other available libraries from the GFC Collective.)
Consume events:
import scala.concurrent.Future
import scala.concurrent.duration._

implicit object StringRecordReader extends KinesisRecordReader[String] {
  override def apply(r: Record): String = new String(r.getData.array(), "UTF-8")
}

val config = KCLConfiguration("consumer-name", "kinesis-stream-name")

KCLWorkerRunner(config).runAsyncSingleRecordProcessor[String](1.minute) { a: String =>
  // .. do something with a
  Future.successful(())
}
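The reader above decodes the record through `array()`, which returns the buffer's whole backing array regardless of position and limit, and which throws on read-only buffers. As a minimal sketch, assuming only the JDK (the `Utf8Decoding` object and its `decode` method are hypothetical names, not part of this library), the payload can instead be decoded through the charset, which respects the buffer bounds:

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

// Hypothetical helper, not part of gfc-aws-kinesis.
object Utf8Decoding {
  // Decodes the bytes between the buffer's position and limit as UTF-8.
  // duplicate() shares the content but has its own position, so the
  // caller's buffer is left untouched.
  def decode(data: ByteBuffer): String =
    StandardCharsets.UTF_8.decode(data.duplicate()).toString
}
```

Inside the record reader this would be called as `Utf8Decoding.decode(r.getData)`.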
Publish events:
implicit object StringRecordWriter extends KinesisRecordWriter[String] {
  override def toKinesisRecord(a: String): KinesisRecord = {
    KinesisRecord("partition-key", a.getBytes("UTF-8"))
  }
}

val publisher = KinesisPublisher()
val messages = Seq("Hello World!", "foo bar", "baz bam")
val result: Future[KinesisPublisherBatchResult] = publisher.publishBatch("kinesis-stream-name", messages)
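The writer above uses a constant partition key, so every record is routed to the same shard. A minimal sketch of deriving a key per message, assuming the payload itself is an acceptable source for the key (the `PartitionKeys.forPayload` helper is hypothetical, not part of this library):

```scala
import java.nio.charset.StandardCharsets
import java.util.UUID

// Hypothetical helper, not part of gfc-aws-kinesis.
object PartitionKeys {
  // Name-based UUID of the payload bytes: the same payload always yields
  // the same key, and the result is a 36-character string.
  def forPayload(payload: String): String =
    UUID.nameUUIDFromBytes(payload.getBytes(StandardCharsets.UTF_8)).toString
}
```

The writer could then build its record as `KinesisRecord(PartitionKeys.forPayload(a), a.getBytes("UTF-8"))`. Where ordering between related messages matters, a domain identifier such as a user ID is usually a better key, since records that share a key stay on one shard.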
To consume DynamoDB Streams, create the adapter client:
val streamAdapterClient: AmazonDynamoDBStreamsAdapterClient =
  new AmazonDynamoDBStreamsAdapterClient()
Pass the adapter client in the configuration:
val streamSource = {
  val streamConfig = KinesisStreamConsumerConfig[Option[A]](
    applicationName,
    config.stream,
    regionName = Some(config.region),
    checkPointInterval = config.checkpointInterval,
    initialPositionInStream = config.streamPosition,
    dynamoDBKinesisAdapterClient = streamAdapterClient
  )
  KinesisStreamSource(streamConfig).mapMaterializedValue(_ => NotUsed)
}
Pass an implicit Kinesis record reader suitable for DynamoDB events:
implicit val kinesisRecordReader: KinesisRecordReader[Option[A]] =
  new KinesisRecordReader[Option[A]] {
    override def apply(record: Record): Option[A] = {
      record match {
        case recordAdapter: RecordAdapter =>
          val dynamoRecord: DynamoRecord = recordAdapter.getInternalObject
          dynamoRecord.getEventName match {
            case "INSERT" =>
              ScanamoFree
                .read[A](dynamoRecord.getDynamodb.getNewImage)
                .toOption
            case _ => None
          }
        case _ => None
      }
    }
  }
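The reader above keeps only `INSERT` events and silently drops everything else. DynamoDB Streams also reports `MODIFY` and `REMOVE` events; a minimal sketch of making that choice explicit, using hypothetical names (`ChangeKind`, `ChangeKind.fromEventName`) that are not part of this library:

```scala
// Hypothetical model of the DynamoDB Streams event names.
sealed trait ChangeKind

object ChangeKind {
  case object Insert extends ChangeKind
  case object Modify extends ChangeKind
  case object Remove extends ChangeKind

  // Maps the raw event name to a typed value; unknown names yield None.
  def fromEventName(name: String): Option[ChangeKind] = name match {
    case "INSERT" => Some(Insert)
    case "MODIFY" => Some(Modify)
    case "REMOVE" => Some(Remove)
    case _        => None
  }
}
```

A reader that should also pick up updates could match on `Some(ChangeKind.Insert) | Some(ChangeKind.Modify)` before reading the new image. `REMOVE` events carry no new image, so they would need to be handled separately.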
Consume the source, for example with a sink:
val targetSink = Sink.actorRefWithAck(target, startMsg, ackMsg, Done)

streamSource
  .collect { case Some(a) => a } // keep only successfully decoded records
  .log(applicationName)(log)
  .runWith(targetSink)
Licensed under the Apache License, Version 2.0: http://www.apache.org/licenses/LICENSE-2.0