Skip to content

Commit dacc382

Browse files
committed
[SPARK-19887][SQL] dynamic partition keys can be null or empty string
## What changes were proposed in this pull request?

When a dynamic partition value is null or an empty string, we should write the data to a directory like `a=__HIVE_DEFAULT_PARTITION__`. When we read the data back, we should respect this special directory name and treat it as null.

This is the same behavior as Impala; see https://issues.apache.org/jira/browse/IMPALA-252

## How was this patch tested?

New regression test.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #17277 from cloud-fan/partition.
1 parent 7ded39c commit dacc382

File tree

7 files changed

+39
-16
lines changed

7 files changed

+39
-16
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ object ExternalCatalogUtils {
118118
}
119119

120120
def getPartitionPathString(col: String, value: String): String = {
121-
val partitionString = if (value == null) {
121+
val partitionString = if (value == null || value.isEmpty) {
122122
DEFAULT_PARTITION_NAME
123123
} else {
124124
escapePathName(value)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,12 @@ case class CatalogTablePartition(
116116
val timeZoneId = caseInsensitiveProperties.getOrElse(
117117
DateTimeUtils.TIMEZONE_OPTION, defaultTimeZondId)
118118
InternalRow.fromSeq(partitionSchema.map { field =>
119-
Cast(Literal(spec(field.name)), field.dataType, Option(timeZoneId)).eval()
119+
val partValue = if (spec(field.name) == ExternalCatalogUtils.DEFAULT_PARTITION_NAME) {
120+
null
121+
} else {
122+
spec(field.name)
123+
}
124+
Cast(Literal(partValue), field.dataType, Option(timeZoneId)).eval()
120125
})
121126
}
122127
}
@@ -164,7 +169,7 @@ case class BucketSpec(
164169
* @param tracksPartitionsInCatalog whether this table's partition metadata is stored in the
165170
* catalog. If false, it is inferred automatically based on file
166171
* structure.
167-
* @param schemaPresevesCase Whether or not the schema resolved for this table is case-sensitive.
172+
* @param schemaPreservesCase Whether or not the schema resolved for this table is case-sensitive.
168173
* When using a Hive Metastore, this flag is set to false if a case-
169174
* sensitive schema was unable to be read from the table properties.
170175
* Used to trigger case-sensitive schema inference at query time, when

sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -319,7 +319,7 @@ case class FileSourceScanExec(
319319
val input = ctx.freshName("input")
320320
ctx.addMutableState("scala.collection.Iterator", input, s"$input = inputs[0];")
321321
val exprRows = output.zipWithIndex.map{ case (a, i) =>
322-
new BoundReference(i, a.dataType, a.nullable)
322+
BoundReference(i, a.dataType, a.nullable)
323323
}
324324
val row = ctx.freshName("row")
325325
ctx.INPUT_ROW = row

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -335,14 +335,11 @@ object FileFormatWriter extends Logging {
335335
/** Expressions that given partition columns build a path string like: col1=val/col2=val/... */
336336
private def partitionPathExpression: Seq[Expression] = {
337337
desc.partitionColumns.zipWithIndex.flatMap { case (c, i) =>
338-
val escaped = ScalaUDF(
339-
ExternalCatalogUtils.escapePathName _,
338+
val partitionName = ScalaUDF(
339+
ExternalCatalogUtils.getPartitionPathString _,
340340
StringType,
341-
Seq(Cast(c, StringType, Option(desc.timeZoneId))),
342-
Seq(StringType))
343-
val str = If(IsNull(c), Literal(ExternalCatalogUtils.DEFAULT_PARTITION_NAME), escaped)
344-
val partitionName = Literal(ExternalCatalogUtils.escapePathName(c.name) + "=") :: str :: Nil
345-
if (i == 0) partitionName else Literal(Path.SEPARATOR) :: partitionName
341+
Seq(Literal(c.name), Cast(c, StringType, Option(desc.timeZoneId))))
342+
if (i == 0) Seq(partitionName) else Seq(Literal(Path.SEPARATOR), partitionName)
346343
}
347344
}
348345

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
3333
import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
3434
import org.apache.spark.sql.catalyst.util.DateTimeUtils
3535
import org.apache.spark.sql.types._
36-
import org.apache.spark.unsafe.types.UTF8String
3736

3837
// TODO: We should tighten up visibility of the classes here once we clean up Hive coupling.
3938

@@ -129,7 +128,7 @@ object PartitioningUtils {
129128
// "hdfs://host:9000/invalidPath"
130129
// "hdfs://host:9000/path"
131130
// TODO: Selective case sensitivity.
132-
val discoveredBasePaths = optDiscoveredBasePaths.flatMap(x => x).map(_.toString.toLowerCase())
131+
val discoveredBasePaths = optDiscoveredBasePaths.flatten.map(_.toString.toLowerCase())
133132
assert(
134133
discoveredBasePaths.distinct.size == 1,
135134
"Conflicting directory structures detected. Suspicious paths:\b" +

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1012,8 +1012,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
10121012
val partColNameMap = buildLowerCasePartColNameMap(catalogTable).mapValues(escapePathName)
10131013
val clientPartitionNames =
10141014
client.getPartitionNames(catalogTable, partialSpec.map(lowerCasePartitionSpec))
1015-
clientPartitionNames.map { partName =>
1016-
val partSpec = PartitioningUtils.parsePathFragmentAsSeq(partName)
1015+
clientPartitionNames.map { partitionPath =>
1016+
val partSpec = PartitioningUtils.parsePathFragmentAsSeq(partitionPath)
10171017
partSpec.map { case (partName, partValue) =>
10181018
partColNameMap(partName.toLowerCase) + "=" + escapePathName(partValue)
10191019
}.mkString("/")

sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import java.io.File
2222
import org.apache.hadoop.fs.Path
2323

2424
import org.apache.spark.metrics.source.HiveCatalogMetrics
25-
import org.apache.spark.sql.{AnalysisException, QueryTest}
25+
import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
2626
import org.apache.spark.sql.catalyst.TableIdentifier
2727
import org.apache.spark.sql.hive.test.TestHiveSingleton
2828
import org.apache.spark.sql.internal.SQLConf
@@ -316,6 +316,28 @@ class PartitionProviderCompatibilitySuite
316316
}
317317
}
318318
}
319+
320+
test(s"SPARK-19887 partition value is null - partition management $enabled") {
321+
withTable("test") {
322+
Seq((1, "p", 1), (2, null, 2)).toDF("a", "b", "c")
323+
.write.partitionBy("b", "c").saveAsTable("test")
324+
checkAnswer(spark.table("test"),
325+
Row(1, "p", 1) :: Row(2, null, 2) :: Nil)
326+
327+
Seq((3, null: String, 3)).toDF("a", "b", "c")
328+
.write.mode("append").partitionBy("b", "c").saveAsTable("test")
329+
checkAnswer(spark.table("test"),
330+
Row(1, "p", 1) :: Row(2, null, 2) :: Row(3, null, 3) :: Nil)
331+
// make sure partition pruning also works.
332+
checkAnswer(spark.table("test").filter($"b".isNotNull), Row(1, "p", 1))
333+
334+
// empty string is an invalid partition value and we treat it as null when read back.
335+
Seq((4, "", 4)).toDF("a", "b", "c")
336+
.write.mode("append").partitionBy("b", "c").saveAsTable("test")
337+
checkAnswer(spark.table("test"),
338+
Row(1, "p", 1) :: Row(2, null, 2) :: Row(3, null, 3) :: Row(4, null, 4) :: Nil)
339+
}
340+
}
319341
}
320342

321343
/**

0 commit comments

Comments
 (0)