Important
This project is a maintenance fork of https://github.com/StreetContxt/kcl-akka-stream, for the usage of projects at the Guardian. The library has been migrated from akka-stream to pekko-stream, and will be kept up to date with the latest dependency versions. Please note that beyond this and critical bugs, no feature work is currently planned, and patches are unlikely to be accepted. Consumers who require new functionality are encouraged to create their own forks to suit their own requirements.
Pekko Streaming Source backed by Kinesis Client Library (KCL 2.x).
This library combines the convenience of Pekko Streams with KCL 2.x checkpoint management, failover, load-balancing, and re-sharding capabilities.
This library is thoroughly tested and currently used in production.
libraryDependencies += "com.gu" %% "kcl-pekko-stream" % "0.1.0"
The two examples below show how to initialize the Kinesis consumer and listen for string messages.
The first example shows how to process Kinesis records in at-least-once fashion:
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.Sink
import com.gu.kinesis.{ConsumerConfig, KinesisSource}
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Random
object Main {
  def main(args: Array[String]): Unit = {
    val consumerConfig = ConsumerConfig("myStream", "atLeastOnceApp")

    case class KeyMessage(key: String, message: String, markProcessed: () => Unit)

    val atLeastOnceSource = KinesisSource(consumerConfig)
      .map { kinesisRecord =>
        KeyMessage(kinesisRecord.partitionKey, kinesisRecord.data.utf8String, kinesisRecord.markProcessed)
      }
      // Records may be processed out of order without affecting checkpointing.
      .grouped(10)
      .map(batch => Random.shuffle(batch))
      .mapConcat(identity)
      .map { message =>
        // After a record is marked as processed, it is eligible to be checkpointed in DynamoDb.
        message.markProcessed()
        message
      }

    implicit val system: ActorSystem = ActorSystem("Main")
    atLeastOnceSource.runWith(Sink.foreach(println))

    Thread.sleep(10.seconds.toMillis)
    Await.result(system.terminate(), Duration.Inf)
  }
}
The second example shows how to implement no-guarantees processing:
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.Sink
import com.gu.kinesis.{ConsumerConfig, KinesisSource}
import scala.concurrent.Await
import scala.concurrent.duration._
object Main {
  def main(args: Array[String]): Unit = {
    val consumerConfig = ConsumerConfig("myStream", "noGuaranteesApp")

    case class KeyMessage(key: String, message: String)

    val noGuaranteesSource = KinesisSource(consumerConfig)
      .map { kinesisRecord =>
        /* Every record must be marked as processed to allow stream state to be checkpointed in
         * DynamoDb. If even one record is never marked as processed, checkpointing stalls and
         * the application will eventually run out of memory. */
        kinesisRecord.markProcessed()
        kinesisRecord
      }
      .map { kinesisRecord =>
        KeyMessage(kinesisRecord.partitionKey, kinesisRecord.data.utf8String)
      }

    implicit val system: ActorSystem = ActorSystem("Main")
    noGuaranteesSource.runWith(Sink.foreach(println))

    Thread.sleep(10.seconds.toMillis)
    Await.result(system.terminate(), Duration.Inf)
  }
}
Notice that each Kinesis record must be eventually marked as processed in both at-least-once and no-guarantees scenarios. This is due to how Kinesis checkpointing is implemented.
A shard in a Kinesis stream is an ordered sequence of records. The shard is checkpointed by storing an offset of the last processed record. However, if a record is not processed (for example, because of an exception), then no further records after it can be checkpointed.
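The checkpointing rule above can be illustrated with a small, self-contained model. This is a sketch only, not the library's implementation: ShardTracker, its methods, and the use of plain Long offsets are assumptions made for illustration.

```scala
import scala.collection.immutable.SortedMap

// Hypothetical model of one shard's checkpoint state; not the library's internals.
// Keys are record offsets in shard order; values record whether markProcessed was called.
final case class ShardTracker(inFlight: SortedMap[Long, Boolean] = SortedMap.empty[Long, Boolean]) {

  // A record was emitted downstream and has not been processed yet.
  def emitted(offset: Long): ShardTracker =
    copy(inFlight = inFlight.updated(offset, false))

  // The equivalent of calling markProcessed() on that record.
  def markProcessed(offset: Long): ShardTracker =
    copy(inFlight = inFlight.updated(offset, true))

  // The highest offset that is safe to checkpoint: the end of the leading run
  // of processed records. An unprocessed record blocks everything after it.
  def checkpointable: Option[Long] =
    inFlight.takeWhile { case (_, processed) => processed }.lastOption.map(_._1)

  // After checkpointing, everything up to and including that offset can be forgotten.
  def checkpointed(offset: Long): ShardTracker =
    copy(inFlight = inFlight.filter { case (o, _) => o > offset })
}

object ShardTrackerDemo {
  // Four records emitted in shard order, none processed yet.
  val start: ShardTracker =
    (1L to 4L).foldLeft(ShardTracker())((tracker, offset) => tracker.emitted(offset))

  // Records 3 and 2 finish first: nothing can be checkpointed while record 1 is pending.
  val outOfOrder: ShardTracker = start.markProcessed(3L).markProcessed(2L)

  // Once record 1 is marked, offsets 1 to 3 become checkpointable together.
  val unblocked: ShardTracker = outOfOrder.markProcessed(1L)
}
```

In this model, a record that is never marked keeps every later record in inFlight indefinitely, which mirrors why an unmarked record eventually exhausts memory.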
KinesisSource keeps track of all uncheckpointed records and their ordering, so you can process records out of order and asynchronously. Each record must eventually be marked as processed by calling markProcessed(), or the stream must be terminated with an exception. If the stream continues after failing to process a record without marking it as processed, no further records can be checkpointed and the system eventually runs out of memory.
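One way to follow this rule with asynchronous processing is to mark a record only after its handler succeeds, and to let a failure propagate instead of swallowing it. The sketch below uses plain Scala Futures; Record is a hypothetical stand-in for the library's record type, and MarkOnSuccess and handle are illustrative names, not part of this library.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical stand-in for a Kinesis record: a payload plus its markProcessed callback.
final case class Record(data: String, markProcessed: () => Unit)

object MarkOnSuccess {
  // Runs `handle` and marks the record only if it succeeds.
  // On failure the returned Future fails and the record stays unmarked,
  // so the caller must fail the stream rather than carry on.
  def apply(record: Record)(handle: String => Future[Unit])(
      implicit ec: ExecutionContext): Future[Record] =
    handle(record.data).map { _ =>
      record.markProcessed()
      record
    }
}
```

With Pekko Streams, a function of this shape would typically be passed to mapAsync, whose default supervision strategy stops the stream when the Future fails.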
The Kinesis consumer's ConsumerConfig can be configured via HOCON, as is common for Pekko projects:
com.gu.kinesis {
  consumer {
    application-name = "test-app" # name of the application (consumer group)
    stream-name = "test-stream" # name of the stream to connect to
    position {
      initial = "latest" # (latest, trim-horizon, at-timestamp) Defaults to latest.
      time = "" # Only required if position is at-timestamp. Supports a valid Java Date parseable datetime string
    }
    # Note: Configurations below need to be in this location (com.gu.kinesis.consumer) to be found
    # Optional stats reporting class that implements com.gu.kinesis.ConsumerStats
    stats-class-name = "com.gu.kinesis.NoopConsumerStats"
    # Optional checkpoint configuration
    shard-checkpoint-config {
      checkpoint-period = 60 seconds
      checkpoint-after-processing-nr-of-records = 10000
      max-wait-for-completion-on-stream-shutdown = 5 seconds
    }
  }
}
Then configure the consumer using the convenience method ConsumerConfig.fromConfig:
ConsumerConfig.fromConfig(system.settings.config.getConfig("com.gu.kinesis.consumer"))
The ConsumerConfig class also has methods that accept raw AWS SDK clients, which you can configure yourself. Use this option if you need heavily customized configuration.
The KCL license is not compatible with open source licenses! See this discussion for more details.
As such, the licensing terms of this library are the Apache 2 license PLUS whatever restrictions are imposed by the KCL license.
The Kinesis consumer does not guarantee mutually exclusive processing of shards during failover or load-balancing. See Kinesis Troubleshooting Guide for more details.
The Kinesis Producer Library does not provide message ordering guarantees at a reasonable throughput. See this ticket for more details.
To run integration tests:
- Set up local AWS credentials (for example, via the ~/.aws/credentials file)
- Set the KINESIS_TEST_REGION environment variable
- Run sbt it:test
- Cancelled tests will leave behind temporary Kinesis streams and DynamoDb tables prefixed with deleteMe_