Provides FS2 Kafka Serializers and Deserializers that integrate with Confluent Schema Registry for JSON messages with JSON Schemas.
Note: This library only works with Scala 3.3.x and above. For Scala 2.x, see here.
This functionality is backed by FS2 Kafka, Tapir (which supplies the Pickler and JSON Schema support), and Confluent's Schema Registry.
Add the following to your build.sbt:
resolvers ++= Seq("confluent" at "https://packages.confluent.io/maven")
libraryDependencies += "io.kaizen-solutions" %% "fs2-kafka-jsonschema" % "<latest-version>"
Define the datatype that you would like to send/receive over Kafka via the JSON + JSON Schema format, and provide a Pickler instance for it. The Pickler instance comes from the Tapir library.
import sttp.tapir.Schema.annotations.*
import sttp.tapir.json.pickler.*
final case class Book(
  @description("name of the book") name: String,
  @description("international standard book number") isbn: Int
)

object Book:
  given Pickler[Book] = Pickler.derived
Next, you can create an FS2 Kafka Serializer and Deserializer for this datatype and use them when building your FS2 Kafka producer/consumer.
import io.kaizensolutions.jsonschema.*
import cats.effect.*
import fs2.kafka.*
// Value serializer for Book, backed by the Schema Registry at localhost:8081
def bookSerializer[F[_]: Sync]: Resource[F, ValueSerializer[F, Book]] =
  JsonSchemaSerializerSettings.default
    .withSchemaRegistryUrl("http://localhost:8081")
    .forValue[F, Book]

// Value deserializer for Book, backed by the same Schema Registry
def bookDeserializer[F[_]: Sync]: Resource[F, ValueDeserializer[F, Book]] =
  JsonSchemaDeserializerSettings.default
    .withSchemaRegistryUrl("http://localhost:8081")
    .forValue[F, Book]