Skip to content

Commit

Permalink
[Improvement]: Using unified catalog for server external catalog. (#2232)
Browse files Browse the repository at this point in the history

* Using unified catalog for server external catalog.

* fix hard code texts.

* spotless:apply

* add deprecated of ArcticCatalog initialize

* remove useless initialize

* remove useless code.

* remove catalog.refresh()

* spotless:apply

* fix unit tests.

* catalog impl to private

* build catalog to private

* remove IcebergCatalogWrapper.java

* spotless:apply

* remove useless comments.

* remove useless code.

* spotless:apply

* fix unit tests in core

* fix unit tests.

* apply spotless
  • Loading branch information
baiyangtx authored Nov 8, 2023
1 parent 6a198b3 commit 91ce56d
Show file tree
Hide file tree
Showing 74 changed files with 1,281 additions and 1,568 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ public class CatalogMetaProperties {
// Default value for the client pool cache eviction interval, in milliseconds (5 minutes).
public static final long CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS_DEFAULT =
TimeUnit.MINUTES.toMillis(5);

// Property key "ams.uri"; only used for unified catalog.
// NOTE(review): presumably carries the AMS service address - confirm against the code that reads it.
public static final String AMS_URI = "ams.uri";

// Only used for engine properties: property key "auth.load-from-ams" and its default (true).
// NOTE(review): presumably toggles loading auth settings from AMS - confirm against the reader.
public static final String LOAD_AUTH_FROM_AMS = "auth.load-from-ams";
public static final boolean LOAD_AUTH_FROM_AMS_DEFAULT = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,45 +11,60 @@
import com.netease.arctic.ams.api.TableFormat;
import com.netease.arctic.server.utils.Configurations;
import com.netease.arctic.utils.CatalogUtil;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;

import java.util.Map;
import java.util.Set;

public class CatalogBuilder {

// TODO: use internal or external concepts
/**
 * Matrix of catalog type and supported table formats.
 *
 * <p>Each key is a catalog type constant and each value is the set of {@link TableFormat}s
 * accepted for that type. {@code buildServerCatalog} consults this map twice: a catalog type
 * that is not a key is rejected, and so is a table format that is not in the type's set.
 *
 * <p>The map itself is immutable ({@code ImmutableMap}); the value sets are plain hash sets and
 * must not be modified by callers.
 */
private static final Map<String, Set<TableFormat>> formatSupportedMatrix =
ImmutableMap.of(
CATALOG_TYPE_HADOOP,
Sets.newHashSet(TableFormat.ICEBERG, TableFormat.MIXED_ICEBERG, TableFormat.PAIMON),
CATALOG_TYPE_GLUE, Sets.newHashSet(TableFormat.ICEBERG, TableFormat.MIXED_ICEBERG),
CATALOG_TYPE_CUSTOM, Sets.newHashSet(TableFormat.ICEBERG, TableFormat.MIXED_ICEBERG),
CATALOG_TYPE_HIVE,
Sets.newHashSet(
TableFormat.ICEBERG,
TableFormat.MIXED_ICEBERG,
TableFormat.MIXED_HIVE,
TableFormat.PAIMON),
CATALOG_TYPE_AMS, Sets.newHashSet(TableFormat.ICEBERG, TableFormat.MIXED_ICEBERG));

public static ServerCatalog buildServerCatalog(
CatalogMeta catalogMeta, Configurations serverConfiguration) {
String type = catalogMeta.getCatalogType();
Set<TableFormat> tableFormats = CatalogUtil.tableFormats(catalogMeta);
TableFormat tableFormat = tableFormats.iterator().next();

if (tableFormat == TableFormat.PAIMON) {
return new PaimonServerCatalog(catalogMeta);
}
Preconditions.checkState(
formatSupportedMatrix.containsKey(type), "unsupported catalog type: %s", type);
Preconditions.checkState(
tableFormats.size() == 1,
"only 1 types format is supported: %s",
Joiner.on(",").join(tableFormats));

Set<TableFormat> supportedFormats = formatSupportedMatrix.get(type);
TableFormat tableFormat = tableFormats.iterator().next();
Preconditions.checkState(
supportedFormats.contains(tableFormat),
"Table format %s is not supported for metastore type: %s",
tableFormat,
type);

switch (type) {
case CATALOG_TYPE_HADOOP:
if (TableFormat.ICEBERG == tableFormat) {
return new IcebergCatalogImpl(catalogMeta);
} else if (TableFormat.MIXED_ICEBERG == tableFormat) {
return new MixedIcebergCatalogImpl(catalogMeta);
} else {
throw new IllegalStateException(
"Hadoop catalog support iceberg/mixed-iceberg table only.");
}

case CATALOG_TYPE_GLUE:
case CATALOG_TYPE_CUSTOM:
return new ExternalCatalog(catalogMeta);
case CATALOG_TYPE_HIVE:
if (tableFormat.equals(TableFormat.ICEBERG)) {
return new IcebergCatalogImpl(catalogMeta);
} else if (tableFormat.equals(TableFormat.MIXED_HIVE)) {
if (tableFormat.equals(TableFormat.MIXED_HIVE)) {
return new MixedHiveCatalogImpl(catalogMeta);
} else if (TableFormat.MIXED_ICEBERG == tableFormat) {
return new MixedIcebergCatalogImpl(catalogMeta);
} else {
throw new IllegalArgumentException(
"Hive Catalog support iceberg/mixed-hive/mixed-iceberg table only");
}
return new ExternalCatalog(catalogMeta);
case CATALOG_TYPE_AMS:
if (tableFormat.equals(TableFormat.MIXED_ICEBERG)) {
return new InternalMixedCatalogImpl(catalogMeta);
Expand All @@ -58,26 +73,6 @@ public static ServerCatalog buildServerCatalog(
} else {
throw new IllegalStateException("AMS catalog support iceberg/mixed-iceberg table only.");
}
case CATALOG_TYPE_GLUE:
if (TableFormat.ICEBERG == tableFormat) {
return new IcebergCatalogImpl(catalogMeta);
} else if (TableFormat.MIXED_ICEBERG == tableFormat) {
return new MixedIcebergCatalogImpl(catalogMeta);
} else {
throw new IllegalStateException("Glue catalog support iceberg table only.");
}
case CATALOG_TYPE_CUSTOM:
Preconditions.checkArgument(
catalogMeta.getCatalogProperties().containsKey(CatalogProperties.CATALOG_IMPL),
"Custom catalog properties must contains " + CatalogProperties.CATALOG_IMPL);
if (TableFormat.ICEBERG == tableFormat) {
return new IcebergCatalogImpl(catalogMeta);
} else if (TableFormat.MIXED_ICEBERG == tableFormat) {
return new MixedIcebergCatalogImpl(catalogMeta);
} else {
throw new IllegalStateException(
"Custom catalog support iceberg/mixed-iceberg table only.");
}
default:
throw new IllegalStateException("unsupported catalog type:" + type);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,33 @@
package com.netease.arctic.server.catalog;

import com.netease.arctic.AmoroTable;
import com.netease.arctic.CommonUnifiedCatalog;
import com.netease.arctic.TableIDWithFormat;
import com.netease.arctic.UnifiedCatalog;
import com.netease.arctic.ams.api.CatalogMeta;
import com.netease.arctic.ams.api.TableFormat;
import com.netease.arctic.server.persistence.mapper.TableMetaMapper;
import com.netease.arctic.server.table.ServerTableIdentifier;
import com.netease.arctic.table.TableMetaStore;
import com.netease.arctic.utils.CatalogUtil;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;

public abstract class ExternalCatalog extends ServerCatalog {
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.stream.Collectors;

public class ExternalCatalog extends ServerCatalog {

// Catalog that the query methods of this class (exist/list/load) delegate to; created once in
// the constructor and refreshed in updateMetadata.
UnifiedCatalog unifiedCatalog;
// Context whose doAs(...) wraps every call into unifiedCatalog; rebuilt from the catalog
// metadata in the constructor and again in updateMetadata.
TableMetaStore tableMetaStore;

protected ExternalCatalog(CatalogMeta metadata) {
super(metadata);
this.tableMetaStore = CatalogUtil.buildMetaStore(metadata);
this.unifiedCatalog =
this.tableMetaStore.doAs(
() -> new CommonUnifiedCatalog(this::getMetadata, Maps.newHashMap()));
}

public void syncTable(String database, String tableName, TableFormat format) {
Expand All @@ -29,4 +48,50 @@ public void disposeTable(String database, String tableName) {
TableMetaMapper.class,
mapper -> mapper.deleteTableIdByName(getMetadata().getCatalogName(), database, tableName));
}

/**
 * Replaces the catalog metadata, rebuilds the meta store from the new metadata and then asks the
 * unified catalog to refresh itself.
 *
 * @param metadata the new catalog definition
 */
@Override
public void updateMetadata(CatalogMeta metadata) {
  super.updateMetadata(metadata);
  TableMetaStore rebuiltMetaStore = CatalogUtil.buildMetaStore(metadata);
  tableMetaStore = rebuiltMetaStore;
  unifiedCatalog.refresh();
}

/**
 * Checks whether the given database exists in the unified catalog.
 *
 * @param database database name to look up
 * @return the result of {@code unifiedCatalog.exist(database)}, evaluated through {@code doAs}
 */
@Override
public boolean exist(String database) {
  Callable<Boolean> databaseCheck = () -> unifiedCatalog.exist(database);
  return doAs(databaseCheck);
}

/**
 * Checks whether the given table exists in the unified catalog.
 *
 * @param database database the table belongs to
 * @param tableName table name to look up
 * @return the result of {@code unifiedCatalog.exist(database, tableName)}, evaluated through
 *     {@code doAs}
 */
@Override
public boolean exist(String database, String tableName) {
  Callable<Boolean> tableCheck = () -> unifiedCatalog.exist(database, tableName);
  return doAs(tableCheck);
}

/**
 * Lists the databases reported by the unified catalog.
 *
 * @return the list returned by {@code unifiedCatalog.listDatabases()}, obtained through
 *     {@code doAs}
 */
@Override
public List<String> listDatabases() {
  Callable<List<String>> databaseLister = () -> unifiedCatalog.listDatabases();
  return doAs(databaseLister);
}

/**
 * Lists every table of every database in the unified catalog.
 *
 * <p>Databases are visited in the order returned by {@code unifiedCatalog.listDatabases()} and
 * the per-database results of {@link #listTables(String)} are concatenated in that order. The
 * whole traversal runs inside a single {@code doAs} call.
 *
 * @return a new list holding the identifiers of all tables
 */
@Override
public List<TableIDWithFormat> listTables() {
  return doAs(
      () -> {
        List<TableIDWithFormat> allTables = new ArrayList<>();
        for (String databaseName : unifiedCatalog.listDatabases()) {
          allTables.addAll(listTables(databaseName));
        }
        return allTables;
      });
}

/**
 * Lists the tables of one database.
 *
 * @param database database whose tables are listed
 * @return a fresh {@link ArrayList} copy of {@code unifiedCatalog.listTables(database)}, built
 *     inside {@code doAs}
 */
@Override
public List<TableIDWithFormat> listTables(String database) {
  return doAs(
      () -> {
        List<TableIDWithFormat> tablesCopy = new ArrayList<>(unifiedCatalog.listTables(database));
        return tablesCopy;
      });
}

/**
 * Loads a table from the unified catalog.
 *
 * @param database database the table belongs to
 * @param tableName name of the table to load
 * @return whatever {@code unifiedCatalog.loadTable(database, tableName)} returns, obtained
 *     through {@code doAs}
 */
@Override
public AmoroTable<?> loadTable(String database, String tableName) {
  Callable<AmoroTable<?>> tableLoader = () -> unifiedCatalog.loadTable(database, tableName);
  return doAs(tableLoader);
}

/**
 * Runs {@code action} through the current {@code tableMetaStore} and returns its result.
 *
 * <p>NOTE(review): presumably this executes the action under the catalog's configured
 * identity - confirm against {@code TableMetaStore#doAs}.
 *
 * @param action work to execute
 * @param <T> result type of the action
 * @return the value produced by {@code action}
 */
private <T> T doAs(Callable<T> action) {
  TableMetaStore currentMetaStore = this.tableMetaStore;
  return currentMetaStore.doAs(action);
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,15 @@

import com.netease.arctic.AmoroTable;
import com.netease.arctic.ams.api.CatalogMeta;
import com.netease.arctic.catalog.IcebergCatalogWrapper;
import com.netease.arctic.formats.iceberg.IcebergTable;
import com.netease.arctic.io.ArcticFileIO;
import com.netease.arctic.io.ArcticFileIOAdapter;
import com.netease.arctic.server.ArcticManagementConf;
import com.netease.arctic.server.IcebergRestCatalogService;
import com.netease.arctic.server.iceberg.InternalTableOperations;
import com.netease.arctic.server.persistence.mapper.TableMetaMapper;
import com.netease.arctic.server.table.TableMetadata;
import com.netease.arctic.server.utils.Configurations;
import com.netease.arctic.server.utils.IcebergTableUtil;
import com.netease.arctic.utils.CatalogUtil;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.TableOperations;
Expand Down Expand Up @@ -64,15 +62,16 @@ public AmoroTable<?> loadTable(String database, String tableName) {
return null;
}
FileIO io = IcebergTableUtil.newIcebergFileIo(getMetadata());
ArcticFileIO fileIO = new ArcticFileIOAdapter(io);
TableOperations ops = InternalTableOperations.buildForLoad(tableMetadata, io);
BaseTable table = new BaseTable(ops, TableIdentifier.of(database, tableName).toString());
com.netease.arctic.table.TableIdentifier tableIdentifier =
com.netease.arctic.table.TableIdentifier.of(name(), database, tableName);
return new IcebergTable(

return IcebergTable.newIcebergTable(
tableIdentifier,
new IcebergCatalogWrapper.BasicIcebergTable(
tableIdentifier, table, fileIO, getMetadata().getCatalogProperties()));
table,
CatalogUtil.buildMetaStore(getMetadata()),
getMetadata().getCatalogProperties());
}

private String defaultRestURI() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,33 @@

import com.netease.arctic.AmoroTable;
import com.netease.arctic.ams.api.CatalogMeta;
import com.netease.arctic.ams.api.TableFormat;
import com.netease.arctic.catalog.MixedTables;
import com.netease.arctic.formats.mixed.MixedIcebergTable;
import com.netease.arctic.formats.mixed.MixedTable;
import com.netease.arctic.server.persistence.mapper.TableMetaMapper;
import com.netease.arctic.server.table.TableMetadata;
import com.netease.arctic.table.TableMetaStore;
import com.netease.arctic.utils.CatalogUtil;

import java.util.Map;

public class InternalMixedCatalogImpl extends InternalCatalog {

protected final MixedTables tables;
protected MixedTables tables;

protected InternalMixedCatalogImpl(CatalogMeta metadata) {
super(metadata);
this.tables = new MixedTables(metadata);
this.tables = newTables(metadata.getCatalogProperties(), CatalogUtil.buildMetaStore(metadata));
}

protected InternalMixedCatalogImpl(CatalogMeta metadata, MixedTables tables) {
super(metadata);
this.tables = tables;
protected MixedTables newTables(Map<String, String> catalogProperties, TableMetaStore metaStore) {
return new MixedTables(catalogProperties, metaStore);
}

@Override
public void updateMetadata(CatalogMeta metadata) {
super.updateMetadata(metadata);
this.tables.refreshCatalogMeta(getMetadata());
this.tables = newTables(metadata.getCatalogProperties(), CatalogUtil.buildMetaStore(metadata));
}

@Override
Expand All @@ -37,7 +41,8 @@ public AmoroTable<?> loadTable(String database, String tableName) {
if (tableMetadata == null) {
return null;
}
return new MixedIcebergTable(tables.loadTableByMeta(tableMetadata.buildTableMeta()));
return new MixedTable(
tables.loadTableByMeta(tableMetadata.buildTableMeta()), TableFormat.MIXED_ICEBERG);
}

protected MixedTables tables() {
Expand Down
Loading

0 comments on commit 91ce56d

Please sign in to comment.