From 9a6b522a51dc54d5b5e28a0ca4c074c46b456954 Mon Sep 17 00:00:00 2001 From: Xianxun Ye Date: Tue, 12 Dec 2023 16:09:32 +0800 Subject: [PATCH 1/2] [AMORO-2423] [Flink]: Using 'mixed_iceberg' and 'mixed_hive' indentifier to CREATE CATALOG and deprecate 'arctic' indentifer --- .../ArcticCatalogFactoryOptions.java | 4 +- .../factories/MixedHiveCatalogFactory.java | 52 +++++++++++++++++++ .../factories/MixedIcebergCatalogFactory.java | 52 +++++++++++++++++++ .../org.apache.flink.table.factories.Factory | 2 + .../arctic/flink/catalog/TestCatalog.java | 21 +++++++- 5 files changed, 128 insertions(+), 3 deletions(-) create mode 100644 mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/MixedHiveCatalogFactory.java create mode 100644 mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/MixedIcebergCatalogFactory.java diff --git a/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/ArcticCatalogFactoryOptions.java b/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/ArcticCatalogFactoryOptions.java index 1420164125..cc83285afc 100644 --- a/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/ArcticCatalogFactoryOptions.java +++ b/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/ArcticCatalogFactoryOptions.java @@ -30,7 +30,9 @@ /** {@link ConfigOption}s for {@link ArcticCatalog}. */ @Internal public class ArcticCatalogFactoryOptions { - public static final String IDENTIFIER = "arctic"; + public static final String MIXED_ICEBERG_IDENTIFIER = "mixed_iceberg"; + public static final String MIXED_HIVE_IDENTIFIER = "mixed_hive"; + @Deprecated public static final String IDENTIFIER = "arctic"; public static final String UNIFIED_IDENTIFIER = "unified"; public static final ConfigOption DEFAULT_DATABASE = diff --git a/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/MixedHiveCatalogFactory.java b/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/MixedHiveCatalogFactory.java new file mode 100644 index 0000000000..9f02f7904d --- /dev/null +++ b/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/MixedHiveCatalogFactory.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.netease.arctic.flink.catalog.factories; + +import com.netease.arctic.flink.catalog.ArcticCatalog; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.table.catalog.Catalog; + +import java.util.Set; + +/** + * The factory to create {@link ArcticCatalog} with {@link + * ArcticCatalogFactoryOptions#MIXED_HIVE_IDENTIFIER} identifier. + */ +public class MixedHiveCatalogFactory extends ArcticCatalogFactory { + + @Override + public String factoryIdentifier() { + return ArcticCatalogFactoryOptions.MIXED_HIVE_IDENTIFIER; + } + + @Override + public Catalog createCatalog(Context context) { + return super.createCatalog(context); + } + + @Override + public Set> requiredOptions() { + return super.requiredOptions(); + } + + @Override + public Set> optionalOptions() { + return super.optionalOptions(); + } +} diff --git a/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/MixedIcebergCatalogFactory.java b/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/MixedIcebergCatalogFactory.java new file mode 100644 index 0000000000..c6712db411 --- /dev/null +++ b/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/MixedIcebergCatalogFactory.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.netease.arctic.flink.catalog.factories; + +import com.netease.arctic.flink.catalog.ArcticCatalog; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.table.catalog.Catalog; + +import java.util.Set; + +/** + * The factory to create {@link ArcticCatalog} with {@link + * ArcticCatalogFactoryOptions#MIXED_ICEBERG_IDENTIFIER} identifier. + */ +public class MixedIcebergCatalogFactory extends ArcticCatalogFactory { + + @Override + public String factoryIdentifier() { + return ArcticCatalogFactoryOptions.MIXED_ICEBERG_IDENTIFIER; + } + + @Override + public Catalog createCatalog(Context context) { + return super.createCatalog(context); + } + + @Override + public Set> requiredOptions() { + return super.requiredOptions(); + } + + @Override + public Set> optionalOptions() { + return super.optionalOptions(); + } +} diff --git a/mixed/flink/flink-common/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/mixed/flink/flink-common/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory index 5081930298..adad559681 100644 --- a/mixed/flink/flink-common/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory +++ b/mixed/flink/flink-common/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -16,6 +16,8 @@ # limitations under the License. # +com.netease.arctic.flink.catalog.factories.MixedIcebergCatalogFactory +com.netease.arctic.flink.catalog.factories.MixedHiveCatalogFactory com.netease.arctic.flink.catalog.factories.ArcticCatalogFactory com.netease.arctic.flink.catalog.factories.FlinkCatalogFactory com.netease.arctic.flink.table.DynamicTableFactory diff --git a/mixed/flink/flink-common/src/test/java/com/netease/arctic/flink/catalog/TestCatalog.java b/mixed/flink/flink-common/src/test/java/com/netease/arctic/flink/catalog/TestCatalog.java index 081e5c1609..844f60dffd 100644 --- a/mixed/flink/flink-common/src/test/java/com/netease/arctic/flink/catalog/TestCatalog.java +++ b/mixed/flink/flink-common/src/test/java/com/netease/arctic/flink/catalog/TestCatalog.java @@ -98,13 +98,30 @@ public void before() throws Exception { @After public void after() { - sql("DROP TABLE " + CATALOG + "." + DB + "." + TABLE); - sql("DROP DATABASE " + CATALOG + "." + DB); + sql("DROP TABLE IF EXISTS " + CATALOG + "." + DB + "." + TABLE); + sql("DROP DATABASE IF EXISTS " + CATALOG + "." + DB); Assert.assertTrue(CollectionUtil.isNullOrEmpty(getMixedFormatCatalog().listDatabases())); sql("USE CATALOG default_catalog"); sql("DROP CATALOG " + CATALOG); } + @Test + public void testCreateIcebergHiveCatalog() { + sql( + "CREATE CATALOG mixed_iceberg_catalog WITH ('type'='mixed_iceberg', 'metastore.url'='%s')", + getCatalogUrl()); + sql( + "CREATE CATALOG mixed_hive_catalog WITH ('type'='mixed_hive', 'metastore.url'='%s')", + getCatalogUrl()); + + String[] catalogs = getTableEnv().listCatalogs(); + Assert.assertArrayEquals( + Arrays.stream(catalogs).sorted().toArray(), + Stream.of("default_catalog", "arcticCatalog", "mixed_iceberg_catalog", "mixed_hive_catalog") + .sorted() + .toArray()); + } + @Test public void testDDL() throws IOException { sql( From cc89bce201bd57d289cfd5cf104f02ecd7ca6911 Mon Sep 17 00:00:00 2001 From: Xianxun Ye Date: Wed, 13 Dec 2023 11:13:51 +0800 Subject: [PATCH 2/2] fixed baiyangtx's comments. --- .../factories/MixedHiveCatalogFactory.java | 19 ------------------- .../factories/MixedIcebergCatalogFactory.java | 19 ------------------- 2 files changed, 38 deletions(-) diff --git a/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/MixedHiveCatalogFactory.java b/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/MixedHiveCatalogFactory.java index 9f02f7904d..f7c261e6bf 100644 --- a/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/MixedHiveCatalogFactory.java +++ b/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/MixedHiveCatalogFactory.java @@ -19,10 +19,6 @@ package com.netease.arctic.flink.catalog.factories; import com.netease.arctic.flink.catalog.ArcticCatalog; -import org.apache.flink.configuration.ConfigOption; -import org.apache.flink.table.catalog.Catalog; - -import java.util.Set; /** * The factory to create {@link ArcticCatalog} with {@link @@ -34,19 +30,4 @@ public class MixedHiveCatalogFactory extends ArcticCatalogFactory { public String factoryIdentifier() { return ArcticCatalogFactoryOptions.MIXED_HIVE_IDENTIFIER; } - - @Override - public Catalog createCatalog(Context context) { - return super.createCatalog(context); - } - - @Override - public Set> requiredOptions() { - return super.requiredOptions(); - } - - @Override - public Set> optionalOptions() { - return super.optionalOptions(); - } } diff --git a/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/MixedIcebergCatalogFactory.java b/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/MixedIcebergCatalogFactory.java index c6712db411..8730697130 100644 --- a/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/MixedIcebergCatalogFactory.java +++ b/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/MixedIcebergCatalogFactory.java @@ -19,10 +19,6 @@ package com.netease.arctic.flink.catalog.factories; import com.netease.arctic.flink.catalog.ArcticCatalog; -import org.apache.flink.configuration.ConfigOption; -import org.apache.flink.table.catalog.Catalog; - -import java.util.Set; /** * The factory to create {@link ArcticCatalog} with {@link @@ -34,19 +30,4 @@ public class MixedIcebergCatalogFactory extends ArcticCatalogFactory { public String factoryIdentifier() { return ArcticCatalogFactoryOptions.MIXED_ICEBERG_IDENTIFIER; } - - @Override - public Catalog createCatalog(Context context) { - return super.createCatalog(context); - } - - @Override - public Set> requiredOptions() { - return super.requiredOptions(); - } - - @Override - public Set> optionalOptions() { - return super.optionalOptions(); - } }