Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AMORO-1906]Refactor code of "Table" and "Catalog" to support Paimon format integration #1960

Merged
merged 39 commits into from
Oct 10, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
d2ca938
Integration paimon
shidayang Sep 12, 2023
3f93f96
Merge remote-tracking branch 'origin/master' into paimon-integration-…
shidayang Sep 12, 2023
59be5cd
Integration paimon
shidayang Sep 12, 2023
4639621
Integration paimon
shidayang Sep 13, 2023
06926dc
Integration paimon
shidayang Sep 13, 2023
0df88f7
tmp
shidayang Sep 13, 2023
ecaa102
Support registering for the Paimon Catalog
shidayang Sep 13, 2023
ebe4b7e
Merge branch 'master' into paimon-integration-copy
shidayang Sep 13, 2023
0c6b09a
Support registering for the Paimon Catalog
shidayang Sep 14, 2023
da3a94c
Merge remote-tracking branch 'shidayang/paimon-integration-copy' into…
shidayang Sep 14, 2023
5f66917
Merge branch 'master' into paimon-integration-copy
shidayang Sep 14, 2023
4bb61cf
Support registering for the Paimon Catalog
shidayang Sep 14, 2023
55de2ae
Merge remote-tracking branch 'shidayang/paimon-integration-copy' into…
shidayang Sep 14, 2023
2b37a6e
Support registering for the Paimon Catalog
shidayang Sep 14, 2023
cb0c241
Fix ut
shidayang Sep 14, 2023
587d23f
Polish code
shidayang Sep 15, 2023
bde9ec9
Merge branch 'master' into paimon-integration-copy
shidayang Sep 15, 2023
9720442
Fix compile error
shidayang Sep 15, 2023
80e9a75
Merge remote-tracking branch 'shidayang/paimon-integration-copy' into…
shidayang Sep 15, 2023
f1df45a
Merge remote-tracking branch 'origin/master' into paimon-integration-…
shidayang Sep 19, 2023
ee9c632
Merge master
shidayang Sep 19, 2023
b3df91a
polish code
shidayang Sep 20, 2023
9ccfaed
Merge remote-tracking branch 'origin/master' into paimon-integration-…
shidayang Sep 20, 2023
2aea58f
Merge master
shidayang Sep 20, 2023
2b8088f
Fix UT
shidayang Sep 20, 2023
7ec198e
Merge branch 'master' into paimon-integration-copy
shidayang Sep 20, 2023
722f0ff
Polish code
shidayang Sep 25, 2023
2686ef3
Merge branch 'master' into paimon-integration-copy
shidayang Sep 25, 2023
cfc36ce
Polish code
shidayang Sep 25, 2023
52c7923
polish code
shidayang Oct 9, 2023
3e5982e
Merge remote-tracking branch 'origin/master' into paimon-integration-…
shidayang Oct 9, 2023
d43afd1
Merge remote-tracking branch 'origin/master' into paimon-integration-…
shidayang Oct 9, 2023
4c5457a
polish code
shidayang Oct 9, 2023
d8b769e
Merge branch 'master' into paimon-integration-copy
zhoujinsong Oct 9, 2023
661b136
Update ams/server/src/main/java/com/netease/arctic/server/dashboard/M…
shidayang Oct 9, 2023
2306f02
Update ams/server/src/main/java/com/netease/arctic/server/dashboard/P…
shidayang Oct 9, 2023
5f87798
tmp
shidayang Oct 9, 2023
0b2c393
polish code
shidayang Oct 9, 2023
a5beb36
polish code
shidayang Oct 9, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package com.netease.arctic.server;

import com.google.common.base.Preconditions;
import com.netease.arctic.AmoroTable;
import com.netease.arctic.ams.api.CatalogMeta;
import com.netease.arctic.ams.api.OptimizerRegisterInfo;
import com.netease.arctic.ams.api.OptimizingService;
Expand All @@ -45,7 +46,6 @@
import com.netease.arctic.server.table.TableRuntime;
import com.netease.arctic.server.table.TableRuntimeMeta;
import com.netease.arctic.server.utils.Configurations;
import com.netease.arctic.table.ArcticTable;
import com.netease.arctic.table.TableProperties;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
Expand Down Expand Up @@ -331,7 +331,7 @@ public void handleConfigChanged(TableRuntime tableRuntime, TableConfiguration or
}

