@@ -139,7 +139,7 @@ object PartitioningUtils {
           "root directory of the table. If there are multiple root directories, " +
           "please load them separately and then union them.")
 
-      val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues)
+      val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues, timeZone)
 
       // Creates the StructType which represents the partition columns.
       val fields = {
@@ -318,7 +318,8 @@ object PartitioningUtils {
    *   }}}
    */
   def resolvePartitions(
-      pathsWithPartitionValues: Seq[(Path, PartitionValues)]): Seq[PartitionValues] = {
+      pathsWithPartitionValues: Seq[(Path, PartitionValues)],
+      timeZone: TimeZone): Seq[PartitionValues] = {
     if (pathsWithPartitionValues.isEmpty) {
       Seq.empty
     } else {
@@ -333,7 +334,7 @@
       val values = pathsWithPartitionValues.map(_._2)
       val columnCount = values.head.columnNames.size
       val resolvedValues = (0 until columnCount).map { i =>
-        resolveTypeConflicts(values.map(_.literals(i)))
+        resolveTypeConflicts(values.map(_.literals(i)), timeZone)
       }
 
       // Fills resolved literals back to each partition
@@ -470,15 +471,15 @@ object PartitioningUtils {
    * Given a collection of [[Literal]]s, resolves possible type conflicts by up-casting "lower"
    * types.
    */
-  private def resolveTypeConflicts(literals: Seq[Literal]): Seq[Literal] = {
+  private def resolveTypeConflicts(literals: Seq[Literal], timeZone: TimeZone): Seq[Literal] = {
     val desiredType = {
       val topType = literals.map(_.dataType).maxBy(upCastingOrder.indexOf(_))

Contributor commented:

thanks for such a quick fix!

But don't we also need to update upCastingOrder? It seems this happens to work because upCastingOrder.indexOf(TimestampType) = -1, but I don't think that compares correctly against NullType. If you add this to ParquetPartitionDiscoverySuite's test("parse partitions"), it fails:

    check(Seq(
      s"hdfs://host:9000/path/a=$defaultPartitionName/b=blah",
      s"hdfs://host:9000/path/a=2014-01-01 00%3A00%3A00.0/b=foo"),
      PartitionSpec(
        StructType(Seq(
          StructField("a", TimestampType),
          StructField("b", StringType))),
        Seq(
          Partition(InternalRow(null, "blah"),
            s"hdfs://host:9000/path/a=$defaultPartitionName/b=blah"),
          Partition(InternalRow(Timestamp.valueOf("2014-01-01 00:00:00.0"), "foo"),
            s"hdfs://host:9000/path/a=2014-01-01 00%3A00%3A00.0/b=foo"))))

(I have to admit, I don't totally understand what the ramifications of that failure are -- the behavior in the resulting DataFrame seems fine to me, but I figure there is probably some case this would mess up ...)
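
A minimal sketch of the ordering problem described here, assuming upCastingOrder is Seq(NullType, IntegerType, LongType, FloatType, DoubleType, StringType) (TimestampType is not in the list, so indexOf returns -1):

```scala
import org.apache.spark.sql.types._

// Assumed to mirror PartitioningUtils.upCastingOrder; TimestampType is absent.
val upCastingOrder: Seq[DataType] =
  Seq(NullType, IntegerType, LongType, FloatType, DoubleType, StringType)

// Column "a" above is inferred as NullType for the default-partition path and
// TimestampType for the other path.
val columnTypes: Seq[DataType] = Seq(NullType, TimestampType)

// maxBy prefers NullType (index 0) over TimestampType (index -1), so the column
// falls back to StringType instead of resolving to TimestampType as the test expects.
val topType = columnTypes.maxBy(upCastingOrder.indexOf(_))
val resolved = if (topType == NullType) StringType else topType  // StringType
```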

@HyukjinKwon (Member, author) replied on Sep 26, 2017:

I see. Actually, I believe this PR fixes only the (corner-case) regression between 2.1.0 and 2.2.0: if I understood correctly, we started to support inferring timestamps in partition columns in 2.1.0 (SPARK-17388), and this corner-case regression was introduced in 2.2.0 by SPARK-18939 (not tested).

For the issue you described above, which I believe has existed since 2.1.0 (not tested), I think the fix should also cover DecimalType, DateType and TimestampType, whose inference was introduced by SPARK-17388. It is closely related to this issue, SPARK-22109, but orthogonal.

For completeness, I guess we should port this logic:

/**
 * Case 2 type widening (see the classdoc comment above for TypeCoercion).
 *
 * i.e. the main difference with [[findTightestCommonType]] is that here we allow some
 * loss of precision when widening decimal and double, and promotion to string.
 */
private[analysis] def findWiderTypeForTwo(t1: DataType, t2: DataType): Option[DataType] = {
  findTightestCommonType(t1, t2)
    .orElse(findWiderTypeForDecimal(t1, t2))
    .orElse(stringPromotion(t1, t2))
    .orElse((t1, t2) match {
      case (ArrayType(et1, containsNull1), ArrayType(et2, containsNull2)) =>
        findWiderTypeForTwo(et1, et2).map(ArrayType(_, containsNull1 || containsNull2))
      case _ => None
    })
}

because the problem is that TimestampType, DateType and DecimalType can't be upcast to other types simply by comparing numeric precedence. Some special handling is needed, of course, because we currently infer decimals only when the scale is <= 0 (e.g., not 1.1) and the value is castable to a decimal before trying a double:

val decimalTry = Try {
  // `BigDecimal` conversion can fail when the `field` is not a form of number.
  val bigDecimal = new JBigDecimal(raw)
  // It reduces the cases for decimals by disallowing values having scale (eg. `1.1`).
  require(bigDecimal.scale <= 0)
  // `DecimalType` conversion can fail when
  //   1. The precision is bigger than 38.
  //   2. scale is bigger than precision.
  Literal(bigDecimal)
}

This could actually also be a problem between DateType and TimestampType, which should be upcastable (from date to timestamp) but might currently end up resolved as DateType.

Let me take a closer look and probably make a fix soon.
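
For illustration, a rough sketch (the helper name and rules here are assumptions, not Spark's actual implementation) of how a findWiderTypeForTwo-style resolution would handle the partition-value types discussed above:

```scala
import org.apache.spark.sql.types._

// Hypothetical widening for inferred partition-column types: nulls adopt the other
// type, date widens to timestamp, and anything else falls back to string promotion.
def widenPartitionTypes(t1: DataType, t2: DataType): DataType = (t1, t2) match {
  case (t, NullType) => t
  case (NullType, t) => t
  case (a, b) if a == b => a
  case (DateType, TimestampType) | (TimestampType, DateType) => TimestampType
  case _ => StringType
}

widenPartitionTypes(TimestampType, NullType)    // TimestampType, not StringType
widenPartitionTypes(DateType, TimestampType)    // TimestampType
widenPartitionTypes(TimestampType, StringType)  // StringType
```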

       // Falls back to string if all values of this column are null or empty string
       if (topType == NullType) StringType else topType
     }
 
     literals.map { case l @ Literal(_, dataType) =>
-      Literal.create(Cast(l, desiredType).eval(), desiredType)
+      Literal.create(Cast(l, desiredType, Some(timeZone.getID)).eval(), desiredType)
     }
   }
 }
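
The timeZone threaded through above matters because casting between StringType and TimestampType is time-zone dependent. A minimal sketch (not part of the diff; the values and imports are illustrative) of the cast performed by resolveTypeConflicts once a "blah"-style value forces the desired type to StringType:

```scala
import java.util.TimeZone
import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
import org.apache.spark.sql.types.{StringType, TimestampType}

val tz = Some(TimeZone.getDefault.getID)

// A partition value such as "2015-01-01 00%3A00%3A00" is first inferred as a timestamp literal.
val tsValue = Cast(Literal("2015-01-01 00:00:00"), TimestampType, tz).eval()
val tsLiteral = Literal.create(tsValue, TimestampType)

// When another path contributes a plain string (e.g. "blah"), the resolved type is StringType,
// so the timestamp literal has to be cast back to a string -- which needs the time zone id.
val widened = Literal.create(Cast(tsLiteral, StringType, tz).eval(), StringType)
```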
@@ -1055,4 +1055,16 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
       }
     }
   }
+
+  test("SPARK-22109: Resolve type conflicts between strings and timestamps in partition column") {
+    val df = Seq(
+      (1, "2015-01-01 00:00:00"),
+      (2, "2014-01-01 00:00:00"),
+      (3, "blah")).toDF("i", "str")
+
+    withTempPath { path =>
+      df.write.format("parquet").partitionBy("str").save(path.getAbsolutePath)
+      checkAnswer(spark.read.load(path.getAbsolutePath), df)
+    }
+  }
 }
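
As a rough usage note (the directory names below are illustrative of how partitionBy escapes special characters, not copied from the PR): writing the DataFrame above produces partition directories such as str=2015-01-01 00%3A00%3A00 and str=blah. On read, partition discovery infers TimestampType for the timestamp-like values and StringType for "blah", and resolveTypeConflicts widens the column to StringType:

```scala
// Reading the directory back (path as in the test above); the partition column comes
// back as a string column, and the original values, including "blah", are preserved.
val reread = spark.read.load(path.getAbsolutePath)
reread.printSchema()            // str is resolved as string, not timestamp
reread.show(truncate = false)
```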