diff --git a/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/FlinkCatalog.java b/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/FlinkCatalog.java new file mode 100644 index 0000000000..aed1227295 --- /dev/null +++ b/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/FlinkCatalog.java @@ -0,0 +1,513 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.netease.arctic.flink.catalog; + +import static com.netease.arctic.ams.api.Constants.THRIFT_TABLE_SERVICE_NAME; +import static com.netease.arctic.flink.table.descriptors.ArcticValidator.TABLE_FORMAT; + +import com.netease.arctic.AlreadyExistsException; +import com.netease.arctic.AmoroTable; +import com.netease.arctic.NoSuchDatabaseException; +import com.netease.arctic.NoSuchTableException; +import com.netease.arctic.UnifiedCatalog; +import com.netease.arctic.UnifiedCatalogLoader; +import com.netease.arctic.ams.api.TableFormat; +import com.netease.arctic.ams.api.client.ArcticThriftUrl; +import com.netease.arctic.flink.table.AmoroDynamicTableFactory; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.catalog.AbstractCatalog; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.CatalogFunction; +import org.apache.flink.table.catalog.CatalogPartition; +import org.apache.flink.table.catalog.CatalogPartitionSpec; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.FunctionNotExistException; +import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException; +import org.apache.flink.table.catalog.exceptions.PartitionNotExistException; +import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException; +import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException; +import org.apache.flink.table.catalog.exceptions.TablePartitionedException; +import org.apache.flink.table.catalog.stats.CatalogColumnStatistics; +import org.apache.flink.table.catalog.stats.CatalogTableStatistics; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.factories.Factory; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +/** This is a Flink catalog wrap a unified catalog. */ +public class FlinkCatalog extends AbstractCatalog { + private UnifiedCatalog unifiedCatalog; + private final String amsUri; + private final String amoroCatalogName; + /** + * Available Flink catalogs for Unified Catalog. + * + *

May include: Iceberg, Mixed and Paimon Catalogs, etc. + */ + private final Map availableCatalogs; + + public FlinkCatalog( + String amsUri, + String name, + String defaultDatabase, + Map availableCatalogs) { + super(name, defaultDatabase); + this.amsUri = amsUri; + this.amoroCatalogName = ArcticThriftUrl.parse(amsUri, THRIFT_TABLE_SERVICE_NAME).catalogName(); + this.availableCatalogs = availableCatalogs; + } + + @Override + public void open() throws CatalogException { + unifiedCatalog = + UnifiedCatalogLoader.loadUnifiedCatalog(amsUri, amoroCatalogName, Maps.newHashMap()); + availableCatalogs.forEach((tableFormat, catalog) -> catalog.open()); + } + + @Override + public void close() throws CatalogException { + if (availableCatalogs != null) { + availableCatalogs.forEach((tableFormat, catalog) -> catalog.close()); + } + } + + @Override + public List listDatabases() throws CatalogException { + return unifiedCatalog.listDatabases(); + } + + @Override + public CatalogDatabase getDatabase(String databaseName) + throws DatabaseNotExistException, CatalogException { + throw new UnsupportedOperationException("Unsupported operation: get database."); + } + + @Override + public boolean databaseExists(String databaseName) throws CatalogException { + return listDatabases().stream().anyMatch(db -> db.equalsIgnoreCase(databaseName)); + } + + @Override + public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists) + throws DatabaseAlreadyExistException, CatalogException { + try { + unifiedCatalog.createDatabase(name); + } catch (AlreadyExistsException e) { + if (!ignoreIfExists) { + throw new DatabaseAlreadyExistException(getName(), name); + } + } + } + + @Override + public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade) + throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException { + try { + unifiedCatalog.dropDatabase(name); + } catch (NoSuchDatabaseException e) { + if (!ignoreIfNotExists) { + throw new DatabaseNotExistException(getName(), name); + } + } + } + + @Override + public void alterDatabase(String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists) + throws DatabaseNotExistException, CatalogException { + throw new UnsupportedOperationException("Unsupported operation: alter database."); + } + + @Override + public List listTables(String databaseName) + throws DatabaseNotExistException, CatalogException { + return unifiedCatalog.listTables(databaseName).stream() + .map(table -> table.getIdentifier().getTableName()) + .collect(java.util.stream.Collectors.toList()); + } + + @Override + public List listViews(String databaseName) + throws DatabaseNotExistException, CatalogException { + return Collections.emptyList(); + } + + @Override + public CatalogBaseTable getTable(ObjectPath tablePath) + throws TableNotExistException, CatalogException { + AmoroTable amoroTable = + unifiedCatalog.loadTable(tablePath.getDatabaseName(), tablePath.getObjectName()); + AbstractCatalog catalog = availableCatalogs.get(amoroTable.format()); + if (catalog == null) { + throw new UnsupportedOperationException( + String.format( + "Unsupported operation: get table [%s], %s: %s.", + tablePath, TABLE_FORMAT.key(), amoroTable.format())); + } + CatalogBaseTable catalogBaseTable = catalog.getTable(tablePath); + catalogBaseTable.getOptions().put(TABLE_FORMAT.key(), amoroTable.format().toString()); + return catalogBaseTable; + } + + @Override + public boolean tableExists(ObjectPath tablePath) throws CatalogException { + try { + unifiedCatalog.loadTable(tablePath.getDatabaseName(), tablePath.getObjectName()); + return true; + } catch (NoSuchDatabaseException | NoSuchTableException e) { + return false; + } + } + + @Override + public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + try { + unifiedCatalog.dropTable(tablePath.getDatabaseName(), tablePath.getObjectName(), true); + } catch (NoSuchTableException e) { + if (!ignoreIfNotExists) { + throw new TableNotExistException(getName(), tablePath); + } + } + } + + @Override + public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists) + throws TableNotExistException, TableAlreadyExistException, CatalogException { + AbstractCatalog catalog = + getOriginalCatalog(tablePath) + .orElseThrow( + () -> new UnsupportedOperationException("Unsupported operation: rename table.")); + catalog.renameTable(tablePath, newTableName, ignoreIfNotExists); + } + + @Override + public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) + throws TableAlreadyExistException, DatabaseNotExistException, CatalogException { + Configuration configuration = new Configuration(); + table.getOptions().forEach(configuration::setString); + TableFormat format = configuration.get(TABLE_FORMAT); + AbstractCatalog catalog = availableCatalogs.get(format); + if (catalog == null) { + throw new UnsupportedOperationException( + String.format( + "Unsupported operation: create table, %s: %s.", TABLE_FORMAT.key(), format)); + } + catalog.createTable(tablePath, table, ignoreIfExists); + } + + @Override + public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + AbstractCatalog catalog = + getOriginalCatalog(tablePath) + .orElseThrow( + () -> new UnsupportedOperationException("Unsupported operation: alter table.")); + catalog.alterTable(tablePath, newTable, ignoreIfNotExists); + } + + @Override + public List listPartitions(ObjectPath tablePath) + throws TableNotExistException, TableNotPartitionedException, CatalogException { + AbstractCatalog catalog = + getOriginalCatalog(tablePath) + .orElseThrow( + () -> new UnsupportedOperationException("Unsupported operation: list partitions.")); + return catalog.listPartitions(tablePath); + } + + @Override + public List listPartitions( + ObjectPath tablePath, CatalogPartitionSpec partitionSpec) + throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, + CatalogException { + AbstractCatalog catalog = + getOriginalCatalog(tablePath) + .orElseThrow( + () -> new UnsupportedOperationException("Unsupported operation: list partitions.")); + return catalog.listPartitions(tablePath, partitionSpec); + } + + @Override + public List listPartitionsByFilter( + ObjectPath tablePath, List filters) + throws TableNotExistException, TableNotPartitionedException, CatalogException { + AbstractCatalog catalog = + getOriginalCatalog(tablePath) + .orElseThrow( + () -> + new UnsupportedOperationException( + "Unsupported operation: list partitions by filter.")); + return catalog.listPartitionsByFilter(tablePath, filters); + } + + @Override + public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) + throws PartitionNotExistException, CatalogException { + AbstractCatalog catalog = + getOriginalCatalog(tablePath) + .orElseThrow( + () -> new UnsupportedOperationException("Unsupported operation: get partition.")); + return catalog.getPartition(tablePath, partitionSpec); + } + + @Override + public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) + throws CatalogException { + return getOriginalCatalog(tablePath) + .map(catalog -> catalog.partitionExists(tablePath, partitionSpec)) + .orElseThrow( + () -> new UnsupportedOperationException("Unsupported operation: partition exists.")); + } + + @Override + public void createPartition( + ObjectPath tablePath, + CatalogPartitionSpec partitionSpec, + CatalogPartition partition, + boolean ignoreIfExists) + throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, + PartitionAlreadyExistsException, CatalogException { + AbstractCatalog catalog = + getOriginalCatalog(tablePath) + .orElseThrow( + () -> + new UnsupportedOperationException("Unsupported operation: create partition.")); + catalog.createPartition(tablePath, partitionSpec, partition, ignoreIfExists); + } + + @Override + public void dropPartition( + ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists) + throws PartitionNotExistException, CatalogException { + AbstractCatalog catalog = + getOriginalCatalog(tablePath) + .orElseThrow( + () -> new UnsupportedOperationException("Unsupported operation: drop partition.")); + catalog.dropPartition(tablePath, partitionSpec, ignoreIfNotExists); + } + + @Override + public void alterPartition( + ObjectPath tablePath, + CatalogPartitionSpec partitionSpec, + CatalogPartition newPartition, + boolean ignoreIfNotExists) + throws PartitionNotExistException, CatalogException { + AbstractCatalog catalog = + getOriginalCatalog(tablePath) + .orElseThrow( + () -> new UnsupportedOperationException("Unsupported operation: alter partition.")); + catalog.alterPartition(tablePath, partitionSpec, newPartition, ignoreIfNotExists); + } + + @Override + public Optional getFactory() { + return Optional.of(new AmoroDynamicTableFactory(availableCatalogs)); + } + + @Override + public List listFunctions(String dbName) + throws DatabaseNotExistException, CatalogException { + return Collections.emptyList(); + } + + @Override + public CatalogFunction getFunction(ObjectPath functionPath) + throws FunctionNotExistException, CatalogException { + throw new FunctionNotExistException(getName(), functionPath); + } + + @Override + public boolean functionExists(ObjectPath functionPath) throws CatalogException { + return false; + } + + @Override + public void createFunction( + ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists) + throws FunctionAlreadyExistException, DatabaseNotExistException, CatalogException { + throw new UnsupportedOperationException("Unsupported operation: create function."); + } + + @Override + public void alterFunction( + ObjectPath functionPath, CatalogFunction newFunction, boolean ignoreIfNotExists) + throws FunctionNotExistException, CatalogException { + throw new UnsupportedOperationException("Unsupported operation: alter function."); + } + + @Override + public void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists) + throws FunctionNotExistException, CatalogException { + throw new UnsupportedOperationException("Unsupported operation: drop function."); + } + + @Override + public CatalogTableStatistics getTableStatistics(ObjectPath tablePath) + throws TableNotExistException, CatalogException { + AbstractCatalog catalog = + getOriginalCatalog(tablePath) + .orElseThrow( + () -> + new UnsupportedOperationException( + "Unsupported operation: get table statistics.")); + return catalog.getTableStatistics(tablePath); + } + + @Override + public CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath) + throws TableNotExistException, CatalogException { + AbstractCatalog catalog = + getOriginalCatalog(tablePath) + .orElseThrow( + () -> + new UnsupportedOperationException( + "Unsupported operation: get table column statistics.")); + return catalog.getTableColumnStatistics(tablePath); + } + + @Override + public CatalogTableStatistics getPartitionStatistics( + ObjectPath tablePath, CatalogPartitionSpec partitionSpec) + throws PartitionNotExistException, CatalogException { + AbstractCatalog catalog = + getOriginalCatalog(tablePath) + .orElseThrow( + () -> + new UnsupportedOperationException( + "Unsupported operation: get partition statistics.")); + return catalog.getPartitionStatistics(tablePath, partitionSpec); + } + + @Override + public CatalogColumnStatistics getPartitionColumnStatistics( + ObjectPath tablePath, CatalogPartitionSpec partitionSpec) + throws PartitionNotExistException, CatalogException { + AbstractCatalog catalog = + getOriginalCatalog(tablePath) + .orElseThrow( + () -> + new UnsupportedOperationException( + "Unsupported operation: get partition column statistics.")); + return catalog.getPartitionColumnStatistics(tablePath, partitionSpec); + } + + @Override + public void alterTableStatistics( + ObjectPath tablePath, CatalogTableStatistics tableStatistics, boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + AbstractCatalog catalog = + getOriginalCatalog(tablePath) + .orElseThrow( + () -> + new UnsupportedOperationException( + "Unsupported operation: alter table statistics.")); + catalog.alterTableStatistics(tablePath, tableStatistics, ignoreIfNotExists); + } + + @Override + public void alterTableColumnStatistics( + ObjectPath tablePath, CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException, TablePartitionedException { + AbstractCatalog catalog = + getOriginalCatalog(tablePath) + .orElseThrow( + () -> + new UnsupportedOperationException( + "Unsupported operation: alter table column statistics.")); + catalog.alterTableColumnStatistics(tablePath, columnStatistics, ignoreIfNotExists); + } + + @Override + public void alterPartitionStatistics( + ObjectPath tablePath, + CatalogPartitionSpec partitionSpec, + CatalogTableStatistics partitionStatistics, + boolean ignoreIfNotExists) + throws PartitionNotExistException, CatalogException { + AbstractCatalog catalog = + getOriginalCatalog(tablePath) + .orElseThrow( + () -> + new UnsupportedOperationException( + "Unsupported operation: alter partition statistics.")); + catalog.alterPartitionStatistics( + tablePath, partitionSpec, partitionStatistics, ignoreIfNotExists); + } + + @Override + public void alterPartitionColumnStatistics( + ObjectPath tablePath, + CatalogPartitionSpec partitionSpec, + CatalogColumnStatistics columnStatistics, + boolean ignoreIfNotExists) + throws PartitionNotExistException, CatalogException { + AbstractCatalog catalog = + getOriginalCatalog(tablePath) + .orElseThrow( + () -> + new UnsupportedOperationException( + "Unsupported operation: alter partition column statistics.")); + catalog.alterPartitionColumnStatistics( + tablePath, partitionSpec, columnStatistics, ignoreIfNotExists); + } + + private Optional getOriginalCatalog(ObjectPath tablePath) { + TableFormat format = getTableFormat(tablePath); + return Optional.of(availableCatalogs.get(format)); + } + + private TableFormat getTableFormat(ObjectPath tablePath) throws CatalogException { + AmoroTable amoroTable = + unifiedCatalog.loadTable(tablePath.getDatabaseName(), tablePath.getObjectName()); + return amoroTable.format(); + } + + @Override + public String toString() { + return "FlinkCatalog{" + + "name='" + + getName() + + '\'' + + ", defaultDatabase='" + + getDefaultDatabase() + + '\'' + + ", amsUri='" + + amsUri + + '\'' + + ", amoroCatalogName='" + + amoroCatalogName + + '\'' + + ", availableCatalogs size=" + + availableCatalogs.size() + + "}"; + } +} diff --git a/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/ArcticCatalogFactoryOptions.java b/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/ArcticCatalogFactoryOptions.java index 1aa62d25ff..5b54be38f4 100644 --- a/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/ArcticCatalogFactoryOptions.java +++ b/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/ArcticCatalogFactoryOptions.java @@ -31,6 +31,7 @@ @Internal public class ArcticCatalogFactoryOptions { public static final String IDENTIFIER = "arctic"; + public static final String AMORO_IDENTIFIER = "amoro"; public static final ConfigOption DEFAULT_DATABASE = ConfigOptions.key(CommonCatalogOptions.DEFAULT_DATABASE_KEY) diff --git a/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/FlinkCatalogFactory.java b/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/FlinkCatalogFactory.java new file mode 100644 index 0000000000..61e16be3d9 --- /dev/null +++ b/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/catalog/factories/FlinkCatalogFactory.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.netease.arctic.flink.catalog.factories; + +import static com.netease.arctic.flink.catalog.factories.ArcticCatalogFactoryOptions.DEFAULT_DATABASE; +import static org.apache.flink.table.factories.FactoryUtil.PROPERTY_VERSION; + +import com.netease.arctic.ams.api.TableFormat; +import com.netease.arctic.flink.catalog.FlinkCatalog; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.table.catalog.AbstractCatalog; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.factories.CatalogFactory; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +/** Factory for {@link FlinkCatalog}. */ +public class FlinkCatalogFactory implements CatalogFactory { + + private static final Set SUPPORTED_FORMATS = + Sets.newHashSet(TableFormat.MIXED_ICEBERG, TableFormat.MIXED_HIVE); + + @Override + public String factoryIdentifier() { + return ArcticCatalogFactoryOptions.AMORO_IDENTIFIER; + } + + @Override + public Set> requiredOptions() { + Set> requiredOptions = new HashSet<>(); + requiredOptions.add(ArcticCatalogFactoryOptions.METASTORE_URL); + return requiredOptions; + } + + @Override + public Set> optionalOptions() { + final Set> options = new HashSet<>(); + options.add(PROPERTY_VERSION); + options.add(DEFAULT_DATABASE); + return options; + } + + @Override + public Catalog createCatalog(Context context) { + final FactoryUtil.CatalogFactoryHelper helper = + FactoryUtil.createCatalogFactoryHelper(this, context); + helper.validate(); + + final String defaultDatabase = helper.getOptions().get(DEFAULT_DATABASE); + String metastoreUrl = helper.getOptions().get(ArcticCatalogFactoryOptions.METASTORE_URL); + + Map availableCatalogs = Maps.newHashMap(); + SUPPORTED_FORMATS.forEach( + tableFormat -> { + if (!availableCatalogs.containsKey(tableFormat)) { + availableCatalogs.put(tableFormat, createCatalog(context, tableFormat)); + } + }); + + return new FlinkCatalog(metastoreUrl, context.getName(), defaultDatabase, availableCatalogs); + } + + private AbstractCatalog createCatalog(Context context, TableFormat tableFormat) { + CatalogFactory catalogFactory; + + switch (tableFormat) { + case MIXED_ICEBERG: + case MIXED_HIVE: + catalogFactory = new ArcticCatalogFactory(); + break; + default: + throw new UnsupportedOperationException( + String.format("Unsupported table format: [%s] in the amoro catalog." + tableFormat)); + } + + return (AbstractCatalog) catalogFactory.createCatalog(context); + } +} diff --git a/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/table/AmoroDynamicTableFactory.java b/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/table/AmoroDynamicTableFactory.java new file mode 100644 index 0000000000..79f88c5a0e --- /dev/null +++ b/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/table/AmoroDynamicTableFactory.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.netease.arctic.flink.table; + +import static com.netease.arctic.flink.catalog.factories.ArcticCatalogFactoryOptions.AMORO_IDENTIFIER; +import static com.netease.arctic.flink.table.descriptors.ArcticValidator.TABLE_FORMAT; + +import com.netease.arctic.ams.api.TableFormat; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.catalog.AbstractCatalog; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.factories.DynamicTableSinkFactory; +import org.apache.flink.table.factories.DynamicTableSourceFactory; +import org.apache.flink.table.factories.Factory; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.util.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; + +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +/** + * AmoroDynamicTableFactory is a factory for creating dynamic table sources and sinks. It implements + * both DynamicTableSourceFactory and DynamicTableSinkFactory interfaces. + */ +public class AmoroDynamicTableFactory + implements DynamicTableSourceFactory, DynamicTableSinkFactory { + + private final Map availableCatalogs; + + public AmoroDynamicTableFactory(Map availableCatalogs) { + this.availableCatalogs = + Preconditions.checkNotNull(availableCatalogs, "availableCatalogs cannot be null"); + } + + @Override + public DynamicTableSink createDynamicTableSink(Context context) { + ObjectIdentifier identifier = context.getObjectIdentifier(); + FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); + Configuration options = (Configuration) helper.getOptions(); + TableFormat tableFormat = options.get(TABLE_FORMAT); + + return getOriginalCatalog(tableFormat) + .flatMap(AbstractCatalog::getFactory) + .filter(factory -> factory instanceof DynamicTableSinkFactory) + .map(factory -> ((DynamicTableSinkFactory) factory).createDynamicTableSink(context)) + .orElseThrow( + () -> + new UnsupportedOperationException( + String.format( + "Invalid catalog or factory for table format: %s, table: %s.", + tableFormat, identifier))); + } + + @Override + public DynamicTableSource createDynamicTableSource(Context context) { + ObjectIdentifier identifier = context.getObjectIdentifier(); + FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); + Configuration options = (Configuration) helper.getOptions(); + TableFormat tableFormat = options.get(TABLE_FORMAT); + + return getOriginalCatalog(tableFormat) + .flatMap(AbstractCatalog::getFactory) + .filter(factory -> factory instanceof DynamicTableSourceFactory) + .map(factory -> ((DynamicTableSourceFactory) factory).createDynamicTableSource(context)) + .orElseThrow( + () -> + new UnsupportedOperationException( + String.format( + "Invalid catalog or factory for table format: %s, table: %s.", + tableFormat, identifier))); + } + + private Optional getOriginalCatalog(TableFormat format) { + return Optional.of(availableCatalogs.get(format)); + } + + @Override + public String factoryIdentifier() { + return AMORO_IDENTIFIER; + } + + @Override + public Set> requiredOptions() { + Set> requiredOptions = Sets.newHashSet(); + availableCatalogs.forEach( + (format, catalog) -> { + Optional factory = catalog.getFactory(); + factory.ifPresent(value -> requiredOptions.addAll(value.requiredOptions())); + }); + requiredOptions.add(TABLE_FORMAT); + return requiredOptions; + } + + @Override + public Set> optionalOptions() { + Set> optionalOptions = Sets.newHashSet(); + availableCatalogs.forEach( + (format, catalog) -> { + Optional factory = catalog.getFactory(); + factory.ifPresent(value -> optionalOptions.addAll(value.optionalOptions())); + }); + return optionalOptions; + } +} diff --git a/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/table/descriptors/ArcticValidator.java b/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/table/descriptors/ArcticValidator.java index 3dfc82b21a..74fcd96c7a 100644 --- a/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/table/descriptors/ArcticValidator.java +++ b/mixed/flink/flink-common/src/main/java/com/netease/arctic/flink/table/descriptors/ArcticValidator.java @@ -20,6 +20,7 @@ import static org.apache.flink.configuration.description.TextElement.text; +import com.netease.arctic.ams.api.TableFormat; import org.apache.commons.lang.StringUtils; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; @@ -275,6 +276,19 @@ public class ArcticValidator extends ConnectorDescriptorValidator { + " of the key. Default is -1, means it is automatically determined: every shard will be at least 512KB and" + " number of shard bits will not exceed 6."); + public static final ConfigOption TABLE_FORMAT = + ConfigOptions.key("table.format") + .enumType(TableFormat.class) + .defaultValue(TableFormat.MIXED_HIVE) + .withDescription( + String.format( + "The format of the table, valid values are %s, %s, %s or %s, and Flink choose '%s' as default format.", + TableFormat.ICEBERG, + TableFormat.MIXED_ICEBERG, + TableFormat.MIXED_HIVE, + TableFormat.PAIMON, + TableFormat.MIXED_HIVE)); + @Override public void validate(DescriptorProperties properties) { String emitMode = properties.getString(ARCTIC_EMIT_MODE.key()); diff --git a/mixed/flink/flink-common/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/mixed/flink/flink-common/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory index 93b485b78e..5081930298 100644 --- a/mixed/flink/flink-common/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory +++ b/mixed/flink/flink-common/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -17,4 +17,5 @@ # com.netease.arctic.flink.catalog.factories.ArcticCatalogFactory +com.netease.arctic.flink.catalog.factories.FlinkCatalogFactory com.netease.arctic.flink.table.DynamicTableFactory diff --git a/mixed/flink/flink-common/src/test/java/com/netease/arctic/flink/catalog/FlinkCatalogContext.java b/mixed/flink/flink-common/src/test/java/com/netease/arctic/flink/catalog/FlinkCatalogContext.java new file mode 100644 index 0000000000..40ff184fe2 --- /dev/null +++ b/mixed/flink/flink-common/src/test/java/com/netease/arctic/flink/catalog/FlinkCatalogContext.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.netease.arctic.flink.catalog; + +import static com.netease.arctic.flink.catalog.factories.ArcticCatalogFactoryOptions.METASTORE_URL; +import static com.netease.arctic.flink.table.descriptors.ArcticValidator.TABLE_FORMAT; + +import com.netease.arctic.TestAms; +import com.netease.arctic.ams.api.CatalogMeta; +import com.netease.arctic.ams.api.TableFormat; +import com.netease.arctic.flink.catalog.factories.FlinkCatalogFactory; +import com.netease.arctic.hive.TestHMS; +import com.netease.arctic.hive.catalog.HiveCatalogTestHelper; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.ResolvedCatalogTable; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.junit.jupiter.params.provider.Arguments; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Map; +import java.util.stream.Stream; + +public class FlinkCatalogContext { + + static final TestHMS TEST_HMS = new TestHMS(); + static final TestAms TEST_AMS = new TestAms(); + static final FlinkCatalogFactory flinkCatalogFactory = new FlinkCatalogFactory(); + + static ResolvedSchema resolvedSchema = + ResolvedSchema.of( + Column.physical("name", DataTypes.STRING()), Column.physical("age", DataTypes.INT())); + static Schema schema = Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(); + + ObjectPath objectPath = new ObjectPath("default", "test_hive_from_flink"); + + static Stream getFlinkCatalogAndTable() { + return Stream.of( + Arguments.of( + initFlinkCatalog(TableFormat.MIXED_HIVE), + generateFlinkTable(TableFormat.MIXED_HIVE.toString()), + TableFormat.MIXED_HIVE), + Arguments.of( + initFlinkCatalog(TableFormat.MIXED_ICEBERG), + generateFlinkTable(TableFormat.MIXED_ICEBERG.toString()), + TableFormat.MIXED_ICEBERG)); + } + + static ResolvedCatalogTable generateFlinkTable(String tableFormat) { + return new ResolvedCatalogTable( + CatalogTable.of( + schema, + "Flink managed table", + new ArrayList<>(), + Collections.singletonMap(TABLE_FORMAT.key(), tableFormat)), + resolvedSchema); + } + + void initial() throws Exception { + TEST_HMS.before(); + TEST_AMS.before(); + } + + void close() { + TEST_AMS.after(); + TEST_HMS.after(); + } + + static FlinkCatalog initFlinkCatalog(TableFormat tableFormat) { + FlinkCatalog flinkCatalog; + Map factoryOptions = Maps.newHashMap(); + CatalogMeta meta = + HiveCatalogTestHelper.build(TEST_HMS.getHiveConf(), tableFormat) + .buildCatalogMeta(TEST_HMS.getWareHouseLocation()); + meta.setCatalogName(tableFormat.name().toLowerCase()); + + TEST_AMS.getAmsHandler().dropCatalog(meta.getCatalogName()); + TEST_AMS.getAmsHandler().createCatalog(meta); + + factoryOptions.put(METASTORE_URL.key(), TEST_AMS.getServerUrl() + "/" + meta.getCatalogName()); + final FactoryUtil.DefaultCatalogContext context = + new FactoryUtil.DefaultCatalogContext( + "flink_catalog_name", + factoryOptions, + new Configuration(), + FlinkCatalogContext.class.getClassLoader()); + flinkCatalog = (FlinkCatalog) flinkCatalogFactory.createCatalog(context); + flinkCatalog.open(); + return flinkCatalog; + } + + HiveMetaStoreClient getHMSClient() { + return TEST_HMS.getHiveClient(); + } +} diff --git a/mixed/flink/flink-common/src/test/java/com/netease/arctic/flink/catalog/FlinkCatalogITCase.java b/mixed/flink/flink-common/src/test/java/com/netease/arctic/flink/catalog/FlinkCatalogITCase.java new file mode 100644 index 0000000000..348c43fe1e --- /dev/null +++ b/mixed/flink/flink-common/src/test/java/com/netease/arctic/flink/catalog/FlinkCatalogITCase.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.netease.arctic.flink.catalog; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.netease.arctic.BasicTableTestHelper; +import com.netease.arctic.ams.api.TableFormat; +import com.netease.arctic.catalog.BasicCatalogTestHelper; +import com.netease.arctic.catalog.CatalogTestHelper; +import com.netease.arctic.flink.table.CatalogITCaseBase; +import com.netease.arctic.hive.TestHMS; +import com.netease.arctic.hive.catalog.HiveCatalogTestHelper; +import com.netease.arctic.table.TableIdentifier; +import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.catalog.AbstractCatalog; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.types.Row; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.Optional; +import java.util.concurrent.TimeUnit; + +@RunWith(value = Parameterized.class) +public class FlinkCatalogITCase extends CatalogITCaseBase { + static final TestHMS TEST_HMS = new TestHMS(); + AbstractCatalog flinkCatalog; + TableIdentifier identifier; + + public FlinkCatalogITCase(CatalogTestHelper catalogTestHelper) { + super(catalogTestHelper, new BasicTableTestHelper(true, false)); + } + + @Parameterized.Parameters(name = "catalogTestHelper = {0}") + public static Object[][] parameters() { + return new Object[][] { + {new HiveCatalogTestHelper(TableFormat.MIXED_HIVE, TEST_HMS.getHiveConf())}, + {new BasicCatalogTestHelper(TableFormat.MIXED_ICEBERG)} + }; + } + + @BeforeClass + public static void beforeAll() throws Exception { + TEST_HMS.before(); + } + + @Before + public void setup() throws Exception { + String catalog = "amoro"; + exec("CREATE CATALOG %s WITH ('type'='amoro', 'metastore.url'='%s')", catalog, getCatalogUrl()); + exec("USE CATALOG %s", catalog); + exec("USE %s", tableTestHelper().id().getDatabase()); + Optional catalogOptional = getTableEnv().getCatalog(catalog); + assertTrue(catalogOptional.isPresent()); + flinkCatalog = (AbstractCatalog) catalogOptional.get(); + assertEquals(catalog, flinkCatalog.getName()); + identifier = tableTestHelper().id(); + } + + @After + public void teardown() { + TEST_HMS.after(); + if (flinkCatalog != null) { + flinkCatalog.close(); + } + } + + @Test + public void testTableExists() throws TableNotExistException { + CatalogBaseTable catalogBaseTable = + flinkCatalog.getTable(new ObjectPath(identifier.getDatabase(), identifier.getTableName())); + assertNotNull(catalogBaseTable); + assertEquals( + tableTestHelper().tableSchema().columns().size(), + catalogBaseTable.getUnresolvedSchema().getColumns().size()); + } + + @Test + public void testInsertAndQuery() throws Exception { + exec( + "INSERT INTO %s SELECT 1, 'Lily', 1234567890, TO_TIMESTAMP('2020-01-01 01:02:03')", + identifier.getTableName()); + TableResult tableResult = + exec("select * from %s /*+OPTIONS('monitor-interval'='1s')*/ ", identifier.getTableName()); + + tableResult.await(30, TimeUnit.SECONDS); + + Row actualRow = tableResult.collect().next(); + assertEquals( + Row.of(1, "Lily", 1234567890L, "2020-01-01T01:02:03").toString(), actualRow.toString()); + } +} diff --git a/mixed/flink/flink-common/src/test/java/com/netease/arctic/flink/catalog/TestFlinkCatalogs.java b/mixed/flink/flink-common/src/test/java/com/netease/arctic/flink/catalog/TestFlinkCatalogs.java new file mode 100644 index 0000000000..db17c928ab --- /dev/null +++ b/mixed/flink/flink-common/src/test/java/com/netease/arctic/flink/catalog/TestFlinkCatalogs.java @@ -0,0 +1,143 @@ +package com.netease.arctic.flink.catalog; + +import static com.netease.arctic.flink.table.descriptors.ArcticValidator.TABLE_FORMAT; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.netease.arctic.ams.api.TableFormat; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogDatabaseImpl; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.ResolvedCatalogTable; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.thrift.TException; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +class TestFlinkCatalogs { + static FlinkCatalogContext flinkCatalogContext = new FlinkCatalogContext(); + + @BeforeAll + public static void setupCatalogMeta() throws Exception { + flinkCatalogContext.initial(); + } + + @AfterAll + public static void tearDown() { + flinkCatalogContext.close(); + } + + @ParameterizedTest + @MethodSource("com.netease.arctic.flink.catalog.FlinkCatalogContext#getFlinkCatalogAndTable") + void testListDatabases(FlinkCatalog flinkCatalog) throws TException { + List expects = flinkCatalogContext.getHMSClient().getAllDatabases(); + assertEquals(expects, flinkCatalog.listDatabases()); + } + + @ParameterizedTest + @MethodSource("com.netease.arctic.flink.catalog.FlinkCatalogContext#getFlinkCatalogAndTable") + void testDatabaseExists(FlinkCatalog flinkCatalog) { + assertTrue(flinkCatalog.databaseExists("default")); + assertFalse(flinkCatalog.databaseExists("not_exists_db")); + } + + @ParameterizedTest + @MethodSource("com.netease.arctic.flink.catalog.FlinkCatalogContext#getFlinkCatalogAndTable") + void testCreateAndDropDatabase(FlinkCatalog flinkCatalog) + throws DatabaseAlreadyExistException, DatabaseNotEmptyException, DatabaseNotExistException { + flinkCatalog.createDatabase( + "test", new CatalogDatabaseImpl(Collections.emptyMap(), "test"), false); + assertTrue(flinkCatalog.databaseExists("test")); + + flinkCatalog.dropDatabase("test", false); + assertFalse(flinkCatalog.databaseExists("test")); + } + + @ParameterizedTest + @MethodSource("com.netease.arctic.flink.catalog.FlinkCatalogContext#getFlinkCatalogAndTable") + void testAlterDatabase(FlinkCatalog flinkCatalog, CatalogTable table, TableFormat tableFormat) + throws DatabaseNotExistException { + try { + flinkCatalog.alterDatabase( + "default", new CatalogDatabaseImpl(Collections.emptyMap(), "default"), false); + } catch (UnsupportedOperationException e) { + // Mixed-format catalog does not support altering database. + if (tableFormat != TableFormat.MIXED_HIVE && tableFormat != TableFormat.MIXED_ICEBERG) { + throw e; + } + } + } + + @ParameterizedTest + @MethodSource("com.netease.arctic.flink.catalog.FlinkCatalogContext#getFlinkCatalogAndTable") + void testCreateGetAndDropTable( + FlinkCatalog flinkCatalog, CatalogTable table, TableFormat tableFormat) + throws TableAlreadyExistException, DatabaseNotExistException, TableNotExistException { + ObjectPath objectPath = flinkCatalogContext.objectPath; + + flinkCatalog.createTable(flinkCatalogContext.objectPath, table, false); + assertTrue(flinkCatalog.tableExists(objectPath)); + + CatalogBaseTable actualTable = flinkCatalog.getTable(objectPath); + assertEquals(table.getUnresolvedSchema(), actualTable.getUnresolvedSchema()); + assertEquals(tableFormat.toString(), actualTable.getOptions().get(TABLE_FORMAT.key())); + + flinkCatalog.dropTable(objectPath, false); + assertFalse(flinkCatalog.tableExists(objectPath)); + } + + @ParameterizedTest + @MethodSource("com.netease.arctic.flink.catalog.FlinkCatalogContext#getFlinkCatalogAndTable") + void testAlterTable(FlinkCatalog flinkCatalog, CatalogTable table, TableFormat tableFormat) + throws TableNotExistException, TableAlreadyExistException, DatabaseNotExistException { + try { + flinkCatalog.createTable(flinkCatalogContext.objectPath, table, true); + + ResolvedSchema newResolvedSchema = + ResolvedSchema.of( + Column.physical("name", DataTypes.STRING()), + Column.physical("age", DataTypes.INT()), + Column.physical("address", DataTypes.STRING())); + String comment = "Flink new Table"; + Map newProperties = Maps.newHashMap(); + newProperties.put("new_key", "new_value"); + + CatalogBaseTable newTable = + new ResolvedCatalogTable( + CatalogTable.of( + Schema.newBuilder().fromResolvedSchema(newResolvedSchema).build(), + comment, + new ArrayList<>(), + newProperties), + newResolvedSchema); + try { + flinkCatalog.alterTable(flinkCatalogContext.objectPath, newTable, false); + } catch (UnsupportedOperationException e) { + // https://github.com/NetEase/amoro/issues/2 altering Mixed format table is not supported. + if (tableFormat != TableFormat.MIXED_HIVE && tableFormat != TableFormat.MIXED_ICEBERG) { + throw e; + } + } + } finally { + flinkCatalog.dropTable(flinkCatalogContext.objectPath, true); + } + } +}