FlinkCatalogFactory.java
@@ -22,22 +22,22 @@
 import java.net.URL;
 import java.nio.file.Files;
 import java.nio.file.Paths;
-import java.util.List;
+import java.util.HashSet;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
+import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.runtime.util.HadoopUtils;
 import org.apache.flink.table.catalog.Catalog;
-import org.apache.flink.table.descriptors.CatalogDescriptorValidator;
 import org.apache.flink.table.factories.CatalogFactory;
+import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.base.Strings;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;

 /**
  * A Flink Catalog factory implementation that creates {@link FlinkCatalog}.
@@ -100,21 +100,37 @@ protected CatalogLoader createCatalogLoader(String name, Map<String, String> properties) {
   }

   @Override
-  public Map<String, String> requiredContext() {
-    Map<String, String> context = Maps.newHashMap();
-    context.put(CatalogDescriptorValidator.CATALOG_TYPE, "iceberg");
-    context.put(CatalogDescriptorValidator.CATALOG_PROPERTY_VERSION, "1");
-    return context;
+  public String factoryIdentifier() {
+    return FlinkCatalogFactoryOptions.IDENTIFIER;
   }

   @Override
-  public List<String> supportedProperties() {
-    return ImmutableList.of("*");
+  public Set<ConfigOption<?>> requiredOptions() {
+    final Set<ConfigOption<?>> options = new HashSet<>();
+    options.add(FlinkCatalogFactoryOptions.CATALOG_TYPE);
+    return options;
   }

   @Override
-  public Catalog createCatalog(String name, Map<String, String> properties) {
-    return createCatalog(name, properties, clusterHadoopConf());
+  public Set<ConfigOption<?>> optionalOptions() {
+    final Set<ConfigOption<?>> options = new HashSet<>();
+    options.add(FlinkCatalogFactoryOptions.PROPERTY_VERSION);
+    options.add(FlinkCatalogFactoryOptions.URI);
+    options.add(FlinkCatalogFactoryOptions.WAREHOUSE);
+    options.add(FlinkCatalogFactoryOptions.CLIENTS);
+    options.add(FlinkCatalogFactoryOptions.BASE_NAMESPACE);
+    options.add(FlinkCatalogFactoryOptions.HIVE_CONF_DIR);
+    options.add(FlinkCatalogFactoryOptions.CACHE_ENABLED);
+    return options;
   }
Comment on lines +115 to +125 (Contributor):

Do we need to add any of the known additional catalog properties that are used with some of the AWS catalogs?

Also, for catalogs that offer additional options (including custom catalogs), how will users set those? Presently, supportedProperties has a `*`. Is that still possible? The usage of `*` would be the easiest path forward, but that might not be possible.

If not, the ones I can think of are presently coming from the AWS catalogs (though we should also look into the JDBC catalog as well).

Additional catalog properties to consider:

General catalog properties
  • catalog-impl
  • io-impl

GlueCatalog-specific options
  • lock-impl
  • lock.table

DynamoDbCatalog-specific options
  • dynamodb.table-name

AWS client configuration
  • client.factory

There are further ones which I'll try to link to in a bit (such as S3FileIO options for AWS authentication and encryption parameters), but those are the first ones I encountered in the docs.

Comment (Contributor):

Also, will users be able to specify their own catalog implementation (with its own options) still?
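[Editor's aside] One possible answer to the pass-through question, sketched under the assumption that FactoryUtil.CatalogFactoryHelper inherits validateExcept from FactoryUtil.FactoryHelper (as it does in Flink 1.13); the prefix list below is illustrative, not exhaustive:

    // Hypothetical sketch only: approximate the old "*" behavior by validating
    // everything except known vendor-specific prefixes, so options such as
    // 'catalog-impl', 'io-impl', 'lock-impl', 'lock.table', 'dynamodb.table-name',
    // and 'client.factory' pass through without a declared ConfigOption.
    @Override
    public Catalog createCatalog(Context context) {
      final FactoryUtil.CatalogFactoryHelper helper =
          FactoryUtil.createCatalogFactoryHelper(this, context);
      helper.validateExcept("catalog-impl", "io-impl", "lock", "dynamodb.", "client.");
      return createCatalog(context.getName(), context.getOptions(), clusterHadoopConf());
    }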


+  @Override
+  public Catalog createCatalog(Context context) {
+    final FactoryUtil.CatalogFactoryHelper helper =
+        FactoryUtil.createCatalogFactoryHelper(this, context);
+    helper.validate();
+
+    return createCatalog(context.getName(), context.getOptions(), clusterHadoopConf());
Comment (Contributor):

There is an effort to decouple the need for Hadoop from the FlinkCatalogFactory, for environments where Hadoop is not easily configurable and for catalog implementations that don't actually need it (basically anything that uses S3FileIO — though we'll need to update GlueCatalog and eventually DynamoDbCatalog as well).

Even after updating GlueCatalog to remove the Hadoop Configurable interface, this call to clusterHadoopConf() still makes it so that Hadoop is needed on the classpath.

I'm not proposing that we change that in this PR (as this PR has been open for a while, and it's a separate concern), but I wanted to draw attention to the issue as I'm a bit less informed on the Flink side compared to many of you: #3044

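[Editor's aside] Purely as illustration of that point (not part of this PR), one way to make the Hadoop dependency optional is to probe the classpath before touching any Hadoop type; clusterHadoopConfOrNull is a hypothetical helper name:

    // Hypothetical sketch: only build a Hadoop Configuration when Hadoop is
    // actually on the classpath, so S3FileIO-style catalogs can run without it.
    static Object clusterHadoopConfOrNull() {
      try {
        Class.forName("org.apache.hadoop.conf.Configuration");
      } catch (ClassNotFoundException e) {
        return null; // no Hadoop available; callers must tolerate a null conf
      }
      // Hadoop-dependent utilities are only resolved once the guard has passed.
      return HadoopUtils.getHadoopConfiguration(GlobalConfiguration.loadConfiguration());
    }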
   }

   protected Catalog createCatalog(String name, Map<String, String> properties, Configuration hadoopConf) {
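Before moving to the new options class, here is a minimal sketch of how these factory options surface to users through Flink SQL once the jar is on the classpath; the catalog name, metastore URI, and warehouse path below are hypothetical:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class IcebergCatalogDdlExample {
      public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
            EnvironmentSettings.newInstance().inStreamingMode().build());

        // 'type' = 'iceberg' selects this factory via factoryIdentifier();
        // the remaining keys are validated against requiredOptions()/optionalOptions().
        tEnv.executeSql(
            "CREATE CATALOG iceberg_hive WITH (" +
            " 'type' = 'iceberg'," +
            " 'catalog-type' = 'hive'," +
            " 'uri' = 'thrift://localhost:9083'," +
            " 'warehouse' = 'hdfs://namenode:8020/warehouse'" +
            ")");

        tEnv.executeSql("SHOW CATALOGS").print();
      }
    }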
FlinkCatalogFactoryOptions.java (new file)
@@ -0,0 +1,84 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.iceberg.flink;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class FlinkCatalogFactoryOptions {
  private FlinkCatalogFactoryOptions() {
  }

  public static final String IDENTIFIER = "iceberg";

  public static final ConfigOption<String> CATALOG_TYPE =
      ConfigOptions.key("catalog-type")
          .stringType()
          .noDefaultValue()
          .withDescription("Iceberg catalog type, 'hive' or 'hadoop'");

  public static final ConfigOption<String> PROPERTY_VERSION =
      ConfigOptions.key("property-version")
          .stringType()
          .defaultValue("1")
          .withDescription(
              "Version number to describe the property version. This property can be used for backwards " +
                  "compatibility in case the property format changes. The current property version is `1`. (Optional)");

  public static final ConfigOption<String> URI =
      ConfigOptions.key("uri")
          .stringType()
          .noDefaultValue()
          .withDescription("The Hive Metastore URI (Hive catalog only)");

Comment on the URI option (Contributor):

Can we reuse some constants from CatalogProperties in the iceberg-core module? For example:

    ConfigOptions.key(CatalogProperties.URI)

  public static final ConfigOption<String> CLIENTS =
      ConfigOptions.key("clients")
          .stringType()
          .noDefaultValue()
          .withDescription("The Hive client pool size (Hive catalog only)");

  public static final ConfigOption<String> HIVE_CONF_DIR =
      ConfigOptions.key("hive-conf-dir")
          .stringType()
          .noDefaultValue()
          .withDescription(
              "Path to a directory containing a hive-site.xml configuration file which will be used to provide " +
                  "custom Hive configuration values. The value of hive.metastore.warehouse.dir from " +
                  "<hive-conf-dir>/hive-site.xml (or the Hive configuration file on the classpath) will be " +
                  "overwritten with the warehouse value if both hive-conf-dir and warehouse are set when " +
                  "creating an Iceberg catalog.");

  public static final ConfigOption<String> BASE_NAMESPACE =
      ConfigOptions.key("base-namespace")
          .stringType()
          .noDefaultValue()
          .withDescription("A base namespace as the prefix for all databases (Hadoop catalog only)");

  public static final ConfigOption<String> WAREHOUSE =
      ConfigOptions.key("warehouse")
          .stringType()
          .noDefaultValue()
          .withDescription("The warehouse path (Hadoop catalog only)");

  public static final ConfigOption<String> CACHE_ENABLED =
      ConfigOptions.key("cache-enabled")
          .stringType()
          .defaultValue("true")
          .withDescription("Whether to cache the catalog in FlinkCatalog.");
}

Comment (Contributor): Missing option: default-database.

Comment (Contributor, Author):

These options are for creating a catalog with SQL; I don't think we need to add default-database. See http://iceberg.apache.org/flink/#creating-catalogs-and-using-catalogs

META-INF services file (factory registration)
@@ -13,4 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

-org.apache.iceberg.flink.FlinkCatalogFactory
\ No newline at end of file
+org.apache.iceberg.flink.FlinkCatalogFactory
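
[Editor's aside] For context, a sketch of how this registration is consumed, assuming the entry lives under Flink's org.apache.flink.table.factories.Factory service file (which is what the new FactoryUtil-based discovery scans):

    import java.util.ServiceLoader;
    import org.apache.flink.table.factories.Factory;

    public class FactoryDiscoveryCheck {
      public static void main(String[] args) {
        // Roughly what FactoryUtil does when CREATE CATALOG ... WITH ('type'='iceberg')
        // resolves a factory: scan META-INF/services entries via ServiceLoader
        // and match on factoryIdentifier().
        for (Factory factory : ServiceLoader.load(Factory.class)) {
          if ("iceberg".equals(factory.factoryIdentifier())) {
            System.out.println("Found Iceberg catalog factory: " + factory.getClass().getName());
          }
        }
      }
    }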
versions.props (2 changes: 1 addition & 1 deletion)
@@ -1,7 +1,7 @@
 org.slf4j:* = 1.7.25
 org.apache.avro:avro = 1.9.2
 org.apache.calcite:* = 1.10.0
-org.apache.flink:* = 1.12.1
+org.apache.flink:* = 1.13.0
 org.apache.hadoop:* = 2.7.3
 org.apache.hive:hive-metastore = 2.3.8
 org.apache.hive:hive-serde = 2.3.8