From 2537c1ea5028541a68ae63e8f5eea8eb8f62dadf Mon Sep 17 00:00:00 2001
From: donnyzone
Date: Wed, 16 Aug 2017 15:53:19 +0800
Subject: [PATCH 1/7] spark-21739

---
 .../apache/spark/sql/hive/TableReader.scala   |  4 +++-
 .../hive/execution/HiveTableScanExec.scala    |  3 ++-
 .../spark/sql/hive/QueryPartitionSuite.scala  | 21 +++++++++++++++++++
 3 files changed, 26 insertions(+), 2 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index f238b9a4f7f6..4f9ee63d44ab 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -41,6 +41,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.{SerializableConfiguration, Utils}
 
@@ -227,7 +228,8 @@ class HadoopTableReader(
     def fillPartitionKeys(rawPartValues: Array[String], row: InternalRow): Unit = {
       partitionKeyAttrs.foreach { case (attr, ordinal) =>
         val partOrdinal = partitionKeys.indexOf(attr)
-        row(ordinal) = Cast(Literal(rawPartValues(partOrdinal)), attr.dataType).eval(null)
+        row(ordinal) = Cast(Literal(rawPartValues(partOrdinal)), attr.dataType,
+          Option(SQLConf.get.sessionLocalTimeZone)).eval(null)
       }
     }
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
index 896f24f2e223..99ab2729b1ed 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
@@ -37,6 +37,7 @@ import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.hive._
 import org.apache.spark.sql.hive.client.HiveClientImpl
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{BooleanType, DataType}
 import org.apache.spark.util.Utils
 
@@ -104,7 +105,7 @@ case class HiveTableScanExec(
       hadoopConf)
 
   private def castFromString(value: String, dataType: DataType) = {
-    Cast(Literal(value), dataType).eval(null)
+    Cast(Literal(value), dataType, Option(SQLConf.get.sessionLocalTimeZone)).eval(null)
   }
 
   private def addColumnMetadataToConf(hiveConf: Configuration): Unit = {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
index 43b6bf5feeb6..7e80b83d2b4c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
@@ -68,4 +68,25 @@ class QueryPartitionSuite extends QueryTest with SQLTestUtils with TestHiveSingl
       sql("DROP TABLE IF EXISTS createAndInsertTest")
     }
   }
+
+  test("SPARK-21739: Cast expression should initialize timezoneId " +
+    "when it is called statically to convert something into TimestampType") {
+    // create table for test
+    sql("CREATE TABLE table_with_timestamp_partition(value int) PARTITIONED by (ts timestamp)")
+    sql("INSERT OVERWRITE TABLE table_with_timestamp_partition " +
+      "partition (ts = '2010-01-01 00:00:00.000') VALUES (1)")
+    sql("INSERT OVERWRITE TABLE table_with_timestamp_partition " +
+      "partition (ts = '2010-01-02 00:00:00.000') VALUES (2)")
+
+    // test for Cast expression in TableReader
+    checkAnswer(sql("select value from table_with_timestamp_partition"),
+      Seq(Row(1), Row(2)))
+
+    // test for Cast expression in HiveTableScanExec
+    checkAnswer(sql("select value from table_with_timestamp_partition " +
+      "where ts='2010-01-02 00:00:00.000'"), Row(1))
+
+    sql("DROP TABLE IF EXISTS table_with_timestamp_partition")
+  }
+
 }
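Background on [PATCH 1/7]: Cast mixes in TimeZoneAwareExpression, and a
String-to-TimestampType cast resolves its time zone from timeZoneId at
evaluation time. In a normal query the analyzer fills timeZoneId in, but
TableReader and HiveTableScanExec build the Cast by hand ("statically",
as the test title puts it), so timeZoneId stays None and eval throws;
this is why scans of timestamp-partitioned Hive tables failed before the
patch. A minimal sketch of the failure mode, assuming the Spark 2.2-era
Catalyst API ("GMT" stands in for whatever the session time zone is):

    import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
    import org.apache.spark.sql.types.TimestampType

    // Built without a time zone: timeZoneId is None, and evaluating the
    // string-to-timestamp conversion dereferences it, so this throws.
    val noZone = Cast(Literal("2010-01-01 00:00:00.000"), TimestampType)
    // noZone.eval(null)  // NoSuchElementException: None.get

    // Built the way the patch does it: evaluates to the timestamp's
    // internal representation (microseconds since the epoch).
    val withZone = Cast(Literal("2010-01-01 00:00:00.000"), TimestampType, Option("GMT"))
    val micros = withZone.eval(null)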
From 86331e37550f4204c8c4ee2c409fbd6000654f43 Mon Sep 17 00:00:00 2001
From: donnyzone
Date: Wed, 16 Aug 2017 15:55:17 +0800
Subject: [PATCH 2/7] fix code style

---
 .../scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
index 7e80b83d2b4c..8dc458a25140 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
@@ -84,7 +84,7 @@ class QueryPartitionSuite extends QueryTest with SQLTestUtils with TestHiveSingl
 
     // test for Cast expression in HiveTableScanExec
     checkAnswer(sql("select value from table_with_timestamp_partition " +
-      "where ts='2010-01-02 00:00:00.000'"), Row(1))
+      "where ts = '2010-01-02 00:00:00.000'"), Row(1))
 
     sql("DROP TABLE IF EXISTS table_with_timestamp_partition")
   }

From 492b756fde5008854d1351ed423c3897c683c662 Mon Sep 17 00:00:00 2001
From: donnyzone
Date: Wed, 16 Aug 2017 15:56:34 +0800
Subject: [PATCH 3/7] correct answer

---
 .../scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
index 8dc458a25140..8de6633490ae 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
@@ -84,7 +84,7 @@ class QueryPartitionSuite extends QueryTest with SQLTestUtils with TestHiveSingl
 
     // test for Cast expression in HiveTableScanExec
     checkAnswer(sql("select value from table_with_timestamp_partition " +
-      "where ts = '2010-01-02 00:00:00.000'"), Row(1))
+      "where ts = '2010-01-02 00:00:00.000'"), Row(2))
 
     sql("DROP TABLE IF EXISTS table_with_timestamp_partition")
   }

From a264e3aa166d2e83832a82489669893f41ff9749 Mon Sep 17 00:00:00 2001
From: donnyzone
Date: Thu, 17 Aug 2017 11:06:02 +0800
Subject: [PATCH 4/7] Obtain SQLConf from SparkSession

---
 .../main/scala/org/apache/spark/sql/hive/TableReader.scala   | 3 +--
 .../apache/spark/sql/hive/execution/HiveTableScanExec.scala  | 4 ++--
 2 files changed, 3 insertions(+), 4 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index 4f9ee63d44ab..79146db056b6 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -41,7 +41,6 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.{SerializableConfiguration, Utils}
 
@@ -229,7 +228,7 @@ class HadoopTableReader(
       partitionKeyAttrs.foreach { case (attr, ordinal) =>
         val partOrdinal = partitionKeys.indexOf(attr)
         row(ordinal) = Cast(Literal(rawPartValues(partOrdinal)), attr.dataType,
-          Option(SQLConf.get.sessionLocalTimeZone)).eval(null)
+          Option(sparkSession.sessionState.conf.sessionLocalTimeZone)).eval(null)
       }
     }
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
index 99ab2729b1ed..5286a2e46ddc 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
@@ -37,7 +37,6 @@ import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.hive._
 import org.apache.spark.sql.hive.client.HiveClientImpl
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{BooleanType, DataType}
 import org.apache.spark.util.Utils
 
@@ -105,7 +104,8 @@ case class HiveTableScanExec(
       hadoopConf)
 
   private def castFromString(value: String, dataType: DataType) = {
-    Cast(Literal(value), dataType, Option(SQLConf.get.sessionLocalTimeZone)).eval(null)
+    Cast(Literal(value), dataType,
+      Option(sparkSession.sessionState.conf.sessionLocalTimeZone)).eval(null)
   }
 
   private def addColumnMetadataToConf(hiveConf: Configuration): Unit = {
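The point of [PATCH 4/7]: SQLConf.get resolves the conf dynamically from
the calling thread's context rather than from the operator's own session,
so it is only as reliable as wherever the code happens to run. Both call
sites already hold the SparkSession they were planned with, and reading
the time zone off that session is deterministic. The difference, sketched
against the API of this era (sparkSession stands for whatever session is
in scope):

    // Context-dependent lookup: returns whichever SQLConf is active for
    // the current thread, which need not be the query's session conf.
    val tz1 = org.apache.spark.sql.internal.SQLConf.get.sessionLocalTimeZone

    // Explicit lookup against the session the operator belongs to.
    val tz2 = sparkSession.sessionState.conf.sessionLocalTimeZone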
From 5a04ddc77195ab287366754450665e0dbc240e0b Mon Sep 17 00:00:00 2001
From: donnyzone
Date: Thu, 17 Aug 2017 15:50:26 +0800
Subject: [PATCH 5/7] fix test case queries

---
 .../spark/sql/hive/QueryPartitionSuite.scala  | 38 +++++++++++--------
 1 file changed, 22 insertions(+), 16 deletions(-)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
index 8de6633490ae..3af472553dd7 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.hive
 
 import java.io.File
+import java.sql.Timestamp
 
 import com.google.common.io.Files
 import org.apache.hadoop.fs.FileSystem
@@ -71,22 +72,27 @@ class QueryPartitionSuite extends QueryTest with SQLTestUtils with TestHiveSingl
 
   test("SPARK-21739: Cast expression should initialize timezoneId " +
     "when it is called statically to convert something into TimestampType") {
-    // create table for test
-    sql("CREATE TABLE table_with_timestamp_partition(value int) PARTITIONED by (ts timestamp)")
-    sql("INSERT OVERWRITE TABLE table_with_timestamp_partition " +
-      "partition (ts = '2010-01-01 00:00:00.000') VALUES (1)")
-    sql("INSERT OVERWRITE TABLE table_with_timestamp_partition " +
-      "partition (ts = '2010-01-02 00:00:00.000') VALUES (2)")
-
-    // test for Cast expression in TableReader
-    checkAnswer(sql("select value from table_with_timestamp_partition"),
-      Seq(Row(1), Row(2)))
-
-    // test for Cast expression in HiveTableScanExec
-    checkAnswer(sql("select value from table_with_timestamp_partition " +
-      "where ts = '2010-01-02 00:00:00.000'"), Row(2))
-
-    sql("DROP TABLE IF EXISTS table_with_timestamp_partition")
+    withTable("table_with_timestamp_partition") {
+      // create table for test
+      sql("CREATE TABLE table_with_timestamp_partition(value int) PARTITIONED by (ts timestamp)")
+      sql("INSERT OVERWRITE TABLE table_with_timestamp_partition " +
+        "partition (ts = '2010-01-01 00:00:00.000') VALUES (1)")
+      sql("INSERT OVERWRITE TABLE table_with_timestamp_partition " +
+        "partition (ts = '2010-01-02 00:00:00.000') VALUES (2)")
+      sql("INSERT OVERWRITE TABLE table_with_timestamp_partition " +
+        "partition (ts = '2010-01-03 00:00:00.000') VALUES (3)")
+
+      // test for Cast expression in TableReader
+      checkAnswer(sql("select * from table_with_timestamp_partition"),
+        Seq(
+          Row(1, Timestamp.valueOf("2010-01-01 00:00:00.000")),
+          Row(2, Timestamp.valueOf("2010-01-02 00:00:00.000")),
+          Row(3, Timestamp.valueOf("2010-01-03 00:00:00.000"))))
+
+      // test for Cast expression in HiveTableScanExec
+      checkAnswer(sql("select value from table_with_timestamp_partition " +
+        "where ts = '2010-01-02 00:00:00.000'"), Row(2))
+    }
   }
 
 }
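Two things change in [PATCH 5/7] beyond adding a third partition: the
assertions now check that the partition values themselves round-trip as
java.sql.Timestamp (select * instead of select value), and the explicit
DROP TABLE is gone because withTable guarantees cleanup even when an
assertion fails mid-test. The utility comes from SQLTestUtils; its shape
is roughly this (a sketch, not the exact Spark source):

    def withTable(tableNames: String*)(f: => Unit): Unit = {
      try f finally {
        tableNames.foreach { name =>
          spark.sql(s"DROP TABLE IF EXISTS $name")
        }
      }
    }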
From 93581bbfa2e9abd66fdce58cb037aa02296a6afa Mon Sep 17 00:00:00 2001
From: donnyzone
Date: Thu, 17 Aug 2017 20:13:16 +0800
Subject: [PATCH 6/7] remove a blank line

---
 .../scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
index 3af472553dd7..0512df6497d0 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
@@ -94,5 +94,4 @@ class QueryPartitionSuite extends QueryTest with SQLTestUtils with TestHiveSingl
         "where ts = '2010-01-02 00:00:00.000'"), Row(2))
     }
   }
-
 }
From 5b051dde07c046c94d4845c002e3467ea9cce166 Mon Sep 17 00:00:00 2001
From: donnyzone
Date: Fri, 18 Aug 2017 10:20:13 +0800
Subject: [PATCH 7/7] refactor implementation

---
 .../apache/spark/sql/hive/TableReader.scala   |  9 +++++---
 .../hive/execution/HiveTableScanExec.scala    |  9 +++++---
 .../spark/sql/hive/QueryPartitionSuite.scala  | 23 ++++++----------
 3 files changed, 19 insertions(+), 22 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index 79146db056b6..cc8907a0bbc9 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -39,8 +39,10 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, RDD, UnionRDD}
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.CastSupport
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.{SerializableConfiguration, Utils}
 
@@ -65,7 +67,7 @@ class HadoopTableReader(
     @transient private val tableDesc: TableDesc,
     @transient private val sparkSession: SparkSession,
     hadoopConf: Configuration)
-  extends TableReader with Logging {
+  extends TableReader with CastSupport with Logging {
 
   // Hadoop honors "mapreduce.job.maps" as hint,
   // but will ignore when mapreduce.jobtracker.address is "local".
@@ -86,6 +88,8 @@ class HadoopTableReader(
   private val _broadcastedHadoopConf =
     sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
 
+  override def conf: SQLConf = sparkSession.sessionState.conf
+
   override def makeRDDForTable(hiveTable: HiveTable): RDD[InternalRow] =
     makeRDDForTable(
       hiveTable,
@@ -227,8 +231,7 @@ class HadoopTableReader(
     def fillPartitionKeys(rawPartValues: Array[String], row: InternalRow): Unit = {
       partitionKeyAttrs.foreach { case (attr, ordinal) =>
         val partOrdinal = partitionKeys.indexOf(attr)
-        row(ordinal) = Cast(Literal(rawPartValues(partOrdinal)), attr.dataType,
-          Option(sparkSession.sessionState.conf.sessionLocalTimeZone)).eval(null)
+        row(ordinal) = cast(Literal(rawPartValues(partOrdinal)), attr.dataType).eval(null)
       }
     }
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
index 5286a2e46ddc..48d0b4a63e54 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
@@ -30,6 +30,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.CastSupport
 import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.QueryPlan
@@ -37,6 +38,7 @@ import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.hive._
 import org.apache.spark.sql.hive.client.HiveClientImpl
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{BooleanType, DataType}
 import org.apache.spark.util.Utils
 
@@ -53,11 +55,13 @@ case class HiveTableScanExec(
     relation: HiveTableRelation,
     partitionPruningPred: Seq[Expression])(
     @transient private val sparkSession: SparkSession)
-  extends LeafExecNode {
+  extends LeafExecNode with CastSupport {
 
   require(partitionPruningPred.isEmpty || relation.isPartitioned,
     "Partition pruning predicates only supported for partitioned tables.")
 
+  override def conf: SQLConf = sparkSession.sessionState.conf
+
   override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
 
@@ -104,8 +108,7 @@ case class HiveTableScanExec(
       hadoopConf)
 
   private def castFromString(value: String, dataType: DataType) = {
-    Cast(Literal(value), dataType,
-      Option(sparkSession.sessionState.conf.sessionLocalTimeZone)).eval(null)
+    cast(Literal(value), dataType).eval(null)
   }
 
   private def addColumnMetadataToConf(hiveConf: Configuration): Unit = {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
index 0512df6497d0..b2dc401ce1ef 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
@@ -70,28 +70,19 @@ class QueryPartitionSuite extends QueryTest with SQLTestUtils with TestHiveSingl
     }
   }
 
-  test("SPARK-21739: Cast expression should initialize timezoneId " +
-    "when it is called statically to convert something into TimestampType") {
+  test("SPARK-21739: Cast expression should initialize timezoneId") {
     withTable("table_with_timestamp_partition") {
-      // create table for test
-      sql("CREATE TABLE table_with_timestamp_partition(value int) PARTITIONED by (ts timestamp)")
+      sql("CREATE TABLE table_with_timestamp_partition(value int) PARTITIONED BY (ts TIMESTAMP)")
       sql("INSERT OVERWRITE TABLE table_with_timestamp_partition " +
-        "partition (ts = '2010-01-01 00:00:00.000') VALUES (1)")
-      sql("INSERT OVERWRITE TABLE table_with_timestamp_partition " +
-        "partition (ts = '2010-01-02 00:00:00.000') VALUES (2)")
-      sql("INSERT OVERWRITE TABLE table_with_timestamp_partition " +
-        "partition (ts = '2010-01-03 00:00:00.000') VALUES (3)")
+        "PARTITION (ts = '2010-01-01 00:00:00.000') VALUES (1)")
 
       // test for Cast expression in TableReader
-      checkAnswer(sql("select * from table_with_timestamp_partition"),
-        Seq(
-          Row(1, Timestamp.valueOf("2010-01-01 00:00:00.000")),
-          Row(2, Timestamp.valueOf("2010-01-02 00:00:00.000")),
-          Row(3, Timestamp.valueOf("2010-01-03 00:00:00.000"))))
+      checkAnswer(sql("SELECT * FROM table_with_timestamp_partition"),
+        Seq(Row(1, Timestamp.valueOf("2010-01-01 00:00:00.000"))))
 
       // test for Cast expression in HiveTableScanExec
-      checkAnswer(sql("select value from table_with_timestamp_partition " +
-        "where ts = '2010-01-02 00:00:00.000'"), Row(2))
+      checkAnswer(sql("SELECT value FROM table_with_timestamp_partition " +
+        "WHERE ts = '2010-01-01 00:00:00.000'"), Row(1))
     }
   }
 }
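The final refactor in [PATCH 7/7] replaces both hand-assembled
Cast(..., Option(conf.sessionLocalTimeZone)) call sites with the cast(...)
helper obtained by mixing in org.apache.spark.sql.catalyst.analysis.CastSupport,
so the time-zone wiring lives in one place and cannot be forgotten at the
next call site. Reconstructed from how this patch uses it (a sketch, not a
quote of the Spark source), the trait looks approximately like:

    import org.apache.spark.sql.catalyst.expressions.{Cast, Expression}
    import org.apache.spark.sql.internal.SQLConf
    import org.apache.spark.sql.types.DataType

    trait CastSupport {
      // Supplied by the mixing-in class; here both HadoopTableReader and
      // HiveTableScanExec return sparkSession.sessionState.conf.
      def conf: SQLConf

      // Every cast built through the helper carries the session time zone.
      def cast(child: Expression, dataType: DataType): Cast = {
        Cast(child, dataType, Option(conf.sessionLocalTimeZone))
      }
    }

Making conf abstract lets each operator hand in the session it already
holds, which keeps the fix from [PATCH 4/7] intact while removing the
duplicated Option(...) plumbing.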