joan38 / kafka-streams-circe   0.6

Apache License 2.0

Generic Serdes with Circe for Kafka Streams

Scala versions: 2.13 2.12

Kafka Streams Circe

Installation

Mill:

ivy"com.goyeau::kafka-streams-circe:<latest version>"

or

SBT:

"com.goyeau" %% "kafka-streams-circe" % "<latest version>"
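The examples in this README also import from circe's generic module and the Kafka Streams Scala API. As a sketch only, a `build.sbt` that declares those explicitly might look like the fragment below. Whether they are already pulled in transitively is an assumption left unchecked here, and the version placeholders are deliberately left unfilled.

```scala
// build.sbt (sketch): the two extra dependencies are listed explicitly as an assumption;
// they may already arrive transitively with kafka-streams-circe.
libraryDependencies ++= Seq(
  "com.goyeau"       %% "kafka-streams-circe" % "<latest version>",
  "io.circe"         %% "circe-generic"       % "<circe version>",
  "org.apache.kafka" %% "kafka-streams-scala" % "<kafka version>"
)
```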

Example

import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
import com.goyeau.kafka.streams.circe.CirceSerdes._
import io.circe.generic.auto._

case class Person(firstname: String, lastname: String, age: Int)

object Streams extends App {
  println("Starting streams")

  val streamsBuilder = new StreamsBuilder
  val testStream = streamsBuilder.stream[String, Person]("some-topic")
}
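Because the serdes are resolved implicitly, the same imports should also cover writing `Person` values back to Kafka. The following is a minimal sketch under that assumption: the output topic name `adults-topic` and the object name `AdultsTopology` are hypothetical and only serve the illustration.

```scala
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
import com.goyeau.kafka.streams.circe.CirceSerdes._
import io.circe.generic.auto._

case class Person(firstname: String, lastname: String, age: Int)

object AdultsTopology {
  val streamsBuilder = new StreamsBuilder

  streamsBuilder
    .stream[String, Person]("some-topic")    // values decoded from JSON via the implicit Circe serde
    .filter((_, person) => person.age >= 18) // keep only adults
    .to("adults-topic")                      // hypothetical topic; values encoded back to JSON

  // The resulting topology can then be passed to a KafkaStreams instance
  val topology = streamsBuilder.build()
}
```

No `Consumed` or `Produced` instance is passed by hand here: `ImplicitConversions._` derives both from the implicit key and value serdes in scope.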

To customize the JSON serialization, provide a custom implicit instance of circe's Printer in scope.

For example, the code below uses a custom printer to omit null values:

import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
import com.goyeau.kafka.streams.circe.CirceSerdes._
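import io.circe.Printer
import io.circe.generic.auto._

case class Person(firstname: String, lastname: String, age: Int)

object Streams extends App {
  // Custom printer provided implicitly for the serdes; fields with null values are dropped
  implicit val printer: Printer = Printer.noSpaces.copy(dropNullValues = true)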

  val streamsBuilder = new StreamsBuilder
  val testStream = streamsBuilder.stream[String, Person]("some-topic")
}
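To see what `dropNullValues` changes, the printer can be tried on its own with plain circe, independent of Kafka. This sketch uses a hypothetical `PersonWithNickname` case class, since an `Option` field is needed to produce a null in the encoded JSON; it is not part of the library.

```scala
import io.circe.Printer
import io.circe.generic.auto._
import io.circe.syntax._

// Hypothetical variant of Person with an optional field, used only for illustration
case class PersonWithNickname(firstname: String, lastname: String, nickname: Option[String])

object PrinterDemo {
  // None is encoded as a JSON null by circe's derived encoder
  val json = PersonWithNickname("Ada", "Lovelace", None).asJson

  // Default compact printer keeps the null field:
  // {"firstname":"Ada","lastname":"Lovelace","nickname":null}
  val withNulls: String = Printer.noSpaces.print(json)

  // With dropNullValues = true the null field is omitted:
  // {"firstname":"Ada","lastname":"Lovelace"}
  val withoutNulls: String = Printer.noSpaces.copy(dropNullValues = true).print(json)
}
```

Note that the `Person` class in the example above has no optional fields, so the custom printer only makes a visible difference for types whose encoded JSON can contain nulls.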