diff --git a/build.gradle b/build.gradle
index 213109852ec4..1d92e86ade7e 100644
--- a/build.gradle
+++ b/build.gradle
@@ -792,6 +792,7 @@ project(':iceberg-parquet') {
     api project(':iceberg-api')
     implementation project(':iceberg-core')
     implementation project(':iceberg-common')
+    implementation 'io.airlift:aircompressor'
 
     implementation(libs.parquet.avro) {
       exclude group: 'org.apache.avro', module: 'avro'
diff --git a/core/src/main/java/org/apache/iceberg/SerializableTable.java b/core/src/main/java/org/apache/iceberg/SerializableTable.java
index 9e0055a10376..6103f2b31bd8 100644
--- a/core/src/main/java/org/apache/iceberg/SerializableTable.java
+++ b/core/src/main/java/org/apache/iceberg/SerializableTable.java
@@ -25,6 +25,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.encryption.EncryptionManager;
 import org.apache.iceberg.hadoop.HadoopConfigurable;
+import org.apache.iceberg.hadoop.HadoopDependency;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.LocationProvider;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -113,7 +114,8 @@ private String metadataFileLocation(Table table) {
   }
 
   private FileIO fileIO(Table table) {
-    if (table.io() instanceof HadoopConfigurable) {
+    if (HadoopDependency.isHadoopCommonOnClasspath(SerializableTable.class.getClassLoader())
+        && table.io() instanceof HadoopConfigurable) {
       ((HadoopConfigurable) table.io()).serializeConfWith(SerializableConfSupplier::new);
     }
 
diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopDependency.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopDependency.java
new file mode 100644
index 000000000000..6947ec8d71c6
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopDependency.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.hadoop;
+
+/** Responsible for telling whether specific Hadoop dependencies are on the classpath. */
+public class HadoopDependency {
+
+  private HadoopDependency() {}
+
+  public static boolean isHadoopCommonOnClasspath(ClassLoader classLoader) {
+    try {
+      Class.forName("org.apache.hadoop.conf.Configuration", false, classLoader);
+      Class.forName("org.apache.hadoop.security.UserGroupInformation", false, classLoader);
+      return true;
+    } catch (ClassNotFoundException e) {
+      return false;
+    }
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/hadoop/SerializableConfiguration.java b/core/src/main/java/org/apache/iceberg/hadoop/SerializableConfiguration.java
index 3e9f17455f81..75e338e65adc 100644
--- a/core/src/main/java/org/apache/iceberg/hadoop/SerializableConfiguration.java
+++ b/core/src/main/java/org/apache/iceberg/hadoop/SerializableConfiguration.java
@@ -26,6 +26,7 @@
 
 /** Wraps a {@link Configuration} object in a {@link Serializable} layer. */
 public class SerializableConfiguration implements Serializable {
+  private static final long serialVersionUID = -8840976521081151175L;
 
   private transient Configuration hadoopConf;
 
@@ -33,6 +34,10 @@ public SerializableConfiguration(Configuration hadoopConf) {
     this.hadoopConf = hadoopConf;
   }
 
+  public SerializableConfiguration(Object hadoopConf) {
+    this.hadoopConf = (Configuration) hadoopConf;
+  }
+
   private void writeObject(ObjectOutputStream out) throws IOException {
     out.defaultWriteObject();
     hadoopConf.write(out);
@@ -47,4 +52,8 @@ private void readObject(ObjectInputStream in) throws ClassNotFoundException, IOE
   public Configuration get() {
     return hadoopConf;
   }
+
+  public Configuration getClone() {
+    return new Configuration(hadoopConf);
+  }
 }
diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/CatalogLoader.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/CatalogLoader.java
index 18473bf4f190..65670725670c 100644
--- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/CatalogLoader.java
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/CatalogLoader.java
@@ -20,7 +20,6 @@
 
 import java.io.Serializable;
 import java.util.Map;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.CatalogUtil;
 import org.apache.iceberg.catalog.Catalog;
@@ -49,21 +48,20 @@ public interface CatalogLoader extends Serializable, Cloneable {
   @SuppressWarnings({"checkstyle:NoClone", "checkstyle:SuperClone"})
   CatalogLoader clone();
 
-  static CatalogLoader hadoop(
-      String name, Configuration hadoopConf, Map<String, String> properties) {
+  static CatalogLoader hadoop(String name, Object hadoopConf, Map<String, String> properties) {
     return new HadoopCatalogLoader(name, hadoopConf, properties);
   }
 
-  static CatalogLoader hive(String name, Configuration hadoopConf, Map<String, String> properties) {
+  static CatalogLoader hive(String name, Object hadoopConf, Map<String, String> properties) {
     return new HiveCatalogLoader(name, hadoopConf, properties);
   }
 
-  static CatalogLoader rest(String name, Configuration hadoopConf, Map<String, String> properties) {
+  static CatalogLoader rest(String name, Object hadoopConf, Map<String, String> properties) {
     return new RESTCatalogLoader(name, hadoopConf, properties);
   }
 
   static CatalogLoader custom(
-      String name, Map<String, String> properties, Configuration hadoopConf, String impl) {
+      String name, Map<String, String> properties, Object hadoopConf, String impl) {
     return new CustomCatalogLoader(name, properties, hadoopConf, impl);
   }
 
@@ -73,8 +71,7 @@ class HadoopCatalogLoader implements CatalogLoader {
     private final String warehouseLocation;
     private final Map<String, String> properties;
 
-    private HadoopCatalogLoader(
-        String catalogName, Configuration conf, Map<String, String> properties) {
+    private HadoopCatalogLoader(String catalogName, Object conf, Map<String, String> properties) {
       this.catalogName = catalogName;
       this.hadoopConf = new SerializableConfiguration(conf);
       this.warehouseLocation = properties.get(CatalogProperties.WAREHOUSE_LOCATION);
@@ -90,7 +87,7 @@ public Catalog loadCatalog() {
     @Override
     @SuppressWarnings({"checkstyle:NoClone", "checkstyle:SuperClone"})
     public CatalogLoader clone() {
-      return new HadoopCatalogLoader(catalogName, new Configuration(hadoopConf.get()), properties);
+      return new HadoopCatalogLoader(catalogName, hadoopConf.getClone(), properties);
     }
 
     @Override
@@ -110,8 +107,7 @@ class HiveCatalogLoader implements CatalogLoader {
     private final int clientPoolSize;
     private final Map<String, String> properties;
 
-    private HiveCatalogLoader(
-        String catalogName, Configuration conf, Map<String, String> properties) {
+    private HiveCatalogLoader(String catalogName, Object conf, Map<String, String> properties) {
       this.catalogName = catalogName;
       this.hadoopConf = new SerializableConfiguration(conf);
       this.uri = properties.get(CatalogProperties.URI);
@@ -132,7 +128,7 @@ public Catalog loadCatalog() {
     @Override
     @SuppressWarnings({"checkstyle:NoClone", "checkstyle:SuperClone"})
     public CatalogLoader clone() {
-      return new HiveCatalogLoader(catalogName, new Configuration(hadoopConf.get()), properties);
+      return new HiveCatalogLoader(catalogName, hadoopConf.getClone(), properties);
     }
 
     @Override
@@ -151,23 +147,33 @@ class RESTCatalogLoader implements CatalogLoader {
     private final SerializableConfiguration hadoopConf;
     private final Map<String, String> properties;
 
-    private RESTCatalogLoader(
-        String catalogName, Configuration conf, Map<String, String> properties) {
+    private RESTCatalogLoader(String catalogName, Object conf, Map<String, String> properties) {
       this.catalogName = catalogName;
-      this.hadoopConf = new SerializableConfiguration(conf);
+      if (conf != null) {
+        this.hadoopConf = new SerializableConfiguration(conf);
+      } else {
+        this.hadoopConf = null;
+      }
       this.properties = Maps.newHashMap(properties);
     }
 
     @Override
     public Catalog loadCatalog() {
-      return CatalogUtil.loadCatalog(
-          RESTCatalog.class.getName(), catalogName, properties, hadoopConf.get());
+      Object conf = null;
+      if (hadoopConf != null) {
+        conf = hadoopConf.get();
+      }
+      return CatalogUtil.loadCatalog(RESTCatalog.class.getName(), catalogName, properties, conf);
     }
 
     @Override
     @SuppressWarnings({"checkstyle:NoClone", "checkstyle:SuperClone"})
     public CatalogLoader clone() {
-      return new RESTCatalogLoader(catalogName, new Configuration(hadoopConf.get()), properties);
+      Object conf = null;
+      if (hadoopConf != null) {
+        conf = hadoopConf.getClone();
+      }
+      return new RESTCatalogLoader(catalogName, conf, properties);
     }
 
     @Override
@@ -187,8 +193,12 @@ class CustomCatalogLoader implements CatalogLoader {
     private final String impl;
 
     private CustomCatalogLoader(
-        String name, Map<String, String> properties, Configuration conf, String impl) {
-      this.hadoopConf = new SerializableConfiguration(conf);
+        String name, Map<String, String> properties, Object conf, String impl) {
+      if (conf != null) {
+        this.hadoopConf = new SerializableConfiguration(conf);
+      } else {
+        this.hadoopConf = null;
+      }
       this.properties = Maps.newHashMap(properties); // wrap into a hashmap for serialization
       this.name = name;
       this.impl =
@@ -198,13 +208,21 @@ private CustomCatalogLoader(
 
     @Override
     public Catalog loadCatalog() {
-      return CatalogUtil.loadCatalog(impl, name, properties, hadoopConf.get());
+      Object conf = null;
+      if (hadoopConf != null) {
+        conf = hadoopConf.get();
+      }
+      return CatalogUtil.loadCatalog(impl, name, properties, conf);
     }
 
     @Override
@SuppressWarnings({"checkstyle:NoClone", "checkstyle:SuperClone"}) public CatalogLoader clone() { - return new CustomCatalogLoader(name, properties, new Configuration(hadoopConf.get()), impl); + Object conf = null; + if (hadoopConf != null) { + conf = hadoopConf.getClone(); + } + return new CustomCatalogLoader(name, properties, conf, impl); } @Override diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java index fe4008a13ce5..bd163a0e5b79 100644 --- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java +++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java @@ -18,22 +18,18 @@ */ package org.apache.iceberg.flink; -import java.net.URL; -import java.nio.file.Files; -import java.nio.file.Paths; import java.util.List; import java.util.Locale; import java.util.Map; import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.runtime.hadoop.HadoopDependency; import org.apache.flink.runtime.util.HadoopUtils; import org.apache.flink.table.catalog.Catalog; import org.apache.flink.table.factories.CatalogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.flink.util.HadoopUtil; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -import org.apache.iceberg.relocated.com.google.common.base.Strings; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.util.PropertyUtil; @@ -56,7 +52,7 @@ * * *

To use a custom catalog that is not a Hive or Hadoop catalog, extend this class and override - * {@link #createCatalogLoader(String, Map, Configuration)}. + * {@link #createCatalogLoader(String, Map, Object)}. */ public class FlinkCatalogFactory implements CatalogFactory { @@ -85,7 +81,7 @@ public class FlinkCatalogFactory implements CatalogFactory { * @return an Iceberg catalog loader */ static CatalogLoader createCatalogLoader( - String name, Map properties, Configuration hadoopConf) { + String name, Map properties, Object hadoopConf) { String catalogImpl = properties.get(CatalogProperties.CATALOG_IMPL); if (catalogImpl != null) { String catalogType = properties.get(ICEBERG_CATALOG_TYPE); @@ -103,10 +99,10 @@ static CatalogLoader createCatalogLoader( case ICEBERG_CATALOG_TYPE_HIVE: // The values of properties 'uri', 'warehouse', 'hive-conf-dir' are allowed to be null, in // that case it will - // fallback to parse those values from hadoop configuration which is loaded from classpath. + // fall back to parse those values from hadoop configuration which is loaded from classpath. String hiveConfDir = properties.get(HIVE_CONF_DIR); String hadoopConfDir = properties.get(HADOOP_CONF_DIR); - Configuration newHadoopConf = mergeHiveConf(hadoopConf, hiveConfDir, hadoopConfDir); + Object newHadoopConf = HadoopUtil.mergeHiveConf(hadoopConf, hiveConfDir, hadoopConfDir); return CatalogLoader.hive(name, newHadoopConf, properties); case ICEBERG_CATALOG_TYPE_HADOOP: @@ -139,8 +135,7 @@ public Catalog createCatalog(String name, Map properties) { return createCatalog(name, properties, clusterHadoopConf()); } - protected Catalog createCatalog( - String name, Map properties, Configuration hadoopConf) { + protected Catalog createCatalog(String name, Map properties, Object hadoopConf) { CatalogLoader catalogLoader = createCatalogLoader(name, properties, hadoopConf); String defaultDatabase = properties.getOrDefault(DEFAULT_DATABASE, DEFAULT_DATABASE_NAME); @@ -172,42 +167,11 @@ protected Catalog createCatalog( cacheExpirationIntervalMs); } - private static Configuration mergeHiveConf( - Configuration hadoopConf, String hiveConfDir, String hadoopConfDir) { - Configuration newConf = new Configuration(hadoopConf); - if (!Strings.isNullOrEmpty(hiveConfDir)) { - Preconditions.checkState( - Files.exists(Paths.get(hiveConfDir, "hive-site.xml")), - "There should be a hive-site.xml file under the directory %s", - hiveConfDir); - newConf.addResource(new Path(hiveConfDir, "hive-site.xml")); + public static Object clusterHadoopConf() { + if (HadoopDependency.isHadoopCommonOnClasspath(CatalogLoader.class.getClassLoader())) { + return HadoopUtils.getHadoopConfiguration(GlobalConfiguration.loadConfiguration()); } else { - // If don't provide the hive-site.xml path explicitly, it will try to load resource from - // classpath. If still - // couldn't load the configuration file, then it will throw exception in HiveCatalog. 
-      URL configFile = CatalogLoader.class.getClassLoader().getResource("hive-site.xml");
-      if (configFile != null) {
-        newConf.addResource(configFile);
-      }
+      return null;
     }
-
-    if (!Strings.isNullOrEmpty(hadoopConfDir)) {
-      Preconditions.checkState(
-          Files.exists(Paths.get(hadoopConfDir, "hdfs-site.xml")),
-          "Failed to load Hadoop configuration: missing %s",
-          Paths.get(hadoopConfDir, "hdfs-site.xml"));
-      newConf.addResource(new Path(hadoopConfDir, "hdfs-site.xml"));
-      Preconditions.checkState(
-          Files.exists(Paths.get(hadoopConfDir, "core-site.xml")),
-          "Failed to load Hadoop configuration: missing %s",
-          Paths.get(hadoopConfDir, "core-site.xml"));
-      newConf.addResource(new Path(hadoopConfDir, "core-site.xml"));
-    }
-
-    return newConf;
-  }
-
-  public static Configuration clusterHadoopConf() {
-    return HadoopUtils.getHadoopConfiguration(GlobalConfiguration.loadConfiguration());
   }
 }
diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java
index 7c7afd24ed8e..2cfcde7a7a90 100644
--- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java
@@ -69,7 +69,7 @@ private FlinkConfigOptions() {}
   public static final ConfigOption<Boolean> TABLE_EXEC_ICEBERG_EXPOSE_SPLIT_LOCALITY_INFO =
       ConfigOptions.key("table.exec.iceberg.expose-split-locality-info")
           .booleanType()
-          .noDefaultValue()
+          .defaultValue(false)
           .withDescription(
               "Expose split host information to use Flink's locality aware split assigner.");
 
diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/FlinkDynamicTableFactory.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/FlinkDynamicTableFactory.java
index b7f1be4b93fb..909375425655 100644
--- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/FlinkDynamicTableFactory.java
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/FlinkDynamicTableFactory.java
@@ -163,7 +163,8 @@ private static TableLoader createTableLoader(
     String catalogTable = flinkConf.getString(CATALOG_TABLE, tableName);
     Preconditions.checkNotNull(catalogTable, "The iceberg table name cannot be null");
 
-    org.apache.hadoop.conf.Configuration hadoopConf = FlinkCatalogFactory.clusterHadoopConf();
+    org.apache.hadoop.conf.Configuration hadoopConf =
+        (org.apache.hadoop.conf.Configuration) FlinkCatalogFactory.clusterHadoopConf();
     FlinkCatalogFactory factory = new FlinkCatalogFactory();
     FlinkCatalog flinkCatalog =
         (FlinkCatalog) factory.createCatalog(catalogName, tableProps, hadoopConf);
diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/TableLoader.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/TableLoader.java
index da509451fee7..4eb384cf3a31 100644
--- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/TableLoader.java
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/TableLoader.java
@@ -21,7 +21,7 @@
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.Serializable;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.flink.util.Preconditions;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
@@ -54,7 +54,7 @@ static TableLoader fromHadoopTable(String location) {
     return fromHadoopTable(location, FlinkCatalogFactory.clusterHadoopConf());
   }
 
-  static TableLoader fromHadoopTable(String location, Configuration hadoopConf) {
+  static TableLoader fromHadoopTable(String location, Object hadoopConf) {
     return new HadoopTableLoader(location, hadoopConf);
   }
 
@@ -67,8 +67,9 @@ class HadoopTableLoader implements TableLoader {
 
     private transient HadoopTables tables;
 
-    private HadoopTableLoader(String location, Configuration conf) {
+    private HadoopTableLoader(String location, Object conf) {
       this.location = location;
+      Preconditions.checkNotNull(conf, "Could not load HadoopConfiguration");
      this.hadoopConf = new SerializableConfiguration(conf);
     }
 
@@ -91,7 +92,7 @@ public Table loadTable() {
     @Override
     @SuppressWarnings({"checkstyle:NoClone", "checkstyle:SuperClone"})
     public TableLoader clone() {
-      return new HadoopTableLoader(location, new Configuration(hadoopConf.get()));
+      return new HadoopTableLoader(location, hadoopConf.getClone());
     }
 
     @Override
diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/util/HadoopUtil.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/util/HadoopUtil.java
new file mode 100644
index 000000000000..22970718345e
--- /dev/null
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/util/HadoopUtil.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.util;
+
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.flink.CatalogLoader;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.base.Strings;
+
+public class HadoopUtil {
+
+  private HadoopUtil() {}
+
+  private static final String CORE_SITE = "core-site.xml";
+  private static final String HIVE_SITE = "hive-site.xml";
+  private static final String HDFS_SITE = "hdfs-site.xml";
+
+  public static Configuration mergeHiveConf(
+      Object hadoopConf, String hiveConfDir, String hadoopConfDir) {
+    Preconditions.checkNotNull(
+        hadoopConf,
+        "Hadoop configuration is null, are the hadoop dependencies available on the classpath?");
+
+    Configuration newConf = new Configuration((Configuration) hadoopConf);
+    if (!Strings.isNullOrEmpty(hiveConfDir)) {
+      Preconditions.checkState(
+          Files.exists(Paths.get(hiveConfDir, HIVE_SITE)),
+          "There should be a hive-site.xml file under the directory %s",
+          hiveConfDir);
+      newConf.addResource(new Path(hiveConfDir, HIVE_SITE));
+    } else {
+      // If the hive-site.xml path is not provided explicitly, try to load the resource from the
+      // classpath. If the configuration file still cannot be loaded, HiveCatalog will throw an
+      // exception.
+      URL configFile = CatalogLoader.class.getClassLoader().getResource(HIVE_SITE);
+      if (configFile != null) {
+        newConf.addResource(configFile);
+      }
+    }
+
+    if (!Strings.isNullOrEmpty(hadoopConfDir)) {
+      Preconditions.checkState(
+          Files.exists(Paths.get(hadoopConfDir, HDFS_SITE)),
+          "Failed to load Hadoop configuration: missing %s",
+          Paths.get(hadoopConfDir, HDFS_SITE));
+      newConf.addResource(new Path(hadoopConfDir, HDFS_SITE));
+      Preconditions.checkState(
+          Files.exists(Paths.get(hadoopConfDir, CORE_SITE)),
+          "Failed to load Hadoop configuration: missing %s",
+          Paths.get(hadoopConfDir, CORE_SITE));
+      newConf.addResource(new Path(hadoopConfDir, CORE_SITE));
+    }
+
+    return newConf;
+  }
+}
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/AirliftBytesInputCompressor.java b/parquet/src/main/java/org/apache/iceberg/parquet/AirliftBytesInputCompressor.java
new file mode 100644
index 000000000000..6f6a957472b1
--- /dev/null
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/AirliftBytesInputCompressor.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.parquet;
+
+import io.airlift.compress.Compressor;
+import io.airlift.compress.Decompressor;
+import io.airlift.compress.lz4.Lz4Compressor;
+import io.airlift.compress.lz4.Lz4Decompressor;
+import io.airlift.compress.lzo.LzoCompressor;
+import io.airlift.compress.lzo.LzoDecompressor;
+import io.airlift.compress.snappy.SnappyCompressor;
+import io.airlift.compress.snappy.SnappyDecompressor;
+import io.airlift.compress.zstd.ZstdCompressor;
+import io.airlift.compress.zstd.ZstdDecompressor;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import org.apache.parquet.bytes.ByteBufferAllocator;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.bytes.HeapByteBufferAllocator;
+import org.apache.parquet.compression.CompressionCodecFactory;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+
+public class AirliftBytesInputCompressor
+    implements CompressionCodecFactory.BytesInputCompressor,
+        CompressionCodecFactory.BytesInputDecompressor {
+
+  private final CompressionCodecName codecName;
+  private final Compressor compressor;
+  private final Decompressor decompressor;
+  private final ByteBufferAllocator allocator;
+  private final Deque<ByteBuffer> allocatedBuffers;
+
+  public AirliftBytesInputCompressor(CompressionCodecName codecName) {
+    this(codecName, new HeapByteBufferAllocator());
+  }
+
+  public AirliftBytesInputCompressor(
+      CompressionCodecName codecName, ByteBufferAllocator allocator) {
+    this.codecName = codecName;
+
+    switch (codecName) {
+      case LZ4:
+        compressor = new Lz4Compressor();
+        decompressor = new Lz4Decompressor();
+        break;
+      case LZO:
+        compressor = new LzoCompressor();
+        decompressor = new LzoDecompressor();
+        break;
+      case SNAPPY:
+        compressor = new SnappyCompressor();
+        decompressor = new SnappyDecompressor();
+        break;
+      case ZSTD:
+        compressor = new ZstdCompressor();
+        decompressor = new ZstdDecompressor();
+        break;
+      default:
+        throw new UnsupportedOperationException(
+            "Add Hadoop to the classpath, compression not supported by Airlift: " + codecName);
+    }
+
+    this.allocator = allocator;
+    this.allocatedBuffers = new ArrayDeque<>();
+  }
+
+  @Override
+  public BytesInput compress(BytesInput bytes) throws IOException {
+    ByteBuffer inBuf = bytes.toByteBuffer();
+
+    int maxOutLen = compressor.maxCompressedLength((int) bytes.size());
+    ByteBuffer outBuf = allocator.allocate(maxOutLen);
+
+    this.allocatedBuffers.push(outBuf);
+    compressor.compress(inBuf, outBuf);
+
+    return BytesInput.from((ByteBuffer) outBuf.flip());
+  }
+
+  @Override
+  public CompressionCodecName getCodecName() {
+    return codecName;
+  }
+
+  @Override
+  public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException {
+    ByteBuffer inBuf = bytes.toByteBuffer();
+    ByteBuffer outBuf = allocator.allocate(uncompressedSize);
+    this.allocatedBuffers.push(outBuf);
+    decompressor.decompress(inBuf, outBuf);
+    return BytesInput.from((ByteBuffer) outBuf.flip());
+  }
+
+  @Override
+  public void decompress(
+      ByteBuffer input, int compressedSize, ByteBuffer output, int uncompressedSize)
+      throws IOException {
+    decompressor.decompress(input, output);
+  }
+
+  @Override
+  public void release() {
+    while (!allocatedBuffers.isEmpty()) {
+      allocator.release(allocatedBuffers.pop());
+    }
+  }
+}
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/AirliftCodecFactory.java b/parquet/src/main/java/org/apache/iceberg/parquet/AirliftCodecFactory.java
new file mode 100644
index 000000000000..f49cc69b2315
--- /dev/null
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/AirliftCodecFactory.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.parquet;
+
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.parquet.compression.CompressionCodecFactory;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+
+public class AirliftCodecFactory implements CompressionCodecFactory {
+
+  private final Map<CompressionCodecName, AirliftBytesInputCompressor> compressors =
+      Maps.newHashMap();
+
+  private static final Set<CompressionCodecName> CODECS =
+      EnumSet.of(
+          CompressionCodecName.LZ4,
+          CompressionCodecName.LZO,
+          CompressionCodecName.SNAPPY,
+          CompressionCodecName.ZSTD);
+
+  @Override
+  public BytesInputCompressor getCompressor(CompressionCodecName compressionCodecName) {
+    if (CODECS.contains(compressionCodecName)) {
+      return compressors.computeIfAbsent(
+          compressionCodecName, c -> new AirliftBytesInputCompressor(compressionCodecName));
+    } else {
+      throw new UnsupportedOperationException(
+          "Compression is not available: " + compressionCodecName.toString());
+    }
+  }
+
+  @Override
+  public BytesInputDecompressor getDecompressor(CompressionCodecName compressionCodecName) {
+    if (CODECS.contains(compressionCodecName)) {
+      return compressors.computeIfAbsent(
+          compressionCodecName, c -> new AirliftBytesInputCompressor(compressionCodecName));
+    } else {
+      throw new UnsupportedOperationException(
+          "Compression is not available: " + compressionCodecName.toString());
+    }
+  }
+
+  @Override
+  public void release() {
+    compressors.values().forEach(AirliftBytesInputCompressor::release);
+    compressors.clear();
+  }
+}
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
index c97512a17d87..65e246a4a318 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -79,6 +79,7 @@
 import org.apache.iceberg.encryption.NativeEncryptionOutputFile;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.hadoop.HadoopDependency;
 import org.apache.iceberg.hadoop.HadoopInputFile;
 import org.apache.iceberg.hadoop.HadoopOutputFile;
 import org.apache.iceberg.io.CloseableIterable;
@@ -1168,7 +1169,12 @@ public <D> CloseableIterable<D> build() {
 
       if (readerFunc != null || batchedReaderFunc != null) {
         ParquetReadOptions.Builder optionsBuilder;
-        if (file instanceof HadoopInputFile) {
+        if (!HadoopDependency.isHadoopCommonOnClasspath(Parquet.class.getClassLoader())) {
+          // when Hadoop isn't available, make sure to use the Airlift codec factory
+          optionsBuilder = ParquetReadOptions.builder();
+          // page size not used by decompressors
+          optionsBuilder.withCodecFactory(new AirliftCodecFactory());
+        } else if (file instanceof HadoopInputFile) {
           // remove read properties already set that may conflict with this read
           Configuration conf = new Configuration(((HadoopInputFile) file).getConf());
           for (String property : READ_PROPERTIES_TO_REMOVE) {
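Not part of the patch above: a minimal, hypothetical round-trip sketch of the Hadoop-free codec path that the Parquet read builder now falls back to, assuming `iceberg-core`, `iceberg-parquet`, `parquet-common`, and `io.airlift:aircompressor` are on the classpath; the `AirliftRoundTrip` class name and the choice of ZSTD are made up for illustration.

```java
import java.nio.charset.StandardCharsets;
import org.apache.iceberg.hadoop.HadoopDependency;
import org.apache.iceberg.parquet.AirliftCodecFactory;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.compression.CompressionCodecFactory;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

public class AirliftRoundTrip {
  public static void main(String[] args) throws Exception {
    // The guard added by this change: true only when hadoop-common classes can be loaded.
    boolean hadoopAvailable =
        HadoopDependency.isHadoopCommonOnClasspath(AirliftRoundTrip.class.getClassLoader());
    System.out.println("hadoop-common on classpath: " + hadoopAvailable);

    byte[] original = "iceberg parquet without hadoop".getBytes(StandardCharsets.UTF_8);

    // The Airlift-backed factory used when Hadoop is absent.
    CompressionCodecFactory codecs = new AirliftCodecFactory();
    CompressionCodecFactory.BytesInputCompressor compressor =
        codecs.getCompressor(CompressionCodecName.ZSTD);
    CompressionCodecFactory.BytesInputDecompressor decompressor =
        codecs.getDecompressor(CompressionCodecName.ZSTD);

    // Compress and decompress a small payload through the same interfaces Parquet pages use.
    BytesInput compressed = compressor.compress(BytesInput.from(original));
    BytesInput decompressed =
        decompressor.decompress(BytesInput.from(compressed.toByteArray()), original.length);

    System.out.println(new String(decompressed.toByteArray(), StandardCharsets.UTF_8));

    // Return pooled buffers to the allocator, mirroring AirliftBytesInputCompressor.release().
    codecs.release();
  }
}
```

Any of LZ4, LZO, SNAPPY, or ZSTD from the factory's supported set should behave the same way; other codec names raise `UnsupportedOperationException`, matching the behavior of `AirliftCodecFactory` in the patch.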