A wrapped producer and consumer for Kafka
<dependency>
<groupId>com.github.thanhtien522</groupId>
<artifactId>scala-kafka-client_2.11</artifactId>
<version>0.1.0</version>
</dependency>
libraryDependencies += "com.github.thanhtien522" %% "scala-kafka-client" % "0.1.0"
val producer = new ProducerBuilder[String, String]()
.setBootstrapServers("<your bootstrap servers>")
.setSerializer(new StringSerializer, new StringSerializer)
.build()
producer.send("topic", "Key", "Value")
Support config functions:
- setBootstrapServers: Configure kafka bootstrap servers
- setSerialize: Configure data serializer
- setConfig(key, value): Configure with key-value pair
- withConfig( typesafe config): Append configuration from TypeSafe Config object
val consumer = new ConsumerBuilder[String, String]()
.setGroupId("group_id")
.setClientId("client_id")
.setBootstrapServers("servers")
.setDeserializer(new StringDeserializer, new StringDeserializer)
.setSubscribeTopics(Seq("topic"))
.setPollInterval(100)
.setConsumer(new KafkaRecordConsumer[String, String] {
override def consume(record: ConsumerRecord[String, String]): Unit = {
println(s"${record.topic()} - ${record.partition()} - ${record.offset()} - ${record.key()} - ${record.value()}")
}
})
.build()
consumer.start()
Support config functions:
- setBootstrapServers: Configure kafka bootstrap servers
- setDeserialize: Configure data deserializer
- setPollInterval(itv: Int): Configure polling interval in milliseconds
- setSubscribeTopics: Configure topic subscribe strategy
- setGroupId: Configure consumer group id
- setClientId: Configure consumer client id
- setWarningCallback: Configure callback for ignored error
- setFatalCallback: Configure callback for critical error
- setConfig(key, value): Configure with key-value pair
- withConfig( typesafe config): Append configuration from TypeSafe Config object
- Multiple topic subscribe: Subscribe multiple topic
- Pattern topic subscribe: Subscribe topic name match pattern
All Exception that extend from KafkaException is considered as Ignore error
and Warning Callback will be called
All others exception is considered as critical error
, Fatal Callback will be called and consumer will be stop
Future work