saksdirect / fs2-aws

fs2 utilities to interact with AWS

Version Matrix


Build Status Maven Central Coverage Status

fs2 Streaming utilities for interacting with AWS

Scope of the project

fs2-aws provides an fs2 interface to AWS services

The design goals are the same as fs2:

compositionality, expressiveness, resource safety, and speed


Streaming a file from S3

Creates a stream of Bytes; size of each part downlaoded is the chunkSize.

Example using IO for effects (any monad F <: Effect can be used):

readS3FileMultipart[IO]("testBucket", "testFile", 25)

Writing to a file in S3

A Pipe and Sink allow for writing a stream of Bytes to S3; size of each part uploaded is the chunkSize.

Example using IO for effects (any monad F <: Effect can be used):

Stream("test data")
  .uploadS3FileMultipart[IO]("testBucket", "testFile")


Streaming records from Kinesis with KCL

Example using IO for effects (any monad F <: ConcurrentEffect can be used):

val stream: Stream[IO, CommittableRecord] = readFromKinesisStream[IO]("appName", "streamName")

There are a number of other stream constructors available where you can provide more specific configuration for the KCL worker.


TODO: Implement better test consumer

For now, you can stubbed CommitableRecord and create a fs2.Stream to emit these records:

val record = new Record()
  .withApproximateArrivalTimestamp(new Date())

val testRecord = CommittableRecord(

Checkpointing records

Records must be checkpointed in Kinesis to keep track of which messages each consumer has received. Checkpointing a record in the KCL will automatically checkpoint all records upto that record. To checkpoint records, a Pipe and Sink are available. To help distinguish whether a record has been checkpointed or not, a CommittableRecord class exists to denote a record that hasn't been checkpointed, while the base Record class denotes a commited record.

readFromKinesisStream[IO]("appName", "streamName")

Publishing records to Kinesis with KPL

A Pipe and Sink allow for writing a stream of tuple2 (paritionKey, ByteBuffer) to a Kinesis stream.


  .map { d => ("partitionKey", ByteBuffer.wrap(d.getBytes))}

AWS credential chain and region can be configured by overriding the respective fields in the KinesisProducerClient parameter to writeToKinesis. Defaults to using the default AWS credentials chain and us-east-1 for region.

Kinesis Firehose

TODO: Stream get data, Stream send data



implicit val messageDecoder: Message => Either[Throwable, Quote] = { sqs_msg =>
      .sqsStream[IO, Quote](
        (config, callback) => SQSConsumerBuilder(config, callback))


//create stream for testing
def stream(deferedListener: Deferred[IO, MessageListener]) =
              .sqsStream[IO, Quote](deferedListener)
//create the program for testing the stream               
import io.circe.syntax._
val quote = Quote(...)
val program : IO[List[(Quote, MessageListener)]] = for {
            d <- Deferred[IO, MessageListener]
            r <- IO.racePair(stream(d), d.get).flatMap {
              case Right((streamFiber, listener)) =>
                //simulate SQS stream fan-in here
                listener.onMessage(new SQSTextMessage(Printer.noSpaces.pretty(quote.asJson)))
              case _ => IO(Nil)
          } yield r
//Assert results
val result = program
result should be(...)

TODO: Stream send SQS messages