Spark Search brings advanced full text search features to your Dataframe, Dataset and RDD. Powered by Apache Lucene.
Let's assume you have a billion records dataset you want to query on and match against another one using full text search... You do not expect an external datasource or database system than Spark, and of course with the best performances. Spark Search fits your needs: it builds for all parent RDD partitions a one-2-one volatile Lucene index available during the lifecycle of your spark session across your executors local directories and RAM. Strongly typed, Spark Search supports Java and Scala RDD and plans to support Python, Spark SQL and Dataset. Have a look and feel free to contribute!
- Scala
import org.apache.spark.search.rdd._ // to implicitly enhance RDD with search features
// Load some Amazon computer user reviews
val computersReviews: RDD[Review] = loadReviews("**/*/reviews_Computers.json.gz")
// Number of partition is the number of Lucene index which will be created across your cluster
.repartition(4)
// Count positive review: indexation + count matched doc with fuzzy matching
computersReviews.count("reviewText:happy OR reviewText:best OR reviewText:good OR reviewText:\"sounds great\"~")
// Search for key words
computersReviews.searchList("reviewText:\"World of Warcraft\" OR reviewText:\"Civilization IV\"",
topK = 100, minScore = 10)
.foreach(println)
// /!\ Important lucene indexation is done each time a SearchRDD is computed,
// if you do multiple operations on the same parent RDD, you might have a variable in the driver:
val computersReviewsSearchRDD: SearchRDD[Review] = computersReviewsRDD.searchRDD(
SearchOptions.builder[Review]() // See all other options SearchOptions, IndexationOptions and ReaderOptions
.read((r: ReaderOptions.Builder[Review]) => r.defaultFieldName("reviewText"))
.analyzer(classOf[EnglishAnalyzer])
.build())
// Boolean queries and boosting examples returning RDD
computersReviewsSearchRDD.search("(RAM OR memory) AND (CPU OR processor~)^4", 15)
.collect()
.foreach(println)
// Fuzzy matching
computersReviews.searchList("(reviewerName:Mikey~0.8) OR (reviewerName:Wiliam~0.4) OR (reviewerName:jonh~0.2)",
topKByPartition = 10)
.map(doc => s"${doc.source.reviewerName}=${doc.score}")
.foreach(println)
// RDD full text joining - example here searches for persons
// who did both computer and software reviews with fuzzy matching on reviewer name
val softwareReviews: RDD[Review] = loadReviews("**/*/reviews_Software_10.json.gz")
val matchesReviewers: RDD[(Review, Array[SearchRecord[Review]])] = computersReviews.matches(
softwareReviewsRDD.filter(_.reviewerName != null).map(sr => (sr.asin, sr)),
(sr: Review) => "reviewerName:\"" + sr.reviewerName + "\"~0.4",
topK = 10)
.values
matchesReviewersRDD
.filter(_._2.nonEmpty)
.map(m => (s"Reviewer ${m._1.reviewerName} reviews computer ${m._1.asin} but also on software:",
m._2.map(h => s"${h.source.reviewerName}=${h.score}=${h.source.asin}").toList))
.collect()
.foreach(println)
// Drop duplicates
println("Dropping duplicated reviewers:")
val distinctReviewers: RDD[String] = computersReviews.searchDropDuplicates[Int, Review](
queryBuilder = queryStringBuilder(sr => "reviewerName:\"" + sr.reviewerName.replace('"', ' ') + "\"~0.4")
).map(sr => sr.reviewerName)
distinctReviewers.collect().foreach(println)
// Save then restore onto hdfs
matchesReviewersRDD.save("/tmp/hdfs-pathname")
val restoredSearchRDD: SearchRDD[Review] = SearchRDD.load[Review](sc, "/tmp/hdfs-pathname")
// Restored index can be used as classical rdd
val topReviewer = restoredSearchRDD.map(r => (r.reviewerID, 1))
.reduceByKey(_ + _)
.sortBy(_._2, ascending = false)
.take(1).head
println(s"${topReviewer._1} has submitted ${topReviewer._2} reviews")
See Examples and Documentation for more details.
- Java
import org.apache.spark.search.rdd.*;
class SearchRDDJava {
public void examples() {
System.err.println("Loading reviews...");
JavaRDD<Review> reviewsRDD = loadReviewRDD(spark, "http://snap.stanford.edu/data/amazon/productGraph/categoryFiles/reviews_Computers.json.gz");
// Create the SearchRDD based on the JavaRDD to enjoy search features
SearchRDDJava<Review> computerReviews = SearchRDDJava.of(reviewsRDD, Review.class);
// Count matching docs
System.err.println("Computer reviews with good recommendations: "
+ computerReviews.count("reviewText:good AND reviewText:quality"));
// List matching docs
System.err.println("Reviews with good recommendations and fuzzy: ");
SearchRecordJava<Review>[] goodReviews = computerReviews
.searchList("reviewText:recommend~0.8", 100, 0);
Arrays.stream(goodReviews).forEach(r -> System.err.println(r));
// Pass custom search options
computerReviews = SearchRDDJava.<Review>builder()
.rdd(reviewsRDD)
.runtimeClass(Review.class)
.options(SearchOptions.<Review>builder().analyzer(ShingleAnalyzerWrapper.class).build())
.build();
System.err.println("Top 100 reviews from Patosh with fuzzy with 0.5 minimum score:");
computerReviews.search("reviewerName:Patrik~0.5", 100, 0.5)
.map(SearchRecordJava::getSource)
.map(Review::getReviewerName)
.distinct()
.collect()
.foreach(r -> System.err.println(r));
System.err.println("Loading software reviews...");
JavaRDD<Review> softwareReviews = loadReviewRDD(spark, "http://snap.stanford.edu/data/amazon/productGraph/categoryFiles/reviews_Software_10.json.gz");
System.err.println("Top 10 reviews from same reviewer between computer and software:");
computerReviews.matches(softwareReviews.filter(r -> r.reviewerName != null && !r.reviewerName.isEmpty())
.mapToPair(sr -> new Tuple2<String, Review>(sr.asin, sr)),
r -> String.format("reviewerName:\"%s\"~0.4", r.reviewerName.replaceAll("[\"]", " ")), 10, 0)
.values()
.filter(matches -> matches._2.length > 0)
.map(sameReviewerMatches -> String.format("Reviewer:%s reviews computer %s and software %s (score on names matching are %s)",
sameReviewerMatches._1.reviewerName,
sameReviewerMatches._1.asin,
Arrays.stream(sameReviewerMatches._2).map(h -> h.source.asin).collect(toList()),
Arrays.stream(sameReviewerMatches._2).map(h -> h.source.reviewerName + ":" + h.score).collect(toList())
))
.collect()
.foreach(matches -> System.err.println(matches));
// Save and search reload example
SearchRDDJava.of(softwareReviews.repartition(8), Review.class)
.save("/tmp/hdfs-pathname");
SearchRDDJava<Review> restoredSearchRDD = SearchRDDJava
.load(sc, "/tmp/hdfs-pathname", Review.class);
System.err.println("Software reviews with good recommendations: "
+ restoredSearchRDD.count("reviewText:good AND reviewText:quality"));
}
}
See Examples and Documentationfor more details.
- Python (In progress)
from pyspark import SparkContext
import pysparksearch
data = [{"firstName": "Geoorge", "lastName": "Michael"},
{"firstName": "Bob", "lastName": "Marley"},
{"firstName": "Agnès", "lastName": "Bartoll"}]
sc = SparkContext()
sc.parallelize(data).count("firstName:agnes~")
- Scala
import org.apache.spark.search.sql._
val sentences = spark.read.csv("...")
sentences.count("sentence:happy OR sentence:best OR sentence:good")
// coming soon: SearchSparkStrategy/LogicPlan & column enhanced with search
sentences.where($"sentence".matches($"searchKeyword" ))
All benchmarks run under AWS EMR with 3 Spark workers EC2 m5.xlarge and/or 3 r5.large.elasticsearch data nodes for AWS Elasticsearch. The general use cases is to match company names against two data sets (7M vs 600K rows)
Feature | SearchRDD | Elasticsearch Hadoop | LuceneRDD | Spark regex matches (no score) |
---|---|---|---|---|
Index + Count matches | 51s | 486s (*) | 400s | 12s |
Index + Entity matching | 128s | 719s (*) | 597s | NA (>1h) |
DISCLAIMER Benchmarks methodology or related results may improve, feel free to submit a pull request.
(*) Results of elasticsearch hadoop benchmark must be carefully reviewed, contribution welcomed
- SearchRDD#searchJoin renamed to SearchRDD#matches as it does automatically the reduction in addition of a simple join.
- Fix matches was using only one core & improve join and dropDuplicate performances drastically
- Scala 2.12 by default
- Fix deployment descriptor for scala 2.11
- SearchRDD is now iterable as a classical RDD, reloaded RDD can now be used as any other RDD
- Upgrade support matrix from spark-2.4.8 & hadoop-2.10.1 to spark-3.1.2 & hadoop-3.3.1, built by default for scala 2.12
- Enable caching of search index rdd only for yarn cluster, and as an option.
- Remove scala binary version in parent module artifact name
- Expose SearchRDD as a public API to ease Dataset binding and hdfs reloading
- Fix and enhance Search Java RDD API
- Fix string query builder does not support analyzer
- Switch to multi modules build: core, sql, examples, benchmark
- Improve the github build with running examples against a spark cluster in docker
- Improve licence header checking
- RDD lineage works the same on all DAG Scheduler (Yarn/Standalone): SearchIndexRDD computes zipped index per partition for the next rdd
- CI tests examples under Yarn and Standalone cluster mode
- Fix default field where not used under certain circumstances
- Fix SearchRDD#searchDropDuplicate method
- Save/Restore search RDD to/from HDF
- Yarn support and tested over AWS EMR
- Adding and running benchmark examples with alternatives libraries on AWS EMR
- Support of spark 3.0.0
- Optimize searchJoin for small num partition
- Fix searchJoin on multiple partitions
- Released to maven central
- First stable version of the Scala Spark Search RDD
- Support of
SearchRDD#searchJoin(RDD, S => String)
- join 2 RDD by matching queries - Support of
SearchRDD#dropDuplicates(S => String)
- deduplicate an RDD based on matching query
- Support of
SearchRDD#count(String)
- count matching hits - Support of
SearchRDD#searchList(String)
- search matching records as list - Support of
SearchRDD#search(String)
- search matching records as RDD
- Maven
<dependency>
<groupId>io.github.phymbert</groupId>
<artifactId>spark-search_2.12</artifactId>
<version>${spark.search.version}</version>
</dependency>
- Gradle
implementation 'io.github.phymbert:spark-search_2.12:$sparkSearchVersion'
git clone https://github.com/phymbert/spark-search.git
cd spark-search
mvn clean verify