From 294fcb772594fb563d9a32e19b0073219120b2b1 Mon Sep 17 00:00:00 2001
From: scwf
Date: Tue, 21 Oct 2014 14:01:18 -0700
Subject: [PATCH 1/4] attribute names in table scan should be converted to
 lowercase in neededColumnIDs

---
 .../org/apache/spark/sql/hive/execution/HiveTableScan.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
index 5b83b77d80a2..b51359bbb651 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
@@ -81,7 +81,8 @@ case class HiveTableScan(
     // Specifies needed column IDs for those non-partitioning columns.
     val neededColumnIDs =
       attributes.map(a =>
-        relation.attributes.indexWhere(_.name == a.name): Integer).filter(index => index >= 0)
+        relation.attributes.indexWhere(
+          _.name.toLowerCase == a.name): Integer).filter(index => index >= 0)
 
     ColumnProjectionUtils.appendReadColumnIDs(hiveConf, neededColumnIDs)
     ColumnProjectionUtils.appendReadColumnNames(hiveConf, attributes.map(_.name))

From 3ff3a8094f0d5c6aa50a53ac6b08345c1c7a3f69 Mon Sep 17 00:00:00 2001
From: wangfei
Date: Wed, 22 Oct 2014 01:15:52 -0700
Subject: [PATCH 2/4] safer change

---
 .../org/apache/spark/sql/hive/execution/HiveTableScan.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
index b51359bbb651..8411e4d1ad58 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
@@ -82,7 +82,7 @@ case class HiveTableScan(
     val neededColumnIDs =
       attributes.map(a =>
         relation.attributes.indexWhere(
-          _.name.toLowerCase == a.name): Integer).filter(index => index >= 0)
+          _.name.toLowerCase == a.name.toLowerCase): Integer).filter(index => index >= 0)
 
     ColumnProjectionUtils.appendReadColumnIDs(hiveConf, neededColumnIDs)
     ColumnProjectionUtils.appendReadColumnNames(hiveConf, attributes.map(_.name))
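The first two patches illustrate why one-sided normalization is not enough: patch 1 lowercases only the metastore side of the comparison, so a query that spells a column in uppercase still matches nothing; patch 2 lowercases both sides. A minimal standalone sketch of that matching logic (plain Scala stand-ins, not Spark's Catalyst classes; the column names are illustrative):

    object NameMatchSketch {
      def main(args: Array[String]): Unit = {
        // Hive's metastore reports non-partition column names in lowercase.
        val relationNames = Seq("key", "value")

        // Patch 1: only the relation side is lowercased; an uppercase request still misses.
        def neededIdsPatch1(requested: Seq[String]): Seq[Int] =
          requested.map(r => relationNames.indexWhere(_.toLowerCase == r)).filter(_ >= 0)

        // Patch 2: both sides are lowercased, making the match case-insensitive.
        def neededIdsPatch2(requested: Seq[String]): Seq[Int] =
          requested.map(r => relationNames.indexWhere(_.toLowerCase == r.toLowerCase)).filter(_ >= 0)

        println(neededIdsPatch1(Seq("KEY"))) // List()  -- the gap patch 2 closes
        println(neededIdsPatch2(Seq("KEY"))) // List(0)
      }
    }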
From dc74a24ab0384dc253d6eca551ed804cfbea22ae Mon Sep 17 00:00:00 2001
From: wangfei
Date: Sat, 25 Oct 2014 00:45:09 -0700
Subject: [PATCH 3/4] use lowerName and add a test case for this issue

---
 .../sql/catalyst/expressions/namedExpressions.scala     | 1 +
 .../scala/org/apache/spark/sql/hive/HiveStrategies.scala | 2 +-
 .../scala/org/apache/spark/sql/hive/TableReader.scala    | 2 +-
 .../apache/spark/sql/hive/execution/HiveTableScan.scala  | 2 +-
 .../spark/sql/hive/execution/HiveTableScanSuite.scala    | 9 +++++++++
 5 files changed, 13 insertions(+), 3 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
index d023db44d854..bbad2a5f3f80 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
@@ -38,6 +38,7 @@ abstract class NamedExpression extends Expression {
   self: Product =>
 
   def name: String
+  def lowerName = name.toLowerCase()
   def exprId: ExprId
   def qualifiers: Seq[String]
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 5c66322f1ed9..4aa5ce9fff64 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -68,7 +68,7 @@ private[hive] trait HiveStrategies {
       def fakeOutput(newOutput: Seq[Attribute]) =
         OutputFaker(
           originalPlan.output.map(a =>
-            newOutput.find(a.name.toLowerCase == _.name.toLowerCase)
+            newOutput.find(a.lowerName == _.lowerName)
               .getOrElse(
                 sys.error(s"Can't find attribute $a to fake in set ${newOutput.mkString(",")}"))),
           originalPlan)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index 0de29d5cffd0..91db17f1eb92 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -273,7 +273,7 @@ private[hive] object HadoopTableReader extends HiveInspectors {
     val soi = deserializer.getObjectInspector().asInstanceOf[StructObjectInspector]
 
     val (fieldRefs, fieldOrdinals) = nonPartitionKeyAttrs.map { case (attr, ordinal) =>
-      soi.getStructFieldRef(attr.name) -> ordinal
+      soi.getStructFieldRef(attr.lowerName) -> ordinal
     }.unzip
 
     // Builds specific unwrappers ahead of time according to object inspector types to avoid pattern
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
index 8411e4d1ad58..5dc0fb1a9cb4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
@@ -82,7 +82,7 @@ case class HiveTableScan(
     val neededColumnIDs =
       attributes.map(a =>
         relation.attributes.indexWhere(
-          _.name.toLowerCase == a.name.toLowerCase): Integer).filter(index => index >= 0)
+          _.lowerName == a.lowerName): Integer).filter(index => index >= 0)
 
     ColumnProjectionUtils.appendReadColumnIDs(hiveConf, neededColumnIDs)
     ColumnProjectionUtils.appendReadColumnNames(hiveConf, attributes.map(_.name))
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala
index c5736723b47c..2f3db9588209 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.hive.execution
 
+import org.apache.spark.sql.hive.test.TestHive
+
 class HiveTableScanSuite extends HiveComparisonTest {
 
   createQueryTest("partition_based_table_scan_with_different_serde",
@@ -38,4 +40,11 @@ class HiveTableScanSuite extends HiveComparisonTest {
     |
     |SELECT * from part_scan_test;
   """.stripMargin)
+
+  test("Spark-4041: lowercase issue") {
+    TestHive.sql("CREATE TABLE tb (KEY INT, VALUE STRING) STORED AS ORC")
+    TestHive.sql("insert into table tb select key, value from src")
+    TestHive.sql("select KEY from tb where VALUE='just_for_test' limit 5").collect()
+    TestHive.sql("drop table tb")
+  }
 }
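Patch 3 replaces the scattered .toLowerCase calls with a single lowerName helper on NamedExpression and adds a regression test for the reported issue (SPARK-4041). A standalone sketch of the helper's effect across two of the call sites, using plain Scala stand-ins rather than Catalyst's classes (the Map modeling the SerDe field lookup is illustrative):

    // Stand-in for Catalyst's NamedExpression: the normalization lives in one
    // place instead of being repeated at every comparison site.
    trait Named { def name: String; def lowerName: String = name.toLowerCase }
    final case class Attr(name: String) extends Named

    object LowerNameSketch {
      def main(args: Array[String]): Unit = {
        val relation  = Seq(Attr("key"), Attr("value")) // metastore schema, lowercase
        val requested = Seq(Attr("KEY"))                // capitalization from the query

        // Call site 1: column-ID matching, as in the HiveTableScan change.
        val ids = requested.map(a => relation.indexWhere(_.lowerName == a.lowerName)).filter(_ >= 0)

        // Call site 2: field lookup keyed by the normalized name, as in the
        // TableReader change (modeled with a plain Map instead of a SerDe).
        val fieldOrdinals = Map("key" -> 0, "value" -> 1)
        val refs = requested.flatMap(a => fieldOrdinals.get(a.lowerName))

        println(ids)  // List(0)
        println(refs) // List(0)
      }
    }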
From 617404683c50f631cbe0150189bc7c4e535cc33c Mon Sep 17 00:00:00 2001
From: scwf
Date: Sun, 26 Oct 2014 19:44:09 -0700
Subject: [PATCH 4/4] use AttributeMap for this issue

---
 .../sql/catalyst/expressions/namedExpressions.scala  |  1 -
 .../apache/spark/sql/hive/HiveMetastoreCatalog.scala |  6 ++++++
 .../org/apache/spark/sql/hive/HiveStrategies.scala   |  2 +-
 .../scala/org/apache/spark/sql/hive/TableReader.scala |  2 +-
 .../spark/sql/hive/execution/HiveTableScan.scala     | 10 +++++-----
 5 files changed, 13 insertions(+), 8 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
index bbad2a5f3f80..d023db44d854 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
@@ -38,7 +38,6 @@ abstract class NamedExpression extends Expression {
   self: Product =>
 
   def name: String
-  def lowerName = name.toLowerCase()
   def exprId: ExprId
   def qualifiers: Seq[String]
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 75a19656af11..9e2ba0f12cc4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -308,4 +308,10 @@ private[hive] case class MetastoreRelation
   val attributes = hiveQlTable.getCols.map(_.toAttribute)
 
   val output = attributes ++ partitionKeys
+
+  /** An attribute map that can be used to lookup original attributes based on expression id. */
+  val attributeMap = AttributeMap(output.map(o => (o,o)))
+
+  /** An attribute map for determining the ordinal for non-partition columns. */
+  val columnOrdinals = AttributeMap(attributes.zipWithIndex)
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 4aa5ce9fff64..5c66322f1ed9 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -68,7 +68,7 @@ private[hive] trait HiveStrategies {
       def fakeOutput(newOutput: Seq[Attribute]) =
         OutputFaker(
           originalPlan.output.map(a =>
-            newOutput.find(a.lowerName == _.lowerName)
+            newOutput.find(a.name.toLowerCase == _.name.toLowerCase)
               .getOrElse(
                 sys.error(s"Can't find attribute $a to fake in set ${newOutput.mkString(",")}"))),
           originalPlan)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index 91db17f1eb92..0de29d5cffd0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -273,7 +273,7 @@ private[hive] object HadoopTableReader extends HiveInspectors {
     val soi = deserializer.getObjectInspector().asInstanceOf[StructObjectInspector]
 
     val (fieldRefs, fieldOrdinals) = nonPartitionKeyAttrs.map { case (attr, ordinal) =>
-      soi.getStructFieldRef(attr.lowerName) -> ordinal
+      soi.getStructFieldRef(attr.name) -> ordinal
     }.unzip
 
     // Builds specific unwrappers ahead of time according to object inspector types to avoid pattern
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
index 5dc0fb1a9cb4..11373d730162 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
@@ -45,7 +45,7 @@ import org.apache.spark.sql.hive._
  */
 @DeveloperApi
 case class HiveTableScan(
-    attributes: Seq[Attribute],
+    requestedAttributes: Seq[Attribute],
     relation: MetastoreRelation,
     partitionPruningPred: Option[Expression])(
     @transient val context: HiveContext)
@@ -54,6 +54,9 @@ case class HiveTableScan(
   require(partitionPruningPred.isEmpty || relation.hiveQlTable.isPartitioned,
     "Partition pruning predicates only supported for partitioned tables.")
 
+  // Retrieve the original attributes based on expression ID so that capitalization matches.
+  val attributes = requestedAttributes.map(relation.attributeMap)
+
   // Bind all partition key attribute references in the partition pruning predicate for later
   // evaluation.
   private[this] val boundPruningPred = partitionPruningPred.map { pred =>
@@ -79,10 +82,7 @@ case class HiveTableScan(
 
   private def addColumnMetadataToConf(hiveConf: HiveConf) {
     // Specifies needed column IDs for those non-partitioning columns.
-    val neededColumnIDs =
-      attributes.map(a =>
-        relation.attributes.indexWhere(
-          _.lowerName == a.lowerName): Integer).filter(index => index >= 0)
+    val neededColumnIDs = attributes.flatMap(relation.columnOrdinals.get).map(o => o: Integer)
 
     ColumnProjectionUtils.appendReadColumnIDs(hiveConf, neededColumnIDs)
     ColumnProjectionUtils.appendReadColumnNames(hiveConf, attributes.map(_.name))
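The final patch abandons name normalization altogether. MetastoreRelation now exposes attributeMap (expression ID to the relation's original attribute) and columnOrdinals (attribute to column index), and HiveTableScan resolves its requestedAttributes through them, so the capitalization the user typed never reaches the Hive column-ID lookup; the lowerName helper and both lowercasing call sites can therefore be reverted. A minimal standalone model of that design (plain Scala, not Spark's classes; Catalyst's AttributeMap compares by exprId, imitated here with a Map keyed on a numeric ID):

    // Attributes carry a stable expression ID assigned during analysis; every
    // lookup in the scan is keyed on that ID instead of the case-varying name.
    final case class Attr(name: String, exprId: Long)

    object AttributeMapSketch {
      def main(args: Array[String]): Unit = {
        // Relation side: Hive reports lowercase column names.
        val output = Seq(Attr("key", 1L), Attr("value", 2L))

        // attributeMap: exprId -> the relation's original attribute.
        val attributeMap = output.map(a => a.exprId -> a).toMap
        // columnOrdinals: exprId -> ordinal of the (non-partition) column.
        val columnOrdinals = output.zipWithIndex.map { case (a, i) => a.exprId -> i }.toMap

        // Query side: the analyzer resolved "KEY" to the same exprId as "key".
        val requestedAttributes = Seq(Attr("KEY", 1L))

        // As in HiveTableScan: swap in the original attributes, then take ordinals.
        val attributes = requestedAttributes.map(a => attributeMap(a.exprId))
        val neededColumnIDs = attributes.flatMap(a => columnOrdinals.get(a.exprId))

        println(attributes)      // List(Attr(key,1)) -- metastore capitalization restored
        println(neededColumnIDs) // List(0)
      }
    }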