Skip to content

Commit

Permalink
Merge branch 'master' into amoro-2290
Browse files Browse the repository at this point in the history
  • Loading branch information
baiyangtx authored Nov 23, 2023
2 parents 1f2496f + 244c1cf commit 7985602
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import com.netease.arctic.table.blocker.TableBlockerManager;
import com.netease.arctic.utils.TablePropertyUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CachingCatalog;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
Expand All @@ -47,6 +49,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.PropertyUtil;

import java.util.List;
import java.util.Map;
Expand All @@ -63,6 +66,7 @@ public class BasicMixedIcebergCatalog implements ArcticCatalog {
private Pattern databaseFilterPattern;
private AmsClient client;
private MixedTables tables;
private SupportsNamespaces asNamespaceCatalog;

@Override
public String name() {
Expand All @@ -71,18 +75,46 @@ public String name() {

@Override
public void initialize(String name, Map<String, String> properties, TableMetaStore metaStore) {
boolean cacheEnabled =
PropertyUtil.propertyAsBoolean(
properties, CatalogProperties.CACHE_ENABLED, CatalogProperties.CACHE_ENABLED_DEFAULT);

boolean cacheCaseSensitive =
PropertyUtil.propertyAsBoolean(
properties,
CatalogProperties.CACHE_CASE_SENSITIVE,
CatalogProperties.CACHE_CASE_SENSITIVE_DEFAULT);

long cacheExpirationIntervalMs =
PropertyUtil.propertyAsLong(
properties,
CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS,
CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS_DEFAULT);

// An expiration interval of 0ms effectively disables caching.
// Do not wrap with CachingCatalog.
if (cacheExpirationIntervalMs == 0) {
cacheEnabled = false;
}
Pattern databaseFilterPattern = null;
if (properties.containsKey(CatalogMetaProperties.KEY_DATABASE_FILTER_REGULAR_EXPRESSION)) {
String databaseFilter =
properties.get(CatalogMetaProperties.KEY_DATABASE_FILTER_REGULAR_EXPRESSION);
databaseFilterPattern = Pattern.compile(databaseFilter);
}
Catalog catalog = buildIcebergCatalog(name, properties, metaStore.getConfiguration());
this.name = name;
this.tableMetaStore = metaStore;
this.icebergCatalog = buildIcebergCatalog(name, properties, metaStore.getConfiguration());
this.icebergCatalog =
cacheEnabled
? CachingCatalog.wrap(catalog, cacheCaseSensitive, cacheExpirationIntervalMs)
: catalog;
if (catalog instanceof SupportsNamespaces) {
this.asNamespaceCatalog = (SupportsNamespaces) catalog;
}
this.databaseFilterPattern = databaseFilterPattern;
this.catalogProperties = properties;
this.tables = newMixedTables(metaStore, properties, icebergCatalog);
this.tables = newMixedTables(metaStore, properties, icebergCatalog());
if (properties.containsKey(CatalogMetaProperties.AMS_URI)) {
this.client = new PooledAmsClient(properties.get(CatalogMetaProperties.AMS_URI));
}
Expand Down Expand Up @@ -218,14 +250,13 @@ private org.apache.iceberg.catalog.TableIdentifier toIcebergTableIdentifier(
}

private SupportsNamespaces asNamespaceCatalog() {
Catalog icebergCatalog = icebergCatalog();
if (!(icebergCatalog instanceof SupportsNamespaces)) {
if (asNamespaceCatalog == null) {
throw new UnsupportedOperationException(
String.format(
"Iceberg catalog: %s doesn't implement SupportsNamespaces",
icebergCatalog.getClass().getName()));
icebergCatalog().getClass().getName()));
}
return (SupportsNamespaces) icebergCatalog;
return asNamespaceCatalog;
}

private class MixedIcebergTableBuilder implements TableBuilder {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

/**
Expand All @@ -81,6 +82,8 @@ public class ArcticConnectorMetadata implements ConnectorMetadata {

private final ArcticCatalog arcticCatalog;

private final Map<SchemaTableName, ArcticTable> tableCache = new ConcurrentHashMap<>();

public ArcticConnectorMetadata(
KeyedConnectorMetadata keyedConnectorMetadata,
IcebergMetadata icebergMetadata,
Expand All @@ -99,8 +102,7 @@ public List<String> listSchemaNames(ConnectorSession session) {

@Override
public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName) {
// 需要缓存
ArcticTable arcticTable = null;
ArcticTable arcticTable;
try {
arcticTable = getArcticTable(tableName);
} catch (NoSuchTableException e) {
Expand Down Expand Up @@ -169,7 +171,7 @@ public ColumnMetadata getColumnMetadata(
public Iterator<TableColumnsMetadata> streamTableColumns(
ConnectorSession session, SchemaTablePrefix prefix) {
if (prefix.getTable().isPresent()) {
ArcticTable arcticTable = null;
ArcticTable arcticTable;
try {
arcticTable =
getArcticTable(new SchemaTableName(prefix.getSchema().get(), prefix.getTable().get()));
Expand Down Expand Up @@ -452,7 +454,8 @@ public void rollback() {
}

public ArcticTable getArcticTable(SchemaTableName schemaTableName) {
return arcticCatalog.loadTable(getTableIdentifier(schemaTableName));
return tableCache.computeIfAbsent(
schemaTableName, ignore -> arcticCatalog.loadTable(getTableIdentifier(schemaTableName)));
}

private TableIdentifier getTableIdentifier(SchemaTableName schemaTableName) {
Expand Down
Loading

0 comments on commit 7985602

Please sign in to comment.