SparkStrategies.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.execution

 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{execution, AnalysisException, Strategy}
+import org.apache.spark.sql.{execution, AnalysisException, SparkSession, Strategy}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions._
@@ -534,7 +534,9 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {

   object BasicOperators extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      case d: DataWritingCommand => DataWritingCommandExec(d, planLater(d.query)) :: Nil
+      case d: DataWritingCommand =>
+        val plan = SparkSession.getActiveSession.get.sessionState.executePlan(d.query).sparkPlan
+        DataWritingCommandExec(d, plan) :: Nil
       case r: RunnableCommand => ExecutedCommandExec(r) :: Nil

       case MemoryPlan(sink, output) =>
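Note on the SparkStrategies change above: planLater(d.query) only inserts a placeholder that the planner fills in later, and the plan it eventually produces can expose attribute names that differ from the analyzed query (the schema mismatch behind SPARK-25135). Planning the query eagerly through the active session's executePlan pins the physical plan to the analyzed schema. Below is a minimal sketch of that call outside the strategy, assuming Spark 2.x-era internals; because SparkSession.sessionState is private[sql], the sketch has to live in the org.apache.spark.sql package, and the object name is hypothetical.

package org.apache.spark.sql

// Hypothetical demo object, not part of the patch.
object Spark25135Sketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("spark-25135-sketch")
      .getOrCreate()

    // A query whose analyzed output has a known attribute name.
    val df = spark.range(3).selectExpr("id AS col1")

    // executePlan runs the analyzer/optimizer/planner eagerly, so the
    // resulting SparkPlan carries the analyzed output attributes rather
    // than a placeholder that gets re-planned later.
    val physical = spark.sessionState.executePlan(df.queryExecution.logical).sparkPlan
    println(physical.output.map(_.name)) // prints the analyzed name, e.g. List(col1)

    spark.stop()
  }
}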
DataWritingCommand.scala
@@ -39,7 +39,7 @@ trait DataWritingCommand extends Command {
    */
   def query: LogicalPlan

-  override final def children: Seq[LogicalPlan] = query :: Nil
+  override final def innerChildren: Seq[LogicalPlan] = query :: Nil

   // Output columns of the analyzed input query plan
   def outputColumns: Seq[Attribute]
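Note on the DataWritingCommand change above: children is what planner strategies and tree transformations recurse into, while innerChildren only affects how the tree is rendered (EXPLAIN and treeString). Moving query to innerChildren turns the command into a leaf for planning purposes, so the strategy above plans the query exactly once, while EXPLAIN output still displays it. A small way to observe the rendered inner query using only public API; the table name demo is hypothetical and a local session is assumed:

// Assumes an active local SparkSession named `spark`.
spark.sql("CREATE TABLE demo(col1 BIGINT) USING parquet")
// The write command is the root of the plan; the SELECT appears beneath it
// in the rendered tree even though it is no longer a planner-visible child.
spark.sql("EXPLAIN EXTENDED INSERT OVERWRITE TABLE demo SELECT id AS col1 FROM range(3)")
  .show(truncate = false)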
HiveDDLSuite.scala
@@ -2252,6 +2252,38 @@ class HiveDDLSuite
     }
   }

+  test("SPARK-25135: FileFormatWriter should respect the input query schema in HIVE") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      val cnt = 30
+      val table1Path = s"$path/table1"
+      val table2Path = s"$path/table2"
+      val table3Path = s"$path/table3"
+      val data =
+        spark.range(cnt).selectExpr("cast(id as bigint) as col1", "cast(id % 3 as bigint) as col2")
+      data.write.mode(SaveMode.Overwrite).parquet(table1Path)
+      withTable("table1", "table2", "table3") {
+        spark.sql(
+          s"CREATE TABLE table1(col1 bigint, col2 bigint) using parquet location '$table1Path'")
+        spark.sql(
+          s"CREATE TABLE table2(COL1 bigint, COL2 bigint) using parquet location '$table2Path'")
+        spark.sql("CREATE TABLE table3(COL1 bigint, COL2 bigint) using parquet " +
+          "PARTITIONED BY (COL2) " +
+          s"CLUSTERED BY (COL1) INTO 2 BUCKETS location '$table3Path'")
+
+        withView("view1") {
+          spark.sql("CREATE VIEW view1 as select col1, col2 from table1 where col1 > -20")
+          spark.sql("INSERT OVERWRITE TABLE table2 select COL1, COL2 from view1")
+          checkAnswer(spark.table("table2"), data)
+          assert(spark.read.parquet(table2Path).schema === spark.table("table2").schema)
+
+          spark.sql("INSERT OVERWRITE TABLE table3 select COL1, COL2 from view1 CLUSTER BY COL1")
+          checkAnswer(spark.table("table3"), data)
+        }
+      }
+    }
+  }
+
   test("SPARK-24812: desc formatted table for last access verification") {
     withTable("t1") {
       sql(
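If it helps while reviewing: the new HiveDDLSuite case can be run on its own with ScalaTest's substring filter from sbt; the invocation below assumes Spark's standard build layout and the hive sbt project name.

build/sbt "hive/testOnly *HiveDDLSuite -- -z SPARK-25135"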