@@ -755,6 +755,20 @@ class SessionCatalog(
externalCatalog.listPartitions(db, table, partialSpec)
}

/**
* List the metadata of the partitions that belong to the specified table (assuming it exists)
* and satisfy the given partition-pruning predicate expressions.
*/
def listPartitionsByFilter(
tableName: TableIdentifier,
predicates: Seq[Expression]): Seq[CatalogTablePartition] = {
[Review comment on this method]
dongjoon-hyun (Member), Oct 20, 2016: Thank you for adding this, @cloud-fan! I think I can use this in #15302.

val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
val table = formatTableName(tableName.table)
requireDbExists(db)
requireTableExists(TableIdentifier(table, Option(db)))
externalCatalog.listPartitionsByFilter(db, table, predicates)
}

/**
* Verify if the input partition spec exactly matches the existing defined partition spec.
* The columns must be the same but the order could be different.
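For context, a rough usage sketch of the new SessionCatalog API. This is not from the PR: the table db.tbl, its int partition column p, and the in-scope sessionCatalog value are hypothetical, and the call would have to live inside Spark's own sql packages.

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterThan, Literal}
import org.apache.spark.sql.types.IntegerType

// Hypothetical: list only the partitions of db.tbl whose partition column p is
// greater than 10; the predicates should reference partition columns only.
val p = AttributeReference("p", IntegerType)()
val prunedPartitions = sessionCatalog.listPartitionsByFilter(
  TableIdentifier("tbl", Some("db")),
  Seq(GreaterThan(p, Literal(10))))
prunedPartitions.foreach(part => println(part.spec))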
@@ -20,36 +20,30 @@ package org.apache.spark.sql.execution.datasources
import org.apache.hadoop.fs.Path

import org.apache.spark.sql.SparkSession
+ import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.StructType


/**
* A [[FileCatalog]] for a metastore catalog table.
*
* @param sparkSession a [[SparkSession]]
- * @param db the table's database name
- * @param table the table's (unqualified) name
- * @param partitionSchema the schema of a partitioned table's partition columns
+ * @param table the metadata of the table
* @param sizeInBytes the table's data size in bytes
* @param fileStatusCache optional cache implementation to use for file listing
*/
class TableFileCatalog(
sparkSession: SparkSession,
- db: String,
- table: String,
- partitionSchema: Option[StructType],
+ val table: CatalogTable,
override val sizeInBytes: Long) extends FileCatalog {

protected val hadoopConf = sparkSession.sessionState.newHadoopConf

private val fileStatusCache = FileStatusCache.newCache(sparkSession)

- private val externalCatalog = sparkSession.sharedState.externalCatalog
+ assert(table.identifier.database.isDefined,
+   "The table identifier must be qualified in TableFileCatalog")

- private val catalogTable = externalCatalog.getTable(db, table)

- private val baseLocation = catalogTable.storage.locationUri
+ private val baseLocation = table.storage.locationUri

override def rootPaths: Seq[Path] = baseLocation.map(new Path(_)).toSeq

@@ -66,24 +60,32 @@ class TableFileCatalog(
* @param filters partition-pruning filters
*/
def filterPartitions(filters: Seq[Expression]): ListingFileCatalog = {
- val parameters = baseLocation
-   .map(loc => Map(PartitioningAwareFileCatalog.BASE_PATH_PARAM -> loc))
-   .getOrElse(Map.empty)
- partitionSchema match {
-   case Some(schema) =>
-     val selectedPartitions = externalCatalog.listPartitionsByFilter(db, table, filters)
-     val partitions = selectedPartitions.map { p =>
-       PartitionPath(p.toRow(schema), p.storage.locationUri.get)
-     }
-     val partitionSpec = PartitionSpec(schema, partitions)
-     new PrunedTableFileCatalog(
-       sparkSession, new Path(baseLocation.get), fileStatusCache, partitionSpec)
-   case None =>
-     new ListingFileCatalog(sparkSession, rootPaths, parameters, None, fileStatusCache)
+ if (table.partitionColumnNames.nonEmpty) {
+   val selectedPartitions = sparkSession.sessionState.catalog.listPartitionsByFilter(
+     table.identifier, filters)
+   val partitionSchema = table.partitionSchema
+   val partitions = selectedPartitions.map { p =>
+     PartitionPath(p.toRow(partitionSchema), p.storage.locationUri.get)
+   }
+   val partitionSpec = PartitionSpec(partitionSchema, partitions)
+   new PrunedTableFileCatalog(
+     sparkSession, new Path(baseLocation.get), fileStatusCache, partitionSpec)
+ } else {
+   new ListingFileCatalog(sparkSession, rootPaths, table.storage.properties, None)
[Review thread on the ListingFileCatalog line above]
Contributor: Hi @cloud-fan, I have a question here: why should we remove the fileStatusCache for ListingFileCatalog? Do we need to add it back?
cloud-fan (Author): Seems it was a mistake. Can you send a PR to add it? Thanks!
Contributor: OK, sure. Thanks for your answer.
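For reference, the follow-up discussed in this thread would presumably just thread the existing fileStatusCache back through, mirroring the five-argument call that was removed above. A sketch only, not the actual follow-up change:

// Sketch: reuse this catalog's fileStatusCache in the non-partitioned branch as well.
new ListingFileCatalog(sparkSession, rootPaths, table.storage.properties, None, fileStatusCache)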

}
}

override def inputFiles: Array[String] = filterPartitions(Nil).inputFiles

// `TableFileCatalog` may be a member of `HadoopFsRelation`, `HadoopFsRelation` may be a member
// of `LogicalRelation`, and `LogicalRelation` may be used as the cache key. So we need to
// implement `equals` and `hashCode` here, to make it work with cache lookup.
override def equals(o: Any): Boolean = o match {
case other: TableFileCatalog => this.table.identifier == other.table.identifier
case _ => false
}

override def hashCode(): Int = table.identifier.hashCode()
}
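To make the cache-key comment above concrete, a small sketch (assuming a CatalogTable value tableMeta and a SparkSession spark, as in the test below): two separately constructed TableFileCatalog instances over the same table compare equal and hash alike, so a LogicalRelation built around either one resolves to the same cache entry.

// Equality and hashCode are based solely on table.identifier, so these two
// distinct instances over the same catalog table are considered equal.
val a = new TableFileCatalog(spark, tableMeta, 0)
val b = new TableFileCatalog(spark, tableMeta, 0)
assert(a == b && a.hashCode == b.hashCode)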

@@ -226,12 +226,10 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
Some(partitionSchema))

val logicalRelation = cached.getOrElse {
- val db = metastoreRelation.databaseName
- val table = metastoreRelation.tableName
val sizeInBytes = metastoreRelation.statistics.sizeInBytes.toLong
val fileCatalog = {
val catalog = new TableFileCatalog(
-   sparkSession, db, table, Some(partitionSchema), sizeInBytes)
+   sparkSession, metastoreRelation.catalogTable, sizeInBytes)
if (lazyPruningEnabled) {
catalog
} else {
@@ -19,12 +19,15 @@ package org.apache.spark.sql.hive

import java.io.File

- import org.apache.spark.sql.{AnalysisException, QueryTest, SaveMode}
+ import org.apache.spark.sql.{AnalysisException, Dataset, QueryTest, SaveMode}
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
+ import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, TableFileCatalog}
+ import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.test.SQLTestUtils
+ import org.apache.spark.sql.types.StructType
import org.apache.spark.storage.RDDBlockId
import org.apache.spark.util.Utils

@@ -317,4 +320,40 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleto

sql("DROP TABLE cachedTable")
}

test("cache a table using TableFileCatalog") {
withTable("test") {
sql("CREATE TABLE test(i int) PARTITIONED BY (p int) STORED AS parquet")
val tableMeta = spark.sharedState.externalCatalog.getTable("default", "test")
val tableFileCatalog = new TableFileCatalog(spark, tableMeta, 0)

val dataSchema = StructType(tableMeta.schema.filterNot { f =>
tableMeta.partitionColumnNames.contains(f.name)
})
val relation = HadoopFsRelation(
location = tableFileCatalog,
partitionSchema = tableMeta.partitionSchema,
dataSchema = dataSchema,
bucketSpec = None,
fileFormat = new ParquetFileFormat(),
options = Map.empty)(sparkSession = spark)

val plan = LogicalRelation(relation, catalogTable = Some(tableMeta))
spark.sharedState.cacheManager.cacheQuery(Dataset.ofRows(spark, plan))

assert(spark.sharedState.cacheManager.lookupCachedData(plan).isDefined)

val sameCatalog = new TableFileCatalog(spark, tableMeta, 0)
val sameRelation = HadoopFsRelation(
location = sameCatalog,
partitionSchema = tableMeta.partitionSchema,
dataSchema = dataSchema,
bucketSpec = None,
fileFormat = new ParquetFileFormat(),
options = Map.empty)(sparkSession = spark)
val samePlan = LogicalRelation(sameRelation, catalogTable = Some(tableMeta))

assert(spark.sharedState.cacheManager.lookupCachedData(samePlan).isDefined)
}
}
}
@@ -45,12 +45,7 @@ class PruneFileSourcePartitionsSuite extends QueryTest with SQLTestUtils with Te
|LOCATION '${dir.getAbsolutePath}'""".stripMargin)

val tableMeta = spark.sharedState.externalCatalog.getTable("default", "test")
- val tableFileCatalog = new TableFileCatalog(
-   spark,
-   tableMeta.database,
-   tableMeta.identifier.table,
-   Some(tableMeta.partitionSchema),
-   0)
+ val tableFileCatalog = new TableFileCatalog(spark, tableMeta, 0)

val dataSchema = StructType(tableMeta.schema.filterNot { f =>
tableMeta.partitionColumnNames.contains(f.name)