maven scalaci


tele is a Kinesis client providing fs2 interfaces to produce and consume from Kinesis streams.

object Producer {

  def putRecord[F[_]: Async, A: SchemaEncoder](
      client: KinesisAsyncClient,
      stream: String,
      opt: Options[A]
    ): fs2.Pipe[F, A, RichPutRecordResponse[A]]

  def putRecords[F[_]: Async, A](
      client: KinesisAsyncClient,
      stream: String
    ): fs2.Pipe[F, Batcher.Batch[A], RichPutRecordsResponse[A]]


trait Consumer[F[_], A] {
  def subscribe: Resource[F, fs2.Stream[F, A]]


libraryDependencies +=  "io.github.benoitlouy" %% "tele" % "0.3.0-SNAPSHOT"


For the following example we will be publishing and consuming User records represented as

final case class User(name: String, age: Int)

We also need to provide a way to serialize this type. This is accomplished by declaring a implicit instance of Schema[User]. In this example we will be using circe to serialize to JSON.

import cats.syntax.all._
import, io.circe.parser, io.circe.syntax._
import tele.Schema

implicit val userSchema: Schema[User] = new Schema[User] {
  override def encode(a: User): Array[Byte] = a.asJson.noSpaces.getBytes()

  override def decode(bytes: Array[Byte]): Either[tele.DecodingFailure, User] =
    parser.decode[User](new String(bytes))
      .leftMap(e => tele.DecodingFailure("failed decoding User", e))



We create a stream containing a few users:

val users = fs2.Stream(User("Alice", 35), User("Bob", 28), User("Carol", 24))
// users: fs2.Stream[Nothing, User] = Stream(..)

tele provides 2 ways of publishing records to kinesis:

  • putRecord which publishes records one by one.
  • putRecords which publishes records by batch.


import cats.effect._
import tele.Producer

val result: Vector[Producer.RichPutRecordResponse[User]] =
    .through(Producer.putRecord(kinesisClient, "user-stream", Producer.Options()))


Before being able to call putRecords, records must be batched. For this purpose tele provides a Batcher which groups records in Batcher.Batch[A] where A is the record type (User in this example).

Because Kinesis limits the size of individual records, the batcher will detect large records and will also produce instances of Batcher.TooLarge[A].

Batcher.Batch[A] and Batcher.TooLarge[A] form the ADT Batcher.Result[A] and the batcher is an fs2.Pipe with the signature fs2.Pipe[F, A, Batcher.Result[A]]

import cats.effect._
import tele.{ Batcher, Producer }

val result: Vector[Producer.RichPutRecordsResponse[User]] =
    .collect { case batch: Batcher.Batch[User] => batch }
    .through(Producer.putRecords(kinesisClient, "user-stream"))


import cats.effect._
import tele.{ Consumer, Record }

val consumer = Consumer.make[IO](
  capacity = 500,

val result: Vector[Record.WithData[IO, User]] = consumer.subscribe.use { users =>

Record.WithData[F[_], A] is an ADT which indicates if the data received could be deserialized to an instance of A. The members of this ADT are:

  • CommitableRecord.WithValue[F[_], A] when deserialization was successful
  • CommitableRecord.WithError[F[_]] when deserialization failed