Comms platform Kafka Avro serialisation
This library contains utility functions for generating Kafka serialisers, deserialisers, producers and consumers for case classes with Avro schemas. It consists of three modules:
- comms-kafka-serialisation
- comms-kafka-cakesolutions-helpers
- comms-kafka-akka-helpers
How to use
Add a resolver in sbt for the Maven repo on Bintray:
resolvers := Resolver.withDefaultResolvers(
Seq(
Resolver.bintrayRepo("ovotech", "maven"),
"confluent-release" at "http://packages.confluent.io/maven/"
)
)
Then add a dependency on the library:
libraryDependencies ++= Seq(
"com.ovoenergy" %% "comms-kafka-serialisation" % "version",
"com.ovoenergy" %% "comms-kafka-helpers" % "version",
"com.ovoenergy" %% "comms-kafka-test-helpers" % "version"
)
See the Bintray badge above for the latest version.
Serialisation
Then in your code, for vanilla Avro JSON (de)serialisers with no dependency on a schema registry:
import com.ovoenergy.comms.serialisation._
val deserializer = Serialisation.avroDeserializer[MyLovelyKafkaEvent]
val result: Option[MyLovelyKafkaEvent] = deserializer.deserialize("my-topic", messageBytes)
Or for binary Avro (de)serialisers with schema registry support:
import com.ovoenergy.comms.serialisation._
import com.ovoenergy.kafka.serialization.avro.SchemaRegistryClientSettings
val schemaRegistrySettings = SchemaRegistryClientSettings("schemaRegistryEndpoint:666", Authentication.None, 10, 5)
val deserializerMaybe: Either[Retry.Failed, Deserializer[Option[MyLovelyKafkaEvent]]] = Serialisation.avroBinarySchemaRegistryDeserializer[MyLovelyKafkaEvent](schemaRegistrySettings, false, "my-topic")
// For the sake of a simple example ;) Real code should handle the Left case.
val deserializer: Deserializer[Option[MyLovelyKafkaEvent]] = deserializerMaybe.right.get
val result: Option[MyLovelyKafkaEvent] = deserializer.deserialize("my-topic", messageBytes)
Note that this functionality is a thin wrapper around the kafka-serialization library; see that library's readme for more information on how the schema registry works. The differences in functionality are that this wrapper registers the schema with the schema registry eagerly, as soon as the (de)serialiser is created, rather than lazily, and that deserialised values are optional: deserialisation returns None when it fails. Additionally, if the call to register the schema fails when a serialiser or deserialiser is created, the result is a Left containing a Retry.Failed.
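Calling .right.get throws if schema registration failed, so real code will usually want to handle both outcomes. The following is a minimal sketch of one way to do that, not the library's own approach. It deliberately does not depend on this library: Failed (with its attempts and lastError fields), the local Deserializer trait, and the MyLovelyKafkaEvent case class are all hypothetical stand-ins for Retry.Failed, Kafka's Deserializer, and your own event type, so adapt the names to the real ones.

```scala
// Hypothetical stand-ins so the sketch compiles without the library on the classpath.
final case class Failed(attempts: Int, lastError: Throwable) // stands in for Retry.Failed; fields are assumed
trait Deserializer[A] { def deserialize(topic: String, data: Array[Byte]): A } // stands in for Kafka's Deserializer
final case class MyLovelyKafkaEvent(id: String) // stands in for your event type

object EitherHandling {
  // Match on the Either instead of calling .right.get, and turn a None
  // (failed deserialisation) into a descriptive Left as well.
  def decode(
      deserializerMaybe: Either[Failed, Deserializer[Option[MyLovelyKafkaEvent]]],
      topic: String,
      bytes: Array[Byte]
  ): Either[String, MyLovelyKafkaEvent] =
    deserializerMaybe match {
      case Left(failed) =>
        Left(s"Schema registration failed after ${failed.attempts} attempts: ${failed.lastError.getMessage}")
      case Right(deserializer) =>
        deserializer.deserialize(topic, bytes).toRight(s"Could not deserialise message from $topic")
    }
}
```

With this shape, the caller gets a single Either to act on: a Right holding the event, or a Left explaining whether registration or deserialisation was the step that failed.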
Helpers
This is a collection of classes for dealing with the Kafka topics. It enumerates all of the topics and events on offer and allows users to make type-safe producers and consumers for Aiven.
import com.ovoenergy.comms.helpers.{Kafka, Topic}
import com.ovoenergy.comms.serialisation.Codecs._
val consumer: Either[Retry.Failed, KafkaConsumer[String, Option[TriggeredV3]]] = Kafka.aiven.triggered.v3.consumer
val producer: Either[Retry.Failed, KafkaProducer[String, TriggeredV3]] = Kafka.aiven.triggered.v3.producer
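Because consumer values are typed as Option[TriggeredV3], code that processes polled records has to decide what to do with messages that could not be deserialised. The following is a minimal sketch, assuming you have already extracted the record values into a Seq. ConsumedValues and split are hypothetical names introduced here for illustration and are not part of this library.

```scala
object ConsumedValues {
  // Separates successfully deserialised values (Some) from failures (None),
  // returning the decoded values together with the number of failures.
  def split[A](values: Seq[Option[A]]): (Seq[A], Int) = {
    val decoded = values.flatten
    (decoded, values.size - decoded.size)
  }
}
```

A caller could then process the decoded events as usual and log or alert on the failure count, rather than silently dropping undeserialisable messages.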
Test Helpers
Currently this consists of a utility to iterate over the available topics:
import com.ovoenergy.comms.helpers.{Kafka, Topic}
import ArbGenerator._
import TopicListTraverser._
import com.ovoenergy.comms.serialisation.Codecs._
import shapeless._
import org.scalacheck.Shapeless._
val visitor = new TopicListVisitor {
override def apply[E: SchemaFor : Arbitrary : ToRecord : FromRecord : ClassTag](topic: Topic[E]): Unit = {
println(topic.name)
}
}
TopicListTraverser(Kafka.aiven.allTopics, visitor)
To release a new version
You will need to be a member of the ovotech organisation on Bintray.
$ sbt release
This runs the tests, bumps the version, builds all the artifacts and publishes them to Bintray.
Before running it for the first time, configure sbt with your Bintray credentials: $ sbt bintrayChangeCredentials