From 0d70636cda963c09653d02d8823ea5d0f1858602 Mon Sep 17 00:00:00 2001 From: Xianxun Ye Date: Wed, 13 Dec 2023 14:36:48 +0800 Subject: [PATCH 1/5] [AMORO-2344] [Flink]: Support UnifiedCatalog to contain Iceberg format table in Flink Engine --- mixed/flink/flink-common/pom.xml | 6 ++ .../flink/catalog/FlinkUnifiedCatalog.java | 18 ++++-- .../factories/ArcticCatalogFactory.java | 14 ++++- .../ArcticCatalogFactoryOptions.java | 7 +++ .../factories/FlinkCatalogFactory.java | 56 +++++++++++++++++-- .../iceberg/IcebergFlinkCatalogFactory.java | 39 +++++++++++++ .../flink/catalog/FlinkCatalogContext.java | 8 ++- .../catalog/FlinkUnifiedCatalogITCase.java | 9 ++- .../flink/catalog/TestFlinkCatalogs.java | 8 ++- 9 files changed, 143 insertions(+), 22 deletions(-) create mode 100644 mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/iceberg/IcebergFlinkCatalogFactory.java diff --git a/mixed/flink/flink-common/pom.xml b/mixed/flink/flink-common/pom.xml index ccc0a547cc..5fa0a4ac4f 100644 --- a/mixed/flink/flink-common/pom.xml +++ b/mixed/flink/flink-common/pom.xml @@ -143,6 +143,12 @@ + + org.apache.flink + flink-metrics-dropwizard + ${flink.version} + provided + diff --git a/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/FlinkUnifiedCatalog.java b/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/FlinkUnifiedCatalog.java index c2db33ab5b..1a126455b0 100644 --- a/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/FlinkUnifiedCatalog.java +++ b/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/FlinkUnifiedCatalog.java @@ -26,7 +26,6 @@ import com.netease.arctic.NoSuchDatabaseException; import com.netease.arctic.NoSuchTableException; import com.netease.arctic.UnifiedCatalog; -import com.netease.arctic.UnifiedCatalogLoader; import com.netease.arctic.ams.api.TableFormat; import com.netease.arctic.ams.api.client.ArcticThriftUrl; import com.netease.arctic.flink.table.AmoroDynamicTableFactory; @@ -37,6 +36,7 @@ import org.apache.flink.table.catalog.CatalogFunction; import org.apache.flink.table.catalog.CatalogPartition; import org.apache.flink.table.catalog.CatalogPartitionSpec; +import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.ObjectPath; import org.apache.flink.table.catalog.exceptions.CatalogException; import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; @@ -76,17 +76,17 @@ public FlinkUnifiedCatalog( String amsUri, String name, String defaultDatabase, + UnifiedCatalog unifiedCatalog, Map availableCatalogs) { super(name, defaultDatabase); this.amsUri = amsUri; this.amoroCatalogName = ArcticThriftUrl.parse(amsUri, THRIFT_TABLE_SERVICE_NAME).catalogName(); + this.unifiedCatalog = unifiedCatalog; this.availableCatalogs = availableCatalogs; } @Override public void open() throws CatalogException { - unifiedCatalog = - UnifiedCatalogLoader.loadUnifiedCatalog(amsUri, amoroCatalogName, Maps.newHashMap()); availableCatalogs.forEach((tableFormat, catalog) -> catalog.open()); } @@ -165,9 +165,15 @@ public CatalogBaseTable getTable(ObjectPath tablePath) "Unsupported operation: get table [%s], %s: %s.", tablePath, TABLE_FORMAT.key(), amoroTable.format())); } - CatalogBaseTable catalogBaseTable = catalog.getTable(tablePath); - catalogBaseTable.getOptions().put(TABLE_FORMAT.key(), amoroTable.format().toString()); - return catalogBaseTable; + CatalogTable catalogTable = (CatalogTable) catalog.getTable(tablePath); + final Map flinkProperties = Maps.newHashMap(catalogTable.getOptions()); + flinkProperties.put(TABLE_FORMAT.key(), amoroTable.format().toString()); + + return CatalogTable.of( + catalogTable.getUnresolvedSchema(), + catalogTable.getComment(), + catalogTable.getPartitionKeys(), + flinkProperties); } @Override diff --git a/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/ArcticCatalogFactory.java b/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/ArcticCatalogFactory.java index 38482e4b9f..d50e237920 100644 --- a/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/ArcticCatalogFactory.java +++ b/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/ArcticCatalogFactory.java @@ -19,6 +19,7 @@ package com.netease.arctic.flink.catalog.factories; import static com.netease.arctic.flink.catalog.factories.ArcticCatalogFactoryOptions.DEFAULT_DATABASE; +import static com.netease.arctic.flink.catalog.factories.ArcticCatalogFactoryOptions.FLINK_TABLE_FORMATS; import static com.netease.arctic.flink.catalog.factories.ArcticCatalogFactoryOptions.METASTORE_URL; import static com.netease.arctic.flink.table.KafkaConnectorOptionsUtil.getKafkaParams; import static org.apache.flink.table.factories.FactoryUtil.PROPERTY_VERSION; @@ -29,12 +30,14 @@ import org.apache.flink.table.catalog.Catalog; import org.apache.flink.table.factories.CatalogFactory; import org.apache.flink.table.factories.FactoryUtil; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Collections; import java.util.HashSet; import java.util.Map; +import java.util.Optional; import java.util.Set; /** Factory for {@link ArcticCatalog} */ @@ -57,13 +60,16 @@ public Catalog createCatalog(Context context) { final String defaultDatabase = helper.getOptions().get(DEFAULT_DATABASE); String metastoreUrl = helper.getOptions().get(METASTORE_URL); final Map arcticCatalogProperties = getKafkaParams(context.getOptions()); + final Map catalogProperties = Maps.newHashMap(arcticCatalogProperties); + + Optional tableFormatsOptional = helper.getOptions().getOptional(FLINK_TABLE_FORMATS); + tableFormatsOptional.ifPresent( + tableFormats -> catalogProperties.put(FLINK_TABLE_FORMATS.key(), tableFormats)); return new ArcticCatalog( context.getName(), defaultDatabase, - InternalCatalogBuilder.builder() - .metastoreUrl(metastoreUrl) - .properties(arcticCatalogProperties)); + InternalCatalogBuilder.builder().metastoreUrl(metastoreUrl).properties(catalogProperties)); } @Override @@ -87,6 +93,8 @@ public Set> optionalOptions() { options.add(ArcticCatalogFactoryOptions.KRB5_CONF_ENCODE); options.add(ArcticCatalogFactoryOptions.KEYTAB_PATH); options.add(ArcticCatalogFactoryOptions.KEYTAB_ENCODE); + + options.add(ArcticCatalogFactoryOptions.FLINK_TABLE_FORMATS); return options; } } diff --git a/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/ArcticCatalogFactoryOptions.java b/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/ArcticCatalogFactoryOptions.java index cc83285afc..17652bb763 100644 --- a/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/ArcticCatalogFactoryOptions.java +++ b/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/ArcticCatalogFactoryOptions.java @@ -18,6 +18,7 @@ package com.netease.arctic.flink.catalog.factories; +import static com.netease.arctic.ams.api.properties.CatalogMetaProperties.TABLE_FORMATS; import static com.netease.arctic.flink.catalog.ArcticCatalog.DEFAULT_DB; import com.netease.arctic.ams.api.properties.CatalogMetaProperties; @@ -81,4 +82,10 @@ public class ArcticCatalogFactoryOptions { PROPERTIES_PREFIX + "." + CatalogMetaProperties.AUTH_CONFIGS_KEY_KEYTAB_ENCODE) .stringType() .noDefaultValue(); + + public static final ConfigOption FLINK_TABLE_FORMATS = + ConfigOptions.key(TABLE_FORMATS) + .stringType() + .noDefaultValue() + .withDescription("This illustrates the table format contained in the catalog."); } diff --git a/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/FlinkCatalogFactory.java b/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/FlinkCatalogFactory.java index 8439bc0419..7e776f484e 100644 --- a/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/FlinkCatalogFactory.java +++ b/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/FlinkCatalogFactory.java @@ -18,16 +18,28 @@ package com.netease.arctic.flink.catalog.factories; +import static com.netease.arctic.ams.api.Constants.THRIFT_TABLE_SERVICE_NAME; +import static com.netease.arctic.ams.api.properties.CatalogMetaProperties.TABLE_FORMATS; import static com.netease.arctic.flink.catalog.factories.ArcticCatalogFactoryOptions.DEFAULT_DATABASE; +import static com.netease.arctic.flink.catalog.factories.ArcticCatalogFactoryOptions.FLINK_TABLE_FORMATS; import static org.apache.flink.table.factories.FactoryUtil.PROPERTY_VERSION; +import com.netease.arctic.AmsClient; +import com.netease.arctic.PooledAmsClient; +import com.netease.arctic.UnifiedCatalog; +import com.netease.arctic.UnifiedCatalogLoader; +import com.netease.arctic.ams.api.CatalogMeta; import com.netease.arctic.ams.api.TableFormat; +import com.netease.arctic.ams.api.client.ArcticThriftUrl; import com.netease.arctic.flink.catalog.FlinkUnifiedCatalog; +import com.netease.arctic.flink.catalog.factories.iceberg.IcebergFlinkCatalogFactory; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.table.catalog.AbstractCatalog; import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.exceptions.CatalogException; import org.apache.flink.table.factories.CatalogFactory; import org.apache.flink.table.factories.FactoryUtil; +import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; @@ -39,7 +51,7 @@ public class FlinkCatalogFactory implements CatalogFactory { private static final Set SUPPORTED_FORMATS = - Sets.newHashSet(TableFormat.MIXED_ICEBERG, TableFormat.MIXED_HIVE); + Sets.newHashSet(TableFormat.MIXED_ICEBERG, TableFormat.MIXED_HIVE, TableFormat.ICEBERG); @Override public String factoryIdentifier() { @@ -70,19 +82,42 @@ public Catalog createCatalog(Context context) { final String defaultDatabase = helper.getOptions().get(DEFAULT_DATABASE); String metastoreUrl = helper.getOptions().get(ArcticCatalogFactoryOptions.METASTORE_URL); + String amoroCatalogName = + ArcticThriftUrl.parse(metastoreUrl, THRIFT_TABLE_SERVICE_NAME).catalogName(); + UnifiedCatalog unifiedCatalog = + UnifiedCatalogLoader.loadUnifiedCatalog(metastoreUrl, amoroCatalogName, Maps.newHashMap()); + Configuration hadoopConf = unifiedCatalog.authenticationContext().getConfiguration(); + + AmsClient client = new PooledAmsClient(metastoreUrl); + TableFormat catalogTableFormat; + try { + CatalogMeta catalogMeta = client.getCatalog(amoroCatalogName); + catalogTableFormat = + TableFormat.valueOf(catalogMeta.getCatalogProperties().get(TABLE_FORMATS)); + } catch (Exception e) { + throw new IllegalStateException("failed when load catalog " + amoroCatalogName, e); + } + Map availableCatalogs = Maps.newHashMap(); SUPPORTED_FORMATS.forEach( tableFormat -> { if (!availableCatalogs.containsKey(tableFormat)) { - availableCatalogs.put(tableFormat, createCatalog(context, tableFormat)); + if (catalogTableFormat == TableFormat.ICEBERG + && (tableFormat.in(TableFormat.MIXED_HIVE, TableFormat.MIXED_ICEBERG))) { + context + .getOptions() + .put(FLINK_TABLE_FORMATS.key(), TableFormat.MIXED_HIVE.toString()); + } + availableCatalogs.put(tableFormat, createCatalog(context, tableFormat, hadoopConf)); } }); return new FlinkUnifiedCatalog( - metastoreUrl, context.getName(), defaultDatabase, availableCatalogs); + metastoreUrl, context.getName(), defaultDatabase, unifiedCatalog, availableCatalogs); } - private AbstractCatalog createCatalog(Context context, TableFormat tableFormat) { + private AbstractCatalog createCatalog( + Context context, TableFormat tableFormat, Configuration hadoopConf) { CatalogFactory catalogFactory; switch (tableFormat) { @@ -90,11 +125,22 @@ private AbstractCatalog createCatalog(Context context, TableFormat tableFormat) case MIXED_HIVE: catalogFactory = new ArcticCatalogFactory(); break; + case ICEBERG: + catalogFactory = new IcebergFlinkCatalogFactory(hadoopConf); + break; default: throw new UnsupportedOperationException( String.format("Unsupported table format: [%s] in the amoro catalog." + tableFormat)); } - return (AbstractCatalog) catalogFactory.createCatalog(context); + try { + return (AbstractCatalog) catalogFactory.createCatalog(context); + } catch (CatalogException e) { + if (e.getMessage().contains("must implement createCatalog(Context)")) { + return (AbstractCatalog) + catalogFactory.createCatalog(context.getName(), context.getOptions()); + } + throw e; + } } } diff --git a/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/iceberg/IcebergFlinkCatalogFactory.java b/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/iceberg/IcebergFlinkCatalogFactory.java new file mode 100644 index 0000000000..e5144501a5 --- /dev/null +++ b/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/iceberg/IcebergFlinkCatalogFactory.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.netease.arctic.flink.catalog.factories.iceberg; + +import org.apache.flink.table.catalog.Catalog; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.flink.FlinkCatalogFactory; + +import java.util.Map; + +/** Creating Iceberg Catalog by the hadoop configuration which stored in the AMS. */ +public class IcebergFlinkCatalogFactory extends FlinkCatalogFactory { + private final Configuration hadoopConf; + + public IcebergFlinkCatalogFactory(Configuration hadoopConf) { + this.hadoopConf = hadoopConf; + } + + @Override + public Catalog createCatalog(String name, Map properties) { + return super.createCatalog(name, properties, hadoopConf); + } +} diff --git a/mixed/flink/flink-common/src/test/java/com/netease/arctic/flink/catalog/FlinkCatalogContext.java b/mixed/flink/flink-common/src/test/java/com/netease/arctic/flink/catalog/FlinkCatalogContext.java index 4ea04cdc05..ed619add55 100644 --- a/mixed/flink/flink-common/src/test/java/com/netease/arctic/flink/catalog/FlinkCatalogContext.java +++ b/mixed/flink/flink-common/src/test/java/com/netease/arctic/flink/catalog/FlinkCatalogContext.java @@ -67,7 +67,11 @@ static Stream getFlinkCatalogAndTable() { Arguments.of( initFlinkCatalog(TableFormat.MIXED_ICEBERG), generateFlinkTable(TableFormat.MIXED_ICEBERG.toString()), - TableFormat.MIXED_ICEBERG)); + TableFormat.MIXED_ICEBERG), + Arguments.of( + initFlinkCatalog(TableFormat.ICEBERG), + generateFlinkTable(TableFormat.ICEBERG.toString()), + TableFormat.ICEBERG)); } static ResolvedCatalogTable generateFlinkTable(String tableFormat) { @@ -104,7 +108,7 @@ static FlinkUnifiedCatalog initFlinkCatalog(TableFormat tableFormat) { factoryOptions.put(METASTORE_URL.key(), TEST_AMS.getServerUrl() + "/" + meta.getCatalogName()); final FactoryUtil.DefaultCatalogContext context = new FactoryUtil.DefaultCatalogContext( - "flink_catalog_name", + "FLINK_" + tableFormat, factoryOptions, new Configuration(), FlinkCatalogContext.class.getClassLoader()); diff --git a/mixed/flink/flink-common/src/test/java/com/netease/arctic/flink/catalog/FlinkUnifiedCatalogITCase.java b/mixed/flink/flink-common/src/test/java/com/netease/arctic/flink/catalog/FlinkUnifiedCatalogITCase.java index 977f1c6a10..bf0ea8b8b8 100644 --- a/mixed/flink/flink-common/src/test/java/com/netease/arctic/flink/catalog/FlinkUnifiedCatalogITCase.java +++ b/mixed/flink/flink-common/src/test/java/com/netease/arctic/flink/catalog/FlinkUnifiedCatalogITCase.java @@ -60,7 +60,8 @@ public FlinkUnifiedCatalogITCase(CatalogTestHelper catalogTestHelper) { public static Object[][] parameters() { return new Object[][] { {new HiveCatalogTestHelper(TableFormat.MIXED_HIVE, TEST_HMS.getHiveConf())}, - {new HiveCatalogTestHelper(TableFormat.MIXED_ICEBERG, TEST_HMS.getHiveConf())} + {new HiveCatalogTestHelper(TableFormat.MIXED_ICEBERG, TEST_HMS.getHiveConf())}, + {new HiveCatalogTestHelper(TableFormat.ICEBERG, TEST_HMS.getHiveConf())} }; } @@ -71,8 +72,10 @@ public static void beforeAll() throws Exception { @Before public void setup() throws Exception { - String catalog = "amoro"; - exec("CREATE CATALOG %s WITH ('type'='amoro', 'metastore.url'='%s')", catalog, getCatalogUrl()); + String catalog = "unified_catalog"; + exec( + "CREATE CATALOG %s WITH ('type'='unified', 'metastore.url'='%s')", + catalog, getCatalogUrl()); exec("USE CATALOG %s", catalog); exec("USE %s", tableTestHelper().id().getDatabase()); Optional catalogOptional = getTableEnv().getCatalog(catalog); diff --git a/mixed/flink/flink-common/src/test/java/com/netease/arctic/flink/catalog/TestFlinkCatalogs.java b/mixed/flink/flink-common/src/test/java/com/netease/arctic/flink/catalog/TestFlinkCatalogs.java index 95496ba9a0..521ee03158 100644 --- a/mixed/flink/flink-common/src/test/java/com/netease/arctic/flink/catalog/TestFlinkCatalogs.java +++ b/mixed/flink/flink-common/src/test/java/com/netease/arctic/flink/catalog/TestFlinkCatalogs.java @@ -80,8 +80,8 @@ void testAlterDatabase( flinkUnifiedCatalog.alterDatabase( "default", new CatalogDatabaseImpl(Collections.emptyMap(), "default"), false); } catch (UnsupportedOperationException e) { - // Mixed-format catalog does not support altering database. - if (tableFormat != TableFormat.MIXED_HIVE && tableFormat != TableFormat.MIXED_ICEBERG) { + // Mixed-format and Iceebrg catalog does not support altering database. + if (!tableFormat.in(TableFormat.MIXED_HIVE, TableFormat.MIXED_ICEBERG, TableFormat.ICEBERG)) { throw e; } } @@ -134,7 +134,9 @@ void testAlterTable( flinkUnifiedCatalog.alterTable(flinkCatalogContext.objectPath, newTable, false); } catch (UnsupportedOperationException e) { // https://github.com/NetEase/amoro/issues/2 altering Mixed format table is not supported. - if (tableFormat != TableFormat.MIXED_HIVE && tableFormat != TableFormat.MIXED_ICEBERG) { + // Altering Iceberg schema is also not supported yet. + if (!tableFormat.in( + TableFormat.MIXED_ICEBERG, TableFormat.MIXED_HIVE, TableFormat.ICEBERG)) { throw e; } } From b1f7d70d2325cf04c09c9b763a9304b45f7f3007 Mon Sep 17 00:00:00 2001 From: Xianxun Ye Date: Wed, 13 Dec 2023 16:52:54 +0800 Subject: [PATCH 2/5] fixed baiyangtx's comments. --- .../factories/FlinkCatalogFactory.java | 21 ++++++------------- .../factories/MixedHiveCatalogFactory.java | 9 ++++---- .../factories/MixedIcebergCatalogFactory.java | 9 ++++---- .../{ => mixed}/ArcticCatalogFactory.java | 3 ++- .../org.apache.flink.table.factories.Factory | 2 +- .../org.apache.flink.table.factories.Factory | 2 +- 6 files changed, 20 insertions(+), 26 deletions(-) rename mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/{ => mixed}/ArcticCatalogFactory.java (96%) diff --git a/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/FlinkCatalogFactory.java b/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/FlinkCatalogFactory.java index 7e776f484e..421ffa79c6 100644 --- a/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/FlinkCatalogFactory.java +++ b/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/FlinkCatalogFactory.java @@ -24,15 +24,13 @@ import static com.netease.arctic.flink.catalog.factories.ArcticCatalogFactoryOptions.FLINK_TABLE_FORMATS; import static org.apache.flink.table.factories.FactoryUtil.PROPERTY_VERSION; -import com.netease.arctic.AmsClient; -import com.netease.arctic.PooledAmsClient; import com.netease.arctic.UnifiedCatalog; import com.netease.arctic.UnifiedCatalogLoader; -import com.netease.arctic.ams.api.CatalogMeta; import com.netease.arctic.ams.api.TableFormat; import com.netease.arctic.ams.api.client.ArcticThriftUrl; import com.netease.arctic.flink.catalog.FlinkUnifiedCatalog; import com.netease.arctic.flink.catalog.factories.iceberg.IcebergFlinkCatalogFactory; +import com.netease.arctic.flink.catalog.factories.mixed.ArcticCatalogFactory; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.table.catalog.AbstractCatalog; import org.apache.flink.table.catalog.Catalog; @@ -88,15 +86,8 @@ public Catalog createCatalog(Context context) { UnifiedCatalogLoader.loadUnifiedCatalog(metastoreUrl, amoroCatalogName, Maps.newHashMap()); Configuration hadoopConf = unifiedCatalog.authenticationContext().getConfiguration(); - AmsClient client = new PooledAmsClient(metastoreUrl); - TableFormat catalogTableFormat; - try { - CatalogMeta catalogMeta = client.getCatalog(amoroCatalogName); - catalogTableFormat = - TableFormat.valueOf(catalogMeta.getCatalogProperties().get(TABLE_FORMATS)); - } catch (Exception e) { - throw new IllegalStateException("failed when load catalog " + amoroCatalogName, e); - } + TableFormat catalogTableFormat = + TableFormat.valueOf(unifiedCatalog.properties().get(TABLE_FORMATS)); Map availableCatalogs = Maps.newHashMap(); SUPPORTED_FORMATS.forEach( @@ -104,9 +95,9 @@ public Catalog createCatalog(Context context) { if (!availableCatalogs.containsKey(tableFormat)) { if (catalogTableFormat == TableFormat.ICEBERG && (tableFormat.in(TableFormat.MIXED_HIVE, TableFormat.MIXED_ICEBERG))) { - context - .getOptions() - .put(FLINK_TABLE_FORMATS.key(), TableFormat.MIXED_HIVE.toString()); + // Mixed catalog couldn't load the iceberg table, so specify the table formats to the + // mixed catalog + context.getOptions().put(FLINK_TABLE_FORMATS.key(), tableFormat.toString()); } availableCatalogs.put(tableFormat, createCatalog(context, tableFormat, hadoopConf)); } diff --git a/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/MixedHiveCatalogFactory.java b/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/MixedHiveCatalogFactory.java index f7c261e6bf..9ef1ca9de6 100644 --- a/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/MixedHiveCatalogFactory.java +++ b/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/MixedHiveCatalogFactory.java @@ -19,6 +19,7 @@ package com.netease.arctic.flink.catalog.factories; import com.netease.arctic.flink.catalog.ArcticCatalog; +import com.netease.arctic.flink.catalog.factories.mixed.ArcticCatalogFactory; /** * The factory to create {@link ArcticCatalog} with {@link @@ -26,8 +27,8 @@ */ public class MixedHiveCatalogFactory extends ArcticCatalogFactory { - @Override - public String factoryIdentifier() { - return ArcticCatalogFactoryOptions.MIXED_HIVE_IDENTIFIER; - } + @Override + public String factoryIdentifier() { + return ArcticCatalogFactoryOptions.MIXED_HIVE_IDENTIFIER; + } } diff --git a/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/MixedIcebergCatalogFactory.java b/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/MixedIcebergCatalogFactory.java index 8730697130..b41949a4c0 100644 --- a/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/MixedIcebergCatalogFactory.java +++ b/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/MixedIcebergCatalogFactory.java @@ -19,6 +19,7 @@ package com.netease.arctic.flink.catalog.factories; import com.netease.arctic.flink.catalog.ArcticCatalog; +import com.netease.arctic.flink.catalog.factories.mixed.ArcticCatalogFactory; /** * The factory to create {@link ArcticCatalog} with {@link @@ -26,8 +27,8 @@ */ public class MixedIcebergCatalogFactory extends ArcticCatalogFactory { - @Override - public String factoryIdentifier() { - return ArcticCatalogFactoryOptions.MIXED_ICEBERG_IDENTIFIER; - } + @Override + public String factoryIdentifier() { + return ArcticCatalogFactoryOptions.MIXED_ICEBERG_IDENTIFIER; + } } diff --git a/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/ArcticCatalogFactory.java b/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/mixed/ArcticCatalogFactory.java similarity index 96% rename from mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/ArcticCatalogFactory.java rename to mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/mixed/ArcticCatalogFactory.java index d50e237920..e5edfe4326 100644 --- a/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/ArcticCatalogFactory.java +++ b/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/mixed/ArcticCatalogFactory.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package com.netease.arctic.flink.catalog.factories; +package com.netease.arctic.flink.catalog.factories.mixed; import static com.netease.arctic.flink.catalog.factories.ArcticCatalogFactoryOptions.DEFAULT_DATABASE; import static com.netease.arctic.flink.catalog.factories.ArcticCatalogFactoryOptions.FLINK_TABLE_FORMATS; @@ -26,6 +26,7 @@ import com.netease.arctic.flink.InternalCatalogBuilder; import com.netease.arctic.flink.catalog.ArcticCatalog; +import com.netease.arctic.flink.catalog.factories.ArcticCatalogFactoryOptions; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.table.catalog.Catalog; import org.apache.flink.table.factories.CatalogFactory; diff --git a/mixed/flink/flink-common/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/mixed/flink/flink-common/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory index adad559681..ee0ca4f4fe 100644 --- a/mixed/flink/flink-common/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory +++ b/mixed/flink/flink-common/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -18,6 +18,6 @@ com.netease.arctic.flink.catalog.factories.MixedIcebergCatalogFactory com.netease.arctic.flink.catalog.factories.MixedHiveCatalogFactory -com.netease.arctic.flink.catalog.factories.ArcticCatalogFactory +com.netease.arctic.flink.catalog.factories.mixed.ArcticCatalogFactory com.netease.arctic.flink.catalog.factories.FlinkCatalogFactory com.netease.arctic.flink.table.DynamicTableFactory diff --git a/mixed/flink/flink-common/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory b/mixed/flink/flink-common/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory index 6bbec7e84e..896b146abb 100644 --- a/mixed/flink/flink-common/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory +++ b/mixed/flink/flink-common/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -16,5 +16,5 @@ # limitations under the License. # -com.netease.arctic.flink.catalog.factories.ArcticCatalogFactory +com.netease.arctic.flink.catalog.factories.mixed.ArcticCatalogFactory com.netease.arctic.flink.table.DynamicTableFactory \ No newline at end of file From f9f0aaa24fdbebb09d6070941127d6fca363b0ff Mon Sep 17 00:00:00 2001 From: Xianxun Ye Date: Thu, 14 Dec 2023 15:18:26 +0800 Subject: [PATCH 3/5] fixed baiyangtx's comments. Rename classes. --- .../{ArcticCatalog.java => MixedCatalog.java} | 13 +++--- ...ptions.java => CatalogFactoryOptions.java} | 9 +++-- ...y.java => FlinkUnifiedCatalogFactory.java} | 16 ++++---- ...gFactory.java => MixedCatalogFactory.java} | 40 +++++++++---------- .../{ => mixed}/MixedHiveCatalogFactory.java | 20 +++++----- .../MixedIcebergCatalogFactory.java | 20 +++++----- .../flink/table/AmoroDynamicTableFactory.java | 2 +- .../arctic/flink/table/ArcticTableLoader.java | 2 +- .../flink/table/DynamicTableFactory.java | 10 ++--- .../org.apache.flink.table.factories.Factory | 8 ++-- .../netease/arctic/flink/FlinkTestBase.java | 6 +-- .../flink/catalog/FlinkCatalogContext.java | 10 +++-- .../arctic/flink/catalog/TestCatalog.java | 16 ++++---- ...ogs.java => TestFlinkUnifiedCatalogs.java} | 2 +- ...a => TestMixedCatalogTablePartitions.java} | 38 +++++++++--------- .../org.apache.flink.table.factories.Factory | 2 +- 16 files changed, 108 insertions(+), 106 deletions(-) rename mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/{ArcticCatalog.java => MixedCatalog.java} (98%) rename mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/{ArcticCatalogFactoryOptions.java => CatalogFactoryOptions.java} (92%) rename mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/{FlinkCatalogFactory.java => FlinkUnifiedCatalogFactory.java} (89%) rename mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/mixed/{ArcticCatalogFactory.java => MixedCatalogFactory.java} (69%) rename mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/{ => mixed}/MixedHiveCatalogFactory.java (61%) rename mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/{ => mixed}/MixedIcebergCatalogFactory.java (61%) rename mixed/flink/flink-common/src/test/java/com/netease/arctic/flink/catalog/{TestFlinkCatalogs.java => TestFlinkUnifiedCatalogs.java} (99%) rename mixed/flink/flink-common/src/test/java/com/netease/arctic/flink/catalog/{TestArcticCatalogTablePartitions.java => TestMixedCatalogTablePartitions.java} (85%) diff --git a/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/ArcticCatalog.java b/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/MixedCatalog.java similarity index 98% rename from mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/ArcticCatalog.java rename to mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/MixedCatalog.java index e60d9d4d6a..ed626abe7e 100644 --- a/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/ArcticCatalog.java +++ b/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/MixedCatalog.java @@ -27,7 +27,7 @@ import com.google.common.base.Objects; import com.netease.arctic.NoSuchDatabaseException; import com.netease.arctic.flink.InternalCatalogBuilder; -import com.netease.arctic.flink.catalog.factories.ArcticCatalogFactoryOptions; +import com.netease.arctic.flink.catalog.factories.CatalogFactoryOptions; import com.netease.arctic.flink.table.DynamicTableFactory; import com.netease.arctic.flink.table.descriptors.ArcticValidator; import com.netease.arctic.flink.util.ArcticUtils; @@ -92,8 +92,8 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -/** Catalogs for arctic data lake. */ -public class ArcticCatalog extends AbstractCatalog { +/** Catalogs for mixed table format(include mixed-iceberg and mixed-hive). */ +public class MixedCatalog extends AbstractCatalog { public static final String DEFAULT_DB = "default"; /** @@ -106,12 +106,12 @@ public class ArcticCatalog extends AbstractCatalog { private com.netease.arctic.catalog.ArcticCatalog internalCatalog; - public ArcticCatalog(String name, String defaultDatabase, InternalCatalogBuilder catalogBuilder) { + public MixedCatalog(String name, String defaultDatabase, InternalCatalogBuilder catalogBuilder) { super(name, defaultDatabase); this.catalogBuilder = catalogBuilder; } - public ArcticCatalog(ArcticCatalog copy) { + public MixedCatalog(MixedCatalog copy) { this(copy.getName(), copy.getDefaultDatabase(), copy.catalogBuilder); } @@ -229,8 +229,7 @@ private void fillTableMetaPropertiesIfLookupLike( properties.put(ArcticValidator.ARCTIC_CATALOG.key(), tableIdentifier.getCatalog()); properties.put(ArcticValidator.ARCTIC_TABLE.key(), tableIdentifier.getTableName()); properties.put(ArcticValidator.ARCTIC_DATABASE.key(), tableIdentifier.getDatabase()); - properties.put( - ArcticCatalogFactoryOptions.METASTORE_URL.key(), catalogBuilder.getMetastoreUrl()); + properties.put(CatalogFactoryOptions.METASTORE_URL.key(), catalogBuilder.getMetastoreUrl()); } private static List toPartitionKeys(PartitionSpec spec, Schema icebergSchema) { diff --git a/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/ArcticCatalogFactoryOptions.java b/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/CatalogFactoryOptions.java similarity index 92% rename from mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/ArcticCatalogFactoryOptions.java rename to mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/CatalogFactoryOptions.java index 17652bb763..5e9608f7a9 100644 --- a/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/ArcticCatalogFactoryOptions.java +++ b/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/CatalogFactoryOptions.java @@ -19,18 +19,19 @@ package com.netease.arctic.flink.catalog.factories; import static com.netease.arctic.ams.api.properties.CatalogMetaProperties.TABLE_FORMATS; -import static com.netease.arctic.flink.catalog.ArcticCatalog.DEFAULT_DB; +import static com.netease.arctic.flink.catalog.MixedCatalog.DEFAULT_DB; import com.netease.arctic.ams.api.properties.CatalogMetaProperties; -import com.netease.arctic.flink.catalog.ArcticCatalog; +import com.netease.arctic.flink.catalog.FlinkUnifiedCatalog; +import com.netease.arctic.flink.catalog.MixedCatalog; import org.apache.flink.annotation.Internal; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.table.catalog.CommonCatalogOptions; -/** {@link ConfigOption}s for {@link ArcticCatalog}. */ +/** {@link ConfigOption}s for {@link MixedCatalog} and {@link FlinkUnifiedCatalog}. */ @Internal -public class ArcticCatalogFactoryOptions { +public class CatalogFactoryOptions { public static final String MIXED_ICEBERG_IDENTIFIER = "mixed_iceberg"; public static final String MIXED_HIVE_IDENTIFIER = "mixed_hive"; @Deprecated public static final String IDENTIFIER = "arctic"; diff --git a/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/FlinkCatalogFactory.java b/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/FlinkUnifiedCatalogFactory.java similarity index 89% rename from mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/FlinkCatalogFactory.java rename to mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/FlinkUnifiedCatalogFactory.java index 421ffa79c6..d892424fa3 100644 --- a/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/FlinkCatalogFactory.java +++ b/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/FlinkUnifiedCatalogFactory.java @@ -20,8 +20,8 @@ import static com.netease.arctic.ams.api.Constants.THRIFT_TABLE_SERVICE_NAME; import static com.netease.arctic.ams.api.properties.CatalogMetaProperties.TABLE_FORMATS; -import static com.netease.arctic.flink.catalog.factories.ArcticCatalogFactoryOptions.DEFAULT_DATABASE; -import static com.netease.arctic.flink.catalog.factories.ArcticCatalogFactoryOptions.FLINK_TABLE_FORMATS; +import static com.netease.arctic.flink.catalog.factories.CatalogFactoryOptions.DEFAULT_DATABASE; +import static com.netease.arctic.flink.catalog.factories.CatalogFactoryOptions.FLINK_TABLE_FORMATS; import static org.apache.flink.table.factories.FactoryUtil.PROPERTY_VERSION; import com.netease.arctic.UnifiedCatalog; @@ -30,7 +30,7 @@ import com.netease.arctic.ams.api.client.ArcticThriftUrl; import com.netease.arctic.flink.catalog.FlinkUnifiedCatalog; import com.netease.arctic.flink.catalog.factories.iceberg.IcebergFlinkCatalogFactory; -import com.netease.arctic.flink.catalog.factories.mixed.ArcticCatalogFactory; +import com.netease.arctic.flink.catalog.factories.mixed.MixedCatalogFactory; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.table.catalog.AbstractCatalog; import org.apache.flink.table.catalog.Catalog; @@ -46,20 +46,20 @@ import java.util.Set; /** Factory for {@link FlinkUnifiedCatalog}. */ -public class FlinkCatalogFactory implements CatalogFactory { +public class FlinkUnifiedCatalogFactory implements CatalogFactory { private static final Set SUPPORTED_FORMATS = Sets.newHashSet(TableFormat.MIXED_ICEBERG, TableFormat.MIXED_HIVE, TableFormat.ICEBERG); @Override public String factoryIdentifier() { - return ArcticCatalogFactoryOptions.UNIFIED_IDENTIFIER; + return CatalogFactoryOptions.UNIFIED_IDENTIFIER; } @Override public Set> requiredOptions() { Set> requiredOptions = new HashSet<>(); - requiredOptions.add(ArcticCatalogFactoryOptions.METASTORE_URL); + requiredOptions.add(CatalogFactoryOptions.METASTORE_URL); return requiredOptions; } @@ -78,7 +78,7 @@ public Catalog createCatalog(Context context) { helper.validate(); final String defaultDatabase = helper.getOptions().get(DEFAULT_DATABASE); - String metastoreUrl = helper.getOptions().get(ArcticCatalogFactoryOptions.METASTORE_URL); + String metastoreUrl = helper.getOptions().get(CatalogFactoryOptions.METASTORE_URL); String amoroCatalogName = ArcticThriftUrl.parse(metastoreUrl, THRIFT_TABLE_SERVICE_NAME).catalogName(); @@ -114,7 +114,7 @@ private AbstractCatalog createCatalog( switch (tableFormat) { case MIXED_ICEBERG: case MIXED_HIVE: - catalogFactory = new ArcticCatalogFactory(); + catalogFactory = new MixedCatalogFactory(); break; case ICEBERG: catalogFactory = new IcebergFlinkCatalogFactory(hadoopConf); diff --git a/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/mixed/ArcticCatalogFactory.java b/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/mixed/MixedCatalogFactory.java similarity index 69% rename from mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/mixed/ArcticCatalogFactory.java rename to mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/mixed/MixedCatalogFactory.java index e5edfe4326..a80a274f1b 100644 --- a/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/mixed/ArcticCatalogFactory.java +++ b/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/mixed/MixedCatalogFactory.java @@ -18,15 +18,15 @@ package com.netease.arctic.flink.catalog.factories.mixed; -import static com.netease.arctic.flink.catalog.factories.ArcticCatalogFactoryOptions.DEFAULT_DATABASE; -import static com.netease.arctic.flink.catalog.factories.ArcticCatalogFactoryOptions.FLINK_TABLE_FORMATS; -import static com.netease.arctic.flink.catalog.factories.ArcticCatalogFactoryOptions.METASTORE_URL; +import static com.netease.arctic.flink.catalog.factories.CatalogFactoryOptions.DEFAULT_DATABASE; +import static com.netease.arctic.flink.catalog.factories.CatalogFactoryOptions.FLINK_TABLE_FORMATS; +import static com.netease.arctic.flink.catalog.factories.CatalogFactoryOptions.METASTORE_URL; import static com.netease.arctic.flink.table.KafkaConnectorOptionsUtil.getKafkaParams; import static org.apache.flink.table.factories.FactoryUtil.PROPERTY_VERSION; import com.netease.arctic.flink.InternalCatalogBuilder; -import com.netease.arctic.flink.catalog.ArcticCatalog; -import com.netease.arctic.flink.catalog.factories.ArcticCatalogFactoryOptions; +import com.netease.arctic.flink.catalog.MixedCatalog; +import com.netease.arctic.flink.catalog.factories.CatalogFactoryOptions; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.table.catalog.Catalog; import org.apache.flink.table.factories.CatalogFactory; @@ -41,14 +41,14 @@ import java.util.Optional; import java.util.Set; -/** Factory for {@link ArcticCatalog} */ -public class ArcticCatalogFactory implements CatalogFactory { +/** Factory for {@link MixedCatalog} */ +public class MixedCatalogFactory implements CatalogFactory { - private static final Logger LOG = LoggerFactory.getLogger(ArcticCatalogFactory.class); + private static final Logger LOG = LoggerFactory.getLogger(MixedCatalogFactory.class); @Override public String factoryIdentifier() { - return ArcticCatalogFactoryOptions.IDENTIFIER; + return CatalogFactoryOptions.IDENTIFIER; } @Override @@ -67,7 +67,7 @@ public Catalog createCatalog(Context context) { tableFormatsOptional.ifPresent( tableFormats -> catalogProperties.put(FLINK_TABLE_FORMATS.key(), tableFormats)); - return new ArcticCatalog( + return new MixedCatalog( context.getName(), defaultDatabase, InternalCatalogBuilder.builder().metastoreUrl(metastoreUrl).properties(catalogProperties)); @@ -86,16 +86,16 @@ public Set> optionalOptions() { options.add(DEFAULT_DATABASE); // authorization config - options.add(ArcticCatalogFactoryOptions.AUTH_AMS_CONFIGS_DISABLE); - options.add(ArcticCatalogFactoryOptions.AUTH_METHOD); - options.add(ArcticCatalogFactoryOptions.SIMPLE_USER_NAME); - options.add(ArcticCatalogFactoryOptions.KEYTAB_LOGIN_USER); - options.add(ArcticCatalogFactoryOptions.KRB5_CONF_PATH); - options.add(ArcticCatalogFactoryOptions.KRB5_CONF_ENCODE); - options.add(ArcticCatalogFactoryOptions.KEYTAB_PATH); - options.add(ArcticCatalogFactoryOptions.KEYTAB_ENCODE); - - options.add(ArcticCatalogFactoryOptions.FLINK_TABLE_FORMATS); + options.add(CatalogFactoryOptions.AUTH_AMS_CONFIGS_DISABLE); + options.add(CatalogFactoryOptions.AUTH_METHOD); + options.add(CatalogFactoryOptions.SIMPLE_USER_NAME); + options.add(CatalogFactoryOptions.KEYTAB_LOGIN_USER); + options.add(CatalogFactoryOptions.KRB5_CONF_PATH); + options.add(CatalogFactoryOptions.KRB5_CONF_ENCODE); + options.add(CatalogFactoryOptions.KEYTAB_PATH); + options.add(CatalogFactoryOptions.KEYTAB_ENCODE); + + options.add(CatalogFactoryOptions.FLINK_TABLE_FORMATS); return options; } } diff --git a/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/MixedHiveCatalogFactory.java b/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/mixed/MixedHiveCatalogFactory.java similarity index 61% rename from mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/MixedHiveCatalogFactory.java rename to mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/mixed/MixedHiveCatalogFactory.java index 9ef1ca9de6..a61f4aff7a 100644 --- a/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/MixedHiveCatalogFactory.java +++ b/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/mixed/MixedHiveCatalogFactory.java @@ -16,19 +16,19 @@ * limitations under the License. */ -package com.netease.arctic.flink.catalog.factories; +package com.netease.arctic.flink.catalog.factories.mixed; -import com.netease.arctic.flink.catalog.ArcticCatalog; -import com.netease.arctic.flink.catalog.factories.mixed.ArcticCatalogFactory; +import com.netease.arctic.flink.catalog.MixedCatalog; +import com.netease.arctic.flink.catalog.factories.CatalogFactoryOptions; /** - * The factory to create {@link ArcticCatalog} with {@link - * ArcticCatalogFactoryOptions#MIXED_HIVE_IDENTIFIER} identifier. + * The factory to create {@link MixedCatalog} with {@link + * CatalogFactoryOptions#MIXED_HIVE_IDENTIFIER} identifier. */ -public class MixedHiveCatalogFactory extends ArcticCatalogFactory { +public class MixedHiveCatalogFactory extends MixedCatalogFactory { - @Override - public String factoryIdentifier() { - return ArcticCatalogFactoryOptions.MIXED_HIVE_IDENTIFIER; - } + @Override + public String factoryIdentifier() { + return CatalogFactoryOptions.MIXED_HIVE_IDENTIFIER; + } } diff --git a/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/MixedIcebergCatalogFactory.java b/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/mixed/MixedIcebergCatalogFactory.java similarity index 61% rename from mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/MixedIcebergCatalogFactory.java rename to mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/mixed/MixedIcebergCatalogFactory.java index b41949a4c0..e98a78206c 100644 --- a/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/MixedIcebergCatalogFactory.java +++ b/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/mixed/MixedIcebergCatalogFactory.java @@ -16,19 +16,19 @@ * limitations under the License. */ -package com.netease.arctic.flink.catalog.factories; +package com.netease.arctic.flink.catalog.factories.mixed; -import com.netease.arctic.flink.catalog.ArcticCatalog; -import com.netease.arctic.flink.catalog.factories.mixed.ArcticCatalogFactory; +import com.netease.arctic.flink.catalog.MixedCatalog; +import com.netease.arctic.flink.catalog.factories.CatalogFactoryOptions; /** - * The factory to create {@link ArcticCatalog} with {@link - * ArcticCatalogFactoryOptions#MIXED_ICEBERG_IDENTIFIER} identifier. + * The factory to create {@link MixedCatalog} with {@link + * CatalogFactoryOptions#MIXED_ICEBERG_IDENTIFIER} identifier. */ -public class MixedIcebergCatalogFactory extends ArcticCatalogFactory { +public class MixedIcebergCatalogFactory extends MixedCatalogFactory { - @Override - public String factoryIdentifier() { - return ArcticCatalogFactoryOptions.MIXED_ICEBERG_IDENTIFIER; - } + @Override + public String factoryIdentifier() { + return CatalogFactoryOptions.MIXED_ICEBERG_IDENTIFIER; + } } diff --git a/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/table/AmoroDynamicTableFactory.java b/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/table/AmoroDynamicTableFactory.java index 347bfa2bc6..bbcfd9d17f 100644 --- a/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/table/AmoroDynamicTableFactory.java +++ b/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/table/AmoroDynamicTableFactory.java @@ -18,7 +18,7 @@ package com.netease.arctic.flink.table; -import static com.netease.arctic.flink.catalog.factories.ArcticCatalogFactoryOptions.UNIFIED_IDENTIFIER; +import static com.netease.arctic.flink.catalog.factories.CatalogFactoryOptions.UNIFIED_IDENTIFIER; import static com.netease.arctic.flink.table.descriptors.ArcticValidator.TABLE_FORMAT; import com.netease.arctic.ams.api.TableFormat; diff --git a/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/table/ArcticTableLoader.java b/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/table/ArcticTableLoader.java index b50f0f50b1..254582961b 100644 --- a/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/table/ArcticTableLoader.java +++ b/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/table/ArcticTableLoader.java @@ -18,7 +18,7 @@ package com.netease.arctic.flink.table; -import static com.netease.arctic.flink.catalog.factories.ArcticCatalogFactoryOptions.METASTORE_URL; +import static com.netease.arctic.flink.catalog.factories.CatalogFactoryOptions.METASTORE_URL; import com.netease.arctic.catalog.ArcticCatalog; import com.netease.arctic.flink.InternalCatalogBuilder; diff --git a/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/table/DynamicTableFactory.java b/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/table/DynamicTableFactory.java index f8ddb7fda7..ec8fb8f843 100644 --- a/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/table/DynamicTableFactory.java +++ b/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/table/DynamicTableFactory.java @@ -20,7 +20,7 @@ import static com.netease.arctic.flink.FlinkSchemaUtil.getPhysicalSchema; import static com.netease.arctic.flink.FlinkSchemaUtil.getPhysicalSchemaForDimTable; -import static com.netease.arctic.flink.catalog.factories.ArcticCatalogFactoryOptions.METASTORE_URL; +import static com.netease.arctic.flink.catalog.factories.CatalogFactoryOptions.METASTORE_URL; import static com.netease.arctic.flink.table.KafkaConnectorOptionsUtil.getKafkaProperties; import static com.netease.arctic.flink.table.descriptors.ArcticValidator.LOOKUP_CACHE_MAX_ROWS; import static com.netease.arctic.flink.table.descriptors.ArcticValidator.LOOKUP_CACHE_TTL_AFTER_WRITE; @@ -42,7 +42,7 @@ import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TOPIC; import com.netease.arctic.flink.InternalCatalogBuilder; -import com.netease.arctic.flink.catalog.ArcticCatalog; +import com.netease.arctic.flink.catalog.MixedCatalog; import com.netease.arctic.flink.table.descriptors.ArcticValidator; import com.netease.arctic.flink.util.ArcticUtils; import com.netease.arctic.flink.util.CompatibleFlinkPropertyUtil; @@ -84,9 +84,9 @@ public class DynamicTableFactory implements DynamicTableSourceFactory, DynamicTa private InternalCatalogBuilder internalCatalogBuilder; private String internalCatalogName; - public DynamicTableFactory(ArcticCatalog arcticCatalog) { - this.internalCatalogBuilder = arcticCatalog.catalogBuilder(); - this.internalCatalogName = arcticCatalog.amsCatalogName(); + public DynamicTableFactory(MixedCatalog mixedCatalog) { + this.internalCatalogBuilder = mixedCatalog.catalogBuilder(); + this.internalCatalogName = mixedCatalog.amsCatalogName(); } public DynamicTableFactory() {} diff --git a/mixed/flink/flink-common/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/mixed/flink/flink-common/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory index ee0ca4f4fe..278a85f6c5 100644 --- a/mixed/flink/flink-common/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory +++ b/mixed/flink/flink-common/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -16,8 +16,8 @@ # limitations under the License. # -com.netease.arctic.flink.catalog.factories.MixedIcebergCatalogFactory -com.netease.arctic.flink.catalog.factories.MixedHiveCatalogFactory -com.netease.arctic.flink.catalog.factories.mixed.ArcticCatalogFactory -com.netease.arctic.flink.catalog.factories.FlinkCatalogFactory +com.netease.arctic.flink.catalog.factories.mixed.MixedIcebergCatalogFactory +com.netease.arctic.flink.catalog.factories.mixed.MixedHiveCatalogFactory +com.netease.arctic.flink.catalog.factories.mixed.MixedCatalogFactory +com.netease.arctic.flink.catalog.factories.FlinkUnifiedCatalogFactory com.netease.arctic.flink.table.DynamicTableFactory diff --git a/mixed/flink/flink-common/src/test/java/com/netease/arctic/flink/FlinkTestBase.java b/mixed/flink/flink-common/src/test/java/com/netease/arctic/flink/FlinkTestBase.java index 96b0bfc840..6f753224ac 100644 --- a/mixed/flink/flink-common/src/test/java/com/netease/arctic/flink/FlinkTestBase.java +++ b/mixed/flink/flink-common/src/test/java/com/netease/arctic/flink/FlinkTestBase.java @@ -18,7 +18,7 @@ package com.netease.arctic.flink; -import static com.netease.arctic.flink.catalog.factories.ArcticCatalogFactoryOptions.IDENTIFIER; +import static com.netease.arctic.flink.catalog.factories.CatalogFactoryOptions.IDENTIFIER; import static com.netease.arctic.flink.kafka.testutils.KafkaContainerTest.KAFKA_CONTAINER; import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED; @@ -26,7 +26,7 @@ import com.netease.arctic.TableTestHelper; import com.netease.arctic.catalog.CatalogTestHelper; import com.netease.arctic.catalog.TableTestBase; -import com.netease.arctic.flink.catalog.factories.ArcticCatalogFactoryOptions; +import com.netease.arctic.flink.catalog.factories.CatalogFactoryOptions; import com.netease.arctic.flink.write.ArcticRowDataTaskWriterFactory; import com.netease.arctic.io.reader.GenericKeyedDataReader; import com.netease.arctic.scan.CombinedScanTask; @@ -127,7 +127,7 @@ public void before() throws Exception { public void config() { props = Maps.newHashMap(); props.put("type", IDENTIFIER); - props.put(ArcticCatalogFactoryOptions.METASTORE_URL.key(), metastoreUrl); + props.put(CatalogFactoryOptions.METASTORE_URL.key(), metastoreUrl); } public static void prepare() throws Exception { diff --git a/mixed/flink/flink-common/src/test/java/com/netease/arctic/flink/catalog/FlinkCatalogContext.java b/mixed/flink/flink-common/src/test/java/com/netease/arctic/flink/catalog/FlinkCatalogContext.java index ed619add55..98728cfa52 100644 --- a/mixed/flink/flink-common/src/test/java/com/netease/arctic/flink/catalog/FlinkCatalogContext.java +++ b/mixed/flink/flink-common/src/test/java/com/netease/arctic/flink/catalog/FlinkCatalogContext.java @@ -18,13 +18,13 @@ package com.netease.arctic.flink.catalog; -import static com.netease.arctic.flink.catalog.factories.ArcticCatalogFactoryOptions.METASTORE_URL; +import static com.netease.arctic.flink.catalog.factories.CatalogFactoryOptions.METASTORE_URL; import static com.netease.arctic.flink.table.descriptors.ArcticValidator.TABLE_FORMAT; import com.netease.arctic.TestAms; import com.netease.arctic.ams.api.CatalogMeta; import com.netease.arctic.ams.api.TableFormat; -import com.netease.arctic.flink.catalog.factories.FlinkCatalogFactory; +import com.netease.arctic.flink.catalog.factories.FlinkUnifiedCatalogFactory; import com.netease.arctic.hive.TestHMS; import com.netease.arctic.hive.catalog.HiveCatalogTestHelper; import org.apache.flink.configuration.Configuration; @@ -49,7 +49,8 @@ public class FlinkCatalogContext { static final TestHMS TEST_HMS = new TestHMS(); static final TestAms TEST_AMS = new TestAms(); - static final FlinkCatalogFactory flinkCatalogFactory = new FlinkCatalogFactory(); + static final FlinkUnifiedCatalogFactory FLINK_UNIFIED_CATALOG_FACTORY = + new FlinkUnifiedCatalogFactory(); static ResolvedSchema resolvedSchema = ResolvedSchema.of( @@ -112,7 +113,8 @@ static FlinkUnifiedCatalog initFlinkCatalog(TableFormat tableFormat) { factoryOptions, new Configuration(), FlinkCatalogContext.class.getClassLoader()); - flinkUnifiedCatalog = (FlinkUnifiedCatalog) flinkCatalogFactory.createCatalog(context); + flinkUnifiedCatalog = + (FlinkUnifiedCatalog) FLINK_UNIFIED_CATALOG_FACTORY.createCatalog(context); flinkUnifiedCatalog.open(); return flinkUnifiedCatalog; } diff --git a/mixed/flink/flink-common/src/test/java/com/netease/arctic/flink/catalog/TestCatalog.java b/mixed/flink/flink-common/src/test/java/com/netease/arctic/flink/catalog/TestCatalog.java index 844f60dffd..0ad3299cb4 100644 --- a/mixed/flink/flink-common/src/test/java/com/netease/arctic/flink/catalog/TestCatalog.java +++ b/mixed/flink/flink-common/src/test/java/com/netease/arctic/flink/catalog/TestCatalog.java @@ -34,7 +34,7 @@ import com.netease.arctic.ams.api.TableFormat; import com.netease.arctic.catalog.BasicCatalogTestHelper; import com.netease.arctic.catalog.CatalogTestBase; -import com.netease.arctic.flink.catalog.factories.ArcticCatalogFactoryOptions; +import com.netease.arctic.flink.catalog.factories.CatalogFactoryOptions; import com.netease.arctic.table.ArcticTable; import com.netease.arctic.table.TableIdentifier; import org.apache.flink.api.common.restartstrategy.RestartStrategies; @@ -89,8 +89,8 @@ public TestCatalog() { @Before public void before() throws Exception { props = Maps.newHashMap(); - props.put("type", ArcticCatalogFactoryOptions.IDENTIFIER); - props.put(ArcticCatalogFactoryOptions.METASTORE_URL.key(), getCatalogUrl()); + props.put("type", CatalogFactoryOptions.IDENTIFIER); + props.put(CatalogFactoryOptions.METASTORE_URL.key(), getCatalogUrl()); sql("CREATE CATALOG " + CATALOG + " WITH %s", toWithClause(props)); sql("USE CATALOG " + CATALOG); sql("CREATE DATABASE " + CATALOG + "." + DB); @@ -417,11 +417,11 @@ public void testDefaultCatalogDDLWithVirtualColumn() { // create Table with compute columns under default catalog props = Maps.newHashMap(); - props.put("connector", ArcticCatalogFactoryOptions.IDENTIFIER); - props.put(ArcticCatalogFactoryOptions.METASTORE_URL.key(), getCatalogUrl()); - props.put(ArcticCatalogFactoryOptions.IDENTIFIER + ".catalog", CATALOG); - props.put(ArcticCatalogFactoryOptions.IDENTIFIER + ".database", DB); - props.put(ArcticCatalogFactoryOptions.IDENTIFIER + ".table", TABLE); + props.put("connector", CatalogFactoryOptions.IDENTIFIER); + props.put(CatalogFactoryOptions.METASTORE_URL.key(), getCatalogUrl()); + props.put(CatalogFactoryOptions.IDENTIFIER + ".catalog", CATALOG); + props.put(CatalogFactoryOptions.IDENTIFIER + ".database", DB); + props.put(CatalogFactoryOptions.IDENTIFIER + ".table", TABLE); sql( "CREATE TABLE default_catalog.default_database." diff --git a/mixed/flink/flink-common/src/test/java/com/netease/arctic/flink/catalog/TestFlinkCatalogs.java b/mixed/flink/flink-common/src/test/java/com/netease/arctic/flink/catalog/TestFlinkUnifiedCatalogs.java similarity index 99% rename from mixed/flink/flink-common/src/test/java/com/netease/arctic/flink/catalog/TestFlinkCatalogs.java rename to mixed/flink/flink-common/src/test/java/com/netease/arctic/flink/catalog/TestFlinkUnifiedCatalogs.java index 521ee03158..29a6be22b8 100644 --- a/mixed/flink/flink-common/src/test/java/com/netease/arctic/flink/catalog/TestFlinkCatalogs.java +++ b/mixed/flink/flink-common/src/test/java/com/netease/arctic/flink/catalog/TestFlinkUnifiedCatalogs.java @@ -32,7 +32,7 @@ import java.util.List; import java.util.Map; -class TestFlinkCatalogs { +class TestFlinkUnifiedCatalogs { static FlinkCatalogContext flinkCatalogContext = new FlinkCatalogContext(); @BeforeAll diff --git a/mixed/flink/flink-common/src/test/java/com/netease/arctic/flink/catalog/TestArcticCatalogTablePartitions.java b/mixed/flink/flink-common/src/test/java/com/netease/arctic/flink/catalog/TestMixedCatalogTablePartitions.java similarity index 85% rename from mixed/flink/flink-common/src/test/java/com/netease/arctic/flink/catalog/TestArcticCatalogTablePartitions.java rename to mixed/flink/flink-common/src/test/java/com/netease/arctic/flink/catalog/TestMixedCatalogTablePartitions.java index c0b5e2bd8d..1cad61a737 100644 --- a/mixed/flink/flink-common/src/test/java/com/netease/arctic/flink/catalog/TestArcticCatalogTablePartitions.java +++ b/mixed/flink/flink-common/src/test/java/com/netease/arctic/flink/catalog/TestMixedCatalogTablePartitions.java @@ -51,11 +51,11 @@ import java.util.LinkedList; import java.util.List; -public class TestArcticCatalogTablePartitions extends FlinkTestBase { +public class TestMixedCatalogTablePartitions extends FlinkTestBase { private final String tableName = "test_partition_table"; private final String db = "test_partition_db"; - public TestArcticCatalogTablePartitions() { + public TestMixedCatalogTablePartitions() { super( new BasicCatalogTestHelper(TableFormat.MIXED_ICEBERG), new BasicTableTestHelper(true, true)); @@ -83,20 +83,20 @@ public void testListPartitionsUnKeyedTable() throws TableNotPartitionedException rows); getTableEnv().createTemporaryView("input", input); - sql("CREATE CATALOG arcticCatalog WITH %s", toWithClause(props)); + sql("CREATE CATALOG mixedCatalog WITH %s", toWithClause(props)); sql( - "CREATE TABLE IF NOT EXISTS arcticCatalog." + "CREATE TABLE IF NOT EXISTS mixedCatalog." + db + "." + tableName + "(" + " id INT, name STRING, dt STRING) PARTITIONED BY (dt)"); - sql("INSERT INTO %s select * from input", "arcticCatalog." + db + "." + tableName); + sql("INSERT INTO %s select * from input", "mixedCatalog." + db + "." + tableName); ObjectPath objectPath = new ObjectPath(db, tableName); - ArcticCatalog arcticCatalog = (ArcticCatalog) getTableEnv().getCatalog("arcticCatalog").get(); - List list = arcticCatalog.listPartitions(objectPath); + MixedCatalog mixedCatalog = (MixedCatalog) getTableEnv().getCatalog("mixedCatalog").get(); + List list = mixedCatalog.listPartitions(objectPath); List expected = Lists.newArrayList(); CatalogPartitionSpec partitionSpec1 = @@ -127,20 +127,20 @@ public void testListPartitionsKeyedTable() throws TableNotPartitionedException { getTableEnv().createTemporaryView("input", input); - sql("CREATE CATALOG arcticCatalog WITH %s", toWithClause(props)); + sql("CREATE CATALOG mixedCatalog WITH %s", toWithClause(props)); sql( - "CREATE TABLE IF NOT EXISTS arcticCatalog." + "CREATE TABLE IF NOT EXISTS mixedCatalog." + db + "." + tableName + "(" + " id INT, name STRING, dt STRING, PRIMARY KEY (id) NOT ENFORCED) PARTITIONED BY (dt)"); - sql("INSERT INTO %s select * from input", "arcticCatalog." + db + "." + tableName); + sql("INSERT INTO %s select * from input", "mixedCatalog." + db + "." + tableName); ObjectPath objectPath = new ObjectPath(db, tableName); - ArcticCatalog arcticCatalog = (ArcticCatalog) getTableEnv().getCatalog("arcticCatalog").get(); - List partitionList = arcticCatalog.listPartitions(objectPath); + MixedCatalog mixedCatalog = (MixedCatalog) getTableEnv().getCatalog("mixedCatalog").get(); + List partitionList = mixedCatalog.listPartitions(objectPath); List expected = Lists.newArrayList(); CatalogPartitionSpec partitionSpec1 = @@ -174,15 +174,15 @@ public void testListPartitionsByFilter() Table input = getTableEnv().fromDataStream(rowData, $("id"), $("name"), $("dt")); getTableEnv().createTemporaryView("input", input); - sql("CREATE CATALOG arcticCatalog WITH %s", toWithClause(props)); + sql("CREATE CATALOG mixedCatalog WITH %s", toWithClause(props)); sql( - "CREATE TABLE IF NOT EXISTS arcticCatalog." + "CREATE TABLE IF NOT EXISTS mixedCatalog." + db + "." + tableName + "(" + " id INT, name STRING, dt STRING) PARTITIONED BY (dt,name)"); - sql("INSERT INTO %s select * from input", "arcticCatalog." + db + "." + tableName); + sql("INSERT INTO %s select * from input", "mixedCatalog." + db + "." + tableName); ResolvedExpression dtRef = new FieldReferenceExpression("dt", DataTypes.STRING(), 0, 3); CallExpression callExpression = @@ -192,9 +192,9 @@ public void testListPartitionsByFilter() DataTypes.BOOLEAN()); ObjectPath objectPath = new ObjectPath(db, tableName); - ArcticCatalog arcticCatalog = (ArcticCatalog) getTableEnv().getCatalog("arcticCatalog").get(); + MixedCatalog mixedCatalog = (MixedCatalog) getTableEnv().getCatalog("mixedCatalog").get(); List list = - arcticCatalog.listPartitionsByFilter(objectPath, singletonList(callExpression)); + mixedCatalog.listPartitionsByFilter(objectPath, singletonList(callExpression)); List expected = Lists.newArrayList(); CatalogPartitionSpec partitionSpec1 = @@ -206,14 +206,14 @@ public void testListPartitionsByFilter() Assert.assertEquals("Should produce the expected catalog partition specs.", list, expected); List listCatalogPartitionSpec = - arcticCatalog.listPartitions( + mixedCatalog.listPartitions( objectPath, new CatalogPartitionSpec(ImmutableMap.of("dt", "2023-10-01", "name", "Gerry"))); Assert.assertEquals( "Should produce the expected catalog partition specs.", listCatalogPartitionSpec.size(), 1); try { - arcticCatalog.listPartitions( + mixedCatalog.listPartitions( objectPath, new CatalogPartitionSpec(ImmutableMap.of("dt", "2023-10-01", "name1", "Gerry"))); } catch (Exception e) { diff --git a/mixed/flink/flink-common/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory b/mixed/flink/flink-common/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory index 896b146abb..5ae1bb6194 100644 --- a/mixed/flink/flink-common/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory +++ b/mixed/flink/flink-common/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -16,5 +16,5 @@ # limitations under the License. # -com.netease.arctic.flink.catalog.factories.mixed.ArcticCatalogFactory +com.netease.arctic.flink.catalog.factories.mixed.MixedCatalogFactory com.netease.arctic.flink.table.DynamicTableFactory \ No newline at end of file From 2d81b880fcd87e989058b0260a74570fb466b6ea Mon Sep 17 00:00:00 2001 From: Xianxun Ye Date: Thu, 14 Dec 2023 18:19:23 +0800 Subject: [PATCH 4/5] fixed baiyangtx's comments. Instantiate the original flink catalog in the unified catalog, instead of unified catalog factory. --- .../flink/catalog/FlinkUnifiedCatalog.java | 217 ++++++++---------- .../factories/FlinkUnifiedCatalogFactory.java | 65 ++---- 2 files changed, 116 insertions(+), 166 deletions(-) diff --git a/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/FlinkUnifiedCatalog.java b/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/FlinkUnifiedCatalog.java index 1a126455b0..b4abe7a9c9 100644 --- a/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/FlinkUnifiedCatalog.java +++ b/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/FlinkUnifiedCatalog.java @@ -19,6 +19,7 @@ package com.netease.arctic.flink.catalog; import static com.netease.arctic.ams.api.Constants.THRIFT_TABLE_SERVICE_NAME; +import static com.netease.arctic.flink.catalog.factories.FlinkUnifiedCatalogFactory.SUPPORTED_FORMATS; import static com.netease.arctic.flink.table.descriptors.ArcticValidator.TABLE_FORMAT; import com.netease.arctic.AlreadyExistsException; @@ -28,7 +29,10 @@ import com.netease.arctic.UnifiedCatalog; import com.netease.arctic.ams.api.TableFormat; import com.netease.arctic.ams.api.client.ArcticThriftUrl; +import com.netease.arctic.flink.catalog.factories.iceberg.IcebergFlinkCatalogFactory; +import com.netease.arctic.flink.catalog.factories.mixed.MixedCatalogFactory; import com.netease.arctic.flink.table.AmoroDynamicTableFactory; +import com.netease.arctic.table.TableIdentifier; import org.apache.flink.configuration.Configuration; import org.apache.flink.table.catalog.AbstractCatalog; import org.apache.flink.table.catalog.CatalogBaseTable; @@ -52,6 +56,7 @@ import org.apache.flink.table.catalog.stats.CatalogColumnStatistics; import org.apache.flink.table.catalog.stats.CatalogTableStatistics; import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.factories.CatalogFactory; import org.apache.flink.table.factories.Factory; import org.apache.iceberg.relocated.com.google.common.collect.Maps; @@ -62,7 +67,7 @@ /** This is a Flink catalog wrap a unified catalog. */ public class FlinkUnifiedCatalog extends AbstractCatalog { - private UnifiedCatalog unifiedCatalog; + private final UnifiedCatalog unifiedCatalog; private final String amsUri; private final String amoroCatalogName; /** @@ -70,24 +75,28 @@ public class FlinkUnifiedCatalog extends AbstractCatalog { * *

May include: Iceberg, Mixed and Paimon Catalogs, etc. */ - private final Map availableCatalogs; + private Map availableCatalogs; + + private final CatalogFactory.Context context; + private final org.apache.hadoop.conf.Configuration hadoopConf; public FlinkUnifiedCatalog( String amsUri, - String name, String defaultDatabase, UnifiedCatalog unifiedCatalog, - Map availableCatalogs) { - super(name, defaultDatabase); + CatalogFactory.Context context, + org.apache.hadoop.conf.Configuration hadoopConf) { + super(context.getName(), defaultDatabase); this.amsUri = amsUri; this.amoroCatalogName = ArcticThriftUrl.parse(amsUri, THRIFT_TABLE_SERVICE_NAME).catalogName(); this.unifiedCatalog = unifiedCatalog; - this.availableCatalogs = availableCatalogs; + this.context = context; + this.hadoopConf = hadoopConf; } @Override public void open() throws CatalogException { - availableCatalogs.forEach((tableFormat, catalog) -> catalog.open()); + availableCatalogs = Maps.newHashMap(); } @Override @@ -158,13 +167,7 @@ public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException { AmoroTable amoroTable = unifiedCatalog.loadTable(tablePath.getDatabaseName(), tablePath.getObjectName()); - AbstractCatalog catalog = availableCatalogs.get(amoroTable.format()); - if (catalog == null) { - throw new UnsupportedOperationException( - String.format( - "Unsupported operation: get table [%s], %s: %s.", - tablePath, TABLE_FORMAT.key(), amoroTable.format())); - } + AbstractCatalog catalog = originalCatalog(amoroTable); CatalogTable catalogTable = (CatalogTable) catalog.getTable(tablePath); final Map flinkProperties = Maps.newHashMap(catalogTable.getOptions()); flinkProperties.put(TABLE_FORMAT.key(), amoroTable.format().toString()); @@ -200,10 +203,7 @@ public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) @Override public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists) throws TableNotExistException, TableAlreadyExistException, CatalogException { - AbstractCatalog catalog = - getOriginalCatalog(tablePath) - .orElseThrow( - () -> new UnsupportedOperationException("Unsupported operation: rename table.")); + AbstractCatalog catalog = originalCatalog(tablePath); catalog.renameTable(tablePath, newTableName, ignoreIfNotExists); } @@ -213,32 +213,25 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig Configuration configuration = new Configuration(); table.getOptions().forEach(configuration::setString); TableFormat format = configuration.get(TABLE_FORMAT); - AbstractCatalog catalog = availableCatalogs.get(format); - if (catalog == null) { - throw new UnsupportedOperationException( - String.format( - "Unsupported operation: create table, %s: %s.", TABLE_FORMAT.key(), format)); - } + TableIdentifier tableIdentifier = + TableIdentifier.of( + unifiedCatalog.name(), tablePath.getDatabaseName(), tablePath.getObjectName()); + AbstractCatalog catalog = + getOriginalCatalog(format).orElseGet(() -> createOriginalCatalog(tableIdentifier, format)); catalog.createTable(tablePath, table, ignoreIfExists); } @Override public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException { - AbstractCatalog catalog = - getOriginalCatalog(tablePath) - .orElseThrow( - () -> new UnsupportedOperationException("Unsupported operation: alter table.")); + AbstractCatalog catalog = originalCatalog(tablePath); catalog.alterTable(tablePath, newTable, ignoreIfNotExists); } @Override public List listPartitions(ObjectPath tablePath) throws TableNotExistException, TableNotPartitionedException, CatalogException { - AbstractCatalog catalog = - getOriginalCatalog(tablePath) - .orElseThrow( - () -> new UnsupportedOperationException("Unsupported operation: list partitions.")); + AbstractCatalog catalog = originalCatalog(tablePath); return catalog.listPartitions(tablePath); } @@ -247,10 +240,7 @@ public List listPartitions( ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, CatalogException { - AbstractCatalog catalog = - getOriginalCatalog(tablePath) - .orElseThrow( - () -> new UnsupportedOperationException("Unsupported operation: list partitions.")); + AbstractCatalog catalog = originalCatalog(tablePath); return catalog.listPartitions(tablePath, partitionSpec); } @@ -258,32 +248,22 @@ public List listPartitions( public List listPartitionsByFilter( ObjectPath tablePath, List filters) throws TableNotExistException, TableNotPartitionedException, CatalogException { - AbstractCatalog catalog = - getOriginalCatalog(tablePath) - .orElseThrow( - () -> - new UnsupportedOperationException( - "Unsupported operation: list partitions by filter.")); + AbstractCatalog catalog = originalCatalog(tablePath); return catalog.listPartitionsByFilter(tablePath, filters); } @Override public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws PartitionNotExistException, CatalogException { - AbstractCatalog catalog = - getOriginalCatalog(tablePath) - .orElseThrow( - () -> new UnsupportedOperationException("Unsupported operation: get partition.")); + AbstractCatalog catalog = originalCatalog(tablePath); return catalog.getPartition(tablePath, partitionSpec); } @Override public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws CatalogException { - return getOriginalCatalog(tablePath) - .map(catalog -> catalog.partitionExists(tablePath, partitionSpec)) - .orElseThrow( - () -> new UnsupportedOperationException("Unsupported operation: partition exists.")); + AbstractCatalog catalog = originalCatalog(tablePath); + return catalog.partitionExists(tablePath, partitionSpec); } @Override @@ -294,11 +274,7 @@ public void createPartition( boolean ignoreIfExists) throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionAlreadyExistsException, CatalogException { - AbstractCatalog catalog = - getOriginalCatalog(tablePath) - .orElseThrow( - () -> - new UnsupportedOperationException("Unsupported operation: create partition.")); + AbstractCatalog catalog = originalCatalog(tablePath); catalog.createPartition(tablePath, partitionSpec, partition, ignoreIfExists); } @@ -306,10 +282,7 @@ public void createPartition( public void dropPartition( ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException { - AbstractCatalog catalog = - getOriginalCatalog(tablePath) - .orElseThrow( - () -> new UnsupportedOperationException("Unsupported operation: drop partition.")); + AbstractCatalog catalog = originalCatalog(tablePath); catalog.dropPartition(tablePath, partitionSpec, ignoreIfNotExists); } @@ -320,10 +293,7 @@ public void alterPartition( CatalogPartition newPartition, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException { - AbstractCatalog catalog = - getOriginalCatalog(tablePath) - .orElseThrow( - () -> new UnsupportedOperationException("Unsupported operation: alter partition.")); + AbstractCatalog catalog = originalCatalog(tablePath); catalog.alterPartition(tablePath, partitionSpec, newPartition, ignoreIfNotExists); } @@ -367,24 +337,14 @@ public void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists) { @Override public CatalogTableStatistics getTableStatistics(ObjectPath tablePath) throws TableNotExistException, CatalogException { - AbstractCatalog catalog = - getOriginalCatalog(tablePath) - .orElseThrow( - () -> - new UnsupportedOperationException( - "Unsupported operation: get table statistics.")); + AbstractCatalog catalog = originalCatalog(tablePath); return catalog.getTableStatistics(tablePath); } @Override public CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath) throws TableNotExistException, CatalogException { - AbstractCatalog catalog = - getOriginalCatalog(tablePath) - .orElseThrow( - () -> - new UnsupportedOperationException( - "Unsupported operation: get table column statistics.")); + AbstractCatalog catalog = originalCatalog(tablePath); return catalog.getTableColumnStatistics(tablePath); } @@ -392,12 +352,7 @@ public CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath) public CatalogTableStatistics getPartitionStatistics( ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws PartitionNotExistException, CatalogException { - AbstractCatalog catalog = - getOriginalCatalog(tablePath) - .orElseThrow( - () -> - new UnsupportedOperationException( - "Unsupported operation: get partition statistics.")); + AbstractCatalog catalog = originalCatalog(tablePath); return catalog.getPartitionStatistics(tablePath, partitionSpec); } @@ -405,12 +360,7 @@ public CatalogTableStatistics getPartitionStatistics( public CatalogColumnStatistics getPartitionColumnStatistics( ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws PartitionNotExistException, CatalogException { - AbstractCatalog catalog = - getOriginalCatalog(tablePath) - .orElseThrow( - () -> - new UnsupportedOperationException( - "Unsupported operation: get partition column statistics.")); + AbstractCatalog catalog = originalCatalog(tablePath); return catalog.getPartitionColumnStatistics(tablePath, partitionSpec); } @@ -418,12 +368,7 @@ public CatalogColumnStatistics getPartitionColumnStatistics( public void alterTableStatistics( ObjectPath tablePath, CatalogTableStatistics tableStatistics, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException { - AbstractCatalog catalog = - getOriginalCatalog(tablePath) - .orElseThrow( - () -> - new UnsupportedOperationException( - "Unsupported operation: alter table statistics.")); + AbstractCatalog catalog = originalCatalog(tablePath); catalog.alterTableStatistics(tablePath, tableStatistics, ignoreIfNotExists); } @@ -431,12 +376,7 @@ public void alterTableStatistics( public void alterTableColumnStatistics( ObjectPath tablePath, CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException, TablePartitionedException { - AbstractCatalog catalog = - getOriginalCatalog(tablePath) - .orElseThrow( - () -> - new UnsupportedOperationException( - "Unsupported operation: alter table column statistics.")); + AbstractCatalog catalog = originalCatalog(tablePath); catalog.alterTableColumnStatistics(tablePath, columnStatistics, ignoreIfNotExists); } @@ -447,12 +387,7 @@ public void alterPartitionStatistics( CatalogTableStatistics partitionStatistics, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException { - AbstractCatalog catalog = - getOriginalCatalog(tablePath) - .orElseThrow( - () -> - new UnsupportedOperationException( - "Unsupported operation: alter partition statistics.")); + AbstractCatalog catalog = originalCatalog(tablePath); catalog.alterPartitionStatistics( tablePath, partitionSpec, partitionStatistics, ignoreIfNotExists); } @@ -464,25 +399,71 @@ public void alterPartitionColumnStatistics( CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException { - AbstractCatalog catalog = - getOriginalCatalog(tablePath) - .orElseThrow( - () -> - new UnsupportedOperationException( - "Unsupported operation: alter partition column statistics.")); + AbstractCatalog catalog = originalCatalog(tablePath); catalog.alterPartitionColumnStatistics( tablePath, partitionSpec, columnStatistics, ignoreIfNotExists); } - private Optional getOriginalCatalog(ObjectPath tablePath) { - TableFormat format = getTableFormat(tablePath); - return Optional.of(availableCatalogs.get(format)); + /** + * Get the original flink catalog for the given table, if the flink catalog is not exists in the + * cache, would create a new original flink catalog for this table format. + * + * @param amoroTable amoroTable + * @return original Flink catalog + */ + private AbstractCatalog originalCatalog(AmoroTable amoroTable) { + TableFormat format = amoroTable.format(); + TableIdentifier tableIdentifier = amoroTable.id(); + return getOriginalCatalog(format) + .orElseGet(() -> createOriginalCatalog(tableIdentifier, format)); + } + + private AbstractCatalog originalCatalog(ObjectPath tablePath) { + AmoroTable amoroTable = loadAmoroTable(tablePath); + return originalCatalog(amoroTable); } - private TableFormat getTableFormat(ObjectPath tablePath) { - AmoroTable amoroTable = - unifiedCatalog.loadTable(tablePath.getDatabaseName(), tablePath.getObjectName()); - return amoroTable.format(); + private Optional getOriginalCatalog(TableFormat format) { + return Optional.ofNullable(availableCatalogs.get(format)); + } + + private AmoroTable loadAmoroTable(ObjectPath tablePath) { + return unifiedCatalog.loadTable(tablePath.getDatabaseName(), tablePath.getObjectName()); + } + + private AbstractCatalog createOriginalCatalog( + TableIdentifier tableIdentifier, TableFormat tableFormat) { + CatalogFactory catalogFactory; + + switch (tableFormat) { + case MIXED_ICEBERG: + case MIXED_HIVE: + catalogFactory = new MixedCatalogFactory(); + break; + case ICEBERG: + catalogFactory = new IcebergFlinkCatalogFactory(hadoopConf); + break; + default: + throw new UnsupportedOperationException( + String.format( + "Unsupported table format: [%s] in the unified catalog, table identifier is [%s], the supported table formats are [%s].", + tableFormat, tableIdentifier, SUPPORTED_FORMATS)); + } + + AbstractCatalog originalCatalog; + try { + originalCatalog = (AbstractCatalog) catalogFactory.createCatalog(context); + } catch (CatalogException e) { + if (e.getMessage().contains("must implement createCatalog(Context)")) { + originalCatalog = + (AbstractCatalog) catalogFactory.createCatalog(context.getName(), context.getOptions()); + } else { + throw e; + } + } + originalCatalog.open(); + availableCatalogs.put(tableFormat, originalCatalog); + return originalCatalog; } @Override diff --git a/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/FlinkUnifiedCatalogFactory.java b/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/FlinkUnifiedCatalogFactory.java index d892424fa3..1d7c5eb248 100644 --- a/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/FlinkUnifiedCatalogFactory.java +++ b/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/FlinkUnifiedCatalogFactory.java @@ -21,7 +21,6 @@ import static com.netease.arctic.ams.api.Constants.THRIFT_TABLE_SERVICE_NAME; import static com.netease.arctic.ams.api.properties.CatalogMetaProperties.TABLE_FORMATS; import static com.netease.arctic.flink.catalog.factories.CatalogFactoryOptions.DEFAULT_DATABASE; -import static com.netease.arctic.flink.catalog.factories.CatalogFactoryOptions.FLINK_TABLE_FORMATS; import static org.apache.flink.table.factories.FactoryUtil.PROPERTY_VERSION; import com.netease.arctic.UnifiedCatalog; @@ -29,12 +28,9 @@ import com.netease.arctic.ams.api.TableFormat; import com.netease.arctic.ams.api.client.ArcticThriftUrl; import com.netease.arctic.flink.catalog.FlinkUnifiedCatalog; -import com.netease.arctic.flink.catalog.factories.iceberg.IcebergFlinkCatalogFactory; -import com.netease.arctic.flink.catalog.factories.mixed.MixedCatalogFactory; +import com.netease.arctic.utils.CatalogUtil; import org.apache.flink.configuration.ConfigOption; -import org.apache.flink.table.catalog.AbstractCatalog; import org.apache.flink.table.catalog.Catalog; -import org.apache.flink.table.catalog.exceptions.CatalogException; import org.apache.flink.table.factories.CatalogFactory; import org.apache.flink.table.factories.FactoryUtil; import org.apache.hadoop.conf.Configuration; @@ -42,13 +38,12 @@ import org.apache.iceberg.relocated.com.google.common.collect.Sets; import java.util.HashSet; -import java.util.Map; import java.util.Set; /** Factory for {@link FlinkUnifiedCatalog}. */ public class FlinkUnifiedCatalogFactory implements CatalogFactory { - private static final Set SUPPORTED_FORMATS = + public static final Set SUPPORTED_FORMATS = Sets.newHashSet(TableFormat.MIXED_ICEBERG, TableFormat.MIXED_HIVE, TableFormat.ICEBERG); @Override @@ -86,52 +81,26 @@ public Catalog createCatalog(Context context) { UnifiedCatalogLoader.loadUnifiedCatalog(metastoreUrl, amoroCatalogName, Maps.newHashMap()); Configuration hadoopConf = unifiedCatalog.authenticationContext().getConfiguration(); - TableFormat catalogTableFormat = - TableFormat.valueOf(unifiedCatalog.properties().get(TABLE_FORMATS)); - - Map availableCatalogs = Maps.newHashMap(); - SUPPORTED_FORMATS.forEach( - tableFormat -> { - if (!availableCatalogs.containsKey(tableFormat)) { - if (catalogTableFormat == TableFormat.ICEBERG - && (tableFormat.in(TableFormat.MIXED_HIVE, TableFormat.MIXED_ICEBERG))) { - // Mixed catalog couldn't load the iceberg table, so specify the table formats to the - // mixed catalog - context.getOptions().put(FLINK_TABLE_FORMATS.key(), tableFormat.toString()); - } - availableCatalogs.put(tableFormat, createCatalog(context, tableFormat, hadoopConf)); - } - }); + Set tableFormats = + CatalogUtil.tableFormats(unifiedCatalog.metastoreType(), unifiedCatalog.properties()); + validate(tableFormats); return new FlinkUnifiedCatalog( - metastoreUrl, context.getName(), defaultDatabase, unifiedCatalog, availableCatalogs); + metastoreUrl, defaultDatabase, unifiedCatalog, context, hadoopConf); } - private AbstractCatalog createCatalog( - Context context, TableFormat tableFormat, Configuration hadoopConf) { - CatalogFactory catalogFactory; - - switch (tableFormat) { - case MIXED_ICEBERG: - case MIXED_HIVE: - catalogFactory = new MixedCatalogFactory(); - break; - case ICEBERG: - catalogFactory = new IcebergFlinkCatalogFactory(hadoopConf); - break; - default: - throw new UnsupportedOperationException( - String.format("Unsupported table format: [%s] in the amoro catalog." + tableFormat)); + private void validate(Set expectedFormats) { + if (expectedFormats.isEmpty()) { + throw new IllegalArgumentException( + String.format( + "The table formats must be specified in the catalog properties: [%s]", + TABLE_FORMATS)); } - - try { - return (AbstractCatalog) catalogFactory.createCatalog(context); - } catch (CatalogException e) { - if (e.getMessage().contains("must implement createCatalog(Context)")) { - return (AbstractCatalog) - catalogFactory.createCatalog(context.getName(), context.getOptions()); - } - throw e; + if (!SUPPORTED_FORMATS.containsAll(expectedFormats)) { + throw new IllegalArgumentException( + String.format( + "The table formats [%s] are not supported in the unified catalog, the supported table formats are [%s].", + expectedFormats, SUPPORTED_FORMATS)); } } } From 0e3fb9c02048844d1c61744e7eabe4e1334dc58e Mon Sep 17 00:00:00 2001 From: Xianxun Ye Date: Mon, 18 Dec 2023 17:02:38 +0800 Subject: [PATCH 5/5] fixed baiyangtx's comments. --- .../catalog/factories/CatalogFactoryOptions.java | 2 +- .../factories/mixed/MixedCatalogFactory.java | 2 +- .../com/netease/arctic/flink/FlinkTestBase.java | 4 ++-- ...tCatalog.java => TestLegacyMixedCatalog.java} | 16 ++++++++-------- 4 files changed, 12 insertions(+), 12 deletions(-) rename mixed/flink/flink-common/src/test/java/com/netease/arctic/flink/catalog/{TestCatalog.java => TestLegacyMixedCatalog.java} (97%) diff --git a/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/CatalogFactoryOptions.java b/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/CatalogFactoryOptions.java index 5e9608f7a9..a5ad3fbacc 100644 --- a/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/CatalogFactoryOptions.java +++ b/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/CatalogFactoryOptions.java @@ -34,7 +34,7 @@ public class CatalogFactoryOptions { public static final String MIXED_ICEBERG_IDENTIFIER = "mixed_iceberg"; public static final String MIXED_HIVE_IDENTIFIER = "mixed_hive"; - @Deprecated public static final String IDENTIFIER = "arctic"; + @Deprecated public static final String LEGACY_MIXED_IDENTIFIER = "arctic"; public static final String UNIFIED_IDENTIFIER = "unified"; public static final ConfigOption DEFAULT_DATABASE = diff --git a/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/mixed/MixedCatalogFactory.java b/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/mixed/MixedCatalogFactory.java index a80a274f1b..ca5f670818 100644 --- a/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/mixed/MixedCatalogFactory.java +++ b/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/mixed/MixedCatalogFactory.java @@ -48,7 +48,7 @@ public class MixedCatalogFactory implements CatalogFactory { @Override public String factoryIdentifier() { - return CatalogFactoryOptions.IDENTIFIER; + return CatalogFactoryOptions.LEGACY_MIXED_IDENTIFIER; } @Override diff --git a/mixed/flink/flink-common/src/test/java/com/netease/arctic/flink/FlinkTestBase.java b/mixed/flink/flink-common/src/test/java/com/netease/arctic/flink/FlinkTestBase.java index 6f753224ac..94c5a281a4 100644 --- a/mixed/flink/flink-common/src/test/java/com/netease/arctic/flink/FlinkTestBase.java +++ b/mixed/flink/flink-common/src/test/java/com/netease/arctic/flink/FlinkTestBase.java @@ -18,7 +18,7 @@ package com.netease.arctic.flink; -import static com.netease.arctic.flink.catalog.factories.CatalogFactoryOptions.IDENTIFIER; +import static com.netease.arctic.flink.catalog.factories.CatalogFactoryOptions.LEGACY_MIXED_IDENTIFIER; import static com.netease.arctic.flink.kafka.testutils.KafkaContainerTest.KAFKA_CONTAINER; import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED; @@ -126,7 +126,7 @@ public void before() throws Exception { public void config() { props = Maps.newHashMap(); - props.put("type", IDENTIFIER); + props.put("type", LEGACY_MIXED_IDENTIFIER); props.put(CatalogFactoryOptions.METASTORE_URL.key(), metastoreUrl); } diff --git a/mixed/flink/flink-common/src/test/java/com/netease/arctic/flink/catalog/TestCatalog.java b/mixed/flink/flink-common/src/test/java/com/netease/arctic/flink/catalog/TestLegacyMixedCatalog.java similarity index 97% rename from mixed/flink/flink-common/src/test/java/com/netease/arctic/flink/catalog/TestCatalog.java rename to mixed/flink/flink-common/src/test/java/com/netease/arctic/flink/catalog/TestLegacyMixedCatalog.java index 0ad3299cb4..da3d84ce16 100644 --- a/mixed/flink/flink-common/src/test/java/com/netease/arctic/flink/catalog/TestCatalog.java +++ b/mixed/flink/flink-common/src/test/java/com/netease/arctic/flink/catalog/TestLegacyMixedCatalog.java @@ -70,10 +70,10 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -public class TestCatalog extends CatalogTestBase { - private static final Logger LOG = LoggerFactory.getLogger(TestCatalog.class); +public class TestLegacyMixedCatalog extends CatalogTestBase { + private static final Logger LOG = LoggerFactory.getLogger(TestLegacyMixedCatalog.class); - public TestCatalog() { + public TestLegacyMixedCatalog() { super(new BasicCatalogTestHelper(TableFormat.MIXED_ICEBERG)); } @@ -89,7 +89,7 @@ public TestCatalog() { @Before public void before() throws Exception { props = Maps.newHashMap(); - props.put("type", CatalogFactoryOptions.IDENTIFIER); + props.put("type", CatalogFactoryOptions.LEGACY_MIXED_IDENTIFIER); props.put(CatalogFactoryOptions.METASTORE_URL.key(), getCatalogUrl()); sql("CREATE CATALOG " + CATALOG + " WITH %s", toWithClause(props)); sql("USE CATALOG " + CATALOG); @@ -417,11 +417,11 @@ public void testDefaultCatalogDDLWithVirtualColumn() { // create Table with compute columns under default catalog props = Maps.newHashMap(); - props.put("connector", CatalogFactoryOptions.IDENTIFIER); + props.put("connector", CatalogFactoryOptions.LEGACY_MIXED_IDENTIFIER); props.put(CatalogFactoryOptions.METASTORE_URL.key(), getCatalogUrl()); - props.put(CatalogFactoryOptions.IDENTIFIER + ".catalog", CATALOG); - props.put(CatalogFactoryOptions.IDENTIFIER + ".database", DB); - props.put(CatalogFactoryOptions.IDENTIFIER + ".table", TABLE); + props.put(CatalogFactoryOptions.LEGACY_MIXED_IDENTIFIER + ".catalog", CATALOG); + props.put(CatalogFactoryOptions.LEGACY_MIXED_IDENTIFIER + ".database", DB); + props.put(CatalogFactoryOptions.LEGACY_MIXED_IDENTIFIER + ".table", TABLE); sql( "CREATE TABLE default_catalog.default_database."