[SPARK-22165][SQL] Fixes type conflicts between double, long, decimals, dates and timestamps in partition column

## What changes were proposed in this pull request?

This PR proposes to add a rule that re-uses `TypeCoercion.findWiderTypeForTwo` when resolving type conflicts in partition values.

Currently, this uses a numeric precedence-like comparison, which introduces failures for type conflicts between timestamps, dates and decimals; see:

```scala
private val upCastingOrder: Seq[DataType] =
  Seq(NullType, IntegerType, LongType, FloatType, DoubleType, StringType)
...
literals.map(_.dataType).maxBy(upCastingOrder.indexOf(_))
```
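
Types not in `upCastingOrder` (dates, timestamps, decimals) all get index -1 from `indexOf`, so `maxBy` simply keeps the first such type it encounters. A minimal sketch of the failure mode (assuming Spark's Catalyst types are on the classpath):

```scala
import org.apache.spark.sql.types._

val upCastingOrder: Seq[DataType] =
  Seq(NullType, IntegerType, LongType, FloatType, DoubleType, StringType)

upCastingOrder.indexOf(DateType)       // -1, not in the order at all
upCastingOrder.indexOf(TimestampType)  // -1, same
// With tied keys, maxBy returns the first maximal element, so the winner
// depends on encounter order rather than on type widths:
Seq[DataType](DateType, TimestampType).maxBy(upCastingOrder.indexOf(_))  // DateType
```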

The code below:

```scala
val df = Seq((1, "2015-01-01"), (2, "2016-01-01 00:00:00")).toDF("i", "ts")
df.write.format("parquet").partitionBy("ts").save("/tmp/foo")
spark.read.load("/tmp/foo").printSchema()

val df = Seq((1, "1"), (2, "1" * 30)).toDF("i", "decimal")
df.write.format("parquet").partitionBy("decimal").save("/tmp/bar")
spark.read.load("/tmp/bar").printSchema()
```

produces the following output:

**Before**

```
root
 |-- i: integer (nullable = true)
 |-- ts: date (nullable = true)

root
 |-- i: integer (nullable = true)
 |-- decimal: integer (nullable = true)
```

**After**

```
root
 |-- i: integer (nullable = true)
 |-- ts: timestamp (nullable = true)

root
 |-- i: integer (nullable = true)
 |-- decimal: decimal(30,0) (nullable = true)
```
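
Reading the data back confirms the values, not just the schema, survive the round trip (a quick check reusing the paths above; expected values follow from the fix):

```scala
spark.read.load("/tmp/foo").orderBy("i").show()
// i=1 -> 2015-01-01 00:00:00, i=2 -> 2016-01-01 00:00:00
spark.read.load("/tmp/bar").orderBy("i").show()
// i=1 -> 1, i=2 -> 111...1 (30 ones, read back as decimal(30,0))
```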

### Type coercion table:

This PR proposes the type conflict resolution below:

**Before**

|InputA \ InputB|`NullType`|`IntegerType`|`LongType`|`DecimalType(38,0)`|`DoubleType`|`DateType`|`TimestampType`|`StringType`|
|------------------------|----------|----------|----------|----------|----------|----------|----------|----------|
|**`NullType`**|`StringType`|`IntegerType`|`LongType`|`StringType`|`DoubleType`|`StringType`|`StringType`|`StringType`|
|**`IntegerType`**|`IntegerType`|`IntegerType`|`LongType`|`IntegerType`|`DoubleType`|`IntegerType`|`IntegerType`|`StringType`|
|**`LongType`**|`LongType`|`LongType`|`LongType`|`LongType`|`DoubleType`|`LongType`|`LongType`|`StringType`|
|**`DecimalType(38,0)`**|`StringType`|`IntegerType`|`LongType`|`DecimalType(38,0)`|`DoubleType`|`DecimalType(38,0)`|`DecimalType(38,0)`|`StringType`|
|**`DoubleType`**|`DoubleType`|`DoubleType`|`DoubleType`|`DoubleType`|`DoubleType`|`DoubleType`|`DoubleType`|`StringType`|
|**`DateType`**|`StringType`|`IntegerType`|`LongType`|`DateType`|`DoubleType`|`DateType`|`DateType`|`StringType`|
|**`TimestampType`**|`StringType`|`IntegerType`|`LongType`|`TimestampType`|`DoubleType`|`TimestampType`|`TimestampType`|`StringType`|
|**`StringType`**|`StringType`|`StringType`|`StringType`|`StringType`|`StringType`|`StringType`|`StringType`|`StringType`|

**After**

|InputA \ InputB|`NullType`|`IntegerType`|`LongType`|`DecimalType(38,0)`|`DoubleType`|`DateType`|`TimestampType`|`StringType`|
|------------------------|----------|----------|----------|----------|----------|----------|----------|----------|
|**`NullType`**|`NullType`|`IntegerType`|`LongType`|`DecimalType(38,0)`|`DoubleType`|`DateType`|`TimestampType`|`StringType`|
|**`IntegerType`**|`IntegerType`|`IntegerType`|`LongType`|`DecimalType(38,0)`|`DoubleType`|`StringType`|`StringType`|`StringType`|
|**`LongType`**|`LongType`|`LongType`|`LongType`|`DecimalType(38,0)`|`StringType`|`StringType`|`StringType`|`StringType`|
|**`DecimalType(38,0)`**|`DecimalType(38,0)`|`DecimalType(38,0)`|`DecimalType(38,0)`|`DecimalType(38,0)`|`StringType`|`StringType`|`StringType`|`StringType`|
|**`DoubleType`**|`DoubleType`|`DoubleType`|`StringType`|`StringType`|`DoubleType`|`StringType`|`StringType`|`StringType`|
|**`DateType`**|`DateType`|`StringType`|`StringType`|`StringType`|`StringType`|`DateType`|`TimestampType`|`StringType`|
|**`TimestampType`**|`TimestampType`|`StringType`|`StringType`|`StringType`|`StringType`|`TimestampType`|`TimestampType`|`StringType`|
|**`StringType`**|`StringType`|`StringType`|`StringType`|`StringType`|`StringType`|`StringType`|`StringType`|`StringType`|

These tables were produced by the following test, where `findWiderTypeForPartitionColumn` is the new rule added to `PartitioningUtils` in this PR:

```scala
  test("Print out chart") {
    val supportedTypes: Seq[DataType] = Seq(
      NullType, IntegerType, LongType, DecimalType(38, 0), DoubleType,
      DateType, TimestampType, StringType)

    // Old type conflict resolution:
    val upCastingOrder: Seq[DataType] =
      Seq(NullType, IntegerType, LongType, FloatType, DoubleType, StringType)
    def oldResolveTypeConflicts(dataTypes: Seq[DataType]): DataType = {
      val topType = dataTypes.maxBy(upCastingOrder.indexOf(_))
      if (topType == NullType) StringType else topType
    }
    println(s"|InputA \\ InputB|${supportedTypes.map(dt => s"`${dt.toString}`").mkString("|")}|")
    println(s"|------------------------|${supportedTypes.map(_ => "----------").mkString("|")}|")
    supportedTypes.foreach { inputA =>
      val types = supportedTypes.map(inputB => oldResolveTypeConflicts(Seq(inputA, inputB)))
      println(s"|**`$inputA`**|${types.map(dt => s"`${dt.toString}`").mkString("|")}|")
    }

    // New type conflict resolution:
    def newResolveTypeConflicts(dataTypes: Seq[DataType]): DataType = {
      dataTypes.fold[DataType](NullType)(findWiderTypeForPartitionColumn)
    }
    println(s"|InputA \\ InputB|${supportedTypes.map(dt => s"`${dt.toString}`").mkString("|")}|")
    println(s"|------------------------|${supportedTypes.map(_ => "----------").mkString("|")}|")
    supportedTypes.foreach { inputA =>
      val types = supportedTypes.map(inputB => newResolveTypeConflicts(Seq(inputA, inputB)))
      println(s"|**`$inputA`**|${types.map(dt => s"`${dt.toString}`").mkString("|")}|")
    }
  }
```

## How was this patch tested?

Unit tests added in `ParquetPartitionDiscoverySuite`.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #19389 from HyukjinKwon/partition-type-coercion.
HyukjinKwon authored and cloud-fan committed Nov 21, 2017
1 parent 2d868d9 commit 6d7ebf2
Showing 4 changed files with 235 additions and 23 deletions.
docs/sql-programming-guide.md (139 additions, 0 deletions)
@@ -1577,6 +1577,145 @@ options.

- Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the referenced columns only include the internal corrupt record column (named `_corrupt_record` by default). For example, `spark.read.schema(schema).json(file).filter($"_corrupt_record".isNotNull).count()` and `spark.read.schema(schema).json(file).select("_corrupt_record").show()`. Instead, you can cache or save the parsed results and then send the same query. For example, `val df = spark.read.schema(schema).json(file).cache()` and then `df.filter($"_corrupt_record".isNotNull).count()`.
- The `percentile_approx` function previously accepted numeric type input and output double type results. Now it supports date type, timestamp type and numeric types as input types. The result type is also changed to be the same as the input type, which is more reasonable for percentiles.
- Partition column inference previously found an incorrect common type for different inferred types; for example, it ended up with double type as the common type for double type and date type. Now it finds the correct common type for such conflicts. The conflict resolution follows the table below:

<table class="table">
<tr>
<th>
<b>InputA \ InputB</b>
</th>
<th>
<b>NullType</b>
</th>
<th>
<b>IntegerType</b>
</th>
<th>
<b>LongType</b>
</th>
<th>
<b>DecimalType(38,0)*</b>
</th>
<th>
<b>DoubleType</b>
</th>
<th>
<b>DateType</b>
</th>
<th>
<b>TimestampType</b>
</th>
<th>
<b>StringType</b>
</th>
</tr>
<tr>
<td>
<b>NullType</b>
</td>
<td>NullType</td>
<td>IntegerType</td>
<td>LongType</td>
<td>DecimalType(38,0)</td>
<td>DoubleType</td>
<td>DateType</td>
<td>TimestampType</td>
<td>StringType</td>
</tr>
<tr>
<td>
<b>IntegerType</b>
</td>
<td>IntegerType</td>
<td>IntegerType</td>
<td>LongType</td>
<td>DecimalType(38,0)</td>
<td>DoubleType</td>
<td>StringType</td>
<td>StringType</td>
<td>StringType</td>
</tr>
<tr>
<td>
<b>LongType</b>
</td>
<td>LongType</td>
<td>LongType</td>
<td>LongType</td>
<td>DecimalType(38,0)</td>
<td>StringType</td>
<td>StringType</td>
<td>StringType</td>
<td>StringType</td>
</tr>
<tr>
<td>
<b>DecimalType(38,0)*</b>
</td>
<td>DecimalType(38,0)</td>
<td>DecimalType(38,0)</td>
<td>DecimalType(38,0)</td>
<td>DecimalType(38,0)</td>
<td>StringType</td>
<td>StringType</td>
<td>StringType</td>
<td>StringType</td>
</tr>
<tr>
<td>
<b>DoubleType</b>
</td>
<td>DoubleType</td>
<td>DoubleType</td>
<td>StringType</td>
<td>StringType</td>
<td>DoubleType</td>
<td>StringType</td>
<td>StringType</td>
<td>StringType</td>
</tr>
<tr>
<td>
<b>DateType</b>
</td>
<td>DateType</td>
<td>StringType</td>
<td>StringType</td>
<td>StringType</td>
<td>StringType</td>
<td>DateType</td>
<td>TimestampType</td>
<td>StringType</td>
</tr>
<tr>
<td>
<b>TimestampType</b>
</td>
<td>TimestampType</td>
<td>StringType</td>
<td>StringType</td>
<td>StringType</td>
<td>StringType</td>
<td>TimestampType</td>
<td>TimestampType</td>
<td>StringType</td>
</tr>
<tr>
<td>
<b>StringType</b>
</td>
<td>StringType</td>
<td>StringType</td>
<td>StringType</td>
<td>StringType</td>
<td>StringType</td>
<td>StringType</td>
<td>StringType</td>
<td>StringType</td>
</tr>
</table>

Note that, for <b>DecimalType(38,0)*</b>, the table above intentionally does not cover all other combinations of scales and precisions because currently we only infer decimal type like `BigInteger`/`BigInt`. For example, 1.1 is inferred as double type.

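For example, with the new resolution a partition column containing both date- and timestamp-formatted values is read back as timestamp (a short sketch mirroring the reproduction in the PR description; `/tmp/t` is an arbitrary path):

```scala
val df = Seq((1, "2015-01-01"), (2, "2016-01-01 00:00:00")).toDF("i", "ts")
df.write.format("parquet").partitionBy("ts").save("/tmp/t")
spark.read.load("/tmp/t").printSchema()
// root
//  |-- i: integer (nullable = true)
//  |-- ts: timestamp (nullable = true)
```
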
## Upgrading From Spark SQL 2.1 to 2.2

@@ -155,7 +155,7 @@ object TypeCoercion {
* i.e. the main difference with [[findTightestCommonType]] is that here we allow some
* loss of precision when widening decimal and double, and promotion to string.
*/
-  private[analysis] def findWiderTypeForTwo(t1: DataType, t2: DataType): Option[DataType] = {
+  def findWiderTypeForTwo(t1: DataType, t2: DataType): Option[DataType] = {
findTightestCommonType(t1, t2)
.orElse(findWiderTypeForDecimal(t1, t2))
.orElse(stringPromotion(t1, t2))
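
For intuition about why this chain is too permissive for partition columns: it allows precision-losing widenings between decimal/double and long/double, as noted in the doc comment above. Hedged examples of the expected results:

```scala
import org.apache.spark.sql.catalyst.analysis.TypeCoercion
import org.apache.spark.sql.types._

// Lossy widenings that are fine for general expression coercion but
// undesirable for partition values, motivating the stricter rule below:
TypeCoercion.findWiderTypeForTwo(DoubleType, DecimalType(38, 0))  // Some(DoubleType)
TypeCoercion.findWiderTypeForTwo(LongType, DoubleType)            // Some(DoubleType)
```
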
@@ -28,7 +28,7 @@ import org.apache.hadoop.fs.Path

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.analysis.Resolver
+import org.apache.spark.sql.catalyst.analysis.{Resolver, TypeCoercion}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
@@ -309,13 +309,8 @@ object PartitioningUtils {
}

/**
- * Resolves possible type conflicts between partitions by up-casting "lower" types. The up-
- * casting order is:
- * {{{
- *   NullType ->
- *     IntegerType -> LongType ->
- *       DoubleType -> StringType
- * }}}
+ * Resolves possible type conflicts between partitions by up-casting "lower" types using
+ * [[findWiderTypeForPartitionColumn]].
*/
def resolvePartitions(
pathsWithPartitionValues: Seq[(Path, PartitionValues)],
@@ -372,11 +367,31 @@ object PartitioningUtils {
suspiciousPaths.map("\t" + _).mkString("\n", "\n", "")
}

// scalastyle:off line.size.limit
/**
- * Converts a string to a [[Literal]] with automatic type inference. Currently only supports
- * [[IntegerType]], [[LongType]], [[DoubleType]], [[DecimalType]], [[DateType]]
+ * Converts a string to a [[Literal]] with automatic type inference. Currently only supports
+ * [[NullType]], [[IntegerType]], [[LongType]], [[DoubleType]], [[DecimalType]], [[DateType]]
* [[TimestampType]], and [[StringType]].
*
* When resolving conflicts, it follows the table below:
*
* +--------------------+-------------------+-------------------+-------------------+--------------------+------------+---------------+---------------+------------+
* | InputA \ InputB | NullType | IntegerType | LongType | DecimalType(38,0)* | DoubleType | DateType | TimestampType | StringType |
* +--------------------+-------------------+-------------------+-------------------+--------------------+------------+---------------+---------------+------------+
* | NullType | NullType | IntegerType | LongType | DecimalType(38,0) | DoubleType | DateType | TimestampType | StringType |
* | IntegerType | IntegerType | IntegerType | LongType | DecimalType(38,0) | DoubleType | StringType | StringType | StringType |
* | LongType | LongType | LongType | LongType | DecimalType(38,0) | StringType | StringType | StringType | StringType |
* | DecimalType(38,0)* | DecimalType(38,0) | DecimalType(38,0) | DecimalType(38,0) | DecimalType(38,0) | StringType | StringType | StringType | StringType |
* | DoubleType | DoubleType | DoubleType | StringType | StringType | DoubleType | StringType | StringType | StringType |
* | DateType | DateType | StringType | StringType | StringType | StringType | DateType | TimestampType | StringType |
* | TimestampType | TimestampType | StringType | StringType | StringType | StringType | TimestampType | TimestampType | StringType |
* | StringType | StringType | StringType | StringType | StringType | StringType | StringType | StringType | StringType |
* +--------------------+-------------------+-------------------+-------------------+--------------------+------------+---------------+---------------+------------+
* Note that, for DecimalType(38,0)*, the table above intentionally does not cover all other
* combinations of scales and precisions because currently we only infer decimal type like
* `BigInteger`/`BigInt`. For example, 1.1 is inferred as double type.
*/
// scalastyle:on line.size.limit
private[datasources] def inferPartitionColumnValue(
raw: String,
typeInference: Boolean,
@@ -427,9 +442,6 @@ object PartitioningUtils {
}
}

-  private val upCastingOrder: Seq[DataType] =
-    Seq(NullType, IntegerType, LongType, FloatType, DoubleType, StringType)

def validatePartitionColumn(
schema: StructType,
partitionColumns: Seq[String],
@@ -468,18 +480,26 @@ }
}

/**
- * Given a collection of [[Literal]]s, resolves possible type conflicts by up-casting "lower"
- * types.
+ * Given a collection of [[Literal]]s, resolves possible type conflicts by
+ * [[findWiderTypeForPartitionColumn]].
   */
  private def resolveTypeConflicts(literals: Seq[Literal], timeZone: TimeZone): Seq[Literal] = {
-    val desiredType = {
-      val topType = literals.map(_.dataType).maxBy(upCastingOrder.indexOf(_))
-      // Falls back to string if all values of this column are null or empty string
-      if (topType == NullType) StringType else topType
-    }
+    val litTypes = literals.map(_.dataType)
+    val desiredType = litTypes.reduce(findWiderTypeForPartitionColumn)

literals.map { case l @ Literal(_, dataType) =>
Literal.create(Cast(l, desiredType, Some(timeZone.getID)).eval(), desiredType)
}
}

/**
* Type widening rule for partition column types. It is similar to
* [[TypeCoercion.findWiderTypeForTwo]] but the main difference is that here we disallow
* precision loss when widening double/long and decimal, and fall back to string.
*/
private val findWiderTypeForPartitionColumn: (DataType, DataType) => DataType = {
case (DoubleType, _: DecimalType) | (_: DecimalType, DoubleType) => StringType
case (DoubleType, LongType) | (LongType, DoubleType) => StringType
case (t1, t2) => TypeCoercion.findWiderTypeForTwo(t1, t2).getOrElse(StringType)
}
}
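
As a sanity check against the table above, the rule can be exercised directly now that `findWiderTypeForTwo` is public; `widen` here is a hypothetical local stand-in for the private `findWiderTypeForPartitionColumn`:

```scala
import org.apache.spark.sql.catalyst.analysis.TypeCoercion
import org.apache.spark.sql.types._

// Local stand-in mirroring the private rule defined above.
val widen: (DataType, DataType) => DataType = {
  case (DoubleType, _: DecimalType) | (_: DecimalType, DoubleType) => StringType
  case (DoubleType, LongType) | (LongType, DoubleType) => StringType
  case (t1, t2) => TypeCoercion.findWiderTypeForTwo(t1, t2).getOrElse(StringType)
}

widen(IntegerType, LongType)         // LongType
widen(LongType, DoubleType)          // StringType: precision loss is disallowed
widen(DateType, TimestampType)       // TimestampType
widen(NullType, DecimalType(38, 0))  // DecimalType(38,0)
```
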
@@ -249,6 +249,11 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
true,
rootPaths,
timeZoneId)
assert(actualSpec.partitionColumns === spec.partitionColumns)
assert(actualSpec.partitions.length === spec.partitions.length)
actualSpec.partitions.zip(spec.partitions).foreach { case (actual, expected) =>
assert(actual === expected)
}
assert(actualSpec === spec)
}

@@ -314,7 +319,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
PartitionSpec(
StructType(Seq(
StructField("a", DoubleType),
StructField("b", StringType))),
StructField("b", NullType))),
Seq(
Partition(InternalRow(10, null), s"hdfs://host:9000/path/a=10/b=$defaultPartitionName"),
Partition(InternalRow(10.5, null),
@@ -324,6 +329,32 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
s"hdfs://host:9000/path1",
s"hdfs://host:9000/path2"),
PartitionSpec.emptySpec)

// The cases below check the resolution for type conflicts.
val t1 = Timestamp.valueOf("2014-01-01 00:00:00.0").getTime * 1000
val t2 = Timestamp.valueOf("2014-01-01 00:01:00.0").getTime * 1000
    // Values in column 'a' are inferred as null, date and timestamp, respectively, and
    // timestamp is chosen as the common type.
    // Values in column 'b' are inferred as integer, decimal(22, 0) and null, and
    // decimal(22, 0) is chosen as the common type.
check(Seq(
s"hdfs://host:9000/path/a=$defaultPartitionName/b=0",
s"hdfs://host:9000/path/a=2014-01-01/b=${Long.MaxValue}111",
s"hdfs://host:9000/path/a=2014-01-01 00%3A01%3A00.0/b=$defaultPartitionName"),
PartitionSpec(
StructType(Seq(
StructField("a", TimestampType),
StructField("b", DecimalType(22, 0)))),
Seq(
Partition(
InternalRow(null, Decimal(0)),
s"hdfs://host:9000/path/a=$defaultPartitionName/b=0"),
Partition(
InternalRow(t1, Decimal(s"${Long.MaxValue}111")),
s"hdfs://host:9000/path/a=2014-01-01/b=${Long.MaxValue}111"),
Partition(
InternalRow(t2, null),
s"hdfs://host:9000/path/a=2014-01-01 00%3A01%3A00.0/b=$defaultPartitionName"))))
}

test("parse partitions with type inference disabled") {
@@ -395,7 +426,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
PartitionSpec(
StructType(Seq(
StructField("a", StringType),
StructField("b", StringType))),
StructField("b", NullType))),
Seq(
Partition(InternalRow(UTF8String.fromString("10"), null),
s"hdfs://host:9000/path/a=10/b=$defaultPartitionName"),
@@ -1067,4 +1098,26 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
checkAnswer(spark.read.load(path.getAbsolutePath), df)
}
}

test("Resolve type conflicts - decimals, dates and timestamps in partition column") {
withTempPath { path =>
val df = Seq((1, "2014-01-01"), (2, "2016-01-01"), (3, "2015-01-01 00:01:00")).toDF("i", "ts")
df.write.format("parquet").partitionBy("ts").save(path.getAbsolutePath)
checkAnswer(
spark.read.load(path.getAbsolutePath),
Row(1, Timestamp.valueOf("2014-01-01 00:00:00")) ::
Row(2, Timestamp.valueOf("2016-01-01 00:00:00")) ::
Row(3, Timestamp.valueOf("2015-01-01 00:01:00")) :: Nil)
}

withTempPath { path =>
val df = Seq((1, "1"), (2, "3"), (3, "2" * 30)).toDF("i", "decimal")
df.write.format("parquet").partitionBy("decimal").save(path.getAbsolutePath)
checkAnswer(
spark.read.load(path.getAbsolutePath),
Row(1, BigDecimal("1")) ::
Row(2, BigDecimal("3")) ::
Row(3, BigDecimal("2" * 30)) :: Nil)
}
}
}
