diff --git a/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala b/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala index 6bba259acc39..cf92a10deabd 100644 --- a/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala +++ b/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala @@ -26,7 +26,7 @@ private[spark] object StaticSources { * The set of all static sources. These sources may be reported to from any class, including * static classes, without requiring reference to a SparkEnv. */ - val allSources = Seq(CodegenMetrics) + val allSources = Seq(CodegenMetrics, HiveCatalogMetrics) } /** @@ -60,3 +60,35 @@ object CodegenMetrics extends Source { val METRIC_GENERATED_METHOD_BYTECODE_SIZE = metricRegistry.histogram(MetricRegistry.name("generatedMethodSize")) } + +/** + * :: Experimental :: + * Metrics for access to the hive external catalog. + */ +@Experimental +object HiveCatalogMetrics extends Source { + override val sourceName: String = "HiveExternalCatalog" + override val metricRegistry: MetricRegistry = new MetricRegistry() + + /** + * Tracks the total number of partition metadata entries fetched via the client api. + */ + val METRIC_PARTITIONS_FETCHED = metricRegistry.counter(MetricRegistry.name("partitionsFetched")) + + /** + * Tracks the total number of files discovered off of the filesystem by ListingFileCatalog. + */ + val METRIC_FILES_DISCOVERED = metricRegistry.counter(MetricRegistry.name("filesDiscovered")) + + /** + * Resets the values of all metrics to zero. This is useful in tests. + */ + def reset(): Unit = { + METRIC_PARTITIONS_FETCHED.dec(METRIC_PARTITIONS_FETCHED.getCount()) + METRIC_FILES_DISCOVERED.dec(METRIC_FILES_DISCOVERED.getCount()) + } + + // clients can use these to avoid classloader issues with the codahale classes + def incrementFetchedPartitions(n: Int): Unit = METRIC_PARTITIONS_FETCHED.inc(n) + def incrementFilesDiscovered(n: Int): Unit = METRIC_FILES_DISCOVERED.inc(n) +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala index 348d3d0be215..a5e02523d288 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala @@ -198,11 +198,12 @@ abstract class ExternalCatalog { partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition] /** - * List the metadata of selected partitions according to the given partition predicates. + * List the metadata of partitions that belong to the specified table, assuming it exists, that + * satisfy the given partition-pruning predicate expressions. 
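A minimal sketch of how the new HiveCatalogMetrics counters above might be exercised from a test, using only the members added in this patch (reset, the increment wrappers, and the codahale getCount accessor); the counts themselves are illustrative.

```scala
import org.apache.spark.metrics.source.HiveCatalogMetrics

// Start from a clean slate, as a test would.
HiveCatalogMetrics.reset()

// Catalog clients report work through the wrapper methods to avoid classloader issues
// with the codahale classes.
HiveCatalogMetrics.incrementFetchedPartitions(3)
HiveCatalogMetrics.incrementFilesDiscovered(10)

assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 3)
assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 10)
```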
* * @param db database name * @param table table name - * @param predicates partition predicated + * @param predicates partition-pruning predicates */ def listPartitionsByFilter( db: String, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala index 49280f82e20b..f95c9f8cfa2d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala @@ -482,7 +482,9 @@ class InMemoryCatalog( db: String, table: String, predicates: Seq[Expression]): Seq[CatalogTablePartition] = { - throw new UnsupportedOperationException("listPartitionsByFilter is not implemented.") + // TODO: Provide an implementation + throw new UnsupportedOperationException( + "listPartitionsByFilter is not implemented for InMemoryCatalog") } // -------------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index 51326ca25e9c..1a57a7707caa 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -20,11 +20,11 @@ package org.apache.spark.sql.catalyst.catalog import java.util.Date import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} -import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier} +import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal} import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} import org.apache.spark.sql.catalyst.util.quoteIdentifier -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.types.{StructField, StructType} /** @@ -97,6 +97,15 @@ case class CatalogTablePartition( output.filter(_.nonEmpty).mkString("CatalogPartition(\n\t", "\n\t", ")") } + + /** + * Given the partition schema, returns a row with that schema holding the partition values. 
+ */ + def toRow(partitionSchema: StructType): InternalRow = { + InternalRow.fromSeq(partitionSchema.map { case StructField(name, dataType, _, _) => + Cast(Literal(spec(name)), dataType).eval() + }) + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index e59a483075c9..9b9f54e046fb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -43,7 +43,7 @@ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.util.usePrettyExpression import org.apache.spark.sql.execution.{FileRelation, LogicalRDD, QueryExecution, SQLExecution} import org.apache.spark.sql.execution.command.{CreateViewCommand, ExplainCommand, GlobalTempView, LocalTempView} -import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.execution.datasources.{FileCatalog, HadoopFsRelation, LogicalRelation} import org.apache.spark.sql.execution.datasources.json.JacksonGenerator import org.apache.spark.sql.execution.python.EvaluatePython import org.apache.spark.sql.streaming.{DataStreamWriter, StreamingQuery} @@ -2602,7 +2602,7 @@ class Dataset[T] private[sql]( * @since 2.0.0 */ def inputFiles: Array[String] = { - val files: Seq[String] = logicalPlan.collect { + val files: Seq[String] = queryExecution.optimizedPlan.collect { case LogicalRelation(fsBasedRelation: FileRelation, _, _) => fsBasedRelation.inputFiles case fr: FileRelation => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala index 83b7c779ab81..92fd366e101f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala @@ -185,7 +185,7 @@ class CacheManager extends Logging { plan match { case lr: LogicalRelation => lr.relation match { case hr: HadoopFsRelation => - val invalidate = hr.location.paths + val invalidate = hr.location.rootPaths .map(_.makeQualified(fs.getUri, fs.getWorkingDirectory)) .contains(qualifiedPath) if (invalidate) hr.location.refresh() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index 6cdba406937d..623d2be55dce 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -225,13 +225,27 @@ case class FileSourceScanExec( } // These metadata values make scan plans uniquely identifiable for equality checking. 
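To make the new CatalogTablePartition.toRow helper concrete, here is a hedged, self-contained sketch of the same cast-and-pack logic for a hypothetical partition spec (ds='2016-10-01', hr='10'); the column names and values are illustrative, not taken from this patch.

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Raw partition values from the metastore are strings keyed by partition column name.
val spec = Map("ds" -> "2016-10-01", "hr" -> "10")
val partitionSchema = new StructType()
  .add("ds", StringType)
  .add("hr", IntegerType)

// Cast each raw string to the declared partition column type, in schema order.
val row: InternalRow = InternalRow.fromSeq(partitionSchema.map {
  case StructField(name, dataType, _, _) => Cast(Literal(spec(name)), dataType).eval()
})
```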
- override val metadata: Map[String, String] = Map( - "Format" -> relation.fileFormat.toString, - "ReadSchema" -> outputSchema.catalogString, - "Batched" -> supportsBatch.toString, - "PartitionFilters" -> partitionFilters.mkString("[", ", ", "]"), - "PushedFilters" -> dataFilters.mkString("[", ", ", "]"), - "InputPaths" -> relation.location.paths.mkString(", ")) + override val metadata: Map[String, String] = { + def seqToString(seq: Seq[Any]) = seq.mkString("[", ", ", "]") + val location = relation.location + val locationDesc = + location.getClass.getSimpleName + seqToString(location.rootPaths) + val metadata = + Map( + "Format" -> relation.fileFormat.toString, + "ReadSchema" -> outputSchema.catalogString, + "Batched" -> supportsBatch.toString, + "PartitionFilters" -> seqToString(partitionFilters), + "PushedFilters" -> seqToString(dataFilters), + "Location" -> locationDesc) + val withOptPartitionCount = + relation.partitionSchemaOption.map { _ => + metadata + ("PartitionCount" -> selectedPartitions.size.toString) + } getOrElse { + metadata + } + withOptPartitionCount + } private lazy val inputRDD: RDD[InternalRow] = { val readFile: (PartitionedFile) => Iterator[InternalRow] = diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala index 8b762b5d6c5f..981728331d36 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution import org.apache.spark.sql.ExperimentalMethods import org.apache.spark.sql.catalyst.catalog.SessionCatalog import org.apache.spark.sql.catalyst.optimizer.Optimizer +import org.apache.spark.sql.execution.datasources.PruneFileSourcePartitions import org.apache.spark.sql.execution.python.ExtractPythonUDFFromAggregate import org.apache.spark.sql.internal.SQLConf @@ -32,5 +33,6 @@ class SparkOptimizer( override def batches: Seq[Batch] = super.batches :+ Batch("Optimize Metadata Only Query", Once, OptimizeMetadataOnlyQuery(catalog, conf)) :+ Batch("Extract Python UDF from Aggregate", Once, ExtractPythonUDFFromAggregate) :+ + Batch("Prune File Source Table Partitions", Once, PruneFileSourcePartitions) :+ Batch("User Provided Optimizers", fixedPoint, experimentalMethods.extraOptimizations: _*) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala index a04a13e698c4..a8c75a7f29ce 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala @@ -67,7 +67,7 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo dataSource match { case fs: HadoopFsRelation => - if (table.tableType == CatalogTableType.EXTERNAL && fs.location.paths.isEmpty) { + if (table.tableType == CatalogTableType.EXTERNAL && fs.location.rootPaths.isEmpty) { throw new AnalysisException( "Cannot create a file-based external data source table without path") } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index e75e7d2770b4..92b1fff7d812 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -471,9 +471,7 @@ case class DataSource( val existingPartitionColumns = Try { resolveRelation() .asInstanceOf[HadoopFsRelation] - .location - .partitionSpec() - .partitionColumns + .partitionSchema .fieldNames .toSeq }.getOrElse(Seq.empty[String]) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index 6f9ed50a02b0..7d0abe86a44d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -163,14 +163,14 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] { if query.resolved && t.schema.asNullable == query.schema.asNullable => // Sanity checks - if (t.location.paths.size != 1) { + if (t.location.rootPaths.size != 1) { throw new AnalysisException( "Can only write data to relations with a single path.") } - val outputPath = t.location.paths.head + val outputPath = t.location.rootPaths.head val inputPaths = query.collect { - case LogicalRelation(r: HadoopFsRelation, _, _) => r.location.paths + case LogicalRelation(r: HadoopFsRelation, _, _) => r.location.rootPaths }.flatten val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Append @@ -184,7 +184,7 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] { query.resolve(t.partitionSchema, t.sparkSession.sessionState.analyzer.resolver), t.bucketSpec, t.fileFormat, - () => t.refresh(), + () => t.location.refresh(), t.options, query, mode) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala index bde2d2b89d56..e7239ef91b32 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala @@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjectio import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.types.StructType + /** * Used to read and write data stored in files to/from the [[InternalRow]] format. */ @@ -182,16 +183,17 @@ abstract class TextBasedFileFormat extends FileFormat { case class Partition(values: InternalRow, files: Seq[FileStatus]) /** - * An interface for objects capable of enumerating the files that comprise a relation as well - * as the partitioning characteristics of those files. + * An interface for objects capable of enumerating the root paths of a relation as well as the + * partitions of a relation subject to some pruning expressions. */ -trait FileCatalog { - - /** Returns the list of input paths from which the catalog will get files. */ - def paths: Seq[Path] +trait BasicFileCatalog { - /** Returns the specification of the partitions inferred from the data. */ - def partitionSpec(): PartitionSpec + /** + * Returns the list of root input paths from which the catalog will get files. There may be a + * single root path from which partitions are discovered, or individual partitions may be + * specified by each path. 
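The DataSource change above swaps location.partitionSpec().partitionColumns for the relation's own partitionSchema. A hedged sketch of the same access pattern from user-level code, assuming an active SparkSession named spark and a partitioned dataset at an illustrative path:

```scala
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}

val relation = spark.read.parquet("/tmp/events")   // hypothetical partitioned dataset
  .queryExecution.analyzed.collectFirst {
    case LogicalRelation(r: HadoopFsRelation, _, _) => r
  }.get

// Partition columns now come straight off the relation, with no need to touch the FileCatalog.
val partitionColumnNames: Seq[String] = relation.partitionSchema.fieldNames.toSeq
```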
+ */ + def rootPaths: Seq[Path] /** * Returns all valid files grouped into partitions when the data is partitioned. If the data is @@ -204,9 +206,33 @@ trait FileCatalog { */ def listFiles(filters: Seq[Expression]): Seq[Partition] + /** Returns the list of files that will be read when scanning this relation. */ + def inputFiles: Array[String] + + /** Refresh any cached file listings */ + def refresh(): Unit + + /** Sum of table file sizes, in bytes */ + def sizeInBytes: Long +} + +/** + * A [[BasicFileCatalog]] which can enumerate all of the files comprising a relation and, from + * those, infer the relation's partition specification. + */ +// TODO: Consider a more descriptive, appropriate name which suggests this is a file catalog for +// which it is safe to list all of its files? +trait FileCatalog extends BasicFileCatalog { + + /** Returns the specification of the partitions inferred from the data. */ + def partitionSpec(): PartitionSpec + /** Returns all the valid files. */ def allFiles(): Seq[FileStatus] - /** Refresh the file listing */ - def refresh(): Unit + /** Returns the list of files that will be read when scanning this relation. */ + override def inputFiles: Array[String] = + allFiles().map(_.getPath.toUri.toString).toArray + + override def sizeInBytes: Long = allFiles().map(_.getLen).sum } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala index c7ebe0b76a15..db889edf032d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala @@ -28,8 +28,8 @@ import org.apache.spark.sql.types.StructType * Acts as a container for all of the metadata required to read from a datasource. All discovery, * resolution and merging logic for schemas and partitions has been removed. * - * @param location A [[FileCatalog]] that can enumerate the locations of all the files that comprise - * this relation. + * @param location A [[BasicFileCatalog]] that can enumerate the locations of all the files that + * comprise this relation. * @param partitionSchema The schema of the columns (if any) that are used to partition the relation * @param dataSchema The schema of any remaining columns. Note that if any partition columns are * present in the actual data files as well, they are preserved. @@ -38,7 +38,7 @@ import org.apache.spark.sql.types.StructType * @param options Configuration used when reading / writing data. */ case class HadoopFsRelation( - location: FileCatalog, + location: BasicFileCatalog, partitionSchema: StructType, dataSchema: StructType, bucketSpec: Option[BucketSpec], @@ -58,10 +58,6 @@ case class HadoopFsRelation( def partitionSchemaOption: Option[StructType] = if (partitionSchema.isEmpty) None else Some(partitionSchema) - def partitionSpec: PartitionSpec = location.partitionSpec() - - def refresh(): Unit = location.refresh() - override def toString: String = { fileFormat match { case source: DataSourceRegister => source.shortName() @@ -69,9 +65,7 @@ case class HadoopFsRelation( } } - /** Returns the list of files that will be read when scanning this relation. 
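To illustrate the new trait split, a toy (purely illustrative) FileCatalog over a fixed file list: only the listing-specific members need to be supplied, while inputFiles and sizeInBytes fall out of the default implementations added to FileCatalog above.

```scala
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources.{FileCatalog, Partition, PartitionSpec}

class StaticFileCatalog(files: Seq[FileStatus]) extends FileCatalog {
  override def rootPaths: Seq[Path] = files.map(_.getPath.getParent).distinct
  override def partitionSpec(): PartitionSpec = PartitionSpec.emptySpec
  override def listFiles(filters: Seq[Expression]): Seq[Partition] =
    Seq(Partition(InternalRow.empty, files))     // unpartitioned: a single catch-all partition
  override def allFiles(): Seq[FileStatus] = files
  override def refresh(): Unit = ()
  // inputFiles and sizeInBytes are inherited from FileCatalog's new default implementations
}
```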
*/ - override def inputFiles: Array[String] = - location.allFiles().map(_.getPath.toUri.toString).toArray + override def sizeInBytes: Long = location.sizeInBytes - override def sizeInBytes: Long = location.allFiles().map(_.getLen).sum + override def inputFiles: Array[String] = location.inputFiles } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala index a68ae523e0fa..6d10501b7265 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala @@ -17,32 +17,26 @@ package org.apache.spark.sql.execution.datasources -import java.io.FileNotFoundException - import scala.collection.mutable -import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs._ -import org.apache.hadoop.mapred.{FileInputFormat, JobConf} -import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession import org.apache.spark.sql.types.StructType -import org.apache.spark.util.SerializableConfiguration /** * A [[FileCatalog]] that generates the list of files to process by recursively listing all the * files present in `paths`. * + * @param rootPaths the list of root table paths to scan * @param parameters as set of options to control discovery - * @param paths a list of paths to scan * @param partitionSchema an optional partition schema that will be use to provide types for the * discovered partitions */ class ListingFileCatalog( sparkSession: SparkSession, - override val paths: Seq[Path], + override val rootPaths: Seq[Path], parameters: Map[String, String], partitionSchema: Option[StructType]) extends PartitioningAwareFileCatalog(sparkSession, parameters, partitionSchema) { @@ -70,198 +64,17 @@ class ListingFileCatalog( } override def refresh(): Unit = { - val files = listLeafFiles(paths) + val files = listLeafFiles(rootPaths) cachedLeafFiles = new mutable.LinkedHashMap[Path, FileStatus]() ++= files.map(f => f.getPath -> f) cachedLeafDirToChildrenFiles = files.toArray.groupBy(_.getPath.getParent) cachedPartitionSpec = null } - /** - * List leaf files of given paths. This method will submit a Spark job to do parallel - * listing whenever there is a path having more files than the parallel partition discovery - * discovery threshold. - * - * This is publicly visible for testing. - */ - def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = { - val files = - if (paths.length >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) { - ListingFileCatalog.listLeafFilesInParallel(paths, hadoopConf, sparkSession) - } else { - ListingFileCatalog.listLeafFilesInSerial(paths, hadoopConf) - } - - mutable.LinkedHashSet(files: _*) - } - override def equals(other: Any): Boolean = other match { - case hdfs: ListingFileCatalog => paths.toSet == hdfs.paths.toSet + case hdfs: ListingFileCatalog => rootPaths.toSet == hdfs.rootPaths.toSet case _ => false } - override def hashCode(): Int = paths.toSet.hashCode() -} - - -object ListingFileCatalog extends Logging { - - /** A serializable variant of HDFS's BlockLocation. */ - private case class SerializableBlockLocation( - names: Array[String], - hosts: Array[String], - offset: Long, - length: Long) - - /** A serializable variant of HDFS's FileStatus. 
*/ - private case class SerializableFileStatus( - path: String, - length: Long, - isDir: Boolean, - blockReplication: Short, - blockSize: Long, - modificationTime: Long, - accessTime: Long, - blockLocations: Array[SerializableBlockLocation]) - - /** - * List a collection of path recursively. - */ - private def listLeafFilesInSerial( - paths: Seq[Path], - hadoopConf: Configuration): Seq[FileStatus] = { - // Dummy jobconf to get to the pathFilter defined in configuration - val jobConf = new JobConf(hadoopConf, this.getClass) - val filter = FileInputFormat.getInputPathFilter(jobConf) - - paths.flatMap { path => - val fs = path.getFileSystem(hadoopConf) - listLeafFiles0(fs, path, filter) - } - } - - /** - * List a collection of path recursively in parallel (using Spark executors). - * Each task launched will use [[listLeafFilesInSerial]] to list. - */ - private def listLeafFilesInParallel( - paths: Seq[Path], - hadoopConf: Configuration, - sparkSession: SparkSession): Seq[FileStatus] = { - assert(paths.size >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) - logInfo(s"Listing leaf files and directories in parallel under: ${paths.mkString(", ")}") - - val sparkContext = sparkSession.sparkContext - val serializableConfiguration = new SerializableConfiguration(hadoopConf) - val serializedPaths = paths.map(_.toString) - - // Set the number of parallelism to prevent following file listing from generating many tasks - // in case of large #defaultParallelism. - val numParallelism = Math.min(paths.size, 10000) - - val statuses = sparkContext - .parallelize(serializedPaths, numParallelism) - .mapPartitions { paths => - val hadoopConf = serializableConfiguration.value - listLeafFilesInSerial(paths.map(new Path(_)).toSeq, hadoopConf).iterator - }.map { status => - // Turn FileStatus into SerializableFileStatus so we can send it back to the driver - val blockLocations = status match { - case f: LocatedFileStatus => - f.getBlockLocations.map { loc => - SerializableBlockLocation( - loc.getNames, - loc.getHosts, - loc.getOffset, - loc.getLength) - } - - case _ => - Array.empty[SerializableBlockLocation] - } - - SerializableFileStatus( - status.getPath.toString, - status.getLen, - status.isDirectory, - status.getReplication, - status.getBlockSize, - status.getModificationTime, - status.getAccessTime, - blockLocations) - }.collect() - - // Turn SerializableFileStatus back to Status - statuses.map { f => - val blockLocations = f.blockLocations.map { loc => - new BlockLocation(loc.names, loc.hosts, loc.offset, loc.length) - } - new LocatedFileStatus( - new FileStatus( - f.length, f.isDir, f.blockReplication, f.blockSize, f.modificationTime, new Path(f.path)), - blockLocations) - } - } - - /** - * List a single path, provided as a FileStatus, in serial. - */ - private def listLeafFiles0( - fs: FileSystem, path: Path, filter: PathFilter): Seq[FileStatus] = { - logTrace(s"Listing $path") - val name = path.getName.toLowerCase - if (shouldFilterOut(name)) { - Seq.empty[FileStatus] - } else { - // [SPARK-17599] Prevent ListingFileCatalog from failing if path doesn't exist - // Note that statuses only include FileStatus for the files and dirs directly under path, - // and does not include anything else recursively. - val statuses = try fs.listStatus(path) catch { - case _: FileNotFoundException => - logWarning(s"The directory $path was not found. 
Was it deleted very recently?") - Array.empty[FileStatus] - } - - val allLeafStatuses = { - val (dirs, files) = statuses.partition(_.isDirectory) - val stats = files ++ dirs.flatMap(dir => listLeafFiles0(fs, dir.getPath, filter)) - if (filter != null) stats.filter(f => filter.accept(f.getPath)) else stats - } - - allLeafStatuses.filterNot(status => shouldFilterOut(status.getPath.getName)).map { - case f: LocatedFileStatus => - f - - // NOTE: - // - // - Although S3/S3A/S3N file system can be quite slow for remote file metadata - // operations, calling `getFileBlockLocations` does no harm here since these file system - // implementations don't actually issue RPC for this method. - // - // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not - // be a big deal since we always use to `listLeafFilesInParallel` when the number of - // paths exceeds threshold. - case f => - // The other constructor of LocatedFileStatus will call FileStatus.getPermission(), - // which is very slow on some file system (RawLocalFileSystem, which is launch a - // subprocess and parse the stdout). - val locations = fs.getFileBlockLocations(f, 0, f.getLen) - val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize, - f.getModificationTime, 0, null, null, null, null, f.getPath, locations) - if (f.isSymlink) { - lfs.setSymlink(f.getSymlink) - } - lfs - } - } - } - - /** Checks if we should filter out this path name. */ - def shouldFilterOut(pathName: String): Boolean = { - // We filter everything that starts with _ and ., except _common_metadata and _metadata - // because Parquet needs to find those metadata files from leaf files returned by this method. - // We should refactor this logic to not mix metadata files with data files. - ((pathName.startsWith("_") && !pathName.contains("=")) || pathName.startsWith(".")) && - !pathName.startsWith("_common_metadata") && !pathName.startsWith("_metadata") - } + override def hashCode(): Int = rootPaths.toSet.hashCode() } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala index d9562fd32e87..7c28d48f2641 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala @@ -94,7 +94,7 @@ case class LogicalRelation( } override def refresh(): Unit = relation match { - case fs: HadoopFsRelation => fs.refresh() + case fs: HadoopFsRelation => fs.location.refresh() case _ => // Do nothing. 
} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala index 702ba97222e3..b2508115c282 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala @@ -21,7 +21,6 @@ import scala.collection.mutable import org.apache.hadoop.fs.{FileStatus, Path} -import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.{expressions, InternalRow} import org.apache.spark.sql.catalyst.expressions._ @@ -40,9 +39,10 @@ abstract class PartitioningAwareFileCatalog( sparkSession: SparkSession, parameters: Map[String, String], partitionSchema: Option[StructType]) - extends FileCatalog with Logging { + extends SessionFileCatalog(sparkSession) with FileCatalog { + import PartitioningAwareFileCatalog.BASE_PATH_PARAM - protected val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(parameters) + override protected val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(parameters) protected def leafFiles: mutable.LinkedHashMap[Path, FileStatus] @@ -72,8 +72,8 @@ abstract class PartitioningAwareFileCatalog( override def allFiles(): Seq[FileStatus] = { if (partitionSpec().partitionColumns.isEmpty) { - // For each of the input paths, get the list of files inside them - paths.flatMap { path => + // For each of the root input paths, get the list of files inside them + rootPaths.flatMap { path => // Make the path qualified (consistent with listLeafFiles and listLeafFilesInParallel). val fs = path.getFileSystem(hadoopConf) val qualifiedPathPre = fs.makeQualified(path) @@ -105,8 +105,6 @@ abstract class PartitioningAwareFileCatalog( protected def inferPartitioning(): PartitionSpec = { // We use leaf dirs containing data files to discover the schema. val leafDirs = leafDirToChildrenFiles.filter { case (_, files) => - // SPARK-15895: Metadata files (e.g. Parquet summary files) and temporary files should not be - // counted as data files, so that they shouldn't participate partition discovery. files.exists(f => isDataPath(f.getPath)) }.keys.toSeq partitionSchema match { @@ -194,24 +192,30 @@ abstract class PartitioningAwareFileCatalog( * and the returned DataFrame will have the column of `something`. */ private def basePaths: Set[Path] = { - parameters.get("basePath").map(new Path(_)) match { + parameters.get(BASE_PATH_PARAM).map(new Path(_)) match { case Some(userDefinedBasePath) => val fs = userDefinedBasePath.getFileSystem(hadoopConf) if (!fs.isDirectory(userDefinedBasePath)) { - throw new IllegalArgumentException("Option 'basePath' must be a directory") + throw new IllegalArgumentException(s"Option '$BASE_PATH_PARAM' must be a directory") } Set(fs.makeQualified(userDefinedBasePath)) case None => - paths.map { path => + rootPaths.map { path => // Make the path qualified (consistent with listLeafFiles and listLeafFilesInParallel). val qualifiedPath = path.getFileSystem(hadoopConf).makeQualified(path) if (leafFiles.contains(qualifiedPath)) qualifiedPath.getParent else qualifiedPath }.toSet } } + // SPARK-15895: Metadata files (e.g. Parquet summary files) and temporary files should not be + // counted as data files, so that they shouldn't participate partition discovery. 
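The new BASE_PATH_PARAM constant names the existing basePath option. A hedged usage sketch (paths are illustrative): reading a single partition directory while anchoring partition discovery at the table root keeps ds as a partition column.

```scala
// Without basePath, reading the leaf directory directly would drop the `ds` partition column.
val df = spark.read
  .option("basePath", "/data/table")
  .parquet("/data/table/ds=2016-10-01")

df.printSchema()   // schema includes `ds`, inferred from the directory name under basePath
```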
private def isDataPath(path: Path): Boolean = { val name = path.getName !((name.startsWith("_") && !name.contains("=")) || name.startsWith(".")) } } + +object PartitioningAwareFileCatalog { + val BASE_PATH_PARAM = "basePath" +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala new file mode 100644 index 000000000000..29121a47d92d --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule + +private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown { + case op @ PhysicalOperation(projects, filters, + logicalRelation @ + LogicalRelation(fsRelation @ + HadoopFsRelation( + tableFileCatalog: TableFileCatalog, + partitionSchema, + _, + _, + _, + _), + _, + _)) + if filters.nonEmpty && fsRelation.partitionSchemaOption.isDefined => + // The attribute name of predicate could be different than the one in schema in case of + // case insensitive, we should change them to match the one in schema, so we donot need to + // worry about case sensitivity anymore. 
+ val normalizedFilters = filters.map { e => + e transform { + case a: AttributeReference => + a.withName(logicalRelation.output.find(_.semanticEquals(a)).get.name) + } + } + + val sparkSession = fsRelation.sparkSession + val partitionColumns = + logicalRelation.resolve( + partitionSchema, sparkSession.sessionState.analyzer.resolver) + val partitionSet = AttributeSet(partitionColumns) + val partitionKeyFilters = + ExpressionSet(normalizedFilters.filter(_.references.subsetOf(partitionSet))) + + if (partitionKeyFilters.nonEmpty) { + val prunedFileCatalog = tableFileCatalog.filterPartitions(partitionKeyFilters.toSeq) + val prunedFsRelation = + fsRelation.copy(location = prunedFileCatalog)(sparkSession) + val prunedLogicalRelation = logicalRelation.copy(relation = prunedFsRelation) + + // Keep partition-pruning predicates so that they are visible in physical planning + val filterExpression = filters.reduceLeft(And) + val filter = Filter(filterExpression, prunedLogicalRelation) + Project(projects, filter) + } else { + op + } + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SessionFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SessionFileCatalog.scala new file mode 100644 index 000000000000..4807a92c2e6b --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SessionFileCatalog.scala @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import java.io.FileNotFoundException + +import scala.collection.mutable + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs._ +import org.apache.hadoop.mapred.{FileInputFormat, JobConf} + +import org.apache.spark.internal.Logging +import org.apache.spark.metrics.source.HiveCatalogMetrics +import org.apache.spark.sql.SparkSession +import org.apache.spark.util.SerializableConfiguration + + +/** + * A base class for [[BasicFileCatalog]]s that need a [[SparkSession]] and the ability to find leaf + * files in a list of HDFS paths. + * + * @param sparkSession a [[SparkSession]] + * @param ignoreFileNotFound (see [[ListingFileCatalog]]) + */ +abstract class SessionFileCatalog(sparkSession: SparkSession) + extends BasicFileCatalog with Logging { + protected val hadoopConf: Configuration + + /** + * List leaf files of given paths. This method will submit a Spark job to do parallel + * listing whenever there is a path having more files than the parallel partition discovery + * discovery threshold. + * + * This is publicly visible for testing. 
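As the listLeafFiles scaladoc above says, listing switches from serial to a distributed Spark job once the number of paths reaches the parallel partition discovery threshold. A hedged sketch of tuning that threshold, assuming the usual spark.sql.sources.parallelPartitionDiscovery.threshold key; the value here is illustrative:

```scala
// Any listing over 8 or more root/partition paths will be done with a Spark job on executors.
spark.conf.set("spark.sql.sources.parallelPartitionDiscovery.threshold", "8")
```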
+ */ + def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = { + val files = + if (paths.length >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) { + SessionFileCatalog.listLeafFilesInParallel(paths, hadoopConf, sparkSession) + } else { + SessionFileCatalog.listLeafFilesInSerial(paths, hadoopConf) + } + + HiveCatalogMetrics.incrementFilesDiscovered(files.size) + mutable.LinkedHashSet(files: _*) + } +} + +object SessionFileCatalog extends Logging { + + /** A serializable variant of HDFS's BlockLocation. */ + private case class SerializableBlockLocation( + names: Array[String], + hosts: Array[String], + offset: Long, + length: Long) + + /** A serializable variant of HDFS's FileStatus. */ + private case class SerializableFileStatus( + path: String, + length: Long, + isDir: Boolean, + blockReplication: Short, + blockSize: Long, + modificationTime: Long, + accessTime: Long, + blockLocations: Array[SerializableBlockLocation]) + + /** + * List a collection of path recursively. + */ + private def listLeafFilesInSerial( + paths: Seq[Path], + hadoopConf: Configuration): Seq[FileStatus] = { + // Dummy jobconf to get to the pathFilter defined in configuration + val jobConf = new JobConf(hadoopConf, this.getClass) + val filter = FileInputFormat.getInputPathFilter(jobConf) + + paths.flatMap { path => + val fs = path.getFileSystem(hadoopConf) + listLeafFiles0(fs, path, filter) + } + } + + /** + * List a collection of path recursively in parallel (using Spark executors). + * Each task launched will use [[listLeafFilesInSerial]] to list. + */ + private def listLeafFilesInParallel( + paths: Seq[Path], + hadoopConf: Configuration, + sparkSession: SparkSession): Seq[FileStatus] = { + assert(paths.size >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) + logInfo(s"Listing leaf files and directories in parallel under: ${paths.mkString(", ")}") + + val sparkContext = sparkSession.sparkContext + val serializableConfiguration = new SerializableConfiguration(hadoopConf) + val serializedPaths = paths.map(_.toString) + + // Set the number of parallelism to prevent following file listing from generating many tasks + // in case of large #defaultParallelism. + val numParallelism = Math.min(paths.size, 10000) + + val statuses = sparkContext + .parallelize(serializedPaths, numParallelism) + .mapPartitions { paths => + val hadoopConf = serializableConfiguration.value + listLeafFilesInSerial(paths.map(new Path(_)).toSeq, hadoopConf).iterator + }.map { status => + // Turn FileStatus into SerializableFileStatus so we can send it back to the driver + val blockLocations = status match { + case f: LocatedFileStatus => + f.getBlockLocations.map { loc => + SerializableBlockLocation( + loc.getNames, + loc.getHosts, + loc.getOffset, + loc.getLength) + } + + case _ => + Array.empty[SerializableBlockLocation] + } + + SerializableFileStatus( + status.getPath.toString, + status.getLen, + status.isDirectory, + status.getReplication, + status.getBlockSize, + status.getModificationTime, + status.getAccessTime, + blockLocations) + }.collect() + + // Turn SerializableFileStatus back to Status + statuses.map { f => + val blockLocations = f.blockLocations.map { loc => + new BlockLocation(loc.names, loc.hosts, loc.offset, loc.length) + } + new LocatedFileStatus( + new FileStatus( + f.length, f.isDir, f.blockReplication, f.blockSize, f.modificationTime, new Path(f.path)), + blockLocations) + } + } + + /** + * List a single path, provided as a FileStatus, in serial. 
+ */ + private def listLeafFiles0( + fs: FileSystem, path: Path, filter: PathFilter): Seq[FileStatus] = { + logTrace(s"Listing $path") + val name = path.getName.toLowerCase + if (shouldFilterOut(name)) { + Seq.empty[FileStatus] + } else { + // [SPARK-17599] Prevent ListingFileCatalog from failing if path doesn't exist + // Note that statuses only include FileStatus for the files and dirs directly under path, + // and does not include anything else recursively. + val statuses = try fs.listStatus(path) catch { + case _: FileNotFoundException => + logWarning(s"The directory $path was not found. Was it deleted very recently?") + Array.empty[FileStatus] + } + + val allLeafStatuses = { + val (dirs, files) = statuses.partition(_.isDirectory) + val stats = files ++ dirs.flatMap(dir => listLeafFiles0(fs, dir.getPath, filter)) + if (filter != null) stats.filter(f => filter.accept(f.getPath)) else stats + } + + allLeafStatuses.filterNot(status => shouldFilterOut(status.getPath.getName)).map { + case f: LocatedFileStatus => + f + + // NOTE: + // + // - Although S3/S3A/S3N file system can be quite slow for remote file metadata + // operations, calling `getFileBlockLocations` does no harm here since these file system + // implementations don't actually issue RPC for this method. + // + // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not + // be a big deal since we always use to `listLeafFilesInParallel` when the number of + // paths exceeds threshold. + case f => + // The other constructor of LocatedFileStatus will call FileStatus.getPermission(), + // which is very slow on some file system (RawLocalFileSystem, which is launch a + // subprocess and parse the stdout). + val locations = fs.getFileBlockLocations(f, 0, f.getLen) + val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize, + f.getModificationTime, 0, null, null, null, null, f.getPath, locations) + if (f.isSymlink) { + lfs.setSymlink(f.getSymlink) + } + lfs + } + } + } + + /** Checks if we should filter out this path name. */ + def shouldFilterOut(pathName: String): Boolean = { + // We filter everything that starts with _ and ., except _common_metadata and _metadata + // because Parquet needs to find those metadata files from leaf files returned by this method. + // We should refactor this logic to not mix metadata files with data files. + ((pathName.startsWith("_") && !pathName.contains("=")) || pathName.startsWith(".")) && + !pathName.startsWith("_common_metadata") && !pathName.startsWith("_metadata") + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala new file mode 100644 index 000000000000..a5c41b244589 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import org.apache.hadoop.fs.Path + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.types.StructType + + +/** + * A [[BasicFileCatalog]] for a metastore catalog table. + * + * @param sparkSession a [[SparkSession]] + * @param db the table's database name + * @param table the table's (unqualified) name + * @param partitionSchema the schema of a partitioned table's partition columns + * @param sizeInBytes the table's data size in bytes + */ +class TableFileCatalog( + sparkSession: SparkSession, + db: String, + table: String, + partitionSchema: Option[StructType], + override val sizeInBytes: Long) + extends SessionFileCatalog(sparkSession) { + + override protected val hadoopConf = sparkSession.sessionState.newHadoopConf + + private val externalCatalog = sparkSession.sharedState.externalCatalog + + private val catalogTable = externalCatalog.getTable(db, table) + + private val baseLocation = catalogTable.storage.locationUri + + override def rootPaths: Seq[Path] = baseLocation.map(new Path(_)).toSeq + + override def listFiles(filters: Seq[Expression]): Seq[Partition] = { + filterPartitions(filters).listFiles(Nil) + } + + override def refresh(): Unit = {} + + /** + * Returns a [[ListingFileCatalog]] for this table restricted to the subset of partitions + * specified by the given partition-pruning filters. + * + * @param filters partition-pruning filters + */ + def filterPartitions(filters: Seq[Expression]): ListingFileCatalog = { + if (filters.isEmpty) { + cachedAllPartitions + } else { + filterPartitions0(filters) + } + } + + private def filterPartitions0(filters: Seq[Expression]): ListingFileCatalog = { + val parameters = baseLocation + .map(loc => Map(PartitioningAwareFileCatalog.BASE_PATH_PARAM -> loc)) + .getOrElse(Map.empty) + partitionSchema match { + case Some(schema) => + val selectedPartitions = externalCatalog.listPartitionsByFilter(db, table, filters) + val partitions = selectedPartitions.map { p => + PartitionDirectory(p.toRow(schema), p.storage.locationUri.get) + } + val partitionSpec = PartitionSpec(schema, partitions) + new PrunedTableFileCatalog( + sparkSession, new Path(baseLocation.get), partitionSpec) + case None => + new ListingFileCatalog(sparkSession, rootPaths, parameters, None) + } + } + + // Not used in the hot path of queries when metastore partition pruning is enabled + lazy val cachedAllPartitions: ListingFileCatalog = filterPartitions0(Nil) + + override def inputFiles: Array[String] = cachedAllPartitions.inputFiles +} + +/** + * An override of the standard HDFS listing based catalog, that overrides the partition spec with + * the information from the metastore. 
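A hedged sketch of how the planner-facing filterPartitions entry point above is meant to be driven. It assumes an active SparkSession named spark and a metastore table default.logs partitioned by ds; the names, predicate, and size are illustrative.

```scala
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
import org.apache.spark.sql.execution.datasources.TableFileCatalog
import org.apache.spark.sql.types.{StringType, StructType}

val partitionSchema = new StructType().add("ds", StringType)
val catalog = new TableFileCatalog(spark, "default", "logs", Some(partitionSchema), sizeInBytes = 0L)

// With a partition predicate, only the matching partitions are fetched from the metastore and
// only their directories are listed; with Nil, it falls back to cachedAllPartitions.
val ds = AttributeReference("ds", StringType)()
val pruned = catalog.filterPartitions(Seq(EqualTo(ds, Literal("2016-10-01"))))
```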
+ * + * @param tableBasePath The default base path of the Hive metastore table + * @param partitionSpec The partition specifications from Hive metastore + */ +private class PrunedTableFileCatalog( + sparkSession: SparkSession, + tableBasePath: Path, + override val partitionSpec: PartitionSpec) + extends ListingFileCatalog( + sparkSession, + partitionSpec.partitions.map(_.path), + Map.empty, + Some(partitionSpec.partitionColumns)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala index f1a35dd8a620..4dea8cf29ec5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala @@ -269,11 +269,15 @@ private[parquet] object ParquetReadSupport { */ private def clipParquetGroupFields( parquetRecord: GroupType, structType: StructType): Seq[Type] = { - val parquetFieldMap = parquetRecord.getFields.asScala.map(f => f.getName -> f).toMap + val parquetFieldMap = parquetRecord.getFields.asScala + .map(f => f.getName -> f).toMap + val caseInsensitiveParquetFieldMap = parquetRecord.getFields.asScala + .map(f => f.getName.toLowerCase -> f).toMap val toParquet = new ParquetSchemaConverter(writeLegacyParquetFormat = false) structType.map { f => parquetFieldMap .get(f.name) + .orElse(caseInsensitiveParquetFieldMap.get(f.name.toLowerCase)) .map(clipParquetType(_, f.dataType)) .getOrElse(toParquet.convertField(f)) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileCatalog.scala index a32c4671e347..82b67cb1ca6e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileCatalog.scala @@ -47,7 +47,7 @@ class MetadataLogFileCatalog(sparkSession: SparkSession, path: Path) allFilesFromLog.toArray.groupBy(_.getPath.getParent) } - override def paths: Seq[Path] = path :: Nil + override def rootPaths: Seq[Path] = path :: Nil override def refresh(): Unit = { } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index c8447651dd67..e73d0187b584 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -269,6 +269,13 @@ object SQLConf { .booleanConf .createWithDefault(false) + val HIVE_FILESOURCE_PARTITION_PRUNING = + SQLConfigBuilder("spark.sql.hive.filesourcePartitionPruning") + .doc("When true, enable metastore partition pruning for file source tables as well. " + + "This is currently implemented for converted Hive tables only.") + .booleanConf + .createWithDefault(true) + val OPTIMIZER_METADATA_ONLY = SQLConfigBuilder("spark.sql.optimizer.metadataOnly") .doc("When true, enable the metadata-only query optimization that use the table's metadata " + "to produce the partition columns instead of table scans. 
It applies when all the columns " + @@ -676,6 +683,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging { def metastorePartitionPruning: Boolean = getConf(HIVE_METASTORE_PARTITION_PRUNING) + def filesourcePartitionPruning: Boolean = getConf(HIVE_FILESOURCE_PARTITION_PRUNING) + def gatherFastStats: Boolean = getConf(GATHER_FASTSTAT) def optimizerMetadataOnly: Boolean = getConf(OPTIMIZER_METADATA_ONLY) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala index fa3abd0098f5..2695974b84b0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala @@ -77,13 +77,14 @@ class FileCatalogSuite extends SharedSQLContext { val catalog1 = new ListingFileCatalog( spark, Seq(new Path(deletedFolder.getCanonicalPath)), Map.empty, None) // doesn't throw an exception - assert(catalog1.listLeafFiles(catalog1.paths).isEmpty) + assert(catalog1.listLeafFiles(catalog1.rootPaths).isEmpty) } } test("SPARK-17613 - PartitioningAwareFileCatalog: base path w/o '/' at end") { class MockCatalog( - override val paths: Seq[Path]) extends PartitioningAwareFileCatalog(spark, Map.empty, None) { + override val rootPaths: Seq[Path]) + extends PartitioningAwareFileCatalog(spark, Map.empty, None) { override def refresh(): Unit = {} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala index c5deb31fec18..c32254d9dfde 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala @@ -395,7 +395,7 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi val fileCatalog = new ListingFileCatalog( sparkSession = spark, - paths = Seq(new Path(tempDir)), + rootPaths = Seq(new Path(tempDir)), parameters = Map.empty[String, String], partitionSchema = None) // This should not fail. 
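A hedged end-to-end sketch of the new flag defined in SQLConf above. The table name is illustrative and assumed to be a metastore table partitioned by ds; with the flag at its default of true, only the partitions matching the ds filter are fetched and listed at planning time.

```scala
spark.conf.set("spark.sql.hive.filesourcePartitionPruning", "true")   // default per the SQLConf entry

// Only the ds=2016-10-01 partition's files are discovered; HiveCatalogMetrics reflects this.
spark.sql("SELECT count(*) FROM logs WHERE ds = '2016-10-01'").show()
```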
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SessionFileCatalogSuite.scala similarity index 66% rename from sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalogSuite.scala rename to sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SessionFileCatalogSuite.scala index f15730aeb11f..df509583377a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SessionFileCatalogSuite.scala @@ -19,16 +19,16 @@ package org.apache.spark.sql.execution.datasources import org.apache.spark.SparkFunSuite -class ListingFileCatalogSuite extends SparkFunSuite { +class SessionFileCatalogSuite extends SparkFunSuite { test("file filtering") { - assert(!ListingFileCatalog.shouldFilterOut("abcd")) - assert(ListingFileCatalog.shouldFilterOut(".ab")) - assert(ListingFileCatalog.shouldFilterOut("_cd")) + assert(!SessionFileCatalog.shouldFilterOut("abcd")) + assert(SessionFileCatalog.shouldFilterOut(".ab")) + assert(SessionFileCatalog.shouldFilterOut("_cd")) - assert(!ListingFileCatalog.shouldFilterOut("_metadata")) - assert(!ListingFileCatalog.shouldFilterOut("_common_metadata")) - assert(ListingFileCatalog.shouldFilterOut("_ab_metadata")) - assert(ListingFileCatalog.shouldFilterOut("_cd_common_metadata")) + assert(!SessionFileCatalog.shouldFilterOut("_metadata")) + assert(!SessionFileCatalog.shouldFilterOut("_common_metadata")) + assert(SessionFileCatalog.shouldFilterOut("_ab_metadata")) + assert(SessionFileCatalog.shouldFilterOut("_cd_common_metadata")) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala index 8d18be9300f7..43357c97c395 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala @@ -30,7 +30,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Literal -import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PartitionDirectory => Partition, PartitioningUtils, PartitionSpec} +import org.apache.spark.sql.execution.datasources.{FileCatalog, HadoopFsRelation, LogicalRelation, PartitionDirectory => Partition, PartitioningUtils, PartitionSpec} import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext @@ -626,8 +626,8 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha (1 to 10).map(i => (i, i.toString)).toDF("a", "b").write.parquet(dir.getCanonicalPath) val queryExecution = spark.read.parquet(dir.getCanonicalPath).queryExecution queryExecution.analyzed.collectFirst { - case LogicalRelation(relation: HadoopFsRelation, _, _) => - assert(relation.partitionSpec === PartitionSpec.emptySpec) + case LogicalRelation(HadoopFsRelation(location: FileCatalog, _, _, _, _, _), _, _) => + assert(location.partitionSpec === PartitionSpec.emptySpec) }.getOrElse { fail(s"Expecting a 
ParquetRelation2, but got:\n$queryExecution") } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala index 8a980a7eb538..c3d202ced24c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala @@ -1080,6 +1080,34 @@ class ParquetSchemaSuite extends ParquetSchemaTest { } } + testSchemaClipping( + "falls back to case insensitive resolution", + + parquetSchema = + """message root { + | required group A { + | optional int32 B; + | } + | optional int32 c; + |} + """.stripMargin, + + catalystSchema = { + val nestedType = new StructType().add("b", IntegerType, nullable = true) + new StructType() + .add("a", nestedType, nullable = true) + .add("c", IntegerType, nullable = true) + }, + + expectedSchema = + """message root { + | required group A { + | optional int32 B; + | } + | optional int32 c; + |} + """.stripMargin) + testSchemaClipping( "simple nested struct", diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index b5d93c3d7c80..ff59b54f5390 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -29,17 +29,17 @@ import org.apache.thrift.TException import org.apache.spark.SparkConf import org.apache.spark.internal.Logging import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException import org.apache.spark.sql.catalyst.catalog._ -import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Statistics} import org.apache.spark.sql.execution.command.{ColumnStatStruct, DDLUtils} import org.apache.spark.sql.execution.datasources.CaseInsensitiveMap import org.apache.spark.sql.hive.client.HiveClient import org.apache.spark.sql.internal.HiveSerDe import org.apache.spark.sql.internal.StaticSQLConf._ -import org.apache.spark.sql.types.{DataType, StructType} +import org.apache.spark.sql.types.{DataType, StructField, StructType} /** @@ -650,8 +650,35 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat override def listPartitionsByFilter( db: String, table: String, - predicates: Seq[Expression]): Seq[CatalogTablePartition] = { - client.getPartitionsByFilter(db, table, predicates) + predicates: Seq[Expression]): Seq[CatalogTablePartition] = withClient { + val catalogTable = client.getTable(db, table) + val partitionColumnNames = catalogTable.partitionColumnNames.toSet + val nonPartitionPruningPredicates = predicates.filterNot { + _.references.map(_.name).toSet.subsetOf(partitionColumnNames) + } + + if (nonPartitionPruningPredicates.nonEmpty) { + sys.error("Expected only partition pruning predicates: " + + predicates.reduceLeft(And)) + } + + val partitionSchema = catalogTable.partitionSchema + + if (predicates.nonEmpty) { + val clientPrunedPartitions = + client.getPartitionsByFilter(catalogTable, predicates) + val boundPredicate = + 
InterpretedPredicate.create(predicates.reduce(And).transform { + case att: AttributeReference => + val index = partitionSchema.indexWhere(_.name == att.name) + BoundReference(index, partitionSchema(index).dataType, nullable = true) + }) + clientPrunedPartitions.filter { case p: CatalogTablePartition => + boundPredicate(p.toRow(partitionSchema)) + } + } else { + client.getPartitions(catalogTable) + } } // -------------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index c44f0adda44c..4a2aaa7d4f6c 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -135,12 +135,12 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log private def getCached( tableIdentifier: QualifiedTableName, - pathsInMetastore: Seq[String], + pathsInMetastore: Seq[Path], metastoreRelation: MetastoreRelation, schemaInMetastore: StructType, expectedFileFormat: Class[_ <: FileFormat], expectedBucketSpec: Option[BucketSpec], - partitionSpecInMetastore: Option[PartitionSpec]): Option[LogicalRelation] = { + partitionSchema: Option[StructType]): Option[LogicalRelation] = { cachedDataSourceTables.getIfPresent(tableIdentifier) match { case null => None // Cache miss @@ -152,12 +152,10 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log // If we have the same paths, same schema, and same partition spec, // we will use the cached relation. val useCached = - relation.location.paths.map(_.toString).toSet == pathsInMetastore.toSet && + relation.location.rootPaths.toSet == pathsInMetastore.toSet && logical.schema.sameType(schemaInMetastore) && relation.bucketSpec == expectedBucketSpec && - relation.partitionSpec == partitionSpecInMetastore.getOrElse { - PartitionSpec(StructType(Nil), Array.empty[PartitionDirectory]) - } + relation.partitionSchema == partitionSchema.getOrElse(StructType(Nil)) if (useCached) { Some(logical) @@ -196,61 +194,59 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName) val bucketSpec = None // We don't support hive bucketed tables, only ones we write out. + val lazyPruningEnabled = sparkSession.sqlContext.conf.filesourcePartitionPruning val result = if (metastoreRelation.hiveQlTable.isPartitioned) { val partitionSchema = StructType.fromAttributes(metastoreRelation.partitionKeys) - val partitionColumnDataTypes = partitionSchema.map(_.dataType) - // We're converting the entire table into HadoopFsRelation, so predicates to Hive metastore - // are empty. - val partitions = metastoreRelation.getHiveQlPartitions().map { p => - val location = p.getLocation - val values = InternalRow.fromSeq(p.getValues.asScala.zip(partitionColumnDataTypes).map { - case (rawValue, dataType) => Cast(Literal(rawValue), dataType).eval(null) - }) - PartitionDirectory(values, location) - } - val partitionSpec = PartitionSpec(partitionSchema, partitions) - val partitionPaths = partitions.map(_.path.toString) - - // By convention (for example, see MetaStorePartitionedTableFileCatalog), the definition of a - // partitioned table's paths depends on whether that table has any actual partitions. 
- // Partitioned tables without partitions use the location of the table's base path. - // Partitioned tables with partitions use the locations of those partitions' data locations, - // _omitting_ the table's base path. - val paths = if (partitionPaths.isEmpty) { - Seq(metastoreRelation.hiveQlTable.getDataLocation.toString) + + val rootPaths: Seq[Path] = if (lazyPruningEnabled) { + Seq(metastoreRelation.hiveQlTable.getDataLocation) } else { - partitionPaths + // By convention (for example, see TableFileCatalog), the definition of a + // partitioned table's paths depends on whether that table has any actual partitions. + // Partitioned tables without partitions use the location of the table's base path. + // Partitioned tables with partitions use the locations of those partitions' data + // locations,_omitting_ the table's base path. + val paths = metastoreRelation.getHiveQlPartitions().map { p => + new Path(p.getLocation) + } + if (paths.isEmpty) { + Seq(metastoreRelation.hiveQlTable.getDataLocation) + } else { + paths + } } val cached = getCached( tableIdentifier, - paths, + rootPaths, metastoreRelation, metastoreSchema, fileFormatClass, bucketSpec, - Some(partitionSpec)) - - val hadoopFsRelation = cached.getOrElse { - val fileCatalog = new MetaStorePartitionedTableFileCatalog( - sparkSession, - new Path(metastoreRelation.catalogTable.storage.locationUri.get), - partitionSpec) - - val inferredSchema = if (fileType.equals("parquet")) { - val inferredSchema = - defaultSource.inferSchema(sparkSession, options, fileCatalog.allFiles()) - inferredSchema.map { inferred => - ParquetFileFormat.mergeMetastoreParquetSchema(metastoreSchema, inferred) - }.getOrElse(metastoreSchema) - } else { - defaultSource.inferSchema(sparkSession, options, fileCatalog.allFiles()).get + Some(partitionSchema)) + + val logicalRelation = cached.getOrElse { + val db = metastoreRelation.databaseName + val table = metastoreRelation.tableName + val sizeInBytes = metastoreRelation.statistics.sizeInBytes.toLong + val fileCatalog = { + val catalog = new TableFileCatalog( + sparkSession, db, table, Some(partitionSchema), sizeInBytes) + if (lazyPruningEnabled) { + catalog + } else { + catalog.cachedAllPartitions + } } + val partitionSchemaColumnNames = partitionSchema.map(_.name.toLowerCase).toSet + val dataSchema = + StructType(metastoreSchema + .filterNot(field => partitionSchemaColumnNames.contains(field.name.toLowerCase))) val relation = HadoopFsRelation( location = fileCatalog, partitionSchema = partitionSchema, - dataSchema = inferredSchema, + dataSchema = dataSchema, bucketSpec = bucketSpec, fileFormat = defaultSource, options = options)(sparkSession = sparkSession) @@ -260,12 +256,12 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log created } - hadoopFsRelation + logicalRelation } else { - val paths = Seq(metastoreRelation.hiveQlTable.getDataLocation.toString) + val rootPath = metastoreRelation.hiveQlTable.getDataLocation val cached = getCached(tableIdentifier, - paths, + Seq(rootPath), metastoreRelation, metastoreSchema, fileFormatClass, @@ -276,14 +272,13 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log LogicalRelation( DataSource( sparkSession = sparkSession, - paths = paths, + paths = rootPath.toString :: Nil, userSpecifiedSchema = Some(metastoreRelation.schema), bucketSpec = bucketSpec, options = options, className = fileType).resolveRelation(), catalogTable = Some(metastoreRelation.catalogTable)) - cachedDataSourceTables.put(tableIdentifier, 
created) created } @@ -371,34 +366,3 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log } } } - -/** - * An override of the standard HDFS listing based catalog, that overrides the partition spec with - * the information from the metastore. - * - * @param tableBasePath The default base path of the Hive metastore table - * @param partitionSpec The partition specifications from Hive metastore - */ -private[hive] class MetaStorePartitionedTableFileCatalog( - sparkSession: SparkSession, - tableBasePath: Path, - override val partitionSpec: PartitionSpec) - extends ListingFileCatalog( - sparkSession, - MetaStorePartitionedTableFileCatalog.getPaths(tableBasePath, partitionSpec), - Map.empty, - Some(partitionSpec.partitionColumns)) { -} - -private[hive] object MetaStorePartitionedTableFileCatalog { - /** Get the list of paths to list files in the for a metastore table */ - def getPaths(tableBasePath: Path, partitionSpec: PartitionSpec): Seq[Path] = { - // If there are no partitions currently specified then use base path, - // otherwise use the paths corresponding to the partitions. - if (partitionSpec.partitions.isEmpty) { - Seq(tableBasePath) - } else { - partitionSpec.partitions.map(_.path) - } - } -} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala index 9ee3d629c997..569a9c11398e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala @@ -172,15 +172,24 @@ private[hive] trait HiveClient { * Returns the partitions for the given table that match the supplied partition spec. * If no partition spec is specified, all partitions are returned. */ - def getPartitions( + final def getPartitions( db: String, table: String, + partialSpec: Option[TablePartitionSpec]): Seq[CatalogTablePartition] = { + getPartitions(getTable(db, table), partialSpec) + } + + /** + * Returns the partitions for the given table that match the supplied partition spec. + * If no partition spec is specified, all partitions are returned. + */ + def getPartitions( + catalogTable: CatalogTable, partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition] /** Returns partitions filtered by predicates for the given table. */ def getPartitionsByFilter( - db: String, - table: String, + catalogTable: CatalogTable, predicates: Seq[Expression]): Seq[CatalogTablePartition] /** Loads a static partition into an existing table. */ diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 5c8f7ff1af9f..e745a8c5b358 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -37,6 +37,7 @@ import org.apache.hadoop.security.UserGroupInformation import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.internal.Logging +import org.apache.spark.metrics.source.HiveCatalogMetrics import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchPartitionException} @@ -525,22 +526,24 @@ private[hive] class HiveClientImpl( * If no partition spec is specified, all partitions are returned. 
*/ override def getPartitions( - db: String, - table: String, + table: CatalogTable, spec: Option[TablePartitionSpec]): Seq[CatalogTablePartition] = withHiveState { - val hiveTable = toHiveTable(getTable(db, table)) - spec match { + val hiveTable = toHiveTable(table) + val parts = spec match { case None => shim.getAllPartitions(client, hiveTable).map(fromHivePartition) case Some(s) => client.getPartitions(hiveTable, s.asJava).asScala.map(fromHivePartition) } + HiveCatalogMetrics.incrementFetchedPartitions(parts.length) + parts } override def getPartitionsByFilter( - db: String, - table: String, + table: CatalogTable, predicates: Seq[Expression]): Seq[CatalogTablePartition] = withHiveState { - val hiveTable = toHiveTable(getTable(db, table)) - shim.getPartitionsByFilter(client, hiveTable, predicates).map(fromHivePartition) + val hiveTable = toHiveTable(table) + val parts = shim.getPartitionsByFilter(client, hiveTable, predicates).map(fromHivePartition) + HiveCatalogMetrics.incrementFetchedPartitions(parts.length) + parts } override def listTables(dbName: String): Seq[String] = withHiveState { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala index e94f49ea8117..1af3280e18a8 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala @@ -313,7 +313,17 @@ private[orc] object OrcRelation extends HiveInspectors { def setRequiredColumns( conf: Configuration, physicalSchema: StructType, requestedSchema: StructType): Unit = { - val ids = requestedSchema.map(a => physicalSchema.fieldIndex(a.name): Integer) + val caseInsensitiveFieldMap: Map[String, Int] = physicalSchema.fieldNames + .zipWithIndex + .map(f => (f._1.toLowerCase, f._2)) + .toMap + val ids = requestedSchema.map { a => + val exactMatch: Option[Int] = physicalSchema.getFieldIndex(a.name) + val res = exactMatch.getOrElse( + caseInsensitiveFieldMap.getOrElse(a.name, + throw new IllegalArgumentException(s"""Field "$a.name" does not exist."""))) + res: Integer + } val (sortedIDs, sortedNames) = ids.zip(requestedSchema.fieldNames).sorted.unzip HiveShim.appendReadColumns(conf, sortedIDs, sortedNames) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameSuite.scala index 96e9054cd487..f65e74de87a5 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameSuite.scala @@ -17,10 +17,14 @@ package org.apache.spark.sql.hive +import java.io.File + +import org.apache.spark.metrics.source.HiveCatalogMetrics import org.apache.spark.sql.hive.test.TestHiveSingleton +import org.apache.spark.sql.test.SQLTestUtils import org.apache.spark.sql.QueryTest -class HiveDataFrameSuite extends QueryTest with TestHiveSingleton { +class HiveDataFrameSuite extends QueryTest with TestHiveSingleton with SQLTestUtils { test("table name with schema") { // regression test for SPARK-11778 spark.sql("create schema usrdb") @@ -34,4 +38,107 @@ class HiveDataFrameSuite extends QueryTest with TestHiveSingleton { val hiveClient = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client assert(hiveClient.getConf("hive.in.test", "") == "true") } + + private def setupPartitionedTable(tableName: String, dir: File): Unit = { + 
spark.range(5).selectExpr("id", "id as partCol1", "id as partCol2").write + .partitionBy("partCol1", "partCol2") + .mode("overwrite") + .parquet(dir.getAbsolutePath) + + spark.sql(s""" + |create external table $tableName (id long) + |partitioned by (partCol1 int, partCol2 int) + |stored as parquet + |location "${dir.getAbsolutePath}"""".stripMargin) + spark.sql(s"msck repair table $tableName") + } + + test("partitioned pruned table reports only selected files") { + assert(spark.sqlContext.getConf(HiveUtils.CONVERT_METASTORE_PARQUET.key) == "true") + withTable("test") { + withTempDir { dir => + setupPartitionedTable("test", dir) + val df = spark.sql("select * from test") + assert(df.count() == 5) + assert(df.inputFiles.length == 5) // unpruned + + val df2 = spark.sql("select * from test where partCol1 = 3 or partCol2 = 4") + assert(df2.count() == 2) + assert(df2.inputFiles.length == 2) // pruned, so we have less files + + val df3 = spark.sql("select * from test where PARTCOL1 = 3 or partcol2 = 4") + assert(df3.count() == 2) + assert(df3.inputFiles.length == 2) + + val df4 = spark.sql("select * from test where partCol1 = 999") + assert(df4.count() == 0) + assert(df4.inputFiles.length == 0) + } + } + } + + test("lazy partition pruning reads only necessary partition data") { + withSQLConf("spark.sql.hive.filesourcePartitionPruning" -> "true") { + withTable("test") { + withTempDir { dir => + setupPartitionedTable("test", dir) + HiveCatalogMetrics.reset() + spark.sql("select * from test where partCol1 = 999").count() + assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 0) + assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0) + + HiveCatalogMetrics.reset() + spark.sql("select * from test where partCol1 < 2").count() + assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 2) + assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 2) + + HiveCatalogMetrics.reset() + spark.sql("select * from test where partCol1 < 3").count() + assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 3) + assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 3) + + // should read all + HiveCatalogMetrics.reset() + spark.sql("select * from test").count() + assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 5) + assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5) + + // read all should be cached + HiveCatalogMetrics.reset() + spark.sql("select * from test").count() + assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 0) + assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0) + } + } + } + } + + test("all partitions read and cached when filesource partition pruning is off") { + withSQLConf("spark.sql.hive.filesourcePartitionPruning" -> "false") { + withTable("test") { + withTempDir { dir => + setupPartitionedTable("test", dir) + + // We actually query the partitions from hive each time the table is resolved in this + // mode. This is kind of terrible, but is needed to preserve the legacy behavior + // of doing plan cache validation based on the entire partition set. 
+ HiveCatalogMetrics.reset() + spark.sql("select * from test where partCol1 = 999").count() + // 5 from table resolution, another 5 from ListingFileCatalog + assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 10) + assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5) + + HiveCatalogMetrics.reset() + spark.sql("select * from test where partCol1 < 2").count() + assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 5) + assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0) + + HiveCatalogMetrics.reset() + spark.sql("select * from test").count() + assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 5) + assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0) + } + } + } + } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala index 3414f5e0409a..7af81a3a9050 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala @@ -59,4 +59,45 @@ class HiveMetadataCacheSuite extends QueryTest with SQLTestUtils with TestHiveSi } } } + + def testCaching(pruningEnabled: Boolean): Unit = { + test(s"partitioned table is cached when partition pruning is $pruningEnabled") { + withSQLConf("spark.sql.hive.filesourcePartitionPruning" -> pruningEnabled.toString) { + withTable("test") { + withTempDir { dir => + spark.range(5).selectExpr("id", "id as f1", "id as f2").write + .partitionBy("f1", "f2") + .mode("overwrite") + .parquet(dir.getAbsolutePath) + + spark.sql(s""" + |create external table test (id long) + |partitioned by (f1 int, f2 int) + |stored as parquet + |location "${dir.getAbsolutePath}"""".stripMargin) + spark.sql("msck repair table test") + + val df = spark.sql("select * from test") + assert(sql("select * from test").count() == 5) + + // Delete a file, then assert that we tried to read it. This means the table was cached. + val p = new Path(spark.table("test").inputFiles.head) + assert(p.getFileSystem(hiveContext.sessionState.newHadoopConf()).delete(p, true)) + val e = intercept[SparkException] { + sql("select * from test").count() + } + assert(e.getMessage.contains("FileNotFoundException")) + + // Test refreshing the cache. 
+ spark.catalog.refreshTable("test") + assert(sql("select * from test").count() == 4) + } + } + } + } + } + + for (pruningEnabled <- Seq(true, false)) { + testCaching(pruningEnabled) + } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala index c158bf1ab09c..9a10957c8efa 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala @@ -295,12 +295,12 @@ class VersionsSuite extends SparkFunSuite with Logging { } test(s"$version: getPartitions(catalogTable)") { - assert(2 == client.getPartitions("default", "src_part").size) + assert(2 == client.getPartitions(client.getTable("default", "src_part")).size) } test(s"$version: getPartitionsByFilter") { // Only one partition [1, 1] for key2 == 1 - val result = client.getPartitionsByFilter("default", "src_part", + val result = client.getPartitionsByFilter(client.getTable("default", "src_part"), Seq(EqualTo(AttributeReference("key2", IntegerType)(), Literal(1)))) // Hive 0.12 doesn't support getPartitionsByFilter, it ignores the filter condition. diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala index b2ee49c441ef..ecb597298452 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala @@ -474,6 +474,28 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest { } } + test("converted ORC table supports resolving mixed case field") { + withSQLConf(HiveUtils.CONVERT_METASTORE_ORC.key -> "true") { + withTable("dummy_orc") { + withTempPath { dir => + val df = spark.range(5).selectExpr("id", "id as valueField", "id as partitionValue") + df.write + .partitionBy("partitionValue") + .mode("overwrite") + .orc(dir.getAbsolutePath) + + spark.sql(s""" + |create external table dummy_orc (id long, valueField long) + |partitioned by (partitionValue int) + |stored as orc + |location "${dir.getAbsolutePath}"""".stripMargin) + spark.sql(s"msck repair table dummy_orc") + checkAnswer(spark.sql("select * from dummy_orc"), df) + } + } + } + } + test("SPARK-14962 Produce correct results on array type with isnotnull") { withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true") { val data = (0 until 10).map(i => Tuple1(Array(i))) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala index 2f6d9fb96b82..9fc62a389db4 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala @@ -175,7 +175,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { (1 to 10).map(i => Tuple1(Seq(new Integer(i), null))).toDF("a") .createOrReplaceTempView("jt_array") - setConf(HiveUtils.CONVERT_METASTORE_PARQUET, true) + assert(spark.sqlContext.getConf(HiveUtils.CONVERT_METASTORE_PARQUET.key) == "true") } override def afterAll(): Unit = { @@ -187,7 +187,6 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { "jt", "jt_array", "test_parquet") - setConf(HiveUtils.CONVERT_METASTORE_PARQUET, false) } test(s"conversion is working") { @@ -586,6 +585,23 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { 
checkAnswer( sql("SELECT * FROM test_added_partitions"), Seq(("foo", 0), ("bar", 0), ("baz", 1)).toDF("a", "b")) + + // Check it with pruning predicates + checkAnswer( + sql("SELECT * FROM test_added_partitions where b = 0"), + Seq(("foo", 0), ("bar", 0)).toDF("a", "b")) + checkAnswer( + sql("SELECT * FROM test_added_partitions where b = 1"), + Seq(("baz", 1)).toDF("a", "b")) + checkAnswer( + sql("SELECT * FROM test_added_partitions where b = 2"), + Seq[(String, Int)]().toDF("a", "b")) + + // Also verify the inputFiles implementation + assert(sql("select * from test_added_partitions").inputFiles.length == 2) + assert(sql("select * from test_added_partitions where b = 0").inputFiles.length == 1) + assert(sql("select * from test_added_partitions where b = 1").inputFiles.length == 1) + assert(sql("select * from test_added_partitions where b = 2").inputFiles.length == 0) } } }
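
The short sketch below is not part of the patch; it is a standalone illustration of the client-side pruning step the diff adds to HiveExternalCatalog.listPartitionsByFilter: partition values come back from the metastore as strings, so each one is cast to the partition column's data type, packed into an InternalRow, and checked against the pruning predicates after their attribute references are rebound to ordinals in the partition schema. It assumes only the Spark 2.1-era spark-catalyst API on the classpath; the object name, sample schema, and sample partition specs are purely illustrative.

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions._
    import org.apache.spark.sql.types._

    // Hypothetical driver showing the bind-and-filter technique; not Spark code.
    object PartitionPruningSketch {
      def main(args: Array[String]): Unit = {
        val partitionSchema = StructType(Seq(
          StructField("partCol1", IntegerType),
          StructField("partCol2", IntegerType)))

        // Metastore-style partition specs: column name -> raw string value.
        val partitions: Seq[Map[String, String]] = Seq(
          Map("partCol1" -> "1", "partCol2" -> "1"),
          Map("partCol1" -> "3", "partCol2" -> "4"))

        // A pruning predicate such as `partCol1 = 3`, expressed over attributes,
        // roughly as it would arrive from the optimizer.
        val predicate: Expression =
          EqualTo(AttributeReference("partCol1", IntegerType)(), Literal(3))

        // Rebind attribute references to ordinals in the partition schema so the
        // predicate can be evaluated against a bare InternalRow.
        val boundPredicate = InterpretedPredicate.create(predicate.transform {
          case a: AttributeReference =>
            val index = partitionSchema.indexWhere(_.name == a.name)
            BoundReference(index, partitionSchema(index).dataType, nullable = true)
        })

        // Cast each partition's raw values to a typed row and keep only matches.
        val selected = partitions.filter { spec =>
          val row = InternalRow.fromSeq(partitionSchema.map { field =>
            Cast(Literal(spec(field.name)), field.dataType).eval()
          })
          boundPredicate(row)
        }
        println(selected)  // List(Map(partCol1 -> 3, partCol2 -> 4))
      }
    }

In the patch itself the typed row is produced by CatalogTablePartition.toRow and the pruning runs only on the partitions the Hive client already narrowed down with getPartitionsByFilter; the sketch collapses both steps into one loop for readability.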
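
A second standalone sketch (again, not part of the patch) mirrors the case-insensitive column resolution added to OrcRelation.setRequiredColumns: an exact-case match against the file's physical schema is preferred, with a fallback to a lower-cased lookup, so a lower-cased metastore schema still resolves against mixed-case ORC field names. The object and method names and the sample schemas are hypothetical; only spark-sql types are assumed.

    import org.apache.spark.sql.types._

    object CaseInsensitiveOrcResolutionSketch {
      // Resolve requested column names to ordinals in the physical (file) schema,
      // preferring an exact-case match and falling back to a lower-cased lookup.
      def resolveColumnIds(physicalSchema: StructType, requestedSchema: StructType): Seq[Int] = {
        val exactFieldMap: Map[String, Int] =
          physicalSchema.fieldNames.zipWithIndex.toMap
        val caseInsensitiveFieldMap: Map[String, Int] =
          physicalSchema.fieldNames.zipWithIndex
            .map { case (name, idx) => name.toLowerCase -> idx }
            .toMap
        requestedSchema.map { field =>
          exactFieldMap.getOrElse(field.name,
            caseInsensitiveFieldMap.getOrElse(field.name.toLowerCase,
              throw new IllegalArgumentException(s"""Field "${field.name}" does not exist.""")))
        }
      }

      def main(args: Array[String]): Unit = {
        // The metastore reports lower-cased names while the ORC file keeps the
        // original casing; both spellings should resolve to the same ordinal.
        val physical = new StructType().add("id", LongType).add("valueField", LongType)
        println(resolveColumnIds(physical, new StructType().add("valuefield", LongType)))  // List(1)
        println(resolveColumnIds(physical, new StructType().add("valueField", LongType)))  // List(1)
      }
    }

This is the same fallback behavior the "converted ORC table supports resolving mixed case field" test above exercises end-to-end through a partitioned external table.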