Akka extension for Kafka consumer. Features include
- Automatically set up actors that poll message from Kafka
- All blocking IO is hidden from user's perspective
- Acknowledgement based back pressure implementation
- Minimal code/configuration
resolvers += "Sonatype OSS" at "https://oss.sonatype.org/content/groups/public"
libraryDependencies += "org.lsun" %% "kafka-akka-extension" % "0.0.1"
kafka {
producer {
metadata-broker-list = "host1:port1,host2:port2,host3:port3",
}
}
IO(Kafka) ! Message("topic1", key, message)
kafka {
consumer {
group-id = "consumer-group-name"
zookeeper-connect = "host1:port1,host2:port2,host3:port3"
maximum-backlog = 5
}
}
maximum-backlog is the maximum number of messages in backlog, set it to 0 to disable back pressure
val settings = KafkaConsumer.ConsumerSettings
.withTopc("topic1")
.withConsumerActor("topic", actor1)
IO(KafkaConsumer) ! StartConsumer(settings)
For the receiver actors, it receives
Received(key, message, topic, partition, offset)
for incoming messages. It is responsible to reply
Acknowledge(Received(key, message, topic, partition, offset))
no matter if the message is processed successful or not. Otherwise the consumer will be suspended when it reach the configured maximum messages in backlog.