zio / zio-kafka   2.11.0

Apache License 2.0

A Kafka client for ZIO and ZIO Streams

Scala versions: 3.x 2.13

ZIO Kafka

ZIO Kafka is a Kafka client for ZIO. It provides a purely functional, streams-based interface to the Kafka client and integrates effortlessly with ZIO and zio-streams. Often zio-kafka programs have a higher throughput than programs that use the Java Kafka client directly (see section Performance below).

Introduction

Apache Kafka is a distributed event streaming platform that acts as a distributed publish-subscribe messaging system. It enables us to build distributed streaming data pipelines and event-driven applications.

Kafka has a mature Java client for producing and consuming events, but its API is low-level. Zio-kafka is a ZIO native client for Apache Kafka that offers a high-level streaming API on top of the Java client, so we can produce and consume events using the declarative concurrency model of zio-streams. In addition, zio-kafka supports an even higher level API where you only write the processing part and the rest is handled by zio-kafka.
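
As a rough illustration of the "only write the processing part" style, the sketch below uses Consumer.consumeWith. It assumes a broker on localhost:9092, a topic named "random" with Long keys and String values, and a consumer group called "group"; the object name ConsumeWithSketch is made up for this example.

```scala
import zio._
import zio.kafka.consumer._
import zio.kafka.serde._

object ConsumeWithSketch extends ZIOAppDefault {

  // Assumed connection settings; adjust to your own broker and group.
  val settings: ConsumerSettings =
    ConsumerSettings(List("localhost:9092")).withGroupId("group")

  override def run: ZIO[Any, Throwable, Unit] =
    // We only supply the per-record handler; subscribing, polling and
    // committing are left to zio-kafka.
    Consumer.consumeWith(settings, Subscription.topics("random"), Serde.long, Serde.string) { record =>
      // The handler must not fail, so the console error is turned into a defect.
      Console.printLine(s"${record.key()}: ${record.value()}").orDie
    }
}
```

The handler receives a plain Kafka ConsumerRecord. Because its return type has no error channel, any failure has to be handled (or escalated, as here) inside the handler.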

Features

  • Exposes the Java Kafka consumer, producer and admin clients with a ZIO based interface.
  • Consuming:
    • 2 APIs: streaming and ZIO workflow based
    • supports custom deserialization
    • process each partition in parallel for highest throughput
    • allows batched processing for highest throughput
    • configurable per partition pre-fetching (with back-pressure)
    • the only async Kafka consumer without duplicates after a rebalance (as far as we know)
    • very configurable
    • automatic or manual starting offset
    • supports external commits
    • retries after authentication/authorization errors
    • exposes metrics
    • diagnostics API
  • Producing:
    • 2 APIs: streaming and ZIO workflow based
    • supports custom serialization
    • allows for batches for highest throughput
    • optionally await broker acknowledgements
    • optional retries after authentication/authorization errors
  • Admin API:
    • exposes all the admin client methods with a ZIO based interface
  • Proper errors when broker expects SSL (no OOM crashes)
  • Test kit with embedded kafka broker
  • Well documented
  • Community support via Discord
  • Commercial support via Ziverge
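
To make the "process each partition in parallel" item above more concrete, here is a minimal sketch built on partitionedStream. It assumes a broker on localhost:9092 and a topic named "random" with Long keys and String values; the object name PartitionedSketch is invented for illustration.

```scala
import zio._
import zio.kafka.consumer._
import zio.kafka.serde._

object PartitionedSketch extends ZIOAppDefault {

  // Assumed topic name, matching the README example.
  val subscription: Subscription = Subscription.topics("random")

  override def run: ZIO[Any, Throwable, Unit] =
    ZIO.scoped {
      for {
        consumer <- Consumer.make(
                      ConsumerSettings(List("localhost:9092")).withGroupId("group")
                    )
        _ <- consumer
               .partitionedStream(subscription, Serde.long, Serde.string)
               // Each assigned partition arrives as its own sub-stream;
               // flatMapPar runs those sub-streams concurrently.
               .flatMapPar(Int.MaxValue) { case (topicPartition, partitionStream) =>
                 partitionStream
                   .tap(r => Console.printLine(s"$topicPartition: ${r.value}"))
                   .map(_.offset)
               }
               // Merge offsets into batches and commit them together.
               .aggregateAsync(Consumer.offsetBatches)
               .mapZIO(_.commit)
               .runDrain
      } yield ()
    }
}
```

Records within one partition are still handled in order; only different partitions are processed side by side.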

Getting started

See the zio-kafka tutorial for a grand tour of the different ways you can use zio-kafka.

To use this library, we need to add the following lines to our build.sbt file:

libraryDependencies += "dev.zio" %% "zio-kafka"         % "2.11.0"
libraryDependencies += "dev.zio" %% "zio-kafka-testkit" % "2.11.0" % Test

Snapshots are available on Sonatype's snapshot repository https://oss.sonatype.org/content/repositories/snapshots. Browse here to find available versions.

For zio-kafka-testkit together with Scala 3, you also need to add the following to your build.sbt file:

excludeDependencies += "org.scala-lang.modules" % "scala-collection-compat_2.13"

Example

Let's write a simple Kafka producer and consumer using zio-kafka with zio-streams. First, we need a running instance of Kafka. We can get one by saving the following docker-compose script as docker-compose.yml and running docker compose up:

services:
  broker:
    image: apache/kafka:3.9.0
    container_name: broker
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_BROKER_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,CONTROLLER:PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@broker:29093
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT

Now, we can run our ZIO Kafka Streaming application:

import zio._
import zio.kafka.consumer._
import zio.kafka.producer.{ Producer, ProducerSettings }
import zio.kafka.serde._
import zio.stream.ZStream

object ReadmeExample extends ZIOAppDefault {

  private val producerRun: ZIO[Any, Throwable, Unit] =
    ZIO.scoped {
      for {
        producer <-
          Producer.make(
            ProducerSettings(List("localhost:9092"))
          )
        _ <- ZStream
          .fromSchedule(Schedule.fixed(2.seconds))
          .mapZIO(_ => Random.nextIntBetween(0, Int.MaxValue))
          .mapZIO { random =>
            producer.produce[Any, Long, String](
              topic = "random",
              key = (random % 4).toLong,
              value = random.toString,
              keySerializer = Serde.long,
              valueSerializer = Serde.string
            )
          }
          .runDrain
      } yield ()
    }

  private val consumerRun: ZIO[Any, Throwable, Unit] =
    ZIO.scoped {
      for {
        consumer <-
          Consumer.make(
            ConsumerSettings(List("localhost:9092"))
              .withGroupId("group")
          )
        _ <- consumer
          .plainStream(Subscription.topics("random"), Serde.long, Serde.string)
          .tap(r => Console.printLine(r.value))
          .map(_.offset)
          .aggregateAsync(Consumer.offsetBatches)
          .mapZIO(_.commit)
          .runDrain
      } yield ()
    }

  override def run: ZIO[Any, Throwable, Unit] =
    ZIO.raceFirst(producerRun, List(consumerRun))
}
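
The producer above sends one record per call. For the batched style mentioned in the feature list, a sketch along the following lines may be useful. It assumes the same broker and the "random" topic; the object name ProduceChunkSketch and the batch of 100 records are arbitrary choices for illustration.

```scala
import org.apache.kafka.clients.producer.ProducerRecord
import zio._
import zio.kafka.producer.{ Producer, ProducerSettings }
import zio.kafka.serde._

object ProduceChunkSketch extends ZIOAppDefault {

  // A hypothetical batch: 100 records spread over 4 keys.
  val records: Chunk[ProducerRecord[Long, String]] =
    Chunk.fromIterable(1 to 100).map { i =>
      new ProducerRecord[Long, String]("random", (i % 4).toLong, i.toString)
    }

  override def run: ZIO[Any, Throwable, Unit] =
    ZIO.scoped {
      for {
        producer <- Producer.make(ProducerSettings(List("localhost:9092")))
        // Hands the whole chunk to the producer in one call and waits
        // until the broker has acknowledged the records.
        metadata <- producer.produceChunk(records, Serde.long, Serde.string)
        _        <- Console.printLine(s"Acknowledged ${metadata.size} records")
      } yield ()
    }
}
```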

Resources

Articles

Video

Example projects

  • Kafka BigQuery Express by Adevinta (November 2024) A production system to copy data from Kafka to BigQuery, safely and cost-effectively. (See also the video "Optimizing Data Transfer...".)
  • zio-kafka-showcase by Jorge Vásquez, an example project that demonstrates how to build Kafka-based microservices with Scala and ZIO
  • zio-kafka-demo1 (December 2022), example consumer and producer using zio-kafka 2.0.5
  • zio-kafka-example-app by Ziverge (December 2020), example application using zio-kafka 0.8.0

Adopters

Here is a partial list of companies using zio-kafka in production.

Want to see your company here? Submit a PR!

Performance

Often, zio-kafka programs consume with a higher throughput than programs that use the java-kafka client directly. Read on for the details.

By default, zio-kafka programs process partitions in parallel. The default java-kafka client does not provide parallel processing. Of course, there is some overhead in buffering records and distributing them to the fibers that need them. On 2024-11-23, we estimated that zio-kafka consumes faster than the java-kafka client when processing takes more than ~1.2ms per 1000 records. The precise time depends on many factors. Please see this article for more details.
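
To put the estimate in per-record terms: ~1.2ms per 1000 records is about 1.2 microseconds per record. The small sketch below only restates that arithmetic; the object and method names are hypothetical, and the threshold is the rough figure quoted above, not a guarantee for any particular workload.

```scala
object BreakEvenSketch {

  // Estimate quoted in the text (2024-11-23): ~1.2 ms per 1000 records.
  val breakEvenMicrosPer1000Records: Double = 1200.0

  // The same estimate expressed per record: 1.2 microseconds.
  val breakEvenMicrosPerRecord: Double = breakEvenMicrosPer1000Records / 1000.0

  // Rough rule of thumb only: true when per-record processing time
  // exceeds the estimated break-even point.
  def likelyFasterWithZioKafka(processingMicrosPerRecord: Double): Boolean =
    processingMicrosPerRecord > breakEvenMicrosPerRecord
}
```

For example, a handler that spends 50 microseconds per record sits well above the estimated break-even point, while one that spends 0.5 microseconds sits below it.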

If you do not care for the convenient ZStream based API that zio-kafka brings, and latency is of absolute importance, using the java based Kafka client directly is still the better choice.

Developers

Documentation

Learn more on the ZIO Kafka homepage!

Contributing

For the general guidelines, see the ZIO contributor's guide.

Code of Conduct

See the Code of Conduct

Support

Come chat with us on Discord.

Credits

This library is heavily inspired and made possible by the research and implementation done in Alpakka Kafka, a library maintained by the Akka team and originally written as Reactive Kafka by SoftwareMill.

License

Copyright 2021-2025 Itamar Ravid and the zio-kafka contributors.