1 change: 1 addition & 0 deletions build.gradle
@@ -792,6 +792,7 @@ project(':iceberg-parquet') {
api project(':iceberg-api')
implementation project(':iceberg-core')
implementation project(':iceberg-common')
implementation 'io.airlift:aircompressor'

implementation(libs.parquet.avro) {
exclude group: 'org.apache.avro', module: 'avro'
4 changes: 3 additions & 1 deletion core/src/main/java/org/apache/iceberg/SerializableTable.java
@@ -25,6 +25,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.hadoop.HadoopConfigurable;
import org.apache.iceberg.hadoop.HadoopDependency;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.LocationProvider;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -113,7 +114,8 @@ private String metadataFileLocation(Table table) {
}

private FileIO fileIO(Table table) {
if (table.io() instanceof HadoopConfigurable) {
if (HadoopDependency.isHadoopCommonOnClasspath(SerializableTable.class.getClassLoader())
Contributor: What happens when Hadoop is not on the classpath, but the table is HadoopConfigurable?

Contributor (Author): No configuration will be passed through.

&& table.io() instanceof HadoopConfigurable) {
((HadoopConfigurable) table.io()).serializeConfWith(SerializableConfSupplier::new);
}

35 changes: 35 additions & 0 deletions core/src/main/java/org/apache/iceberg/hadoop/HadoopDependency.java
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.hadoop;

/** Responsible for telling whether specific Hadoop dependencies are on the classpath. */
public class HadoopDependency {

private HadoopDependency() {}

public static boolean isHadoopCommonOnClasspath(ClassLoader classLoader) {
try {
Class.forName("org.apache.hadoop.conf.Configuration", false, classLoader);
Class.forName("org.apache.hadoop.security.UserGroupInformation", false, classLoader);
return true;
} catch (ClassNotFoundException e) {
return false;
}
}
}
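A minimal sketch of the intended usage pattern (not code from this PR; the demo class is hypothetical, and it assumes hadoop-common is only a compile-time dependency so the guarded branch is the one place Hadoop types are referenced):

import org.apache.iceberg.hadoop.HadoopDependency;

public class HadoopGuardDemo {
  public static void main(String[] args) {
    ClassLoader cl = HadoopGuardDemo.class.getClassLoader();

    // Hadoop types are only referenced inside the guarded branch. Typical JVMs
    // resolve class references lazily, so this class loads and runs even when
    // hadoop-common is missing at runtime.
    Object hadoopConf = null;
    if (HadoopDependency.isHadoopCommonOnClasspath(cl)) {
      hadoopConf = new org.apache.hadoop.conf.Configuration();
    }

    System.out.println("Hadoop configuration available: " + (hadoopConf != null));
  }
}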
@@ -26,13 +26,18 @@

/** Wraps a {@link Configuration} object in a {@link Serializable} layer. */
public class SerializableConfiguration implements Serializable {
private static final long serialVersionUID = -8840976521081151175L;

private transient Configuration hadoopConf;

public SerializableConfiguration(Configuration hadoopConf) {
this.hadoopConf = hadoopConf;
}

public SerializableConfiguration(Object hadoopConf) {
this.hadoopConf = (Configuration) hadoopConf;
}
Comment on lines +37 to +39

What's the point here? When there's no Hadoop on the classpath it will blow up no matter what, right?
Additionally, explicit casts are brittle. This question applies to all the other places where Object is passed.
In Flink this is solved by protecting Hadoop-specific class usages with isHadoopCommonOnClasspath, and that works like a charm.

Contributor: I am also interested in the answer to Gabor's question.

Also wondering whether we can get overload ambiguity from the two constructors?

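On the overload question: the compiler picks the most specific applicable constructor, so an argument statically typed as Configuration binds to the Configuration overload and everything else binds to the Object overload; there is no compile-time ambiguity, but the internal cast moves any failure to runtime. A minimal sketch (hypothetical demo class, assuming SerializableConfiguration lives in org.apache.iceberg.hadoop):

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.hadoop.SerializableConfiguration;

public class OverloadDemo {
  public static void main(String[] args) {
    Configuration typed = new Configuration();
    Object untyped = typed;

    new SerializableConfiguration(typed);   // binds to the Configuration overload
    new SerializableConfiguration(untyped); // binds to the Object overload, cast succeeds

    // The Object overload only fails at runtime, when the argument is not a Configuration.
    try {
      new SerializableConfiguration("not a configuration");
    } catch (ClassCastException e) {
      System.out.println("Rejected at runtime: " + e);
    }
  }
}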

private void writeObject(ObjectOutputStream out) throws IOException {
out.defaultWriteObject();
hadoopConf.write(out);
@@ -47,4 +52,8 @@ private void readObject(ObjectInputStream in) throws ClassNotFoundException, IOE
public Configuration get() {
return hadoopConf;
}

public Configuration getClone() {
Contributor: Maybe name this method just config(). getClone can look like it returns a clone of this SerializableConfiguration class.

return new Configuration(hadoopConf);
}
}
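For context on get() versus getClone(): the clone variant hands back an independent copy, so each CatalogLoader.clone() below can mutate its own Configuration without affecting the original. A small sketch, assuming Hadoop is on the classpath (the property key is made up):

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.hadoop.SerializableConfiguration;

public class CloneDemo {
  public static void main(String[] args) {
    SerializableConfiguration wrapped = new SerializableConfiguration(new Configuration());

    Configuration shared = wrapped.get();        // same underlying object as the wrapper
    Configuration isolated = wrapped.getClone(); // independent copy

    isolated.set("demo.key", "per-clone-value"); // does not leak back into `wrapped`
    System.out.println(shared.get("demo.key"));  // prints null
  }
}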
@@ -20,7 +20,6 @@

import java.io.Serializable;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.catalog.Catalog;
@@ -49,21 +48,20 @@ public interface CatalogLoader extends Serializable, Cloneable {
@SuppressWarnings({"checkstyle:NoClone", "checkstyle:SuperClone"})
CatalogLoader clone();

static CatalogLoader hadoop(
String name, Configuration hadoopConf, Map<String, String> properties) {
static CatalogLoader hadoop(String name, Object hadoopConf, Map<String, String> properties) {
return new HadoopCatalogLoader(name, hadoopConf, properties);
}

static CatalogLoader hive(String name, Configuration hadoopConf, Map<String, String> properties) {
static CatalogLoader hive(String name, Object hadoopConf, Map<String, String> properties) {
return new HiveCatalogLoader(name, hadoopConf, properties);
}

static CatalogLoader rest(String name, Configuration hadoopConf, Map<String, String> properties) {
static CatalogLoader rest(String name, Object hadoopConf, Map<String, String> properties) {
return new RESTCatalogLoader(name, hadoopConf, properties);
}

static CatalogLoader custom(
String name, Map<String, String> properties, Configuration hadoopConf, String impl) {
String name, Map<String, String> properties, Object hadoopConf, String impl) {
return new CustomCatalogLoader(name, properties, hadoopConf, impl);
}

@@ -73,8 +71,7 @@ class HadoopCatalogLoader implements CatalogLoader {
private final String warehouseLocation;
private final Map<String, String> properties;

private HadoopCatalogLoader(
String catalogName, Configuration conf, Map<String, String> properties) {
private HadoopCatalogLoader(String catalogName, Object conf, Map<String, String> properties) {
this.catalogName = catalogName;
this.hadoopConf = new SerializableConfiguration(conf);
this.warehouseLocation = properties.get(CatalogProperties.WAREHOUSE_LOCATION);
@@ -90,7 +87,7 @@ public Catalog loadCatalog() {
@Override
@SuppressWarnings({"checkstyle:NoClone", "checkstyle:SuperClone"})
public CatalogLoader clone() {
return new HadoopCatalogLoader(catalogName, new Configuration(hadoopConf.get()), properties);
return new HadoopCatalogLoader(catalogName, hadoopConf.getClone(), properties);
}

@Override
@@ -110,8 +107,7 @@ class HiveCatalogLoader implements CatalogLoader {
private final int clientPoolSize;
private final Map<String, String> properties;

private HiveCatalogLoader(
String catalogName, Configuration conf, Map<String, String> properties) {
private HiveCatalogLoader(String catalogName, Object conf, Map<String, String> properties) {
this.catalogName = catalogName;
this.hadoopConf = new SerializableConfiguration(conf);
this.uri = properties.get(CatalogProperties.URI);
@@ -132,7 +128,7 @@ public Catalog loadCatalog() {
@Override
@SuppressWarnings({"checkstyle:NoClone", "checkstyle:SuperClone"})
public CatalogLoader clone() {
return new HiveCatalogLoader(catalogName, new Configuration(hadoopConf.get()), properties);
return new HiveCatalogLoader(catalogName, hadoopConf.getClone(), properties);
}

@Override
@@ -151,23 +147,33 @@ class RESTCatalogLoader implements CatalogLoader {
private final SerializableConfiguration hadoopConf;
private final Map<String, String> properties;

private RESTCatalogLoader(
String catalogName, Configuration conf, Map<String, String> properties) {
private RESTCatalogLoader(String catalogName, Object conf, Map<String, String> properties) {
this.catalogName = catalogName;
this.hadoopConf = new SerializableConfiguration(conf);
if (conf != null) {
this.hadoopConf = new SerializableConfiguration(conf);
} else {
this.hadoopConf = null;
}
this.properties = Maps.newHashMap(properties);
}

@Override
public Catalog loadCatalog() {
return CatalogUtil.loadCatalog(
RESTCatalog.class.getName(), catalogName, properties, hadoopConf.get());
Object conf = null;
if (hadoopConf != null) {
conf = hadoopConf.get();
}
return CatalogUtil.loadCatalog(RESTCatalog.class.getName(), catalogName, properties, conf);
}

@Override
@SuppressWarnings({"checkstyle:NoClone", "checkstyle:SuperClone"})
public CatalogLoader clone() {
return new RESTCatalogLoader(catalogName, new Configuration(hadoopConf.get()), properties);
Object conf = null;
if (hadoopConf != null) {
conf = hadoopConf.getClone();
}
return new RESTCatalogLoader(catalogName, conf, properties);
}

@Override
@@ -187,8 +193,12 @@ class CustomCatalogLoader implements CatalogLoader {
private final String impl;

private CustomCatalogLoader(
String name, Map<String, String> properties, Configuration conf, String impl) {
this.hadoopConf = new SerializableConfiguration(conf);
String name, Map<String, String> properties, Object conf, String impl) {
if (conf != null) {
this.hadoopConf = new SerializableConfiguration(conf);
} else {
this.hadoopConf = null;
}
this.properties = Maps.newHashMap(properties); // wrap into a hashmap for serialization
this.name = name;
this.impl =
@@ -198,13 +208,21 @@ private CustomCatalogLoader(

@Override
public Catalog loadCatalog() {
return CatalogUtil.loadCatalog(impl, name, properties, hadoopConf.get());
Object conf = null;
if (hadoopConf != null) {
conf = hadoopConf.get();
}
return CatalogUtil.loadCatalog(impl, name, properties, conf);
}

@Override
@SuppressWarnings({"checkstyle:NoClone", "checkstyle:SuperClone"})
public CatalogLoader clone() {
return new CustomCatalogLoader(name, properties, new Configuration(hadoopConf.get()), impl);
Object conf = null;
if (hadoopConf != null) {
conf = hadoopConf.getClone();
}
return new CustomCatalogLoader(name, properties, conf, impl);
}

@Override
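To illustrate what the Object-typed signatures buy callers (a sketch, not code from this PR; the catalog name and URI are made up, and it assumes construction alone does not require Hadoop):

import java.util.Map;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

public class CatalogLoaderDemo {
  public static void main(String[] args) {
    Map<String, String> props =
        ImmutableMap.of(CatalogProperties.URI, "https://rest-catalog.example.com");

    // Without Hadoop on the classpath a caller can simply pass null;
    // RESTCatalogLoader and CustomCatalogLoader now tolerate a missing configuration.
    CatalogLoader restLoader = CatalogLoader.rest("rest_demo", null, props);

    // With Hadoop available, a Configuration still works: the Object parameter is
    // wrapped in SerializableConfiguration, which casts it back internally.
    // Object hadoopConf = new org.apache.hadoop.conf.Configuration();
    // CatalogLoader hiveLoader = CatalogLoader.hive("hive_demo", hadoopConf, props);

    System.out.println(restLoader);
  }
}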
@@ -18,22 +18,18 @@
*/
package org.apache.iceberg.flink;

import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.hadoop.HadoopDependency;
import org.apache.flink.runtime.util.HadoopUtils;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.factories.CatalogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.flink.util.HadoopUtil;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.base.Strings;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.PropertyUtil;
@@ -56,7 +52,7 @@
* </ul>
*
* <p>To use a custom catalog that is not a Hive or Hadoop catalog, extend this class and override
* {@link #createCatalogLoader(String, Map, Configuration)}.
* {@link #createCatalogLoader(String, Map, Object)}.
*/
public class FlinkCatalogFactory implements CatalogFactory {

@@ -85,7 +81,7 @@ public class FlinkCatalogFactory implements CatalogFactory {
* @return an Iceberg catalog loader
*/
static CatalogLoader createCatalogLoader(
String name, Map<String, String> properties, Configuration hadoopConf) {
String name, Map<String, String> properties, Object hadoopConf) {
String catalogImpl = properties.get(CatalogProperties.CATALOG_IMPL);
if (catalogImpl != null) {
String catalogType = properties.get(ICEBERG_CATALOG_TYPE);
@@ -103,10 +99,10 @@ static CatalogLoader createCatalogLoader(
case ICEBERG_CATALOG_TYPE_HIVE:
// The values of properties 'uri', 'warehouse', 'hive-conf-dir' are allowed to be null, in
// that case it will
// fallback to parse those values from hadoop configuration which is loaded from classpath.
// fall back to parse those values from hadoop configuration which is loaded from classpath.
String hiveConfDir = properties.get(HIVE_CONF_DIR);
String hadoopConfDir = properties.get(HADOOP_CONF_DIR);
Configuration newHadoopConf = mergeHiveConf(hadoopConf, hiveConfDir, hadoopConfDir);
Object newHadoopConf = HadoopUtil.mergeHiveConf(hadoopConf, hiveConfDir, hadoopConfDir);
return CatalogLoader.hive(name, newHadoopConf, properties);

case ICEBERG_CATALOG_TYPE_HADOOP:
@@ -139,8 +135,7 @@ public Catalog createCatalog(String name, Map<String, String> properties) {
return createCatalog(name, properties, clusterHadoopConf());
}

protected Catalog createCatalog(
String name, Map<String, String> properties, Configuration hadoopConf) {
protected Catalog createCatalog(String name, Map<String, String> properties, Object hadoopConf) {
CatalogLoader catalogLoader = createCatalogLoader(name, properties, hadoopConf);
String defaultDatabase = properties.getOrDefault(DEFAULT_DATABASE, DEFAULT_DATABASE_NAME);

@@ -172,42 +167,11 @@ protected Catalog createCatalog(
cacheExpirationIntervalMs);
}

private static Configuration mergeHiveConf(
Configuration hadoopConf, String hiveConfDir, String hadoopConfDir) {
Configuration newConf = new Configuration(hadoopConf);
if (!Strings.isNullOrEmpty(hiveConfDir)) {
Preconditions.checkState(
Files.exists(Paths.get(hiveConfDir, "hive-site.xml")),
"There should be a hive-site.xml file under the directory %s",
hiveConfDir);
newConf.addResource(new Path(hiveConfDir, "hive-site.xml"));
public static Object clusterHadoopConf() {
Contributor: Would this be better placed in HadoopUtil?

if (HadoopDependency.isHadoopCommonOnClasspath(CatalogLoader.class.getClassLoader())) {
return HadoopUtils.getHadoopConfiguration(GlobalConfiguration.loadConfiguration());
} else {
// If don't provide the hive-site.xml path explicitly, it will try to load resource from
// classpath. If still
// couldn't load the configuration file, then it will throw exception in HiveCatalog.
URL configFile = CatalogLoader.class.getClassLoader().getResource("hive-site.xml");
if (configFile != null) {
newConf.addResource(configFile);
}
return null;
}

if (!Strings.isNullOrEmpty(hadoopConfDir)) {
Preconditions.checkState(
Files.exists(Paths.get(hadoopConfDir, "hdfs-site.xml")),
"Failed to load Hadoop configuration: missing %s",
Paths.get(hadoopConfDir, "hdfs-site.xml"));
newConf.addResource(new Path(hadoopConfDir, "hdfs-site.xml"));
Preconditions.checkState(
Files.exists(Paths.get(hadoopConfDir, "core-site.xml")),
"Failed to load Hadoop configuration: missing %s",
Paths.get(hadoopConfDir, "core-site.xml"));
newConf.addResource(new Path(hadoopConfDir, "core-site.xml"));
}

return newConf;
}

public static Configuration clusterHadoopConf() {
return HadoopUtils.getHadoopConfiguration(GlobalConfiguration.loadConfiguration());
}
}
@@ -69,7 +69,7 @@ private FlinkConfigOptions() {}
public static final ConfigOption<Boolean> TABLE_EXEC_ICEBERG_EXPOSE_SPLIT_LOCALITY_INFO =
ConfigOptions.key("table.exec.iceberg.expose-split-locality-info")
.booleanType()
.noDefaultValue()
.defaultValue(false)
Contributor: This is a backwards-incompatible change. I'm not sure how widely it's used, but we need to do some research and be more vocal about this change if we decide to go ahead with it.

Contributor: Agree with @pvary that this changes the default behavior, which calls Util.mayHaveBlockLocations(table.io(), table.location()) from the Hadoop module to figure out whether locality is enabled for the hdfs scheme.

Would it work if we added the isHadoopCommonOnClasspath check at the beginning of Util#mayHaveBlockLocations in the Hadoop module, returning false when hadoop-common is not on the classpath?

  public static boolean mayHaveBlockLocations(FileIO io, String location) {
    if (usesHadoopFileIO(io, location)) {
      InputFile inputFile = io.newInputFile(location);
      if (inputFile instanceof HadoopInputFile) {
        String scheme = ((HadoopInputFile) inputFile).getFileSystem().getScheme();
        return LOCALITY_WHITELIST_FS.contains(scheme);

      } else {
        return false;
      }
    }

    return false;
  }
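A rough sketch of that suggestion (reusing the snippet above; Util, usesHadoopFileIO, HadoopInputFile, and LOCALITY_WHITELIST_FS are the existing members of the Hadoop module, and this is not a verified patch):

  public static boolean mayHaveBlockLocations(FileIO io, String location) {
    // Bail out early when hadoop-common cannot be loaded, so callers without
    // Hadoop on the classpath simply get "no locality" instead of an error.
    if (!HadoopDependency.isHadoopCommonOnClasspath(Util.class.getClassLoader())) {
      return false;
    }

    if (usesHadoopFileIO(io, location)) {
      InputFile inputFile = io.newInputFile(location);
      if (inputFile instanceof HadoopInputFile) {
        String scheme = ((HadoopInputFile) inputFile).getFileSystem().getScheme();
        return LOCALITY_WHITELIST_FS.contains(scheme);
      } else {
        return false;
      }
    }

    return false;
  }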

.withDescription(
"Expose split host information to use Flink's locality aware split assigner.");

@@ -163,7 +163,8 @@ private static TableLoader createTableLoader(
String catalogTable = flinkConf.getString(CATALOG_TABLE, tableName);
Preconditions.checkNotNull(catalogTable, "The iceberg table name cannot be null");

org.apache.hadoop.conf.Configuration hadoopConf = FlinkCatalogFactory.clusterHadoopConf();
org.apache.hadoop.conf.Configuration hadoopConf =
(org.apache.hadoop.conf.Configuration) FlinkCatalogFactory.clusterHadoopConf();
Contributor: So Flink SQL will still need Hadoop on the classpath?

Contributor: Agree with Peter that this seems problematic, as the return value can be null.

FlinkCatalogFactory factory = new FlinkCatalogFactory();
FlinkCatalog flinkCatalog =
(FlinkCatalog) factory.createCatalog(catalogName, tableProps, hadoopConf);