@@ -1335,7 +1335,11 @@ object SQLConf {
 "overwriting. In dynamic mode, Spark doesn't delete partitions ahead, and only overwrite " +
 "those partitions that have data written into it at runtime. By default we use static " +
 "mode to keep the same behavior of Spark prior to 2.3. Note that this config doesn't " +
-"affect Hive serde tables, as they are always overwritten with dynamic mode.")
+"affect Hive serde tables, as they are always overwritten with dynamic mode. This can " +
+"also be set as an output option for a data source using key partitionOverwriteMode " +
+"(which takes precedence over this setting), e.g. " +
+"dataframe.write.option(\"partitionOverwriteMode\", \"dynamic\").save(path)."
+)
 .stringConf
 .transform(_.toUpperCase(Locale.ROOT))
 .checkValues(PartitionOverwriteMode.values.map(_.toString))
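For context, a minimal sketch of the per-write override the new doc text describes. The DataFrame contents and output path are illustrative assumptions; the option key matches the test added below, and the session key is the one this entry is registered under in SQLConf:

    // Session-wide default, unchanged behavior (values per the doc string above).
    spark.conf.set("spark.sql.sources.partitionOverwriteMode", "static")

    // Per-write override: this single write uses dynamic overwrite regardless
    // of the session conf. Data, column names, and path are illustrative.
    Seq((1, 2)).toDF("i", "part")
      .write
      .partitionBy("part")
      .mode("overwrite")
      .option("partitionOverwriteMode", "dynamic")
      .parquet("/tmp/example")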
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogT
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode
@@ -91,8 +92,12 @@ case class InsertIntoHadoopFsRelationCommand(
 
 val pathExists = fs.exists(qualifiedOutputPath)
 
-val enableDynamicOverwrite =
-  sparkSession.sessionState.conf.partitionOverwriteMode == PartitionOverwriteMode.DYNAMIC
+val parameters = CaseInsensitiveMap(options)
+
+val partitionOverwriteMode = parameters.get("partitionOverwriteMode")
+  .map(mode => PartitionOverwriteMode.withName(mode.toUpperCase))
+  .getOrElse(sparkSession.sessionState.conf.partitionOverwriteMode)
+val enableDynamicOverwrite = partitionOverwriteMode == PartitionOverwriteMode.DYNAMIC
 // This config only makes sense when we are overwriting a partitioned dataset with dynamic
 // partition columns.
 val dynamicPartitionOverwrite = enableDynamicOverwrite && mode == SaveMode.Overwrite &&
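The added lines above resolve the mode with option-over-conf precedence: the per-write option wins, and the session configuration is only the fallback. A self-contained sketch of that lookup, using a plain Map and a stand-in enumeration rather than Spark's internal CaseInsensitiveMap and SQLConf (all names here are illustrative):

    import java.util.Locale

    object OverwriteMode extends Enumeration {
      val STATIC, DYNAMIC = Value
    }

    // Case-insensitive lookup of the per-write option; falls back to the
    // session-level default when the option is absent.
    def resolveMode(
        options: Map[String, String],
        sessionDefault: OverwriteMode.Value): OverwriteMode.Value = {
      options.collectFirst {
        case (k, v) if k.equalsIgnoreCase("partitionOverwriteMode") => v
      }.map(v => OverwriteMode.withName(v.toUpperCase(Locale.ROOT)))
        .getOrElse(sessionDefault)
    }

    // resolveMode(Map("partitionoverwritemode" -> "dynamic"), OverwriteMode.STATIC)
    //   returns OverwriteMode.DYNAMIC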
@@ -545,6 +545,26 @@ class InsertSuite extends DataSourceTest with SharedSQLContext {
     }
   }
 
+  test("SPARK-24860: dynamic partition overwrite specified per source without catalog table") {
+    withTempPath { path =>
+      Seq((1, 1), (2, 2)).toDF("i", "part")
+        .write.partitionBy("part")
+        .parquet(path.getAbsolutePath)
+      checkAnswer(spark.read.parquet(path.getAbsolutePath), Row(1, 1) :: Row(2, 2) :: Nil)
+
+      Seq((1, 2), (1, 3)).toDF("i", "part")
+        .write.partitionBy("part").mode("overwrite")
+        .option("partitionOverwriteMode", "dynamic").parquet(path.getAbsolutePath)
+      checkAnswer(spark.read.parquet(path.getAbsolutePath),
+        Row(1, 1) :: Row(1, 2) :: Row(1, 3) :: Nil)
+
+      Seq((1, 2), (1, 3)).toDF("i", "part")
+        .write.partitionBy("part").mode("overwrite")
+        .option("partitionOverwriteMode", "static").parquet(path.getAbsolutePath)
+      checkAnswer(spark.read.parquet(path.getAbsolutePath), Row(1, 2) :: Row(1, 3) :: Nil)
+    }
+  }
+
   test("SPARK-24583 Wrong schema type in InsertIntoDataSourceCommand") {
     withTable("test_table") {
       val schema = new StructType()
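Why the final assertion in the new test drops Row(1, 1): in dynamic mode only the partitions that receive new rows (part=2, part=3) are replaced, so part=1 survives, while static mode first deletes everything under the output path. A hedged sketch of inspecting the resulting layout; the helper is hypothetical and assumes a local filesystem path like the temp path in the test:

    import java.io.File

    // Lists partition directories (e.g. "part=1") under a local output path.
    def listPartitions(root: String): Seq[String] =
      Option(new File(root).listFiles()).toSeq.flatten
        .filter(f => f.isDirectory && f.getName.contains("="))
        .map(_.getName)
        .sorted

    // After the dynamic overwrite in the test: Seq("part=1", "part=2", "part=3")
    // After the static overwrite:              Seq("part=2", "part=3") -- part=1 is gone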