autoscout24 / rxscala-kafka   1.1.2

GitHub

Library abstracting away Kafka Consumers as Observables

Scala versions: 2.11

rxscala-kafka

Library containing RxScala Observables to consume Kafka Topics

Status

Build Status

Download

Setup

Add the following resolver and dependency to your build.sbt:

// Bintray repository "maven" owned by "autoscout24".
resolvers += Resolver.bintrayRepo("autoscout24", "maven")

// Replace the version placeholder with the release you want to use.
// (No trailing comma: each setting in build.sbt is a standalone expression.)
libraryDependencies += "com.autoscout24" %% "rxscala-kafka" % "(see version number above)"

Configuration

For a detailed list of config options, please check the kafka-client documentation.

Instantiating an observable

import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import com.autoscout.rxscala.kafka.consumer.KafkaTopicObservable

...

/**
 * Builds a Kafka consumer with `String` keys and `String` values.
 *
 * The consumer settings start from `config.kafkaConsumerProperties` and are
 * extended with the client id (`config.kafkaClientName`) and the consumer
 * group id (`config.consumerGroupId`).
 *
 * @param config source of the base consumer properties, the client name and
 *               the consumer group id
 * @return a newly constructed consumer; every call creates a new instance
 */
def createKafkaConsumer(config: Configuration): KafkaConsumer[String, String] =
  new KafkaConsumer[String, String](
    config.kafkaConsumerProperties
      + (CommonClientConfigs.CLIENT_ID_CONFIG -> config.kafkaClientName)
      + (ConsumerConfig.GROUP_ID_CONFIG -> config.consumerGroupId)
  )

// Observable for the listings topic. The second argument is a factory
// function that creates the consumer through createKafkaConsumer.
val kafkaObservable = KafkaTopicObservable(
  config.listingsTopicName,
  () => createKafkaConsumer(config)
)

// Keep only the records whose value is not null.
val notNullRecords = kafkaObservable.filter(record => record.value != null)