diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index dbc6db62bd82..076bce77a174 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.execution
 
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{execution, AnalysisException, Strategy}
+import org.apache.spark.sql.{execution, AnalysisException, SparkSession, Strategy}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions._
@@ -534,7 +534,9 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
 
   object BasicOperators extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      case d: DataWritingCommand => DataWritingCommandExec(d, planLater(d.query)) :: Nil
+      case d: DataWritingCommand =>
+        val plan = SparkSession.getActiveSession.get.sessionState.executePlan(d.query).sparkPlan
+        DataWritingCommandExec(d, plan) :: Nil
       case r: RunnableCommand => ExecutedCommandExec(r) :: Nil
 
       case MemoryPlan(sink, output) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
index e11dbd201004..4579daaae36a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
@@ -39,7 +39,7 @@ trait DataWritingCommand extends Command {
    */
  def query: LogicalPlan
 
-  override final def children: Seq[LogicalPlan] = query :: Nil
+  override final def innerChildren: Seq[LogicalPlan] = query :: Nil
 
   // Output columns of the analyzed input query plan
   def outputColumns: Seq[Attribute]
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 6708a50a961f..018471a39976 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -2252,6 +2252,38 @@ class HiveDDLSuite
     }
   }
 
+  test("SPARK-25135: FileFormatWriter should respect the input query schema in HIVE") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      val cnt = 30
+      val table1Path = s"$path/table1"
+      val table2Path = s"$path/table2"
+      val table3Path = s"$path/table3"
+      val data =
+        spark.range(cnt).selectExpr("cast(id as bigint) as col1", "cast(id % 3 as bigint) as col2")
+      data.write.mode(SaveMode.Overwrite).parquet(table1Path)
+      withTable("table1", "table2", "table3") {
+        spark.sql(
+          s"CREATE TABLE table1(col1 bigint, col2 bigint) using parquet location '$table1Path'")
+        spark.sql(
+          s"CREATE TABLE table2(COL1 bigint, COL2 bigint) using parquet location '$table2Path'")
+        spark.sql("CREATE TABLE table3(COL1 bigint, COL2 bigint) using parquet " +
+          "PARTITIONED BY (COL2) " +
+          s"CLUSTERED BY (COL1) INTO 2 BUCKETS location '$table3Path'")
+
+        withView("view1") {
+          spark.sql("CREATE VIEW view1 as select col1, col2 from table1 where col1 > -20")
+          spark.sql("INSERT OVERWRITE TABLE table2 select COL1, COL2 from view1")
+          checkAnswer(spark.table("table2"), data)
+          assert(spark.read.parquet(table2Path).schema === spark.table("table2").schema)
+
+          spark.sql("INSERT OVERWRITE TABLE table3 select COL1, COL2 from view1 CLUSTER BY COL1")
+          checkAnswer(spark.table("table3"), data)
+        }
+      }
+    }
+  }
+
   test("SPARK-24812: desc formatted table for last access verification") {
     withTable("t1") {
       sql(