bwsw / kafka-reader

Kafka CloudStack Events Reader and Parallel Evaluator Framework



Kafka CloudStack Events Reader and Evaluator Framework

The aim of the library is the convenient handling of Kafka messages. It provides the mechanisms to:

  1. Buffer the messages to vary a count of processing messages without changing a consumer properties
  2. Implement the logic of storing consumer offsets to any place

Install with SBT

Add the following to your build.sbt

libraryDependencies += "com.bwsw" %% "kafka-reader" % "0.10.1"

Getting Started

The diagram below is a simple illustration of how the library's classes should be used.
Implement your own EventHandler and EventManager in the way as it is displayed on the diagram.
Sequence where:
* K - type of ConsumerRecord key
* V - type of ConsumerRecord value
* T - type of data after handle a ConsumerRecord by the instance of EventHandler implementation

Example Usage

The example below shows how to print messages from Kafka to the console. The call to the method is performed in Future:

class SimpleEventHandler(messageQueue: MessageQueue[String,String], messageCount: Int)
  extends EventHandler[String,String,Future[Unit]](messageQueue, messageCount) {

  override def handle(flag: AtomicBoolean): List[OutputEnvelope[Future[Unit]]] = {
    val inputEnvelopes = messageQueue.take(messageCount) { x =>
      OutputEnvelope[Future[Unit]](x.topic, x.partition, x.offset, Future(println(


According to the diagram above the main class looks like this:

object EventManager {
    def main(args: Array[String]): Unit = {
          val dummyFlag = new AtomicBoolean(true)
          val consumer = new Consumer[String,String](Consumer.Settings("localhost:9092", "group01", 3000))
          val checkpointInfoProcessor = new CheckpointInfoProcessor[String,String,Future[Unit]](
            TopicInfoList(List(TopicInfo(topic = "topic1"))),
          val messageQueue = new MessageQueue[String,String](consumer)
          val eventHandler = new SimpleEventHandler(messageQueue, countOfMessages = 1)
          val outputEnvelopes = eventHandler.handle(dummyFlag)
            case Success(x) => 
            case Failure(e) =>
              prinln(s"something went wrong, exception was thrown: $e")
              throw e



Unit tests

Run tests: sbt test

Integration tests

  1. Add local environment variables:
    • KAFKA_HOST - host of Kafka, for example - "localhost"
    • KAFKA_PORT - port of Kafka, for example - "9092"
  2. Run Kafka server in docker container:
    docker run -d --rm -p 2181:2181 -p $KAFKA_PORT:$KAFKA_PORT \ 
  1. Run tests: sbt it:test


Library has the same version as Apache Kafka library


This project is licensed under the Apache License - see the LICENSE file for details