@@ -839,6 +839,26 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
spec.map { case (k, v) => partCols.find(_.equalsIgnoreCase(k)).get -> v }
}


/**
* The partition path created by Hive is in lowercase, while Spark SQL will
* rename it with the partition name in partitionColumnNames, and this function
* returns the extra lowercase path created by Hive, and then we can delete it.
Review comment (Member): Nit: all of them are commas. You need to use periods. :)

* e.g. /path/A=1/B=2/C=3 is changed to /path/A=4/B=5/C=6, this function returns
Review comment (Member): The same issue here.

* /path/a=4
*/
def getExtraPartPathCreatedByHive(
spec: TablePartitionSpec,
partitionColumnNames: Seq[String],
tablePath: Path): Path = {
val partColumnNames = partitionColumnNames
.take(partitionColumnNames.indexWhere(col => col.toLowerCase != col) + 1)
.map(_.toLowerCase)

ExternalCatalogUtils.generatePartitionPath(lowerCasePartitionSpec(spec),
partColumnNames, tablePath)
}
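
For illustration, a self-contained sketch of what this helper computes for the scaladoc example above. It inlines the prefix logic and the path construction rather than calling Spark's ExternalCatalogUtils.generatePartitionPath and lowerCasePartitionSpec, and it skips the value escaping the real helper applies; the object name and inputs are made up.

import org.apache.hadoop.fs.Path

object ExtraPartPathSketch {
  def main(args: Array[String]): Unit = {
    val spec = Map("A" -> "4", "B" -> "5", "C" -> "6")
    val partitionColumnNames = Seq("A", "B", "C")
    val tablePath = new Path("/path")

    // Keep the columns up to and including the first one containing an
    // upper-case letter: that is where Hive's lowercase path and Spark's
    // renamed path diverge, so deleting that directory removes the whole
    // stray subtree. Here indexWhere returns 0 and we keep Seq("a").
    val partColumnNames = partitionColumnNames
      .take(partitionColumnNames.indexWhere(col => col.toLowerCase != col) + 1)
      .map(_.toLowerCase)

    val lowerSpec = spec.map { case (k, v) => (k.toLowerCase, v) }
    val extraPath = partColumnNames.foldLeft(tablePath) { (parent, col) =>
      new Path(parent, s"$col=${lowerSpec(col)}")
    }
    println(extraPath) // prints /path/a=4, the stray directory Hive left behind
  }
}

With partition columns Seq("a", "B", "C") and a rename to a=4/B=5/C=6, the same logic keeps Seq("a", "b") and yields /path/a=4/b=5, which matches the t2 case in the test below.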

override def createPartitions(
db: String,
table: String,
@@ -899,6 +919,21 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
spec, partitionColumnNames, tablePath)
try {
tablePath.getFileSystem(hadoopConf).rename(wrongPath, rightPath)
Review comment (Member): Found an issue here: not all file systems behave the same on rename. For example, on macOS, renaming .../tbl/a=5/b=6 to .../tbl/A=5/B=6 produces .../tbl/a=5/B=6, i.e. the rename is not applied recursively up the path. The file system used on Jenkins does not have this issue, but you can hit it on macOS. This fix therefore causes a regression, though the underlying bug is not in your fix.
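
A minimal sketch of the behavior described in this comment, assuming a case-insensitive local file system such as the macOS default; the /tmp/tbl layout is illustrative, and exact rename semantics can differ across Hadoop versions and file systems.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object CaseInsensitiveRenameSketch {
  def main(args: Array[String]): Unit = {
    val fs = FileSystem.getLocal(new Configuration())
    val tbl = new Path("/tmp/tbl")
    fs.delete(tbl, true)
    fs.mkdirs(new Path(tbl, "a=5/b=6"))

    // On a case-insensitive FS the destination parent A=5 resolves to the
    // existing a=5, so only the leaf directory actually changes case.
    fs.rename(new Path(tbl, "a=5/b=6"), new Path(tbl, "A=5/B=6"))

    // List the real on-disk names; exists() cannot tell the case apart here.
    def ls(p: Path): Unit = fs.listStatus(p).foreach { s =>
      println(s.getPath)
      if (s.isDirectory) ls(s.getPath)
    }
    ls(tbl) // on macOS: .../tbl/a=5 and .../tbl/a=5/B=6, not .../tbl/A=5/B=6
  }
}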


// If the newSpec contains more than one depth partition, FileSystem.rename just deletes
// the leaf(i.e. wrongPath), we should check if wrongPath's parents need to be deleted.
// For example, give a newSpec 'A=1/B=2', after calling Hive's client.renamePartitions,
Review comment (Member): give -> given

// the location path in FileSystem is changed to 'a=1/b=2', which is wrongPath, then
Review comment (Member): , then -> . Then

// although we renamed it to 'A=1/B=2', 'a=1/b=2' in FileSystem is deleted, but 'a=1'
Review comment (gatorsmile, Jan 28, 2017): You need to use a period here.

// is still exists, which we also need to delete
val delHivePartPathAfterRename = getExtraPartPathCreatedByHive(
Review comment (Member): Hmm, could there possibly be multiple specs sharing the same parent directory, e.g. 'A=1/B=2', 'A=1/B=3', ...? If so, after you delete the path 'a=1' here, I think the rename will fail when processing the next spec 'A=1/B=3'.

Review comment (Member): The path a=1 was created when you called client.renamePartitions, right? Based on my understanding, when you rename A=1/B=3, Hive will create the directories a=1 and a=1/b=3, so the rename will not fail. Have you tried it?

Review comment (Member): client.renamePartitions is called at the beginning of renamePartitions for all specs at once. It creates the directories a=1, a=1/b=2 and a=1/b=3. When we then iterate over the specs and rename the directories with FileSystem.rename, the first iteration renames a=1/b=2 and, with this change, deletes a=1, which deletes a=1/b=3 too. So in the next iteration, renaming a=1/b=3 to A=1/B=3 will fail.

Review comment (gatorsmile, Jan 28, 2017): So far, the partition-rename DDL we support covers a single pair of partition specs, that is, ALTER TABLE table PARTITION spec1 RENAME TO PARTITION spec2, so this PR will not introduce a bug for end users. However, your concern looks reasonable. I think we should not support renaming multiple partitions in a single DDL in the SessionCatalog and ExternalCatalog; it just makes the error handling more complex. Let me remove it.

Review comment (Contributor): It can be worse: if we already have a partition A=1/B=2 and we rename some other partition to A=1/B=3, we end up with both A=1/B=2 and a=1/b=3, and then we have a lot of work to do instead of just a rename. (A toy simulation of this failure mode follows the end of this diff hunk.)

spec,
partitionColumnNames,
tablePath)

if (delHivePartPathAfterRename != wrongPath) {
tablePath.getFileSystem(hadoopConf).delete(delHivePartPathAfterRename, true)
}
} catch {
case e: IOException => throw new SparkException(
s"Unable to rename partition path from $wrongPath to $rightPath", e)
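To make the failure mode discussed in the thread above concrete, a toy self-contained simulation of renaming two specs that share the parent a=1. It models the directory tree as a set of path strings and mimics the rename-then-delete sequence from this patch; it is an illustration of the discussion, not the actual renamePartitions code.

import scala.collection.mutable

object SharedParentRenameSketch {
  def main(args: Array[String]): Unit = {
    // State after Hive's client.renamePartitions ran for BOTH specs at once:
    val dirs = mutable.Set("tbl/a=1/b=2", "tbl/a=1/b=3")

    def rename(src: String, dst: String): Boolean =
      if (dirs.remove(src)) { dirs += dst; true } else false

    def deleteRecursive(prefix: String): Unit =
      dirs --= dirs.filter(_.startsWith(prefix))

    // Iteration 1, spec A=1/B=2: rename succeeds, then the extra Hive path
    // a=1 is deleted recursively, wiping out a=1/b=3 as well.
    println(rename("tbl/a=1/b=2", "tbl/A=1/B=2")) // true
    deleteRecursive("tbl/a=1")

    // Iteration 2, spec A=1/B=3: the source directory is already gone.
    println(rename("tbl/a=1/b=3", "tbl/A=1/B=3")) // false -- rename fails
  }
}

This is why the thread concludes with dropping multi-partition rename support from the catalog API rather than patching the loop.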
@@ -19,8 +19,11 @@ package org.apache.spark.sql.hive

import java.io.File

import org.apache.hadoop.fs.Path

import org.apache.spark.metrics.source.HiveCatalogMetrics
import org.apache.spark.sql.{AnalysisException, QueryTest}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SQLTestUtils
@@ -481,4 +484,37 @@ class PartitionProviderCompatibilitySuite
assert(spark.sql("show partitions test").count() == 5)
}
}

test("partition path created by Hive should be deleted after renamePartitions with upper-case") {
withTable("t", "t1", "t2") {
Seq((1, 2, 3)).toDF("id", "A", "B").write.partitionBy("A", "B").saveAsTable("t")
spark.sql("alter table t partition(A=2, B=3) rename to partition(A=4, B=5)")

var table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
var extraHivePath = new Path(table.location + "/a=4")
assert(!extraHivePath.getFileSystem(spark.sessionState.newHadoopConf())
.exists(extraHivePath), "partition path created by Hive should be deleted " +
"after renamePartitions with upper-case")

Seq((1, 2, 3, 4)).toDF("id", "A", "B", "C").write.partitionBy("A", "B", "C").saveAsTable("t1")
spark.sql("alter table t1 partition(A=2, B=3, C=4) rename to partition(A=5, B=6, C=7)")
table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
extraHivePath = new Path(table.location + "/a=5")
assert(!extraHivePath.getFileSystem(spark.sessionState.newHadoopConf())
.exists(extraHivePath), "partition path created by Hive should be deleted " +
"after renamePartitions with upper-case")

Seq((1, 2, 3, 4)).toDF("id", "a", "B", "C").write.partitionBy("a", "B", "C").saveAsTable("t2")
spark.sql("alter table t2 partition(a=2, B=3, C=4) rename to partition(a=4, B=5, C=6)")
table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t2"))
val partPath = new Path(table.location + "/a=4")
assert(partPath.getFileSystem(spark.sessionState.newHadoopConf())
.exists(partPath), "partition path of lower-case partition name should not be deleted")

extraHivePath = new Path(table.location + "/a=4/b=5")
assert(!extraHivePath.getFileSystem(spark.sessionState.newHadoopConf())
.exists(extraHivePath), "partition path created by Hive should be deleted " +
"after renamePartitions with upper-case")
}
}
}