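This pull request renames the file-source metadata hierarchy: the FileCatalog trait becomes FileIndex, ListingFileCatalog becomes InMemoryFileIndex, TableFileCatalog becomes CatalogFileIndex, PartitioningAwareFileCatalog becomes PartitioningAwareFileIndex, and MetadataLogFileCatalog becomes MetadataLogFileIndex, with comments and call sites updated to match. In each hunk below, the old (FileCatalog-era) line appears directly above the line that replaces it.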
@@ -76,7 +76,7 @@ object HiveCatalogMetrics extends Source {
val METRIC_PARTITIONS_FETCHED = metricRegistry.counter(MetricRegistry.name("partitionsFetched"))

/**
* Tracks the total number of files discovered off of the filesystem by ListingFileCatalog.
* Tracks the total number of files discovered off of the filesystem by InMemoryFileIndex.
*/
val METRIC_FILES_DISCOVERED = metricRegistry.counter(MetricRegistry.name("filesDiscovered"))

@@ -177,7 +177,7 @@ class CacheManager extends Logging {

/**
* Traverses a given `plan` and searches for the occurrences of `qualifiedPath` in the
* [[org.apache.spark.sql.execution.datasources.FileCatalog]] of any [[HadoopFsRelation]] nodes
* [[org.apache.spark.sql.execution.datasources.FileIndex]] of any [[HadoopFsRelation]] nodes
* in the plan. If found, we refresh the metadata and return true. Otherwise, this method returns
* false.
*/
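The CacheManager comment above describes walking a cached plan and refreshing the metadata of any HadoopFsRelation whose FileIndex covers a given path. A minimal sketch of how that refresh is usually reached from application code, assuming the public Catalog.refreshByPath API of this Spark era; the /data/events path is hypothetical:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("refresh-by-path-sketch").getOrCreate()

// A cached DataFrame backed by a HadoopFsRelation and its FileIndex (hypothetical path).
val events = spark.read.parquet("/data/events")
events.cache()
events.count()

// After an external job adds or removes files under /data/events, ask Spark to
// re-list them; cached plans reading that path have their metadata refreshed.
spark.catalog.refreshByPath("/data/events")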
@@ -26,23 +26,23 @@ import org.apache.spark.sql.types.StructType


/**
* A [[FileCatalog]] for a metastore catalog table.
* A [[FileIndex]] for a metastore catalog table.
*
* @param sparkSession a [[SparkSession]]
* @param table the metadata of the table
* @param sizeInBytes the table's data size in bytes
*/
class TableFileCatalog(
class CatalogFileIndex(
sparkSession: SparkSession,
val table: CatalogTable,
override val sizeInBytes: Long) extends FileCatalog {
override val sizeInBytes: Long) extends FileIndex {

protected val hadoopConf = sparkSession.sessionState.newHadoopConf

private val fileStatusCache = FileStatusCache.newCache(sparkSession)

assert(table.identifier.database.isDefined,
"The table identifier must be qualified in TableFileCatalog")
"The table identifier must be qualified in CatalogFileIndex")

private val baseLocation = table.storage.locationUri

@@ -57,33 +57,33 @@ class TableFileCatalog(
override def refresh(): Unit = fileStatusCache.invalidateAll()

/**
* Returns a [[ListingFileCatalog]] for this table restricted to the subset of partitions
* Returns a [[InMemoryFileIndex]] for this table restricted to the subset of partitions
* specified by the given partition-pruning filters.
*
* @param filters partition-pruning filters
*/
def filterPartitions(filters: Seq[Expression]): ListingFileCatalog = {
def filterPartitions(filters: Seq[Expression]): InMemoryFileIndex = {
if (table.partitionColumnNames.nonEmpty) {
val selectedPartitions = sparkSession.sessionState.catalog.listPartitionsByFilter(
table.identifier, filters)
val partitions = selectedPartitions.map { p =>
PartitionPath(p.toRow(partitionSchema), p.storage.locationUri.get)
}
val partitionSpec = PartitionSpec(partitionSchema, partitions)
new PrunedTableFileCatalog(
new PrunedInMemoryFileIndex(
sparkSession, new Path(baseLocation.get), fileStatusCache, partitionSpec)
} else {
new ListingFileCatalog(sparkSession, rootPaths, table.storage.properties, None)
new InMemoryFileIndex(sparkSession, rootPaths, table.storage.properties, None)
}
}

override def inputFiles: Array[String] = filterPartitions(Nil).inputFiles

// `TableFileCatalog` may be a member of `HadoopFsRelation`, `HadoopFsRelation` may be a member
// `CatalogFileIndex` may be a member of `HadoopFsRelation`, `HadoopFsRelation` may be a member
// of `LogicalRelation`, and `LogicalRelation` may be used as the cache key. So we need to
// implement `equals` and `hashCode` here, to make it work with cache lookup.
override def equals(o: Any): Boolean = o match {
case other: TableFileCatalog => this.table.identifier == other.table.identifier
case other: CatalogFileIndex => this.table.identifier == other.table.identifier
case _ => false
}

@@ -97,12 +97,12 @@ class TableFileCatalog(
* @param tableBasePath The default base path of the Hive metastore table
* @param partitionSpec The partition specifications from Hive metastore
*/
private class PrunedTableFileCatalog(
private class PrunedInMemoryFileIndex(
sparkSession: SparkSession,
tableBasePath: Path,
fileStatusCache: FileStatusCache,
override val partitionSpec: PartitionSpec)
extends ListingFileCatalog(
extends InMemoryFileIndex(
sparkSession,
partitionSpec.partitions.map(_.path),
Map.empty,
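To illustrate the filterPartitions contract documented above, here is a hedged sketch of a hypothetical helper that builds a CatalogFileIndex from a metastore table entry and prunes it with a predicate string; CatalogFileIndex and CatalogTable are internal classes, so treat this as illustration rather than a supported API:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, InMemoryFileIndex}
import org.apache.spark.sql.functions.expr

// Hypothetical helper: prune a partitioned, Hive-tracked table's file index.
def prunedIndex(spark: SparkSession, tableMeta: CatalogTable, predicate: String): InMemoryFileIndex = {
  val index = new CatalogFileIndex(
    spark, tableMeta, tableMeta.stats.map(_.sizeInBytes.toLong).getOrElse(0L))
  // Only partitions matching the predicate are fetched from the metastore and listed.
  index.filterPartitions(Seq(expr(predicate).expr))
}

// e.g. prunedIndex(spark, tableMeta, "date = '2016-11-01'").inputFiles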
@@ -202,7 +202,7 @@ case class DataSource(
val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
SparkHadoopUtil.get.globPathIfNecessary(qualified)
}.toArray
val fileCatalog = new ListingFileCatalog(sparkSession, globbedPaths, options, None)
val fileCatalog = new InMemoryFileIndex(sparkSession, globbedPaths, options, None)
val partitionSchema = fileCatalog.partitionSpec().partitionColumns
val inferred = format.inferSchema(
sparkSession,
@@ -364,7 +364,7 @@ case class DataSource(
case (format: FileFormat, _)
if hasMetadata(caseInsensitiveOptions.get("path").toSeq ++ paths) =>
val basePath = new Path((caseInsensitiveOptions.get("path").toSeq ++ paths).head)
val fileCatalog = new MetadataLogFileCatalog(sparkSession, basePath)
val fileCatalog = new MetadataLogFileIndex(sparkSession, basePath)
val dataSchema = userSpecifiedSchema.orElse {
format.inferSchema(
sparkSession,
@@ -417,12 +417,12 @@ case class DataSource(

val fileCatalog = if (sparkSession.sqlContext.conf.manageFilesourcePartitions &&
catalogTable.isDefined && catalogTable.get.partitionProviderIsHive) {
new TableFileCatalog(
new CatalogFileIndex(
sparkSession,
catalogTable.get,
catalogTable.get.stats.map(_.sizeInBytes.toLong).getOrElse(0L))
} else {
new ListingFileCatalog(
new InMemoryFileIndex(
sparkSession, globbedPaths, options, partitionSchema)
}

@@ -433,7 +433,7 @@
format.inferSchema(
sparkSession,
caseInsensitiveOptions,
fileCatalog.asInstanceOf[ListingFileCatalog].allFiles())
fileCatalog.asInstanceOf[InMemoryFileIndex].allFiles())
}.getOrElse {
throw new AnalysisException(
s"Unable to infer schema for $format at ${allPaths.take(2).mkString(",")}. " +
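The DataSource changes above pick between the two index implementations. A sketch of the user-facing switch, assuming the spark.sql.hive.manageFilesourcePartitions conf key introduced alongside this work and a hypothetical Hive-tracked table db.events:

import org.apache.spark.sql.SparkSession

// With the flag on and partitions tracked by the Hive metastore, DataSource builds a
// CatalogFileIndex (listing is deferred until partition pruning); otherwise it eagerly
// lists the globbed paths with an InMemoryFileIndex.
val spark = SparkSession.builder()
  .enableHiveSupport()
  .config("spark.sql.hive.manageFilesourcePartitions", "true")
  .getOrCreate()

// Hypothetical partitioned table; only matching partitions should end up being listed.
spark.table("db.events").where("date = '2016-11-01'").count()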
@@ -33,7 +33,7 @@ case class PartitionDirectory(values: InternalRow, files: Seq[FileStatus])
* An interface for objects capable of enumerating the root paths of a relation as well as the
* partitions of a relation subject to some pruning expressions.
*/
trait FileCatalog {
trait FileIndex {

/**
* Returns the list of root input paths from which the catalog will get files. There may be a
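Since the trait above is the entry point the rest of the planner sees, a small sketch of pulling the FileIndex out of a file-based DataFrame; LogicalRelation and HadoopFsRelation are internal classes and the path is hypothetical:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.{FileIndex, HadoopFsRelation, LogicalRelation}

val spark = SparkSession.builder().getOrCreate()
val df = spark.read.parquet("/data/events")

// Collect the FileIndex behind each file-based relation in the analyzed plan.
val indexes: Seq[FileIndex] = df.queryExecution.analyzed.collect {
  case LogicalRelation(fsRelation: HadoopFsRelation, _, _) => fsRelation.location
}
indexes.foreach { index =>
  println(s"roots=${index.rootPaths.mkString(",")} files=${index.inputFiles.length}")
}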
@@ -28,7 +28,7 @@ import org.apache.spark.sql.types.StructType
* Acts as a container for all of the metadata required to read from a datasource. All discovery,
* resolution and merging logic for schemas and partitions has been removed.
*
* @param location A [[FileCatalog]] that can enumerate the locations of all the files that
* @param location A [[FileIndex]] that can enumerate the locations of all the files that
* comprise this relation.
* @param partitionSchema The schema of the columns (if any) that are used to partition the relation
* @param dataSchema The schema of any remaining columns. Note that if any partition columns are
@@ -38,7 +38,7 @@ import org.apache.spark.sql.types.StructType
* @param options Configuration used when reading / writing data.
*/
case class HadoopFsRelation(
location: FileCatalog,
location: FileIndex,
partitionSchema: StructType,
dataSchema: StructType,
bucketSpec: Option[BucketSpec],
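To show where the renamed FileIndex plugs in, a rough sketch of wiring a HadoopFsRelation by hand; the remaining constructor parameters (file format and options) are not visible in this hunk, so their positions here are my assumption, and the path and schema are made up:

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InMemoryFileIndex}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.types.{LongType, StructType}

val spark = SparkSession.builder().getOrCreate()
val index = new InMemoryFileIndex(spark, Seq(new Path("/data/events")), Map.empty, None)

val relation = HadoopFsRelation(
  index,                                   // location: the FileIndex
  new StructType(),                        // partitionSchema (none here)
  new StructType().add("id", LongType),    // dataSchema
  None,                                    // bucketSpec
  new ParquetFileFormat,                   // fileFormat (assumed position)
  Map.empty)(spark)                        // options, plus the session

println(relation.schema)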
@@ -26,21 +26,21 @@ import org.apache.spark.sql.types.StructType


/**
* A [[FileCatalog]] that generates the list of files to process by recursively listing all the
* A [[FileIndex]] that generates the list of files to process by recursively listing all the
* files present in `paths`.
*
* @param rootPaths the list of root table paths to scan
* @param parameters as set of options to control discovery
* @param partitionSchema an optional partition schema that will be use to provide types for the
* discovered partitions
*/
class ListingFileCatalog(
class InMemoryFileIndex(
sparkSession: SparkSession,
override val rootPaths: Seq[Path],
parameters: Map[String, String],
partitionSchema: Option[StructType],
fileStatusCache: FileStatusCache = NoopCache)
extends PartitioningAwareFileCatalog(
extends PartitioningAwareFileIndex(
sparkSession, parameters, partitionSchema, fileStatusCache) {

@volatile private var cachedLeafFiles: mutable.LinkedHashMap[Path, FileStatus] = _
@@ -79,7 +79,7 @@ class ListingFileCatalog(
}

override def equals(other: Any): Boolean = other match {
case hdfs: ListingFileCatalog => rootPaths.toSet == hdfs.rootPaths.toSet
case hdfs: InMemoryFileIndex => rootPaths.toSet == hdfs.rootPaths.toSet
case _ => false
}

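As a usage sketch of the class above: constructing the index directly and listing what it found. The path is hypothetical, and passing None for the partition schema lets partition values be inferred:

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.InMemoryFileIndex

val spark = SparkSession.builder().getOrCreate()
val index = new InMemoryFileIndex(spark, Seq(new Path("/data/events")), Map.empty, None)

// Leaf files discovered by the recursive listing (also counted by the
// filesDiscovered metric from the first hunk of this diff).
index.allFiles().foreach { status =>
  println(s"${status.getPath} (${status.getLen} bytes)")
}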
@@ -34,19 +34,19 @@ import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.util.SerializableConfiguration

/**
* An abstract class that represents [[FileCatalog]]s that are aware of partitioned tables.
* An abstract class that represents [[FileIndex]]s that are aware of partitioned tables.
* It provides the necessary methods to parse partition data based on a set of files.
*
* @param parameters as set of options to control partition discovery
* @param userPartitionSchema an optional partition schema that will be use to provide types for
* the discovered partitions
*/
abstract class PartitioningAwareFileCatalog(
abstract class PartitioningAwareFileIndex(
sparkSession: SparkSession,
parameters: Map[String, String],
userPartitionSchema: Option[StructType],
fileStatusCache: FileStatusCache = NoopCache) extends FileCatalog with Logging {
import PartitioningAwareFileCatalog.BASE_PATH_PARAM
fileStatusCache: FileStatusCache = NoopCache) extends FileIndex with Logging {
import PartitioningAwareFileIndex.BASE_PATH_PARAM

/** Returns the specification of the partitions inferred from the data. */
def partitionSpec(): PartitionSpec
@@ -253,9 +253,9 @@ abstract class PartitioningAwareFileCatalog(
}
val discovered = if (pathsToFetch.length >=
sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
PartitioningAwareFileCatalog.listLeafFilesInParallel(pathsToFetch, hadoopConf, sparkSession)
PartitioningAwareFileIndex.listLeafFilesInParallel(pathsToFetch, hadoopConf, sparkSession)
} else {
PartitioningAwareFileCatalog.listLeafFilesInSerial(pathsToFetch, hadoopConf)
PartitioningAwareFileIndex.listLeafFilesInSerial(pathsToFetch, hadoopConf)
}
discovered.foreach { case (path, leafFiles) =>
HiveCatalogMetrics.incrementFilesDiscovered(leafFiles.size)
@@ -266,7 +266,7 @@
}
}

object PartitioningAwareFileCatalog extends Logging {
object PartitioningAwareFileIndex extends Logging {
val BASE_PATH_PARAM = "basePath"

/** A serializable variant of HDFS's BlockLocation. */
@@ -383,7 +383,7 @@ object PartitioningAwareFileCatalog extends Logging {
if (shouldFilterOut(name)) {
Seq.empty[FileStatus]
} else {
// [SPARK-17599] Prevent ListingFileCatalog from failing if path doesn't exist
// [SPARK-17599] Prevent InMemoryFileIndex from failing if path doesn't exist
// Note that statuses only include FileStatus for the files and dirs directly under path,
// and does not include anything else recursively.
val statuses = try fs.listStatus(path) catch {
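The listing code above switches from serial to job-based parallel listing once the number of paths reaches a threshold. A sketch of tuning that threshold, assuming the spark.sql.sources.parallelPartitionDiscovery.threshold conf key and a hypothetical heavily partitioned layout:

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.InMemoryFileIndex

// Once at least this many directories need listing, leaf files are discovered with a
// Spark job (listLeafFilesInParallel) instead of serially on the driver.
val spark = SparkSession.builder()
  .config("spark.sql.sources.parallelPartitionDiscovery.threshold", "16")
  .getOrCreate()

// e.g. /data/events/date=2016-11-01/hour=00/... (hypothetical layout)
val index = new InMemoryFileIndex(spark, Seq(new Path("/data/events")), Map.empty, None)
println(index.partitionSpec().partitionColumns)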
@@ -28,7 +28,7 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] {
logicalRelation @
LogicalRelation(fsRelation @
HadoopFsRelation(
tableFileCatalog: TableFileCatalog,
catalogFileIndex: CatalogFileIndex,
partitionSchema,
_,
_,
@@ -56,9 +56,9 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] {
ExpressionSet(normalizedFilters.filter(_.references.subsetOf(partitionSet)))

if (partitionKeyFilters.nonEmpty) {
val prunedFileCatalog = tableFileCatalog.filterPartitions(partitionKeyFilters.toSeq)
val prunedFileIndex = catalogFileIndex.filterPartitions(partitionKeyFilters.toSeq)
val prunedFsRelation =
fsRelation.copy(location = prunedFileCatalog)(sparkSession)
fsRelation.copy(location = prunedFileIndex)(sparkSession)
val prunedLogicalRelation = logicalRelation.copy(
relation = prunedFsRelation,
expectedOutputAttributes = Some(logicalRelation.output))
@@ -146,7 +146,7 @@ abstract class CompactibleFileStreamLog[T: ClassTag](
*/
def allFiles(): Array[T] = {
var latestId = getLatest().map(_._1).getOrElse(-1L)
// There is a race condition when `FileStreamSink` is deleting old files and `StreamFileCatalog`
// There is a race condition when `FileStreamSink` is deleting old files and `StreamFileIndex`
// is calling this method. This loop will retry the reading to deal with the
// race condition.
while (true) {
@@ -158,7 +158,7 @@
} catch {
case e: IOException =>
// Another process using `CompactibleFileStreamLog` may delete the batch files when
// `StreamFileCatalog` are reading. However, it only happens when a compaction is
// `StreamFileIndex` are reading. However, it only happens when a compaction is
// deleting old files. If so, let's try the next compaction batch and we should find it.
// Otherwise, this is a real IO issue and we should throw it.
latestId = nextCompactionBatchId(latestId, compactInterval)
@@ -24,7 +24,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.execution.datasources.{DataSource, ListingFileCatalog, LogicalRelation}
import org.apache.spark.sql.execution.datasources.{DataSource, InMemoryFileIndex, LogicalRelation}
import org.apache.spark.sql.types.StructType

/**
@@ -156,7 +156,7 @@ class FileStreamSource(
private def fetchAllFiles(): Seq[(String, Long)] = {
val startTime = System.nanoTime
val globbedPaths = SparkHadoopUtil.get.globPathIfNecessary(qualifiedBasePath)
val catalog = new ListingFileCatalog(sparkSession, globbedPaths, options, Some(new StructType))
val catalog = new InMemoryFileIndex(sparkSession, globbedPaths, options, Some(new StructType))
val files = catalog.allFiles().sortBy(_.getModificationTime).map { status =>
(status.getPath.toUri.toString, status.getModificationTime)
}
@@ -26,11 +26,11 @@ import org.apache.spark.sql.execution.datasources._


/**
* A [[FileCatalog]] that generates the list of files to processing by reading them from the
* A [[FileIndex]] that generates the list of files to processing by reading them from the
* metadata log files generated by the [[FileStreamSink]].
*/
class MetadataLogFileCatalog(sparkSession: SparkSession, path: Path)
extends PartitioningAwareFileCatalog(sparkSession, Map.empty, None) {
class MetadataLogFileIndex(sparkSession: SparkSession, path: Path)
extends PartitioningAwareFileIndex(sparkSession, Map.empty, None) {

private val metadataDirectory = new Path(path, FileStreamSink.metadataDir)
logInfo(s"Reading streaming file log from $metadataDirectory")
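A short usage note for the class above: when a directory was produced by the streaming FileStreamSink it contains a _spark_metadata log, and a plain batch read of that directory resolves its file listing through MetadataLogFileIndex rather than scanning the filesystem (see the hasMetadata branch in the DataSource hunk earlier). Sketch with a hypothetical sink output path:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// /data/stream-output was written by a structured-streaming file sink (hypothetical path).
val df = spark.read.parquet("/data/stream-output")
println(df.inputFiles.length)   // enumerated from the sink's metadata log, not a directory scan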