Travis Codecov Version

FS2 Kafka

Tiny library providing an FS2 wrapper around the official Kafka Java client.
The API is inspired by Alpakka Kafka, and migration should be relatively easy.

This is a new project under active development. Feedback and contributions are welcome.

Getting Started

To get started with sbt, simply add the following line to your build.sbt file.

libraryDependencies += "com.ovoenergy" %% "fs2-kafka" % "0.17.1"

The library is published for Scala 2.11 and 2.12.

Backwards binary compatibility for the library is guaranteed between patch versions.
For example, 0.17.x is backwards binary compatible with 0.17.y for any x > y.

Usage

Start with import fs2.kafka._ and use consumerStream and producerStream to create a consumer and producer, by providing a ConsumerSettings and ProducerSettings, respectively. The consumer is similar to committableSource in Alpakka Kafka, wrapping records in CommittableMessage. The producer accepts records wrapped in ProducerMessage, allowing offsets, and other elements, as passthrough values.

import cats.Id
import cats.data.NonEmptyList
import cats.effect.{ExitCode, IO, IOApp}
import cats.syntax.functor._
import fs2.kafka._
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

object Main extends IOApp {
  override def run(args: List[String]): IO[ExitCode] = {
    val consumerSettings = (executionContext: ExecutionContext) =>
      ConsumerSettings(
        keyDeserializer = new StringDeserializer,
        valueDeserializer = new StringDeserializer,
        executionContext = executionContext
      )
      .withAutoOffsetReset(AutoOffsetReset.Earliest)
      .withBootstrapServers("localhost")
      .withGroupId("group")

    val producerSettings =
      ProducerSettings(
        keySerializer = new StringSerializer,
        valueSerializer = new StringSerializer,
      )
      .withBootstrapServers("localhost")

    val topics =
      NonEmptyList.one("topic")

    def processRecord(record: ConsumerRecord[String, String]): IO[(String, String)] =
      IO.pure(record.key -> record.value)

    val stream =
      for {
        executionContext <- consumerExecutionContextStream[IO]
        consumer <- consumerStream[IO].using(consumerSettings(executionContext))
        producer <- producerStream[IO].using(producerSettings)
        _ <- consumer.subscribe(topics)
        _ <- consumer.stream
          .mapAsync(25)(message =>
            processRecord(message.record)
              .map {
                case (key, value) =>
                  val record = new ProducerRecord("topic", key, value)
                  ProducerMessage.single[Id].of(record, message.committableOffset)
              })
            .evalMap(producer.produceBatched)
            .map(_.map(_.passthrough))
            .to(commitBatchWithinF(500, 15.seconds))
      } yield ()

    stream.compile.drain.as(ExitCode.Success)
  }
}