diff --git a/core/src/main/java/com/netease/arctic/mixed/BasicMixedIcebergCatalog.java b/core/src/main/java/com/netease/arctic/mixed/BasicMixedIcebergCatalog.java
index a7922562b6..a6158063de 100644
--- a/core/src/main/java/com/netease/arctic/mixed/BasicMixedIcebergCatalog.java
+++ b/core/src/main/java/com/netease/arctic/mixed/BasicMixedIcebergCatalog.java
@@ -35,6 +35,8 @@
 import com.netease.arctic.table.blocker.TableBlockerManager;
 import com.netease.arctic.utils.TablePropertyUtil;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.CachingCatalog;
+import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SortOrder;
@@ -47,6 +49,7 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.PropertyUtil;

 import java.util.List;
 import java.util.Map;
@@ -63,6 +66,7 @@ public class BasicMixedIcebergCatalog implements ArcticCatalog {
   private Pattern databaseFilterPattern;
   private AmsClient client;
   private MixedTables tables;
+  private SupportsNamespaces asNamespaceCatalog;

   @Override
   public String name() {
@@ -71,18 +75,46 @@ public String name() {

   @Override
   public void initialize(String name, Map<String, String> properties, TableMetaStore metaStore) {
+    boolean cacheEnabled =
+        PropertyUtil.propertyAsBoolean(
+            properties, CatalogProperties.CACHE_ENABLED, CatalogProperties.CACHE_ENABLED_DEFAULT);
+
+    boolean cacheCaseSensitive =
+        PropertyUtil.propertyAsBoolean(
+            properties,
+            CatalogProperties.CACHE_CASE_SENSITIVE,
+            CatalogProperties.CACHE_CASE_SENSITIVE_DEFAULT);
+
+    long cacheExpirationIntervalMs =
+        PropertyUtil.propertyAsLong(
+            properties,
+            CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS,
+            CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS_DEFAULT);
+
+    // An expiration interval of 0ms effectively disables caching.
+    // Do not wrap with CachingCatalog.
+    if (cacheExpirationIntervalMs == 0) {
+      cacheEnabled = false;
+    }
     Pattern databaseFilterPattern = null;
     if (properties.containsKey(CatalogMetaProperties.KEY_DATABASE_FILTER_REGULAR_EXPRESSION)) {
       String databaseFilter =
           properties.get(CatalogMetaProperties.KEY_DATABASE_FILTER_REGULAR_EXPRESSION);
       databaseFilterPattern = Pattern.compile(databaseFilter);
     }
+    Catalog catalog = buildIcebergCatalog(name, properties, metaStore.getConfiguration());
     this.name = name;
     this.tableMetaStore = metaStore;
-    this.icebergCatalog = buildIcebergCatalog(name, properties, metaStore.getConfiguration());
+    this.icebergCatalog =
+        cacheEnabled
+            ? CachingCatalog.wrap(catalog, cacheCaseSensitive, cacheExpirationIntervalMs)
+            : catalog;
+    if (catalog instanceof SupportsNamespaces) {
+      this.asNamespaceCatalog = (SupportsNamespaces) catalog;
+    }
     this.databaseFilterPattern = databaseFilterPattern;
     this.catalogProperties = properties;
-    this.tables = newMixedTables(metaStore, properties, icebergCatalog);
+    this.tables = newMixedTables(metaStore, properties, icebergCatalog());
     if (properties.containsKey(CatalogMetaProperties.AMS_URI)) {
       this.client = new PooledAmsClient(properties.get(CatalogMetaProperties.AMS_URI));
     }
@@ -218,14 +250,13 @@ private org.apache.iceberg.catalog.TableIdentifier toIcebergTableIdentifier(
   }

   private SupportsNamespaces asNamespaceCatalog() {
-    Catalog icebergCatalog = icebergCatalog();
-    if (!(icebergCatalog instanceof SupportsNamespaces)) {
+    if (asNamespaceCatalog == null) {
       throw new UnsupportedOperationException(
           String.format(
               "Iceberg catalog: %s doesn't implement SupportsNamespaces",
-              icebergCatalog.getClass().getName()));
+              icebergCatalog().getClass().getName()));
     }
-    return (SupportsNamespaces) icebergCatalog;
+    return asNamespaceCatalog;
   }

   private class MixedIcebergTableBuilder implements TableBuilder {
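Note on the CachingCatalog change above: the wrapping decision can be read in isolation as the sketch below. It uses only the Iceberg APIs the patch already imports (CatalogProperties, PropertyUtil, CachingCatalog); the helper name wrapIfCachingEnabled and the standalone class are illustrative, not part of the change.

import java.util.Map;
import org.apache.iceberg.CachingCatalog;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.util.PropertyUtil;

final class CatalogCacheSupport {

  private CatalogCacheSupport() {}

  // Mirrors the decision made in BasicMixedIcebergCatalog#initialize above.
  static Catalog wrapIfCachingEnabled(Catalog catalog, Map<String, String> properties) {
    boolean cacheEnabled =
        PropertyUtil.propertyAsBoolean(
            properties, CatalogProperties.CACHE_ENABLED, CatalogProperties.CACHE_ENABLED_DEFAULT);
    boolean cacheCaseSensitive =
        PropertyUtil.propertyAsBoolean(
            properties,
            CatalogProperties.CACHE_CASE_SENSITIVE,
            CatalogProperties.CACHE_CASE_SENSITIVE_DEFAULT);
    long cacheExpirationIntervalMs =
        PropertyUtil.propertyAsLong(
            properties,
            CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS,
            CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS_DEFAULT);

    // A 0ms expiration interval disables caching, same as cache-enabled=false.
    if (!cacheEnabled || cacheExpirationIntervalMs == 0) {
      return catalog;
    }
    return CachingCatalog.wrap(catalog, cacheCaseSensitive, cacheExpirationIntervalMs);
  }
}

With this logic, setting cache-enabled=false or cache.expiration-interval-ms=0 in the catalog properties leaves the delegate catalog unwrapped; otherwise table loads go through CachingCatalog's internal cache.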
diff --git a/trino/src/main/java/com/netease/arctic/trino/ArcticConnectorMetadata.java b/trino/src/main/java/com/netease/arctic/trino/ArcticConnectorMetadata.java
index 8f7df35210..eb8ab5dc27 100644
--- a/trino/src/main/java/com/netease/arctic/trino/ArcticConnectorMetadata.java
+++ b/trino/src/main/java/com/netease/arctic/trino/ArcticConnectorMetadata.java
@@ -66,6 +66,7 @@
 import java.util.Map;
 import java.util.Optional;
 import java.util.OptionalLong;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;

 /**
@@ -81,6 +82,8 @@ public class ArcticConnectorMetadata implements ConnectorMetadata {

   private final ArcticCatalog arcticCatalog;

+  private final Map<SchemaTableName, ArcticTable> tableCache = new ConcurrentHashMap<>();
+
   public ArcticConnectorMetadata(
       KeyedConnectorMetadata keyedConnectorMetadata,
       IcebergMetadata icebergMetadata,
@@ -99,8 +102,7 @@ public List<String> listSchemaNames(ConnectorSession session) {

   @Override
   public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName) {
-    // 需要缓存
-    ArcticTable arcticTable = null;
+    ArcticTable arcticTable;
     try {
       arcticTable = getArcticTable(tableName);
     } catch (NoSuchTableException e) {
@@ -169,7 +171,7 @@ public ColumnMetadata getColumnMetadata(
   public Iterator<TableColumnsMetadata> streamTableColumns(
       ConnectorSession session, SchemaTablePrefix prefix) {
     if (prefix.getTable().isPresent()) {
-      ArcticTable arcticTable = null;
+      ArcticTable arcticTable;
       try {
         arcticTable =
             getArcticTable(new SchemaTableName(prefix.getSchema().get(), prefix.getTable().get()));
@@ -452,7 +454,8 @@ public void rollback() {
   }

   public ArcticTable getArcticTable(SchemaTableName schemaTableName) {
-    return arcticCatalog.loadTable(getTableIdentifier(schemaTableName));
+    return tableCache.computeIfAbsent(
+        schemaTableName, ignore -> arcticCatalog.loadTable(getTableIdentifier(schemaTableName)));
   }

   private TableIdentifier getTableIdentifier(SchemaTableName schemaTableName) {
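Note on the ArcticConnectorMetadata change above: getArcticTable now memoizes loadTable results per SchemaTableName for the lifetime of the metadata instance, relying on ConcurrentHashMap.computeIfAbsent evaluating the mapping function at most once per absent key. A minimal, self-contained sketch of that pattern (String keys and a counter stand in for SchemaTableName and the real catalog call; class and method names here are illustrative only):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class TableCacheSketch {

  private static final AtomicInteger LOADS = new AtomicInteger();
  private static final Map<String, String> CACHE = new ConcurrentHashMap<>();

  // Stand-in for arcticCatalog.loadTable(...): counts how often it is actually invoked.
  private static String loadTable(String name) {
    LOADS.incrementAndGet();
    return "table:" + name;
  }

  public static void main(String[] args) {
    for (int i = 0; i < 5; i++) {
      CACHE.computeIfAbsent("db.orders", ignore -> loadTable("db.orders"));
    }
    // Prints 1: repeated lookups of the same table trigger a single load.
    System.out.println(LOADS.get());
  }
}

The usual trade-off of such per-instance memoization applies: a table resolved early by this metadata instance is not re-resolved by its later metadata calls.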
diff --git a/trino/src/main/java/com/netease/arctic/trino/unkeyed/IcebergMetadata.java b/trino/src/main/java/com/netease/arctic/trino/unkeyed/IcebergMetadata.java
index 1e9ebbc9d0..532968010d 100644
--- a/trino/src/main/java/com/netease/arctic/trino/unkeyed/IcebergMetadata.java
+++ b/trino/src/main/java/com/netease/arctic/trino/unkeyed/IcebergMetadata.java
@@ -305,6 +305,8 @@ public class IcebergMetadata implements ConnectorMetadata {
   private final Map<IcebergTableHandle, TableStatistics> tableStatisticsCache =
       new ConcurrentHashMap<>();

+  private final Map<SchemaTableName, Table> tableCache = new ConcurrentHashMap<>();
+
   private Transaction transaction;

   public IcebergMetadata(
@@ -355,8 +357,7 @@ public IcebergTableHandle getTableHandle(ConnectorSession session, SchemaTableNa
     Table table;
     try {
       table =
-          catalog.loadTable(
-              session, new SchemaTableName(tableName.getSchemaName(), name.getTableName()));
+          loadTable(session, new SchemaTableName(tableName.getSchemaName(), name.getTableName()));
     } catch (TableNotFoundException e) {
       return null;
     }
@@ -423,8 +424,7 @@ private Optional<SystemTable> getRawSystemTable(
     Table table;
     try {
       table =
-          catalog.loadTable(
-              session, new SchemaTableName(tableName.getSchemaName(), name.getTableName()));
+          loadTable(session, new SchemaTableName(tableName.getSchemaName(), name.getTableName()));
     } catch (TableNotFoundException e) {
       return Optional.empty();
     } catch (UnknownTableTypeException e) {
@@ -472,7 +472,7 @@ public ConnectorTableProperties getTableProperties(
           ImmutableList.of());
     }

-    Table icebergTable = catalog.loadTable(session, table.getSchemaTableName());
+    Table icebergTable = loadTable(session, table.getSchemaTableName());

     // Extract identity partition fields that are present in all partition specs, for creating the
     // discrete predicates.
@@ -558,7 +558,7 @@ public ConnectorTableProperties getTableProperties(
   public ConnectorTableMetadata getTableMetadata(
       ConnectorSession session, ConnectorTableHandle table) {
     IcebergTableHandle tableHandle = (IcebergTableHandle) table;
-    Table icebergTable = catalog.loadTable(session, tableHandle.getSchemaTableName());
+    Table icebergTable = loadTable(session, tableHandle.getSchemaTableName());
     List<ColumnMetadata> columns =
         getColumnMetadatas(SchemaParser.fromJson(tableHandle.getTableSchemaJson()));
     ImmutableMap.Builder<String, Object> properties = ImmutableMap.builder();
@@ -636,7 +636,7 @@ public Iterator<TableColumnsMetadata> streamTableColumns(
                 return Stream.of(TableColumnsMetadata.forRedirectedTable(tableName));
               }

-              Table icebergTable = catalog.loadTable(session, tableName);
+              Table icebergTable = loadTable(session, tableName);
               List<ColumnMetadata> columns = getColumnMetadatas(icebergTable.schema());
               return Stream.of(TableColumnsMetadata.forTable(tableName, columns));
             } catch (TableNotFoundException e) {
@@ -767,7 +767,7 @@ public ConnectorMergeTableHandle beginMerge(
     IcebergTableHandle table = (IcebergTableHandle) tableHandle;
     verifyTableVersionForUpdate(table);

-    Table icebergTable = catalog.loadTable(session, table.getSchemaTableName());
+    Table icebergTable = loadTable(session, table.getSchemaTableName());
     validateNotModifyingOldSnapshot(table, icebergTable);
     validateNotPartitionedByNestedField(icebergTable.schema(), icebergTable.spec());
@@ -1039,7 +1039,7 @@ public ConnectorInsertTableHandle beginInsert(
       List<ColumnHandle> columns,
       RetryMode retryMode) {
     IcebergTableHandle table = (IcebergTableHandle) tableHandle;
-    Table icebergTable = catalog.loadTable(session, table.getSchemaTableName());
+    Table icebergTable = loadTable(session, table.getSchemaTableName());
     validateNotModifyingOldSnapshot(table, icebergTable);
     validateNotPartitionedByNestedField(icebergTable.schema(), icebergTable.spec());
@@ -1220,7 +1220,7 @@ public Optional<ConnectorTableExecuteHandle> getTableHandleForExecute(
         "Cannot execute table procedure %s on non-DATA table: %s",
         procedureName,
         tableHandle.getTableType());

-    Table icebergTable = catalog.loadTable(session, tableHandle.getSchemaTableName());
+    Table icebergTable = loadTable(session, tableHandle.getSchemaTableName());
     if (tableHandle.getSnapshotId().isPresent()
         && (tableHandle.getSnapshotId().get() != icebergTable.currentSnapshot().snapshotId())) {
       throw new TrinoException(
@@ -1271,7 +1271,7 @@ private Optional<ConnectorTableExecuteHandle> getTableHandleForOptimize(

   private Optional<ConnectorTableExecuteHandle> getTableHandleForDropExtendedStats(
       ConnectorSession session, IcebergTableHandle tableHandle) {
-    Table icebergTable = catalog.loadTable(session, tableHandle.getSchemaTableName());
+    Table icebergTable = loadTable(session, tableHandle.getSchemaTableName());

     return Optional.of(
         new IcebergTableExecuteHandle(
@@ -1286,7 +1286,7 @@ private Optional<ConnectorTableExecuteHandle> getTableHandleForExpireSnapshots(
       IcebergTableHandle tableHandle,
       Map<String, Object> executeProperties) {
     Duration retentionThreshold = (Duration) executeProperties.get(RETENTION_THRESHOLD);
-    Table icebergTable = catalog.loadTable(session, tableHandle.getSchemaTableName());
+    Table icebergTable = loadTable(session, tableHandle.getSchemaTableName());

     return Optional.of(
         new IcebergTableExecuteHandle(
@@ -1301,7 +1301,7 @@ private Optional<ConnectorTableExecuteHandle> getTableHandleForRemoveOrphanFiles(
       IcebergTableHandle tableHandle,
       Map<String, Object> executeProperties) {
     Duration retentionThreshold = (Duration) executeProperties.get(RETENTION_THRESHOLD);
-    Table icebergTable = catalog.loadTable(session, tableHandle.getSchemaTableName());
+    Table icebergTable = loadTable(session, tableHandle.getSchemaTableName());

     return Optional.of(
         new IcebergTableExecuteHandle(
@@ -1329,7 +1329,7 @@ public Optional<ConnectorTableLayout> getLayoutForTableExecute(

   private Optional<ConnectorTableLayout> getLayoutForOptimize(
       ConnectorSession session, IcebergTableExecuteHandle executeHandle) {
-    Table icebergTable = catalog.loadTable(session, executeHandle.getSchemaTableName());
+    Table icebergTable = loadTable(session, executeHandle.getSchemaTableName());
     // from performance perspective it is better to have lower number of bigger files than other way
     // around
     // thus we force repartitioning for optimize to achieve this
@@ -1360,7 +1360,7 @@ private BeginTableExecuteResult<ConnectorTableExecuteHandle, ConnectorTableHandle>
       ConnectorTableHandle tableHandle,
       Map<String, Optional<Object>> properties) {
     IcebergTableHandle table = (IcebergTableHandle) tableHandle;
-    Table icebergTable = catalog.loadTable(session, table.getSchemaTableName());
+    Table icebergTable = loadTable(session, table.getSchemaTableName());
     transaction = icebergTable.newTransaction();

     UpdateProperties updateProperties = transaction.updateProperties();
@@ -1848,7 +1848,7 @@ private static void updateProperty(
   public void addColumn(
       ConnectorSession session, ConnectorTableHandle tableHandle, ColumnMetadata column) {
     Table icebergTable =
-        catalog.loadTable(session, ((IcebergTableHandle) tableHandle).getSchemaTableName());
+        loadTable(session, ((IcebergTableHandle) tableHandle).getSchemaTableName());
     icebergTable
         .updateSchema()
         .addColumn(column.getName(), toIcebergType(column.getType()), column.getComment())
         .commit();
@@ -1860,7 +1860,7 @@ public void dropColumn(
       ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle column) {
     IcebergColumnHandle handle = (IcebergColumnHandle) column;
     Table icebergTable =
-        catalog.loadTable(session, ((IcebergTableHandle) tableHandle).getSchemaTableName());
+        loadTable(session, ((IcebergTableHandle) tableHandle).getSchemaTableName());
     icebergTable.updateSchema().deleteColumn(handle.getName()).commit();
   }
@@ -1872,7 +1872,7 @@ public void renameColumn(
       ConnectorSession session,
       ConnectorTableHandle tableHandle,
       ColumnHandle source,
       String target) {
     IcebergColumnHandle columnHandle = (IcebergColumnHandle) source;
     Table icebergTable =
-        catalog.loadTable(session, ((IcebergTableHandle) tableHandle).getSchemaTableName());
+        loadTable(session, ((IcebergTableHandle) tableHandle).getSchemaTableName());
     icebergTable.updateSchema().renameColumn(columnHandle.getName(), target).commit();
   }
@@ -1968,7 +1968,7 @@ public ConnectorAnalyzeMetadata getStatisticsCollectionMetadata(
   public ConnectorTableHandle beginStatisticsCollection(
       ConnectorSession session, ConnectorTableHandle tableHandle) {
     IcebergTableHandle handle = (IcebergTableHandle) tableHandle;
-    Table icebergTable = catalog.loadTable(session, handle.getSchemaTableName());
+    Table icebergTable = loadTable(session, handle.getSchemaTableName());
     beginTransaction(icebergTable);
     return handle;
   }
@@ -2168,7 +2168,7 @@ public ColumnHandle getMergeRowIdColumnHandle(
   public OptionalLong executeDelete(ConnectorSession session, ConnectorTableHandle tableHandle) {
     IcebergTableHandle handle = (IcebergTableHandle) tableHandle;
-    Table icebergTable = catalog.loadTable(session, handle.getSchemaTableName());
+    Table icebergTable = loadTable(session, handle.getSchemaTableName());

     DeleteFiles deleteFiles =
         icebergTable
@@ -2199,7 +2199,7 @@ public void rollback() {
   public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(
       ConnectorSession session, ConnectorTableHandle handle, Constraint constraint) {
     IcebergTableHandle table = (IcebergTableHandle) handle;
-    Table icebergTable = catalog.loadTable(session, table.getSchemaTableName());
+    Table icebergTable = loadTable(session, table.getSchemaTableName());

     Set<Integer> partitionSourceIds = identityPartitionColumnsInAllSpecs(icebergTable);
     BiPredicate<IcebergColumnHandle, Domain> isIdentityPartition =
@@ -2449,7 +2449,7 @@ public TableStatistics getTableStatistics(
                 originalHandle.isRecordScannedFiles(),
                 originalHandle.getMaxScannedFileSize()),
             handle -> {
-              Table icebergTable = catalog.loadTable(session, handle.getSchemaTableName());
+              Table icebergTable = loadTable(session, handle.getSchemaTableName());
              return TableStatisticsReader.getTableStatistics(
                  typeManager, session, handle, icebergTable);
            });
@@ -2466,7 +2466,7 @@ private Optional<Long> getCurrentSnapshotId(Table table) {
   }

   Table getIcebergTable(ConnectorSession session, SchemaTableName schemaTableName) {
-    return catalog.loadTable(session, schemaTableName);
+    return loadTable(session, schemaTableName);
   }

   @Override
@@ -2497,7 +2497,7 @@ public ConnectorInsertTableHandle beginRefreshMaterializedView(
       List<ConnectorTableHandle> sourceTableHandles,
       RetryMode retryMode) {
     IcebergTableHandle table = (IcebergTableHandle) tableHandle;
-    Table icebergTable = catalog.loadTable(session, table.getSchemaTableName());
+    Table icebergTable = loadTable(session, table.getSchemaTableName());
     beginTransaction(icebergTable);

     return newWritableTableHandle(table.getSchemaTableName(), icebergTable, retryMode);
@@ -2626,7 +2626,7 @@ public void renameMaterializedView(
   public Optional<TableToken> getTableToken(
       ConnectorSession session, ConnectorTableHandle tableHandle) {
     IcebergTableHandle table = (IcebergTableHandle) tableHandle;
-    Table icebergTable = catalog.loadTable(session, table.getSchemaTableName());
+    Table icebergTable = loadTable(session, table.getSchemaTableName());
     return Optional.ofNullable(icebergTable.currentSnapshot())
         .map(snapshot -> new TableToken(snapshot.snapshotId()));
   }
@@ -2663,7 +2663,7 @@ public MaterializedViewFreshness getMaterializedViewFreshness(
                     "Storage table missing in definition of materialized view "
                         + materializedViewName));

-    Table icebergTable = catalog.loadTable(session, storageTableName);
+    Table icebergTable = loadTable(session, storageTableName);
     String dependsOnTables =
         icebergTable.currentSnapshot().summary().getOrDefault(DEPENDS_ON_TABLES, "");
     if (dependsOnTables.isEmpty()) {
@@ -2755,6 +2755,10 @@ private void beginTransaction(Table icebergTable) {
     transaction = icebergTable.newTransaction();
   }

+  private Table loadTable(ConnectorSession session, SchemaTableName tableName) {
+    return tableCache.computeIfAbsent(tableName, ignore -> catalog.loadTable(session, tableName));
+  }
+
   private static class TableToken {
     // Current Snapshot ID of the table
     private final long snapshotId;
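Note on the IcebergMetadata change above: every call site that previously went through catalog.loadTable now shares, per SchemaTableName, the single Table object loaded by the new private loadTable helper, for as long as the IcebergMetadata instance lives. Upstream Trino creates a connector metadata instance per transaction, so assuming this connector follows the same pattern, the cache is effectively scoped to one query/transaction. A self-contained sketch of that same-instance behaviour; LoadedTable is a stand-in type, not a Trino or Iceberg class:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class SameInstancePerMetadata {

  // Stand-in for the Iceberg Table object produced by catalog.loadTable(...).
  static final class LoadedTable {}

  public static void main(String[] args) {
    Map<String, LoadedTable> tableCache = new ConcurrentHashMap<>();

    LoadedTable first = tableCache.computeIfAbsent("db.orders", ignore -> new LoadedTable());
    LoadedTable second = tableCache.computeIfAbsent("db.orders", ignore -> new LoadedTable());

    // Both "call sites" observe the identical object, so they share one loaded table state.
    System.out.println(first == second); // prints true
  }
}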