gilt / gfc-aws-kinesis

Scala wrapper around AWS Kinesis Client Library

Github

gfc-aws-kinesis Join the chat at https://gitter.im/gilt/gfc

Scala wrapper around AWS Kinesis Client Library. Part of the Gilt Foundation Classes.

Getting gfc-aws-kinesis

The latest version is 0.14.2, which is cross-built against Scala 2.11.x and 2.12.x.

SBT dependency:

libraryDependencies += "com.gilt" %% "gfc-aws-kinesis" % "0.14.2"

SBT Akka stream dependency:

libraryDependencies += "com.gilt" %% "gfc-aws-kinesis-akka" % "0.14.2"

Basic usage

Consume events:

  implicit object StringRecordReader extends KinesisRecordReader[String]{
    override def apply(r: Record) : String = new String(r.data.array(), "UTF-8")
  }

  val config = KCLConfiguration("consumer-name", "kinesis-stream-name")

  KCLWorkerRunner(config).runAsyncSingleRecordProcessor[A](1 minute) { a: String =>
     // .. do something with A
  }

Publish events:

  implicit object StringRecordWriter extends KinesisRecordWriter[String] {
    override def toKinesisRecord(a: String) : KinesisRecord = {
      KinesisRecord("partition-key", a.getBytes("UTF-8"))
    }
  }

  val publisher = KinesisPublisher()

  val messages = Seq("Hello World!", "foo bar", "baz bam")

  val result: Future[KinesisPublisherBatchResult] = publisher.publishBatch("kinesis-stream-name", messages)

DynamoDB streaming

Create the adapter client

val streamAdapterClient: AmazonDynamoDBStreamsAdapterClient =
    new AmazonDynamoDBStreamsAdapterClient()

Pass the adapter client in the configuration

val streamSource = {
    val streamConfig = KinesisStreamConsumerConfig[Option[A]](
      applicationName,
      config.stream,
      regionName = Some(config.region),
      checkPointInterval = config.checkpointInterval,
      initialPositionInStream = config.streamPosition,
      dynamoDBKinesisAdapterClient = streamAdapterClient
    )
    KinesisStreamSource(streamConfig).mapMaterializedValue(_ => NotUsed)
  }

Pass an implicit kinesis record reader suitable for dynamodb events

implicit val kinesisRecordReader
      : KinesisRecordReader[Option[A]] =
      new KinesisRecordReader[Option[A]] {
        override def apply(record: Record): Option[A] = {
          record match {
            case recordAdapter: RecordAdapter =>
              val dynamoRecord: DynamoRecord =
                recordAdapter.getInternalObject
              dynamoRecord.getEventName match {
                case "INSERT" =>
                  ScanamoFree
                    .read[A](
                      dynamoRecord.getDynamodb.getNewImage)
                    .toOption
                case _ => None
              }
            case _ => None
          }
        }
      }

Consume e.g. using a sink

val targetSink = Sink.actorRefWithAck(target, startMsg, ackMsg, Done)

streamSource
  .filter(!_.isEmpty)
  .map(_.get)
  .log(applicationName)(log)
  .runWith(targetSink)