[AMORO-1341] [Flink]: Support UnifiedCatalog to contain Mixed format table in Flink Engine (apache#2370)

YesOrNo828 authored and ShawHee committed Dec 29, 2023
1 parent 6d076d2 commit 468b767
Showing 9 changed files with 1,123 additions and 0 deletions.

Large diffs are not rendered by default.

com/netease/arctic/flink/catalog/factories/ArcticCatalogFactoryOptions.java
@@ -31,6 +31,7 @@
@Internal
public class ArcticCatalogFactoryOptions {
public static final String IDENTIFIER = "arctic";
public static final String UNIFIED_IDENTIFIER = "unified";

public static final ConfigOption<String> DEFAULT_DATABASE =
ConfigOptions.key(CommonCatalogOptions.DEFAULT_DATABASE_KEY)
com/netease/arctic/flink/catalog/factories/FlinkCatalogFactory.java (new file)
@@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netease.arctic.flink.catalog.factories;

import static com.netease.arctic.flink.catalog.factories.ArcticCatalogFactoryOptions.DEFAULT_DATABASE;
import static org.apache.flink.table.factories.FactoryUtil.PROPERTY_VERSION;

import com.netease.arctic.ams.api.TableFormat;
import com.netease.arctic.flink.catalog.FlinkUnifiedCatalog;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.table.catalog.AbstractCatalog;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.factories.CatalogFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;

import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Factory for {@link FlinkUnifiedCatalog}. */
public class FlinkCatalogFactory implements CatalogFactory {

private static final Set<TableFormat> SUPPORTED_FORMATS =
Sets.newHashSet(TableFormat.MIXED_ICEBERG, TableFormat.MIXED_HIVE);

@Override
public String factoryIdentifier() {
return ArcticCatalogFactoryOptions.UNIFIED_IDENTIFIER;
}

@Override
public Set<ConfigOption<?>> requiredOptions() {
Set<ConfigOption<?>> requiredOptions = new HashSet<>();
requiredOptions.add(ArcticCatalogFactoryOptions.METASTORE_URL);
return requiredOptions;
}

@Override
public Set<ConfigOption<?>> optionalOptions() {
final Set<ConfigOption<?>> options = new HashSet<>();
options.add(PROPERTY_VERSION);
options.add(DEFAULT_DATABASE);
return options;
}

@Override
public Catalog createCatalog(Context context) {
final FactoryUtil.CatalogFactoryHelper helper =
FactoryUtil.createCatalogFactoryHelper(this, context);
helper.validate();

final String defaultDatabase = helper.getOptions().get(DEFAULT_DATABASE);
String metastoreUrl = helper.getOptions().get(ArcticCatalogFactoryOptions.METASTORE_URL);

Map<TableFormat, AbstractCatalog> availableCatalogs = Maps.newHashMap();
SUPPORTED_FORMATS.forEach(
tableFormat -> {
if (!availableCatalogs.containsKey(tableFormat)) {
availableCatalogs.put(tableFormat, createCatalog(context, tableFormat));
}
});

return new FlinkUnifiedCatalog(
metastoreUrl, context.getName(), defaultDatabase, availableCatalogs);
}

private AbstractCatalog createCatalog(Context context, TableFormat tableFormat) {
CatalogFactory catalogFactory;

switch (tableFormat) {
case MIXED_ICEBERG:
case MIXED_HIVE:
catalogFactory = new ArcticCatalogFactory();
break;
default:
throw new UnsupportedOperationException(
String.format("Unsupported table format: [%s] in the amoro catalog." + tableFormat));
}

return (AbstractCatalog) catalogFactory.createCatalog(context);
}
}
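
Usage sketch (not part of the diff): with this factory on the classpath, the new 'unified' identifier can register a catalog from a Flink TableEnvironment. A minimal sketch, assuming the key of METASTORE_URL is 'metastore.url' and a placeholder AMS endpoint; the diff itself only confirms the 'unified' identifier and that the metastore URL is a required option.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class UnifiedCatalogExample {
  public static void main(String[] args) {
    TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

    // 'type' = 'unified' resolves to FlinkCatalogFactory via its factoryIdentifier().
    // 'metastore.url' (assumed key of METASTORE_URL) points at an AMS catalog; the
    // endpoint below is a placeholder, not a value taken from this commit.
    tEnv.executeSql(
        "CREATE CATALOG amoro WITH ("
            + " 'type' = 'unified',"
            + " 'metastore.url' = 'thrift://127.0.0.1:1260/my_catalog'"
            + ")");
    tEnv.executeSql("USE CATALOG amoro");
  }
}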
com/netease/arctic/flink/table/AmoroDynamicTableFactory.java (new file)
@@ -0,0 +1,125 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netease.arctic.flink.table;

import static com.netease.arctic.flink.catalog.factories.ArcticCatalogFactoryOptions.UNIFIED_IDENTIFIER;
import static com.netease.arctic.flink.table.descriptors.ArcticValidator.TABLE_FORMAT;

import com.netease.arctic.ams.api.TableFormat;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.catalog.AbstractCatalog;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.Factory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.util.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;

import java.util.Map;
import java.util.Optional;
import java.util.Set;

/**
 * AmoroDynamicTableFactory creates dynamic table sources and sinks. It implements both
 * DynamicTableSourceFactory and DynamicTableSinkFactory, delegating each request to the
 * factory of the catalog registered for the table's format.
 */
public class AmoroDynamicTableFactory
implements DynamicTableSourceFactory, DynamicTableSinkFactory {

private final Map<TableFormat, AbstractCatalog> availableCatalogs;

public AmoroDynamicTableFactory(Map<TableFormat, AbstractCatalog> availableCatalogs) {
this.availableCatalogs =
Preconditions.checkNotNull(availableCatalogs, "availableCatalogs cannot be null");
}

@Override
public DynamicTableSink createDynamicTableSink(Context context) {
ObjectIdentifier identifier = context.getObjectIdentifier();
FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
Configuration options = (Configuration) helper.getOptions();
TableFormat tableFormat = options.get(TABLE_FORMAT);

return getOriginalCatalog(tableFormat)
.flatMap(AbstractCatalog::getFactory)
.filter(factory -> factory instanceof DynamicTableSinkFactory)
.map(factory -> ((DynamicTableSinkFactory) factory).createDynamicTableSink(context))
.orElseThrow(
() ->
new UnsupportedOperationException(
String.format(
"Invalid catalog or factory for table format: %s, table: %s.",
tableFormat, identifier)));
}

@Override
public DynamicTableSource createDynamicTableSource(Context context) {
ObjectIdentifier identifier = context.getObjectIdentifier();
FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
Configuration options = (Configuration) helper.getOptions();
TableFormat tableFormat = options.get(TABLE_FORMAT);

return getOriginalCatalog(tableFormat)
.flatMap(AbstractCatalog::getFactory)
.filter(factory -> factory instanceof DynamicTableSourceFactory)
.map(factory -> ((DynamicTableSourceFactory) factory).createDynamicTableSource(context))
.orElseThrow(
() ->
new UnsupportedOperationException(
String.format(
"Invalid catalog or factory for table format: %s, table: %s.",
tableFormat, identifier)));
}

private Optional<AbstractCatalog> getOriginalCatalog(TableFormat format) {
// Use ofNullable: an unregistered format yields Optional.empty(), so callers throw their
// descriptive UnsupportedOperationException instead of hitting an NPE here.
return Optional.ofNullable(availableCatalogs.get(format));
}

@Override
public String factoryIdentifier() {
return UNIFIED_IDENTIFIER;
}

@Override
public Set<ConfigOption<?>> requiredOptions() {
Set<ConfigOption<?>> requiredOptions = Sets.newHashSet();
availableCatalogs.forEach(
(format, catalog) -> {
Optional<Factory> factory = catalog.getFactory();
factory.ifPresent(value -> requiredOptions.addAll(value.requiredOptions()));
});
requiredOptions.add(TABLE_FORMAT);
return requiredOptions;
}

@Override
public Set<ConfigOption<?>> optionalOptions() {
Set<ConfigOption<?>> optionalOptions = Sets.newHashSet();
availableCatalogs.forEach(
(format, catalog) -> {
Optional<Factory> factory = catalog.getFactory();
factory.ifPresent(value -> optionalOptions.addAll(value.optionalOptions()));
});
return optionalOptions;
}
}
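
Illustration (not part of the diff): the 'table.format' option defined in ArcticValidator below is what drives the delegation above. A hedged sketch, assuming the unified catalog from the previous example is the current catalog; the database, table, and columns are placeholders.

import org.apache.flink.table.api.TableEnvironment;

public class MixedFormatRoutingExample {
  // When the planner builds a source or sink for this table, AmoroDynamicTableFactory
  // reads 'table.format' and delegates to the Mixed-Hive catalog's factory; leaving
  // the option unset falls back to the MIXED_ICEBERG default.
  public static void createAndRead(TableEnvironment tEnv) {
    tEnv.executeSql(
        "CREATE TABLE IF NOT EXISTS db.users ("
            + " name STRING,"
            + " age INT"
            + ") WITH ("
            + " 'table.format' = 'MIXED_HIVE'"
            + ")");
    tEnv.executeSql("SELECT * FROM db.users").print();
  }
}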
com/netease/arctic/flink/table/descriptors/ArcticValidator.java
@@ -20,6 +20,7 @@

import static org.apache.flink.configuration.description.TextElement.text;

import com.netease.arctic.ams.api.TableFormat;
import org.apache.commons.lang.StringUtils;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
@@ -275,6 +276,19 @@ public class ArcticValidator extends ConnectorDescriptorValidator {
+ " of the key. Default is -1, means it is automatically determined: every shard will be at least 512KB and"
+ " number of shard bits will not exceed 6.");

public static final ConfigOption<TableFormat> TABLE_FORMAT =
ConfigOptions.key("table.format")
.enumType(TableFormat.class)
.defaultValue(TableFormat.MIXED_ICEBERG)
.withDescription(
String.format(
"The format of the table, valid values are %s, %s, %s or %s, and Flink choose '%s' as default format.",
TableFormat.ICEBERG,
TableFormat.MIXED_ICEBERG,
TableFormat.MIXED_HIVE,
TableFormat.PAIMON,
TableFormat.MIXED_ICEBERG));

@Override
public void validate(DescriptorProperties properties) {
String emitMode = properties.getString(ARCTIC_EMIT_MODE.key());
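
Behavior sketch (not part of the diff): how the new TABLE_FORMAT option behaves with a Flink Configuration, based only on the definition above plus Flink's enum-option handling (an absent key falls back to the declared default; enum values parse case-insensitively by constant name).

import static com.netease.arctic.flink.table.descriptors.ArcticValidator.TABLE_FORMAT;

import com.netease.arctic.ams.api.TableFormat;
import org.apache.flink.configuration.Configuration;

public class TableFormatOptionDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Key absent: the declared default applies.
    System.out.println(conf.get(TABLE_FORMAT)); // MIXED_ICEBERG

    // Flink parses enum options case-insensitively by constant name.
    conf.setString(TABLE_FORMAT.key(), "mixed_hive");
    System.out.println(conf.get(TABLE_FORMAT)); // MIXED_HIVE
  }
}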
META-INF/services/org.apache.flink.table.factories.Factory
@@ -17,4 +17,5 @@
#

com.netease.arctic.flink.catalog.factories.ArcticCatalogFactory
com.netease.arctic.flink.catalog.factories.FlinkCatalogFactory
com.netease.arctic.flink.table.DynamicTableFactory
com/netease/arctic/flink/catalog/FlinkCatalogContext.java (new test file)
@@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netease.arctic.flink.catalog;

import static com.netease.arctic.flink.catalog.factories.ArcticCatalogFactoryOptions.METASTORE_URL;
import static com.netease.arctic.flink.table.descriptors.ArcticValidator.TABLE_FORMAT;

import com.netease.arctic.TestAms;
import com.netease.arctic.ams.api.CatalogMeta;
import com.netease.arctic.ams.api.TableFormat;
import com.netease.arctic.flink.catalog.factories.FlinkCatalogFactory;
import com.netease.arctic.hive.TestHMS;
import com.netease.arctic.hive.catalog.HiveCatalogTestHelper;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.junit.jupiter.params.provider.Arguments;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Map;
import java.util.stream.Stream;

public class FlinkCatalogContext {

static final TestHMS TEST_HMS = new TestHMS();
static final TestAms TEST_AMS = new TestAms();
static final FlinkCatalogFactory flinkCatalogFactory = new FlinkCatalogFactory();

static ResolvedSchema resolvedSchema =
ResolvedSchema.of(
Column.physical("name", DataTypes.STRING()), Column.physical("age", DataTypes.INT()));
static Schema schema = Schema.newBuilder().fromResolvedSchema(resolvedSchema).build();

ObjectPath objectPath = new ObjectPath("default", "test_hive_from_flink");

static Stream<Arguments> getFlinkCatalogAndTable() {
return Stream.of(
Arguments.of(
initFlinkCatalog(TableFormat.MIXED_HIVE),
generateFlinkTable(TableFormat.MIXED_HIVE.toString()),
TableFormat.MIXED_HIVE),
Arguments.of(
initFlinkCatalog(TableFormat.MIXED_ICEBERG),
generateFlinkTable(TableFormat.MIXED_ICEBERG.toString()),
TableFormat.MIXED_ICEBERG));
}

static ResolvedCatalogTable generateFlinkTable(String tableFormat) {
return new ResolvedCatalogTable(
CatalogTable.of(
schema,
"Flink managed table",
new ArrayList<>(),
Collections.singletonMap(TABLE_FORMAT.key(), tableFormat)),
resolvedSchema);
}

void initial() throws Exception {
TEST_HMS.before();
TEST_AMS.before();
}

void close() {
TEST_AMS.after();
TEST_HMS.after();
}

static FlinkUnifiedCatalog initFlinkCatalog(TableFormat tableFormat) {
FlinkUnifiedCatalog flinkUnifiedCatalog;
Map<String, String> factoryOptions = Maps.newHashMap();
CatalogMeta meta =
HiveCatalogTestHelper.build(TEST_HMS.getHiveConf(), tableFormat)
.buildCatalogMeta(TEST_HMS.getWareHouseLocation());
meta.setCatalogName(tableFormat.name().toLowerCase());

TEST_AMS.getAmsHandler().dropCatalog(meta.getCatalogName());
TEST_AMS.getAmsHandler().createCatalog(meta);

factoryOptions.put(METASTORE_URL.key(), TEST_AMS.getServerUrl() + "/" + meta.getCatalogName());
final FactoryUtil.DefaultCatalogContext context =
new FactoryUtil.DefaultCatalogContext(
"flink_catalog_name",
factoryOptions,
new Configuration(),
FlinkCatalogContext.class.getClassLoader());
flinkUnifiedCatalog = (FlinkUnifiedCatalog) flinkCatalogFactory.createCatalog(context);
flinkUnifiedCatalog.open();
return flinkUnifiedCatalog;
}

HiveMetaStoreClient getHMSClient() {
return TEST_HMS.getHiveClient();
}
}