diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
index 31f1a5fdc7e53..cb314899a294a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
+import java.net.URLDecoder
 import java.sql.{Date, Timestamp}
 import java.text.{DateFormat, SimpleDateFormat}
 
@@ -158,6 +159,8 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
         if (periodIdx != -1 && n.length() - periodIdx > 9) {
           n = n.substring(0, periodIdx + 10)
         }
+        // Timestamp value in the partition could be in the form: (2015-02-09 00%3A55%3A00)
+        n = URLDecoder.decode(n, "UTF-8")
         try Timestamp.valueOf(n) catch { case _: java.lang.IllegalArgumentException => null }
       })
     case BooleanType =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala
index cdf2bc68d9c5e..f7cad20b91554 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala
@@ -434,7 +434,7 @@ case object BooleanType extends BooleanType
  * @group dataType
  */
 @DeveloperApi
-class TimestampType private() extends NativeType {
+class TimestampType private() extends NativeType with PrimitiveType {
   // The companion object and this class is separated so the companion object also subclasses
   // this type. Otherwise, the companion object would be of type "TimestampType$" in byte code.
   // Defined with a private constructor so the companion object is the only possible instantiation.
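The Cast change above decodes a percent-encoded partition value (e.g. 2015-02-09 00%3A55%3A00, where Hive has escaped the colons) back into a string that java.sql.Timestamp.valueOf can parse. Below is a minimal standalone sketch of that decode-then-parse step; it is not part of the patch, and the object name is illustrative only.

import java.net.URLDecoder
import java.sql.Timestamp

// Illustrative sketch of the decode-then-parse behaviour added to Cast's string-to-timestamp path.
object PartitionTimestampDecodeSketch {
  def main(args: Array[String]): Unit = {
    // Hive escapes ':' in partition directory names, so the raw partition value looks like this.
    val raw = "2015-02-09 00%3A55%3A00"
    // URLDecoder restores the colons before Timestamp.valueOf parses the string.
    val decoded = URLDecoder.decode(raw, "UTF-8")
    val ts = try Timestamp.valueOf(decoded) catch {
      case _: IllegalArgumentException => null
    }
    println(s"$decoded -> $ts")  // 2015-02-09 00:55:00 -> 2015-02-09 00:55:00.0
  }
}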
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
index fcb9513ab66f6..a1e98bdbb6cf5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
@@ -49,7 +49,8 @@ private[sql] case class ParquetRelation(
     path: String,
     @transient conf: Option[Configuration],
     @transient sqlContext: SQLContext,
-    partitioningAttributes: Seq[Attribute] = Nil)
+    partitioningAttributes: Seq[Attribute] = Nil,
+    partitionValues: String = "")
   extends LeafNode with MultiInstanceRelation {
 
   self: Product =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index 1c868da23e060..b06356ad4637b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -80,18 +80,34 @@ private[sql] case class ParquetTableScan(
   override def execute(): RDD[Row] = {
     import parquet.filter2.compat.FilterCompat.FilterPredicateCompat
 
+    val partMap = mutable.HashMap[String, String]()
     val sc = sqlContext.sparkContext
     val job = new Job(sc.hadoopConfiguration)
     ParquetInputFormat.setReadSupportClass(job, classOf[RowReadSupport])
 
     val conf: Configuration = ContextUtil.getConfiguration(job)
 
-    relation.path.split(",").foreach { curPath =>
-      val qualifiedPath = {
-        val path = new Path(curPath)
-        path.getFileSystem(conf).makeQualified(path)
+    if (requestedPartitionOrdinals.nonEmpty) {
+      val partVals = relation.partitionValues.split(",")
+      var i = 0
+      relation.path.split(",").foreach { curPath =>
+        val partition = partVals.apply(i)
+        i += 1
+        val qualifiedPath = {
+          val path = new Path(curPath)
+          path.getFileSystem(conf).makeQualified(path)
+        }
+        partMap += curPath -> partition
+        NewFileInputFormat.addInputPath(job, qualifiedPath)
+      }
+    } else {
+      relation.path.split(",").foreach { curPath =>
+        val qualifiedPath = {
+          val path = new Path(curPath)
+          path.getFileSystem(conf).makeQualified(path)
+        }
+        NewFileInputFormat.addInputPath(job, qualifiedPath)
       }
-      NewFileInputFormat.addInputPath(job, qualifiedPath)
     }
 
     // Store both requested and original schema in `Configuration`
@@ -135,15 +151,18 @@ private[sql] case class ParquetTableScan(
       baseRDD.mapPartitionsWithInputSplit { case (split, iter) =>
         val partValue = "([^=]+)=([^=]+)".r
+        val iSplit = split.asInstanceOf[parquet.hadoop.ParquetInputSplit]
+          .getPath
+          .toString
         val partValues =
-          split.asInstanceOf[parquet.hadoop.ParquetInputSplit]
-            .getPath
-            .toString
-            .split("/")
-            .flatMap {
-              case partValue(key, value) => Some(key -> value)
-              case _ => None
-            }.toMap
+          partMap.get(
+            iSplit.splitAt(iSplit.lastIndexOf("/"))._1)
+            .get
+            .split("/")
+            .flatMap {
+              case partValue(key, value) => Some(key -> value)
+              case _ => None
+            }.toMap
 
         // Convert the partitioning attributes into the correct types
         val partitionRowValues =
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 5f7e897295117..16f9e33a9273c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -60,7 +60,7 @@ private[hive] trait HiveStrategies {
     implicit class LogicalPlanHacks(s: DataFrame) {
       def lowerCase: DataFrame = DataFrame(s.sqlContext, s.logicalPlan)
 
-      def addPartitioningAttributes(attrs: Seq[Attribute]): DataFrame = {
+      def addPartitioningAttributes(attrs: Seq[Attribute], partVals: String): DataFrame = {
         // Don't add the partitioning key if its already present in the data.
         if (attrs.map(_.name).toSet.subsetOf(s.logicalPlan.output.map(_.name).toSet)) {
           s
@@ -68,7 +68,8 @@ private[hive] trait HiveStrategies {
           DataFrame(
             s.sqlContext,
             s.logicalPlan transform {
-              case p: ParquetRelation => p.copy(partitioningAttributes = attrs)
+              case p: ParquetRelation => p.copy(partitioningAttributes = attrs,
+                partitionValues = partVals)
             })
         }
       }
@@ -137,14 +138,15 @@ private[hive] trait HiveStrategies {
           pruningCondition(inputData)
         }
 
-        val partitionLocations = partitions.map(_.getLocation)
+        val partitionLocations = partitions.map(part => part.getLocation)
+        val partitionNames = partitions.map(part => part.getName)
 
         if (partitionLocations.isEmpty) {
          PhysicalRDD(plan.output, sparkContext.emptyRDD[Row]) :: Nil
         } else {
           hiveContext
             .parquetFile(partitionLocations: _*)
-            .addPartitioningAttributes(relation.partitionKeys)
+            .addPartitioningAttributes(relation.partitionKeys, partitionNames.mkString(","))
             .lowerCase
             .where(unresolvedOtherPredicates)
             .select(unresolvedProjection: _*)
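Taken together, HiveStrategies now threads the Hive partition names (comma-separated) into ParquetRelation.partitionValues, and ParquetTableScan looks the name up per input split in partMap instead of parsing key=value pairs out of the split's file path. The sketch below is not part of the patch; it only illustrates, with a hypothetical object and sample value, how a partition name string of the form key=value/key=value decomposes under the same "([^=]+)=([^=]+)" regex used above.

import java.net.URLDecoder

// Illustrative sketch: parse key=value pairs from a partition name such as
// Partition.getName might return (sample value assumed, not taken from the patch).
object PartitionNameParseSketch {
  private val partValue = "([^=]+)=([^=]+)".r

  def main(args: Array[String]): Unit = {
    val partitionName = "ds=2015-02-09 00%3A55%3A00/hr=00"  // assumed sample value
    val partValues = partitionName
      .split("/")
      .flatMap {
        case partValue(key, value) => Some(key -> value)
        case _ => None
      }.toMap
    // The percent-encoded timestamp stays encoded here; Cast decodes it later when the
    // string partition value is cast to TimestampType (see the Cast.scala hunk above).
    partValues.foreach { case (k, v) =>
      println(s"$k = $v (decoded: ${URLDecoder.decode(v, "UTF-8")})")
    }
  }
}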