diff --git a/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/FlinkUnifiedCatalog.java b/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/FlinkUnifiedCatalog.java
new file mode 100644
index 0000000000..c2db33ab5b
--- /dev/null
+++ b/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/FlinkUnifiedCatalog.java
@@ -0,0 +1,501 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.netease.arctic.flink.catalog;
+
+import static com.netease.arctic.ams.api.Constants.THRIFT_TABLE_SERVICE_NAME;
+import static com.netease.arctic.flink.table.descriptors.ArcticValidator.TABLE_FORMAT;
+
+import com.netease.arctic.AlreadyExistsException;
+import com.netease.arctic.AmoroTable;
+import com.netease.arctic.NoSuchDatabaseException;
+import com.netease.arctic.NoSuchTableException;
+import com.netease.arctic.UnifiedCatalog;
+import com.netease.arctic.UnifiedCatalogLoader;
+import com.netease.arctic.ams.api.TableFormat;
+import com.netease.arctic.ams.api.client.ArcticThriftUrl;
+import com.netease.arctic.flink.table.AmoroDynamicTableFactory;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.catalog.AbstractCatalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.factories.Factory;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/** This is a Flink catalog wrap a unified catalog. */
+public class FlinkUnifiedCatalog extends AbstractCatalog {
+ private UnifiedCatalog unifiedCatalog;
+ private final String amsUri;
+ private final String amoroCatalogName;
+ /**
+ * Available Flink catalogs for Unified Catalog.
+ *
+ *
May include: Iceberg, Mixed and Paimon Catalogs, etc.
+ */
+ private final Map availableCatalogs;
+
+ public FlinkUnifiedCatalog(
+ String amsUri,
+ String name,
+ String defaultDatabase,
+ Map availableCatalogs) {
+ super(name, defaultDatabase);
+ this.amsUri = amsUri;
+ this.amoroCatalogName = ArcticThriftUrl.parse(amsUri, THRIFT_TABLE_SERVICE_NAME).catalogName();
+ this.availableCatalogs = availableCatalogs;
+ }
+
+ @Override
+ public void open() throws CatalogException {
+ unifiedCatalog =
+ UnifiedCatalogLoader.loadUnifiedCatalog(amsUri, amoroCatalogName, Maps.newHashMap());
+ availableCatalogs.forEach((tableFormat, catalog) -> catalog.open());
+ }
+
+ @Override
+ public void close() throws CatalogException {
+ if (availableCatalogs != null) {
+ availableCatalogs.forEach((tableFormat, catalog) -> catalog.close());
+ }
+ }
+
+ @Override
+ public List listDatabases() {
+ return unifiedCatalog.listDatabases();
+ }
+
+ @Override
+ public CatalogDatabase getDatabase(String databaseName) {
+ throw new UnsupportedOperationException("Unsupported operation: get database.");
+ }
+
+ @Override
+ public boolean databaseExists(String databaseName) {
+ return unifiedCatalog.exist(databaseName);
+ }
+
+ @Override
+ public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists)
+ throws DatabaseAlreadyExistException {
+ try {
+ unifiedCatalog.createDatabase(name);
+ } catch (AlreadyExistsException e) {
+ if (!ignoreIfExists) {
+ throw new DatabaseAlreadyExistException(getName(), name);
+ }
+ }
+ }
+
+ @Override
+ public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade)
+ throws DatabaseNotExistException {
+ try {
+ unifiedCatalog.dropDatabase(name);
+ } catch (NoSuchDatabaseException e) {
+ if (!ignoreIfNotExists) {
+ throw new DatabaseNotExistException(getName(), name);
+ }
+ }
+ }
+
+ @Override
+ public void alterDatabase(String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists) {
+ throw new UnsupportedOperationException("Unsupported operation: alter database.");
+ }
+
+ @Override
+ public List listTables(String databaseName) {
+ return unifiedCatalog.listTables(databaseName).stream()
+ .map(table -> table.getIdentifier().getTableName())
+ .collect(java.util.stream.Collectors.toList());
+ }
+
+ @Override
+ public List listViews(String databaseName) {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public CatalogBaseTable getTable(ObjectPath tablePath)
+ throws TableNotExistException, CatalogException {
+ AmoroTable> amoroTable =
+ unifiedCatalog.loadTable(tablePath.getDatabaseName(), tablePath.getObjectName());
+ AbstractCatalog catalog = availableCatalogs.get(amoroTable.format());
+ if (catalog == null) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Unsupported operation: get table [%s], %s: %s.",
+ tablePath, TABLE_FORMAT.key(), amoroTable.format()));
+ }
+ CatalogBaseTable catalogBaseTable = catalog.getTable(tablePath);
+ catalogBaseTable.getOptions().put(TABLE_FORMAT.key(), amoroTable.format().toString());
+ return catalogBaseTable;
+ }
+
+ @Override
+ public boolean tableExists(ObjectPath tablePath) {
+ try {
+ return unifiedCatalog.exist(tablePath.getDatabaseName(), tablePath.getObjectName());
+ } catch (NoSuchDatabaseException | NoSuchTableException e) {
+ return false;
+ }
+ }
+
+ @Override
+ public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
+ throws TableNotExistException {
+ try {
+ unifiedCatalog.dropTable(tablePath.getDatabaseName(), tablePath.getObjectName(), true);
+ } catch (NoSuchTableException e) {
+ if (!ignoreIfNotExists) {
+ throw new TableNotExistException(getName(), tablePath);
+ }
+ }
+ }
+
+ @Override
+ public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists)
+ throws TableNotExistException, TableAlreadyExistException, CatalogException {
+ AbstractCatalog catalog =
+ getOriginalCatalog(tablePath)
+ .orElseThrow(
+ () -> new UnsupportedOperationException("Unsupported operation: rename table."));
+ catalog.renameTable(tablePath, newTableName, ignoreIfNotExists);
+ }
+
+ @Override
+ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
+ throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
+ Configuration configuration = new Configuration();
+ table.getOptions().forEach(configuration::setString);
+ TableFormat format = configuration.get(TABLE_FORMAT);
+ AbstractCatalog catalog = availableCatalogs.get(format);
+ if (catalog == null) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Unsupported operation: create table, %s: %s.", TABLE_FORMAT.key(), format));
+ }
+ catalog.createTable(tablePath, table, ignoreIfExists);
+ }
+
+ @Override
+ public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists)
+ throws TableNotExistException, CatalogException {
+ AbstractCatalog catalog =
+ getOriginalCatalog(tablePath)
+ .orElseThrow(
+ () -> new UnsupportedOperationException("Unsupported operation: alter table."));
+ catalog.alterTable(tablePath, newTable, ignoreIfNotExists);
+ }
+
+ @Override
+ public List listPartitions(ObjectPath tablePath)
+ throws TableNotExistException, TableNotPartitionedException, CatalogException {
+ AbstractCatalog catalog =
+ getOriginalCatalog(tablePath)
+ .orElseThrow(
+ () -> new UnsupportedOperationException("Unsupported operation: list partitions."));
+ return catalog.listPartitions(tablePath);
+ }
+
+ @Override
+ public List listPartitions(
+ ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+ throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException,
+ CatalogException {
+ AbstractCatalog catalog =
+ getOriginalCatalog(tablePath)
+ .orElseThrow(
+ () -> new UnsupportedOperationException("Unsupported operation: list partitions."));
+ return catalog.listPartitions(tablePath, partitionSpec);
+ }
+
+ @Override
+ public List listPartitionsByFilter(
+ ObjectPath tablePath, List filters)
+ throws TableNotExistException, TableNotPartitionedException, CatalogException {
+ AbstractCatalog catalog =
+ getOriginalCatalog(tablePath)
+ .orElseThrow(
+ () ->
+ new UnsupportedOperationException(
+ "Unsupported operation: list partitions by filter."));
+ return catalog.listPartitionsByFilter(tablePath, filters);
+ }
+
+ @Override
+ public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+ throws PartitionNotExistException, CatalogException {
+ AbstractCatalog catalog =
+ getOriginalCatalog(tablePath)
+ .orElseThrow(
+ () -> new UnsupportedOperationException("Unsupported operation: get partition."));
+ return catalog.getPartition(tablePath, partitionSpec);
+ }
+
+ @Override
+ public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+ throws CatalogException {
+ return getOriginalCatalog(tablePath)
+ .map(catalog -> catalog.partitionExists(tablePath, partitionSpec))
+ .orElseThrow(
+ () -> new UnsupportedOperationException("Unsupported operation: partition exists."));
+ }
+
+ @Override
+ public void createPartition(
+ ObjectPath tablePath,
+ CatalogPartitionSpec partitionSpec,
+ CatalogPartition partition,
+ boolean ignoreIfExists)
+ throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException,
+ PartitionAlreadyExistsException, CatalogException {
+ AbstractCatalog catalog =
+ getOriginalCatalog(tablePath)
+ .orElseThrow(
+ () ->
+ new UnsupportedOperationException("Unsupported operation: create partition."));
+ catalog.createPartition(tablePath, partitionSpec, partition, ignoreIfExists);
+ }
+
+ @Override
+ public void dropPartition(
+ ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists)
+ throws PartitionNotExistException, CatalogException {
+ AbstractCatalog catalog =
+ getOriginalCatalog(tablePath)
+ .orElseThrow(
+ () -> new UnsupportedOperationException("Unsupported operation: drop partition."));
+ catalog.dropPartition(tablePath, partitionSpec, ignoreIfNotExists);
+ }
+
+ @Override
+ public void alterPartition(
+ ObjectPath tablePath,
+ CatalogPartitionSpec partitionSpec,
+ CatalogPartition newPartition,
+ boolean ignoreIfNotExists)
+ throws PartitionNotExistException, CatalogException {
+ AbstractCatalog catalog =
+ getOriginalCatalog(tablePath)
+ .orElseThrow(
+ () -> new UnsupportedOperationException("Unsupported operation: alter partition."));
+ catalog.alterPartition(tablePath, partitionSpec, newPartition, ignoreIfNotExists);
+ }
+
+ @Override
+ public Optional getFactory() {
+ return Optional.of(new AmoroDynamicTableFactory(availableCatalogs));
+ }
+
+ @Override
+ public List listFunctions(String dbName) {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public CatalogFunction getFunction(ObjectPath functionPath) throws FunctionNotExistException {
+ throw new FunctionNotExistException(getName(), functionPath);
+ }
+
+ @Override
+ public boolean functionExists(ObjectPath functionPath) {
+ return false;
+ }
+
+ @Override
+ public void createFunction(
+ ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists) {
+ throw new UnsupportedOperationException("Unsupported operation: create function.");
+ }
+
+ @Override
+ public void alterFunction(
+ ObjectPath functionPath, CatalogFunction newFunction, boolean ignoreIfNotExists) {
+ throw new UnsupportedOperationException("Unsupported operation: alter function.");
+ }
+
+ @Override
+ public void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists) {
+ throw new UnsupportedOperationException("Unsupported operation: drop function.");
+ }
+
+ @Override
+ public CatalogTableStatistics getTableStatistics(ObjectPath tablePath)
+ throws TableNotExistException, CatalogException {
+ AbstractCatalog catalog =
+ getOriginalCatalog(tablePath)
+ .orElseThrow(
+ () ->
+ new UnsupportedOperationException(
+ "Unsupported operation: get table statistics."));
+ return catalog.getTableStatistics(tablePath);
+ }
+
+ @Override
+ public CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath)
+ throws TableNotExistException, CatalogException {
+ AbstractCatalog catalog =
+ getOriginalCatalog(tablePath)
+ .orElseThrow(
+ () ->
+ new UnsupportedOperationException(
+ "Unsupported operation: get table column statistics."));
+ return catalog.getTableColumnStatistics(tablePath);
+ }
+
+ @Override
+ public CatalogTableStatistics getPartitionStatistics(
+ ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+ throws PartitionNotExistException, CatalogException {
+ AbstractCatalog catalog =
+ getOriginalCatalog(tablePath)
+ .orElseThrow(
+ () ->
+ new UnsupportedOperationException(
+ "Unsupported operation: get partition statistics."));
+ return catalog.getPartitionStatistics(tablePath, partitionSpec);
+ }
+
+ @Override
+ public CatalogColumnStatistics getPartitionColumnStatistics(
+ ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+ throws PartitionNotExistException, CatalogException {
+ AbstractCatalog catalog =
+ getOriginalCatalog(tablePath)
+ .orElseThrow(
+ () ->
+ new UnsupportedOperationException(
+ "Unsupported operation: get partition column statistics."));
+ return catalog.getPartitionColumnStatistics(tablePath, partitionSpec);
+ }
+
+ @Override
+ public void alterTableStatistics(
+ ObjectPath tablePath, CatalogTableStatistics tableStatistics, boolean ignoreIfNotExists)
+ throws TableNotExistException, CatalogException {
+ AbstractCatalog catalog =
+ getOriginalCatalog(tablePath)
+ .orElseThrow(
+ () ->
+ new UnsupportedOperationException(
+ "Unsupported operation: alter table statistics."));
+ catalog.alterTableStatistics(tablePath, tableStatistics, ignoreIfNotExists);
+ }
+
+ @Override
+ public void alterTableColumnStatistics(
+ ObjectPath tablePath, CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists)
+ throws TableNotExistException, CatalogException, TablePartitionedException {
+ AbstractCatalog catalog =
+ getOriginalCatalog(tablePath)
+ .orElseThrow(
+ () ->
+ new UnsupportedOperationException(
+ "Unsupported operation: alter table column statistics."));
+ catalog.alterTableColumnStatistics(tablePath, columnStatistics, ignoreIfNotExists);
+ }
+
+ @Override
+ public void alterPartitionStatistics(
+ ObjectPath tablePath,
+ CatalogPartitionSpec partitionSpec,
+ CatalogTableStatistics partitionStatistics,
+ boolean ignoreIfNotExists)
+ throws PartitionNotExistException, CatalogException {
+ AbstractCatalog catalog =
+ getOriginalCatalog(tablePath)
+ .orElseThrow(
+ () ->
+ new UnsupportedOperationException(
+ "Unsupported operation: alter partition statistics."));
+ catalog.alterPartitionStatistics(
+ tablePath, partitionSpec, partitionStatistics, ignoreIfNotExists);
+ }
+
+ @Override
+ public void alterPartitionColumnStatistics(
+ ObjectPath tablePath,
+ CatalogPartitionSpec partitionSpec,
+ CatalogColumnStatistics columnStatistics,
+ boolean ignoreIfNotExists)
+ throws PartitionNotExistException, CatalogException {
+ AbstractCatalog catalog =
+ getOriginalCatalog(tablePath)
+ .orElseThrow(
+ () ->
+ new UnsupportedOperationException(
+ "Unsupported operation: alter partition column statistics."));
+ catalog.alterPartitionColumnStatistics(
+ tablePath, partitionSpec, columnStatistics, ignoreIfNotExists);
+ }
+
+ private Optional getOriginalCatalog(ObjectPath tablePath) {
+ TableFormat format = getTableFormat(tablePath);
+ return Optional.of(availableCatalogs.get(format));
+ }
+
+ private TableFormat getTableFormat(ObjectPath tablePath) {
+ AmoroTable> amoroTable =
+ unifiedCatalog.loadTable(tablePath.getDatabaseName(), tablePath.getObjectName());
+ return amoroTable.format();
+ }
+
+ @Override
+ public String toString() {
+ return "FlinkUnifiedCatalog{"
+ + "name='"
+ + getName()
+ + '\''
+ + ", defaultDatabase='"
+ + getDefaultDatabase()
+ + '\''
+ + ", amsUri='"
+ + amsUri
+ + '\''
+ + ", amoroCatalogName='"
+ + amoroCatalogName
+ + '\''
+ + ", availableCatalogs size="
+ + availableCatalogs.size()
+ + "}";
+ }
+}
diff --git a/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/ArcticCatalogFactoryOptions.java b/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/ArcticCatalogFactoryOptions.java
index 1aa62d25ff..1420164125 100644
--- a/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/ArcticCatalogFactoryOptions.java
+++ b/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/ArcticCatalogFactoryOptions.java
@@ -31,6 +31,7 @@
@Internal
public class ArcticCatalogFactoryOptions {
public static final String IDENTIFIER = "arctic";
+ public static final String UNIFIED_IDENTIFIER = "unified";
public static final ConfigOption DEFAULT_DATABASE =
ConfigOptions.key(CommonCatalogOptions.DEFAULT_DATABASE_KEY)
diff --git a/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/FlinkCatalogFactory.java b/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/FlinkCatalogFactory.java
new file mode 100644
index 0000000000..8439bc0419
--- /dev/null
+++ b/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/FlinkCatalogFactory.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.netease.arctic.flink.catalog.factories;
+
+import static com.netease.arctic.flink.catalog.factories.ArcticCatalogFactoryOptions.DEFAULT_DATABASE;
+import static org.apache.flink.table.factories.FactoryUtil.PROPERTY_VERSION;
+
+import com.netease.arctic.ams.api.TableFormat;
+import com.netease.arctic.flink.catalog.FlinkUnifiedCatalog;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.table.catalog.AbstractCatalog;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.factories.CatalogFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/** Factory for {@link FlinkUnifiedCatalog}. */
+public class FlinkCatalogFactory implements CatalogFactory {
+
+ private static final Set SUPPORTED_FORMATS =
+ Sets.newHashSet(TableFormat.MIXED_ICEBERG, TableFormat.MIXED_HIVE);
+
+ @Override
+ public String factoryIdentifier() {
+ return ArcticCatalogFactoryOptions.UNIFIED_IDENTIFIER;
+ }
+
+ @Override
+ public Set> requiredOptions() {
+ Set> requiredOptions = new HashSet<>();
+ requiredOptions.add(ArcticCatalogFactoryOptions.METASTORE_URL);
+ return requiredOptions;
+ }
+
+ @Override
+ public Set> optionalOptions() {
+ final Set> options = new HashSet<>();
+ options.add(PROPERTY_VERSION);
+ options.add(DEFAULT_DATABASE);
+ return options;
+ }
+
+ @Override
+ public Catalog createCatalog(Context context) {
+ final FactoryUtil.CatalogFactoryHelper helper =
+ FactoryUtil.createCatalogFactoryHelper(this, context);
+ helper.validate();
+
+ final String defaultDatabase = helper.getOptions().get(DEFAULT_DATABASE);
+ String metastoreUrl = helper.getOptions().get(ArcticCatalogFactoryOptions.METASTORE_URL);
+
+ Map availableCatalogs = Maps.newHashMap();
+ SUPPORTED_FORMATS.forEach(
+ tableFormat -> {
+ if (!availableCatalogs.containsKey(tableFormat)) {
+ availableCatalogs.put(tableFormat, createCatalog(context, tableFormat));
+ }
+ });
+
+ return new FlinkUnifiedCatalog(
+ metastoreUrl, context.getName(), defaultDatabase, availableCatalogs);
+ }
+
+ private AbstractCatalog createCatalog(Context context, TableFormat tableFormat) {
+ CatalogFactory catalogFactory;
+
+ switch (tableFormat) {
+ case MIXED_ICEBERG:
+ case MIXED_HIVE:
+ catalogFactory = new ArcticCatalogFactory();
+ break;
+ default:
+ throw new UnsupportedOperationException(
+ String.format("Unsupported table format: [%s] in the amoro catalog." + tableFormat));
+ }
+
+ return (AbstractCatalog) catalogFactory.createCatalog(context);
+ }
+}
diff --git a/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/table/AmoroDynamicTableFactory.java b/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/table/AmoroDynamicTableFactory.java
new file mode 100644
index 0000000000..347bfa2bc6
--- /dev/null
+++ b/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/table/AmoroDynamicTableFactory.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.netease.arctic.flink.table;
+
+import static com.netease.arctic.flink.catalog.factories.ArcticCatalogFactoryOptions.UNIFIED_IDENTIFIER;
+import static com.netease.arctic.flink.table.descriptors.ArcticValidator.TABLE_FORMAT;
+
+import com.netease.arctic.ams.api.TableFormat;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.catalog.AbstractCatalog;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.Factory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.util.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * AmoroDynamicTableFactory is a factory for creating dynamic table sources and sinks. It implements
+ * both DynamicTableSourceFactory and DynamicTableSinkFactory interfaces.
+ */
+public class AmoroDynamicTableFactory
+ implements DynamicTableSourceFactory, DynamicTableSinkFactory {
+
+ private final Map availableCatalogs;
+
+ public AmoroDynamicTableFactory(Map availableCatalogs) {
+ this.availableCatalogs =
+ Preconditions.checkNotNull(availableCatalogs, "availableCatalogs cannot be null");
+ }
+
+ @Override
+ public DynamicTableSink createDynamicTableSink(Context context) {
+ ObjectIdentifier identifier = context.getObjectIdentifier();
+ FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+ Configuration options = (Configuration) helper.getOptions();
+ TableFormat tableFormat = options.get(TABLE_FORMAT);
+
+ return getOriginalCatalog(tableFormat)
+ .flatMap(AbstractCatalog::getFactory)
+ .filter(factory -> factory instanceof DynamicTableSinkFactory)
+ .map(factory -> ((DynamicTableSinkFactory) factory).createDynamicTableSink(context))
+ .orElseThrow(
+ () ->
+ new UnsupportedOperationException(
+ String.format(
+ "Invalid catalog or factory for table format: %s, table: %s.",
+ tableFormat, identifier)));
+ }
+
+ @Override
+ public DynamicTableSource createDynamicTableSource(Context context) {
+ ObjectIdentifier identifier = context.getObjectIdentifier();
+ FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+ Configuration options = (Configuration) helper.getOptions();
+ TableFormat tableFormat = options.get(TABLE_FORMAT);
+
+ return getOriginalCatalog(tableFormat)
+ .flatMap(AbstractCatalog::getFactory)
+ .filter(factory -> factory instanceof DynamicTableSourceFactory)
+ .map(factory -> ((DynamicTableSourceFactory) factory).createDynamicTableSource(context))
+ .orElseThrow(
+ () ->
+ new UnsupportedOperationException(
+ String.format(
+ "Invalid catalog or factory for table format: %s, table: %s.",
+ tableFormat, identifier)));
+ }
+
+ private Optional getOriginalCatalog(TableFormat format) {
+ return Optional.of(availableCatalogs.get(format));
+ }
+
+ @Override
+ public String factoryIdentifier() {
+ return UNIFIED_IDENTIFIER;
+ }
+
+ @Override
+ public Set> requiredOptions() {
+ Set> requiredOptions = Sets.newHashSet();
+ availableCatalogs.forEach(
+ (format, catalog) -> {
+ Optional factory = catalog.getFactory();
+ factory.ifPresent(value -> requiredOptions.addAll(value.requiredOptions()));
+ });
+ requiredOptions.add(TABLE_FORMAT);
+ return requiredOptions;
+ }
+
+ @Override
+ public Set> optionalOptions() {
+ Set> optionalOptions = Sets.newHashSet();
+ availableCatalogs.forEach(
+ (format, catalog) -> {
+ Optional factory = catalog.getFactory();
+ factory.ifPresent(value -> optionalOptions.addAll(value.optionalOptions()));
+ });
+ return optionalOptions;
+ }
+}
diff --git a/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/table/descriptors/ArcticValidator.java b/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/table/descriptors/ArcticValidator.java
index 3dfc82b21a..63380002dc 100644
--- a/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/table/descriptors/ArcticValidator.java
+++ b/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/table/descriptors/ArcticValidator.java
@@ -20,6 +20,7 @@
import static org.apache.flink.configuration.description.TextElement.text;
+import com.netease.arctic.ams.api.TableFormat;
import org.apache.commons.lang.StringUtils;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
@@ -275,6 +276,19 @@ public class ArcticValidator extends ConnectorDescriptorValidator {
+ " of the key. Default is -1, means it is automatically determined: every shard will be at least 512KB and"
+ " number of shard bits will not exceed 6.");
+ public static final ConfigOption TABLE_FORMAT =
+ ConfigOptions.key("table.format")
+ .enumType(TableFormat.class)
+ .defaultValue(TableFormat.MIXED_ICEBERG)
+ .withDescription(
+ String.format(
+ "The format of the table, valid values are %s, %s, %s or %s, and Flink choose '%s' as default format.",
+ TableFormat.ICEBERG,
+ TableFormat.MIXED_ICEBERG,
+ TableFormat.MIXED_HIVE,
+ TableFormat.PAIMON,
+ TableFormat.MIXED_ICEBERG));
+
@Override
public void validate(DescriptorProperties properties) {
String emitMode = properties.getString(ARCTIC_EMIT_MODE.key());
diff --git a/mixed/flink/flink-common/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/mixed/flink/flink-common/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
index 93b485b78e..5081930298 100644
--- a/mixed/flink/flink-common/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ b/mixed/flink/flink-common/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -17,4 +17,5 @@
#
com.netease.arctic.flink.catalog.factories.ArcticCatalogFactory
+com.netease.arctic.flink.catalog.factories.FlinkCatalogFactory
com.netease.arctic.flink.table.DynamicTableFactory
diff --git a/mixed/flink/flink-common/src/test/java/com/netease/arctic/flink/catalog/FlinkCatalogContext.java b/mixed/flink/flink-common/src/test/java/com/netease/arctic/flink/catalog/FlinkCatalogContext.java
new file mode 100644
index 0000000000..4ea04cdc05
--- /dev/null
+++ b/mixed/flink/flink-common/src/test/java/com/netease/arctic/flink/catalog/FlinkCatalogContext.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.netease.arctic.flink.catalog;
+
+import static com.netease.arctic.flink.catalog.factories.ArcticCatalogFactoryOptions.METASTORE_URL;
+import static com.netease.arctic.flink.table.descriptors.ArcticValidator.TABLE_FORMAT;
+
+import com.netease.arctic.TestAms;
+import com.netease.arctic.ams.api.CatalogMeta;
+import com.netease.arctic.ams.api.TableFormat;
+import com.netease.arctic.flink.catalog.factories.FlinkCatalogFactory;
+import com.netease.arctic.hive.TestHMS;
+import com.netease.arctic.hive.catalog.HiveCatalogTestHelper;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.junit.jupiter.params.provider.Arguments;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Map;
+import java.util.stream.Stream;
+
+public class FlinkCatalogContext {
+
+ static final TestHMS TEST_HMS = new TestHMS();
+ static final TestAms TEST_AMS = new TestAms();
+ static final FlinkCatalogFactory flinkCatalogFactory = new FlinkCatalogFactory();
+
+ static ResolvedSchema resolvedSchema =
+ ResolvedSchema.of(
+ Column.physical("name", DataTypes.STRING()), Column.physical("age", DataTypes.INT()));
+ static Schema schema = Schema.newBuilder().fromResolvedSchema(resolvedSchema).build();
+
+ ObjectPath objectPath = new ObjectPath("default", "test_hive_from_flink");
+
+ static Stream getFlinkCatalogAndTable() {
+ return Stream.of(
+ Arguments.of(
+ initFlinkCatalog(TableFormat.MIXED_HIVE),
+ generateFlinkTable(TableFormat.MIXED_HIVE.toString()),
+ TableFormat.MIXED_HIVE),
+ Arguments.of(
+ initFlinkCatalog(TableFormat.MIXED_ICEBERG),
+ generateFlinkTable(TableFormat.MIXED_ICEBERG.toString()),
+ TableFormat.MIXED_ICEBERG));
+ }
+
+ static ResolvedCatalogTable generateFlinkTable(String tableFormat) {
+ return new ResolvedCatalogTable(
+ CatalogTable.of(
+ schema,
+ "Flink managed table",
+ new ArrayList<>(),
+ Collections.singletonMap(TABLE_FORMAT.key(), tableFormat)),
+ resolvedSchema);
+ }
+
+ void initial() throws Exception {
+ TEST_HMS.before();
+ TEST_AMS.before();
+ }
+
+ void close() {
+ TEST_AMS.after();
+ TEST_HMS.after();
+ }
+
+ static FlinkUnifiedCatalog initFlinkCatalog(TableFormat tableFormat) {
+ FlinkUnifiedCatalog flinkUnifiedCatalog;
+ Map factoryOptions = Maps.newHashMap();
+ CatalogMeta meta =
+ HiveCatalogTestHelper.build(TEST_HMS.getHiveConf(), tableFormat)
+ .buildCatalogMeta(TEST_HMS.getWareHouseLocation());
+ meta.setCatalogName(tableFormat.name().toLowerCase());
+
+ TEST_AMS.getAmsHandler().dropCatalog(meta.getCatalogName());
+ TEST_AMS.getAmsHandler().createCatalog(meta);
+
+ factoryOptions.put(METASTORE_URL.key(), TEST_AMS.getServerUrl() + "/" + meta.getCatalogName());
+ final FactoryUtil.DefaultCatalogContext context =
+ new FactoryUtil.DefaultCatalogContext(
+ "flink_catalog_name",
+ factoryOptions,
+ new Configuration(),
+ FlinkCatalogContext.class.getClassLoader());
+ flinkUnifiedCatalog = (FlinkUnifiedCatalog) flinkCatalogFactory.createCatalog(context);
+ flinkUnifiedCatalog.open();
+ return flinkUnifiedCatalog;
+ }
+
+ HiveMetaStoreClient getHMSClient() {
+ return TEST_HMS.getHiveClient();
+ }
+}
diff --git a/mixed/flink/flink-common/src/test/java/com/netease/arctic/flink/catalog/FlinkUnifiedCatalogITCase.java b/mixed/flink/flink-common/src/test/java/com/netease/arctic/flink/catalog/FlinkUnifiedCatalogITCase.java
new file mode 100644
index 0000000000..977f1c6a10
--- /dev/null
+++ b/mixed/flink/flink-common/src/test/java/com/netease/arctic/flink/catalog/FlinkUnifiedCatalogITCase.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.netease.arctic.flink.catalog;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import com.netease.arctic.BasicTableTestHelper;
+import com.netease.arctic.ams.api.TableFormat;
+import com.netease.arctic.catalog.CatalogTestHelper;
+import com.netease.arctic.flink.table.CatalogITCaseBase;
+import com.netease.arctic.hive.TestHMS;
+import com.netease.arctic.hive.catalog.HiveCatalogTestHelper;
+import com.netease.arctic.table.TableIdentifier;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.catalog.AbstractCatalog;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.types.Row;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+@RunWith(value = Parameterized.class)
+public class FlinkUnifiedCatalogITCase extends CatalogITCaseBase {
+ static final TestHMS TEST_HMS = new TestHMS();
+ AbstractCatalog flinkCatalog;
+ TableIdentifier identifier;
+
+ public FlinkUnifiedCatalogITCase(CatalogTestHelper catalogTestHelper) {
+ super(catalogTestHelper, new BasicTableTestHelper(true, false));
+ }
+
+ @Parameterized.Parameters(name = "catalogTestHelper = {0}")
+ public static Object[][] parameters() {
+ return new Object[][] {
+ {new HiveCatalogTestHelper(TableFormat.MIXED_HIVE, TEST_HMS.getHiveConf())},
+ {new HiveCatalogTestHelper(TableFormat.MIXED_ICEBERG, TEST_HMS.getHiveConf())}
+ };
+ }
+
+ @BeforeClass
+ public static void beforeAll() throws Exception {
+ TEST_HMS.before();
+ }
+
+ @Before
+ public void setup() throws Exception {
+ String catalog = "amoro";
+ exec("CREATE CATALOG %s WITH ('type'='amoro', 'metastore.url'='%s')", catalog, getCatalogUrl());
+ exec("USE CATALOG %s", catalog);
+ exec("USE %s", tableTestHelper().id().getDatabase());
+ Optional catalogOptional = getTableEnv().getCatalog(catalog);
+ assertTrue(catalogOptional.isPresent());
+ flinkCatalog = (AbstractCatalog) catalogOptional.get();
+ assertEquals(catalog, flinkCatalog.getName());
+ identifier = tableTestHelper().id();
+ }
+
+ @After
+ public void teardown() {
+ TEST_HMS.after();
+ if (flinkCatalog != null) {
+ flinkCatalog.close();
+ }
+ }
+
+ @Test
+ public void testTableExists() throws TableNotExistException {
+ CatalogBaseTable catalogBaseTable =
+ flinkCatalog.getTable(new ObjectPath(identifier.getDatabase(), identifier.getTableName()));
+ assertNotNull(catalogBaseTable);
+ assertEquals(
+ tableTestHelper().tableSchema().columns().size(),
+ catalogBaseTable.getUnresolvedSchema().getColumns().size());
+ }
+
+ @Test
+ public void testInsertAndQuery() throws Exception {
+ exec(
+ "INSERT INTO %s SELECT 1, 'Lily', 1234567890, TO_TIMESTAMP('2020-01-01 01:02:03')",
+ identifier.getTableName());
+ TableResult tableResult =
+ exec("select * from %s /*+OPTIONS('monitor-interval'='1s')*/ ", identifier.getTableName());
+
+ tableResult.await(30, TimeUnit.SECONDS);
+
+ Row actualRow = tableResult.collect().next();
+ assertEquals(
+ Row.of(1, "Lily", 1234567890L, "2020-01-01T01:02:03").toString(), actualRow.toString());
+ }
+}
diff --git a/mixed/flink/flink-common/src/test/java/com/netease/arctic/flink/catalog/TestFlinkCatalogs.java b/mixed/flink/flink-common/src/test/java/com/netease/arctic/flink/catalog/TestFlinkCatalogs.java
new file mode 100644
index 0000000000..95496ba9a0
--- /dev/null
+++ b/mixed/flink/flink-common/src/test/java/com/netease/arctic/flink/catalog/TestFlinkCatalogs.java
@@ -0,0 +1,145 @@
+package com.netease.arctic.flink.catalog;
+
+import static com.netease.arctic.flink.table.descriptors.ArcticValidator.TABLE_FORMAT;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import com.netease.arctic.ams.api.TableFormat;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.thrift.TException;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+class TestFlinkCatalogs {
+ static FlinkCatalogContext flinkCatalogContext = new FlinkCatalogContext();
+
+ @BeforeAll
+ public static void setupCatalogMeta() throws Exception {
+ flinkCatalogContext.initial();
+ }
+
+ @AfterAll
+ public static void tearDown() {
+ flinkCatalogContext.close();
+ }
+
+ @ParameterizedTest
+ @MethodSource("com.netease.arctic.flink.catalog.FlinkCatalogContext#getFlinkCatalogAndTable")
+ void testListDatabases(FlinkUnifiedCatalog flinkUnifiedCatalog) throws TException {
+ List expects = flinkCatalogContext.getHMSClient().getAllDatabases();
+ assertEquals(expects, flinkUnifiedCatalog.listDatabases());
+ }
+
+ @ParameterizedTest
+ @MethodSource("com.netease.arctic.flink.catalog.FlinkCatalogContext#getFlinkCatalogAndTable")
+ void testDatabaseExists(FlinkUnifiedCatalog flinkUnifiedCatalog) {
+ assertTrue(flinkUnifiedCatalog.databaseExists("default"));
+ assertFalse(flinkUnifiedCatalog.databaseExists("not_exists_db"));
+ }
+
+ @ParameterizedTest
+ @MethodSource("com.netease.arctic.flink.catalog.FlinkCatalogContext#getFlinkCatalogAndTable")
+ void testCreateAndDropDatabase(FlinkUnifiedCatalog flinkUnifiedCatalog)
+ throws DatabaseAlreadyExistException, DatabaseNotEmptyException, DatabaseNotExistException {
+ flinkUnifiedCatalog.createDatabase(
+ "test", new CatalogDatabaseImpl(Collections.emptyMap(), "test"), false);
+ assertTrue(flinkUnifiedCatalog.databaseExists("test"));
+
+ flinkUnifiedCatalog.dropDatabase("test", false);
+ assertFalse(flinkUnifiedCatalog.databaseExists("test"));
+ }
+
+ @ParameterizedTest
+ @MethodSource("com.netease.arctic.flink.catalog.FlinkCatalogContext#getFlinkCatalogAndTable")
+ void testAlterDatabase(
+ FlinkUnifiedCatalog flinkUnifiedCatalog, CatalogTable table, TableFormat tableFormat)
+ throws DatabaseNotExistException {
+ try {
+ flinkUnifiedCatalog.alterDatabase(
+ "default", new CatalogDatabaseImpl(Collections.emptyMap(), "default"), false);
+ } catch (UnsupportedOperationException e) {
+ // Mixed-format catalog does not support altering database.
+ if (tableFormat != TableFormat.MIXED_HIVE && tableFormat != TableFormat.MIXED_ICEBERG) {
+ throw e;
+ }
+ }
+ }
+
+ @ParameterizedTest
+ @MethodSource("com.netease.arctic.flink.catalog.FlinkCatalogContext#getFlinkCatalogAndTable")
+ void testCreateGetAndDropTable(
+ FlinkUnifiedCatalog flinkUnifiedCatalog, CatalogTable table, TableFormat tableFormat)
+ throws TableAlreadyExistException, DatabaseNotExistException, TableNotExistException {
+ ObjectPath objectPath = flinkCatalogContext.objectPath;
+
+ flinkUnifiedCatalog.createTable(flinkCatalogContext.objectPath, table, false);
+ assertTrue(flinkUnifiedCatalog.tableExists(objectPath));
+
+ CatalogBaseTable actualTable = flinkUnifiedCatalog.getTable(objectPath);
+ assertEquals(table.getUnresolvedSchema(), actualTable.getUnresolvedSchema());
+ assertEquals(tableFormat.toString(), actualTable.getOptions().get(TABLE_FORMAT.key()));
+
+ flinkUnifiedCatalog.dropTable(objectPath, false);
+ assertFalse(flinkUnifiedCatalog.tableExists(objectPath));
+ }
+
+ @ParameterizedTest
+ @MethodSource("com.netease.arctic.flink.catalog.FlinkCatalogContext#getFlinkCatalogAndTable")
+ void testAlterTable(
+ FlinkUnifiedCatalog flinkUnifiedCatalog, CatalogTable table, TableFormat tableFormat)
+ throws TableNotExistException, TableAlreadyExistException, DatabaseNotExistException {
+ try {
+ flinkUnifiedCatalog.createTable(flinkCatalogContext.objectPath, table, true);
+
+ ResolvedSchema newResolvedSchema =
+ ResolvedSchema.of(
+ Column.physical("name", DataTypes.STRING()),
+ Column.physical("age", DataTypes.INT()),
+ Column.physical("address", DataTypes.STRING()));
+ String comment = "Flink new Table";
+ Map newProperties = Maps.newHashMap();
+ newProperties.put("new_key", "new_value");
+
+ CatalogBaseTable newTable =
+ new ResolvedCatalogTable(
+ CatalogTable.of(
+ Schema.newBuilder().fromResolvedSchema(newResolvedSchema).build(),
+ comment,
+ new ArrayList<>(),
+ newProperties),
+ newResolvedSchema);
+ try {
+ flinkUnifiedCatalog.alterTable(flinkCatalogContext.objectPath, newTable, false);
+ } catch (UnsupportedOperationException e) {
+ // https://github.com/NetEase/amoro/issues/2 altering Mixed format table is not supported.
+ if (tableFormat != TableFormat.MIXED_HIVE && tableFormat != TableFormat.MIXED_ICEBERG) {
+ throw e;
+ }
+ }
+ } finally {
+ flinkUnifiedCatalog.dropTable(flinkCatalogContext.objectPath, true);
+ }
+ }
+}