@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive
 import scala.collection.JavaConverters._
 
 import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
+import com.google.common.util.concurrent.Striped
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.internal.Logging
@@ -65,6 +66,18 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
       t.identifier.table.toLowerCase)
   }
 
+  /** These locks guard against multiple attempts to instantiate a table, which wastes memory. */
+  private val tableCreationLocks = Striped.lazyWeakLock(100)
+
+  /** Acquires a lock on the table cache for the duration of `f`. */
+  private def withTableCreationLock[A](tableName: QualifiedTableName, f: => A): A = {
+    val lock = tableCreationLocks.get(tableName)
+    lock.lock()
+    try f finally {
+      lock.unlock()
+    }
+  }
+
   /** A cache of Spark SQL data source tables that have been accessed. */
   protected[hive] val cachedDataSourceTables: LoadingCache[QualifiedTableName, LogicalPlan] = {
     val cacheLoader = new CacheLoader[QualifiedTableName, LogicalPlan]() {
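Note on the helpers added above: Guava's Striped.lazyWeakLock(n) hands out at most n lazily created, weakly referenced locks, and keys that hash to the same stripe share a lock, so per-table locking stays memory-bounded. A minimal, self-contained sketch of the same pattern follows; TableKey, StripedLockSketch, and withLock are invented names for illustration and are not part of this patch.

import java.util.concurrent.locks.Lock

import com.google.common.util.concurrent.Striped

// Hypothetical stand-in for QualifiedTableName, used only in this sketch.
case class TableKey(database: String, table: String)

object StripedLockSketch {
  // At most 100 lazily created, weakly referenced locks; keys that hash to
  // the same stripe share a lock, so memory use stays bounded.
  private val locks: Striped[Lock] = Striped.lazyWeakLock(100)

  // Runs `f` while holding the lock for `key`'s stripe.
  def withLock[A](key: TableKey)(f: => A): A = {
    val lock = locks.get(key)
    lock.lock()
    try f finally lock.unlock()
  }

  def main(args: Array[String]): Unit = {
    val v = withLock(TableKey("default", "events")) {
      // Expensive, non-idempotent work goes here; at most one thread per
      // stripe executes it at a time.
      42
    }
    println(v)
  }
}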
@@ -274,77 +287,81 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
         partitionPaths
       }
 
-      val cached = getCached(
-        tableIdentifier,
-        paths,
-        metastoreRelation,
-        metastoreSchema,
-        fileFormatClass,
-        bucketSpec,
-        Some(partitionSpec))
-
-      val hadoopFsRelation = cached.getOrElse {
-        val fileCatalog = new MetaStorePartitionedTableFileCatalog(
-          sparkSession,
-          new Path(metastoreRelation.catalogTable.storage.locationUri.get),
-          partitionSpec)
-
-        val inferredSchema = if (fileType.equals("parquet")) {
-          val inferredSchema =
-            defaultSource.inferSchema(sparkSession, options, fileCatalog.allFiles())
-          inferredSchema.map { inferred =>
-            ParquetFileFormat.mergeMetastoreParquetSchema(metastoreSchema, inferred)
-          }.getOrElse(metastoreSchema)
-        } else {
-          defaultSource.inferSchema(sparkSession, options, fileCatalog.allFiles()).get
-        }
+      withTableCreationLock(tableIdentifier, {
+        val cached = getCached(
+          tableIdentifier,
+          paths,
+          metastoreRelation,
+          metastoreSchema,
+          fileFormatClass,
+          bucketSpec,
+          Some(partitionSpec))
+
+        val hadoopFsRelation = cached.getOrElse {
+          val fileCatalog = new MetaStorePartitionedTableFileCatalog(
+            sparkSession,
+            new Path(metastoreRelation.catalogTable.storage.locationUri.get),
+            partitionSpec)
+
+          val inferredSchema = if (fileType.equals("parquet")) {
+            val inferredSchema =
+              defaultSource.inferSchema(sparkSession, options, fileCatalog.allFiles())
+            inferredSchema.map { inferred =>
+              ParquetFileFormat.mergeMetastoreParquetSchema(metastoreSchema, inferred)
+            }.getOrElse(metastoreSchema)
+          } else {
+            defaultSource.inferSchema(sparkSession, options, fileCatalog.allFiles()).get
+          }
 
-        val relation = HadoopFsRelation(
-          location = fileCatalog,
-          partitionSchema = partitionSchema,
-          dataSchema = inferredSchema,
-          bucketSpec = bucketSpec,
-          fileFormat = defaultSource,
-          options = options)(sparkSession = sparkSession)
-
-        val created = LogicalRelation(
-          relation,
-          metastoreTableIdentifier =
-            Some(TableIdentifier(tableIdentifier.name, Some(tableIdentifier.database))))
-        cachedDataSourceTables.put(tableIdentifier, created)
-        created
-      }
+          val relation = HadoopFsRelation(
+            location = fileCatalog,
+            partitionSchema = partitionSchema,
+            dataSchema = inferredSchema,
+            bucketSpec = bucketSpec,
+            fileFormat = defaultSource,
+            options = options)(sparkSession = sparkSession)
+
+          val created = LogicalRelation(
+            relation,
+            metastoreTableIdentifier =
+              Some(TableIdentifier(tableIdentifier.name, Some(tableIdentifier.database))))
+          cachedDataSourceTables.put(tableIdentifier, created)
+          created
+        }
 
-      hadoopFsRelation
+        hadoopFsRelation
+      })
     } else {
       val paths = Seq(metastoreRelation.hiveQlTable.getDataLocation.toString)
 
-      val cached = getCached(tableIdentifier,
-        paths,
-        metastoreRelation,
-        metastoreSchema,
-        fileFormatClass,
-        bucketSpec,
-        None)
-      val logicalRelation = cached.getOrElse {
-        val created =
-          LogicalRelation(
-            DataSource(
-              sparkSession = sparkSession,
-              paths = paths,
-              userSpecifiedSchema = Some(metastoreRelation.schema),
-              bucketSpec = bucketSpec,
-              options = options,
-              className = fileType).resolveRelation(),
-            metastoreTableIdentifier =
-              Some(TableIdentifier(tableIdentifier.name, Some(tableIdentifier.database))))
-
-
-        cachedDataSourceTables.put(tableIdentifier, created)
-        created
-      }
+      withTableCreationLock(tableIdentifier, {
+        val cached = getCached(tableIdentifier,
+          paths,
+          metastoreRelation,
+          metastoreSchema,
+          fileFormatClass,
+          bucketSpec,
+          None)
+        val logicalRelation = cached.getOrElse {
+          val created =
+            LogicalRelation(
+              DataSource(
+                sparkSession = sparkSession,
+                paths = paths,
+                userSpecifiedSchema = Some(metastoreRelation.schema),
+                bucketSpec = bucketSpec,
+                options = options,
+                className = fileType).resolveRelation(),
+              metastoreTableIdentifier =
+                Some(TableIdentifier(tableIdentifier.name, Some(tableIdentifier.database))))
+
+
+          cachedDataSourceTables.put(tableIdentifier, created)
+          created
+        }
 
-      logicalRelation
+        logicalRelation
+      })
     }
     result.copy(expectedOutputAttributes = Some(metastoreRelation.output))
   }
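Taken together, the change turns the lookup-or-build path into a per-table critical section: getCached and the subsequent cachedDataSourceTables.put now happen while holding that table's lock, so when several threads miss the cache for the same table at once, only one of them instantiates the relation and the rest reuse the cached result. A rough standalone sketch of that check-then-build-under-lock shape follows, using invented names (CachedBuildSketch, getOrBuild, a plain Guava Cache keyed by String) rather than the actual HiveMetastoreCatalog members.

import java.util.concurrent.locks.Lock

import com.google.common.cache.{Cache, CacheBuilder}
import com.google.common.util.concurrent.Striped

// Invented illustration of the check-then-build-under-lock pattern; none of
// these names exist in HiveMetastoreCatalog.
object CachedBuildSketch {
  private val locks: Striped[Lock] = Striped.lazyWeakLock(100)
  private val cache: Cache[String, String] =
    CacheBuilder.newBuilder().maximumSize(1000).build[String, String]()

  /** Returns the cached value for `key`, building and caching it at most once. */
  def getOrBuild(key: String)(build: => String): String = {
    val lock = locks.get(key)
    lock.lock()
    try {
      Option(cache.getIfPresent(key)).getOrElse {
        val created = build       // expensive construction happens under the lock
        cache.put(key, created)   // later callers hit the cache instead of rebuilding
        created
      }
    } finally {
      lock.unlock()
    }
  }

  def main(args: Array[String]): Unit = {
    println(getOrBuild("default.events") { "built relation for default.events" })
    // The second call never evaluates its by-name argument: the value is cached.
    println(getOrBuild("default.events") { sys.error("should not be rebuilt") })
  }
}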