From 1255968908454bfb01b247567f796e10ca6e6d30 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Sat, 2 Jul 2016 22:46:44 -0700 Subject: [PATCH 01/15] fix --- .../sql/catalyst/analysis/CheckAnalysis.scala | 16 +++++++ .../plans/logical/basicLogicalOperators.scala | 14 +++++- .../analysis/AnalysisErrorSuite.scala | 6 +++ .../sql/catalyst/analysis/AnalysisSuite.scala | 1 + .../org/apache/spark/sql/SQLQuerySuite.scala | 13 ++++++ .../apache/spark/sql/StatisticsSuite.scala | 44 +++++++++++++++++++ 6 files changed, 92 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index 7b30fcc6c531..0070887dd014 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -251,6 +251,22 @@ trait CheckAnalysis extends PredicateHelper { s"but one table has '${firstError.output.length}' columns and another table has " + s"'${s.children.head.output.length}' columns") + case l: GlobalLimit => + val numRows = l.limitExpr.eval().asInstanceOf[Int] + if (numRows < 0) { + failAnalysis( + s"number_rows in limit clause must be equal to or greater than 0. " + + s"number_rows:$numRows") + } + + case l: LocalLimit => + val numRows = l.limitExpr.eval().asInstanceOf[Int] + if (numRows < 0) { + failAnalysis( + s"number_rows in limit clause must be equal to or greater than 0. " + + s"number_rows:$numRows") + } + case p if p.expressions.exists(ScalarSubquery.hasCorrelatedScalarSubquery) => p match { case _: Filter | _: Aggregate | _: Project => // Ok diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index 79f9a210a30b..94c9b76e6cd6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -660,7 +660,12 @@ case class GlobalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryN } override lazy val statistics: Statistics = { val limit = limitExpr.eval().asInstanceOf[Int] - val sizeInBytes = (limit: Long) * output.map(a => a.dataType.defaultSize).sum + var sizeInBytes = (limit: Long) * output.map(a => a.dataType.defaultSize).sum + if (sizeInBytes == 0) { + // sizeInBytes can't be zero, or sizeInBytes of BinaryNode will also be zero + // (product of children). + sizeInBytes = 1 + } child.statistics.copy(sizeInBytes = sizeInBytes) } } @@ -675,7 +680,12 @@ case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryNo } override lazy val statistics: Statistics = { val limit = limitExpr.eval().asInstanceOf[Int] - val sizeInBytes = (limit: Long) * output.map(a => a.dataType.defaultSize).sum + var sizeInBytes = (limit: Long) * output.map(a => a.dataType.defaultSize).sum + if (sizeInBytes == 0) { + // sizeInBytes can't be zero, or sizeInBytes of BinaryNode will also be zero + // (product of children). + sizeInBytes = 1 + } child.statistics.copy(sizeInBytes = sizeInBytes) } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala index a9cde1e19efc..9ad60691c65a 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala @@ -352,6 +352,12 @@ class AnalysisErrorSuite extends AnalysisTest { "Generators are not supported outside the SELECT clause, but got: Sort" :: Nil ) + errorTest( + "num_rows in limit clause must be equal to or greater than 0", + listRelation.limit(-1), + "number_rows in limit clause must be equal to or greater than 0. number_rows:-1" :: Nil + ) + errorTest( "more than one generators in SELECT", listRelation.select(Explode('list), Explode('list)), diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala index 102c78bd7211..cfba896055b1 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.catalyst.analysis +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 084ba9b78ec5..52e65f47ca8f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -592,6 +592,19 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { } } + test("negative in LIMIT or TABLESAMPLE") { + val expected = "number_rows in limit clause must be equal to or greater than 0. number_rows:-1" + var e = intercept[AnalysisException] { + sql("SELECT * FROM testData TABLESAMPLE (-1 rows)") + }.getMessage + assert(e.contains(expected)) + + e = intercept[AnalysisException] { + sql("SELECT * FROM testData LIMIT -1") + }.getMessage + assert(e.contains(expected)) + } + test("aggregates with nulls") { checkAnswer( sql("SELECT SKEWNESS(a), KURTOSIS(a), MIN(a), MAX(a)," + diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsSuite.scala index 4de3cf605caa..9a3397fcd288 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsSuite.scala @@ -17,10 +17,12 @@ package org.apache.spark.sql +import org.apache.spark.sql.catalyst.plans.logical.{GlobalLimit, Join, LocalLimit} import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types._ class StatisticsSuite extends QueryTest with SharedSQLContext { + import testImplicits._ test("SPARK-15392: DataFrame created from RDD should not be broadcasted") { val rdd = sparkContext.range(1, 100).map(i => Row(i, i)) @@ -31,4 +33,46 @@ class StatisticsSuite extends QueryTest with SharedSQLContext { spark.sessionState.conf.autoBroadcastJoinThreshold) } + test("estimates the size of limit") { + withTempTable("test") { + Seq(("one", 1), ("two", 2), ("three", 3), ("four", 4)).toDF("k", "v") + .createOrReplaceTempView("test") + Seq((0, 1), (1, 24), (2, 48)).foreach { case (limit, expected) => + val df = sql(s"""SELECT * FROM test limit $limit""") + + val sizesGlobalLimit = df.queryExecution.analyzed.collect { case g: GlobalLimit => + g.statistics.sizeInBytes + } + assert(sizesGlobalLimit.size === 1, s"Size wrong for:\n ${df.queryExecution}") + assert(sizesGlobalLimit(0).equals(BigInt(expected)), + s"expected exact size 24 for table 'test', got: ${sizesGlobalLimit(0)}") + + val sizesLocalLimit = df.queryExecution.analyzed.collect { case l: LocalLimit => + l.statistics.sizeInBytes + } + assert(sizesLocalLimit.size === 1, s"Size wrong for:\n ${df.queryExecution}") + assert(sizesLocalLimit(0).equals(BigInt(expected)), + s"expected exact size 24 for table 'test', got: ${sizesLocalLimit(0)}") + } + } + } + + test("estimates the size of a limit 0 on outer join") { + withTempTable("test") { + Seq(("one", 1), ("two", 2), ("three", 3), ("four", 4)).toDF("k", "v") + .createOrReplaceTempView("test") + val df1 = spark.table("test") + val df2 = spark.table("test").limit(0) + val df = df1.join(df2, Seq("k"), "left") + + val sizes = df.queryExecution.analyzed.collect { case g: Join => + g.statistics.sizeInBytes + } + + assert(sizes.size === 1, s"Size wrong for:\n ${df.queryExecution}") + assert(sizes(0).equals(BigInt(96)), + s"expected exact size 96 for table 'test', got: ${sizes(0)}") + } + } + } From bdf4e56f3478bd99d1e92d338a984dba869363dc Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Sat, 2 Jul 2016 23:06:24 -0700 Subject: [PATCH 02/15] reverted --- .../org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala index cfba896055b1..102c78bd7211 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala @@ -17,7 +17,6 @@ package org.apache.spark.sql.catalyst.analysis -import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ From 3c402d304883fa83712f07cd09a3bbe765b1f071 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Mon, 4 Jul 2016 15:27:50 -0700 Subject: [PATCH 03/15] address comments --- .../sql/catalyst/analysis/CheckAnalysis.scala | 25 ++++++++----------- 1 file changed, 11 insertions(+), 14 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index 0070887dd014..970d4053790f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -46,6 +46,15 @@ trait CheckAnalysis extends PredicateHelper { }).length > 1 } + private def checkLimitClause(limitExpr: Expression): Unit = { + val numRows = limitExpr.eval().asInstanceOf[Int] + if (numRows < 0) { + failAnalysis( + s"number_rows in limit clause must be equal to or greater than 0. " + + s"number_rows:$numRows") + } + } + def checkAnalysis(plan: LogicalPlan): Unit = { // We transform up and order the rules so as to catch the first possible failure instead // of the result of cascading resolution failures. @@ -251,21 +260,9 @@ trait CheckAnalysis extends PredicateHelper { s"but one table has '${firstError.output.length}' columns and another table has " + s"'${s.children.head.output.length}' columns") - case l: GlobalLimit => - val numRows = l.limitExpr.eval().asInstanceOf[Int] - if (numRows < 0) { - failAnalysis( - s"number_rows in limit clause must be equal to or greater than 0. " + - s"number_rows:$numRows") - } + case GlobalLimit(limitExpr, _) => checkLimitClause(limitExpr) - case l: LocalLimit => - val numRows = l.limitExpr.eval().asInstanceOf[Int] - if (numRows < 0) { - failAnalysis( - s"number_rows in limit clause must be equal to or greater than 0. " + - s"number_rows:$numRows") - } + case LocalLimit(limitExpr, _) => checkLimitClause(limitExpr) case p if p.expressions.exists(ScalarSubquery.hasCorrelatedScalarSubquery) => p match { From 5b36fbcd1b0417c0e5796299ffb6b538d322fed6 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Mon, 4 Jul 2016 23:26:29 -0700 Subject: [PATCH 04/15] Detected foldable expression in limit --- .../sql/catalyst/analysis/CheckAnalysis.scala | 5 +++ .../org/apache/spark/sql/SQLQuerySuite.scala | 38 +++++++++++-------- 2 files changed, 28 insertions(+), 15 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index 970d4053790f..fd9104be952b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -47,6 +47,11 @@ trait CheckAnalysis extends PredicateHelper { } private def checkLimitClause(limitExpr: Expression): Unit = { + if (!limitExpr.foldable) { + failAnalysis( + "The argument to the LIMIT clause must evaluate to a constant value. " + + s"Limit:${limitExpr.sql}") + } val numRows = limitExpr.eval().asInstanceOf[Int] if (numRows < 0) { failAnalysis( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 52e65f47ca8f..ec961b7ef780 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -592,19 +592,6 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { } } - test("negative in LIMIT or TABLESAMPLE") { - val expected = "number_rows in limit clause must be equal to or greater than 0. number_rows:-1" - var e = intercept[AnalysisException] { - sql("SELECT * FROM testData TABLESAMPLE (-1 rows)") - }.getMessage - assert(e.contains(expected)) - - e = intercept[AnalysisException] { - sql("SELECT * FROM testData LIMIT -1") - }.getMessage - assert(e.contains(expected)) - } - test("aggregates with nulls") { checkAnswer( sql("SELECT SKEWNESS(a), KURTOSIS(a), MIN(a), MAX(a)," + @@ -673,11 +660,11 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { test("limit") { checkAnswer( - sql("SELECT * FROM testData LIMIT 10"), + sql("SELECT * FROM testData LIMIT 9 + 1"), testData.take(10).toSeq) checkAnswer( - sql("SELECT * FROM arrayData LIMIT 1"), + sql("SELECT * FROM arrayData LIMIT CAST(1 AS Integer)"), arrayData.collect().take(1).map(Row.fromTuple).toSeq) checkAnswer( @@ -685,6 +672,27 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { mapData.collect().take(1).map(Row.fromTuple).toSeq) } + test("non-foldable expressions in LIMIT") { + val e = intercept[AnalysisException] { + sql("SELECT * FROM testData LIMIT key > 3") + }.getMessage + assert(e.contains("The argument to the LIMIT clause must evaluate to a constant value. " + + "Limit:(testdata.`key` > 3)")) + } + + test("negative in LIMIT or TABLESAMPLE") { + val expected = "number_rows in limit clause must be equal to or greater than 0. number_rows:-1" + var e = intercept[AnalysisException] { + sql("SELECT * FROM testData TABLESAMPLE (-1 rows)") + }.getMessage + assert(e.contains(expected)) + + e = intercept[AnalysisException] { + sql("SELECT * FROM testData LIMIT -1") + }.getMessage + assert(e.contains(expected)) + } + test("CTE feature") { checkAnswer( sql("with q1 as (select * from testData limit 10) select * from q1"), From a2a828ff3b2a944ceec48ee99163392267311646 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Tue, 5 Jul 2016 09:57:38 -0700 Subject: [PATCH 05/15] address comments. --- .../scala/org/apache/spark/sql/StatisticsSuite.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsSuite.scala index 9a3397fcd288..8b331a66f8d9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsSuite.scala @@ -44,15 +44,15 @@ class StatisticsSuite extends QueryTest with SharedSQLContext { g.statistics.sizeInBytes } assert(sizesGlobalLimit.size === 1, s"Size wrong for:\n ${df.queryExecution}") - assert(sizesGlobalLimit(0).equals(BigInt(expected)), - s"expected exact size 24 for table 'test', got: ${sizesGlobalLimit(0)}") + assert(sizesGlobalLimit.head === BigInt(expected), + s"expected exact size 24 for table 'test', got: ${sizesGlobalLimit.head}") val sizesLocalLimit = df.queryExecution.analyzed.collect { case l: LocalLimit => l.statistics.sizeInBytes } assert(sizesLocalLimit.size === 1, s"Size wrong for:\n ${df.queryExecution}") - assert(sizesLocalLimit(0).equals(BigInt(expected)), - s"expected exact size 24 for table 'test', got: ${sizesLocalLimit(0)}") + assert(sizesLocalLimit.head === BigInt(expected), + s"expected exact size 24 for table 'test', got: ${sizesLocalLimit.head}") } } } @@ -70,8 +70,8 @@ class StatisticsSuite extends QueryTest with SharedSQLContext { } assert(sizes.size === 1, s"Size wrong for:\n ${df.queryExecution}") - assert(sizes(0).equals(BigInt(96)), - s"expected exact size 96 for table 'test', got: ${sizes(0)}") + assert(sizes.head === BigInt(96), + s"expected exact size 96 for table 'test', got: ${sizes.head}") } } From f600ba4f4455890b17080bc1cd3f647ec9023799 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Thu, 7 Jul 2016 23:03:15 -0700 Subject: [PATCH 06/15] address comments. --- .../plans/logical/basicLogicalOperators.scala | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index 94c9b76e6cd6..c0e400f61777 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -660,11 +660,12 @@ case class GlobalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryN } override lazy val statistics: Statistics = { val limit = limitExpr.eval().asInstanceOf[Int] - var sizeInBytes = (limit: Long) * output.map(a => a.dataType.defaultSize).sum - if (sizeInBytes == 0) { + val sizeInBytes = if (limit == 0) { // sizeInBytes can't be zero, or sizeInBytes of BinaryNode will also be zero // (product of children). - sizeInBytes = 1 + 1 + } else { + (limit: Long) * output.map(a => a.dataType.defaultSize).sum } child.statistics.copy(sizeInBytes = sizeInBytes) } @@ -680,11 +681,12 @@ case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryNo } override lazy val statistics: Statistics = { val limit = limitExpr.eval().asInstanceOf[Int] - var sizeInBytes = (limit: Long) * output.map(a => a.dataType.defaultSize).sum - if (sizeInBytes == 0) { + val sizeInBytes = if (limit == 0) { // sizeInBytes can't be zero, or sizeInBytes of BinaryNode will also be zero // (product of children). - sizeInBytes = 1 + 1 + } else { + (limit: Long) * output.map(a => a.dataType.defaultSize).sum } child.statistics.copy(sizeInBytes = sizeInBytes) } From 8fd72f650af0653c96cc0702a8a96743ed73e08f Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Fri, 8 Jul 2016 00:01:55 -0700 Subject: [PATCH 07/15] address comments. --- .../spark/sql/catalyst/analysis/CheckAnalysis.scala | 11 ++++++----- .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 7 ++++++- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index fd9104be952b..f9d41d420b82 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -52,11 +52,12 @@ trait CheckAnalysis extends PredicateHelper { "The argument to the LIMIT clause must evaluate to a constant value. " + s"Limit:${limitExpr.sql}") } - val numRows = limitExpr.eval().asInstanceOf[Int] - if (numRows < 0) { - failAnalysis( - s"number_rows in limit clause must be equal to or greater than 0. " + - s"number_rows:$numRows") + limitExpr.eval() match { + case o: Int if o >= 0 => // OK + case o: Int => failAnalysis( + s"number_rows in limit clause must be equal to or greater than 0. number_rows:$o") + case o => failAnalysis( + s"number_rows in limit clause cannot be cast to integer:$o") } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index ec961b7ef780..0a44c0ac1b81 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -673,11 +673,16 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { } test("non-foldable expressions in LIMIT") { - val e = intercept[AnalysisException] { + var e = intercept[AnalysisException] { sql("SELECT * FROM testData LIMIT key > 3") }.getMessage assert(e.contains("The argument to the LIMIT clause must evaluate to a constant value. " + "Limit:(testdata.`key` > 3)")) + + e = intercept[AnalysisException] { + sql("SELECT * FROM testData LIMIT true") + }.getMessage + assert(e.contains("number_rows in limit clause cannot be cast to integer:true")) } test("negative in LIMIT or TABLESAMPLE") { From 1abdbb9e6b03a01184825c9785a5b49c3724e315 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Fri, 8 Jul 2016 00:06:28 -0700 Subject: [PATCH 08/15] address comments. --- .../src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 0a44c0ac1b81..cff4c5411422 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -673,13 +673,15 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { } test("non-foldable expressions in LIMIT") { - var e = intercept[AnalysisException] { + val e = intercept[AnalysisException] { sql("SELECT * FROM testData LIMIT key > 3") }.getMessage assert(e.contains("The argument to the LIMIT clause must evaluate to a constant value. " + "Limit:(testdata.`key` > 3)")) + } - e = intercept[AnalysisException] { + test("Limit: unable to evaluate and cast expressions in limit clauses to Int") { + val e = intercept[AnalysisException] { sql("SELECT * FROM testData LIMIT true") }.getMessage assert(e.contains("number_rows in limit clause cannot be cast to integer:true")) From 3036847f8436ab72b05dafcf6566a96dc766dca1 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Fri, 8 Jul 2016 14:22:27 -0700 Subject: [PATCH 09/15] address comments. --- .../sql/catalyst/analysis/Analyzer.scala | 17 +++++++++++ .../sql/catalyst/analysis/CheckAnalysis.scala | 13 ++++----- .../plans/logical/basicLogicalOperators.scala | 4 +-- .../org/apache/spark/sql/SQLQuerySuite.scala | 29 +++++++++++++++++-- 4 files changed, 52 insertions(+), 11 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index d1d2c59caed9..aaf509a5a6df 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -109,6 +109,8 @@ class Analyzer( TimeWindowing :: TypeCoercion.typeCoercionRules ++ extendedResolutionRules : _*), + Batch("LIMIT", Once, + ResolveLimits), Batch("Nondeterministic", Once, PullOutNondeterministic), Batch("UDF", Once, @@ -2044,6 +2046,21 @@ object EliminateUnions extends Rule[LogicalPlan] { } } +/** + * Converts foldable numeric expressions to integers of [[GlobalLimit]] and [[LocalLimit]] operators + */ +object ResolveLimits extends Rule[LogicalPlan] { + def apply(plan: LogicalPlan): LogicalPlan = plan transform { + case g @ GlobalLimit(limitExpr, _) if limitExpr.foldable && isNumeric(limitExpr.eval()) => + g.copy(limitExpr = Literal(limitExpr.eval().asInstanceOf[Number].intValue(), IntegerType)) + case l @ LocalLimit(limitExpr, _) if limitExpr.foldable && isNumeric(limitExpr.eval()) => + l.copy(limitExpr = Literal(limitExpr.eval().asInstanceOf[Number].intValue(), IntegerType)) + } + + private def isNumeric(value: Any): Boolean = + scala.util.Try(value.asInstanceOf[Number].intValue()).isSuccess +} + /** * Cleans up unnecessary Aliases inside the plan. Basically we only need Alias as a top level * expression in Project(project list) or Aggregate(aggregate expressions) or diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index f9d41d420b82..b974589b231a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -50,14 +50,13 @@ trait CheckAnalysis extends PredicateHelper { if (!limitExpr.foldable) { failAnalysis( "The argument to the LIMIT clause must evaluate to a constant value. " + - s"Limit:${limitExpr.sql}") + s"Limit:${limitExpr.sql}") } - limitExpr.eval() match { - case o: Int if o >= 0 => // OK - case o: Int => failAnalysis( - s"number_rows in limit clause must be equal to or greater than 0. number_rows:$o") - case o => failAnalysis( - s"number_rows in limit clause cannot be cast to integer:$o") + limitExpr match { + case IntegerLiteral(limit) if limit >= 0 => // OK + case IntegerLiteral(limit) => failAnalysis( + s"number_rows in limit clause must be equal to or greater than 0. number_rows:$limit") + case o => failAnalysis(s"""number_rows in limit clause cannot be cast to integer:"$o".""") } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index c0e400f61777..5ff369fb35e6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -659,7 +659,7 @@ case class GlobalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryN } } override lazy val statistics: Statistics = { - val limit = limitExpr.eval().asInstanceOf[Int] + val limit = limitExpr.eval().asInstanceOf[Number].intValue() val sizeInBytes = if (limit == 0) { // sizeInBytes can't be zero, or sizeInBytes of BinaryNode will also be zero // (product of children). @@ -680,7 +680,7 @@ case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryNo } } override lazy val statistics: Statistics = { - val limit = limitExpr.eval().asInstanceOf[Int] + val limit = limitExpr.eval().asInstanceOf[Number].intValue() val sizeInBytes = if (limit == 0) { // sizeInBytes can't be zero, or sizeInBytes of BinaryNode will also be zero // (product of children). diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index cff4c5411422..e18f5f76a175 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -670,6 +670,26 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { checkAnswer( sql("SELECT * FROM mapData LIMIT 1"), mapData.collect().take(1).map(Row.fromTuple).toSeq) + + checkAnswer( + sql("SELECT * FROM mapData LIMIT CAST(1 AS Double)"), + mapData.collect().take(1).map(Row.fromTuple).toSeq) + + checkAnswer( + sql("SELECT * FROM mapData LIMIT CAST(1 AS BYTE)"), + mapData.collect().take(1).map(Row.fromTuple).toSeq) + + checkAnswer( + sql("SELECT * FROM mapData LIMIT CAST(1 AS LONG)"), + mapData.collect().take(1).map(Row.fromTuple).toSeq) + + checkAnswer( + sql("SELECT * FROM mapData LIMIT CAST(1 AS SHORT)"), + mapData.collect().take(1).map(Row.fromTuple).toSeq) + + checkAnswer( + sql("SELECT * FROM mapData LIMIT CAST(1 AS FLOAT)"), + mapData.collect().take(1).map(Row.fromTuple).toSeq) } test("non-foldable expressions in LIMIT") { @@ -681,10 +701,15 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { } test("Limit: unable to evaluate and cast expressions in limit clauses to Int") { - val e = intercept[AnalysisException] { + var e = intercept[AnalysisException] { sql("SELECT * FROM testData LIMIT true") }.getMessage - assert(e.contains("number_rows in limit clause cannot be cast to integer:true")) + assert(e.contains("number_rows in limit clause cannot be cast to integer:\"true\"")) + + e = intercept[AnalysisException] { + sql("SELECT * FROM testData LIMIT 'a'") + }.getMessage + assert(e.contains("number_rows in limit clause cannot be cast to integer:\"a\"")) } test("negative in LIMIT or TABLESAMPLE") { From d135b77b2d3dd2f9f6ae160bfb24b73df34d3995 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Fri, 8 Jul 2016 17:42:06 -0700 Subject: [PATCH 10/15] change another way --- .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 6 +++--- .../apache/spark/sql/catalyst/analysis/CheckAnalysis.scala | 1 + 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index aaf509a5a6df..9a24cb7ddb86 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -2047,14 +2047,14 @@ object EliminateUnions extends Rule[LogicalPlan] { } /** - * Converts foldable numeric expressions to integers of [[GlobalLimit]] and [[LocalLimit]] operators + * Converts foldable numeric expressions to integers in [[GlobalLimit]] and [[LocalLimit]] operators */ object ResolveLimits extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transform { case g @ GlobalLimit(limitExpr, _) if limitExpr.foldable && isNumeric(limitExpr.eval()) => - g.copy(limitExpr = Literal(limitExpr.eval().asInstanceOf[Number].intValue(), IntegerType)) + g.copy(limitExpr = Literal(Cast(limitExpr, IntegerType).eval(), IntegerType)) case l @ LocalLimit(limitExpr, _) if limitExpr.foldable && isNumeric(limitExpr.eval()) => - l.copy(limitExpr = Literal(limitExpr.eval().asInstanceOf[Number].intValue(), IntegerType)) + l.copy(limitExpr = Literal(Cast(limitExpr, IntegerType).eval(), IntegerType)) } private def isNumeric(value: Any): Boolean = diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index b974589b231a..ba6129a551ff 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -52,6 +52,7 @@ trait CheckAnalysis extends PredicateHelper { "The argument to the LIMIT clause must evaluate to a constant value. " + s"Limit:${limitExpr.sql}") } + // Analyzer rule ResolveLimits already converts limitExpr to integers. limitExpr match { case IntegerLiteral(limit) if limit >= 0 => // OK case IntegerLiteral(limit) => failAnalysis( From 028aa79e3c0b57cf887d4d735a387a5043612281 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Sat, 9 Jul 2016 08:06:46 -0700 Subject: [PATCH 11/15] revert it back --- .../sql/catalyst/analysis/Analyzer.scala | 17 ---------------- .../sql/catalyst/analysis/CheckAnalysis.scala | 14 ++++++------- .../plans/logical/basicLogicalOperators.scala | 4 ++-- .../org/apache/spark/sql/SQLQuerySuite.scala | 20 ------------------- 4 files changed, 9 insertions(+), 46 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 9a24cb7ddb86..d1d2c59caed9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -109,8 +109,6 @@ class Analyzer( TimeWindowing :: TypeCoercion.typeCoercionRules ++ extendedResolutionRules : _*), - Batch("LIMIT", Once, - ResolveLimits), Batch("Nondeterministic", Once, PullOutNondeterministic), Batch("UDF", Once, @@ -2046,21 +2044,6 @@ object EliminateUnions extends Rule[LogicalPlan] { } } -/** - * Converts foldable numeric expressions to integers in [[GlobalLimit]] and [[LocalLimit]] operators - */ -object ResolveLimits extends Rule[LogicalPlan] { - def apply(plan: LogicalPlan): LogicalPlan = plan transform { - case g @ GlobalLimit(limitExpr, _) if limitExpr.foldable && isNumeric(limitExpr.eval()) => - g.copy(limitExpr = Literal(Cast(limitExpr, IntegerType).eval(), IntegerType)) - case l @ LocalLimit(limitExpr, _) if limitExpr.foldable && isNumeric(limitExpr.eval()) => - l.copy(limitExpr = Literal(Cast(limitExpr, IntegerType).eval(), IntegerType)) - } - - private def isNumeric(value: Any): Boolean = - scala.util.Try(value.asInstanceOf[Number].intValue()).isSuccess -} - /** * Cleans up unnecessary Aliases inside the plan. Basically we only need Alias as a top level * expression in Project(project list) or Aggregate(aggregate expressions) or diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index ba6129a551ff..ed3586dbcb15 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -50,14 +50,14 @@ trait CheckAnalysis extends PredicateHelper { if (!limitExpr.foldable) { failAnalysis( "The argument to the LIMIT clause must evaluate to a constant value. " + - s"Limit:${limitExpr.sql}") + s"Limit:${limitExpr.sql}") } - // Analyzer rule ResolveLimits already converts limitExpr to integers. - limitExpr match { - case IntegerLiteral(limit) if limit >= 0 => // OK - case IntegerLiteral(limit) => failAnalysis( - s"number_rows in limit clause must be equal to or greater than 0. number_rows:$limit") - case o => failAnalysis(s"""number_rows in limit clause cannot be cast to integer:"$o".""") + limitExpr.eval() match { + case o: Int if o >= 0 => // OK + case o: Int => failAnalysis( + s"number_rows in limit clause must be equal to or greater than 0. number_rows:$o") + case o => failAnalysis( + s"""number_rows in limit clause cannot be cast to integer:\"$o\".""") } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index 5ff369fb35e6..c0e400f61777 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -659,7 +659,7 @@ case class GlobalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryN } } override lazy val statistics: Statistics = { - val limit = limitExpr.eval().asInstanceOf[Number].intValue() + val limit = limitExpr.eval().asInstanceOf[Int] val sizeInBytes = if (limit == 0) { // sizeInBytes can't be zero, or sizeInBytes of BinaryNode will also be zero // (product of children). @@ -680,7 +680,7 @@ case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryNo } } override lazy val statistics: Statistics = { - val limit = limitExpr.eval().asInstanceOf[Number].intValue() + val limit = limitExpr.eval().asInstanceOf[Int] val sizeInBytes = if (limit == 0) { // sizeInBytes can't be zero, or sizeInBytes of BinaryNode will also be zero // (product of children). diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index e18f5f76a175..ccf7791dbe95 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -670,26 +670,6 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { checkAnswer( sql("SELECT * FROM mapData LIMIT 1"), mapData.collect().take(1).map(Row.fromTuple).toSeq) - - checkAnswer( - sql("SELECT * FROM mapData LIMIT CAST(1 AS Double)"), - mapData.collect().take(1).map(Row.fromTuple).toSeq) - - checkAnswer( - sql("SELECT * FROM mapData LIMIT CAST(1 AS BYTE)"), - mapData.collect().take(1).map(Row.fromTuple).toSeq) - - checkAnswer( - sql("SELECT * FROM mapData LIMIT CAST(1 AS LONG)"), - mapData.collect().take(1).map(Row.fromTuple).toSeq) - - checkAnswer( - sql("SELECT * FROM mapData LIMIT CAST(1 AS SHORT)"), - mapData.collect().take(1).map(Row.fromTuple).toSeq) - - checkAnswer( - sql("SELECT * FROM mapData LIMIT CAST(1 AS FLOAT)"), - mapData.collect().take(1).map(Row.fromTuple).toSeq) } test("non-foldable expressions in LIMIT") { From 01137dcf739e75be31ba8836e342537f66971aa3 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Sun, 10 Jul 2016 08:17:28 -0700 Subject: [PATCH 12/15] address comments --- .../apache/spark/sql/catalyst/analysis/CheckAnalysis.scala | 2 +- .../src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index ed3586dbcb15..dd653ebeab91 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -57,7 +57,7 @@ trait CheckAnalysis extends PredicateHelper { case o: Int => failAnalysis( s"number_rows in limit clause must be equal to or greater than 0. number_rows:$o") case o => failAnalysis( - s"""number_rows in limit clause cannot be cast to integer:\"$o\".""") + s"""number_rows in limit clause must be integer. number_rows:\"$o\".""") } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index ccf7791dbe95..d73e3e799b2c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -684,12 +684,12 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { var e = intercept[AnalysisException] { sql("SELECT * FROM testData LIMIT true") }.getMessage - assert(e.contains("number_rows in limit clause cannot be cast to integer:\"true\"")) + assert(e.contains("number_rows in limit clause must be integer. number_rows:\"true\"")) e = intercept[AnalysisException] { sql("SELECT * FROM testData LIMIT 'a'") }.getMessage - assert(e.contains("number_rows in limit clause cannot be cast to integer:\"a\"")) + assert(e.contains("number_rows in limit clause must be integer. number_rows:\"a\"")) } test("negative in LIMIT or TABLESAMPLE") { From dec5ad95bdd003fe58e92d1245388fa4758d8f49 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Sun, 10 Jul 2016 20:08:11 -0700 Subject: [PATCH 13/15] address comments --- .../src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 2 +- .../test/scala/org/apache/spark/sql/StatisticsSuite.scala | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index f65bf6929a32..3c8dab50c356 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -680,7 +680,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { "Limit:(testdata.`key` > 3)")) } - test("Limit: unable to evaluate and cast expressions in limit clauses to Int") { + test("Expressions in limit clause are not integer") { var e = intercept[AnalysisException] { sql("SELECT * FROM testData LIMIT true") }.getMessage diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsSuite.scala index 8b331a66f8d9..ab55242ec068 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsSuite.scala @@ -45,14 +45,14 @@ class StatisticsSuite extends QueryTest with SharedSQLContext { } assert(sizesGlobalLimit.size === 1, s"Size wrong for:\n ${df.queryExecution}") assert(sizesGlobalLimit.head === BigInt(expected), - s"expected exact size 24 for table 'test', got: ${sizesGlobalLimit.head}") + s"expected exact size $expected for table 'test', got: ${sizesGlobalLimit.head}") val sizesLocalLimit = df.queryExecution.analyzed.collect { case l: LocalLimit => l.statistics.sizeInBytes } assert(sizesLocalLimit.size === 1, s"Size wrong for:\n ${df.queryExecution}") assert(sizesLocalLimit.head === BigInt(expected), - s"expected exact size 24 for table 'test', got: ${sizesLocalLimit.head}") + s"expected exact size $expected for table 'test', got: ${sizesLocalLimit.head}") } } } @@ -69,7 +69,7 @@ class StatisticsSuite extends QueryTest with SharedSQLContext { g.statistics.sizeInBytes } - assert(sizes.size === 1, s"Size wrong for:\n ${df.queryExecution}") + assert(sizes.size === 1, s"number of Join nodes is wrong:\n ${df.queryExecution}") assert(sizes.head === BigInt(96), s"expected exact size 96 for table 'test', got: ${sizes.head}") } From 2e6f8d8c8b5007302415b7fd984a38fc51be44bf Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Sun, 10 Jul 2016 21:02:53 -0700 Subject: [PATCH 14/15] address comments --- .../sql/catalyst/analysis/CheckAnalysis.scala | 22 +++++++++---------- .../org/apache/spark/sql/SQLQuerySuite.scala | 10 ++++----- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index dd653ebeab91..8b87a4e41c23 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -47,17 +47,17 @@ trait CheckAnalysis extends PredicateHelper { } private def checkLimitClause(limitExpr: Expression): Unit = { - if (!limitExpr.foldable) { - failAnalysis( - "The argument to the LIMIT clause must evaluate to a constant value. " + - s"Limit:${limitExpr.sql}") - } - limitExpr.eval() match { - case o: Int if o >= 0 => // OK - case o: Int => failAnalysis( - s"number_rows in limit clause must be equal to or greater than 0. number_rows:$o") - case o => failAnalysis( - s"""number_rows in limit clause must be integer. number_rows:\"$o\".""") + limitExpr match { + case e if !e.foldable => failAnalysis( + "The limit expression must evaluate to a constant value, but got " + + limitExpr.sql) + case e if e.dataType != IntegerType => failAnalysis( + s"The limit expression must be integer type, but got " + + e.dataType.simpleString) + case e if e.eval().asInstanceOf[Int] < 0 => failAnalysis( + "The limit expression must be equal to or greater than 0, but got " + + e.eval().asInstanceOf[Int]) + case e => // OK } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 3c8dab50c356..ede7d9a0c95b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -676,24 +676,24 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { val e = intercept[AnalysisException] { sql("SELECT * FROM testData LIMIT key > 3") }.getMessage - assert(e.contains("The argument to the LIMIT clause must evaluate to a constant value. " + - "Limit:(testdata.`key` > 3)")) + assert(e.contains("The limit expression must evaluate to a constant value, " + + "but got (testdata.`key` > 3)")) } test("Expressions in limit clause are not integer") { var e = intercept[AnalysisException] { sql("SELECT * FROM testData LIMIT true") }.getMessage - assert(e.contains("number_rows in limit clause must be integer. number_rows:\"true\"")) + assert(e.contains("The limit expression must be integer type, but got boolean")) e = intercept[AnalysisException] { sql("SELECT * FROM testData LIMIT 'a'") }.getMessage - assert(e.contains("number_rows in limit clause must be integer. number_rows:\"a\"")) + assert(e.contains("The limit expression must be integer type, but got string")) } test("negative in LIMIT or TABLESAMPLE") { - val expected = "number_rows in limit clause must be equal to or greater than 0. number_rows:-1" + val expected = "The limit expression must be equal to or greater than 0, but got -1" var e = intercept[AnalysisException] { sql("SELECT * FROM testData TABLESAMPLE (-1 rows)") }.getMessage From d66870bb274a206f16d33f56214246b17953a90e Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Sun, 10 Jul 2016 23:12:31 -0700 Subject: [PATCH 15/15] fix test case --- .../apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala index 9ad60691c65a..ff112c51697a 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala @@ -355,7 +355,7 @@ class AnalysisErrorSuite extends AnalysisTest { errorTest( "num_rows in limit clause must be equal to or greater than 0", listRelation.limit(-1), - "number_rows in limit clause must be equal to or greater than 0. number_rows:-1" :: Nil + "The limit expression must be equal to or greater than 0, but got -1" :: Nil ) errorTest(