@@ -90,6 +90,12 @@ object HiveCatalogMetrics extends Source {
*/
val METRIC_HIVE_CLIENT_CALLS = metricRegistry.counter(MetricRegistry.name("hiveClientCalls"))

/**
* Tracks the total number of Spark jobs launched for parallel file listing.
*/
val METRIC_PARALLEL_LISTING_JOB_COUNT = metricRegistry.counter(
MetricRegistry.name("parallelListingJobCount"))

/**
* Resets the values of all metrics to zero. This is useful in tests.
*/
@@ -98,11 +104,13 @@ object HiveCatalogMetrics extends Source {
METRIC_FILES_DISCOVERED.dec(METRIC_FILES_DISCOVERED.getCount())
METRIC_FILE_CACHE_HITS.dec(METRIC_FILE_CACHE_HITS.getCount())
METRIC_HIVE_CLIENT_CALLS.dec(METRIC_HIVE_CLIENT_CALLS.getCount())
METRIC_PARALLEL_LISTING_JOB_COUNT.dec(METRIC_PARALLEL_LISTING_JOB_COUNT.getCount())
}

// clients can use these to avoid classloader issues with the codahale classes
def incrementFetchedPartitions(n: Int): Unit = METRIC_PARTITIONS_FETCHED.inc(n)
def incrementFilesDiscovered(n: Int): Unit = METRIC_FILES_DISCOVERED.inc(n)
def incrementFileCacheHits(n: Int): Unit = METRIC_FILE_CACHE_HITS.inc(n)
def incrementHiveClientCalls(n: Int): Unit = METRIC_HIVE_CLIENT_CALLS.inc(n)
def incrementParallelListingJobCount(n: Int): Unit = METRIC_PARALLEL_LISTING_JOB_COUNT.inc(n)
}
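As a quick illustration of the new counter's lifecycle, a minimal sketch using only names from this diff (the FileIndexSuite tests further down follow the same reset-then-assert pattern):

HiveCatalogMetrics.reset()
assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 0)
HiveCatalogMetrics.incrementParallelListingJobCount(1)  // called once per parallel listing job launched
assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 1)
HiveCatalogMetrics.reset()  // dec() by the current count returns it to zero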
@@ -249,12 +249,9 @@ abstract class PartitioningAwareFileIndex(
pathsToFetch += path
}
}
val discovered = if (pathsToFetch.length >=
sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
PartitioningAwareFileIndex.listLeafFilesInParallel(pathsToFetch, hadoopConf, sparkSession)
} else {
PartitioningAwareFileIndex.listLeafFilesInSerial(pathsToFetch, hadoopConf)
}
val filter = FileInputFormat.getInputPathFilter(new JobConf(hadoopConf, this.getClass))
val discovered = PartitioningAwareFileIndex.bulkListLeafFiles(
pathsToFetch, hadoopConf, filter, sparkSession)
discovered.foreach { case (path, leafFiles) =>
HiveCatalogMetrics.incrementFilesDiscovered(leafFiles.size)
fileStatusCache.putLeafFiles(path, leafFiles.toArray)
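A note on the JobConf above: it exists only so FileInputFormat can resolve a PathFilter configured on the Hadoop configuration. A hedged sketch of how such a filter would be wired in through the standard Hadoop mapred API (TmpFileFilter is a hypothetical example class, not part of this change):

import org.apache.hadoop.fs.{Path, PathFilter}
import org.apache.hadoop.mapred.{FileInputFormat, JobConf}

// Hypothetical filter: skip any path whose name starts with "_tmp"
class TmpFileFilter extends PathFilter {
  override def accept(path: Path): Boolean = !path.getName.startsWith("_tmp")
}

val jobConf = new JobConf()
FileInputFormat.setInputPathFilter(jobConf, classOf[TmpFileFilter])
// getInputPathFilter instantiates the configured class reflectively, or returns null if none is set
val filter: PathFilter = FileInputFormat.getInputPathFilter(jobConf)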
@@ -286,31 +283,28 @@ object PartitioningAwareFileIndex extends Logging {
blockLocations: Array[SerializableBlockLocation])

/**
* List a collection of path recursively.
*/
private def listLeafFilesInSerial(
paths: Seq[Path],
hadoopConf: Configuration): Seq[(Path, Seq[FileStatus])] = {
// Dummy jobconf to get to the pathFilter defined in configuration
val jobConf = new JobConf(hadoopConf, this.getClass)
val filter = FileInputFormat.getInputPathFilter(jobConf)

paths.map { path =>
val fs = path.getFileSystem(hadoopConf)
(path, listLeafFiles0(fs, path, filter))
}
}

/**
* List a collection of path recursively in parallel (using Spark executors).
* Each task launched will use [[listLeafFilesInSerial]] to list.
* Lists a collection of paths recursively. Picks the listing strategy adaptively depending
* on the number of paths to list.
*
* This may only be called on the driver.
*
* @return for each input path, the set of discovered files for the path
*/
private def listLeafFilesInParallel(
private def bulkListLeafFiles(
paths: Seq[Path],
hadoopConf: Configuration,
filter: PathFilter,
sparkSession: SparkSession): Seq[(Path, Seq[FileStatus])] = {
assert(paths.size >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold)

// Short-circuits parallel listing when serial listing is likely to be faster.
if (paths.size < sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
return paths.map { path =>
(path, listLeafFiles(path, hadoopConf, filter, Some(sparkSession)))
}
}

logInfo(s"Listing leaf files and directories in parallel under: ${paths.mkString(", ")}")
HiveCatalogMetrics.incrementParallelListingJobCount(1)

val sparkContext = sparkSession.sparkContext
val serializableConfiguration = new SerializableConfiguration(hadoopConf)
@@ -324,9 +318,11 @@

val statusMap = sparkContext
.parallelize(serializedPaths, numParallelism)
.mapPartitions { paths =>
.mapPartitions { pathStrings =>
val hadoopConf = serializableConfiguration.value
listLeafFilesInSerial(paths.map(new Path(_)).toSeq, hadoopConf).iterator
pathStrings.map(new Path(_)).toSeq.map { path =>
(path, listLeafFiles(path, hadoopConf, filter, None))
}.iterator
}.map { case (path, statuses) =>
val serializableStatuses = statuses.map { status =>
// Turn FileStatus into SerializableFileStatus so we can send it back to the driver
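The hunk truncates here, but the surrounding code wraps each Hadoop FileStatus in the SerializableFileStatus defined earlier in this object, since FileStatus is not serializable in the Hadoop versions Spark targeted here and cannot be collected back to the driver as-is. A rough sketch of that wrapper pattern, with an illustrative field list rather than the exact one in this file:

// Illustrative only; the real case classes are defined near the top of this object.
case class SerializableBlockLocation(
    names: Array[String], hosts: Array[String], offset: Long, length: Long)
case class SerializableFileStatus(
    path: String, length: Long, isDir: Boolean, modificationTime: Long,
    blockLocations: Array[SerializableBlockLocation])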
@@ -374,11 +370,20 @@
}

/**
* List a single path, provided as a FileStatus, in serial.
* Lists a single filesystem path recursively. If a SparkSession object is specified, this
* function may launch Spark jobs to parallelize listing.
*
* If sessionOpt is None, this may be called on executors.
*
* @return all children of path that match the specified filter.
*/
private def listLeafFiles0(
fs: FileSystem, path: Path, filter: PathFilter): Seq[FileStatus] = {
private def listLeafFiles(
path: Path,
hadoopConf: Configuration,
filter: PathFilter,
sessionOpt: Option[SparkSession]): Seq[FileStatus] = {
logTrace(s"Listing $path")
val fs = path.getFileSystem(hadoopConf)
val name = path.getName.toLowerCase
if (shouldFilterOut(name)) {
Seq.empty[FileStatus]
@@ -393,9 +398,15 @@
}

val allLeafStatuses = {
val (dirs, files) = statuses.partition(_.isDirectory)
val stats = files ++ dirs.flatMap(dir => listLeafFiles0(fs, dir.getPath, filter))
if (filter != null) stats.filter(f => filter.accept(f.getPath)) else stats
val (dirs, topLevelFiles) = statuses.partition(_.isDirectory)
val nestedFiles: Seq[FileStatus] = sessionOpt match {
case Some(session) =>
bulkListLeafFiles(dirs.map(_.getPath), hadoopConf, filter, session).flatMap(_._2)
case _ =>
dirs.flatMap(dir => listLeafFiles(dir.getPath, hadoopConf, filter, sessionOpt))
}
val allFiles = topLevelFiles ++ nestedFiles
if (filter != null) allFiles.filter(f => filter.accept(f.getPath)) else allFiles
}

allLeafStatuses.filterNot(status => shouldFilterOut(status.getPath.getName)).map {
@@ -25,6 +25,7 @@ import scala.language.reflectiveCalls

import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}

import org.apache.spark.metrics.source.HiveCatalogMetrics
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.test.SharedSQLContext

@@ -81,6 +82,58 @@ class FileIndexSuite extends SharedSQLContext {
}
}

test("PartitioningAwareFileIndex listing parallelized with many top level dirs") {
for ((scale, expectedNumPar) <- Seq((10, 0), (50, 1))) {
Review comment (Contributor): shall we do withSQLConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD -> "xxx") { test code } to make the test more robust?
withTempDir { dir =>
val topLevelDirs = (1 to scale).map { i =>
val tmp = new File(dir, s"foo=$i.txt")
tmp.mkdir()
new Path(tmp.getCanonicalPath)
}
HiveCatalogMetrics.reset()
assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 0)
new InMemoryFileIndex(spark, topLevelDirs, Map.empty, None)
assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == expectedNumPar)
}
}
}
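These expectations lean on the default spark.sql.sources.parallelPartitionDiscovery.threshold (32 around the time of this change): 10 directories stay under it and list serially (0 jobs), while 50 cross it and trigger one parallel job. A sketch of the reviewer's withSQLConf suggestion above, assuming the helper from Spark's SQL test utilities and an import of org.apache.spark.sql.internal.SQLConf:

withSQLConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD.key -> "1") {
  HiveCatalogMetrics.reset()
  new InMemoryFileIndex(spark, topLevelDirs, Map.empty, None)
  assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 1)
}

Pinning the threshold this way keeps the test stable even if the default changes.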

test("PartitioningAwareFileIndex listing parallelized with large child dirs") {
for ((scale, expectedNumPar) <- Seq((10, 0), (50, 1))) {
withTempDir { dir =>
for (i <- 1 to scale) {
new File(dir, s"foo=$i.txt").mkdir()
}
HiveCatalogMetrics.reset()
assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 0)
new InMemoryFileIndex(spark, Seq(new Path(dir.getCanonicalPath)), Map.empty, None)
assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == expectedNumPar)
}
}
}

test("PartitioningAwareFileIndex listing parallelized with large, deeply nested child dirs") {
for ((scale, expectedNumPar) <- Seq((10, 0), (50, 4))) {
withTempDir { dir =>
for (i <- 1 to 2) {
val subdirA = new File(dir, s"a=$i")
subdirA.mkdir()
for (j <- 1 to 2) {
val subdirB = new File(subdirA, s"b=$j")
subdirB.mkdir()
for (k <- 1 to scale) {
new File(subdirB, s"foo=$k.txt").mkdir()
}
}
}
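// Why scale = 50 expects 4 jobs: each of the four b=* directories holds `scale`
// children, and 50 exceeds the parallel-discovery threshold (assumed default: 32),
// so the recursive listing launches one Spark job per b=* directory; at scale = 10
// everything stays under the threshold and lists serially (0 jobs).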
HiveCatalogMetrics.reset()
assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 0)
new InMemoryFileIndex(spark, Seq(new Path(dir.getCanonicalPath)), Map.empty, None)
assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == expectedNumPar)
}
}
}

test("PartitioningAwareFileIndex - file filtering") {
assert(!PartitioningAwareFileIndex.shouldFilterOut("abcd"))
assert(PartitioningAwareFileIndex.shouldFilterOut(".ab"))