From 594ec0299fd4ce7fd4d45e8d6fa7c7ba74e3cb39 Mon Sep 17 00:00:00 2001
From: Olivier Toupin
Date: Fri, 19 Jun 2015 15:51:41 -0400
Subject: [PATCH 1/2] More efficient way of poking the namenode.

---
 .../apache/spark/sql/sources/interfaces.scala | 28 ++++++++-----------
 1 file changed, 11 insertions(+), 17 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 25887ba9a15b0..07132cc3d010c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.sources
 
-import scala.collection.mutable
+import scala.collection.{AbstractIterator, mutable}
 import scala.util.Try
 
 import org.apache.hadoop.conf.Configuration
@@ -379,31 +379,25 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
     var leafDirToChildrenFiles = mutable.Map.empty[Path, Array[FileStatus]]
 
     def refresh(): Unit = {
-      // We don't filter files/directories whose name start with "_" except "_temporary" here, as
-      // specific data sources may take advantages over them (e.g. Parquet _metadata and
-      // _common_metadata files). "_temporary" directories are explicitly ignored since failed
-      // tasks/jobs may leave partial/corrupted data files there.
-      def listLeafFilesAndDirs(fs: FileSystem, status: FileStatus): Set[FileStatus] = {
-        if (status.getPath.getName.toLowerCase == "_temporary") {
-          Set.empty
-        } else {
-          val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDir)
-          val leafDirs = if (dirs.isEmpty) Set(status) else Set.empty[FileStatus]
-          files.toSet ++ leafDirs ++ dirs.flatMap(dir => listLeafFilesAndDirs(fs, dir))
-        }
-      }
 
       leafFiles.clear()
 
-      val statuses = paths.flatMap { path =>
+      val statuses = paths.par.flatMap { path =>
         val hdfsPath = new Path(path)
         val fs = hdfsPath.getFileSystem(hadoopConf)
         val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
-        Try(fs.getFileStatus(qualified)).toOption.toArray.flatMap(listLeafFilesAndDirs(fs, _))
+        val it = fs.listFiles(qualified, true)
+        val its = new AbstractIterator[FileStatus] {
+          def hasNext: Boolean = it.hasNext
+          def next(): FileStatus = {
+            it.next()
+          }
+        }
+        its
       }.filterNot { status =>
         // SPARK-8037: Ignores files like ".DS_Store" and other hidden files/directories
         status.getPath.getName.startsWith(".")
-      }
+      }.toArray
 
       val files = statuses.filterNot(_.isDir)
       leafFiles ++= files.map(f => f.getPath -> f).toMap
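The idea in this first patch, taken in isolation: instead of recursing with one listStatus() RPC per directory, a single recursive FileSystem.listFiles() call returns a Hadoop RemoteIterator that the NameNode feeds incrementally, and only a thin adapter is needed to consume it as a Scala Iterator. Below is a minimal, self-contained sketch of that adapter; the object name, the helper toScalaIterator, and the example path are illustrative assumptions, not part of the patch.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, LocatedFileStatus, Path, RemoteIterator}

object ListLeafFilesSketch {
  // Adapt Hadoop's Java-style RemoteIterator to a plain Scala Iterator.
  private def toScalaIterator(it: RemoteIterator[LocatedFileStatus]): Iterator[FileStatus] =
    new Iterator[FileStatus] {
      def hasNext: Boolean = it.hasNext
      def next(): FileStatus = it.next()
    }

  def main(args: Array[String]): Unit = {
    val path = new Path("hdfs:///tmp/example")               // hypothetical input path
    val fs = path.getFileSystem(new Configuration())
    val qualified = path.makeQualified(fs.getUri, fs.getWorkingDirectory)

    // One recursive listing request instead of one listStatus() call per directory.
    val leafFiles = toScalaIterator(fs.listFiles(qualified, true))
      .filterNot(_.getPath.getName.startsWith("."))          // skip hidden files (SPARK-8037)
      .toArray

    leafFiles.foreach(f => println(s"${f.getLen}\t${f.getPath}"))
  }
}

On top of this, the patch also lists the configured root paths in parallel (paths.par.flatMap), so independent tables or partitions are walked concurrently rather than one after another.
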
From 717c815f2f76bf0a289a507738e818f510f94fbe Mon Sep 17 00:00:00 2001
From: Olivier Toupin
Date: Fri, 26 Jun 2015 10:26:01 -0400
Subject: [PATCH 2/2] Lazy footers discovery + Trust metastore schema.

This drastically reduces the number of files poked when a table has many
files and/or partitions.
---
 .../apache/spark/sql/parquet/newParquet.scala | 34 +++++++++++--------
 1 file changed, 19 insertions(+), 15 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 36b0047c495cd..231d6661ab6c5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -122,7 +122,7 @@ private[sql] class ParquetRelation2(
     .map(DataType.fromJson(_).asInstanceOf[StructType])
 
   private lazy val metadataCache: MetadataCache = {
-    val meta = new MetadataCache
+    val meta = new MetadataCache()
     meta.refresh()
     meta
   }
@@ -249,8 +249,6 @@ private[sql] class ParquetRelation2(
     // Create the function to set input paths at the driver side.
     val setInputPaths = ParquetRelation2.initializeDriverSideJobFunc(inputFiles) _
 
-    val footers = inputFiles.map(f => metadataCache.footers(f.getPath))
-
     Utils.withDummyCallSite(sqlContext.sparkContext) {
       // TODO Stop using `FilteringParquetRowInputFormat` and overriding `getPartition`.
       // After upgrading to Parquet 1.6.0, we should be able to stop caching `FileStatus` objects
@@ -277,12 +275,6 @@ private[sql] class ParquetRelation2(
             f.getAccessTime, f.getPermission, f.getOwner, f.getGroup, pathWithEscapedAuthority)
         }.toSeq
 
-        @transient val cachedFooters = footers.map { f =>
-          // In order to encode the authority of a Path containing special characters such as /,
-          // we need to use the string returned by the URI of the path to create a new Path.
-          new Footer(escapePathUserInfo(f.getFile), f.getParquetMetadata)
-        }.toSeq
-
         private def escapePathUserInfo(path: Path): Path = {
           val uri = path.toUri
           new Path(new URI(
@@ -295,7 +287,6 @@ private[sql] class ParquetRelation2(
         val inputFormat = if (cacheMetadata) {
           new FilteringParquetRowInputFormat {
             override def listStatus(jobContext: JobContext): JList[FileStatus] = cachedStatuses
-            override def getFooters(jobContext: JobContext): JList[Footer] = cachedFooters
           }
         } else {
           new FilteringParquetRowInputFormat
@@ -313,6 +304,8 @@ private[sql] class ParquetRelation2(
   }
 
   private class MetadataCache {
+    import ParquetRelation2.RelationMemo
+
     // `FileStatus` objects of all "_metadata" files.
     private var metadataStatuses: Array[FileStatus] = _
 
@@ -320,7 +313,7 @@ private[sql] class ParquetRelation2(
     private var commonMetadataStatuses: Array[FileStatus] = _
 
     // Parquet footer cache.
-    var footers: Map[Path, Footer] = _
+    var footers: RelationMemo[Map[Path, Footer]] = _
 
     // `FileStatus` objects of all data files (Parquet part-files).
     var dataStatuses: Array[FileStatus] = _
@@ -347,8 +340,9 @@ private[sql] class ParquetRelation2(
       commonMetadataStatuses =
         leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE)
 
-      footers = {
-        val conf = SparkHadoopUtil.get.conf
+      val conf = SparkHadoopUtil.get.conf
+
+      def getFooters() = {
         val taskSideMetaData = conf.getBoolean(ParquetInputFormat.TASK_SIDE_METADATA, true)
         val rawFooters = if (shouldMergeSchemas) {
           ParquetFileReader.readAllFootersInParallel(
@@ -361,13 +355,15 @@ private[sql] class ParquetRelation2(
         rawFooters.map(footer => footer.getFile -> footer).toMap
       }
 
+      footers = new RelationMemo(getFooters())
+
       // If we already get the schema, don't need to re-compute it since the schema merging is
       // time-consuming.
       if (dataSchema == null) {
         dataSchema = {
           val dataSchema0 = maybeDataSchema
-            .orElse(readSchema())
             .orElse(maybeMetastoreSchema)
+            .orElse(readSchema())
             .getOrElse(throw new AnalysisException(
               s"Failed to discover schema of Parquet file(s) in the following location(s):\n" +
                 paths.mkString("\n\t")))
@@ -431,12 +427,20 @@ private[sql] class ParquetRelation2(
         "No schema defined, " +
           s"and no Parquet data file or summary file found under ${paths.mkString(", ")}.")
 
-      ParquetRelation2.readSchema(filesToTouch.map(f => footers.apply(f.getPath)), sqlContext)
+      ParquetRelation2.readSchema(filesToTouch.map(f => footers.value.apply(f.getPath)), sqlContext)
     }
   }
 }
 
 private[sql] object ParquetRelation2 extends Logging {
+
+  // TODO: Move to utils
+  private[sql] class RelationMemo[T](memo: => T) {
+    lazy val value = {
+      memo
+    }
+  }
+
   // Whether we should merge schemas collected from all Parquet part-files.
   private[sql] val MERGE_SCHEMA = "mergeSchema"
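The RelationMemo introduced by this second patch is just a memoized thunk: the by-name constructor argument is captured unevaluated and only forced the first time value is read, so Parquet footers are never opened when the metastore schema is trusted and no caller ever asks for them. A minimal sketch of the same pattern follows; the names Memo, LazyFootersSketch, and readFooters are illustrative assumptions, not the Spark internals.

object LazyFootersSketch {
  // A memoized thunk: the by-name argument is evaluated at most once, on first access.
  class Memo[T](thunk: => T) {
    lazy val value: T = thunk
  }

  def main(args: Array[String]): Unit = {
    var footerReads = 0
    def readFooters(): Map[String, Long] = {
      footerReads += 1                        // stands in for opening every Parquet file
      Map("part-00000.parquet" -> 1234L)      // hypothetical footer metadata
    }

    val footers = new Memo(readFooters())     // nothing has been read yet

    println(s"reads before first access: $footerReads")  // 0
    println(footers.value)                                // forces exactly one read
    println(footers.value)                                // served from the cached lazy val
    println(s"reads after two accesses: $footerReads")   // 1
  }
}

A plain lazy val would give the same one-shot evaluation, but wrapping the thunk in a small class lets the memo live in a var on MetadataCache and be rebuilt on each refresh(), which is what the patch needs.
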