From b635db529b18404d43df47cf0c789fe22e414165 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 1 Mar 2017 06:59:23 +0000 Subject: [PATCH 1/3] Resolving timezone aware expressions with time zone when resolving inline table. --- .../sql/catalyst/analysis/Analyzer.scala | 2 +- .../analysis/ResolveInlineTables.scala | 16 +++++--- .../analysis/ResolveInlineTablesSuite.scala | 40 ++++++++++++------- .../org/apache/spark/sql/SQLQuerySuite.scala | 11 +++++ 4 files changed, 47 insertions(+), 22 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index c477cb48d0b0..6d569b612de7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -146,7 +146,7 @@ class Analyzer( GlobalAggregates :: ResolveAggregateFunctions :: TimeWindowing :: - ResolveInlineTables :: + ResolveInlineTables(conf) :: TypeCoercion.typeCoercionRules ++ extendedResolutionRules : _*), Batch("Post-Hoc Resolution", Once, postHocResolutionRules: _*), diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala index 7323197b10f6..fa93635034c1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala @@ -19,8 +19,8 @@ package org.apache.spark.sql.catalyst.analysis import scala.util.control.NonFatal -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.Cast +import org.apache.spark.sql.catalyst.{CatalystConf, InternalRow} +import org.apache.spark.sql.catalyst.expressions.{Cast, TimeZoneAwareExpression} import 
org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.types.{StructField, StructType} @@ -28,7 +28,7 @@ import org.apache.spark.sql.types.{StructField, StructType} /** * An analyzer rule that replaces [[UnresolvedInlineTable]] with [[LocalRelation]]. */ -object ResolveInlineTables extends Rule[LogicalPlan] { +case class ResolveInlineTables(conf: CatalystConf) extends Rule[LogicalPlan] { override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { case table: UnresolvedInlineTable if table.expressionsResolved => validateInputDimension(table) @@ -95,10 +95,14 @@ object ResolveInlineTables extends Rule[LogicalPlan] { InternalRow.fromSeq(row.zipWithIndex.map { case (e, ci) => val targetType = fields(ci).dataType try { - if (e.dataType.sameType(targetType)) { - e.eval() + val castedExpr = if (e.dataType.sameType(targetType)) { + e } else { - Cast(e, targetType).eval() + Cast(e, targetType) + } + castedExpr match { + case te: TimeZoneAwareExpression => te.withTimeZone(conf.sessionLocalTimeZone).eval() + case _ => castedExpr.eval() } } catch { case NonFatal(ex) => diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTablesSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTablesSuite.scala index 920c6ea50f4b..f45a82686984 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTablesSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTablesSuite.scala @@ -20,68 +20,67 @@ package org.apache.spark.sql.catalyst.analysis import org.scalatest.BeforeAndAfter import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.expressions.{Literal, Rand} +import org.apache.spark.sql.catalyst.expressions.{Cast, Literal, Rand} import org.apache.spark.sql.catalyst.expressions.aggregate.Count -import 
org.apache.spark.sql.catalyst.plans.PlanTest -import org.apache.spark.sql.types.{LongType, NullType} +import org.apache.spark.sql.types.{LongType, NullType, TimestampType} /** * Unit tests for [[ResolveInlineTables]]. Note that there are also test cases defined in * end-to-end tests (in sql/core module) for verifying the correct error messages are shown * in negative cases. */ -class ResolveInlineTablesSuite extends PlanTest with BeforeAndAfter { +class ResolveInlineTablesSuite extends AnalysisTest with BeforeAndAfter { private def lit(v: Any): Literal = Literal(v) test("validate inputs are foldable") { - ResolveInlineTables.validateInputEvaluable( + ResolveInlineTables(conf).validateInputEvaluable( UnresolvedInlineTable(Seq("c1", "c2"), Seq(Seq(lit(1))))) // nondeterministic (rand) should not work intercept[AnalysisException] { - ResolveInlineTables.validateInputEvaluable( + ResolveInlineTables(conf).validateInputEvaluable( UnresolvedInlineTable(Seq("c1"), Seq(Seq(Rand(1))))) } // aggregate should not work intercept[AnalysisException] { - ResolveInlineTables.validateInputEvaluable( + ResolveInlineTables(conf).validateInputEvaluable( UnresolvedInlineTable(Seq("c1"), Seq(Seq(Count(lit(1)))))) } // unresolved attribute should not work intercept[AnalysisException] { - ResolveInlineTables.validateInputEvaluable( + ResolveInlineTables(conf).validateInputEvaluable( UnresolvedInlineTable(Seq("c1"), Seq(Seq(UnresolvedAttribute("A"))))) } } test("validate input dimensions") { - ResolveInlineTables.validateInputDimension( + ResolveInlineTables(conf).validateInputDimension( UnresolvedInlineTable(Seq("c1"), Seq(Seq(lit(1)), Seq(lit(2))))) // num alias != data dimension intercept[AnalysisException] { - ResolveInlineTables.validateInputDimension( + ResolveInlineTables(conf).validateInputDimension( UnresolvedInlineTable(Seq("c1", "c2"), Seq(Seq(lit(1)), Seq(lit(2))))) } // num alias == data dimension, but data themselves are inconsistent intercept[AnalysisException] { - 
ResolveInlineTables.validateInputDimension( + ResolveInlineTables(conf).validateInputDimension( UnresolvedInlineTable(Seq("c1"), Seq(Seq(lit(1)), Seq(lit(21), lit(22))))) } } test("do not fire the rule if not all expressions are resolved") { val table = UnresolvedInlineTable(Seq("c1", "c2"), Seq(Seq(UnresolvedAttribute("A")))) - assert(ResolveInlineTables(table) == table) + assert(ResolveInlineTables(conf)(table) == table) } test("convert") { val table = UnresolvedInlineTable(Seq("c1"), Seq(Seq(lit(1)), Seq(lit(2L)))) - val converted = ResolveInlineTables.convert(table) + val converted = ResolveInlineTables(conf).convert(table) assert(converted.output.map(_.dataType) == Seq(LongType)) assert(converted.data.size == 2) @@ -89,13 +88,24 @@ class ResolveInlineTablesSuite extends PlanTest with BeforeAndAfter { assert(converted.data(1).getLong(0) == 2L) } + test("convert TimeZoneAwareExpression") { + val table = UnresolvedInlineTable(Seq("c1"), + Seq(Seq(Cast(lit("1991-12-06 00:00:00.0"), TimestampType)))) + val converted = ResolveInlineTables(conf).convert(table) + val correct = Cast(lit("1991-12-06 00:00:00.0"), TimestampType) + .withTimeZone(conf.sessionLocalTimeZone).eval().asInstanceOf[Long] + assert(converted.output.map(_.dataType) == Seq(TimestampType)) + assert(converted.data.size == 1) + assert(converted.data(0).getLong(0) == correct) + } + test("nullability inference in convert") { val table1 = UnresolvedInlineTable(Seq("c1"), Seq(Seq(lit(1)), Seq(lit(2L)))) - val converted1 = ResolveInlineTables.convert(table1) + val converted1 = ResolveInlineTables(conf).convert(table1) assert(!converted1.schema.fields(0).nullable) val table2 = UnresolvedInlineTable(Seq("c1"), Seq(Seq(lit(1)), Seq(Literal(null, NullType)))) - val converted2 = ResolveInlineTables.convert(table2) + val converted2 = ResolveInlineTables(conf).convert(table2) assert(converted2.schema.fields(0).nullable) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 468ea0551298..bd9ac13b9576 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -2586,4 +2586,15 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { } assert(!jobStarted.get(), "Command should not trigger a Spark job.") } + + test("string to timestamp in inline table definition") { + sql( + """ + |CREATE TEMPORARY VIEW table_4(timestamp_col_3) + |AS VALUES TIMESTAMP('1991-12-06 00:00:00.0') + """.stripMargin) + checkAnswer( + sql("SELECT timestamp_col_3 FROM table_4"), + Row(java.sql.Timestamp.valueOf("1991-12-06 00:00:00")) :: Nil) + } } From cc6ee95b52cb630e6a21e9367065c206dca06949 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 1 Mar 2017 08:06:04 +0000 Subject: [PATCH 2/3] Move test case into sql file. --- .../sql-tests/inputs/inline-table.sql | 3 ++ .../sql-tests/results/inline-table.sql.out | 36 +++++++++++-------- .../org/apache/spark/sql/SQLQuerySuite.scala | 11 ------ 3 files changed, 25 insertions(+), 25 deletions(-) diff --git a/sql/core/src/test/resources/sql-tests/inputs/inline-table.sql b/sql/core/src/test/resources/sql-tests/inputs/inline-table.sql index 5107fa4d5553..331f18113ea1 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/inline-table.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/inline-table.sql @@ -29,6 +29,9 @@ select * from values ("one", array(0, 1)), ("two", array(2, 3)) as data(a, b); -- decimal and double coercion select * from values ("one", 2.0), ("two", 3.0D) as data(a, b); +-- string to timestamp +select * from values timestamp('1991-12-06 00:00:00.0') as data(a); + -- error reporting: nondeterministic function rand select * from values ("one", rand(5)), ("two", 3.0D) as data(a, b); diff --git a/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out 
b/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out index de6f01b8de77..d8640f32ab2a 100644 --- a/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out @@ -1,5 +1,5 @@ -- Automatically generated by SQLQueryTestSuite --- Number of queries: 16 +-- Number of queries: 17 -- !query 0 @@ -92,54 +92,62 @@ two 3.0 -- !query 10 -select * from values ("one", rand(5)), ("two", 3.0D) as data(a, b) +select * from values timestamp('1991-12-06 00:00:00.0') as data(a) -- !query 10 schema -struct<> +struct<a:timestamp> -- !query 10 output -org.apache.spark.sql.AnalysisException -cannot evaluate expression rand(5) in inline table definition; line 1 pos 29 +1991-12-06 00:00:00 -- !query 11 -select * from values ("one", 2.0), ("two") as data(a, b) +select * from values ("one", rand(5)), ("two", 3.0D) as data(a, b) -- !query 11 schema struct<> -- !query 11 output org.apache.spark.sql.AnalysisException -expected 2 columns but found 1 columns in row 1; line 1 pos 14 +cannot evaluate expression rand(5) in inline table definition; line 1 pos 29 -- !query 12 -select * from values ("one", array(0, 1)), ("two", struct(1, 2)) as data(a, b) +select * from values ("one", 2.0), ("two") as data(a, b) -- !query 12 schema struct<> -- !query 12 output org.apache.spark.sql.AnalysisException -incompatible types found in column b for inline table; line 1 pos 14 +expected 2 columns but found 1 columns in row 1; line 1 pos 14 -- !query 13 -select * from values ("one"), ("two") as data(a, b) +select * from values ("one", array(0, 1)), ("two", struct(1, 2)) as data(a, b) -- !query 13 schema struct<> -- !query 13 output org.apache.spark.sql.AnalysisException -expected 2 columns but found 1 columns in row 0; line 1 pos 14 +incompatible types found in column b for inline table; line 1 pos 14 -- !query 14 -select * from values ("one", random_not_exist_func(1)), ("two", 2) as data(a, b) +select * from values ("one"), 
("two") as data(a, b) -- !query 14 schema struct<> -- !query 14 output org.apache.spark.sql.AnalysisException -Undefined function: 'random_not_exist_func'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.; line 1 pos 29 +expected 2 columns but found 1 columns in row 0; line 1 pos 14 -- !query 15 -select * from values ("one", count(1)), ("two", 2) as data(a, b) +select * from values ("one", random_not_exist_func(1)), ("two", 2) as data(a, b) -- !query 15 schema struct<> -- !query 15 output org.apache.spark.sql.AnalysisException +Undefined function: 'random_not_exist_func'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.; line 1 pos 29 + + +-- !query 16 +select * from values ("one", count(1)), ("two", 2) as data(a, b) +-- !query 16 schema +struct<> +-- !query 16 output +org.apache.spark.sql.AnalysisException cannot evaluate expression count(1) in inline table definition; line 1 pos 29 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index bd9ac13b9576..468ea0551298 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -2586,15 +2586,4 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { } assert(!jobStarted.get(), "Command should not trigger a Spark job.") } - - test("string to timestamp in inline table definition") { - sql( - """ - |CREATE TEMPORARY VIEW table_4(timestamp_col_3) - |AS VALUES TIMESTAMP('1991-12-06 00:00:00.0') - """.stripMargin) - checkAnswer( - sql("SELECT timestamp_col_3 FROM table_4"), - Row(java.sql.Timestamp.valueOf("1991-12-06 00:00:00")) :: Nil) - } } From a981ed52f9a27d53107dd4a12e50f0b3cfca1eba Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Fri, 3 Mar 2017 13:04:21 +0000 Subject: [PATCH 3/3] 
Traverse input expressions. --- .../analysis/ResolveInlineTables.scala | 8 ++--- .../sql-tests/inputs/inline-table.sql | 6 ++-- .../sql-tests/results/inline-table.sql.out | 34 +++++++++---------- 3 files changed, 24 insertions(+), 24 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala index fa93635034c1..d5b3ea8c37c6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala @@ -100,10 +100,10 @@ case class ResolveInlineTables(conf: CatalystConf) extends Rule[LogicalPlan] { } else { Cast(e, targetType) } - castedExpr match { - case te: TimeZoneAwareExpression => te.withTimeZone(conf.sessionLocalTimeZone).eval() - case _ => castedExpr.eval() - } + castedExpr.transform { + case e: TimeZoneAwareExpression if e.timeZoneId.isEmpty => + e.withTimeZone(conf.sessionLocalTimeZone) + }.eval() } catch { case NonFatal(ex) => table.failAnalysis(s"failed to evaluate expression ${e.sql}: ${ex.getMessage}") diff --git a/sql/core/src/test/resources/sql-tests/inputs/inline-table.sql b/sql/core/src/test/resources/sql-tests/inputs/inline-table.sql index 331f18113ea1..b3ec956cd178 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/inline-table.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/inline-table.sql @@ -29,9 +29,6 @@ select * from values ("one", array(0, 1)), ("two", array(2, 3)) as data(a, b); -- decimal and double coercion select * from values ("one", 2.0), ("two", 3.0D) as data(a, b); --- string to timestamp -select * from values timestamp('1991-12-06 00:00:00.0') as data(a); - -- error reporting: nondeterministic function rand select * from values ("one", rand(5)), ("two", 3.0D) as data(a, b); @@ -49,3 +46,6 @@ select * from values ("one", 
random_not_exist_func(1)), ("two", 2) as data(a, b) -- error reporting: aggregate expression select * from values ("one", count(1)), ("two", 2) as data(a, b); + +-- string to timestamp +select * from values (timestamp('1991-12-06 00:00:00.0'), array(timestamp('1991-12-06 01:00:00.0'), timestamp('1991-12-06 12:00:00.0'))) as data(a, b); diff --git a/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out b/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out index d8640f32ab2a..4e80f0bda551 100644 --- a/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out @@ -92,62 +92,62 @@ two 3.0 -- !query 10 -select * from values timestamp('1991-12-06 00:00:00.0') as data(a) +select * from values ("one", rand(5)), ("two", 3.0D) as data(a, b) -- !query 10 schema -struct<a:timestamp> +struct<> -- !query 10 output -1991-12-06 00:00:00 +org.apache.spark.sql.AnalysisException +cannot evaluate expression rand(5) in inline table definition; line 1 pos 29 -- !query 11 -select * from values ("one", rand(5)), ("two", 3.0D) as data(a, b) +select * from values ("one", 2.0), ("two") as data(a, b) -- !query 11 schema struct<> -- !query 11 output org.apache.spark.sql.AnalysisException -cannot evaluate expression rand(5) in inline table definition; line 1 pos 29 +expected 2 columns but found 1 columns in row 1; line 1 pos 14 -- !query 12 -select * from values ("one", 2.0), ("two") as data(a, b) +select * from values ("one", array(0, 1)), ("two", struct(1, 2)) as data(a, b) -- !query 12 schema struct<> -- !query 12 output org.apache.spark.sql.AnalysisException -expected 2 columns but found 1 columns in row 1; line 1 pos 14 +incompatible types found in column b for inline table; line 1 pos 14 -- !query 13 -select * from values ("one", array(0, 1)), ("two", struct(1, 2)) as data(a, b) +select * from values ("one"), ("two") as data(a, b) -- !query 13 schema struct<> -- !query 13 output 
org.apache.spark.sql.AnalysisException -incompatible types found in column b for inline table; line 1 pos 14 +expected 2 columns but found 1 columns in row 0; line 1 pos 14 -- !query 14 -select * from values ("one"), ("two") as data(a, b) +select * from values ("one", random_not_exist_func(1)), ("two", 2) as data(a, b) -- !query 14 schema struct<> -- !query 14 output org.apache.spark.sql.AnalysisException -expected 2 columns but found 1 columns in row 0; line 1 pos 14 +Undefined function: 'random_not_exist_func'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.; line 1 pos 29 -- !query 15 -select * from values ("one", random_not_exist_func(1)), ("two", 2) as data(a, b) +select * from values ("one", count(1)), ("two", 2) as data(a, b) -- !query 15 schema struct<> -- !query 15 output org.apache.spark.sql.AnalysisException -Undefined function: 'random_not_exist_func'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.; line 1 pos 29 +cannot evaluate expression count(1) in inline table definition; line 1 pos 29 -- !query 16 -select * from values ("one", count(1)), ("two", 2) as data(a, b) +select * from values (timestamp('1991-12-06 00:00:00.0'), array(timestamp('1991-12-06 01:00:00.0'), timestamp('1991-12-06 12:00:00.0'))) as data(a, b) -- !query 16 schema -struct<> +struct<a:timestamp,b:array<timestamp>> -- !query 16 output -org.apache.spark.sql.AnalysisException -cannot evaluate expression count(1) in inline table definition; line 1 pos 29 +1991-12-06 00:00:00 [1991-12-06 01:00:00.0,1991-12-06 12:00:00.0]