diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 8e8210e334a1..4a521997be04 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.execution.command.{DescribeTableCommand, ExecutedCommandExec, ShowTablesCommand}
+import org.apache.spark.sql.execution.command.{DescribeTableCommand, ExecutedCommandExec, InsertTableCommand, ShowTablesCommand}
 import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange}
 import org.apache.spark.sql.types.{BinaryType, DateType, DecimalType, TimestampType, _}
 import org.apache.spark.util.Utils
@@ -114,7 +114,7 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {


   /**
-   * Returns the result as a hive compatible sequence of strings. This is for testing only.
+   * Returns the result as a hive compatible sequence of strings.
    */
   def hiveResultString(): Seq[String] = executedPlan match {
     case ExecutedCommandExec(desc: DescribeTableCommand) =>
@@ -130,12 +130,17 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
     // SHOW TABLES in Hive only output table names, while ours output database, table name, isTemp.
     case command @ ExecutedCommandExec(s: ShowTablesCommand) if !s.isExtended =>
       command.executeCollect().map(_.getString(1))
+    case insertCommand @ ExecutedCommandExec(_: InsertTableCommand) =>
+      // Insert command will start a new execution through FileFormatWriter
+      insertCommand.executeCollect().map(_.toString)
     case other =>
-      val result: Seq[Seq[Any]] = other.executeCollectPublic().map(_.toSeq).toSeq
-      // We need the types so we can output struct field names
-      val types = analyzed.output.map(_.dataType)
-      // Reformat to match hive tab delimited output.
-      result.map(_.zip(types).map(toHiveString)).map(_.mkString("\t"))
+      SQLExecution.withNewExecutionId(sparkSession, this) {
+        val result: Seq[Seq[Any]] = other.executeCollectPublic().map(_.toSeq).toSeq
+        // We need the types so we can output struct field names
+        val types = analyzed.output.map(_.dataType)
+        // Reformat to match hive tab delimited output.
+        result.map(_.zip(types).map(toHiveString)).map(_.mkString("\t"))
+      }
   }

   /** Formats a datum (based on the given data type) and returns the string representation. */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 5843c5b56d44..0ae1e0da2e4d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -1025,3 +1025,5 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableComman
     builder.toString()
   }
 }
+
+trait InsertTableCommand extends RunnableCommand
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index 19b51d4d9530..814092fb25ec 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -49,7 +49,7 @@ case class InsertIntoHadoopFsRelationCommand(
     mode: SaveMode,
     catalogTable: Option[CatalogTable],
     fileIndex: Option[FileIndex])
-  extends RunnableCommand {
+  extends InsertTableCommand {
   import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.escapePathName
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 3682dc850790..4d014bcee1d6 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -36,7 +36,7 @@ import org.apache.spark.sql.{AnalysisException, Dataset, Row, SparkSession}
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.command.RunnableCommand
+import org.apache.spark.sql.execution.command.InsertTableCommand
 import org.apache.spark.sql.execution.datasources.FileFormatWriter
 import org.apache.spark.sql.hive._
 import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
@@ -78,7 +78,7 @@ case class InsertIntoHiveTable(
     partition: Map[String, Option[String]],
     query: LogicalPlan,
     overwrite: Boolean,
-    ifNotExists: Boolean) extends RunnableCommand {
+    ifNotExists: Boolean) extends InsertTableCommand {

   override protected def innerChildren: Seq[LogicalPlan] = query :: Nil
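Context for reviewers: the reason insert commands get their own case in `hiveResultString` is the comment added above, namely that an insert starts its own execution inside `FileFormatWriter`, and in this Spark line `SQLExecution.withNewExecutionId` rejects nested calls. Wrapping an insert a second time here would therefore fail, while leaving plain queries unwrapped would presumably keep their jobs outside any tracked execution. The sketch below only illustrates that nesting constraint with stand-in names and a simplified signature; it is not Spark's implementation.

```scala
// Self-contained sketch of the "no nested execution id" constraint this patch
// works around. All names below are illustrative stand-ins, not Spark's API.
object ExecutionIdSketch {
  private val currentId = new ThreadLocal[Option[Long]] {
    override def initialValue(): Option[Long] = None
  }
  private var nextId = 0L

  // Analogue of SQLExecution.withNewExecutionId: one execution id per query,
  // and a nested call is rejected instead of silently re-attaching jobs.
  def withNewExecutionId[T](body: => T): T = {
    require(currentId.get.isEmpty, "execution id is already set")
    nextId += 1
    currentId.set(Some(nextId))
    try body finally currentId.set(None)
  }

  def main(args: Array[String]): Unit = {
    // Plain query: the collect runs under a single execution id.
    withNewExecutionId { println("collected rows") }

    // Insert command: the writer itself starts an execution (as FileFormatWriter
    // does), so wrapping it again from hiveResultString would throw here.
    try withNewExecutionId { withNewExecutionId { println("wrote files") } }
    catch { case e: IllegalArgumentException => println(s"rejected: ${e.getMessage}") }
  }
}
```

Given that constraint, routing anything that extends the new `InsertTableCommand` marker trait around the wrapper, and letting the writer's own execution id stand, keeps both paths on exactly one execution each.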