
Spark AdHoc Kafka Datasource

This is a datasource implementation for quickly querying Kafka with Spark. You can control the parallelism of data fetching from Kafka, and you are not limited by the original number of Kafka partitions. It is especially useful when you occasionally want to filter some data out of Kafka rather than run a daily job. It saves you a lot of time compared with the traditional approach of consuming Kafka and writing the data to HDFS/ES first.

Requirements

This library requires Spark 2.4+ (tested) and Kafka 0.10.0+. Older versions of Spark may work as well, but they are not officially supported.

Linking

You can link against this library in your program at the following coordinates:

Scala 2.11

groupId: tech.mlsql
artifactId: spark-adhoc-kafka_2.11
version: 0.1.2
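For sbt users, the coordinates above translate to a dependency declaration like the following (assuming the artifact is available from your configured resolvers):

```scala
// sbt: add the Scala 2.11 artifact at the coordinates listed above
libraryDependencies += "tech.mlsql" % "spark-adhoc-kafka_2.11" % "0.1.2"
```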

Usage

Spark DataFrame:

val df = spark
      .read
      .format("org.apache.spark.sql.kafka010.AdHocKafkaSourceProvider")
      .option("kafka.bootstrap.servers", "127.0.0.1:9092")
      .option("startingOffsets", "earliest")
      .option("endingOffsets", "latest")
      .option("multiplyFactor", "2") // parallelism = multiplyFactor * number of Kafka partitions
      .option("maxSizePerPartition", "100000") // parallelism = total records / maxSizePerPartition
      .option("subscribe", topic)
      .load()
      .selectExpr("CAST(value AS STRING)")

The priority of multiplyFactor is higher than that of maxSizePerPartition: when both options are set, multiplyFactor wins.
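The rule above can be sketched as a small function. This is an illustrative sketch of how the effective parallelism might be derived from the two options, not the library's actual implementation; the function name and defaults are assumptions:

```scala
// Illustrative sketch (not the library's code): derive the number of Spark
// partitions from the two options, giving multiplyFactor priority.
def effectiveParallelism(
    kafkaPartitions: Int,
    totalRecords: Long,
    multiplyFactor: Option[Int],
    maxSizePerPartition: Option[Long]): Long =
  multiplyFactor match {
    // multiplyFactor wins when both options are set
    case Some(factor) => factor.toLong * kafkaPartitions
    case None =>
      maxSizePerPartition match {
        // round up so every record is covered by some partition
        case Some(maxSize) =>
          math.max(1L, math.ceil(totalRecords.toDouble / maxSize).toLong)
        // neither option set: fall back to the Kafka partition count
        case None => kafkaPartitions.toLong
      }
  }
```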

MLSQL:

load adHocKafka.`topic` where
kafka.bootstrap.servers="127.0.0.1:9092"
and multiplyFactor="2"
as table1;

select count(*) from table1 where value like "%yes%" as output;

With Spark AdHoc Kafka, you can use startingOffsets/endingOffsets to restrict the offset range and speed up the query. In most cases, however, you want to query the data within a specific time interval.

Spark DataFrame:

val df = spark
      .read
      .format("org.apache.spark.sql.kafka010.AdHocKafkaSourceProvider")
      .option("kafka.bootstrap.servers", "127.0.0.1:9092")
      .option("multiplyFactor", "2") // parallelism = multiplyFactor * number of Kafka partitions
      .option("maxSizePerPartition", "100000") // parallelism = total records / maxSizePerPartition
      .option("timeFormat", "yyyyMMdd")
      .option("startingTime", "20170101")
      .option("endingTime", "20180101")
      .option("subscribe", topic)
      .load()
      .selectExpr("CAST(value AS STRING)")
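The time options are strings in the pattern given by timeFormat; Kafka itself locates records by epoch-millisecond timestamps (via the consumer's offsetsForTimes API). A minimal sketch of the conversion, assuming the helper name toEpochMillis is illustrative and not part of the library:

```scala
import java.text.SimpleDateFormat

// Illustrative helper (not part of the library): parse a time string such as
// "20170101" with the given pattern and return epoch milliseconds, the unit
// Kafka uses when mapping timestamps to offsets.
def toEpochMillis(time: String, timeFormat: String): Long =
  new SimpleDateFormat(timeFormat).parse(time).getTime
```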

MLSQL:

load adHocKafka.`topic` where
kafka.bootstrap.servers="127.0.0.1:9092"
and multiplyFactor="2"
and timeFormat="yyyyMMdd"
and startingTime="20170101"
and endingTime="20180101"
as table1;

select cast(value as string) as textValue from table1 
as table2;

select count(*) from table2 where textValue like "%yes%" 
as output;