@@ -23,8 +23,7 @@ import java.math.{BigDecimal => JBigDecimal}
 import scala.collection.mutable.ArrayBuffer
 import scala.util.Try
 
-import com.google.common.cache.{CacheBuilder, Cache}
-import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.fs.Path
 
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
@@ -65,7 +64,7 @@ private[sql] object PartitioningUtils {
   private[sql] def parsePartitions(
       paths: Seq[Path],
       defaultPartitionName: String): PartitionSpec = {
-    val partitionValues = resolvePartitions(paths.map(parsePartition(_, defaultPartitionName)))
+    val partitionValues = resolvePartitions(paths.flatMap(parsePartition(_, defaultPartitionName)))
     val fields = {
       val (PartitionValues(columnNames, literals)) = partitionValues.head
       columnNames.zip(literals).map { case (name, Literal(_, dataType)) =>
@@ -99,21 +98,27 @@ private[sql] object PartitioningUtils {
    */
   private[sql] def parsePartition(
       path: Path,
-      defaultPartitionName: String): PartitionValues = {
+      defaultPartitionName: String): Option[PartitionValues] = {
     val columns = ArrayBuffer.empty[(String, Literal)]
     // Old Hadoop versions don't have `Path.isRoot`
     var finished = path.getParent == null
     var chopped = path
 
     while (!finished) {
+      // Sometimes (e.g., when speculative task is enabled), temporary directories may be left
+      // uncleaned. Here we simply ignore them.
+      if (chopped.getName == "_temporary") {
+        return None
+      }
+
       val maybeColumn = parsePartitionColumn(chopped.getName, defaultPartitionName)
       maybeColumn.foreach(columns += _)
       chopped = chopped.getParent
       finished = maybeColumn.isEmpty || chopped.getParent == null
     }
 
     val (columnNames, values) = columns.reverse.unzip
-    PartitionValues(columnNames, values)
+    Some(PartitionValues(columnNames, values))
   }
 
   private def parsePartitionColumn(
@@ -54,44 +54,47 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
   }
 
   test("parse partition") {
-    def check(path: String, expected: PartitionValues): Unit = {
+    def check(path: String, expected: Option[PartitionValues]): Unit = {
       assert(expected === parsePartition(new Path(path), defaultPartitionName))
     }
 
     def checkThrows[T <: Throwable: Manifest](path: String, expected: String): Unit = {
       val message = intercept[T] {
-        parsePartition(new Path(path), defaultPartitionName)
+        parsePartition(new Path(path), defaultPartitionName).get
       }.getMessage
 
       assert(message.contains(expected))
     }
 
-    check(
-      "file:///",
+    check("file:///", Some {
       PartitionValues(
         ArrayBuffer.empty[String],
-        ArrayBuffer.empty[Literal]))
+        ArrayBuffer.empty[Literal])
+    })
 
-    check(
-      "file://path/a=10",
+    check("file://path/a=10", Some {
       PartitionValues(
         ArrayBuffer("a"),
-        ArrayBuffer(Literal.create(10, IntegerType))))
+        ArrayBuffer(Literal.create(10, IntegerType)))
+    })
 
-    check(
-      "file://path/a=10/b=hello/c=1.5",
+    check("file://path/a=10/b=hello/c=1.5", Some {
       PartitionValues(
         ArrayBuffer("a", "b", "c"),
         ArrayBuffer(
           Literal.create(10, IntegerType),
           Literal.create("hello", StringType),
-          Literal.create(1.5, FloatType))))
+          Literal.create(1.5, FloatType)))
+    })
 
-    check(
-      "file://path/a=10/b_hello/c=1.5",
+    check("file://path/a=10/b_hello/c=1.5", Some {
       PartitionValues(
         ArrayBuffer("c"),
-        ArrayBuffer(Literal.create(1.5, FloatType))))
+        ArrayBuffer(Literal.create(1.5, FloatType)))
+    })
 
+    check("file://path/a=10/_temporary/c=1.5", None)
+    check("file://path/a=10/c=1.5/_temporary", None)
+
     checkThrows[AssertionError]("file://path/=10", "Empty partition column name")
     checkThrows[AssertionError]("file://path/a=", "Empty partition column value")
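
To make the new contract easy to try outside Spark, here is a minimal, self-contained sketch of what the change does. It is plain Scala, not Spark's actual API: the string-based parsePartition and the PartitionParsingSketch object below are hypothetical stand-ins for the Hadoop-Path-based code above. It illustrates the two points of the diff: a _temporary segment turns the whole path into None, and the flatMap in parsePartitions silently drops those entries.

object PartitionParsingSketch {
  import scala.annotation.tailrec

  // Hypothetical stand-in for the real parsePartition: directories are
  // scanned leaf-first, mirroring the Path.getParent loop in the diff.
  def parsePartition(path: String): Option[List[(String, String)]] = {
    @tailrec
    def loop(
        segments: List[String],
        acc: List[(String, String)]): Option[List[(String, String)]] =
      segments match {
        // Leftover output of a speculative task: ignore the entire path.
        case "_temporary" :: _ => None
        case head :: tail if head.contains("=") =>
          val Array(name, value) = head.split("=", 2)
          loop(tail, (name -> value) :: acc)
        // The first non-`name=value` segment ends the scan.
        case _ => Some(acc)
      }
    loop(path.stripPrefix("file://").split('/').filter(_.nonEmpty).toList.reverse, Nil)
  }

  def main(args: Array[String]): Unit = {
    val paths = Seq(
      "file://path/a=10/b=hello/c=1.5",
      "file://path/a=10/_temporary/c=1.5")
    // flatMap is the crux of the main change: None entries simply vanish,
    // so temporary directories never reach resolvePartitions.
    println(paths.flatMap(parsePartition))
    // List(List((a,10), (b,hello), (c,1.5)))
  }
}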