gfc-aws-kinesis

Join the chat at https://gitter.im/gilt/gfc

Scala wrapper around the AWS Kinesis Client Library. This project is a fork and new home of the former Gilt Foundation Classes (com.gilt.gfc), now called the GFC Collective, and is maintained by some of the original authors.

Getting gfc-aws-kinesis

The latest version is 1.0.0, which is cross-built against Scala 2.12.x and 2.13.x.

If you're using SBT, add the following line to your build file:

libraryDependencies += "org.gfccollective" %% "gfc-aws-kinesis" % "1.0.0"

For Akka Streams (2.5.x) support, add the following SBT dependency:

libraryDependencies += "org.gfccollective" %% "gfc-aws-kinesis-akka" % "1.0.0"

For Maven and other build tools, you can visit search.maven.org. (This search will also list other available libraries from the GFC Collective.)

Basic usage

Consume events:

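  // Package names are assumed from the org.gfccollective artifact; adjust if your version differs
  import scala.concurrent.Future
  import scala.concurrent.duration._
  import com.amazonaws.services.kinesis.model.Record
  import org.gfccollective.aws.kinesis.client.{KCLConfiguration, KCLWorkerRunner, KinesisRecordReader}

  // Decodes each Kinesis record's payload as a UTF-8 string
  implicit object StringRecordReader extends KinesisRecordReader[String] {
    override def apply(r: Record): String = new String(r.getData.array(), "UTF-8")
  }

  val config = KCLConfiguration("consumer-name", "kinesis-stream-name")

  // 1.minute avoids postfix-operator syntax, which needs scala.language.postfixOps
  KCLWorkerRunner(config).runAsyncSingleRecordProcessor[String](1.minute) { a: String =>
    // .. do something with a
    Future.successful(())
  }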
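The reader above decodes the payload with `array()`, which returns the buffer's entire backing array and throws for read-only or direct buffers. A more defensive decoding step can be sketched with the JDK alone; the helper name `decodeUtf8` is hypothetical and not part of gfc-aws-kinesis:

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

object Utf8Decoding {
  // Hypothetical helper, not part of gfc-aws-kinesis.
  // Decodes only the bytes between the buffer's position and limit.
  def decodeUtf8(data: ByteBuffer): String = {
    // duplicate() shares the content but has its own position and limit,
    // so the caller's buffer is left untouched by the decode.
    StandardCharsets.UTF_8.decode(data.duplicate()).toString
  }
}
```

Under that assumption, the body of `apply` would become `Utf8Decoding.decodeUtf8(r.getData)`.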

Publish events:

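  // Package names are assumed from the org.gfccollective artifact; adjust if your version differs
  import scala.concurrent.Future
  import org.gfccollective.aws.kinesis.client.{KinesisPublisher, KinesisPublisherBatchResult, KinesisRecord, KinesisRecordWriter}

  // Encodes each string as UTF-8 bytes under a fixed partition key
  implicit object StringRecordWriter extends KinesisRecordWriter[String] {
    override def toKinesisRecord(a: String): KinesisRecord = {
      KinesisRecord("partition-key", a.getBytes("UTF-8"))
    }
  }

  val publisher = KinesisPublisher()

  val messages = Seq("Hello World!", "foo bar", "baz bam")

  // Publishes all messages to the stream as one batch
  val result: Future[KinesisPublisherBatchResult] = publisher.publishBatch("kinesis-stream-name", messages)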
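The writer above uses one constant partition key, and Kinesis routes all records that share a partition key to the same shard. One way to spread records across shards is to derive the key from the payload. The following is a minimal sketch under stated assumptions: `SketchRecord` is a hypothetical stand-in for the library's `KinesisRecord` so that the example compiles on its own, and `toRecord` is an illustrative name.

```scala
import java.nio.charset.StandardCharsets
import java.util.UUID

object PartitionKeys {
  // Hypothetical stand-in for the library's KinesisRecord type.
  final case class SketchRecord(partitionKey: String, data: Array[Byte])

  // Derives a deterministic partition key from the message bytes, so equal
  // messages share a key while distinct messages are spread out.
  def toRecord(message: String): SketchRecord = {
    val bytes = message.getBytes(StandardCharsets.UTF_8)
    // nameUUIDFromBytes builds a name-based (MD5) UUID; its string form is 36 characters.
    val key = UUID.nameUUIDFromBytes(bytes).toString
    SketchRecord(key, bytes)
  }
}
```

If ordering between related messages matters, a business identifier such as a user ID is usually a better key than a content hash, since records with the same key stay on one shard.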

DynamoDB streaming

Create the adapter client:

val streamAdapterClient: AmazonDynamoDBStreamsAdapterClient =
    new AmazonDynamoDBStreamsAdapterClient()

Pass the adapter client in the configuration:

val streamSource = {
  val streamConfig = KinesisStreamConsumerConfig[Option[A]](
    applicationName,
    config.stream,
    regionName = Some(config.region),
    checkPointInterval = config.checkpointInterval,
    initialPositionInStream = config.streamPosition,
    dynamoDBKinesisAdapterClient = streamAdapterClient
  )
  KinesisStreamSource(streamConfig).mapMaterializedValue(_ => NotUsed)
}

Provide an implicit Kinesis record reader suitable for DynamoDB events:

implicit val kinesisRecordReader: KinesisRecordReader[Option[A]] =
  new KinesisRecordReader[Option[A]] {
    override def apply(record: Record): Option[A] =
      record match {
        // Records from DynamoDB Streams arrive wrapped in a RecordAdapter
        case recordAdapter: RecordAdapter =>
          val dynamoRecord: DynamoRecord = recordAdapter.getInternalObject
          dynamoRecord.getEventName match {
            // Only decode newly inserted items; MODIFY and REMOVE events are skipped
            case "INSERT" =>
              ScanamoFree.read[A](dynamoRecord.getDynamodb.getNewImage).toOption
            case _ => None
          }
        case _ => None
      }
  }

Consume the stream, e.g. using a sink:

val targetSink = Sink.actorRefWithAck(target, startMsg, ackMsg, Done)

streamSource
  .collect { case Some(a) => a } // drop records the reader could not decode
  .log(applicationName)(log)
  .runWith(targetSink)
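The pipeline drops `None` elements before they reach the sink. The same step can be seen on a plain Scala collection, without Akka; `events` is made-up sample data and the object name is illustrative only:

```scala
object DropEmpty {
  // Made-up sample data standing in for decoded stream elements.
  val events: List[Option[String]] = List(Some("a"), None, Some("b"))

  // Two steps: keep the defined elements, then unwrap them with get.
  val viaFilter: List[String] = events.filter(_.nonEmpty).map(_.get)

  // One step: collect keeps and unwraps in a single pass and never calls get.
  val viaCollect: List[String] = events.collect { case Some(e) => e }
}
```

Both forms yield the same elements; `collect` avoids the partial `get` call.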

License

Licensed under the Apache License, Version 2.0: http://www.apache.org/licenses/LICENSE-2.0