tradecloud / kafka-akka-extension

Akka extension to publish and subscribe to Kafka topics

Version Matrix

Kafka Akka

Build Status Maven Central License

A wrapper around Akka's reactive kafka providing resilience and re-use of Akka defined serialization for Kafka messages.


Add the dependency in the build.sbt, like:

libraryDependencies ++= Seq(
    "nl.tradecloud" %% "kafka-akka-extension" % "0.55"

Configure in the application.conf file, like:

tradecloud.kafka {
  brokers = "localhost:9092"
  topicPrefix = ""
  groupPrefix = ""

As this library is a wrapper around Akka's reactive kafka, you can also use the configuration options of Reactive Kafka.



implicit val materializer: Materializer = ActorMaterializer()

new KafkaSubscriber(
    serviceName = "my_example_service",
    group = "some_group_name",
    topics = Set("some_topic"),
    minBackoff = 15.seconds,
    maxBackoff = 3.minutes,
    system = actorSystem
      .map { wrapper: KafkaMessage[String] =>
        // do something
        println(wrapper.msg + "-world")
        // return the offset


// promise is completed when publish is added to Kafka
implicit val materializer: Materializer = ActorMaterializer()

val publisher = new KafkaPublisher(actorSystem)

publisher.publish("topic", msg)


Serialization is handled using the Akka Remoting component, see: Akka Remoting Serialization