From d7674d0ca0a55f1b7dbc731153918027d1c116d6 Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Tue, 3 Jun 2014 13:35:12 +0800
Subject: [PATCH 1/5] [SPARK-1852] prevents queries with sorts from submitting
 jobs prematurely

---
 .../org/apache/spark/sql/SQLContext.scala   | 22 +++++++++----------
 .../apache/spark/sql/hive/HiveContext.scala | 17 +++++++-------
 2 files changed, 19 insertions(+), 20 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 043be58edc91b..015c03c80f22e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -25,20 +25,15 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.spark.SparkContext
 import org.apache.spark.annotation.{AlphaComponent, DeveloperApi, Experimental}
 import org.apache.spark.rdd.RDD
-
 import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.{ScalaReflection, dsl}
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.types._
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
-import org.apache.spark.sql.catalyst.plans.logical.{Subquery, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
-
+import org.apache.spark.sql.catalyst.types._
+import org.apache.spark.sql.catalyst.{ScalaReflection, dsl}
 import org.apache.spark.sql.columnar.InMemoryColumnarTableScan
-
 import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.SparkStrategies
-
 import org.apache.spark.sql.parquet.ParquetRelation
 
 /**
@@ -148,10 +143,13 @@ class SQLContext(@transient val sparkContext: SparkContext)
    */
   def sql(sqlText: String): SchemaRDD = {
     val result = new SchemaRDD(this, parseSql(sqlText))
-    // We force query optimization to happen right away instead of letting it happen lazily like
-    // when using the query DSL. This is so DDL commands behave as expected. This is only
-    // generates the RDD lineage for DML queries, but do not perform any execution.
-    result.queryExecution.toRdd
+    result.logicalPlan match {
+      // We force query optimization to happen right away instead of letting it happen lazily like
+      // when using the query DSL. This is so DDL commands behave as expected. This only
+      // generates the RDD lineage for DML queries, but does not perform any execution.
+      case _: Command => result.queryExecution.toRdd
+      case _ =>
+    }
     result
   }
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index b21f24dad785d..66de407c772a5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -30,14 +30,12 @@ import org.apache.hadoop.hive.ql.Driver
 import org.apache.hadoop.hive.ql.processors._
 import org.apache.hadoop.hive.ql.session.SessionState
 
-import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.catalyst.analysis.{Analyzer, OverrideCatalog}
 import org.apache.spark.sql.catalyst.expressions.GenericRow
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, LowerCaseSchema}
-import org.apache.spark.sql.catalyst.plans.logical.{NativeCommand, ExplainCommand}
-import org.apache.spark.sql.catalyst.ScalaReflection
+import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.types._
 import org.apache.spark.sql.execution._
 
@@ -79,10 +77,13 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
    */
   def hiveql(hqlQuery: String): SchemaRDD = {
     val result = new SchemaRDD(this, HiveQl.parseSql(hqlQuery))
-    // We force query optimization to happen right away instead of letting it happen lazily like
-    // when using the query DSL. This is so DDL commands behave as expected. This is only
-    // generates the RDD lineage for DML queries, but does not perform any execution.
-    result.queryExecution.toRdd
+    result.logicalPlan match {
+      // We force query optimization to happen right away instead of letting it happen lazily like
+      // when using the query DSL. This is so DDL commands behave as expected. This only
+      // generates the RDD lineage for DML queries, but does not perform any execution.
+      case _: Command => result.queryExecution.toRdd
+      case _ =>
+    }
     result
   }
 
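Why this first patch matters: queryExecution.toRdd plans the query, and for a sorted query the exchange that implements ORDER BY builds a range partitioner, which samples its input by running a Spark job. Forcing toRdd inside sql() therefore submitted that job before the caller asked for any result. Restricting the eager path to Command plans keeps DDL-style statements immediate while making everything else lazy. A minimal sketch of the resulting behavior, assuming a SparkContext named sc and a registered table "records" (both hypothetical, not part of the patch):

    val sqlContext = new org.apache.spark.sql.SQLContext(sc)

    // Lazy after this patch: the plan is not a Command, so sql() only builds
    // the RDD lineage; the range-partitioner sampling job is not submitted yet.
    val sorted = sqlContext.sql("SELECT * FROM records ORDER BY key")

    // All jobs, including the sampling pass for the sort, start only here.
    val rows = sorted.collect()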
From aef872157307aa916f0aca294f3ac28ba2a4529c Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Tue, 3 Jun 2014 18:18:02 +0800
Subject: [PATCH 2/5] Took insertion into account

---
 sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala | 4 +++-
 .../main/scala/org/apache/spark/sql/hive/HiveContext.scala    | 3 ++-
 2 files changed, 5 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 015c03c80f22e..692b0159b9e3a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -28,6 +28,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
+import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, InsertIntoCreatedTable}
 import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.catalyst.types._
@@ -147,7 +148,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
       // We force query optimization to happen right away instead of letting it happen lazily like
       // when using the query DSL. This is so DDL commands behave as expected. This only
       // generates the RDD lineage for DML queries, but does not perform any execution.
-      case _: Command => result.queryExecution.toRdd
+      case _: Command | _: InsertIntoTable | _: InsertIntoCreatedTable =>
+        result.queryExecution.toRdd
       case _ =>
     }
     result

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 66de407c772a5..88fed895f3449 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -81,7 +81,8 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
       // We force query optimization to happen right away instead of letting it happen lazily like
       // when using the query DSL. This is so DDL commands behave as expected. This only
       // generates the RDD lineage for DML queries, but does not perform any execution.
-      case _: Command => result.queryExecution.toRdd
+      case _: Command | _: InsertIntoTable | _: InsertIntoCreatedTable =>
+        result.queryExecution.toRdd
       case _ =>
     }
     result
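The second patch widens the match: an insertion has side effects, but its logical plan is an InsertIntoTable (or InsertIntoCreatedTable), not a Command, so without this change an INSERT issued through sql() or hiveql() would do nothing until some action ran on the returned SchemaRDD. A hedged sketch, assuming hypothetical tables "source" and "target" and a dialect that parses INSERT (HiveQL here):

    val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

    // Parses to an InsertIntoTable plan, so it is now forced eagerly:
    // the write happens during this call, not at a later action.
    hiveContext.hql("INSERT INTO TABLE target SELECT * FROM source")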
From 2bf0e20550526ca0ddf544781821a78347fa8ae4 Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Mon, 9 Jun 2014 17:39:50 -0700
Subject: [PATCH 3/5] WriteToFile should be executed eagerly too

---
 sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala | 4 ++--
 .../main/scala/org/apache/spark/sql/hive/HiveContext.scala    | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 692b0159b9e3a..f9e158f5afbfb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, InsertIntoCreatedTable}
-import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan, WriteToFile}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.catalyst.types._
 import org.apache.spark.sql.catalyst.{ScalaReflection, dsl}
@@ -148,7 +148,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
       // We force query optimization to happen right away instead of letting it happen lazily like
       // when using the query DSL. This is so DDL commands behave as expected. This only
       // generates the RDD lineage for DML queries, but does not perform any execution.
-      case _: Command | _: InsertIntoTable | _: InsertIntoCreatedTable =>
+      case _: Command | _: InsertIntoTable | _: InsertIntoCreatedTable | _: WriteToFile =>
         result.queryExecution.toRdd
       case _ =>
     }

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 88fed895f3449..b2bddc4df3631 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -81,7 +81,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
       // We force query optimization to happen right away instead of letting it happen lazily like
       // when using the query DSL. This is so DDL commands behave as expected. This only
       // generates the RDD lineage for DML queries, but does not perform any execution.
-      case _: Command | _: InsertIntoTable | _: InsertIntoCreatedTable =>
+      case _: Command | _: InsertIntoTable | _: InsertIntoCreatedTable | _: WriteToFile =>
         result.queryExecution.toRdd
       case _ =>
     }
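After the third patch the eager set covers four plan types: commands, both insertion variants, and WriteToFile, the logical node for persisting a query result to a file. The decision logic, restated as a small standalone helper for clarity (illustrative only; the patches inline this match rather than naming it):

    import org.apache.spark.sql.catalyst.plans.logical._

    // True when a logical plan has side effects and should therefore be
    // executed as soon as the SchemaRDD wrapping it is built.
    def executesEagerly(plan: LogicalPlan): Boolean = plan match {
      case _: Command | _: InsertIntoTable | _: InsertIntoCreatedTable | _: WriteToFile => true
      case _ => false
    }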
From d459c15dc057d396bf5dc6bf0cd78a92e4dc2dc3 Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Mon, 9 Jun 2014 19:26:31 -0700
Subject: [PATCH 4/5] Refactored duplicated code to SchemaRDDLike

---
 .../scala/org/apache/spark/sql/SQLContext.scala    | 13 +------------
 .../scala/org/apache/spark/sql/SchemaRDDLike.scala |  9 +++++++++
 .../org/apache/spark/sql/hive/HiveContext.scala    | 13 +------------
 3 files changed, 11 insertions(+), 24 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index f9e158f5afbfb..a06c91bf8599d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -142,18 +142,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
    *
    * @group userf
    */
-  def sql(sqlText: String): SchemaRDD = {
-    val result = new SchemaRDD(this, parseSql(sqlText))
-    result.logicalPlan match {
-      // We force query optimization to happen right away instead of letting it happen lazily like
-      // when using the query DSL. This is so DDL commands behave as expected. This only
-      // generates the RDD lineage for DML queries, but does not perform any execution.
-      case _: Command | _: InsertIntoTable | _: InsertIntoCreatedTable | _: WriteToFile =>
-        result.queryExecution.toRdd
-      case _ =>
-    }
-    result
-  }
+  def sql(sqlText: String): SchemaRDD = new SchemaRDD(this, parseSql(sqlText))
 
   /** Returns the specified table as a SchemaRDD */
   def table(tableName: String): SchemaRDD =

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
index 3a895e15a4508..8f6cdf8da57e9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
@@ -50,6 +50,15 @@ private[sql] trait SchemaRDDLike {
   @DeveloperApi
   lazy val queryExecution = sqlContext.executePlan(logicalPlan)
 
+  logicalPlan match {
+    // We force query optimization to happen right away instead of letting it happen lazily like
+    // when using the query DSL. This is so DDL commands behave as expected. This only
+    // generates the RDD lineage for DML queries, but does not perform any execution.
+    case _: Command | _: InsertIntoTable | _: InsertIntoCreatedTable | _: WriteToFile =>
+      queryExecution.toRdd
+    case _ =>
+  }
+
   override def toString =
     s"""${super.toString}
        |== Query Plan ==

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index b2bddc4df3631..527f8ab47b392 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -75,18 +75,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
   /**
    * Executes a query expressed in HiveQL using Spark, returning the result as a SchemaRDD.
    */
-  def hiveql(hqlQuery: String): SchemaRDD = {
-    val result = new SchemaRDD(this, HiveQl.parseSql(hqlQuery))
-    result.logicalPlan match {
-      // We force query optimization to happen right away instead of letting it happen lazily like
-      // when using the query DSL. This is so DDL commands behave as expected. This only
-      // generates the RDD lineage for DML queries, but does not perform any execution.
-      case _: Command | _: InsertIntoTable | _: InsertIntoCreatedTable | _: WriteToFile =>
-        result.queryExecution.toRdd
-      case _ =>
-    }
-    result
-  }
+  def hiveql(hqlQuery: String): SchemaRDD = new SchemaRDD(this, HiveQl.parseSql(hqlQuery))
 
   /** An alias for `hiveql`. */
   def hql(hqlQuery: String): SchemaRDD = hiveql(hqlQuery)

From 2c9052a4a249db1d981d7f7878f4dc466d9197a9 Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Mon, 9 Jun 2014 20:09:48 -0700
Subject: [PATCH 5/5] Removed unused imports

---
 sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index a06c91bf8599d..2ac0741e51944 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -28,8 +28,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
-import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, InsertIntoCreatedTable}
-import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan, WriteToFile}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.catalyst.types._
 import org.apache.spark.sql.catalyst.{ScalaReflection, dsl}
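End state of the series: the match now runs once in SchemaRDDLike's body, so every SchemaRDD construction path (sql, hiveql, hql, and the Scala DSL) gets the same eager-for-side-effects, lazy-for-queries behavior, and both context classes shrink to one-line factory methods. A hedged usage sketch, assuming a hypothetical Hive session where table "t" exists and native DDL parses to a NativeCommand (a Command subclass) as elsewhere in this codebase:

    val hive = new org.apache.spark.sql.hive.HiveContext(sc)

    // DDL still runs inside hql(): the plan extends Command, so the
    // SchemaRDDLike initializer forces queryExecution.toRdd immediately.
    hive.hql("CREATE TABLE IF NOT EXISTS t (key INT, value STRING)")

    // A sorted query stays lazy; no sampling job until an action runs.
    val q = hive.hql("SELECT key FROM t ORDER BY key")
    q.take(10)  // jobs are submitted here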