@@ -424,7 +424,8 @@ case class DataSource(
       _ => Unit, // No existing table needs to be refreshed.
       options,
       data.logicalPlan,
-      mode)
+      mode,
+      catalogTable)
     sparkSession.sessionState.executePlan(plan).toRdd
     // Replace the schema with that of the DataFrame we just wrote out to avoid re-inferring it.
     copy(userSpecifiedSchema = Some(data.schema.asNullable)).resolveRelation()
@@ -162,7 +162,7 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
 
 
     case i @ logical.InsertIntoTable(
-        l @ LogicalRelation(t: HadoopFsRelation, _, _), part, query, overwrite, false)
+        l @ LogicalRelation(t: HadoopFsRelation, _, table), part, query, overwrite, false)
         if query.resolved && t.schema.asNullable == query.schema.asNullable =>
 
       // Sanity checks
@@ -222,7 +222,8 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
         refreshPartitionsCallback,
         t.options,
         query,
-        mode)
+        mode,
+        table)
 
       insertCmd
     }
@@ -23,7 +23,7 @@ import org.apache.hadoop.fs.Path
 
 import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -41,7 +41,8 @@ case class InsertIntoHadoopFsRelationCommand(
     refreshFunction: (Seq[TablePartitionSpec]) => Unit,
     options: Map[String, String],
     @transient query: LogicalPlan,
-    mode: SaveMode)
+    mode: SaveMode,
+    catalogTable: Option[CatalogTable])
   extends RunnableCommand {
 
   override protected def innerChildren: Seq[LogicalPlan] = query :: Nil
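
Usage note: with the new field in place, the catalog identity of the target table becomes observable on the physical plan, which is what the test changes below assert. As a minimal sketch (not part of this patch), assuming a Spark 2.1-era SparkSession named spark, and with some_table and src as purely illustrative table names:

import org.apache.spark.sql.execution.command.ExecutedCommandExec
import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand

// Run a file-based insert and inspect the planned command.
// The table names here are illustrative, not from this patch.
val df = spark.sql("INSERT INTO TABLE some_table SELECT * FROM src")
df.queryExecution.sparkPlan match {
  case ExecutedCommandExec(cmd: InsertIntoHadoopFsRelationCommand) =>
    // catalogTable is None for direct path-based writes and Some(...) when
    // the insert was resolved against a catalog (metastore) table.
    cmd.catalogTable.foreach(t => println(s"wrote to ${t.identifier.unquotedString}"))
  case _ => // some other physical plan; not a file-based insert
}

Making the parameter an Option keeps path-based writes (which have no catalog entry) working unchanged while letting metastore-backed inserts carry the table metadata through to execution.
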
@@ -307,7 +307,8 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
 
     val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt")
     df.queryExecution.sparkPlan match {
-      case ExecutedCommandExec(_: InsertIntoHadoopFsRelationCommand) => // OK
+      case ExecutedCommandExec(cmd: InsertIntoHadoopFsRelationCommand) =>
+        assert(cmd.catalogTable.map(_.identifier.table) === Some("test_insert_parquet"))
       case o => fail("test_insert_parquet should be converted to a " +
         s"${classOf[HadoopFsRelation ].getCanonicalName} and " +
         s"${classOf[InsertIntoDataSourceCommand].getCanonicalName} should have been SparkPlan. " +
@@ -337,7 +338,8 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
 
     val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt_array")
     df.queryExecution.sparkPlan match {
-      case ExecutedCommandExec(_: InsertIntoHadoopFsRelationCommand) => // OK
+      case ExecutedCommandExec(cmd: InsertIntoHadoopFsRelationCommand) =>
+        assert(cmd.catalogTable.map(_.identifier.table) === Some("test_insert_parquet"))
       case o => fail("test_insert_parquet should be converted to a " +
         s"${classOf[HadoopFsRelation ].getCanonicalName} and " +
         s"${classOf[InsertIntoDataSourceCommand].getCanonicalName} should have been SparkPlan." +