Skip to content

Commit

Permalink
bugfix: spark null filter pushdown to cosmos DB (#23804)
Browse files Browse the repository at this point in the history
Fixes: #23282

cosmos DB is schema-less, spark is schema-full.

when reading data from cosmos DB, spark connector translates both null and undefined values to null spark column value.
hence from the spark perspective null and not defined values in cosmos db are the same.

expected behaviour:

if there is a null spark filter on a column value, that should be translated to either null value or undefined value on the cosmos db query pushdown
  • Loading branch information
moderakh authored Aug 31, 2021
1 parent 3277d65 commit 3df8973
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -147,11 +147,11 @@ private case class FilterAnalyzer() {
true

case IsNull(attr) =>
queryBuilder.append(s"IS_NULL(${canonicalCosmosFieldPath(attr)})")
queryBuilder.append(s"(IS_NULL(${canonicalCosmosFieldPath(attr)}) OR NOT(IS_DEFINED(${canonicalCosmosFieldPath(attr)})))")
true

case IsNotNull(attr) =>
queryBuilder.append(s"NOT(IS_NULL(${canonicalCosmosFieldPath(attr)}))")
queryBuilder.append(s"(NOT(IS_NULL(${canonicalCosmosFieldPath(attr)})) AND IS_DEFINED(${canonicalCosmosFieldPath(attr)}))")
true

case And(leftFilter, rightFilter) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
package com.azure.cosmos.spark

import com.azure.cosmos.models.CosmosParameterizedQuery
import org.apache.spark.sql.sources.{AlwaysFalse, AlwaysTrue, EqualTo, Filter, In, StringEndsWith, StringStartsWith, StringContains}
import org.apache.spark.sql.sources.{AlwaysFalse, AlwaysTrue, EqualTo, Filter, In, IsNotNull, IsNull, StringContains, StringEndsWith, StringStartsWith}
import org.assertj.core.api.Assertions.assertThat
// scalastyle:off underscore.import
import scala.collection.JavaConverters._
Expand Down Expand Up @@ -84,6 +84,32 @@ class FilterAnalyzerSpec extends UnitSpec {
query.parameterValues should contain theSameElementsInOrderAs List("خوارزمی")
}

"IsNull" should "be translated to cosmos" in {
val filterProcessor = FilterAnalyzer()

val filters = Array[Filter](IsNull("age"))
val analyzedQuery = filterProcessor.analyze(filters, readConfigWithoutCustomQuery)
analyzedQuery.filtersNotSupportedByCosmos shouldBe empty
analyzedQuery.filtersToBePushedDownToCosmos should contain theSameElementsInOrderAs filters

val query = analyzedQuery.cosmosParametrizedQuery
query.queryText shouldEqual "SELECT * FROM r WHERE (IS_NULL(r['age']) OR NOT(IS_DEFINED(r['age'])))"
query.parameterNames shouldBe empty
}

"IsNotNull" should "be translated to cosmos" in {
val filterProcessor = FilterAnalyzer()

val filters = Array[Filter](IsNotNull("age"))
val analyzedQuery = filterProcessor.analyze(filters, readConfigWithoutCustomQuery)
analyzedQuery.filtersNotSupportedByCosmos shouldBe empty
analyzedQuery.filtersToBePushedDownToCosmos should contain theSameElementsInOrderAs filters

val query = analyzedQuery.cosmosParametrizedQuery
query.queryText shouldEqual "SELECT * FROM r WHERE (NOT(IS_NULL(r['age'])) AND IS_DEFINED(r['age']))"
query.parameterNames shouldBe empty
}

"nested filter" should "be translated to nested cosmos json filter" in {
val filterProcessor = FilterAnalyzer()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -767,7 +767,7 @@ class SparkE2EQueryITest
}
queryPlan = output.toString.replaceAll("#\\d+", "#x")
logInfo(s"Query Plan: $queryPlan")
val expected = s"Cosmos Query: SELECT * FROM r WHERE NOT(IS_NULL(r['nestedObject'])) " +
val expected = s"Cosmos Query: SELECT * FROM r WHERE (NOT(IS_NULL(r['nestedObject'])) AND IS_DEFINED(r['nestedObject'])) " +
s"AND r['nestedObject']['prop2']=" +
s"@param0${System.getProperty("line.separator")} > param: @param0 = 6"
queryPlan.contains(expected) shouldEqual true
Expand Down

0 comments on commit 3df8973

Please sign in to comment.