@@ -445,21 +445,28 @@ case class DataSource(

// If we are appending to a table that already exists, make sure the partitioning matches
// up. If we fail to load the table for whatever reason, ignore the check.
-    if (mode == SaveMode.Append) {
-      val existingPartitionColumns = Try {
-        getOrInferFileFormatSchema(format, justPartitioning = true)._2.fieldNames.toList
-      }.getOrElse(Seq.empty[String])
-      // TODO: Case sensitivity.
-      val sameColumns =
-        existingPartitionColumns.map(_.toLowerCase()) == partitionColumns.map(_.toLowerCase())
-      if (existingPartitionColumns.nonEmpty && !sameColumns) {
-        throw new AnalysisException(
-          s"""Requested partitioning does not match existing partitioning.
-             |Existing partitioning columns:
-             |  ${existingPartitionColumns.mkString(", ")}
-             |Requested partitioning columns:
-             |  ${partitionColumns.mkString(", ")}
-             |""".stripMargin)
-      }
-    }

Review comments on this block:

Contributor: Shall we just remove this check? It's too expensive. cc @marmbrus

Contributor: It seems fine to remove in the case of files. Can we keep the check for catalog tables?

Member: Then we can remove the param justPartitioning from the function getOrInferFileFormatSchema.

+    // SPARK-18917: add a skip-partition-check flag to avoid listing all leaf files in append mode
+    val skipPartitionCheckOnAppend = sparkSession.sessionState.conf.skipPartitionCheckOnAppend
+    if (skipPartitionCheckOnAppend) {
+      logInfo("Skipping partition check in append mode")
+    } else {
+      if (mode == SaveMode.Append) {
+        val existingPartitionColumns = Try {
+          getOrInferFileFormatSchema(format, justPartitioning = true)._2.fieldNames.toList
+        }.getOrElse(Seq.empty[String])
+        // TODO: Case sensitivity.
+        val sameColumns =
+          existingPartitionColumns.map(_.toLowerCase()) == partitionColumns.map(_.toLowerCase())
+        if (existingPartitionColumns.nonEmpty && !sameColumns) {
+          throw new AnalysisException(
+            s"""Requested partitioning does not match existing partitioning.
+               |Existing partitioning columns:
+               |  ${existingPartitionColumns.mkString(", ")}
+               |Requested partitioning columns:
+               |  ${partitionColumns.mkString(", ")}
+               |""".stripMargin)
+        }
+      }
+    }
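
For context, a minimal usage sketch of the proposed flag (a sketch only: it assumes this
PR's config key merges as written, and the session setup, sample data, and s3a path are
illustrative):

  import org.apache.spark.sql.{SaveMode, SparkSession}

  val spark = SparkSession.builder().appName("skip-check-demo").getOrCreate()

  // Opt in: skip the leaf-file listing that the append-mode partition check performs.
  // This matters on object stores, where recursive listing can be slow enough to time
  // out, but it shifts responsibility for matching the existing layout onto the caller.
  spark.conf.set("spark.sql.execution.skipPartitionCheckOnAppend", "true")

  val df = spark.range(100).selectExpr("id", "id % 10 AS part")
  df.write
    .mode(SaveMode.Append)
    .partitionBy("part")
    .parquet("s3a://my-bucket/tables/events") // illustrative path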

@@ -546,6 +546,14 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

+  val SKIP_PARTITION_CHECK_ON_APPEND =
+    SQLConfigBuilder("spark.sql.execution.skipPartitionCheckOnAppend")
+      .internal()
+      .doc("Whether to skip the partition check when appending to an existing path. " +
+        "Enable this when writing to object stores to avoid timeout issues.")
+      .booleanConf
+      .createWithDefault(false)

val FILE_SINK_LOG_DELETION = SQLConfigBuilder("spark.sql.streaming.fileSink.log.deletion")
.internal()
.doc("Whether to delete the expired log files in file stream sink.")
@@ -813,6 +821,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {

def useObjectHashAggregation: Boolean = getConf(USE_OBJECT_HASH_AGG)

+  def skipPartitionCheckOnAppend: Boolean = getConf(SKIP_PARTITION_CHECK_ON_APPEND)

def objectAggSortBasedFallbackThreshold: Int = getConf(OBJECT_AGG_SORT_BASED_FALLBACK_THRESHOLD)

def variableSubstituteEnabled: Boolean = getConf(VARIABLE_SUBSTITUTE_ENABLED)