
Commit e5f4122

allow insert overwrite same table if dynamic partition overwrite.
1 parent: e04a634

3 files changed: +64 -15 lines


sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala

Lines changed: 9 additions & 3 deletions
@@ -188,15 +188,13 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast
       }
 
       val outputPath = t.location.rootPaths.head
-      if (overwrite) DDLUtils.verifyNotReadPath(actualQuery, outputPath)
-
       val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Append
 
       val partitionSchema = actualQuery.resolve(
         t.partitionSchema, t.sparkSession.sessionState.analyzer.resolver)
       val staticPartitions = parts.filter(_._2.nonEmpty).map { case (k, v) => k -> v.get }
 
-      InsertIntoHadoopFsRelationCommand(
+      val insertCommand = InsertIntoHadoopFsRelationCommand(
         outputPath,
         staticPartitions,
         i.ifPartitionNotExists,
@@ -209,6 +207,14 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast
         table,
         Some(t.location),
         actualQuery.output.map(_.name))
+
+      // For dynamic partition overwrite, we do not delete partition directories ahead.
+      // We write to staging directories and move to final partition directories after writing
+      // job is done. So it is ok to have outputPath try to overwrite inputpath.
+      if (!insertCommand.dynamicPartitionOverwriteEnabled) {
+        DDLUtils.verifyNotReadPath(actualQuery, outputPath)
+      }
+      insertCommand
     }
   }
 

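In effect, the verifyNotReadPath check now fires only when dynamic partition overwrite is off, so a self-referential INSERT OVERWRITE passes analysis in dynamic mode. A minimal sketch of that behavior at the session level, assuming a local session; the table name demo and its data are illustrative:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .getOrCreate()

// DYNAMIC mode replaces only the partitions the job writes, going through
// staging directories, so reading and overwriting the same table is safe.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

spark.sql("CREATE TABLE demo(i INT, part INT) USING parquet PARTITIONED BY (part)")
spark.sql("INSERT INTO demo PARTITION(part = 1) SELECT 1")

// Passes the relocated verifyNotReadPath check only because the mode is dynamic;
// under "static" this fails with: Cannot overwrite a path that is also being read from.
spark.sql("INSERT OVERWRITE TABLE demo PARTITION(part) SELECT i + 1, part FROM demo")
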
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala

Lines changed: 17 additions & 12 deletions
@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.command._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode
 import org.apache.spark.sql.util.SchemaUtils
 
@@ -60,6 +61,21 @@ case class InsertIntoHadoopFsRelationCommand(
   extends DataWritingCommand {
   import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.escapePathName
 
+  private lazy val parameters = CaseInsensitiveMap(options)
+
+  private[sql] lazy val dynamicPartitionOverwriteEnabled: Boolean = {
+    val partitionOverwriteMode = parameters.get("partitionOverwriteMode")
+      // scalastyle:off caselocale
+      .map(mode => PartitionOverwriteMode.withName(mode.toUpperCase))
+      // scalastyle:on caselocale
+      .getOrElse(SQLConf.get.partitionOverwriteMode)
+    val enableDynamicOverwrite = partitionOverwriteMode == PartitionOverwriteMode.DYNAMIC
+    // This config only makes sense when we are overwriting a partitioned dataset with dynamic
+    // partition columns.
+    enableDynamicOverwrite && mode == SaveMode.Overwrite &&
+      staticPartitions.size < partitionColumns.length
+  }
+
   override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
     // Most formats don't do well with duplicate columns, so lets not allow that
     SchemaUtils.checkColumnNameDuplication(
@@ -90,18 +106,7 @@
         fs, catalogTable.get, qualifiedOutputPath, matchingPartitions)
     }
 
-    val parameters = CaseInsensitiveMap(options)
-
-    val partitionOverwriteMode = parameters.get("partitionOverwriteMode")
-      // scalastyle:off caselocale
-      .map(mode => PartitionOverwriteMode.withName(mode.toUpperCase))
-      // scalastyle:on caselocale
-      .getOrElse(sparkSession.sessionState.conf.partitionOverwriteMode)
-    val enableDynamicOverwrite = partitionOverwriteMode == PartitionOverwriteMode.DYNAMIC
-    // This config only makes sense when we are overwriting a partitioned dataset with dynamic
-    // partition columns.
-    val dynamicPartitionOverwrite = enableDynamicOverwrite && mode == SaveMode.Overwrite &&
-      staticPartitions.size < partitionColumns.length
+    val dynamicPartitionOverwrite = dynamicPartitionOverwriteEnabled
 
     val committer = FileCommitProtocol.instantiate(
       sparkSession.sessionState.conf.fileCommitProtocolClass,

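Because parameters is a CaseInsensitiveMap and the value is upper-cased before PartitionOverwriteMode.withName, the per-write option overrides the session config regardless of key or value casing. A hedged sketch of that write-level path; the DataFrame contents and output path are illustrative:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// The write-level option overrides spark.sql.sources.partitionOverwriteMode
// for this write only.
Seq((1, 1), (2, 2)).toDF("i", "part")
  .write
  .mode("overwrite")                            // mode == SaveMode.Overwrite
  .partitionBy("part")                          // staticPartitions.size < partitionColumns.length
  .option("partitionOverwriteMode", "dynamic")  // matched case-insensitively
  .parquet("/tmp/demo_output")                  // illustrative path
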
sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala

Lines changed: 38 additions & 0 deletions
@@ -270,6 +270,44 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
       "INSERT OVERWRITE to a table while querying it should not be allowed.")
   }
 
+  test("it is allowed to write to a table while querying it for dynamic partition overwrite.") {
+    Seq(PartitionOverwriteMode.DYNAMIC.toString,
+        PartitionOverwriteMode.STATIC.toString).foreach { usedMode =>
+      withSQLConf(SQLConf.PARTITION_OVERWRITE_MODE.key -> usedMode) {
+        withTable("insertTable") {
+          sql(
+            """
+              |CREATE TABLE insertTable(i int, part1 int, part2 int) USING PARQUET
+              |PARTITIONED BY (part1, part2)
+            """.stripMargin)
+
+          sql("INSERT INTO TABLE insertTable PARTITION(part1=1, part2=1) SELECT 1")
+          checkAnswer(spark.table("insertTable"), Row(1, 1, 1))
+
+          if (usedMode == PartitionOverwriteMode.DYNAMIC.toString) {
+            sql(
+              """
+                |INSERT OVERWRITE TABLE insertTable PARTITION(part1=1, part2)
+                |SELECT i + 1, part2 FROM insertTable
+              """.stripMargin)
+            checkAnswer(spark.table("insertTable"), Row(2, 1, 1))
+          } else {
+            val message = intercept[AnalysisException] {
+              sql(
+                """
+                  |INSERT OVERWRITE TABLE insertTable PARTITION(part1=1, part2)
+                  |SELECT i + 1, part2 FROM insertTable
+                """.stripMargin)
+            }.getMessage
+            assert(
+              message.contains("Cannot overwrite a path that is also being read from."),
+              "INSERT OVERWRITE to a table while querying it should not be allowed.")
+          }
+        }
+      }
+    }
+  }
+
   test("Caching") {
     // write something to the jsonTable
     sql(
