-
Notifications
You must be signed in to change notification settings - Fork 4.8k
HIVE-29035: Fixing cache handling for REST catalog #6022
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
f5c717e
563c31d
364a257
dc4c029
d691ff1
7a37552
6f5bd6c
58f38c1
3db7b2f
54d51ce
7d611d3
1a7cf16
2203dab
4e9a52f
f8eb5e4
8c7de03
235771d
210a2cc
717766c
308892f
99b6d44
dc6fc63
4e5249f
ce5fb8a
e51cbd2
71ecdaf
7c79f15
7d80bc2
2c307df
4ca34b4
ff34452
9e1c76b
6314755
6f22e78
eef7525
17b159c
8a818c5
58d94ee
3087edb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||
|---|---|---|---|---|
|
|
@@ -21,6 +21,7 @@ | |||
|
|
||||
| import java.util.List; | ||||
| import java.util.Map; | ||||
| import java.util.Objects; | ||||
| import java.util.Set; | ||||
| import java.util.stream.Collectors; | ||||
| import org.apache.hadoop.conf.Configurable; | ||||
|
|
@@ -31,6 +32,7 @@ | |||
| import org.apache.hadoop.hive.metastore.TableType; | ||||
| import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; | ||||
| import org.apache.hadoop.hive.metastore.api.Database; | ||||
| import org.apache.hadoop.hive.metastore.api.GetTableRequest; | ||||
| import org.apache.hadoop.hive.metastore.api.InvalidOperationException; | ||||
| import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; | ||||
| import org.apache.hadoop.hive.metastore.api.PrincipalType; | ||||
|
|
@@ -408,23 +410,46 @@ private void validateTableIsIcebergTableOrView( | |||
| */ | ||||
| @Override | ||||
| public boolean tableExists(TableIdentifier identifier) { | ||||
| return Objects.nonNull(fetchTable(identifier)); | ||||
| } | ||||
|
|
||||
| /** | ||||
| * Check whether table exists and return its current metadata location. | ||||
| * | ||||
| * <p>Note: If a hive table with the same identifier exists in catalog, this method will return | ||||
| * {@code null}. | ||||
| * | ||||
| * @param identifier a table identifier | ||||
| * @return the location of the table if it exists, null otherwise | ||||
| */ | ||||
| public String getTableMetadataLocation(TableIdentifier identifier) { | ||||
| Table table = fetchTable(identifier); | ||||
| if (table == null) { | ||||
| return null; | ||||
| } | ||||
| return table.getParameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP); | ||||
| } | ||||
|
|
||||
| private Table fetchTable(TableIdentifier identifier) { | ||||
| TableIdentifier baseTableIdentifier = identifier; | ||||
| if (!isValidIdentifier(identifier)) { | ||||
| if (!isValidMetadataIdentifier(identifier)) { | ||||
| return false; | ||||
| return null; | ||||
| } else { | ||||
| baseTableIdentifier = TableIdentifier.of(identifier.namespace().levels()); | ||||
| } | ||||
| } | ||||
|
|
||||
| String database = baseTableIdentifier.namespace().level(0); | ||||
| String tableName = baseTableIdentifier.name(); | ||||
| try { | ||||
| Table table = clients.run(client -> client.getTable(database, tableName)); | ||||
| GetTableRequest request = new GetTableRequest(); | ||||
| request.setDbName(database); | ||||
| request.setTblName(tableName); | ||||
| Table table = clients.run(client -> client.getTable(request)); | ||||
| HiveOperationsBase.validateTableIsIceberg(table, fullTableName(name, baseTableIdentifier)); | ||||
| return true; | ||||
| return table; | ||||
| } catch (NoSuchTableException | NoSuchObjectException e) { | ||||
| return false; | ||||
| return null; | ||||
| } catch (TException e) { | ||||
| throw new RuntimeException("Failed to check table existence of " + baseTableIdentifier, e); | ||||
| } catch (InterruptedException e) { | ||||
|
|
@@ -434,6 +459,7 @@ public boolean tableExists(TableIdentifier identifier) { | |||
| } | ||||
| } | ||||
|
|
||||
|
|
||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The cache in HIVE-29035 is limited to serving loadTable() for REST and resides server-side; the Table objects it serves are marshaled by to a client so there is no 'external' instance sharing. It is dependent upon HMS being the actual catalog implementation to acquire the latest known metadata location for a given table. This makes this PR pretty much tied to Hive; there is no need to involve Iceberg.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Without using a REST catalog, a client retrieves table metadata through XYZCatalog -> TableMetadataParser -> S3/HDFS/etc. With a REST catalog, a client does it through RESTCatalog, where REST API(in our case, the servlet) serves metadata via XYZCatalog(in our case, HiveCatalog or HMSCachingCatalog) -> TableMetadataParser -> S3/HDFS/etc. So, TableMetadataParser might be a better place to maintain. It can support our use case, and we can remove HMSCachingCatalog, which uses CachingCatalog introduced for client-side caching and utilized only in SparkCatalog and FlinkCatalog.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
@okumin are you saying
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. some notes from iceberg dev lists:
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I asked the Iceberg community with the sample PR.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @okumin, do you know if apache/iceberg@72d5fd6 solves the same problem? |
||||
| @Override | ||||
| public boolean viewExists(TableIdentifier viewIdentifier) { | ||||
| if (!isValidIdentifier(viewIdentifier)) { | ||||
|
|
||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,12 +19,19 @@ | |
|
|
||
| package org.apache.iceberg.rest; | ||
|
|
||
| import com.github.benmanes.caffeine.cache.Cache; | ||
| import com.github.benmanes.caffeine.cache.Ticker; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.Set; | ||
| import org.apache.iceberg.BaseMetadataTable; | ||
| import org.apache.iceberg.CachingCatalog; | ||
| import org.apache.iceberg.HasTableOperations; | ||
| import org.apache.iceberg.MetadataTableType; | ||
| import org.apache.iceberg.MetadataTableUtils; | ||
| import org.apache.iceberg.Schema; | ||
| import org.apache.iceberg.Table; | ||
| import org.apache.iceberg.TableOperations; | ||
| import org.apache.iceberg.catalog.Catalog; | ||
| import org.apache.iceberg.catalog.Namespace; | ||
| import org.apache.iceberg.catalog.SupportsNamespaces; | ||
|
|
@@ -35,63 +42,261 @@ | |
| import org.apache.iceberg.hive.HiveCatalog; | ||
| import org.apache.iceberg.view.View; | ||
| import org.apache.iceberg.view.ViewBuilder; | ||
|
|
||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
| /** | ||
| * Class that wraps an Iceberg Catalog to cache tables. | ||
| */ | ||
| public class HMSCachingCatalog extends CachingCatalog implements SupportsNamespaces, ViewCatalog { | ||
| private final HiveCatalog hiveCatalog; | ||
| protected static final Logger LOG = LoggerFactory.getLogger(HMSCachingCatalog.class); | ||
| protected final HiveCatalog hiveCatalog; | ||
deniskuzZ marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| public HMSCachingCatalog(HiveCatalog catalog, long expiration) { | ||
| super(catalog, true, expiration, Ticker.systemTicker()); | ||
| super(catalog, false, expiration, Ticker.systemTicker()); | ||
| this.hiveCatalog = catalog; | ||
| } | ||
|
|
||
| @Override | ||
| public Catalog.TableBuilder buildTable(TableIdentifier identifier, Schema schema) { | ||
| return hiveCatalog.buildTable(identifier, schema); | ||
| public void createNamespace(Namespace namespace, Map<String, String> map) { | ||
| hiveCatalog.createNamespace(namespace, map); | ||
| } | ||
|
|
||
| @Override | ||
| public void createNamespace(Namespace nmspc, Map<String, String> map) { | ||
| hiveCatalog.createNamespace(nmspc, map); | ||
| public List<Namespace> listNamespaces(Namespace namespace) throws NoSuchNamespaceException { | ||
| return hiveCatalog.listNamespaces(namespace); | ||
| } | ||
|
|
||
| /** | ||
| * Callback when cache invalidates the entry for a given table identifier. | ||
| * | ||
| * @param tid the table identifier to invalidate | ||
| */ | ||
| protected void onCacheInvalidate(TableIdentifier tid) { | ||
| // This method is intentionally left empty. It can be overridden in subclasses if needed. | ||
deniskuzZ marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
|
|
||
| /** | ||
| * Callback when cache loads a table for a given table identifier. | ||
| * | ||
| * @param tid the table identifier | ||
| */ | ||
| protected void onCacheLoad(TableIdentifier tid) { | ||
| // This method is intentionally left empty. It can be overridden in subclasses if needed. | ||
| } | ||
|
|
||
| /** | ||
| * Callback when cache hit for a given table identifier. | ||
| * | ||
| * @param tid the table identifier | ||
| */ | ||
| protected void onCacheHit(TableIdentifier tid) { | ||
| // This method is intentionally left empty. It can be overridden in subclasses if needed. | ||
| } | ||
|
|
||
| /** | ||
| * Callback when cache miss occurs for a given table identifier. | ||
| * | ||
| * @param tid the table identifier | ||
| */ | ||
| protected void onCacheMiss(TableIdentifier tid) { | ||
| // This method is intentionally left empty. It can be overridden in subclasses if needed. | ||
| } | ||
| /** | ||
| * Callback when cache loads a metadata table for a given table identifier. | ||
| * | ||
| * @param tid the table identifier | ||
| */ | ||
| protected void onCacheMetaLoad(TableIdentifier tid) { | ||
| // This method is intentionally left empty. It can be overridden in subclasses if needed. | ||
| } | ||
|
|
||
| @Override | ||
| public List<Namespace> listNamespaces(Namespace nmspc) throws NoSuchNamespaceException { | ||
| return hiveCatalog.listNamespaces(nmspc); | ||
| public Table loadTable(final TableIdentifier identifier) { | ||
| final TableIdentifier canonicalized = identifier.toLowerCase(); | ||
| final Table cachedTable = tableCache.getIfPresent(canonicalized); | ||
| if (cachedTable != null) { | ||
| final String location = hiveCatalog.getTableMetadataLocation(canonicalized); | ||
| if (location == null) { | ||
| LOG.debug("Table {} has no location, returning cached table without location", canonicalized); | ||
| } else { | ||
| String cachedLocation = cachedTable instanceof HasTableOperations tableOps | ||
| ? tableOps.operations().current().metadataFileLocation() | ||
| : null; | ||
| if (!location.equals(cachedLocation)) { | ||
| LOG.debug("Invalidate table {}, cached {} != actual {}", canonicalized, cachedLocation, location); | ||
| // Invalidate the cached table if the location is different | ||
| invalidateTable(canonicalized); | ||
| onCacheInvalidate(canonicalized); | ||
| } else { | ||
| LOG.debug("Returning cached table: {}", canonicalized); | ||
| onCacheHit(canonicalized); | ||
| return cachedTable; | ||
| } | ||
| } | ||
| } else { | ||
| LOG.debug("Cache miss for table: {}", canonicalized); | ||
| onCacheMiss(canonicalized); | ||
| } | ||
| final Table table = tableCache.get(canonicalized, hiveCatalog::loadTable); | ||
| if (table instanceof BaseMetadataTable) { | ||
| // Cache underlying table | ||
| TableIdentifier originTableIdentifier = | ||
| TableIdentifier.of(canonicalized.namespace().levels()); | ||
| Table originTable = tableCache.get(originTableIdentifier, hiveCatalog::loadTable); | ||
| // Share TableOperations instance of origin table for all metadata tables, so that metadata | ||
| // table instances are refreshed as well when origin table instance is refreshed. | ||
| if (originTable instanceof HasTableOperations tableOps) { | ||
| TableOperations ops = tableOps.operations(); | ||
| MetadataTableType type = MetadataTableType.from(canonicalized.name()); | ||
| Table metadataTable = | ||
| MetadataTableUtils.createMetadataTableInstance( | ||
| ops, hiveCatalog.name(), originTableIdentifier, canonicalized, type); | ||
| tableCache.put(canonicalized, metadataTable); | ||
| onCacheMetaLoad(canonicalized); | ||
| LOG.debug("Loaded metadata table: {} for origin table: {}", canonicalized, originTableIdentifier); | ||
| // Return the metadata table instead of the original table | ||
| return metadataTable; | ||
| } | ||
| } | ||
| onCacheLoad(canonicalized); | ||
| LOG.debug("Loaded table: {} ", canonicalized); | ||
| return table; | ||
| } | ||
|
|
||
| /** | ||
| * Callback when cache invalidates the entry for a given table identifier. | ||
| * | ||
| * @param tid the table identifier to invalidate | ||
| */ | ||
| protected void onCacheInvalidate(TableIdentifier tid) { | ||
| // This method is intentionally left empty. It can be overridden in subclasses if needed. | ||
| } | ||
|
|
||
| /** | ||
| * Callback when cache loads a table for a given table identifier. | ||
| * | ||
| * @param tid the table identifier | ||
| */ | ||
| protected void onCacheLoad(TableIdentifier tid) { | ||
| // This method is intentionally left empty. It can be overridden in subclasses if needed. | ||
| } | ||
|
|
||
| /** | ||
| * Callback when cache hit for a given table identifier. | ||
| * | ||
| * @param tid the table identifier | ||
| */ | ||
| protected void onCacheHit(TableIdentifier tid) { | ||
| // This method is intentionally left empty. It can be overridden in subclasses if needed. | ||
| } | ||
|
|
||
| /** | ||
| * Callback when cache miss occurs for a given table identifier. | ||
| * | ||
| * @param tid the table identifier | ||
| */ | ||
| protected void onCacheMiss(TableIdentifier tid) { | ||
| // This method is intentionally left empty. It can be overridden in subclasses if needed. | ||
| } | ||
| /** | ||
| * Callback when cache loads a metadata table for a given table identifier. | ||
| * | ||
| * @param tid the table identifier | ||
| */ | ||
| protected void onCacheMetaLoad(TableIdentifier tid) { | ||
| // This method is intentionally left empty. It can be overridden in subclasses if needed. | ||
| } | ||
|
|
||
| @Override | ||
| public Table loadTable(final TableIdentifier identifier) { | ||
| final Cache<TableIdentifier, Table> cache = this.tableCache; | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what is the point in those local vars?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Usual (old?) pattern of using locals instead of dereferencing members (marginally faster).
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| final HiveCatalog catalog = this.hiveCatalog; | ||
| final TableIdentifier canonicalized = identifier.toLowerCase(); | ||
| Table cachedTable = cache.getIfPresent(canonicalized); | ||
| if (cachedTable != null) { | ||
| final String location = catalog.getTableMetadataLocation(canonicalized); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can't you use
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Wouldn't this imply the cache is rematerializing a table object to verify it might have avoided it ?
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. sorry, I might be missing something. does the catalog.loadTable(canonicalized) use the cache? |
||
| if (location == null) { | ||
| LOG.debug("Table {} has no location, returning cached table without location", canonicalized); | ||
| } else { | ||
| String cachedLocation = cachedTable instanceof HasTableOperations tableOps | ||
| ? tableOps.operations().current().metadataFileLocation() | ||
| : null; | ||
| if (!location.equals(cachedLocation)) { | ||
| LOG.debug("Invalidate table {}, cached {} != actual {}", canonicalized, cachedLocation, location); | ||
| // Invalidate the cached table if the location is different | ||
| invalidateTable(canonicalized); | ||
| onCacheInvalidate(canonicalized); | ||
| } else { | ||
| LOG.debug("Returning cached table: {}", canonicalized); | ||
| onCacheHit(canonicalized); | ||
| return cachedTable; | ||
| } | ||
| } | ||
| } else { | ||
| LOG.debug("Cache miss for table: {}", canonicalized); | ||
| onCacheMiss(canonicalized); | ||
| } | ||
| Table table = cache.get(canonicalized, catalog::loadTable); | ||
| if (table instanceof BaseMetadataTable) { | ||
| // Cache underlying table | ||
| TableIdentifier originTableIdentifier = | ||
| TableIdentifier.of(canonicalized.namespace().levels()); | ||
| Table originTable = cache.get(originTableIdentifier, catalog::loadTable); | ||
| // Share TableOperations instance of origin table for all metadata tables, so that metadata | ||
| // table instances are refreshed as well when origin table instance is refreshed. | ||
| if (originTable instanceof HasTableOperations tableOps) { | ||
| TableOperations ops = tableOps.operations(); | ||
| MetadataTableType type = MetadataTableType.from(canonicalized.name()); | ||
| Table metadataTable = | ||
| MetadataTableUtils.createMetadataTableInstance( | ||
| ops, catalog.name(), originTableIdentifier, canonicalized, type); | ||
| cache.put(canonicalized, metadataTable); | ||
| onCacheMetaLoad(canonicalized); | ||
| LOG.debug("Loaded metadata table: {} for origin table: {}", canonicalized, originTableIdentifier); | ||
| // Return the metadata table instead of the original table | ||
| return metadataTable; | ||
| } | ||
| } | ||
| onCacheLoad(canonicalized); | ||
| LOG.debug("Loaded table: {} ", canonicalized); | ||
| return table; | ||
| } | ||
|
|
||
| @Override | ||
| public Map<String, String> loadNamespaceMetadata(Namespace nmspc) throws NoSuchNamespaceException { | ||
| return hiveCatalog.loadNamespaceMetadata(nmspc); | ||
| public Map<String, String> loadNamespaceMetadata(Namespace namespace) throws NoSuchNamespaceException { | ||
| return hiveCatalog.loadNamespaceMetadata(namespace); | ||
| } | ||
|
|
||
| @Override | ||
| public boolean dropNamespace(Namespace nmspc) throws NamespaceNotEmptyException { | ||
| List<TableIdentifier> tables = listTables(nmspc); | ||
| public boolean dropNamespace(Namespace namespace) throws NamespaceNotEmptyException { | ||
| List<TableIdentifier> tables = listTables(namespace); | ||
| for (TableIdentifier ident : tables) { | ||
| invalidateTable(ident); | ||
| } | ||
| return hiveCatalog.dropNamespace(nmspc); | ||
| return hiveCatalog.dropNamespace(namespace); | ||
| } | ||
|
|
||
| @Override | ||
| public boolean setProperties(Namespace nmspc, Map<String, String> map) throws NoSuchNamespaceException { | ||
| return hiveCatalog.setProperties(nmspc, map); | ||
| public boolean setProperties(Namespace namespace, Map<String, String> map) throws NoSuchNamespaceException { | ||
| return hiveCatalog.setProperties(namespace, map); | ||
| } | ||
|
|
||
| @Override | ||
| public boolean removeProperties(Namespace nmspc, Set<String> set) throws NoSuchNamespaceException { | ||
| return hiveCatalog.removeProperties(nmspc, set); | ||
| public boolean removeProperties(Namespace namespace, Set<String> set) throws NoSuchNamespaceException { | ||
| return hiveCatalog.removeProperties(namespace, set); | ||
| } | ||
|
|
||
| @Override | ||
| public boolean namespaceExists(Namespace namespace) { | ||
| return hiveCatalog.namespaceExists(namespace); | ||
| } | ||
|
|
||
| @Override | ||
| public Catalog.TableBuilder buildTable(TableIdentifier identifier, Schema schema) { | ||
| return hiveCatalog.buildTable(identifier, schema); | ||
| } | ||
|
|
||
| @Override | ||
| public List<TableIdentifier> listViews(Namespace namespace) { | ||
| return hiveCatalog.listViews(namespace); | ||
|
|
||


There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please do not change anything in iceberg-catalog or submit an iceberg PR
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Iceberg PR: apache/iceberg#13800
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks!