From 3df8973ab24da75e753455c96aadf17f69dfec88 Mon Sep 17 00:00:00 2001 From: Mohammad Derakhshani Date: Tue, 31 Aug 2021 16:16:23 -0700 Subject: [PATCH] bugfix: spark null filter pushdown to cosmos DB (#23804) Fixes: #23282 cosmos DB is schema-less, spark is schema-full. when reading data from cosmos DB, spark connector translates both null and undefined values to null spark column value. hence from the spark perspective null and not defined values in cosmos db are the same. expected behaviour: if there is a null spark filter on a column value, that should be translated to either null value or undefined value on the cosmos db query pushdown --- .../azure/cosmos/spark/FilterAnalyzer.scala | 4 +-- .../cosmos/spark/FilterAnalyzerSpec.scala | 28 ++++++++++++++++++- .../cosmos/spark/SparkE2EQueryITest.scala | 2 +- 3 files changed, 30 insertions(+), 4 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/main/scala/com/azure/cosmos/spark/FilterAnalyzer.scala b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/main/scala/com/azure/cosmos/spark/FilterAnalyzer.scala index 037500320ab64..76f517b63d490 100644 --- a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/main/scala/com/azure/cosmos/spark/FilterAnalyzer.scala +++ b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/main/scala/com/azure/cosmos/spark/FilterAnalyzer.scala @@ -147,11 +147,11 @@ private case class FilterAnalyzer() { true case IsNull(attr) => - queryBuilder.append(s"IS_NULL(${canonicalCosmosFieldPath(attr)})") + queryBuilder.append(s"(IS_NULL(${canonicalCosmosFieldPath(attr)}) OR NOT(IS_DEFINED(${canonicalCosmosFieldPath(attr)})))") true case IsNotNull(attr) => - queryBuilder.append(s"NOT(IS_NULL(${canonicalCosmosFieldPath(attr)}))") + queryBuilder.append(s"(NOT(IS_NULL(${canonicalCosmosFieldPath(attr)})) AND IS_DEFINED(${canonicalCosmosFieldPath(attr)}))") true case And(leftFilter, rightFilter) => diff --git a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/test/scala/com/azure/cosmos/spark/FilterAnalyzerSpec.scala b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/test/scala/com/azure/cosmos/spark/FilterAnalyzerSpec.scala index e3911f7065b73..327be3f73d3a0 100644 --- a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/test/scala/com/azure/cosmos/spark/FilterAnalyzerSpec.scala +++ b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/test/scala/com/azure/cosmos/spark/FilterAnalyzerSpec.scala @@ -3,7 +3,7 @@ package com.azure.cosmos.spark import com.azure.cosmos.models.CosmosParameterizedQuery -import org.apache.spark.sql.sources.{AlwaysFalse, AlwaysTrue, EqualTo, Filter, In, StringEndsWith, StringStartsWith, StringContains} +import org.apache.spark.sql.sources.{AlwaysFalse, AlwaysTrue, EqualTo, Filter, In, IsNotNull, IsNull, StringContains, StringEndsWith, StringStartsWith} import org.assertj.core.api.Assertions.assertThat // scalastyle:off underscore.import import scala.collection.JavaConverters._ @@ -84,6 +84,32 @@ class FilterAnalyzerSpec extends UnitSpec { query.parameterValues should contain theSameElementsInOrderAs List("خوارزمی") } + "IsNull" should "be translated to cosmos" in { + val filterProcessor = FilterAnalyzer() + + val filters = Array[Filter](IsNull("age")) + val analyzedQuery = filterProcessor.analyze(filters, readConfigWithoutCustomQuery) + analyzedQuery.filtersNotSupportedByCosmos shouldBe empty + analyzedQuery.filtersToBePushedDownToCosmos should contain theSameElementsInOrderAs filters + + val query = analyzedQuery.cosmosParametrizedQuery + query.queryText shouldEqual "SELECT * FROM r WHERE (IS_NULL(r['age']) OR NOT(IS_DEFINED(r['age'])))" + query.parameterNames shouldBe empty + } + + "IsNotNull" should "be translated to cosmos" in { + val filterProcessor = FilterAnalyzer() + + val filters = Array[Filter](IsNotNull("age")) + val analyzedQuery = filterProcessor.analyze(filters, readConfigWithoutCustomQuery) + analyzedQuery.filtersNotSupportedByCosmos shouldBe empty + analyzedQuery.filtersToBePushedDownToCosmos should contain theSameElementsInOrderAs filters + + val query = analyzedQuery.cosmosParametrizedQuery + query.queryText shouldEqual "SELECT * FROM r WHERE (NOT(IS_NULL(r['age'])) AND IS_DEFINED(r['age']))" + query.parameterNames shouldBe empty + } + "nested filter" should "be translated to nested cosmos json filter" in { val filterProcessor = FilterAnalyzer() diff --git a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/test/scala/com/azure/cosmos/spark/SparkE2EQueryITest.scala b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/test/scala/com/azure/cosmos/spark/SparkE2EQueryITest.scala index 471179d3ef4d8..a6a9fa910f496 100644 --- a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/test/scala/com/azure/cosmos/spark/SparkE2EQueryITest.scala +++ b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/test/scala/com/azure/cosmos/spark/SparkE2EQueryITest.scala @@ -767,7 +767,7 @@ class SparkE2EQueryITest } queryPlan = output.toString.replaceAll("#\\d+", "#x") logInfo(s"Query Plan: $queryPlan") - val expected = s"Cosmos Query: SELECT * FROM r WHERE NOT(IS_NULL(r['nestedObject'])) " + + val expected = s"Cosmos Query: SELECT * FROM r WHERE (NOT(IS_NULL(r['nestedObject'])) AND IS_DEFINED(r['nestedObject'])) " + s"AND r['nestedObject']['prop2']=" + s"@param0${System.getProperty("line.separator")} > param: @param0 = 6" queryPlan.contains(expected) shouldEqual true