diff --git a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveCatalog.java b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
index 72280449ad54..f548ebc99aea 100644
--- a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
+++ b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
@@ -21,6 +21,7 @@
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configurable;
@@ -31,6 +32,7 @@
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
 import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.GetTableRequest;
 import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.PrincipalType;
@@ -408,23 +410,46 @@ private void validateTableIsIcebergTableOrView(
    */
   @Override
   public boolean tableExists(TableIdentifier identifier) {
+    return Objects.nonNull(fetchTable(identifier));
+  }
+
+  /**
+   * Check whether a table exists and return its current metadata location.
+   *
+   * <p>Note: if a non-Iceberg Hive table with the same identifier exists in the catalog, this
+   * method returns {@code null}.
+   *
+   * @param identifier a table identifier
+   * @return the metadata location of the table if it exists, {@code null} otherwise
+   */
+  public String getTableMetadataLocation(TableIdentifier identifier) {
+    Table table = fetchTable(identifier);
+    if (table == null) {
+      return null;
+    }
+    return table.getParameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP);
+  }
+
+  private Table fetchTable(TableIdentifier identifier) {
     TableIdentifier baseTableIdentifier = identifier;
     if (!isValidIdentifier(identifier)) {
       if (!isValidMetadataIdentifier(identifier)) {
-        return false;
+        return null;
       } else {
         baseTableIdentifier = TableIdentifier.of(identifier.namespace().levels());
       }
     }
-
     String database = baseTableIdentifier.namespace().level(0);
     String tableName = baseTableIdentifier.name();
     try {
-      Table table = clients.run(client -> client.getTable(database, tableName));
+      GetTableRequest request = new GetTableRequest();
+      request.setDbName(database);
+      request.setTblName(tableName);
+      Table table = clients.run(client -> client.getTable(request));
       HiveOperationsBase.validateTableIsIceberg(table, fullTableName(name, baseTableIdentifier));
-      return true;
+      return table;
     } catch (NoSuchTableException | NoSuchObjectException e) {
-      return false;
+      return null;
     } catch (TException e) {
       throw new RuntimeException("Failed to check table existence of " + baseTableIdentifier, e);
     } catch (InterruptedException e) {
@@ -434,6 +459,7 @@ public boolean tableExists(TableIdentifier identifier) {
     }
   }
 
+  @Override
   public boolean viewExists(TableIdentifier viewIdentifier) {
     if (!isValidIdentifier(viewIdentifier)) {
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index 91e68d7921a4..db33a49039cf 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -1881,7 +1881,7 @@ public enum ConfVars {
         "HMS Iceberg Catalog servlet path component of URL endpoint."
     ),
     ICEBERG_CATALOG_CACHE_EXPIRY("metastore.iceberg.catalog.cache.expiry",
-        "hive.metastore.iceberg.catalog.cache.expiry", -1,
+        "hive.metastore.iceberg.catalog.cache.expiry", 600_000L,
        "HMS Iceberg Catalog cache expiry."
     ),
     HTTPSERVER_THREADPOOL_MIN("hive.metastore.httpserver.threadpool.min",
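The new getTableMetadataLocation API above is the hook the caching catalog (next file) relies on: a single HMS round-trip yields the current metadata location without instantiating a full Iceberg table. A minimal sketch of a staleness probe built on it, assuming an already-initialized HiveCatalog and a previously loaded Table (the helper name is illustrative, not part of the patch):

    import org.apache.iceberg.HasTableOperations;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.catalog.TableIdentifier;
    import org.apache.iceberg.hive.HiveCatalog;

    final class StalenessProbe {
      // True when HMS points at a newer metadata file than the instance we hold.
      static boolean isStale(HiveCatalog catalog, TableIdentifier id, Table cached) {
        String cachedLocation = cached instanceof HasTableOperations ops
            ? ops.operations().current().metadataFileLocation()
            : null;
        // One cheap HMS fetch; null means the table is gone or is not an Iceberg table.
        String liveLocation = catalog.getTableMetadataLocation(id);
        return liveLocation != null && !liveLocation.equals(cachedLocation);
      }
    }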
diff --git a/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCachingCatalog.java b/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCachingCatalog.java
index edb5fbd41a9b..ed3dbb352dec 100644
--- a/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCachingCatalog.java
+++ b/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCachingCatalog.java
@@ -19,12 +19,19 @@
 package org.apache.iceberg.rest;
 
+import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Ticker;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import org.apache.iceberg.BaseMetadataTable;
 import org.apache.iceberg.CachingCatalog;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.MetadataTableUtils;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.SupportsNamespaces;
@@ -35,56 +42,249 @@
 import org.apache.iceberg.hive.HiveCatalog;
 import org.apache.iceberg.view.View;
 import org.apache.iceberg.view.ViewBuilder;
-
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * Class that wraps an Iceberg Catalog to cache tables.
  */
 public class HMSCachingCatalog extends CachingCatalog implements SupportsNamespaces, ViewCatalog {
-  private final HiveCatalog hiveCatalog;
+  protected static final Logger LOG = LoggerFactory.getLogger(HMSCachingCatalog.class);
+  protected final HiveCatalog hiveCatalog;
 
   public HMSCachingCatalog(HiveCatalog catalog, long expiration) {
-    super(catalog, true, expiration, Ticker.systemTicker());
+    super(catalog, false, expiration, Ticker.systemTicker());
     this.hiveCatalog = catalog;
   }
 
   @Override
-  public Catalog.TableBuilder buildTable(TableIdentifier identifier, Schema schema) {
-    return hiveCatalog.buildTable(identifier, schema);
+  public void createNamespace(Namespace namespace, Map<String, String> map) {
+    hiveCatalog.createNamespace(namespace, map);
   }
 
   @Override
-  public void createNamespace(Namespace nmspc, Map<String, String> map) {
-    hiveCatalog.createNamespace(nmspc, map);
+  public List<Namespace> listNamespaces(Namespace namespace) throws NoSuchNamespaceException {
+    return hiveCatalog.listNamespaces(namespace);
+  }
+
+  /**
+   * Callback invoked when the cache invalidates the entry for a given table identifier.
+   *
+   * @param tid the table identifier to invalidate
+   */
+  protected void onCacheInvalidate(TableIdentifier tid) {
+    // This method is intentionally left empty. It can be overridden in subclasses if needed.
+  }
+
+  /**
+   * Callback invoked when the cache loads a table for a given table identifier.
+   *
+   * @param tid the table identifier
+   */
+  protected void onCacheLoad(TableIdentifier tid) {
+    // This method is intentionally left empty. It can be overridden in subclasses if needed.
+  }
+
+  /**
+   * Callback invoked on a cache hit for a given table identifier.
+   *
+   * @param tid the table identifier
+   */
+  protected void onCacheHit(TableIdentifier tid) {
+    // This method is intentionally left empty. It can be overridden in subclasses if needed.
+  }
+
+  /**
+   * Callback invoked on a cache miss for a given table identifier.
+   *
+   * @param tid the table identifier
+   */
+  protected void onCacheMiss(TableIdentifier tid) {
+    // This method is intentionally left empty. It can be overridden in subclasses if needed.
+  }
+
+  /**
+   * Callback invoked when the cache loads a metadata table for a given table identifier.
+   *
+   * @param tid the table identifier
+   */
+  protected void onCacheMetaLoad(TableIdentifier tid) {
+    // This method is intentionally left empty. It can be overridden in subclasses if needed.
   }
 
   @Override
-  public List<Namespace> listNamespaces(Namespace nmspc) throws NoSuchNamespaceException {
-    return hiveCatalog.listNamespaces(nmspc);
+  public Table loadTable(final TableIdentifier identifier) {
+    final TableIdentifier canonicalized = identifier.toLowerCase();
+    final Table cachedTable = tableCache.getIfPresent(canonicalized);
+    if (cachedTable != null) {
+      final String location = hiveCatalog.getTableMetadataLocation(canonicalized);
+      if (location == null) {
+        LOG.debug("Table {} has no metadata location, falling through to the cache loader", canonicalized);
+      } else {
+        String cachedLocation = cachedTable instanceof HasTableOperations tableOps
+            ? tableOps.operations().current().metadataFileLocation()
+            : null;
+        if (!location.equals(cachedLocation)) {
+          LOG.debug("Invalidate table {}, cached {} != actual {}", canonicalized, cachedLocation, location);
+          // Invalidate the cached table since its metadata location is stale
+          invalidateTable(canonicalized);
+          onCacheInvalidate(canonicalized);
+        } else {
+          LOG.debug("Returning cached table: {}", canonicalized);
+          onCacheHit(canonicalized);
+          return cachedTable;
+        }
+      }
+    } else {
+      LOG.debug("Cache miss for table: {}", canonicalized);
+      onCacheMiss(canonicalized);
+    }
+    final Table table = tableCache.get(canonicalized, hiveCatalog::loadTable);
+    if (table instanceof BaseMetadataTable) {
+      // Cache the origin table as well
+      TableIdentifier originTableIdentifier =
+          TableIdentifier.of(canonicalized.namespace().levels());
+      Table originTable = tableCache.get(originTableIdentifier, hiveCatalog::loadTable);
+      // Share the TableOperations instance of the origin table with all its metadata tables,
+      // so that metadata table instances are refreshed when the origin table instance is refreshed.
+      if (originTable instanceof HasTableOperations tableOps) {
+        TableOperations ops = tableOps.operations();
+        MetadataTableType type = MetadataTableType.from(canonicalized.name());
+        Table metadataTable =
+            MetadataTableUtils.createMetadataTableInstance(
+                ops, hiveCatalog.name(), originTableIdentifier, canonicalized, type);
+        tableCache.put(canonicalized, metadataTable);
+        onCacheMetaLoad(canonicalized);
+        LOG.debug("Loaded metadata table: {} for origin table: {}", canonicalized, originTableIdentifier);
+        // Return the metadata table instead of the origin table
+        return metadataTable;
+      }
+    }
+    onCacheLoad(canonicalized);
+    LOG.debug("Loaded table: {}", canonicalized);
+    return table;
   }
 
   @Override
-  public Map<String, String> loadNamespaceMetadata(Namespace nmspc) throws NoSuchNamespaceException {
-    return hiveCatalog.loadNamespaceMetadata(nmspc);
+  public Map<String, String> loadNamespaceMetadata(Namespace namespace) throws NoSuchNamespaceException {
+    return hiveCatalog.loadNamespaceMetadata(namespace);
   }
 
   @Override
-  public boolean dropNamespace(Namespace nmspc) throws NamespaceNotEmptyException {
-    List<TableIdentifier> tables = listTables(nmspc);
+  public boolean dropNamespace(Namespace namespace) throws NamespaceNotEmptyException {
+    List<TableIdentifier> tables = listTables(namespace);
     for (TableIdentifier ident : tables) {
       invalidateTable(ident);
     }
-    return hiveCatalog.dropNamespace(nmspc);
+    return hiveCatalog.dropNamespace(namespace);
   }
 
   @Override
-  public boolean setProperties(Namespace nmspc, Map<String, String> map) throws NoSuchNamespaceException {
-    return hiveCatalog.setProperties(nmspc, map);
+  public boolean setProperties(Namespace namespace, Map<String, String> map) throws NoSuchNamespaceException {
+    return hiveCatalog.setProperties(namespace, map);
   }
 
   @Override
-  public boolean removeProperties(Namespace nmspc, Set<String> set) throws NoSuchNamespaceException {
-    return hiveCatalog.removeProperties(nmspc, set);
+  public boolean removeProperties(Namespace namespace, Set<String> set) throws NoSuchNamespaceException {
+    return hiveCatalog.removeProperties(namespace, set);
   }
 
   @Override
@@ -92,6 +292,11 @@ public boolean namespaceExists(Namespace namespace) {
     return hiveCatalog.namespaceExists(namespace);
   }
 
+  @Override
+  public Catalog.TableBuilder buildTable(TableIdentifier identifier, Schema schema) {
+    return hiveCatalog.buildTable(identifier, schema);
+  }
+
   @Override
   public List<TableIdentifier> listViews(Namespace namespace) {
     return hiveCatalog.listViews(namespace);
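For orientation, the metadata-table branch in loadTable above leans on Iceberg's identifier convention: a metadata table is addressed as <db>.<table>.<type>, so the origin table identifier is recovered from the namespace levels and the metadata kind from the last name component. A small illustration of that convention (the identifier values are made up):

    import org.apache.iceberg.MetadataTableType;
    import org.apache.iceberg.catalog.TableIdentifier;

    final class MetadataIdentifiers {
      static void illustrate() {
        // "db.tbl.snapshots": namespace is ["db", "tbl"], name is "snapshots"
        TableIdentifier metaId = TableIdentifier.of("db", "tbl", "snapshots");
        // Origin table identifier, rebuilt from the namespace levels: db.tbl
        TableIdentifier originId = TableIdentifier.of(metaId.namespace().levels());
        // Metadata table kind, derived from the last component: SNAPSHOTS
        MetadataTableType type = MetadataTableType.from(metaId.name());
      }
    }

This is why sharing the origin table's TableOperations with the cached metadata-table instance keeps both views consistent: refreshing the origin table refreshes every metadata table built on the same operations object.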
diff --git a/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCatalogFactory.java b/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCatalogFactory.java
index 682e7c9e2649..952b9165b141 100644
--- a/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCatalogFactory.java
+++ b/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCatalogFactory.java
@@ -33,13 +33,15 @@
 /**
  * Catalog & servlet factory.
+ *
+ * <p>This class is deliberately derivable: the factory class name is a configuration property,
+ * so this class can serve as a base for specialization.
  */
 public class HMSCatalogFactory {
   private static final String SERVLET_ID_KEY = "metastore.in.test.iceberg.catalog.servlet.id";
-  private final Configuration configuration;
-  private final int port;
-  private final String path;
+  protected final Configuration configuration;
+  protected final int port;
+  protected final String path;
 
   /**
    * Factory constructor.
@@ -47,17 +49,17 @@ public class HMSCatalogFactory {
    * declared in configuration and found through introspection.
    * @param conf the configuration
    */
-  private HMSCatalogFactory(Configuration conf) {
+  protected HMSCatalogFactory(Configuration conf) {
     port = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.CATALOG_SERVLET_PORT);
     path = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.ICEBERG_CATALOG_SERVLET_PATH);
     this.configuration = conf;
   }
-
-  public int getPort() {
+
+  protected final int getPort() {
     return port;
   }
-
-  public String getPath() {
+
+  protected final String getPath() {
     return path;
   }
 
@@ -65,7 +67,7 @@ public String getPath() {
    * Creates the catalog instance.
    * @return the catalog
    */
-  private Catalog createCatalog() {
+  protected Catalog createCatalog() {
     final Map<String, String> properties = new TreeMap<>();
     MetastoreConf.setVar(configuration, MetastoreConf.ConfVars.THRIFT_URIS, "");
     final String configUri = MetastoreConf.getVar(configuration, MetastoreConf.ConfVars.THRIFT_URIS);
@@ -89,8 +91,19 @@ private Catalog createCatalog() {
     hiveCatalog.setConf(configuration);
     final String catalogName = MetastoreConf.getVar(configuration, MetastoreConf.ConfVars.CATALOG_DEFAULT);
     hiveCatalog.initialize(catalogName, properties);
+    return cacheCatalog(hiveCatalog);
+  }
+
+  /**
+   * Wraps the catalog in a caching catalog.
+   *
+   * <p>By default, the catalog is wrapped in an {@link HMSCachingCatalog} that caches tables.
+   *
+   * @param hiveCatalog the Iceberg catalog
+   * @return the caching catalog
+   */
+  protected Catalog cacheCatalog(HiveCatalog hiveCatalog) {
+    // Always wrap the catalog in a caching catalog; the expiry controls entry lifetime
     long expiry = MetastoreConf.getLongVar(configuration, MetastoreConf.ConfVars.ICEBERG_CATALOG_CACHE_EXPIRY);
-    return expiry > 0 ? new HMSCachingCatalog(hiveCatalog, expiry) : hiveCatalog;
+    return new HMSCachingCatalog(hiveCatalog, expiry);
   }
 
   /**
@@ -98,7 +111,7 @@ private Catalog createCatalog() {
    * @param catalog the Iceberg catalog
    * @return the servlet
    */
-  private HttpServlet createServlet(Catalog catalog) {
+  protected HttpServlet createServlet(Catalog catalog) {
     String authType = MetastoreConf.getVar(configuration, ConfVars.CATALOG_SERVLET_AUTH);
     ServletSecurity security = new ServletSecurity(AuthType.fromString(authType), configuration);
     return security.proxy(new HMSCatalogServlet(new HMSCatalogAdapter(catalog)));
@@ -108,9 +121,10 @@ private HttpServlet createServlet(Catalog catalog) {
   /**
    * Creates the REST catalog servlet instance.
    * @return the servlet
    */
-  private HttpServlet createServlet() {
+  protected HttpServlet createServlet() {
     if (port >= 0 && path != null && !path.isEmpty()) {
-      return createServlet(createCatalog());
+      Catalog actualCatalog = createCatalog();
+      return createServlet(actualCatalog);
     }
     return null;
   }
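Because the factory class name is itself a configuration property (the test below wires it through MetastoreConf.ConfVars.CATALOG_SERVLET_FACTORY), a deployment can substitute a specialization that overrides cacheCatalog. A hedged sketch of such a subclass; MyCatalogFactory is a hypothetical name, and the static createServlet(Configuration) entry point that introspection expects is shown by HMSTestCatalogFactory further down:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
    import org.apache.iceberg.catalog.Catalog;
    import org.apache.iceberg.hive.HiveCatalog;
    import org.apache.iceberg.rest.HMSCachingCatalog;
    import org.apache.iceberg.rest.HMSCatalogFactory;

    public class MyCatalogFactory extends HMSCatalogFactory {
      protected MyCatalogFactory(Configuration conf) {
        super(conf);
      }

      @Override
      protected Catalog cacheCatalog(HiveCatalog hiveCatalog) {
        long expiry = MetastoreConf.getLongVar(configuration,
            MetastoreConf.ConfVars.ICEBERG_CATALOG_CACHE_EXPIRY);
        // Any HMSCachingCatalog subclass works here; the test factory below returns
        // one that counts cache events.
        return new HMSCachingCatalog(hiveCatalog, expiry);
      }
    }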
diff --git a/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/TestServerCatalogCache.java b/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/TestServerCatalogCache.java
new file mode 100644
index 000000000000..6e10fde67e6c
--- /dev/null
+++ b/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/TestServerCatalogCache.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.rest;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.ServletSecurity.AuthType;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.hive.HiveCatalog;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.rest.extension.HMSTestCachingCatalog;
+import org.apache.iceberg.rest.extension.HMSTestCatalogFactory;
+import org.apache.iceberg.rest.extension.HiveRESTCatalogServerExtension;
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Verifies the behavior of the HMSCachingCatalog.
+ *
+ * <p>The cache relies on the table metadata location to determine whether a table has changed
+ * between a cached entry and the latest version in the HMS database.
+ * The test creates a table, loads it, inserts some data, and checks that the cache correctly
+ * invalidates cached entries when the table is modified.
+ */
+class TestServerCatalogCache {
+  private static final String NEWDB = "newdb";
+  private static final String TBL = "tbl";
+  private static final String TEMPDIR = "file:/tmp/junit" + Long.toHexString(System.currentTimeMillis()) + "/";
+  private static final String ID = "id";
+  private static final String DATA = "data";
+  private static final Namespace NS = Namespace.of(NEWDB);
+  private static RESTCatalog restCatalog;
+  private static HiveRESTCatalogServerExtension restServer =
+      HiveRESTCatalogServerExtension.builder(AuthType.NONE).build(testConfiguration());
+
+  private static Configuration testConfiguration() {
+    Configuration conf = MetastoreConf.newMetastoreConf();
+    MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CATALOG_SERVLET_FACTORY, HMSTestCatalogFactory.class.getName());
+    return conf;
+  }
+
+  private static String baseTableLocation(TableIdentifier identifier) {
+    Namespace ns = identifier.namespace();
+    return TEMPDIR + ns + "/" + identifier.name();
+  }
+
+  @BeforeAll
+  static void setupAll() throws Exception {
+    restServer.beforeAll(null);
+    HMSTestCachingCatalog cachingCatalog = HMSTestCatalogFactory.getLastCatalog();
+    Assertions.assertNotNull(cachingCatalog);
+
+    Map<String, String> catalogProperties = new HashMap<>();
+    catalogProperties.putIfAbsent("uri", restServer.getRestEndpoint());
+    catalogProperties.putIfAbsent("warehouse", "rck_warehouse");
+    restCatalog = new RESTCatalog();
+    restCatalog.setConf(new Configuration());
+    restCatalog.initialize("hive", catalogProperties);
+  }
+
+  @BeforeEach
+  void setup() {
+    try {
+      restServer.beforeEach(null);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    RCKUtils.purgeCatalogTestEntries(restCatalog);
+  }
+
+  @AfterAll
+  static void teardownAll() throws Exception {
+    if (restCatalog != null) {
+      restCatalog.close();
+    }
+    restServer.afterAll(null);
+    restServer = null;
+  }
+
+  @Test
+  void testTableCache() {
+    restCatalog.createNamespace(NS);
+    // acquire the server cache
+    HMSTestCachingCatalog cachingCatalog = HMSTestCatalogFactory.getLastCatalog();
+    Assertions.assertNotNull(cachingCatalog);
+    HiveCatalog catalog = cachingCatalog.getHiveCatalog();
+    // create a table schema
+    Schema schema = new Schema(
+        required(1, ID, Types.IntegerType.get()),
+        required(2, DATA, Types.StringType.get()));
+    TableIdentifier tableIdent = TableIdentifier.of(NEWDB, TBL);
+    String location = baseTableLocation(tableIdent);
+    try {
+      // create a table in the catalog bypassing the cache;
+      // using restCatalog would defeat the purpose since it would go through the cache
+      Table table = catalog
+          .buildTable(tableIdent, schema)
+          .withLocation(location)
+          .create();
+      assertThat(table.location()).isEqualTo(location);
+      assertThat(table.schema().columns()).hasSize(2);
+      // check that the table is *not* yet in the cache (hit 0, miss 1, load 1)
+      Table l0 = cachingCatalog.loadTable(tableIdent);
+      assertThat(l0.location()).isEqualTo(location);
+      assertThat(l0).isNotEqualTo(table);
+      Map<String, Integer> m0 = cachingCatalog.getCacheMetrics();
+      assertThat(m0).containsEntry("hit", 0);
+      assertThat(m0).containsEntry("miss", 1);
+      assertThat(m0).containsEntry("invalidation", 0);
+      assertThat(m0).containsEntry("load", 1);
+      // load the table multiple times, find it in the cache (hit 10)
+      for (int i = 0; i < 10; i++) {
+        Table l = cachingCatalog.loadTable(tableIdent);
+        assertEquals(l0, l);
+      }
+      // load the table multiple times through the REST catalog, find it in the cache (hit 20)
+      for (int i = 0; i < 10; i++) {
+        Table l = restCatalog.loadTable(tableIdent);
+        // not the same instance, but the same metadata location
+        assertEquals(metadataLocation(l0), metadataLocation(l));
+      }
+      m0 = cachingCatalog.getCacheMetrics();
+      assertThat(m0).containsEntry("hit", 20);
+      // add rows through table; the new snapshot implies invalidation
+      insertRows(table);
+      // load again, should provoke invalidation (invalidation 1, miss 1, load 2)
+      Table l1 = cachingCatalog.loadTable(tableIdent);
+      assertThat(l1).isNotEqualTo(l0);
+      Map<String, Integer> m2 = cachingCatalog.getCacheMetrics();
+      assertThat(m2).containsEntry("invalidation", 1);
+      assertThat(m2).containsEntry("miss", 1);
+      assertThat(m2).containsEntry("load", 2);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    } finally {
+      catalog.dropTable(tableIdent);
+      HMSTestCatalogFactory.clearLastCatalog();
+    }
+  }
+
+  private static String metadataLocation(Table table) {
+    return table instanceof HasTableOperations tableOps
+        ? tableOps.operations().current().metadataFileLocation()
+        : null;
+  }
+
+  private void insertRows(Table table) throws IOException {
+    Record genericRecord = GenericRecord.create(table.schema());
+    // write the records to a Parquet file
+    final int records = 8;
+    URI tempDirUri = URI.create(TEMPDIR);
+    File file = Paths.get(tempDirUri).resolve("test_cache.parquet").toFile();
+    OutputFile outputFile = Files.localOutput(file);
+    try (FileAppender<Record> appender = Parquet.write(outputFile)
+        .schema(table.schema())
+        .createWriterFunc(GenericParquetWriter::buildWriter)
+        .build()) {
+      for (int i = 0; i < records; ++i) {
+        genericRecord.setField(ID, i);
+        genericRecord.setField(DATA, Integer.toBinaryString(i));
+        appender.add(genericRecord);
+      }
+    }
+    // create a DataFile from the Parquet file
+    DataFile dataFile = DataFiles.builder(table.spec())
+        .withPath(file.getAbsolutePath())
+        .withFileSizeInBytes(file.length())
+        .withRecordCount(records)
+        .build();
+    // append the DataFile to the table
+    table.newAppend()
+        .appendFile(dataFile)
+        .commit();
+  }
+}
diff --git a/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/extension/HMSTestCachingCatalog.java b/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/extension/HMSTestCachingCatalog.java
new file mode 100644
index 000000000000..6fcffb6eaa28
--- /dev/null
+++ b/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/extension/HMSTestCachingCatalog.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.rest.extension;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.hive.HiveCatalog;
+import org.apache.iceberg.rest.HMSCachingCatalog;
+
+/**
+ * A test class for HMSCachingCatalog that tracks cache metrics for test verifications.
+ * It extends HMSCachingCatalog and overrides methods to increment counters for cache hits,
+ * misses, invalidations, and loads.
+ */
+public class HMSTestCachingCatalog extends HMSCachingCatalog {
+  protected final AtomicInteger cacheHitCount = new AtomicInteger(0);
+  protected final AtomicInteger cacheMissCount = new AtomicInteger(0);
+  protected final AtomicInteger cacheInvalidationCount = new AtomicInteger(0);
+  protected final AtomicInteger cacheLoadCount = new AtomicInteger(0);
+
+  /**
+   * Constructor for HMSTestCachingCatalog.
+   *
+   * @param catalog the HiveCatalog to wrap
+   * @param expiration the cache expiration time in milliseconds
+   */
+  public HMSTestCachingCatalog(HiveCatalog catalog, long expiration) {
+    super(catalog, expiration);
+  }
+
+  public Map<String, Integer> getCacheMetrics() {
+    return Map.of(
+        "hit", cacheHitCount.get(),
+        "miss", cacheMissCount.get(),
+        "invalidation", cacheInvalidationCount.get(),
+        "load", cacheLoadCount.get()
+    );
+  }
+
+  public HiveCatalog getHiveCatalog() {
+    return hiveCatalog;
+  }
+
+  @Override
+  protected void onCacheInvalidate(TableIdentifier tid) {
+    cacheInvalidationCount.incrementAndGet();
+  }
+
+  @Override
+  protected void onCacheLoad(TableIdentifier tid) {
+    cacheLoadCount.incrementAndGet();
+  }
+
+  @Override
+  protected void onCacheHit(TableIdentifier tid) {
+    cacheHitCount.incrementAndGet();
+  }
+
+  @Override
+  protected void onCacheMiss(TableIdentifier tid) {
+    cacheMissCount.incrementAndGet();
+  }
+}
diff --git a/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/extension/HMSTestCatalogFactory.java b/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/extension/HMSTestCatalogFactory.java
new file mode 100644
index 000000000000..611a5d1e512d
--- /dev/null
+++ b/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/extension/HMSTestCatalogFactory.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.rest.extension;
+
+import java.lang.ref.Reference;
+import java.lang.ref.SoftReference;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.servlet.http.HttpServlet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.ServletServerBuilder;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.hive.HiveCatalog;
+import org.apache.iceberg.rest.HMSCatalogFactory;
+
+/**
+ * Factory for creating a test catalog that caches the last created catalog.
+ *
+ * <p>This class is used in tests to verify the behavior of the caching catalog.
+ */
+public class HMSTestCatalogFactory extends HMSCatalogFactory {
+  static final AtomicReference<SoftReference<HMSTestCachingCatalog>> catRef = new AtomicReference<>(null);
+
+  /**
+   * Factory constructor.
+   *
+   * <p>Called by the static method {@link HMSTestCatalogFactory#createServlet(Configuration)} that is
+   * declared in configuration and found through introspection.
+   *
+   * @param conf the configuration
+   */
+  protected HMSTestCatalogFactory(Configuration conf) {
+    super(conf);
+  }
+
+  @Override
+  protected Catalog cacheCatalog(HiveCatalog hiveCatalog) {
+    long expiry = MetastoreConf.getLongVar(configuration, MetastoreConf.ConfVars.ICEBERG_CATALOG_CACHE_EXPIRY);
+    HMSTestCachingCatalog cc = new HMSTestCachingCatalog(hiveCatalog, expiry);
+    catRef.set(new SoftReference<>(cc));
+    return cc;
+  }
+
+  public static HMSTestCachingCatalog getLastCatalog() {
+    Reference<HMSTestCachingCatalog> ref = catRef.get();
+    return ref == null ? null : ref.get();
+  }
+
+  public static void clearLastCatalog() {
+    catRef.set(null);
+  }
+
+  /**
+   * Creates the servlet instance.
+   *
+   * @return the servlet
+   */
+  public static ServletServerBuilder.Descriptor createServlet(Configuration configuration) {
+    HMSTestCatalogFactory hms = new HMSTestCatalogFactory(configuration);
+    HttpServlet servlet = hms.createServlet();
+    if (servlet != null) {
+      return new ServletServerBuilder.Descriptor(hms.getPort(), hms.getPath(), servlet);
+    }
+    return null;
+  }
+}
diff --git a/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/extension/HiveRESTCatalogServerExtension.java b/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/extension/HiveRESTCatalogServerExtension.java
index 5b80d2276671..849175459dd6 100644
--- a/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/extension/HiveRESTCatalogServerExtension.java
+++ b/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/extension/HiveRESTCatalogServerExtension.java
@@ -43,8 +43,8 @@ public class HiveRESTCatalogServerExtension implements BeforeAllCallback, Before
   private final JwksServer jwksServer;
   private final RESTCatalogServer restCatalogServer;
 
-  private HiveRESTCatalogServerExtension(AuthType authType, Class<?> schemaInfoClass) {
-    this.conf = MetastoreConf.newMetastoreConf();
+  private HiveRESTCatalogServerExtension(AuthType authType, Configuration configuration, Class<?> schemaInfoClass) {
+    this.conf = configuration == null ? MetastoreConf.newMetastoreConf() : configuration;
     MetastoreConf.setVar(conf, ConfVars.CATALOG_SERVLET_AUTH, authType.name());
     if (authType == AuthType.JWT) {
       jwksServer = new JwksServer();
@@ -59,10 +59,6 @@ private HiveRESTCatalogServerExtension(AuthType authType, Class<?> schemaInfoCla
-    private Class<?> metaStoreSchemaClass;
+    private Class<?> schemaInfoClass;
 
-    private Builder(AuthType authType) {
-      this.authType = authType;
-    }
-
     public Builder addMetaStoreSchemaClassName(Class<?> metaStoreSchemaClass) {
-      this.metaStoreSchemaClass = metaStoreSchemaClass;
+      this.schemaInfoClass = metaStoreSchemaClass;
       return this;
     }
 
+    private Builder(AuthType authType) {
+      this.authType = authType;
+    }
+
     public HiveRESTCatalogServerExtension build() {
-      return new HiveRESTCatalogServerExtension(authType, metaStoreSchemaClass);
+      return new HiveRESTCatalogServerExtension(authType, null, schemaInfoClass);
+    }
+
+    public HiveRESTCatalogServerExtension build(Configuration configuration) {
+      return new HiveRESTCatalogServerExtension(authType, configuration, schemaInfoClass);
     }
   }
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index e0368373a60c..7f989726f31d 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -658,6 +658,7 @@ private static void constraintHttpMethods(ServletContextHandler ctxHandler, bool
     }
     ctxHandler.setSecurityHandler(securityHandler);
   }
+
   /**
    * Start Metastore based on a passed {@link HadoopThriftAuthBridge}.
    *
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ServletSecurity.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ServletSecurity.java
index 22985875f223..186d10f568f2 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ServletSecurity.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ServletSecurity.java
@@ -19,6 +19,7 @@
 
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hive.metastore.auth.HttpAuthenticationException;
 import org.apache.hadoop.hive.metastore.auth.jwt.JWTValidator;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
@@ -238,6 +239,15 @@ public void execute(HttpServletRequest request, HttpServletResponse response, Me
       Thread.currentThread().interrupt();
     } catch (RuntimeException e) {
       throw new IOException("Exception when executing http request as user: "+ clientUgi.getUserName(), e);
+    } finally {
+      try {
+        FileSystem.closeAllForUGI(clientUgi);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Successfully cleaned up FileSystem handles for user: {}", clientUgi.getUserName());
+        }
+      } catch (IOException cleanupException) {
+        LOG.error("Failed to clean up FileSystem handles for UGI: {}", clientUgi, cleanupException);
+      }
     }
   }
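The ServletSecurity change above plugs a FileSystem handle leak: Hadoop caches FileSystem instances keyed by UGI, so per-request proxy users accumulate cache entries unless they are released explicitly. A minimal sketch of the doAs-plus-cleanup pattern the patch enforces (the class name and the probed path are illustrative):

    import java.io.IOException;
    import java.security.PrivilegedExceptionAction;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.security.UserGroupInformation;

    final class UgiScopedWork {
      static void runAsUser(String user, Configuration conf) throws IOException, InterruptedException {
        UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
        try {
          ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
            // FileSystem.get() caches an instance keyed by (scheme, authority, ugi)
            FileSystem fs = FileSystem.get(conf);
            fs.exists(new Path("/tmp"));
            return null;
          });
        } finally {
          // Without this, the cached FileSystem entries for this UGI are never evicted
          FileSystem.closeAllForUGI(ugi);
        }
      }
    }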