@Override
public void handleTableAdded(ArcticTable table, TableRuntime tableRuntime) {
public void handleTableAdded(AmoroTable<?> table, TableRuntime tableRuntime) {
getOptionalQueueByGroup(tableRuntime.getOptimizerGroup()).ifPresent(q -> q.refreshTable(tableRuntime));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ public static ServerCatalog buildServerCatalog(CatalogMeta catalogMeta, Configur
Set<TableFormat> tableFormats = CatalogUtil.tableFormats(catalogMeta);
TableFormat tableFormat = tableFormats.iterator().next();

if (tableFormat == TableFormat.PAIMON) {
return new PaimonServerCatalog(catalogMeta);
}

switch (type) {
case CATALOG_TYPE_HADOOP:
if (TableFormat.ICEBERG == tableFormat) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package com.netease.arctic.server.catalog;

import com.netease.arctic.AmoroTable;
import com.netease.arctic.ams.api.CatalogMeta;
import com.netease.arctic.ams.api.TableIdentifier;
import com.netease.arctic.catalog.IcebergCatalogWrapper;
import com.netease.arctic.table.ArcticTable;
import com.netease.arctic.formats.iceberg.IcebergTable;
import com.netease.arctic.utils.CatalogUtil;
import org.apache.iceberg.Table;

import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -55,8 +57,11 @@ public List<TableIdentifier> toAmsIdList(List<com.netease.arctic.table.TableIden
}

@Override
public ArcticTable loadTable(String database, String tableName) {
return catalogWrapper.loadTable(com.netease.arctic.table.TableIdentifier.of(catalogWrapper.name(), database,
tableName));
public AmoroTable<Table> loadTable(String database, String tableName) {
com.netease.arctic.table.TableIdentifier identifier = com.netease.arctic.table.TableIdentifier.of(
catalogWrapper.name(),
database,
tableName);
return new IcebergTable(identifier, catalogWrapper.loadTable(identifier).asUnkeyedTable());
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package com.netease.arctic.server.catalog;

import com.netease.arctic.AmoroTable;
import com.netease.arctic.ams.api.CatalogMeta;
import com.netease.arctic.catalog.IcebergCatalogWrapper;
import com.netease.arctic.formats.iceberg.IcebergTable;
import com.netease.arctic.io.ArcticFileIO;
import com.netease.arctic.io.ArcticFileIOAdapter;
import com.netease.arctic.server.ArcticManagementConf;
Expand All @@ -11,7 +13,6 @@
import com.netease.arctic.server.table.TableMetadata;
import com.netease.arctic.server.utils.Configurations;
import com.netease.arctic.server.utils.IcebergTableUtil;
import com.netease.arctic.table.ArcticTable;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.TableOperations;
Expand Down Expand Up @@ -54,7 +55,7 @@ public void updateMetadata(CatalogMeta metadata) {
}

@Override
public ArcticTable loadTable(String database, String tableName) {
public AmoroTable<?> loadTable(String database, String tableName) {
TableMetadata tableMetadata = getAs(TableMetaMapper.class, mapper ->
mapper.selectTableMetaByName(getMetadata().getCatalogName(), database, tableName));
if (tableMetadata == null) {
Expand All @@ -64,9 +65,11 @@ public ArcticTable loadTable(String database, String tableName) {
ArcticFileIO fileIO = new ArcticFileIOAdapter(io);
TableOperations ops = InternalTableOperations.buildForLoad(tableMetadata, io);
BaseTable table = new BaseTable(ops, TableIdentifier.of(database, tableName).toString());
return new IcebergCatalogWrapper.BasicIcebergTable(
com.netease.arctic.table.TableIdentifier.of(name(), database, tableName),
table, fileIO, getMetadata().getCatalogProperties()
com.netease.arctic.table.TableIdentifier tableIdentifier =
com.netease.arctic.table.TableIdentifier.of(name(), database, tableName);
return new IcebergTable(tableIdentifier,
new IcebergCatalogWrapper.BasicIcebergTable(
tableIdentifier, table, fileIO, getMetadata().getCatalogProperties())
shidayang marked this conversation as resolved.
Show resolved Hide resolved
);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package com.netease.arctic.server.catalog;

import com.netease.arctic.AmoroTable;
import com.netease.arctic.ams.api.CatalogMeta;
import com.netease.arctic.catalog.MixedTables;
import com.netease.arctic.formats.mixed.MixedIcebergTable;
import com.netease.arctic.server.persistence.mapper.TableMetaMapper;
import com.netease.arctic.server.table.TableMetadata;
import com.netease.arctic.table.ArcticTable;

public class InternalMixedCatalogImpl extends InternalCatalog {

private final MixedTables tables;
protected final MixedTables tables;

protected InternalMixedCatalogImpl(CatalogMeta metadata) {
super(metadata);
Expand All @@ -27,10 +28,13 @@ public void updateMetadata(CatalogMeta metadata) {
}

@Override
public ArcticTable loadTable(String database, String tableName) {
public AmoroTable<?> loadTable(String database, String tableName) {
TableMetadata tableMetadata = getAs(TableMetaMapper.class, mapper ->
mapper.selectTableMetaByName(getMetadata().getCatalogName(), database, tableName));
return tableMetadata == null ? null : tables.loadTableByMeta(tableMetadata.buildTableMeta());
if (tableMetadata == null) {
return null;
}
return new MixedIcebergTable(tables.loadTableByMeta(tableMetadata.buildTableMeta()));
}

protected MixedTables tables() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,14 @@

package com.netease.arctic.server.catalog;

import com.netease.arctic.AmoroTable;
import com.netease.arctic.ams.api.CatalogMeta;
import com.netease.arctic.formats.mixed.MixedHiveTable;
import com.netease.arctic.hive.CachedHiveClientPool;
import com.netease.arctic.hive.HMSClient;
import com.netease.arctic.hive.catalog.MixedHiveTables;
import com.netease.arctic.server.persistence.mapper.TableMetaMapper;
import com.netease.arctic.server.table.TableMetadata;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.thrift.TException;

Expand All @@ -42,6 +46,16 @@ public void updateMetadata(CatalogMeta metadata) {
hiveClientPool = ((MixedHiveTables) tables()).getHiveClientPool();
}

@Override
public AmoroTable<?> loadTable(String database, String tableName) {
TableMetadata tableMetadata = getAs(TableMetaMapper.class, mapper ->
mapper.selectTableMetaByName(getMetadata().getCatalogName(), database, tableName));
if (tableMetadata == null) {
return null;
}
return new MixedHiveTable(tables.loadTableByMeta(tableMetadata.buildTableMeta()));
}

@Override
public void createDatabase(String databaseName) {
// do not handle database operations
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@

package com.netease.arctic.server.catalog;

import com.netease.arctic.AmoroTable;
import com.netease.arctic.ams.api.CatalogMeta;
import com.netease.arctic.ams.api.TableIdentifier;
import com.netease.arctic.formats.mixed.MixedIcebergTable;
import com.netease.arctic.mixed.BasicMixedIcebergCatalog;
import com.netease.arctic.table.ArcticTable;
import java.util.List;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -71,7 +72,8 @@ public List<TableIdentifier> listTables(String database) {
}

@Override
public ArcticTable loadTable(String database, String tableName) {
return mixedIcebergCatalog.loadTable(com.netease.arctic.table.TableIdentifier.of(name(), database, tableName));
public AmoroTable<?> loadTable(String database, String tableName) {
return new MixedIcebergTable(mixedIcebergCatalog.loadTable(
com.netease.arctic.table.TableIdentifier.of(name(), database, tableName)));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package com.netease.arctic.server.catalog;

import com.netease.arctic.AmoroTable;
import com.netease.arctic.CommonUnifiedCatalog;
import com.netease.arctic.UnifiedCatalog;
import com.netease.arctic.ams.api.CatalogMeta;
import com.netease.arctic.ams.api.TableIdentifier;
import com.netease.arctic.table.TableMetaStore;
import com.netease.arctic.utils.CatalogUtil;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.stream.Collectors;

public class PaimonServerCatalog extends ExternalCatalog {
zhoujinsong marked this conversation as resolved.
Show resolved Hide resolved

private volatile UnifiedCatalog paimonCatalog;

private volatile TableMetaStore tableMetaStore;

protected PaimonServerCatalog(CatalogMeta metadata) {
super(metadata);
this.tableMetaStore = CatalogUtil.buildMetaStore(metadata);
this.paimonCatalog = doAs(() -> new CommonUnifiedCatalog(
null,
metadata,
metadata.catalogProperties
));
}

@Override
public void updateMetadata(CatalogMeta metadata) {
super.updateMetadata(metadata);
this.tableMetaStore = CatalogUtil.buildMetaStore(metadata);
this.paimonCatalog = doAs(() -> new CommonUnifiedCatalog(
null,
metadata,
metadata.catalogProperties
));
}

@Override
public boolean exist(String database) {
return doAs(() -> paimonCatalog.exist(database));
}

@Override
public boolean exist(String database, String tableName) {
return doAs(() -> paimonCatalog.exist(database, tableName));
}

@Override
public List<String> listDatabases() {
return doAs(() -> paimonCatalog.listDatabases());
}

@Override
public List<TableIdentifier> listTables() {
return doAs(() -> paimonCatalog.listDatabases()
.stream()
.map(this::listTables)
.flatMap(List::stream)
.collect(Collectors.toList()));
}

@Override
public List<TableIdentifier> listTables(String database) {
return doAs(() -> paimonCatalog.listTableMetas(database)
.stream()
.map(t -> t.getIdentifier().buildTableIdentifier())
.collect(Collectors.toList()));
}

@Override
public AmoroTable<?> loadTable(String database, String tableName) {
return doAs(() -> paimonCatalog.loadTable(database, tableName));
}

private <T> T doAs(Callable<T> callable) {
return tableMetaStore.doAs(callable);
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package com.netease.arctic.server.catalog;

import com.netease.arctic.AmoroTable;
import com.netease.arctic.ams.api.CatalogMeta;
import com.netease.arctic.ams.api.TableIdentifier;
import com.netease.arctic.server.persistence.PersistentBase;
import com.netease.arctic.server.persistence.mapper.CatalogMetaMapper;
import com.netease.arctic.table.ArcticTable;

import java.util.List;

Expand Down Expand Up @@ -40,5 +40,5 @@ public void updateMetadata(CatalogMeta metadata) {

public abstract List<TableIdentifier> listTables(String database);

public abstract ArcticTable loadTable(String database, String tableName);
public abstract AmoroTable<?> loadTable(String database, String tableName);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package com.netease.arctic.server.dashboard;

import com.netease.arctic.AmoroTable;
import com.netease.arctic.ams.api.TableFormat;
import com.netease.arctic.server.dashboard.model.DDLInfo;
import com.netease.arctic.server.dashboard.model.PartitionBaseInfo;
import com.netease.arctic.server.dashboard.model.PartitionFileBaseInfo;
import com.netease.arctic.server.dashboard.model.ServerTableMeta;
import com.netease.arctic.server.dashboard.model.TransactionsOfTable;
import java.util.List;

public interface FormatTableDescriptor {
zhoujinsong marked this conversation as resolved.
Show resolved Hide resolved
List<TableFormat> supportFormat();

ServerTableMeta getTableDetail(AmoroTable<?> amoroTable);

List<TransactionsOfTable> getTransactions(AmoroTable<?> amoroTable);

List<PartitionFileBaseInfo> getTransactionDetail(AmoroTable<?> amoroTable, long transactionId);

List<DDLInfo> getTableOperations(AmoroTable<?> amoroTable);

List<PartitionBaseInfo> getTablePartition(AmoroTable<?> amoroTable);

List<PartitionFileBaseInfo> getTableFile(AmoroTable<?> amoroTable, String partition);
}
Loading