
Commit 6a08b02

WIP: Moves file status cache into HadoopFSRelation
1 parent ce63912 · commit 6a08b02

4 files changed: 76 additions, 58 deletions


sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala

Lines changed: 5 additions & 8 deletions
@@ -234,15 +234,15 @@ private[sql] class ParquetRelation2(
  override def buildScan(
      requiredColumns: Array[String],
      filters: Array[Filter],
-      inputPaths: Array[String]): RDD[Row] = {
+      inputFiles: Array[FileStatus]): RDD[Row] = {

    val job = new Job(SparkHadoopUtil.get.conf)
    val conf = ContextUtil.getConfiguration(job)

    ParquetInputFormat.setReadSupportClass(job, classOf[RowReadSupport])

-    if (inputPaths.nonEmpty) {
-      FileInputFormat.setInputPaths(job, inputPaths.map(new Path(_)): _*)
+    if (inputFiles.nonEmpty) {
+      FileInputFormat.setInputPaths(job, inputFiles.map(_.getPath): _*)
    }

    // Try to push down filters when filter push-down is enabled.
@@ -269,10 +269,7 @@ private[sql] class ParquetRelation2(
    val useMetadataCache = sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, "true").toBoolean
    conf.set(SQLConf.PARQUET_CACHE_METADATA, useMetadataCache.toString)

-    val inputFileStatuses =
-      metadataCache.dataStatuses.filter(f => inputPaths.contains(f.getPath.toString))
-
-    val footers = inputFileStatuses.map(metadataCache.footers)
+    val footers = inputFiles.map(metadataCache.footers)

    // TODO Stop using `FilteringParquetRowInputFormat` and overriding `getPartition`.
    // After upgrading to Parquet 1.6.0, we should be able to stop caching `FileStatus` objects and
@@ -287,7 +284,7 @@ private[sql] class ParquetRelation2(

      val cacheMetadata = useMetadataCache

-      @transient val cachedStatuses = inputFileStatuses.map { f =>
+      @transient val cachedStatuses = inputFiles.map { f =>
        // In order to encode the authority of a Path containing special characters such as /,
        // we need to use the string returned by the URI of the path to create a new Path.
        val pathWithAuthority = new Path(f.getPath.toUri.toString)
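Editor's note: the net effect of these hunks is that `ParquetRelation2` now consumes cached `FileStatus` objects directly instead of re-parsing path strings and re-filtering `metadataCache.dataStatuses`. A minimal standalone sketch of the same input wiring, under the assumption of a Hadoop MapReduce client on the classpath (the object and method names are illustrative, not part of this commit):

import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat

object ParquetInputWiringSketch {
  // Illustrative helper only: mirrors the change above, where cached statuses
  // are handed straight to FileInputFormat instead of being rebuilt from strings.
  def configureInputs(job: Job, inputFiles: Array[FileStatus]): Unit = {
    if (inputFiles.nonEmpty) {
      FileInputFormat.setInputPaths(job, inputFiles.map(_.getPath): _*)
    }
  }
}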

sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala

Lines changed: 4 additions & 23 deletions
@@ -58,7 +58,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
        filters,
        (a, _) => t.buildScan(a)) :: Nil

-    // Scanning partitioned FSBasedRelation
+    // Scanning partitioned HadoopFsRelation
    case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: HadoopFsRelation))
        if t.partitionSpec.partitionColumns.nonEmpty =>
      val selectedPartitions = prunePartitions(filters, t.partitionSpec).toArray
@@ -86,22 +86,13 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
        t.partitionSpec.partitionColumns,
        selectedPartitions) :: Nil

-    // Scanning non-partitioned FSBasedRelation
+    // Scanning non-partitioned HadoopFsRelation
    case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: HadoopFsRelation)) =>
-      val inputPaths = t.paths.map(new Path(_)).flatMap { path =>
-        val fs = path.getFileSystem(t.sqlContext.sparkContext.hadoopConfiguration)
-        val qualifiedPath = path.makeQualified(fs.getUri, fs.getWorkingDirectory)
-        SparkHadoopUtil.get.listLeafStatuses(fs, qualifiedPath).map(_.getPath).filterNot { path =>
-          val name = path.getName
-          name.startsWith("_") || name.startsWith(".")
-        }.map(fs.makeQualified(_).toString)
-      }
-
      pruneFilterProject(
        l,
        projectList,
        filters,
-        (a, f) => t.buildScan(a, f, inputPaths)) :: Nil
+        (a, f) => t.buildScan(a, f, t.paths)) :: Nil

    case l @ LogicalRelation(t: TableScan) =>
      createPhysicalRDD(l.relation, l.output, t.buildScan()) :: Nil
@@ -130,16 +121,6 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {

    // Builds RDD[Row]s for each selected partition.
    val perPartitionRows = partitions.map { case Partition(partitionValues, dir) =>
-      // Paths to all data files within this partition
-      val dataFilePaths = {
-        val dirPath = new Path(dir)
-        val fs = dirPath.getFileSystem(SparkHadoopUtil.get.conf)
-        fs.listStatus(dirPath).map(_.getPath).filterNot { path =>
-          val name = path.getName
-          name.startsWith("_") || name.startsWith(".")
-        }.map(fs.makeQualified(_).toString)
-      }
-
      // The table scan operator (PhysicalRDD) which retrieves required columns from data files.
      // Notice that the schema of data files, represented by `relation.dataSchema`, may contain
      // some partition column(s).
@@ -155,7 +136,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
      // assuming partition columns data stored in data files are always consistent with those
      // partition values encoded in partition directory paths.
      val nonPartitionColumns = requiredColumns.filterNot(partitionColNames.contains)
-      val dataRows = relation.buildScan(nonPartitionColumns, filters, dataFilePaths)
+      val dataRows = relation.buildScan(nonPartitionColumns, filters, Array(dir))

      // Merges data values with partition values.
      mergeWithPartitionValues(
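Editor's note: with the listing logic removed here, the planner only passes directory paths (`t.paths` for the non-partitioned case, `Array(dir)` per selected partition) and relies on the bridging overload added to `HadoopFsRelation` (see the interfaces.scala diff below) to resolve those directories against the file status cache. A hedged, self-contained sketch of that resolution step, where `cachedLeafFiles` stands in for the relation's internal `fileStatusCache.leafFiles`:

import org.apache.hadoop.fs.{FileStatus, Path}

object InputResolutionSketch {
  // Illustrative only: given directory paths from the planner and a map of
  // already-listed leaf files keyed by their qualified paths, return the
  // FileStatus objects whose parent directory is one of the inputs.
  def resolveInputFiles(
      inputPaths: Array[String],
      cachedLeafFiles: Map[Path, FileStatus]): Array[FileStatus] = {
    inputPaths.flatMap { path =>
      cachedLeafFiles.values.filter(_.getPath.getParent == new Path(path))
    }
  }
}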

sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala

Lines changed: 64 additions & 24 deletions
@@ -17,14 +17,13 @@

package org.apache.spark.sql.sources

-import scala.util.Try
+import scala.collection.mutable

import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}

import org.apache.spark.annotation.{DeveloperApi, Experimental}
-import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.expressions._
@@ -368,18 +367,55 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio

  private var _partitionSpec: PartitionSpec = _

+  private class FileStatusCache {
+    var leafFiles = mutable.Map.empty[Path, FileStatus]
+
+    var leafDirs = mutable.Map.empty[Path, FileStatus]
+
+    def refresh() = {
+      def listLeafFilesAndDirs(fs: FileSystem, status: FileStatus): Set[FileStatus] = {
+        val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDir)
+        val leafDirs = if (dirs.isEmpty) Set(status) else Set.empty[FileStatus]
+        files.toSet ++ leafDirs ++ dirs.flatMap(dir => listLeafFilesAndDirs(fs, dir))
+      }
+
+      leafDirs.clear()
+      leafFiles.clear()
+
+      val statuses = paths.flatMap { path =>
+        val hdfsPath = new Path(path)
+        val fs = hdfsPath.getFileSystem(hadoopConf)
+        val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
+        listLeafFilesAndDirs(fs, fs.getFileStatus(qualified)).filterNot { status =>
+          val name = status.getPath.getName
+          !status.isDir && (name.startsWith("_") || name.startsWith("."))
+        }
+      }
+
+      val (dirs, files) = statuses.partition(_.isDir)
+      leafDirs ++= dirs.map(d => d.getPath -> d).toMap
+      leafFiles ++= files.map(f => f.getPath -> f).toMap
+    }
+  }
+
+  private lazy val fileStatusCache = {
+    val cache = new FileStatusCache
+    cache.refresh()
+    cache
+  }
+
  final private[sql] def partitionSpec: PartitionSpec = {
    if (_partitionSpec == null) {
      _partitionSpec = maybePartitionSpec
        .map(spec => spec.copy(partitionColumns = spec.partitionColumns.asNullable))
        .orElse(userDefinedPartitionColumns.map(PartitionSpec(_, Array.empty[Partition])))
        .getOrElse {
-        if (sqlContext.conf.partitionDiscoveryEnabled()) {
-          discoverPartitions()
-        } else {
-          PartitionSpec(StructType(Nil), Array.empty[Partition])
+          if (sqlContext.conf.partitionDiscoveryEnabled()) {
+            discoverPartitions()
+          } else {
+            PartitionSpec(StructType(Nil), Array.empty[Partition])
+          }
        }
-      }
    }
    _partitionSpec
  }
@@ -409,20 +445,14 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
  def userDefinedPartitionColumns: Option[StructType] = None

  private[sql] def refresh(): Unit = {
+    fileStatusCache.refresh()
    if (sqlContext.conf.partitionDiscoveryEnabled()) {
      _partitionSpec = discoverPartitions()
    }
  }

  private def discoverPartitions(): PartitionSpec = {
-    val basePaths = paths.map(new Path(_))
-    val leafDirs = basePaths.flatMap { path =>
-      val fs = path.getFileSystem(hadoopConf)
-      Try(fs.getFileStatus(path.makeQualified(fs.getUri, fs.getWorkingDirectory)))
-        .filter(_.isDir)
-        .map(SparkHadoopUtil.get.listLeafDirStatuses(fs, _))
-        .getOrElse(Seq.empty[FileStatus])
-    }.map(_.getPath)
+    val leafDirs = fileStatusCache.leafDirs.keys.toSeq

    if (leafDirs.nonEmpty) {
      PartitioningUtils.parsePartitions(leafDirs, PartitioningUtils.DEFAULT_PARTITION_NAME)
@@ -444,6 +474,16 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
    })
  }

+  private[sources] final def buildScan(
+      requiredColumns: Array[String],
+      filters: Array[Filter],
+      inputPaths: Array[String]): RDD[Row] = {
+    val inputStatuses = inputPaths.flatMap { path =>
+      fileStatusCache.leafFiles.values.filter(_.getPath.getParent == new Path(path))
+    }
+    buildScan(requiredColumns, filters, inputStatuses)
+  }
+
  /**
   * Specifies schema of actual data files. For partitioned relations, if one or more partitioned
   * columns are contained in the data files, they should also appear in `dataSchema`.
@@ -457,13 +497,13 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
   * this relation. For partitioned relations, this method is called for each selected partition,
   * and builds an `RDD[Row]` containing all rows within that single partition.
   *
-   * @param inputPaths For a non-partitioned relation, it contains paths of all data files in the
+   * @param inputFiles For a non-partitioned relation, it contains paths of all data files in the
   *        relation. For a partitioned relation, it contains paths of all data files in a single
   *        selected partition.
   *
   * @since 1.4.0
   */
-  def buildScan(inputPaths: Array[String]): RDD[Row] = {
+  def buildScan(inputFiles: Array[FileStatus]): RDD[Row] = {
    throw new UnsupportedOperationException(
      "At least one buildScan() method should be overridden to read the relation.")
  }
@@ -474,13 +514,13 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
   * and builds an `RDD[Row]` containing all rows within that single partition.
   *
   * @param requiredColumns Required columns.
-   * @param inputPaths For a non-partitioned relation, it contains paths of all data files in the
+   * @param inputFiles For a non-partitioned relation, it contains paths of all data files in the
   *        relation. For a partitioned relation, it contains paths of all data files in a single
   *        selected partition.
   *
   * @since 1.4.0
   */
-  def buildScan(requiredColumns: Array[String], inputPaths: Array[String]): RDD[Row] = {
+  def buildScan(requiredColumns: Array[String], inputFiles: Array[FileStatus]): RDD[Row] = {
    // Yeah, to workaround serialization...
    val dataSchema = this.dataSchema
    val codegenEnabled = this.codegenEnabled
@@ -490,7 +530,7 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
      BoundReference(dataSchema.fieldIndex(col), field.dataType, field.nullable)
    }.toSeq

-    buildScan(inputPaths).mapPartitions { rows =>
+    buildScan(inputFiles).mapPartitions { rows =>
      val buildProjection = if (codegenEnabled) {
        GenerateMutableProjection.generate(requiredOutput, dataSchema.toAttributes)
      } else {
@@ -512,7 +552,7 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
   * of all `filters`. The pushed down filters are currently purely an optimization as they
   * will all be evaluated again. This means it is safe to use them with methods that produce
   * false positives such as filtering partitions based on a bloom filter.
-   * @param inputPaths For a non-partitioned relation, it contains paths of all data files in the
+   * @param inputFiles For a non-partitioned relation, it contains paths of all data files in the
   *        relation. For a partitioned relation, it contains paths of all data files in a single
   *        selected partition.
   *
@@ -521,8 +561,8 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
  def buildScan(
      requiredColumns: Array[String],
      filters: Array[Filter],
-      inputPaths: Array[String]): RDD[Row] = {
-    buildScan(requiredColumns, inputPaths)
+      inputFiles: Array[FileStatus]): RDD[Row] = {
+    buildScan(requiredColumns, inputFiles)
  }

  /**
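Editor's note: the heart of the new `FileStatusCache` is `listLeafFilesAndDirs`, which collects every data file plus every "leaf" directory (one with no sub-directories); `discoverPartitions` now reads its partition paths from those leaf directories instead of re-listing the filesystem. A small, hypothetical way to exercise that traversal against a local directory, assuming only a Hadoop client on the classpath (the object name and path are made up for illustration):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

object LeafListingSketch {
  // Mirrors FileStatusCache.listLeafFilesAndDirs from the diff above: returns
  // all files under `status`, plus every directory that has no sub-directories.
  def listLeafFilesAndDirs(fs: FileSystem, status: FileStatus): Set[FileStatus] = {
    val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDir)
    val leafDirs = if (dirs.isEmpty) Set(status) else Set.empty[FileStatus]
    files.toSet ++ leafDirs ++ dirs.flatMap(dir => listLeafFilesAndDirs(fs, dir))
  }

  def main(args: Array[String]): Unit = {
    val fs = FileSystem.getLocal(new Configuration())
    // Hypothetical table root; point this at any existing local directory.
    val root = fs.getFileStatus(new Path("/tmp/example-table"))
    val (leafDirs, leafFiles) = listLeafFilesAndDirs(fs, root).partition(_.isDir)
    leafDirs.foreach(d => println(s"leaf dir:  ${d.getPath}"))
    leafFiles.foreach(f => println(s"data file: ${f.getPath}"))
  }
}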

sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala

Lines changed: 3 additions & 3 deletions
@@ -21,7 +21,7 @@ import java.text.NumberFormat
import java.util.UUID

import com.google.common.base.Objects
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.io.{NullWritable, Text}
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputFormat}
import org.apache.hadoop.mapreduce.{Job, RecordWriter, TaskAttemptContext}
@@ -101,10 +101,10 @@ class SimpleTextRelation(
  override def hashCode(): Int =
    Objects.hashCode(paths, maybeDataSchema, dataSchema)

-  override def buildScan(inputPaths: Array[String]): RDD[Row] = {
+  override def buildScan(inputStatuses: Array[FileStatus]): RDD[Row] = {
    val fields = dataSchema.map(_.dataType)

-    sparkContext.textFile(inputPaths.mkString(",")).map { record =>
+    sparkContext.textFile(inputStatuses.map(_.getPath).mkString(",")).map { record =>
      Row(record.split(",").zip(fields).map { case (value, dataType) =>
        Cast(Literal(value), dataType).eval()
      }: _*)
