tradecloud / akka-kafka

Akka Kafka wrapper to publish and subscribe to Kafka topics

GitHub

Kafka Akka

Build Status Maven Central License

A wrapper around Akka's reactive kafka providing resilience and re-use of Akka defined serialization for Kafka messages.

Configuration

Add the dependency in the build.sbt, like:

libraryDependencies ++= Seq(
    "nl.tradecloud" %% "akka-kafka" % "0.65"
)

Configure in the application.conf file, like:


tradecloud.kafka {
  serviceName = "test"
  brokers = "localhost:9092"
  topicPrefix = ""
  groupPrefix = ""
}

As this library is a wrapper around Akka's reactive kafka, you can also use the configuration options of Reactive Kafka.

Usage

Subscribe

implicit val system: ActorSystem = ActorSystem()
implicit val materializer: Materializer = ActorMaterializer()

new KafkaSubscriber(
    group = "some_group_name",
    topics = Set("some_topic")
  ).atLeastOnce(
    Flow[String]
      .map { wrapper: KafkaMessage[String] =>
        // do something
        println(wrapper.msg + "-world")
        
        // return the offset
        msg.offset
      }
  )

Publish

// promise is completed when publish is added to Kafka
implicit val system: ActorSystem = ActorSystem()
implicit val materializer: Materializer = ActorMaterializer()

val publisher = new KafkaPublisher()

publisher.publish("topic", msg)

Serialization

Serialization is handled using Akka Serialization, see: Akka Serialization