@@ -242,8 +242,13 @@ case class CreateDataSourceTableAsSelectCommand(
       bucketSpec = bucketSpec,
       options = optionsWithPath)

-    val result = dataSource.write(mode, df)
-
+    val result = try {
+      dataSource.write(mode, df)
+    } catch {
+      case ex: AnalysisException =>
+        logError(s"Failed to write to table ${tableIdent.identifier} in $mode mode", ex)
+        throw ex
+    }

Contributor: This log entry is mainly for catching the table name and mode, right?

     if (createMetastoreTable) {
       // We will use the schema of resolved.relation as the schema of the table (instead of
       // the schema of df). It is important since the nullability may be changed by the relation
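The change above follows a log-and-rethrow pattern: annotate the failure with context (table name and save mode), then propagate the original exception unchanged. A minimal plain-Scala sketch of the same idea, with a hypothetical `logError` and stand-in identifiers (not Spark's actual `Logging` trait):

```scala
// Hypothetical stand-in for illustration; Spark's real logError comes from
// its Logging trait.
def logError(msg: String, ex: Throwable): Unit =
  Console.err.println(s"ERROR: $msg (${ex.getMessage})")

// Run the write action; on failure, log the table name and save mode for
// easier debugging, then rethrow the original exception unchanged.
def writeOrLog[A](table: String, mode: String)(write: => A): A =
  try {
    write
  } catch {
    case ex: RuntimeException =>
      logError(s"Failed to write to table $table in $mode mode", ex)
      throw ex
  }

println(writeOrLog("t1", "append") { 42 })  // prints 42
```

The key point is that the exception itself is not wrapped or altered; only a contextual log line is emitted before rethrowing, so callers still see the original `AnalysisException`.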
@@ -435,26 +435,25 @@ case class DataSource(
       // If we are appending to a table that already exists, make sure the partitioning matches
       // up. If we fail to load the table for whatever reason, ignore the check.
       if (mode == SaveMode.Append) {
-        val existingPartitionColumnSet = try {
-          Some(
-            resolveRelation()
-              .asInstanceOf[HadoopFsRelation]
-              .location
-              .partitionSpec()
-              .partitionColumns
-              .fieldNames
-              .toSet)
-        } catch {
-          case e: Exception =>
-            None
-        }
-
-        existingPartitionColumnSet.foreach { ex =>
-          if (ex.map(_.toLowerCase) != partitionColumns.map(_.toLowerCase()).toSet) {
-            throw new AnalysisException(
-              s"Requested partitioning does not equal existing partitioning: " +
-                s"$ex != ${partitionColumns.toSet}.")
-          }
-        }
+        val existingColumns = Try {

Contributor: existingPartitionColumns

+          resolveRelation()

Contributor: Actually, the returned partitioning columns are the user-provided ones, not the existing dataset's partitioning columns.

Contributor: Also, this triggers a partitioning discovery. We should avoid it.
+            .asInstanceOf[HadoopFsRelation]
+            .location
+            .partitionSpec()
+            .partitionColumns
+            .fieldNames
+            .toSeq
+        }.getOrElse(Seq.empty[String])
+        val sameColumns =
+          existingColumns.map(_.toLowerCase) == partitionColumns.map(_.toLowerCase)

Contributor: Hmm, this is always case-insensitive resolution.

+        if (existingColumns.size > 0 && !sameColumns) {
+          throw new AnalysisException(
+            s"""Requested partitioning does not match existing partitioning.

Contributor: Can you add "Requested partitioning does not match existing partitioning for table $table"?

Contributor (author): Thanks, updated.

+               |Existing partitioning columns:
+               |  ${existingColumns.mkString(", ")}
+               |Requested partitioning columns:
+               |  ${partitionColumns.mkString(", ")}
+               |""".stripMargin)
+        }
       }
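The new check boils down to a case-insensitive but order-sensitive sequence comparison, skipped when the existing relation cannot be resolved. A self-contained plain-Scala sketch of that logic (hypothetical helper name, stdlib `Try` only; not the PR's actual code):

```scala
import scala.util.Try

// Hypothetical standalone mirror of the check above: compare the requested
// partition columns against the existing ones, lower-casing both sides, and
// skip the check entirely when the existing columns cannot be resolved.
def partitioningMatches(existing: Try[Seq[String]], requested: Seq[String]): Boolean = {
  val existingColumns = existing.getOrElse(Seq.empty[String])
  existingColumns.isEmpty ||
    existingColumns.map(_.toLowerCase) == requested.map(_.toLowerCase)
}

// Seq equality is positional, so a reordering such as Seq("b", "a") fails too.
println(partitioningMatches(Try(Seq("a", "b")), Seq("A", "b")))         // true
println(partitioningMatches(Try(Seq("a", "b")), Seq("b", "a")))         // false
println(partitioningMatches(Try(sys.error("unresolvable")), Seq("x")))  // true
```

Using `Seq` rather than the old `Set` is what makes the check order-sensitive, which is exactly the behavioral change the reviewer comments discuss.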

@@ -1317,4 +1317,28 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     assertUnsupported("TRUNCATE TABLE my_tab PARTITION (age=10)")
   }

+  test("SPARK-16034 Partition columns should match when appending to existing data source tables") {
+    import testImplicits._
+    val df = Seq((1, 2, 3)).toDF("a", "b", "c")
+    withTable("partitionedTable") {
+      df.write.mode("overwrite").partitionBy("a", "b").saveAsTable("partitionedTable")
+      // Misses some partition columns
+      intercept[AnalysisException] {
+        df.write.mode("append").partitionBy("a").saveAsTable("partitionedTable")
+      }
+      // Wrong order
+      intercept[AnalysisException] {
+        df.write.mode("append").partitionBy("b", "a").saveAsTable("partitionedTable")
+      }
+      // Partition columns not specified
+      intercept[AnalysisException] {
+        df.write.mode("append").saveAsTable("partitionedTable")
+      }
+      assert(sql("select * from partitionedTable").collect().size == 1)
+      // Inserts new data successfully when partition columns are correctly specified in
+      // partitionBy(...).
+      df.write.mode("append").partitionBy("a", "b").saveAsTable("partitionedTable")
+      assert(sql("select * from partitionedTable").collect().size == 2)

Contributor: It's better to check the answer.


+    }
+  }
 }
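The final review comment suggests verifying the query result rather than just the row count (in Spark's test suites that would be a `checkAnswer`-style assertion). A plain-Scala illustration of why a count-only assertion is weaker, using made-up stand-in data rather than the PR's actual table contents:

```scala
// A count-only assertion passes even when the rows are wrong; comparing the
// sorted result sets catches the discrepancy. Made-up stand-in data for
// illustration only.
val expected  = Seq((1, 2, 3), (1, 2, 3))
val wrongData = Seq((9, 9, 9), (9, 9, 9))

val countOnlyPasses = wrongData.size == expected.size       // too weak: true
val fullCheckPasses = wrongData.sorted == expected.sorted   // strict: false

println(countOnlyPasses)  // true
println(fullCheckPasses)  // false
```

Sorting before comparing also makes the check independent of row ordering, which matters for query results that carry no ordering guarantee.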