diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 41fe0c3b60d9..ccc7aba42c41 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1335,7 +1335,11 @@ object SQLConf {
         "overwriting. In dynamic mode, Spark doesn't delete partitions ahead, and only overwrite " +
         "those partitions that have data written into it at runtime. By default we use static " +
         "mode to keep the same behavior of Spark prior to 2.3. Note that this config doesn't " +
-        "affect Hive serde tables, as they are always overwritten with dynamic mode.")
+        "affect Hive serde tables, as they are always overwritten with dynamic mode. This can " +
+        "also be set as an output option for a data source using key partitionOverwriteMode " +
+        "(which takes precedence over this setting), e.g. " +
+        "dataframe.write.option(\"partitionOverwriteMode\", \"dynamic\").save(path)."
+      )
       .stringConf
       .transform(_.toUpperCase(Locale.ROOT))
       .checkValues(PartitionOverwriteMode.values.map(_.toString))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index dd7ef0d15c14..8a2e00d9780e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogT
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode
@@ -91,8 +92,12 @@ case class InsertIntoHadoopFsRelationCommand(
     val pathExists = fs.exists(qualifiedOutputPath)
 
-    val enableDynamicOverwrite =
-      sparkSession.sessionState.conf.partitionOverwriteMode == PartitionOverwriteMode.DYNAMIC
+    val parameters = CaseInsensitiveMap(options)
+
+    val partitionOverwriteMode = parameters.get("partitionOverwriteMode")
+      .map(mode => PartitionOverwriteMode.withName(mode.toUpperCase))
+      .getOrElse(sparkSession.sessionState.conf.partitionOverwriteMode)
+    val enableDynamicOverwrite = partitionOverwriteMode == PartitionOverwriteMode.DYNAMIC
     // This config only makes sense when we are overwriting a partitioned dataset with dynamic
     // partition columns.
     val dynamicPartitionOverwrite = enableDynamicOverwrite && mode == SaveMode.Overwrite &&
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index 438d5d8176b8..0b6d93975dae 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -545,6 +545,26 @@ class InsertSuite extends DataSourceTest with SharedSQLContext {
     }
   }
 
+  test("SPARK-24860: dynamic partition overwrite specified per source without catalog table") {
+    withTempPath { path =>
+      Seq((1, 1), (2, 2)).toDF("i", "part")
+        .write.partitionBy("part")
+        .parquet(path.getAbsolutePath)
+      checkAnswer(spark.read.parquet(path.getAbsolutePath), Row(1, 1) :: Row(2, 2) :: Nil)
+
+      Seq((1, 2), (1, 3)).toDF("i", "part")
+        .write.partitionBy("part").mode("overwrite")
+        .option("partitionOverwriteMode", "dynamic").parquet(path.getAbsolutePath)
+      checkAnswer(spark.read.parquet(path.getAbsolutePath),
+        Row(1, 1) :: Row(1, 2) :: Row(1, 3) :: Nil)
+
+      Seq((1, 2), (1, 3)).toDF("i", "part")
+        .write.partitionBy("part").mode("overwrite")
+        .option("partitionOverwriteMode", "static").parquet(path.getAbsolutePath)
+      checkAnswer(spark.read.parquet(path.getAbsolutePath), Row(1, 2) :: Row(1, 3) :: Nil)
+    }
+  }
+
   test("SPARK-24583 Wrong schema type in InsertIntoDataSourceCommand") {
     withTable("test_table") {
       val schema = new StructType()
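
Note (not part of the patch): a minimal sketch of how the new per-write option is meant to be used, mirroring the InsertSuite test above. It assumes a running SparkSession named `spark` with `spark.implicits._` imported and a writable scratch directory `path: String`; those names are illustrative only.

  import spark.implicits._

  // Initial write creates partitions part=1 and part=2.
  Seq((1, 1), (2, 2)).toDF("i", "part")
    .write.partitionBy("part")
    .parquet(path)

  // With the per-write option set to "dynamic", only the partitions present in the
  // new data (part=2 and part=3) are replaced; part=1 is left untouched. This works
  // regardless of the session-level spark.sql.sources.partitionOverwriteMode setting,
  // since the write option takes precedence.
  Seq((10, 2), (10, 3)).toDF("i", "part")
    .write.partitionBy("part").mode("overwrite")
    .option("partitionOverwriteMode", "dynamic")
    .parquet(path)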