From 4cc1976012c8fa12c39d84c80f33ecc96bedc05c Mon Sep 17 00:00:00 2001
From: Bruce Robbins
Date: Tue, 20 Oct 2020 18:24:34 -0700
Subject: [PATCH 01/12] First attempt

---
 .../spark/sql/hive/client/HiveShim.scala | 50 +++++++++++++------
 1 file changed, 35 insertions(+), 15 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index 4ab0599e4477..fa035a0d5cc1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -47,7 +47,7 @@ import org.apache.spark.sql.catalyst.analysis.NoSuchPermanentFunctionException
 import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, CatalogTablePartition, CatalogUtils, FunctionResource, FunctionResourceType}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{AtomicType, IntegralType, StringType}
+import org.apache.spark.sql.types.{AtomicType, DataType, IntegralType, NumericType, StringType}
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.Utils
 
@@ -646,16 +646,16 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
   }
 
   object ExtractableLiteral {
-    def unapply(expr: Expression): Option[String] = expr match {
+    def unapply(expr: Expression): Option[(String, DataType)] = expr match {
       case Literal(null, _) => None // `null`s can be cast as other types; we want to avoid NPEs.
-      case Literal(value, _: IntegralType) => Some(value.toString)
-      case Literal(value, _: StringType) => Some(quoteStringLiteral(value.toString))
+      case l @ Literal(value, _: IntegralType) => Some(value.toString, l.dataType)
+      case Literal(value, _: StringType) => Some(quoteStringLiteral(value.toString), StringType)
       case _ => None
     }
   }
 
   object ExtractableLiterals {
-    def unapply(exprs: Seq[Expression]): Option[Seq[String]] = {
+    def unapply(exprs: Seq[Expression]): Option[Seq[(String, DataType)]] = {
       // SPARK-24879: The Hive metastore filter parser does not support "null", but we still want
       // to push down as many predicates as we can while still maintaining correctness.
       // In SQL, the `IN` expression evaluates as follows:
@@ -726,9 +726,9 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
     val useAdvanced = SQLConf.get.advancedPartitionPredicatePushdownEnabled
 
     object ExtractAttribute {
-      def unapply(expr: Expression): Option[Attribute] = {
+      def unapply(expr: Expression): Option[(Attribute, DataType)] = {
         expr match {
-          case attr: Attribute => Some(attr)
+          case attr: Attribute => Some(attr, attr.dataType)
           case Cast(child @ AtomicType(), dt: AtomicType, _)
               if Cast.canUpCast(child.dataType.asInstanceOf[AtomicType], dt) => unapply(child)
           case _ => None
@@ -736,21 +736,41 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
       }
     }
 
+    def compatibleTypes(dt1: Any, dt2: Any): Boolean =
+      (dt1, dt2) match {
+        case (_: NumericType, _: NumericType) => true
+        case (_: StringType, _: StringType) => true
+        case _ => false
+      }
+
+    def compatibleTypesIn(dt1: Any, dts: Seq[Any]): Boolean = {
+      dts.forall(compatibleTypes(dt1, _))
+    }
+
     def convert(expr: Expression): Option[String] = expr match {
-      case In(ExtractAttribute(SupportedAttribute(name)), ExtractableLiterals(values))
-          if useAdvanced =>
-        Some(convertInToOr(name, values))
+      case In(ExtractAttribute(SupportedAttribute(name), dt1), ExtractableLiterals(values))
+          if useAdvanced && compatibleTypesIn(dt1, values.map(_._2)) =>
+        val newValues = values.map(_._1)
+        Some(convertInToOr(name, newValues))
 
-      case InSet(ExtractAttribute(SupportedAttribute(name)), ExtractableValues(values))
+      case InSet(ExtractAttribute(SupportedAttribute(name), dt1), ExtractableValues(values))
           if useAdvanced =>
         Some(convertInToOr(name, values))
 
       case op @ SpecialBinaryComparison(
-          ExtractAttribute(SupportedAttribute(name)), ExtractableLiteral(value)) =>
+          ExtractAttribute(SupportedAttribute(name), dt1), ExtractableLiteral(value, dt2))
+          if compatibleTypes(dt1, dt2) =>
         Some(s"$name ${op.symbol} $value")
 
       case op @ SpecialBinaryComparison(
-          ExtractableLiteral(value), ExtractAttribute(SupportedAttribute(name))) =>
+          ExtractAttribute(SupportedAttribute(name), dt1), ExtractableLiteral(value, dt2))
+          if !compatibleTypes(dt1, dt2) =>
+        logWarning(s"Not creating filter because $dt1 not same as $dt2")
+        None
+
+      case op @ SpecialBinaryComparison(
+          ExtractableLiteral(value, dt2), ExtractAttribute(SupportedAttribute(name), dt1))
+        if (dt1 == dt2) =>
         Some(s"$value ${op.symbol} $name")
 
       case And(expr1, expr2) if useAdvanced =>
@@ -788,11 +808,11 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
       hive: Hive,
       table: Table,
       predicates: Seq[Expression]): Seq[Partition] = {
-
+    logWarning(s"Filters on entry are ${predicates.toList}")
     // Hive getPartitionsByFilter() takes a string that represents partition
     // predicates like "str_key=\"value\" and int_key=1 ..."
     val filter = convertFilters(table, predicates)
-
+    logWarning(s"Filters after convert are $filter")
     val partitions =
       if (filter.isEmpty) {
         getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]]
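The core idea of PATCH 01 is to make the extractor objects return each literal's DataType alongside its rendered string, so that convert() can guard every pushed-down comparison with compatibleTypes and drop filters whose partition-column type and literal type disagree. A minimal, self-contained sketch of that extractor pattern — Lit and the type objects below are hypothetical stand-ins, not the Catalyst classes:

object ExtractorSketch {
  sealed trait DataType
  case object IntegerType extends DataType
  case object StringType extends DataType
  // Hypothetical stand-in for Catalyst's Literal
  case class Lit(value: Any, dataType: DataType)

  object ExtractableLiteral {
    // Returns the rendered literal *and* its type, mirroring the PATCH 01 change
    def unapply(lit: Lit): Option[(String, DataType)] = lit match {
      case Lit(null, _) => None // never push down nulls
      case Lit(v, IntegerType) => Some((v.toString, IntegerType))
      case Lit(v, StringType) => Some(("\"" + v + "\"", StringType))
      case _ => None
    }
  }

  def main(args: Array[String]): Unit = {
    val ExtractableLiteral(rendered, dt) = Lit(1, IntegerType)
    println(s"$rendered has type $dt") // prints: 1 has type IntegerType
  }
}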
From e1843529593fca4fed302cbd2c3850b5f914688b Mon Sep 17 00:00:00 2001
From: Bruce Robbins
Date: Wed, 21 Oct 2020 10:14:20 -0700
Subject: [PATCH 02/12] Fix test

---
 .../scala/org/apache/spark/sql/hive/client/FiltersSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/FiltersSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/FiltersSuite.scala
index 2a4efd0cce6e..123ab8110a37 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/FiltersSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/FiltersSuite.scala
@@ -61,7 +61,7 @@ class FiltersSuite extends SparkFunSuite with Logging with PlanTest {
 
   filterTest("int and string filter",
     (Literal(1) === a("intcol", IntegerType)) :: (Literal("a") === a("strcol", IntegerType)) :: Nil,
-    "1 = intcol and \"a\" = strcol")
+    "1 = intcol")
 
   filterTest("skip varchar",
     (Literal("") === a("varchar", StringType)) :: Nil,

From 704d56535d70378e40690be35f1e27789b324d62 Mon Sep 17 00:00:00 2001
From: Bruce Robbins
Date: Wed, 21 Oct 2020 14:43:55 -0700
Subject: [PATCH 03/12] Handle inset as well

---
 .../spark/sql/hive/client/HiveShim.scala | 31 ++++++++++---------
 1 file changed, 16 insertions(+), 15 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index fa035a0d5cc1..5d62b80bcc82 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -47,7 +47,7 @@ import org.apache.spark.sql.catalyst.analysis.NoSuchPermanentFunctionException
 import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, CatalogTablePartition, CatalogUtils, FunctionResource, FunctionResourceType}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{AtomicType, DataType, IntegralType, NumericType, StringType}
+import org.apache.spark.sql.types.{AtomicType, ByteType, DataType, IntegerType, IntegralType, LongType, ShortType, StringType}
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.Utils
 
@@ -682,15 +682,15 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
   }
 
   object ExtractableValues {
-    private lazy val valueToLiteralString: PartialFunction[Any, String] = {
-      case value: Byte => value.toString
-      case value: Short => value.toString
-      case value: Int => value.toString
-      case value: Long => value.toString
-      case value: UTF8String => quoteStringLiteral(value.toString)
+    private lazy val valueToLiteralString: PartialFunction[Any, (String, DataType)] = {
+      case value: Byte => (value.toString, ByteType)
+      case value: Short => (value.toString, ShortType)
+      case value: Int => (value.toString, IntegerType)
+      case value: Long => (value.toString, LongType)
+      case value: UTF8String => (quoteStringLiteral(value.toString), StringType)
     }
 
-    def unapply(values: Set[Any]): Option[Seq[String]] = {
+    def unapply(values: Set[Any]): Option[Seq[(String, DataType)]] = {
       val extractables = values.toSeq.map(valueToLiteralString.lift)
       if (extractables.nonEmpty && extractables.forall(_.isDefined)) {
         Some(extractables.map(_.get))
@@ -738,7 +738,7 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
 
     def compatibleTypes(dt1: Any, dt2: Any): Boolean =
       (dt1, dt2) match {
-        case (_: NumericType, _: NumericType) => true
+        case (_: IntegralType, _: IntegralType) => true
         case (_: StringType, _: StringType) => true
         case _ => false
       }
@@ -748,13 +748,14 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
     }
 
     def convert(expr: Expression): Option[String] = expr match {
-      case In(ExtractAttribute(SupportedAttribute(name), dt1), ExtractableLiterals(values))
-          if useAdvanced && compatibleTypesIn(dt1, values.map(_._2)) =>
-        val newValues = values.map(_._1)
-        Some(convertInToOr(name, newValues))
+      case In(ExtractAttribute(SupportedAttribute(name), dt1), ExtractableLiterals(valsAndDts))
+          if useAdvanced && compatibleTypesIn(dt1, valsAndDts.map(_._2)) =>
+        val values = valsAndDts.map(_._1)
+        Some(convertInToOr(name, values))
 
-      case InSet(ExtractAttribute(SupportedAttribute(name), dt1), ExtractableValues(values))
-          if useAdvanced =>
+      case InSet(ExtractAttribute(SupportedAttribute(name), dt1), ExtractableValues(valsAndDts))
+          if useAdvanced && compatibleTypesIn(dt1, valsAndDts.map(_._2)) =>
+        val values = valsAndDts.map(_._1)
         Some(convertInToOr(name, values))
 
       case op @ SpecialBinaryComparison(
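PATCH 03 extends the guard to InSet, which differs from In in that it holds raw Scala values rather than Literal expressions; each runtime class is therefore mapped back to a Catalyst DataType before the same compatibleTypesIn check applies. A simplified, self-contained sketch of that mapping (stand-in types, not Spark's):

object InSetSketch {
  sealed trait DataType
  case object IntegerType extends DataType
  case object LongType extends DataType
  case object StringType extends DataType

  // Mirrors valueToLiteralString: render the value and recover its type
  val valueToLiteralString: PartialFunction[Any, (String, DataType)] = {
    case v: Int => (v.toString, IntegerType)
    case v: Long => (v.toString, LongType)
    case v: String => ("\"" + v + "\"", StringType)
  }

  // Mirrors ExtractableValues.unapply: all-or-nothing over the whole set
  def extract(values: Set[Any]): Option[Seq[(String, DataType)]] = {
    val lifted = values.toSeq.map(valueToLiteralString.lift)
    if (lifted.nonEmpty && lifted.forall(_.isDefined)) Some(lifted.map(_.get)) else None
  }

  def main(args: Array[String]): Unit = {
    println(extract(Set[Any](1, 2)))   // Some(List((1,IntegerType), (2,IntegerType)))
    println(extract(Set[Any](1, 2.5))) // None: Double has no mapping, so nothing is pushed
  }
}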
From e4bd24f91312310d2a1df25be87b6ddd8dc0455d Mon Sep 17 00:00:00 2001
From: Bruce Robbins
Date: Wed, 21 Oct 2020 17:51:27 -0700
Subject: [PATCH 04/12] Add some tests

---
 .../spark/sql/hive/client/FiltersSuite.scala | 21 +++++++++++++++++--
 1 file changed, 19 insertions(+), 2 deletions(-)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/FiltersSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/FiltersSuite.scala
index 123ab8110a37..6cde440470b3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/FiltersSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/FiltersSuite.scala
@@ -60,8 +60,25 @@ class FiltersSuite extends SparkFunSuite with Logging with PlanTest {
     "1 = intcol")
 
   filterTest("int and string filter",
-    (Literal(1) === a("intcol", IntegerType)) :: (Literal("a") === a("strcol", IntegerType)) :: Nil,
-    "1 = intcol")
+    (Literal(1) === a("intcol", IntegerType)) :: (Literal("a") === a("strcol", StringType)) :: Nil,
+    "1 = intcol and \"a\" = strcol")
+
+  filterTest("int and string/int filter",
+    (Literal(1) === a("intcol1", IntegerType)) ::
+      (Literal("a") === a("intcol2", IntegerType)) :: Nil,
+    "1 = intcol1")
+
+  filterTest("int filter with in",
+    (a("intcol", IntegerType) in (Literal(1), Literal(2))) :: Nil,
+    "(intcol = 1 or intcol = 2)")
+
+  filterTest("int filter with inset",
+    (a("intcol", IntegerType) in ((0 to 11).map(Literal(_)): _*)) :: Nil,
+    "(" + (0 to 11).map(x => s"intcol = $x").mkString(" or ") + ")")
+
+  filterTest("string filter with in",
+    (a("strcol", StringType) in (Literal("1"), Literal("2"))) :: Nil,
+    "(strcol = \"1\" or strcol = \"2\")")
 
   filterTest("skip varchar",
     (Literal("") === a("varchar", StringType)) :: Nil,
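The expected strings in PATCH 04 pin down the output shape of convertInToOr, which the diffs call but never show. Assuming the real method behaves the way these expectations suggest, it is roughly the following sketch:

// Sketch only — the real method lives in HiveShim.scala and is not part of this diff.
def convertInToOr(name: String, values: Seq[String]): String =
  values.map(value => s"$name = $value").mkString("(", " or ", ")")

// convertInToOr("intcol", Seq("1", "2"))  ==  "(intcol = 1 or intcol = 2)"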
From c6cb7d262b338c4ca57ecb0a5dacf2d04baef54a Mon Sep 17 00:00:00 2001
From: Bruce Robbins
Date: Fri, 23 Oct 2020 08:08:18 -0700
Subject: [PATCH 05/12] Add additional tests

---
 .../org/apache/spark/sql/hive/client/FiltersSuite.scala | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/FiltersSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/FiltersSuite.scala
index 6cde440470b3..e27310a4cc08 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/FiltersSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/FiltersSuite.scala
@@ -72,10 +72,18 @@ class FiltersSuite extends SparkFunSuite with Logging with PlanTest {
     (a("intcol", IntegerType) in (Literal(1), Literal(2))) :: Nil,
     "(intcol = 1 or intcol = 2)")
 
+  filterTest("int/string filter with in",
+    (a("intcol", IntegerType) in (Literal("1"), Literal("2"))) :: Nil,
+    "")
+
   filterTest("int filter with inset",
     (a("intcol", IntegerType) in ((0 to 11).map(Literal(_)): _*)) :: Nil,
    "(" + (0 to 11).map(x => s"intcol = $x").mkString(" or ") + ")")
 
+  filterTest("int/string filter with inset",
+    (a("intcol", IntegerType) in ((0 to 11).map(x => Literal(x.toString)): _*)) :: Nil,
+    "")
+
   filterTest("string filter with in",
     (a("strcol", StringType) in (Literal("1"), Literal("2"))) :: Nil,
     "(strcol = \"1\" or strcol = \"2\")")
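The empty-string expectations in PATCH 05 mean convertFilters now produces no metastore filter at all for mismatched In/InSet predicates. Per the getPartitionsByFilter code in PATCH 01, an empty filter makes the client fall back to fetching every partition and letting Spark prune on its own side. A simplified sketch of that contract (hypothetical helper, mirroring the filter.isEmpty branch shown earlier):

def partitionsFor[P](filter: String, getAll: => Seq[P], getByFilter: String => Seq[P]): Seq[P] =
  if (filter.isEmpty) getAll // no pushdown: fetch everything, prune in Spark
  else getByFilter(filter)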
From 80b000640b2a1bfdf0904d5733dc4d84cad70d12 Mon Sep 17 00:00:00 2001
From: Bruce Robbins
Date: Thu, 29 Oct 2020 16:59:13 -0700
Subject: [PATCH 06/12] Add additional test

---
 .../execution/PruneHiveTablePartitionsSuite.scala | 12 ++++++++++++
 1 file changed, 12 insertions(+)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitionsSuite.scala
index 06aea084330f..47cc61537419 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitionsSuite.scala
@@ -80,6 +80,18 @@ class PruneHiveTablePartitionsSuite extends PrunePartitionSuiteBase {
     }
   }
 
+  test("SPARK-33098: Don't push partition filter with mismatched datatypes to metastore") {
+    withTable("t") {
+      sql("create table t (a int) partitioned by (b int) stored as parquet")
+
+      // There's only one test case because TestHiveSparkSession sets
+      // hive.metastore.integral.jdo.pushdown=true, which, as a side effect, prevents
+      // the MetaException for most of the problem cases. Only the case below still throws
+      // a MetaException when Hive is configured this way
+      sql("select * from t where cast(b as string) > '1'").show(false)
+    }
+  }
+
   override def getScanExecPartitionSize(plan: SparkPlan): Long = {
     plan.collectFirst {
       case p: HiveTableScanExec => p

From 4b77288b158b693ccfb08eb80980be057a8f4ca6 Mon Sep 17 00:00:00 2001
From: Bruce Robbins
Date: Thu, 29 Oct 2020 18:37:12 -0700
Subject: [PATCH 07/12] Add another case

---
 .../hive/execution/PruneHiveTablePartitionsSuite.scala | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitionsSuite.scala
index 47cc61537419..fa03c2b48230 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitionsSuite.scala
@@ -84,11 +84,12 @@ class PruneHiveTablePartitionsSuite extends PrunePartitionSuiteBase {
     withTable("t") {
       sql("create table t (a int) partitioned by (b int) stored as parquet")
 
-      // There's only one test case because TestHiveSparkSession sets
+      // There are only two test cases because TestHiveSparkSession sets
       // hive.metastore.integral.jdo.pushdown=true, which, as a side effect, prevents
-      // the MetaException for most of the problem cases. Only the case below still throws
-      // a MetaException when Hive is configured this way
-      sql("select * from t where cast(b as string) > '1'").show(false)
+      // the MetaException for most of the problem cases. Only the two cases below
+      // would still throw a MetaException when Hive is configured this way
+      sql("select cast(b as string) b from t").filter("b > '1'").collect
+      sql("select * from t where cast(b as string) > '1'").collect
     }
   }
 
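PATCHES 06-07 add an end-to-end regression test. Before this series, a predicate like cast(b as string) > '1' would be pushed down with the string literal quoted against an integral partition column — ExtractAttribute strips the up-cast, so the metastore receives roughly b > "1" — and Hive raises a MetaException for it even when hive.metastore.integral.jdo.pushdown=true masks the other mismatch cases. Hypothetical spark-shell repro (assumes a Hive-backed SparkSession named spark):

spark.sql("create table t (a int) partitioned by (b int) stored as parquet")
// Pre-fix, the next line would send a string-vs-integral filter to the
// metastore and fail with a MetaException instead of scanning cleanly.
spark.sql("select * from t where cast(b as string) > '1'").show()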
From f85d965b2de964ad923ff17eacc86dd5b75a5c95 Mon Sep 17 00:00:00 2001
From: Bruce Robbins
Date: Fri, 30 Oct 2020 09:11:26 -0700
Subject: [PATCH 08/12] Remove debugging log messages

---
 .../scala/org/apache/spark/sql/hive/client/HiveShim.scala | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index 5d62b80bcc82..a8b0346a509c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -766,7 +766,6 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
       case op @ SpecialBinaryComparison(
           ExtractAttribute(SupportedAttribute(name), dt1), ExtractableLiteral(value, dt2))
           if !compatibleTypes(dt1, dt2) =>
-        logWarning(s"Not creating filter because $dt1 not same as $dt2")
         None
 
       case op @ SpecialBinaryComparison(
@@ -809,13 +808,12 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
       hive: Hive,
       table: Table,
       predicates: Seq[Expression]): Seq[Partition] = {
-    logWarning(s"Filters on entry are ${predicates.toList}")
     // Hive getPartitionsByFilter() takes a string that represents partition
     // predicates like "str_key=\"value\" and int_key=1 ..."
     val filter = convertFilters(table, predicates)
-    logWarning(s"Filters after convert are $filter")
     val partitions =
       if (filter.isEmpty) {
+        logDebug(s"Falling back to getting all partitions")
         getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]]
       } else {
         logDebug(s"Hive metastore filter is '$filter'.")

From ac73ebd6d196c6e653da1ee71f84b410540204ef Mon Sep 17 00:00:00 2001
From: Bruce Robbins
Date: Fri, 30 Oct 2020 09:55:36 -0700
Subject: [PATCH 09/12] Style fix

---
 .../main/scala/org/apache/spark/sql/hive/client/HiveShim.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index a8b0346a509c..94aece1b2dfe 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -770,7 +770,7 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
 
       case op @ SpecialBinaryComparison(
           ExtractableLiteral(value, dt2), ExtractAttribute(SupportedAttribute(name), dt1))
-        if (dt1 == dt2) =>
+          if (dt1 == dt2) =>
         Some(s"$value ${op.symbol} $name")
 
       case And(expr1, expr2) if useAdvanced =>
From b89a6aa95a9495c9d672a79d3887a7fdb8745e82 Mon Sep 17 00:00:00 2001
From: Bruce Robbins
Date: Fri, 30 Oct 2020 10:33:39 -0700
Subject: [PATCH 10/12] Put back some empty lines

---
 .../main/scala/org/apache/spark/sql/hive/client/HiveShim.scala | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index 94aece1b2dfe..0c35cb5e80f7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -808,9 +808,11 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
       hive: Hive,
       table: Table,
       predicates: Seq[Expression]): Seq[Partition] = {
+
     // Hive getPartitionsByFilter() takes a string that represents partition
     // predicates like "str_key=\"value\" and int_key=1 ..."
     val filter = convertFilters(table, predicates)
+
     val partitions =
       if (filter.isEmpty) {
         logDebug(s"Falling back to getting all partitions")

From f0bafe99d6b477b0318e30a513893c9bcbfaa552 Mon Sep 17 00:00:00 2001
From: Bruce Robbins
Date: Sun, 1 Nov 2020 12:10:03 -0800
Subject: [PATCH 11/12] Small fix; fix test

---
 .../scala/org/apache/spark/sql/hive/client/HiveShim.scala | 7 +------
 .../org/apache/spark/sql/hive/client/FiltersSuite.scala   | 7 ++++++-
 2 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index 0c35cb5e80f7..f28eccc7b490 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -763,14 +763,9 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
           if compatibleTypes(dt1, dt2) =>
         Some(s"$name ${op.symbol} $value")
 
-      case op @ SpecialBinaryComparison(
-          ExtractAttribute(SupportedAttribute(name), dt1), ExtractableLiteral(value, dt2))
-          if !compatibleTypes(dt1, dt2) =>
-        None
-
       case op @ SpecialBinaryComparison(
           ExtractableLiteral(value, dt2), ExtractAttribute(SupportedAttribute(name), dt1))
-          if (dt1 == dt2) =>
+          if compatibleTypes(dt1, dt2) =>
         Some(s"$value ${op.symbol} $name")
 
       case And(expr1, expr2) if useAdvanced =>
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/FiltersSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/FiltersSuite.scala
index e27310a4cc08..1e949deb2ccd 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/FiltersSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/FiltersSuite.scala
@@ -63,7 +63,12 @@ class FiltersSuite extends SparkFunSuite with Logging with PlanTest {
     (Literal(1) === a("intcol", IntegerType)) :: (Literal("a") === a("strcol", StringType)) :: Nil,
     "1 = intcol and \"a\" = strcol")
 
-  filterTest("int and string/int filter",
+  filterTest("int and int column/string literal filter",
+    (a("intcol1", IntegerType) === Literal(1)) ::
+      (a("intcol2", IntegerType) === (Literal("a"))) :: Nil,
+    "intcol1 = 1")
+
+  filterTest("int and int column/string literal filter backwards",
     (Literal(1) === a("intcol1", IntegerType)) ::
       (Literal("a") === a("intcol2", IntegerType)) :: Nil,
     "1 = intcol1")
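With PATCH 11, the explicit drop-the-filter case from PATCH 01 is gone — a mismatched comparison now falls through to convert's catch-all None case (not shown in these hunks) — and both operand orders share the same compatibleTypes guard. For context, the a(...) helper these tests lean on is, in the actual suite, a one-line attribute factory; shown here for reference (it lives inside the test class, so the imports below are only to make the fragment self-contained):

import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.types.DataType

def a(name: String, dataType: DataType) = AttributeReference(name, dataType)()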
From a186127197321f4dd3c52ac69784f44e938c5823 Mon Sep 17 00:00:00 2001
From: Bruce Robbins
Date: Sun, 1 Nov 2020 13:37:08 -0800
Subject: [PATCH 12/12] Don't throw away all filters with mismatched datatypes

---
 .../spark/sql/hive/client/HiveShim.scala     | 31 +++++++++++++++++++
 .../spark/sql/hive/client/FiltersSuite.scala | 10 ++++++
 2 files changed, 41 insertions(+)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index f28eccc7b490..890d7810507b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -747,6 +747,23 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
       dts.forall(compatibleTypes(dt1, _))
     }
 
+    def fixValue(quotedValue: String, desiredType: DataType): Option[Any] = try {
+      val value = quotedValue.init.tail // remove leading and trailing quotes
+      desiredType match {
+        case LongType =>
+          Some(value.toLong)
+        case IntegerType =>
+          Some(value.toInt)
+        case ShortType =>
+          Some(value.toShort)
+        case ByteType =>
+          Some(value.toByte)
+      }
+    } catch {
+      case _: NumberFormatException =>
+        None
+    }
+
     def convert(expr: Expression): Option[String] = expr match {
       case In(ExtractAttribute(SupportedAttribute(name), dt1), ExtractableLiterals(valsAndDts))
           if useAdvanced && compatibleTypesIn(dt1, valsAndDts.map(_._2)) =>
@@ -763,11 +780,25 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
           if compatibleTypes(dt1, dt2) =>
         Some(s"$name ${op.symbol} $value")
 
+      case op @ SpecialBinaryComparison(
+          ExtractAttribute(SupportedAttribute(name), dt1), ExtractableLiteral(rawValue, dt2))
+          if dt1.isInstanceOf[IntegralType] && dt2.isInstanceOf[StringType] =>
+        fixValue(rawValue, dt1).map { value =>
+          s"$name ${op.symbol} $value"
+        }
+
       case op @ SpecialBinaryComparison(
           ExtractableLiteral(value, dt2), ExtractAttribute(SupportedAttribute(name), dt1))
           if compatibleTypes(dt1, dt2) =>
         Some(s"$value ${op.symbol} $name")
 
+      case op @ SpecialBinaryComparison(
+          ExtractableLiteral(rawValue, dt2), ExtractAttribute(SupportedAttribute(name), dt1))
+          if dt1.isInstanceOf[IntegralType] && dt2.isInstanceOf[StringType] =>
+        fixValue(rawValue, dt1).map { value =>
+          s"$value ${op.symbol} $name"
+        }
+
       case And(expr1, expr2) if useAdvanced =>
         val converted = convert(expr1) ++ convert(expr2)
         if (converted.isEmpty) {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/FiltersSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/FiltersSuite.scala
index 1e949deb2ccd..567510216c50 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/FiltersSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/FiltersSuite.scala
@@ -68,11 +68,21 @@ class FiltersSuite extends SparkFunSuite with Logging with PlanTest {
     (a("intcol2", IntegerType) === (Literal("a"))) :: Nil,
     "intcol1 = 1")
 
+  filterTest("int and int column/string literal filter with conversion",
+    (a("intcol1", IntegerType) === Literal(1)) ::
+      (a("intcol2", IntegerType) === (Literal("00002"))) :: Nil,
+    "intcol1 = 1 and intcol2 = 2")
+
   filterTest("int and int column/string literal filter backwards",
     (Literal(1) === a("intcol1", IntegerType)) ::
       (Literal("a") === a("intcol2", IntegerType)) :: Nil,
     "1 = intcol1")
 
+  filterTest("int and int column/string literal filter backwards with conversion",
+    (Literal(1) === a("intcol1", IntegerType)) ::
+      (Literal("00002") === a("intcol2", IntegerType)) :: Nil,
+    "1 = intcol1 and 2 = intcol2")
+
   filterTest("int filter with in",
     (a("intcol", IntegerType) in (Literal(1), Literal(2))) :: Nil,
     "(intcol = 1 or intcol = 2)")
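PATCH 12 is the final shape of the fix: rather than discarding every integral-column-vs-string-literal comparison, fixValue strips the quotes that ExtractableLiteral added and retries the parse in the column's integral type, so "00002" still prunes as intcol2 = 2 while a non-numeric literal like "a" quietly yields no pushed-down filter. A standalone sketch of those semantics (simplified stand-in types, not Spark's):

object FixValueSketch {
  sealed trait DataType
  case object IntegerType extends DataType
  case object LongType extends DataType

  def fixValue(quotedValue: String, desiredType: DataType): Option[Any] = try {
    val value = quotedValue.init.tail // drop the leading and trailing quote characters
    desiredType match {
      case IntegerType => Some(value.toInt)
      case LongType => Some(value.toLong)
    }
  } catch {
    case _: NumberFormatException => None
  }

  def main(args: Array[String]): Unit = {
    println(fixValue("\"00002\"", IntegerType)) // Some(2): pushed down as intcol = 2
    println(fixValue("\"a\"", IntegerType))     // None: the predicate is not pushed down
  }
}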