Spark Search

Spark Search brings advanced full text search features to your DataFrame, Dataset and RDD, powered by Apache Lucene.

Context

Let's imagine you have a billion-record dataset you want to query and match against another one using full text search... You do not expect an external datasource or database system other than Spark, and of course you want the best performance. Spark Search fits your needs: for every partition of the parent RDD, it builds a one-to-one volatile Lucene index, available during the lifecycle of your Spark session across your executors' local directories and RAM. Strongly typed, the Spark Search API plans to support the Java, Scala and Python Spark SQL, Dataset and RDD SDKs. Have a look and feel free to contribute!
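To make the per-partition indexing idea concrete, here is a minimal, illustrative sketch of the approach, not Spark Search's actual internals: each partition of an RDD builds its own in-memory Lucene index inside mapPartitions and answers a query locally. The sentences RDD (assumed to be an RDD[String]), the sentence field name and the query string are assumptions made up for this example.

import org.apache.lucene.analysis.standard.StandardAnalyzer
import org.apache.lucene.document.{Document, Field, TextField}
import org.apache.lucene.index.{DirectoryReader, IndexWriter, IndexWriterConfig}
import org.apache.lucene.queryparser.classic.QueryParser
import org.apache.lucene.search.IndexSearcher
import org.apache.lucene.store.ByteBuffersDirectory

// Illustrative only: one volatile Lucene index per partition, queried locally
val totalMatches = sentences.mapPartitions { rows =>
  val dir = new ByteBuffersDirectory() // lives in executor RAM for this task
  val writer = new IndexWriter(dir, new IndexWriterConfig(new StandardAnalyzer()))
  rows.foreach { s =>
    val doc = new Document()
    doc.add(new TextField("sentence", s, Field.Store.NO))
    writer.addDocument(doc)
  }
  writer.close()
  val searcher = new IndexSearcher(DirectoryReader.open(dir))
  val query = new QueryParser("sentence", new StandardAnalyzer()).parse("happy OR good")
  Iterator.single(searcher.count(query).toLong)
}.sum().toLong // aggregate the partition-local counts on the driver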

Getting started

Dataset/DataFrame API (In progress)

  • Scala
import org.apache.spark.search.sql._

val sentences = spark.read.csv("...")
sentences.count("sentence:happy OR sentence:best OR sentence:good")

// coming soon: SearchSparkStrategy/LogicPlan & column enhanced with search
sentences.where($"sentence".matches($"searchKeyword"))

RDD API

  • Scala
import org.apache.spark.search.rdd._
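// Note: the Review case class is not defined in this snippet; the shape below
// is an assumption inferred from the constructor calls and field queries in
// the examples (it matches the Amazon product reviews schema):
case class Review(asin: String, helpful: Array[Long], overall: Double,
                  reviewText: String, reviewTime: String, reviewerID: String,
                  reviewerName: String, summary: String, unixReviewTime: Long)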

// The number of partitions is the number of Lucene indexes that will be created across your cluster
val computersReviewsRDD = sc.parallelize(Seq(Review("AAAAA", Array(3, 3), 3.0,
    "Ok, this is a good computer to play Civilization IV or World of Warcraft",
    "11 29, 2010", "XXXXX", "Patrick H.", "Ok for an average user, but not much else.", 1290988800)))
  .repartition(4)

// Count positive reviews: indexation + count of matching docs
computersReviewsRDD.count("reviewText:happy OR reviewText:best OR reviewText:good")

// Search for key words
computersReviewsRDD.searchList("reviewText:\"World of Warcraft\" OR reviewText:\"Civilization IV\"", 100)
  .foreach(println)

// /!\ Important: Lucene indexation runs each time a SearchRDD is computed;
// if you run multiple operations on the same parent RDD, keep the SearchRDD in a driver variable:
val computersReviewsSearchRDD = computersReviewsRDD.searchRDD(
  SearchOptions.builder[Review]() // See SearchOptions, IndexationOptions and ReaderOptions for all other options
    .read((r: ReaderOptions.Builder[Review]) => r.defaultFieldName("reviewText"))
    .analyzer(classOf[EnglishAnalyzer])
    .build())

// Boolean queries and boosting examples returning RDD
computersReviewsSearchRDD.search("(RAM OR memory) AND (CPU OR processor)^4", 10).foreach(println)

// Fuzzy matching
computersReviewsSearchRDD.searchList("reviewerName:Mikey~0.8 OR reviewerName:Wiliam~0.4 OR reviewerName:jonh~0.2", 100)
  .map(doc => (doc.getSource.reviewerName, doc.getScore))
  .foreach(println)

// RDD full text joining
val softwareReviewsRDD = sc.parallelize(Seq(Review("BBBB", Array(1), 4.0, "I use this and Ulead video studio 11.", "09 17, 2008", "YYYY", "Patrick Holtt", "Great, easy to use and user friendly.", 1221609600)))
val matchesReviewersRDD = computersReviewsSearchRDD.searchJoin(softwareReviewsRDD, (sr: Review) => s"reviewerName:${"\"" + sr.reviewerName + "\""}~8", 10)
matchesReviewersRDD
  .filter(_.hits.nonEmpty)
  .map(m => (m.doc.reviewerName, m.hits.map(h => (h.source.reviewerName, h.score))))
  .foreach(println)

// Save, then restore later from HDFS
matchesReviewersRDD.save("hdfs:///path-for-later-query-on")
val restoredSearchRDD = SearchRDD.load[Review](sc, "hdfs:///path-for-later-query-on")

// Drop duplicates (see options)
restoredSearchRDD.searchDropDuplicates()

See Examples for more details.

  • Java
import org.apache.spark.search.rdd.*;

JavaRDD<Review> reviewRDD = sqlContext.read().json(...)
        .as(Encoders.bean(Review.class))
        .repartition(2)
        .javaRDD();
SearchRDDJava<Review> searchRDDJava = new SearchRDDJava<>(reviewRDD);

// Count matching docs
searchRDDJava.count("reviewText:good AND reviewText:quality");

// List matching docs
searchRDDJava.searchList("reviewText:recommend~0.8", 100).forEach(System.out::println);

// Pass custom search options
searchRDDJava = new SearchRDDJava<>(reviewRDD,
        SearchOptions.<Review>builder().analyzer(ShingleAnalyzerWrapper.class).build());

searchRDDJava.searchList("reviewerName:Patrik", 100)
        .stream()
        .map(SearchRecord::getSource)
        .map(Review::getReviewerName)
        .forEach(System.out::println);

See Examples for more details.

Benchmark

All benchmarks were run on AWS EMR with 3 Spark workers (EC2 m5.xlarge) and/or 3 r5.large.elasticsearch data nodes for AWS Elasticsearch. The general use case is to match company names across two datasets (7M vs 600K rows).

Feature                   SearchRDD   Elasticsearch Hadoop   LuceneRDD   Spark regex matches (no score)
Index + Count matches     51s         486s (*)               400s        12s
Index + Entity matching   128s        719s (*)               597s        NA (>1h)

DISCLAIMER: The benchmark methodology and results may be improved; feel free to submit a pull request.

(*) Results of the Elasticsearch Hadoop benchmark must be reviewed carefully; contributions welcome.

Release notes

v0.1.6
  • Switch to multi modules build: core, sql, examples, benchmark
  • Improve the GitHub build by running examples against a Spark cluster in Docker
  • Improve license header checking
  • RDD lineage works the same on all DAG schedulers (YARN/Standalone): SearchIndexRDD computes a zipped index per partition for the next RDD
  • CI tests examples under YARN and Standalone cluster modes
  • Fix default field not being used under certain circumstances
v0.1.5
  • Fix the SearchRDD#searchDropDuplicates method
  • Save/restore a search RDD to/from HDFS
  • YARN support, tested on AWS EMR
  • Add and run benchmark examples against alternative libraries on AWS EMR
  • Support of Spark 3.0.0
v0.1.4
  • Optimize searchJoin for small numbers of partitions
v0.1.3
  • Fix searchJoin on multiple partitions
v0.1.2
  • Released to maven central
v0.1.1
  • First stable version of the Scala Spark Search RDD
  • Support of SearchRDD#searchJoin(RDD, S => String) - join 2 RDDs by matching queries
  • Support of SearchRDD#dropDuplicates(S => String) - deduplicate an RDD based on a matching query
v0.1.0
  • Support of SearchRDD#count(String) - count matching hits
  • Support of SearchRDD#searchList(String) - search matching records as list
  • Support of SearchRDD#search(String) - search matching records as RDD

Building Spark Search

git clone https://github.com/phymbert/spark-search.git
cd spark-search
mvn clean verify

Known alternatives