diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 5245c14a4c96..512c8f78eab6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -445,21 +445,28 @@ case class DataSource(
 
         // If we are appending to a table that already exists, make sure the partitioning matches
         // up. If we fail to load the table for whatever reason, ignore the check.
-        if (mode == SaveMode.Append) {
-          val existingPartitionColumns = Try {
-            getOrInferFileFormatSchema(format, justPartitioning = true)._2.fieldNames.toList
-          }.getOrElse(Seq.empty[String])
-          // TODO: Case sensitivity.
-          val sameColumns =
-            existingPartitionColumns.map(_.toLowerCase()) == partitionColumns.map(_.toLowerCase())
-          if (existingPartitionColumns.nonEmpty && !sameColumns) {
-            throw new AnalysisException(
-              s"""Requested partitioning does not match existing partitioning.
-                 |Existing partitioning columns:
-                 |  ${existingPartitionColumns.mkString(", ")}
-                 |Requested partitioning columns:
-                 |  ${partitionColumns.mkString(", ")}
-                 |""".stripMargin)
+
+        // SPARK-18917: flag to skip the partition check (and leaf file listing) in append mode.
+        val skipPartitionCheckOnAppend = sparkSession.sessionState.conf.skipPartitionCheckOnAppend
+        if (skipPartitionCheckOnAppend) {
+          logInfo("Skipping partition check for append mode")
+        } else {
+          if (mode == SaveMode.Append) {
+            val existingPartitionColumns = Try {
+              getOrInferFileFormatSchema(format, justPartitioning = true)._2.fieldNames.toList
+            }.getOrElse(Seq.empty[String])
+            // TODO: Case sensitivity.
+            val sameColumns =
+              existingPartitionColumns.map(_.toLowerCase()) == partitionColumns.map(_.toLowerCase())
+            if (existingPartitionColumns.nonEmpty && !sameColumns) {
+              throw new AnalysisException(
+                s"""Requested partitioning does not match existing partitioning.
+                   |Existing partitioning columns:
+                   |  ${existingPartitionColumns.mkString(", ")}
+                   |Requested partitioning columns:
+                   |  ${partitionColumns.mkString(", ")}
+                   |""".stripMargin)
+            }
           }
         }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 4d25f54caa13..b14096c6c884 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -546,6 +546,14 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)
 
+  val SKIP_PARTITION_CHECK_ON_APPEND =
+    SQLConfigBuilder("spark.sql.execution.skipPartitionCheckOnAppend")
+      .internal()
+      .doc("Decides whether to skip the partition check in append mode. " +
+        "Enable this when writing to object stores to avoid timeout issues.")
+      .booleanConf
+      .createWithDefault(false)
+
   val FILE_SINK_LOG_DELETION = SQLConfigBuilder("spark.sql.streaming.fileSink.log.deletion")
     .internal()
     .doc("Whether to delete the expired log files in file stream sink.")
@@ -813,6 +821,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 
   def useObjectHashAggregation: Boolean = getConf(USE_OBJECT_HASH_AGG)
 
+  def skipPartitionCheckOnAppend: Boolean = getConf(SKIP_PARTITION_CHECK_ON_APPEND)
+
  def objectAggSortBasedFallbackThreshold: Int = getConf(OBJECT_AGG_SORT_BASED_FALLBACK_THRESHOLD)
 
   def variableSubstituteEnabled: Boolean = getConf(VARIABLE_SUBSTITUTE_ENABLED)
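
Usage note (not part of the patch): a minimal sketch of how the new flag could be enabled before an append write, assuming the patch above is applied. The configuration key comes from the patch; the application name, output path, and column names below are placeholders for illustration only.

    import org.apache.spark.sql.{SaveMode, SparkSession}
    import org.apache.spark.sql.functions.col

    val spark = SparkSession.builder()
      .appName("skip-partition-check-example")  // placeholder app name
      .getOrCreate()

    // Skip listing existing leaf files / partition columns on append,
    // which can be slow or time out against object stores such as S3.
    spark.conf.set("spark.sql.execution.skipPartitionCheckOnAppend", "true")

    spark.range(100)
      .withColumn("part", col("id") % 10)
      .write
      .partitionBy("part")
      .mode(SaveMode.Append)
      .parquet("s3a://some-bucket/some/table/path")  // placeholder path

With the flag enabled, the safeguard that rejects an append whose partitioning differs from the existing layout is also skipped, so callers are responsible for keeping the partition columns consistent with the data already written.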