diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala index f72881f928f..5dbc89533df 100644 --- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala +++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala @@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.util.quoteIfNeeded -import org.apache.spark.sql.connector.catalog.{Identifier, NamespaceChange, SupportsNamespaces, Table, TableCatalog, TableChange} +import org.apache.spark.sql.connector.catalog._ import org.apache.spark.sql.connector.catalog.NamespaceChange.RemoveProperty import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.execution.command.DDLUtils @@ -49,6 +49,7 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.kyuubi.spark.connector.hive.HiveConnectorUtils.withSparkSQLConf import org.apache.kyuubi.spark.connector.hive.HiveTableCatalog.{getStorageFormatAndProvider, toCatalogDatabase, CatalogDatabaseHelper, IdentifierHelper, NamespaceHelper} import org.apache.kyuubi.spark.connector.hive.KyuubiHiveConnectorDelegationTokenProvider.metastoreTokenSignature +import org.apache.kyuubi.spark.connector.hive.read.HiveFileStatusCache import org.apache.kyuubi.util.reflect.{DynClasses, DynConstructors} /** @@ -377,29 +378,45 @@ class HiveTableCatalog(sparkSession: SparkSession) } try { - catalog.alterTable( - catalogTable.copy( - properties = properties, - schema = schema, - owner = owner, - comment = comment, - storage = storage)) + catalog.alterTable(newCatalogTable( + identifier = catalogTable.identifier, + tableType = catalogTable.tableType, + storage = storage, + schema = schema, + provider = catalogTable.provider, + partitionColumnNames = catalogTable.partitionColumnNames, + bucketSpec = catalogTable.bucketSpec, + owner = owner, + createTime = catalogTable.createTime, + lastAccessTime = catalogTable.lastAccessTime, + createVersion = catalogTable.createVersion, + properties = properties, + stats = catalogTable.stats, + viewText = catalogTable.viewText, + comment = comment, + unsupportedFeatures = catalogTable.unsupportedFeatures, + tracksPartitionsInCatalog = catalogTable.tracksPartitionsInCatalog, + schemaPreservesCase = catalogTable.schemaPreservesCase, + ignoredProperties = catalogTable.ignoredProperties, + viewOriginalText = catalogTable.viewOriginalText)) } catch { case _: NoSuchTableException => throw new NoSuchTableException(ident) } - + invalidateTable(ident) loadTable(ident) } override def dropTable(ident: Identifier): Boolean = withSparkSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") { try { - if (loadTable(ident) != null) { + val table = loadTable(ident) + if (table != null) { catalog.dropTable( ident.asTableIdentifier, ignoreIfNotExists = true, purge = true /* skip HDFS trash */ ) + invalidateTable(ident) true } else { false @@ -417,10 +434,17 @@ class HiveTableCatalog(sparkSession: SparkSession) } // Load table to make sure the table exists - loadTable(oldIdent) + val table = loadTable(oldIdent) 
     catalog.renameTable(oldIdent.asTableIdentifier, newIdent.asTableIdentifier)
+    invalidateTable(oldIdent)
   }
 
+  override def invalidateTable(ident: Identifier): Unit = {
+    super.invalidateTable(ident)
+    val qualifiedName = s"$catalogName.$ident"
+    HiveFileStatusCache.getOrCreate(sparkSession, qualifiedName).invalidateAll()
+  }
+
   private def toOptions(properties: Map[String, String]): Map[String, String] = {
     properties.filterKeys(_.startsWith(TableCatalog.OPTION_PREFIX)).map {
       case (key, value) => key.drop(TableCatalog.OPTION_PREFIX.length) -> value
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveConnectorConf.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveConnectorConf.scala
index 98968a7d41c..f50229181ff 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveConnectorConf.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveConnectorConf.scala
@@ -57,4 +57,16 @@ object KyuubiHiveConnectorConf {
       .version("1.11.0")
       .booleanConf
       .createWithDefault(true)
+
+  val HIVE_FILE_STATUS_CACHE_SCOPE =
+    buildConf("spark.sql.kyuubi.hive.file.status.cache.scope")
+      .doc("The scope of the Hive file status cache. Valid values are 'GLOBE' and 'NONE'.")
+      .version("1.11.0")
+      .stringConf
+      .transform(policy => policy.toUpperCase(Locale.ROOT))
+      .checkValue(
+        policy => Set("GLOBE", "NONE").contains(policy),
+        "Invalid value for 'spark.sql.kyuubi.hive.file.status.cache.scope'. " +
+          "Valid values are 'GLOBE', 'NONE'.")
+      .createWithDefault("GLOBE")
 }
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileIndex.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileIndex.scala
index 0142c556194..a82d61a0b11 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileIndex.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileIndex.scala
@@ -48,7 +48,9 @@ class HiveCatalogFileIndex(
   private val partPathToBindHivePart: mutable.Map[PartitionPath, CatalogTablePartition] =
     mutable.Map()
 
-  private val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
+  private val fileStatusCache = HiveFileStatusCache.getOrCreate(
+    sparkSession,
+    hiveCatalog.name() + "." + catalogTable.qualifiedName)
 
   private val baseLocation: Option[URI] = table.storage.locationUri
 
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileStatusCache.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileStatusCache.scala
new file mode 100644
index 00000000000..6cfef9e4c37
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileStatusCache.scala
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.spark.connector.hive.read
+
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicBoolean
+
+import scala.collection.JavaConverters._
+
+import com.google.common.cache._
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.datasources.{FileStatusCache, NoopCache}
+import org.apache.spark.util.SizeEstimator
+
+import org.apache.kyuubi.spark.connector.hive.KyuubiHiveConnectorConf.HIVE_FILE_STATUS_CACHE_SCOPE
+
+/**
+ * Forked from Apache Spark 3.5.5's [[org.apache.spark.sql.execution.datasources.FileStatusCache]],
+ * because the original FileStatusCache never takes effect in this connector (see
+ * https://github.com/apache/kyuubi/issues/7192).
+ *
+ * The main modification is that, at the 'GLOBE' scope, the cache key is the qualified name of the
+ * table (in the form of `catalog.database.table`) plus the path. The previous key was an opaque
+ * client object (created anew every time the cache was obtained) plus the path; since the caller
+ * does not retain the FileStatusCache instance, every query produced a different key and the
+ * cache was never hit.
+ *
+ * Use [[HiveFileStatusCache.getOrCreate()]] to obtain a shared ('GLOBE') or no-op ('NONE') cache.
+ */
+object HiveFileStatusCache {
+  private var sharedCache: HiveSharedInMemoryCache = _
+
+  /**
+   * @return a new FileStatusCache based on session configuration. Cache memory quota is
+   *         shared across all clients.
+   */
+  def getOrCreate(session: SparkSession, qualifiedName: String): FileStatusCache =
+    synchronized {
+      val conf = session.sessionState.conf
+      if (conf.manageFilesourcePartitions && conf.filesourcePartitionFileCacheSize > 0) {
+        if (sharedCache == null) {
+          sharedCache = new HiveSharedInMemoryCache(
+            session.sessionState.conf.filesourcePartitionFileCacheSize,
+            session.sessionState.conf.metadataCacheTTL)
+        }
+        conf.getConf(HIVE_FILE_STATUS_CACHE_SCOPE) match {
+          case "GLOBE" => sharedCache.createForNewClient(qualifiedName)
+          case "NONE" => NoopCache
+        }
+      } else {
+        NoopCache
+      }
+    }
+
+  def resetForTesting(): Unit = synchronized {
+    sharedCache = null
+  }
+}
+
+/**
+ * An implementation that caches partition file statuses in memory.
+ * + * @param maxSizeInBytes max allowable cache size before entries start getting evicted + */ +private class HiveSharedInMemoryCache(maxSizeInBytes: Long, cacheTTL: Long) extends Logging { + + // Opaque object that uniquely identifies a shared cache user + private type ClientId = Object + + private val warnedAboutEviction = new AtomicBoolean(false) + + // we use a composite cache key in order to distinguish entries inserted by different clients + private val cache: Cache[(ClientId, Path), Array[FileStatus]] = { + // [[Weigher]].weigh returns Int so we could only cache objects < 2GB + // instead, the weight is divided by this factor (which is smaller + // than the size of one [[FileStatus]]). + // so it will support objects up to 64GB in size. + val weightScale = 32 + val weigher = new Weigher[(ClientId, Path), Array[FileStatus]] { + override def weigh(key: (ClientId, Path), value: Array[FileStatus]): Int = { + val estimate = (SizeEstimator.estimate(key) + SizeEstimator.estimate(value)) / weightScale + if (estimate > Int.MaxValue) { + logWarning(s"Cached table partition metadata size is too big. Approximating to " + + s"${Int.MaxValue.toLong * weightScale}.") + Int.MaxValue + } else { + estimate.toInt + } + } + } + val removalListener = new RemovalListener[(ClientId, Path), Array[FileStatus]]() { + override def onRemoval( + removed: RemovalNotification[(ClientId, Path), Array[FileStatus]]): Unit = { + if (removed.getCause == RemovalCause.SIZE && + warnedAboutEviction.compareAndSet(false, true)) { + logWarning( + "Evicting cached table partition metadata from memory due to size constraints " + + "(spark.sql.hive.filesourcePartitionFileCacheSize = " + + maxSizeInBytes + " bytes). This may impact query planning performance.") + } + } + } + + var builder = CacheBuilder.newBuilder() + .weigher(weigher) + .removalListener(removalListener) + .maximumWeight(maxSizeInBytes / weightScale) + + if (cacheTTL > 0) { + builder = builder.expireAfterWrite(cacheTTL, TimeUnit.SECONDS) + } + + builder.build[(ClientId, Path), Array[FileStatus]]() + } + + /** + * @return a FileStatusCache that does not share any entries with any other client, but does + * share memory resources for the purpose of cache eviction. 
+ */ + def createForNewClient(clientId: Object): HiveFileStatusCache = new HiveFileStatusCache { + + override def getLeafFiles(path: Path): Option[Array[FileStatus]] = { + Option(cache.getIfPresent((clientId, path))) + } + + override def putLeafFiles(path: Path, leafFiles: Array[FileStatus]): Unit = { + cache.put((clientId, path), leafFiles) + } + + override def invalidateAll(): Unit = { + cache.asMap.asScala.foreach { case (key, value) => + if (key._1 == clientId) { + cache.invalidate(key) + } + } + } + } + + abstract class HiveFileStatusCache extends FileStatusCache {} +} diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveBatchWrite.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveBatchWrite.scala index 2a30ac434c8..53b3848bd0d 100644 --- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveBatchWrite.scala +++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveBatchWrite.scala @@ -27,6 +27,7 @@ import org.apache.spark.internal.io.FileCommitProtocol import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap +import org.apache.spark.sql.connector.catalog.Identifier import org.apache.spark.sql.connector.write.{BatchWrite, DataWriterFactory, PhysicalWriteInfo, WriterCommitMessage} import org.apache.spark.sql.execution.datasources.{WriteJobDescription, WriteTaskResult} import org.apache.spark.sql.execution.datasources.v2.FileBatchWrite @@ -69,6 +70,8 @@ class HiveBatchWrite( // un-cache this table. hiveTableCatalog.catalog.invalidateCachedTable(table.identifier) + hiveTableCatalog.invalidateTable( + Identifier.of(Array(table.identifier.database.getOrElse("")), table.identifier.table)) val catalog = hiveTableCatalog.catalog if (sparkSession.sessionState.conf.autoSizeUpdateEnabled) { diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala index da53b898927..e4e41f5f562 100644 --- a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala +++ b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala @@ -39,7 +39,7 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.kyuubi.spark.connector.hive.HiveTableCatalog.IdentifierHelper import org.apache.kyuubi.spark.connector.hive.KyuubiHiveConnectorConf.{READ_CONVERT_METASTORE_ORC, READ_CONVERT_METASTORE_PARQUET} -import org.apache.kyuubi.spark.connector.hive.read.HiveScan +import org.apache.kyuubi.spark.connector.hive.read.{HiveFileStatusCache, HiveScan} class HiveCatalogSuite extends KyuubiHiveTest { @@ -284,16 +284,26 @@ class HiveCatalogSuite extends KyuubiHiveTest { } test("invalidateTable") { - val table = catalog.createTable(testIdent, schema, Array.empty[Transform], emptyProps) - // Hive v2 don't cache table - catalog.invalidateTable(testIdent) - - val loaded = catalog.loadTable(testIdent) - - assert(table.name == loaded.name) - assert(table.schema == loaded.schema) - assert(table.properties == loaded.properties) - catalog.dropTable(testIdent) + 
withSparkSession() { spark => + val table = catalog.createTable(testIdent, schema, Array.empty[Transform], emptyProps) + val qualifiedName = s"$catalogName.$testIdent" + val location = table.asInstanceOf[HiveTable].catalogTable.location + + spark.sql(s"select * from $qualifiedName").collect() + assert(HiveFileStatusCache.getOrCreate(spark, qualifiedName) + .getLeafFiles(new Path(location)).isDefined) + + catalog.invalidateTable(testIdent) + // invalidate filestatus cache + assert(HiveFileStatusCache.getOrCreate(spark, qualifiedName) + .getLeafFiles(new Path(location)).isEmpty) + + val loaded = catalog.loadTable(testIdent) + assert(table.name == loaded.name) + assert(table.schema == loaded.schema) + assert(table.properties == loaded.properties) + catalog.dropTable(testIdent) + } } test("listNamespaces: fail if missing namespace") { diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveFileStatusCacheSuite.scala b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveFileStatusCacheSuite.scala new file mode 100644 index 00000000000..f553ccaafe7 --- /dev/null +++ b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveFileStatusCacheSuite.scala @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.spark.connector.hive + +import scala.concurrent.duration.DurationInt + +import com.google.common.collect.Maps +import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.spark.sql.connector.catalog.Identifier +import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} +import org.apache.spark.sql.util.CaseInsensitiveStringMap +import org.scalatest.concurrent.Eventually.eventually +import org.scalatest.concurrent.Futures.timeout + +import org.apache.kyuubi.spark.connector.hive.KyuubiHiveConnectorConf.HIVE_FILE_STATUS_CACHE_SCOPE +import org.apache.kyuubi.spark.connector.hive.read.HiveFileStatusCache + +class HiveFileStatusCacheSuite extends KyuubiHiveTest { + + test("use different cache scope") { + Seq("GLOBE", "NONE").foreach { value => + withSparkSession(Map(HIVE_FILE_STATUS_CACHE_SCOPE.key -> value)) { _ => + val path = new Path("/dummy_tmp", "abc") + val files = (1 to 3).map(_ => new FileStatus()) + + HiveFileStatusCache.resetForTesting() + val fileStatusCacheTabel = HiveFileStatusCache.getOrCreate(spark, "catalog.db.catTable") + fileStatusCacheTabel.putLeafFiles(path, files.toArray) + + value match { + // Exactly 3 files are cached. 
+ case "GLOBE" => + assert(fileStatusCacheTabel.getLeafFiles(path).get.length === 3) + case "NONE" => + assert(fileStatusCacheTabel.getLeafFiles(path).isEmpty) + case _ => + throw new IllegalArgumentException( + s"Unexpected value: '$value'. Only 'GLOBE' or 'NONE' are allowed.") + } + + fileStatusCacheTabel.invalidateAll() + assert(fileStatusCacheTabel.getLeafFiles(path).isEmpty) + } + } + } + + test("cached by qualifiedName") { + val previousValue = SQLConf.get.getConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS) + try { + // using 'SQLConf.get.setConf' instead of 'withSQLConf' to set a static config at runtime + SQLConf.get.setConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS, 1L) + + val path = new Path("/dummy_tmp", "abc") + val files = (1 to 3).map(_ => new FileStatus()) + + HiveFileStatusCache.resetForTesting() + val fileStatusCacheTabel1 = HiveFileStatusCache.getOrCreate(spark, "catalog.db.cat1Table") + fileStatusCacheTabel1.putLeafFiles(path, files.toArray) + val fileStatusCacheTabel2 = HiveFileStatusCache.getOrCreate(spark, "catalog.db.cat1Table") + val fileStatusCacheTabel3 = HiveFileStatusCache.getOrCreate(spark, "catalog.db.table2") + + // Exactly 3 files are cached. + assert(fileStatusCacheTabel1.getLeafFiles(path).get.length === 3) + assert(fileStatusCacheTabel2.getLeafFiles(path).get.length === 3) + assert(fileStatusCacheTabel3.getLeafFiles(path).isEmpty) + // Wait until the cache expiration. + eventually(timeout(3.seconds)) { + // And the cache is gone. + assert(fileStatusCacheTabel1.getLeafFiles(path).isEmpty) + assert(fileStatusCacheTabel2.getLeafFiles(path).isEmpty) + assert(fileStatusCacheTabel3.getLeafFiles(path).isEmpty) + } + } finally { + SQLConf.get.setConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS, previousValue) + } + } + + test("expire FileStatusCache if TTL is configured") { + val previousValue = SQLConf.get.getConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS) + try { + // using 'SQLConf.get.setConf' instead of 'withSQLConf' to set a static config at runtime + SQLConf.get.setConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS, 1L) + + val path = new Path("/dummy_tmp", "abc") + val files = (1 to 3).map(_ => new FileStatus()) + + HiveFileStatusCache.resetForTesting() + val fileStatusCache = HiveFileStatusCache.getOrCreate(spark, "catalog.db.table") + fileStatusCache.putLeafFiles(path, files.toArray) + + // Exactly 3 files are cached. + assert(fileStatusCache.getLeafFiles(path).get.length === 3) + // Wait until the cache expiration. + eventually(timeout(3.seconds)) { + // And the cache is gone. 
+ assert(fileStatusCache.getLeafFiles(path).isEmpty) + } + } finally { + SQLConf.get.setConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS, previousValue) + } + } + + private def newCatalog(): HiveTableCatalog = { + val catalog = new HiveTableCatalog + val properties = Maps.newHashMap[String, String]() + properties.put("javax.jdo.option.ConnectionURL", "jdbc:derby:memory:memorydb;create=true") + properties.put("javax.jdo.option.ConnectionDriverName", "org.apache.derby.jdbc.EmbeddedDriver") + catalog.initialize(catalogName, new CaseInsensitiveStringMap(properties)) + catalog + } + + test("expire FileStatusCache when insert into") { + val dbName = "default" + val tbName = "tbl_partition" + val table = s"${catalogName}.${dbName}.${tbName}" + + withTable(table) { + spark.sql(s"create table $table (age int)partitioned by(city string) stored as orc").collect() + val location = newCatalog() + .loadTable(Identifier.of(Array(dbName), tbName)) + .asInstanceOf[HiveTable] + .catalogTable.location.toString + + spark.sql(s"insert into $table partition(city='ct') values(10),(20),(30),(40),(50)").collect() + assert(HiveFileStatusCache.getOrCreate(spark, table) + .getLeafFiles(new Path(s"$location/city=ct")).isEmpty) + + assert(spark.sql(s"select * from $table").count() === 5) + assert(HiveFileStatusCache.getOrCreate(spark, table) + .getLeafFiles(new Path(s"$location/city=ct")).isDefined) + + // should clear cache + spark.sql(s"insert into $table partition(city='ct') values(11),(21),(31),(41),(51)").collect() + assert(HiveFileStatusCache.getOrCreate(spark, table) + .getLeafFiles(new Path(s"$location/city=ct")).isEmpty) + + assert(spark.sql(s"select * from $table").count() === 10) + assert(HiveFileStatusCache.getOrCreate(spark, table) + .getLeafFiles(new Path(s"$location/city=ct")).isDefined) + } + } + + test("expire FileStatusCache when insert overwrite") { + val dbName = "default" + val tbName = "tbl_partition" + val table = s"${catalogName}.${dbName}.${tbName}" + + withTable(table) { + spark.sql(s"create table $table (age int)partitioned by(city string) stored as orc").collect() + val location = newCatalog() + .loadTable(Identifier.of(Array(dbName), tbName)) + .asInstanceOf[HiveTable] + .catalogTable.location.toString + + spark.sql(s"insert into $table partition(city='ct') values(10),(20),(30),(40),(50)").collect() + assert(HiveFileStatusCache.getOrCreate(spark, table) + .getLeafFiles(new Path(s"$location/city=ct")).isEmpty) + + assert(spark.sql(s"select * from $table").count() === 5) + assert(HiveFileStatusCache.getOrCreate(spark, table) + .getLeafFiles(new Path(s"$location/city=ct")).isDefined) + + // should clear cache + spark.sql(s"insert overwrite $table partition(city='ct') values(11),(21),(31),(41),(51)") + .collect() + assert(HiveFileStatusCache.getOrCreate(spark, table) + .getLeafFiles(new Path(s"$location/city=ct")).isEmpty) + + assert(spark.sql(s"select * from $table").count() === 5) + assert(HiveFileStatusCache.getOrCreate(spark, table) + .getLeafFiles(new Path(s"$location/city=ct")).isDefined) + } + } + + test("expire FileStatusCache when alter Table") { + val dbName = "default" + val tbName = "tbl_partition" + val table = s"${catalogName}.${dbName}.${tbName}" + + withTable(table) { + spark.sql(s"create table $table (age int)partitioned by(city string) stored as orc").collect() + val location = newCatalog() + .loadTable(Identifier.of(Array(dbName), tbName)) + .asInstanceOf[HiveTable] + .catalogTable.location.toString + + spark.sql(s"insert into $table partition(city='ct') 
values(10),(20),(30),(40),(50)").collect() + spark.sql(s"select * from $table").collect() + assert(HiveFileStatusCache.getOrCreate(spark, table) + .getLeafFiles(new Path(s"$location/city=ct")).isDefined) + + // should clear cache + spark.sql(s"ALTER TABLE $table ADD COLUMNS (name string)").collect() + assert(HiveFileStatusCache.getOrCreate(spark, table) + .getLeafFiles(new Path(s"$location/city=ct")).isEmpty) + } + } + + test("expire FileStatusCache when rename Table") { + val dbName = "default" + val oldTbName = "tbl_partition" + val newTbName = "tbl_partition_new" + val oldTable = s"$catalogName.$dbName.$oldTbName" + val newTable = s"$catalogName.$dbName.$newTbName" + + withTable(newTable) { + spark.sql(s"create table ${oldTable} (age int)partitioned by(city string) stored as orc") + .collect() + spark.sql(s"insert into $oldTable partition(city='ct') values(10),(20),(30),(40),(50)") + .collect() + spark.sql(s"select * from $oldTable").collect() + + val oldLocation = newCatalog() + .loadTable(Identifier.of(Array(dbName), oldTbName)) + .asInstanceOf[HiveTable] + .catalogTable.location.toString + assert(HiveFileStatusCache.getOrCreate(spark, oldTable) + .getLeafFiles(new Path(s"$oldLocation/city=ct")).isDefined) + + spark.sql(s"DROP TABLE IF EXISTS ${newTable}").collect() + spark.sql(s"use ${catalogName}.${dbName}").collect() + spark.sql(s"ALTER TABLE $oldTbName RENAME TO $newTbName").collect() + val newLocation = newCatalog() + .loadTable(Identifier.of(Array(dbName), newTbName)) + .asInstanceOf[HiveTable] + .catalogTable.location.toString + + assert(HiveFileStatusCache.getOrCreate(spark, oldTable) + .getLeafFiles(new Path(s"$oldLocation/city=ct")) + .isEmpty) + + assert(HiveFileStatusCache.getOrCreate(spark, newTable) + .getLeafFiles(new Path(s"$newLocation/city=ct")) + .isEmpty) + } + } + + test("FileStatusCache isolated between different catalogs with same database.table") { + val catalog1 = catalogName + val catalog2 = "hive2" + val dbName = "default" + val tbName = "tbl_partition" + val cat1Table = s"${catalog1}.${dbName}.${tbName}" + val cat2Table = s"${catalog2}.${dbName}.${tbName}" + + withTable(cat1Table, cat2Table) { + spark.sql(s"CREATE TABLE IF NOT EXISTS $cat1Table (age int)partitioned by(city string)" + + s" stored as orc").collect() + spark.sql(s"CREATE TABLE IF NOT EXISTS $cat2Table (age int)partitioned by(city string)" + + s" stored as orc").collect() + + val location = newCatalog() + .loadTable(Identifier.of(Array(dbName), tbName)) + .asInstanceOf[HiveTable] + .catalogTable.location.toString + + spark.sql(s"insert into $cat1Table partition(city='ct1') " + + s"values(11),(12),(13),(14),(15)").collect() + spark.sql(s"select * from $cat1Table where city='ct1'").collect() + assert(HiveFileStatusCache.getOrCreate(spark, cat1Table) + .getLeafFiles(new Path(s"$location/city=ct1")).isDefined) + assert(HiveFileStatusCache.getOrCreate(spark, cat2Table) + .getLeafFiles(new Path(s"$location/city=ct1")).isEmpty) + + spark.sql(s"insert into $cat2Table partition(city='ct2') " + + s"values(21),(22),(23),(24),(25)").collect() + spark.sql(s"select * from $cat2Table where city='ct2'").collect() + assert(HiveFileStatusCache.getOrCreate(spark, cat1Table) + .getLeafFiles(new Path(s"$location/city=ct2")).isEmpty) + assert(HiveFileStatusCache.getOrCreate(spark, cat2Table) + .getLeafFiles(new Path(s"$location/city=ct2")).isDefined) + } + } +} diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveTest.scala 
b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveTest.scala index fb5bcd62184..fa9a05e2d13 100644 --- a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveTest.scala +++ b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveTest.scala @@ -24,6 +24,7 @@ import org.apache.spark.sql.connector.catalog.{SupportsNamespaces, TableCatalog} import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.Utils import org.apache.kyuubi.spark.connector.common.LocalSparkSession +import org.apache.kyuubi.spark.connector.hive.read.HiveFileStatusCache abstract class KyuubiHiveTest extends QueryTest with Logging { @@ -49,11 +50,13 @@ abstract class KyuubiHiveTest extends QueryTest with Logging { override def beforeEach(): Unit = { super.beforeAll() + HiveFileStatusCache.resetForTesting() getOrCreateSpark() } override def afterEach(): Unit = { super.afterAll() + HiveFileStatusCache.resetForTesting() LocalSparkSession.stop(innerSpark) } @@ -63,6 +66,7 @@ abstract class KyuubiHiveTest extends QueryTest with Logging { .set("spark.ui.enabled", "false") .set("spark.sql.catalogImplementation", "hive") .set("spark.sql.catalog.hive", classOf[HiveTableCatalog].getName) + .set("spark.sql.catalog.hive2", classOf[HiveTableCatalog].getName) .set("javax.jdo.option.ConnectionURL", "jdbc:derby:memory:memorydb;create=true") .set("javax.jdo.option.ConnectionDriverName", "org.apache.derby.jdbc.EmbeddedDriver")
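A minimal usage sketch of the new cache for reviewers, not part of the patch: it assumes an active SparkSession named `spark` with this connector on the classpath, the default scope 'GLOBE', and placeholder table and path names.

// Illustrative sketch only; `spark`, the qualified table name and the path are placeholders.
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.kyuubi.spark.connector.hive.read.HiveFileStatusCache

// Entries are keyed by (qualified table name, path), so repeated scans of the same table
// reuse the cached leaf files as long as the scope is 'GLOBE' (the default).
val cache = HiveFileStatusCache.getOrCreate(spark, "hive.default.tbl_partition")
val partitionPath = new Path("/warehouse/tbl_partition/city=ct")

cache.putLeafFiles(partitionPath, Array(new FileStatus()))
assert(cache.getLeafFiles(partitionPath).isDefined)

// HiveTableCatalog.invalidateTable (invoked on writes, ALTER/RENAME/DROP TABLE) clears the
// entries for this qualified name; the same effect can be triggered directly:
cache.invalidateAll()
assert(cache.getLeafFiles(partitionPath).isEmpty)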