rxscala-kafka
Library containing RxScala Observables to consume Kafka Topics
Status
Setup
Add to your build.sbt
following resolver with dependency:
resolvers += Resolver.bintrayRepo("autoscout24", "maven")
libraryDependencies += "com.autoscout24" %% "rxscala-kafka" % "(see version number above)",
Configuration
For detailed list of config options please check kafka-client documentation.
Instantiating an observable
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import com.autoscout.rxscala.kafka.consumer.KafkaTopicObservable
...
def createKafkaConsumer(config: Configuration): KafkaConsumer[String, String] = {
new KafkaConsumer[String, String](
config.kafkaConsumerProperties
+ (CommonClientConfigs.CLIENT_ID_CONFIG -> config.kafkaClientName)
+ (ConsumerConfig.GROUP_ID_CONFIG -> config.consumerGroupId)
)
}
val kafkaObservable = KafkaTopicObservable(config.listingsTopicName, () => createKafkaConsumer(config))
val notNullRecords = kafkaObservable.filter(_.value != null)