Kafka CloudStack Events Reader and Evaluator Framework
The aim of the library is the convenient handling of Kafka messages. It provides the mechanisms to:
- Buffer the messages to vary a count of processing messages without changing a consumer properties
- Implement the logic of storing consumer offsets to any place
Add the following to your build.sbt
libraryDependencies += "com.bwsw" %% "kafka-reader" % "0.10.1"
The diagram below is a simple illustration of how the library's classes should be used.
Implement your own EventHandler and EventManager in the way as it is displayed on the diagram.
where:
* K
- type of ConsumerRecord key
* V
- type of ConsumerRecord value
* T
- type of data after handle a ConsumerRecord by the instance of EventHandler implementation
The example below shows how to print messages from Kafka to the console. The call to the method is performed in Future:
class SimpleEventHandler(messageQueue: MessageQueue[String,String], messageCount: Int)
extends EventHandler[String,String,Future[Unit]](messageQueue, messageCount) {
override def handle(flag: AtomicBoolean): List[OutputEnvelope[Future[Unit]]] = {
val inputEnvelopes = messageQueue.take(messageCount)
inputEnvelopes.map { x =>
OutputEnvelope[Future[Unit]](x.topic, x.partition, x.offset, Future(println(x.data)))
}
}
}
According to the diagram above the main class looks like this:
object EventManager {
def main(args: Array[String]): Unit = {
val dummyFlag = new AtomicBoolean(true)
val consumer = new Consumer[String,String](Consumer.Settings("localhost:9092", "group01", 3000))
val checkpointInfoProcessor = new CheckpointInfoProcessor[String,String,Future[Unit]](
TopicInfoList(List(TopicInfo(topic = "topic1"))),
consumer
)
val messageQueue = new MessageQueue[String,String](consumer)
val eventHandler = new SimpleEventHandler(messageQueue, countOfMessages = 1)
checkpointInfoProcessor.load()
val outputEnvelopes = eventHandler.handle(dummyFlag)
outputEnvelopes.data.foreach {
case Success(x) =>
case Failure(e) =>
prinln(s"something went wrong, exception was thrown: $e")
throw e
}
checkpointInfoProcessor.save(outputEnvelopes)
consumer.close()
}
}
Run tests: sbt test
Run tests: sbt it:test
Library has the same version as Apache Kafka library
This project is licensed under the Apache License - see the LICENSE file for details