ExternalCatalogUtils.scala (new file):

@@ -0,0 +1,121 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalyst.catalog

import org.apache.hadoop.fs.Path
import org.apache.hadoop.util.Shell

import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec

object ExternalCatalogUtils {
Comment (Contributor Author): @ericl PartitioningUtils is not accessible in the catalyst module, so I created this.

  // This duplicates default value of Hive `ConfVars.DEFAULTPARTITIONNAME`, since catalyst doesn't
  // depend on Hive.
  val DEFAULT_PARTITION_NAME = "__HIVE_DEFAULT_PARTITION__"

  //////////////////////////////////////////////////////////////////////////////////////////////
  // The following string escaping code is mainly copied from Hive (o.a.h.h.common.FileUtils).
  //////////////////////////////////////////////////////////////////////////////////////////////

  val charToEscape = {
    val bitSet = new java.util.BitSet(128)

    /**
     * ASCII 01-1F are HTTP control characters that need to be escaped.
     * \u000A and \u000D are \n and \r, respectively.
     */
    val clist = Array(
      '\u0001', '\u0002', '\u0003', '\u0004', '\u0005', '\u0006', '\u0007', '\u0008', '\u0009',
      '\n', '\u000B', '\u000C', '\r', '\u000E', '\u000F', '\u0010', '\u0011', '\u0012', '\u0013',
      '\u0014', '\u0015', '\u0016', '\u0017', '\u0018', '\u0019', '\u001A', '\u001B', '\u001C',
      '\u001D', '\u001E', '\u001F', '"', '#', '%', '\'', '*', '/', ':', '=', '?', '\\', '\u007F',
      '{', '[', ']', '^')

    clist.foreach(bitSet.set(_))

    if (Shell.WINDOWS) {
      Array(' ', '<', '>', '|').foreach(bitSet.set(_))
    }

    bitSet
  }

  def needsEscaping(c: Char): Boolean = {
    c >= 0 && c < charToEscape.size() && charToEscape.get(c)
  }

  def escapePathName(path: String): String = {
    val builder = new StringBuilder()
    path.foreach { c =>
      if (needsEscaping(c)) {
        builder.append('%')
        builder.append(f"${c.asInstanceOf[Int]}%02X")
      } else {
        builder.append(c)
      }
    }

    builder.toString()
  }

  def unescapePathName(path: String): String = {
    val sb = new StringBuilder
    var i = 0

    while (i < path.length) {
      val c = path.charAt(i)
      if (c == '%' && i + 2 < path.length) {
        val code: Int = try {
          Integer.parseInt(path.substring(i + 1, i + 3), 16)
        } catch {
          case _: Exception => -1
        }
        if (code >= 0) {
          sb.append(code.asInstanceOf[Char])
          i += 3
        } else {
          sb.append(c)
          i += 1
        }
      } else {
        sb.append(c)
        i += 1
      }
    }

    sb.toString()
  }

  def generatePartitionPath(
      spec: TablePartitionSpec,
      partitionColumnNames: Seq[String],
      tablePath: Path): Path = {
    val partitionPathStrings = partitionColumnNames.map { col =>
      val partitionValue = spec(col)
      val partitionString = if (partitionValue == null) {
        DEFAULT_PARTITION_NAME
      } else {
        escapePathName(partitionValue)
      }
      escapePathName(col) + "=" + partitionString
    }
    partitionPathStrings.foldLeft(tablePath) { (totalPath, nextPartPath) =>
      new Path(totalPath, nextPartPath)
    }
  }
}
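
A quick sanity check of how these helpers compose (illustrative values, not part of the patch; assumes the object above is on the classpath):

  import org.apache.hadoop.fs.Path
  import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils._

  // '/', '=' and ':' are all in charToEscape, so they are replaced by %XX hex codes.
  val escaped = escapePathName("2016/11=a:b")         // "2016%2F11%3Da%3Ab"
  assert(unescapePathName(escaped) == "2016/11=a:b")  // round-trips losslessly

  // generatePartitionPath appends one Hive-style "col=value" directory per partition
  // column, in column order; a null value falls back to DEFAULT_PARTITION_NAME.
  val path = generatePartitionPath(
    spec = Map("ds" -> "2016-01-01", "hr" -> "10"),
    partitionColumnNames = Seq("ds", "hr"),
    tablePath = new Path("/warehouse/tbl"))
  assert(path.toString == "/warehouse/tbl/ds=2016-01-01/hr=10")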
InMemoryCatalog.scala:

@@ -231,7 +231,7 @@ class InMemoryCatalog(
       assert(tableMeta.storage.locationUri.isDefined,
         "Managed table should always have table location, as we will assign a default location " +
           "to it if it doesn't have one.")
-      val dir = new Path(tableMeta.storage.locationUri.get)
+      val dir = new Path(tableMeta.location)
       try {
         val fs = dir.getFileSystem(hadoopConfig)
         fs.delete(dir, true)

Comment (Member): Since the deletion is recursive, it can delete the location/directory of a partitioned table. I checked the implementation of delete, and it does not seem to guarantee atomicity, so the deletion could be partial. We might need to also put a TODO here, like what we did for dropPartition.

Comment (Contributor Author): It's minor and not related to this PR; I'd rather not touch it.
@@ -259,7 +259,7 @@ class InMemoryCatalog(
       assert(oldDesc.table.storage.locationUri.isDefined,
         "Managed table should always have table location, as we will assign a default location " +
           "to it if it doesn't have one.")
-      val oldDir = new Path(oldDesc.table.storage.locationUri.get)
+      val oldDir = new Path(oldDesc.table.location)
       val newDir = new Path(catalog(db).db.locationUri, newName)
       try {
         val fs = oldDir.getFileSystem(hadoopConfig)
@@ -355,25 +355,28 @@ class InMemoryCatalog(
       }
     }

-    val tableDir = new Path(catalog(db).db.locationUri, table)
-    val partitionColumnNames = getTable(db, table).partitionColumnNames
+    val tableMeta = getTable(db, table)
+    val partitionColumnNames = tableMeta.partitionColumnNames
+    val tablePath = new Path(tableMeta.location)
     // TODO: we should follow hive to roll back if one partition path failed to create.
     parts.foreach { p =>
-      // If location is set, the partition is using an external partition location and we don't
-      // need to handle its directory.
-      if (p.storage.locationUri.isEmpty) {
-        val partitionPath = partitionColumnNames.flatMap { col =>
-          p.spec.get(col).map(col + "=" + _)
-        }.mkString("/")
-        try {
-          val fs = tableDir.getFileSystem(hadoopConfig)
-          fs.mkdirs(new Path(tableDir, partitionPath))
-        } catch {
-          case e: IOException =>
-            throw new SparkException(s"Unable to create partition path $partitionPath", e)
-        }
+      val partitionPath = p.storage.locationUri.map(new Path(_)).getOrElse {
+        ExternalCatalogUtils.generatePartitionPath(p.spec, partitionColumnNames, tablePath)
+      }
+
+      try {
+        val fs = tablePath.getFileSystem(hadoopConfig)
+        if (!fs.exists(partitionPath)) {
+          fs.mkdirs(partitionPath)
+        }
+      } catch {
+        case e: IOException =>
+          throw new SparkException(s"Unable to create partition path $partitionPath", e)
       }
-      existingParts.put(p.spec, p)
+
+      existingParts.put(
+        p.spec,
+        p.copy(storage = p.storage.copy(locationUri = Some(partitionPath.toString))))
     }
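
To make the new control flow above concrete: an explicitly provided partition location always wins, and only the fallback derives a path under the table directory. A sketch with assumed values (not from the patch):

  import org.apache.hadoop.fs.Path
  import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils

  // With an external location the partition directory is taken verbatim; with None,
  // it is generated under the table path, e.g. /warehouse/tbl/ds=2016-01-01.
  val explicitLocation: Option[String] = Some("/data/external/ds=2016-01-01")
  val partitionPath = explicitLocation.map(new Path(_)).getOrElse {
    ExternalCatalogUtils.generatePartitionPath(
      Map("ds" -> "2016-01-01"), Seq("ds"), new Path("/warehouse/tbl"))
  }
  // Either way, partitionPath.toString is recorded back into the partition's
  // storage.locationUri, so later drop/rename operations can find the directory.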

@@ -392,19 +395,15 @@ class InMemoryCatalog(
       }
     }

-    val tableDir = new Path(catalog(db).db.locationUri, table)
-    val partitionColumnNames = getTable(db, table).partitionColumnNames
-    // TODO: we should follow hive to roll back if one partition path failed to delete.
+    val shouldRemovePartitionLocation = getTable(db, table).tableType == CatalogTableType.MANAGED
+    // TODO: we should follow hive to roll back if one partition path failed to delete, and support
+    // partial partition spec.
     partSpecs.foreach { p =>
-      // If location is set, the partition is using an external partition location and we don't
-      // need to handle its directory.
-      if (existingParts.contains(p) && existingParts(p).storage.locationUri.isEmpty) {
-        val partitionPath = partitionColumnNames.flatMap { col =>
-          p.get(col).map(col + "=" + _)
-        }.mkString("/")
+      if (existingParts.contains(p) && shouldRemovePartitionLocation) {
+        val partitionPath = new Path(existingParts(p).location)
         try {
-          val fs = tableDir.getFileSystem(hadoopConfig)
-          fs.delete(new Path(tableDir, partitionPath), true)
+          val fs = partitionPath.getFileSystem(hadoopConfig)
+          fs.delete(partitionPath, true)
         } catch {
           case e: IOException =>
             throw new SparkException(s"Unable to delete partition path $partitionPath", e)
@@ -423,33 +422,34 @@ class InMemoryCatalog(
     requirePartitionsExist(db, table, specs)
     requirePartitionsNotExist(db, table, newSpecs)

-    val tableDir = new Path(catalog(db).db.locationUri, table)
-    val partitionColumnNames = getTable(db, table).partitionColumnNames
+    val tableMeta = getTable(db, table)
+    val partitionColumnNames = tableMeta.partitionColumnNames
+    val tablePath = new Path(tableMeta.location)
+    val shouldUpdatePartitionLocation = getTable(db, table).tableType == CatalogTableType.MANAGED
+    val existingParts = catalog(db).tables(table).partitions
     // TODO: we should follow hive to roll back if one partition path failed to rename.
     specs.zip(newSpecs).foreach { case (oldSpec, newSpec) =>
-      val newPart = getPartition(db, table, oldSpec).copy(spec = newSpec)
-      val existingParts = catalog(db).tables(table).partitions
-
-      // If location is set, the partition is using an external partition location and we don't
-      // need to handle its directory.
-      if (newPart.storage.locationUri.isEmpty) {
-        val oldPath = partitionColumnNames.flatMap { col =>
-          oldSpec.get(col).map(col + "=" + _)
-        }.mkString("/")
-        val newPath = partitionColumnNames.flatMap { col =>
-          newSpec.get(col).map(col + "=" + _)
-        }.mkString("/")
+      val oldPartition = getPartition(db, table, oldSpec)
+      val newPartition = if (shouldUpdatePartitionLocation) {
+        val oldPartPath = new Path(oldPartition.location)
+        val newPartPath = ExternalCatalogUtils.generatePartitionPath(
+          newSpec, partitionColumnNames, tablePath)
         try {
-          val fs = tableDir.getFileSystem(hadoopConfig)
-          fs.rename(new Path(tableDir, oldPath), new Path(tableDir, newPath))
+          val fs = tablePath.getFileSystem(hadoopConfig)
+          fs.rename(oldPartPath, newPartPath)
         } catch {
           case e: IOException =>
-            throw new SparkException(s"Unable to rename partition path $oldPath", e)
+            throw new SparkException(s"Unable to rename partition path $oldPartPath", e)
         }
+        oldPartition.copy(
+          spec = newSpec,
+          storage = oldPartition.storage.copy(locationUri = Some(newPartPath.toString)))
+      } else {
+        oldPartition.copy(spec = newSpec)
       }

       existingParts.remove(oldSpec)
-      existingParts.put(newSpec, newPart)
+      existingParts.put(newSpec, newPartition)
     }
   }
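
The rename branch only fires for managed tables: the partition directory is moved to the path generated for the new spec, and the stored locationUri is rewritten to match; for external tables only the spec changes. A sketch under assumed paths (not from the patch):

  import org.apache.hadoop.fs.Path
  import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils

  // Assume a managed table at /warehouse/tbl partitioned by ds, with an existing
  // partition directory /warehouse/tbl/ds=2016-01-01 being renamed to ds=2016-01-02.
  val tablePath = new Path("/warehouse/tbl")
  val newPartPath = ExternalCatalogUtils.generatePartitionPath(
    Map("ds" -> "2016-01-02"), Seq("ds"), tablePath)
  // fs.rename(oldPartPath, newPartPath) moves the directory, after which the catalog
  // entry carries spec = Map("ds" -> "2016-01-02") and the new locationUri.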

interface.scala:

@@ -99,6 +99,12 @@ case class CatalogTablePartition(
     output.filter(_.nonEmpty).mkString("CatalogPartition(\n\t", "\n\t", ")")
   }

+  /** Return the partition location, assuming it is specified. */
+  def location: String = storage.locationUri.getOrElse {
+    val specString = spec.map { case (k, v) => s"$k=$v" }.mkString(", ")
+    throw new AnalysisException(s"Partition [$specString] did not specify locationUri")
+  }
+
   /**
    * Given the partition schema, returns a row with that schema holding the partition values.
    */
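
The helper mirrors the existing accessors in this file that unwrap optional metadata, such as CatalogTable.database below: it either returns the URI or fails with a descriptive AnalysisException. A hedged illustration (CatalogStorageFormat.empty is assumed to be the usual all-None storage value):

  import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTablePartition}

  val part = CatalogTablePartition(
    spec = Map("ds" -> "2016-01-01"),
    storage = CatalogStorageFormat.empty)

  // part.location would throw:
  //   AnalysisException: Partition [ds=2016-01-01] did not specify locationUri

  val located = part.copy(
    storage = part.storage.copy(locationUri = Some("/warehouse/tbl/ds=2016-01-01")))
  assert(located.location == "/warehouse/tbl/ds=2016-01-01")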
@@ -171,6 +177,11 @@ case class CatalogTable(
     throw new AnalysisException(s"table $identifier did not specify database")
   }

+  /** Return the table location, assuming it is specified. */
+  def location: String = storage.locationUri.getOrElse {
+    throw new AnalysisException(s"table $identifier did not specify locationUri")
+  }
+
   /** Return the fully qualified name of this table, assuming the database was specified. */
   def qualifiedName: String = identifier.unquotedString

Comment (Member, on def location): Like the other functions in this file, add the function descriptions too?
