diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 46444f0a0560..8ddeb5edf943 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -188,15 +188,13 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast
       }

       val outputPath = t.location.rootPaths.head
-      if (overwrite) DDLUtils.verifyNotReadPath(actualQuery, outputPath)
-
       val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Append

       val partitionSchema = actualQuery.resolve(
         t.partitionSchema, t.sparkSession.sessionState.analyzer.resolver)
       val staticPartitions = parts.filter(_._2.nonEmpty).map { case (k, v) => k -> v.get }

-      InsertIntoHadoopFsRelationCommand(
+      val insertCommand = InsertIntoHadoopFsRelationCommand(
         outputPath,
         staticPartitions,
         i.ifPartitionNotExists,
@@ -209,6 +207,14 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast
         table,
         Some(t.location),
         actualQuery.output.map(_.name))
+
+      // For dynamic partition overwrite, we do not delete partition directories ahead of time.
+      // We write to staging directories and move them to the final partition directories after
+      // the write job is done. So it is ok for outputPath to overwrite the input path.
+      if (overwrite && !insertCommand.dynamicPartitionOverwrite) {
+        DDLUtils.verifyNotReadPath(actualQuery, outputPath)
+      }
+      insertCommand
   }
 }

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index fbe874b3e8bc..f11972115e09 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.command._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode
 import org.apache.spark.sql.util.SchemaUtils

@@ -60,6 +61,21 @@ case class InsertIntoHadoopFsRelationCommand(
   extends DataWritingCommand {
   import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.escapePathName

+  private lazy val parameters = CaseInsensitiveMap(options)
+
+  private[sql] lazy val dynamicPartitionOverwrite: Boolean = {
+    val partitionOverwriteMode = parameters.get("partitionOverwriteMode")
+      // scalastyle:off caselocale
+      .map(mode => PartitionOverwriteMode.withName(mode.toUpperCase))
+      // scalastyle:on caselocale
+      .getOrElse(SQLConf.get.partitionOverwriteMode)
+    val enableDynamicOverwrite = partitionOverwriteMode == PartitionOverwriteMode.DYNAMIC
+    // This config only makes sense when we are overwriting a partitioned dataset with dynamic
+    // partition columns.
+ enableDynamicOverwrite && mode == SaveMode.Overwrite && + staticPartitions.size < partitionColumns.length + } + override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = { // Most formats don't do well with duplicate columns, so lets not allow that SchemaUtils.checkColumnNameDuplication( @@ -90,19 +106,6 @@ case class InsertIntoHadoopFsRelationCommand( fs, catalogTable.get, qualifiedOutputPath, matchingPartitions) } - val parameters = CaseInsensitiveMap(options) - - val partitionOverwriteMode = parameters.get("partitionOverwriteMode") - // scalastyle:off caselocale - .map(mode => PartitionOverwriteMode.withName(mode.toUpperCase)) - // scalastyle:on caselocale - .getOrElse(sparkSession.sessionState.conf.partitionOverwriteMode) - val enableDynamicOverwrite = partitionOverwriteMode == PartitionOverwriteMode.DYNAMIC - // This config only makes sense when we are overwriting a partitioned dataset with dynamic - // partition columns. - val dynamicPartitionOverwrite = enableDynamicOverwrite && mode == SaveMode.Overwrite && - staticPartitions.size < partitionColumns.length - val committer = FileCommitProtocol.instantiate( sparkSession.sessionState.conf.fileCommitProtocolClass, jobId = java.util.UUID.randomUUID().toString, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala index fbde38322fca..871cb1ff151b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala @@ -270,6 +270,55 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { "INSERT OVERWRITE to a table while querying it should not be allowed.") } + test("SPARK-30112: it is allowed to write to a table while querying it for " + + "dynamic partition overwrite.") { + Seq(PartitionOverwriteMode.DYNAMIC.toString, + PartitionOverwriteMode.STATIC.toString).foreach { mode => + withSQLConf(SQLConf.PARTITION_OVERWRITE_MODE.key -> mode) { + withTable("insertTable") { + sql( + """ + |CREATE TABLE insertTable(i int, part1 int, part2 int) USING PARQUET + |PARTITIONED BY (part1, part2) + """.stripMargin) + + sql("INSERT INTO TABLE insertTable PARTITION(part1=1, part2=1) SELECT 1") + checkAnswer(spark.table("insertTable"), Row(1, 1, 1)) + sql("INSERT OVERWRITE TABLE insertTable PARTITION(part1=1, part2=2) SELECT 2") + checkAnswer(spark.table("insertTable"), Row(1, 1, 1) :: Row(2, 1, 2) :: Nil) + + if (mode == PartitionOverwriteMode.DYNAMIC.toString) { + sql( + """ + |INSERT OVERWRITE TABLE insertTable PARTITION(part1=1, part2) + |SELECT i + 1, part2 FROM insertTable + """.stripMargin) + checkAnswer(spark.table("insertTable"), Row(2, 1, 1) :: Row(3, 1, 2) :: Nil) + + sql( + """ + |INSERT OVERWRITE TABLE insertTable PARTITION(part1=1, part2) + |SELECT i + 1, part2 + 1 FROM insertTable + """.stripMargin) + checkAnswer(spark.table("insertTable"), + Row(2, 1, 1) :: Row(3, 1, 2) :: Row(4, 1, 3) :: Nil) + } else { + val message = intercept[AnalysisException] { + sql( + """ + |INSERT OVERWRITE TABLE insertTable PARTITION(part1=1, part2) + |SELECT i + 1, part2 FROM insertTable + """.stripMargin) + }.getMessage + assert( + message.contains("Cannot overwrite a path that is also being read from."), + "INSERT OVERWRITE to a table while querying it should not be allowed.") + } + } + } + } + } + test("Caching") { // write something to the jsonTable sql(
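A minimal usage sketch of the behavior this patch enables (not part of the diff itself): with partition overwrite mode set to dynamic, an INSERT OVERWRITE that reads from the table it writes to is no longer rejected, because data first lands in staging directories and is only moved into the final partition directories at commit time. The table name `t` and the local-mode session setup below are illustrative assumptions; only the config key `spark.sql.sources.partitionOverwriteMode` and the write option `partitionOverwriteMode` come from the patch and existing Spark APIs.

// Illustrative sketch only; assumes a local SparkSession and a throwaway table `t`.
import org.apache.spark.sql.SparkSession

object DynamicPartitionOverwriteExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("SPARK-30112 example")
      // Session-level setting; the per-write option "partitionOverwriteMode"
      // (read via CaseInsensitiveMap in the command above) takes precedence.
      .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
      .getOrCreate()

    spark.sql("CREATE TABLE t(i INT, part INT) USING PARQUET PARTITIONED BY (part)")
    spark.sql("INSERT INTO t PARTITION(part=1) SELECT 1")

    // Reading from and overwriting the same table: with dynamic partition overwrite
    // the data is written to staging directories first and moved into the final
    // partition directories on commit, so this statement is now allowed.
    spark.sql("INSERT OVERWRITE TABLE t PARTITION(part) SELECT i + 1, part FROM t")

    spark.table("t").show()
    spark.stop()
  }
}

Under the default static mode, the same self-referencing INSERT OVERWRITE still fails analysis with "Cannot overwrite a path that is also being read from.", as exercised by the test added above.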