From ca2b1535ee1999dbf85ef4a2dcfbfee3a6322ad1 Mon Sep 17 00:00:00 2001 From: Bryan Keller Date: Fri, 19 May 2023 16:21:06 -0700 Subject: [PATCH 1/4] Parquet: Revert workaround for resource usage with zstd --- .../org/apache/iceberg/parquet/Parquet.java | 4 - .../iceberg/parquet/ParquetCodecFactory.java | 101 ------------------ 2 files changed, 105 deletions(-) delete mode 100644 parquet/src/main/java/org/apache/iceberg/parquet/ParquetCodecFactory.java diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java index 7caeb7c7cd71..30d9866f9bbb 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java @@ -1029,12 +1029,8 @@ public CloseableIterable build() { conf.unset(property); } optionsBuilder = HadoopReadOptions.builder(conf); - // page size not used by decompressors - optionsBuilder.withCodecFactory(new ParquetCodecFactory(conf, 0)); } else { optionsBuilder = ParquetReadOptions.builder(); - // page size not used by decompressors - optionsBuilder.withCodecFactory(new ParquetCodecFactory(new Configuration(), 0)); } for (Map.Entry entry : properties.entrySet()) { diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetCodecFactory.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetCodecFactory.java deleted file mode 100644 index b61a31f04cd8..000000000000 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetCodecFactory.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iceberg.parquet; - -import java.io.IOException; -import java.io.InputStream; -import java.nio.ByteBuffer; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.compress.CodecPool; -import org.apache.hadoop.io.compress.CompressionCodec; -import org.apache.hadoop.io.compress.Decompressor; -import org.apache.parquet.bytes.BytesInput; -import org.apache.parquet.hadoop.CodecFactory; -import org.apache.parquet.hadoop.codec.ZstandardCodec; -import org.apache.parquet.hadoop.metadata.CompressionCodecName; - -/** - * This class implements a codec factory that is used when reading from Parquet. It adds a - * workaround for memory issues encountered when reading from zstd-compressed files. - */ -public class ParquetCodecFactory extends CodecFactory { - - public ParquetCodecFactory(Configuration configuration, int pageSize) { - super(configuration, pageSize); - } - - /** Copied and modified from CodecFactory.HeapBytesDecompressor */ - class HeapBytesDecompressor extends BytesDecompressor { - - private final CompressionCodec codec; - private final Decompressor decompressor; - - HeapBytesDecompressor(CompressionCodecName codecName) { - this.codec = getCodec(codecName); - if (codec != null) { - decompressor = CodecPool.getDecompressor(codec); - } else { - decompressor = null; - } - } - - @Override - public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException { - final BytesInput decompressed; - if (codec != null) { - if (decompressor != null) { - decompressor.reset(); - } - if (codec instanceof ZstandardCodec) { - // we need to close the zstd input stream ASAP to free up native resources, so - // read everything into a buffer and then close it - try (InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor)) { - decompressed = BytesInput.copy(BytesInput.from(is, uncompressedSize)); - } - } else { - InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor); - decompressed = BytesInput.from(is, uncompressedSize); - } - } else { - decompressed = bytes; - } - return decompressed; - } - - @Override - public void decompress( - ByteBuffer input, int compressedSize, ByteBuffer output, int uncompressedSize) - throws IOException { - ByteBuffer decompressed = decompress(BytesInput.from(input), uncompressedSize).toByteBuffer(); - output.put(decompressed); - } - - @Override - public void release() { - if (decompressor != null) { - CodecPool.returnDecompressor(decompressor); - } - } - } - - @Override - protected BytesDecompressor createDecompressor(CompressionCodecName codecName) { - return new HeapBytesDecompressor(codecName); - } -} From b8d3ce5bf8d1c393c881cde0a79023bc299a1dc1 Mon Sep 17 00:00:00 2001 From: Bryan Keller Date: Fri, 19 May 2023 16:44:54 -0700 Subject: [PATCH 2/4] add justification for class removal --- .palantir/revapi.yml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml index bc1667207467..e872794cff85 100644 --- a/.palantir/revapi.yml +++ b/.palantir/revapi.yml @@ -733,6 +733,10 @@ acceptedBreaks: old: "method void org.apache.iceberg.rest.auth.OAuth2Util.AuthSession::(java.util.Map, java.lang.String, java.lang.String)" justification: "Removing deprecations for 1.3.0" + org.apache.iceberg:iceberg-parquet: + - code: "java.class.removed" + old: "class org.apache.iceberg.parquet.ParquetCodecFactory" + justification: "This class was part of a workaround that is being reverted" apache-iceberg-0.14.0: org.apache.iceberg:iceberg-api: - code: "java.class.defaultSerializationChanged" From eaf3f5f459b1633c13c775838ce7fe730a9ab68d Mon Sep 17 00:00:00 2001 From: Bryan Keller Date: Fri, 19 May 2023 16:50:11 -0700 Subject: [PATCH 3/4] Revert "add justification for class removal" This reverts commit b8d3ce5bf8d1c393c881cde0a79023bc299a1dc1. --- .palantir/revapi.yml | 4 ---- 1 file changed, 4 deletions(-) diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml index e872794cff85..bc1667207467 100644 --- a/.palantir/revapi.yml +++ b/.palantir/revapi.yml @@ -733,10 +733,6 @@ acceptedBreaks: old: "method void org.apache.iceberg.rest.auth.OAuth2Util.AuthSession::(java.util.Map, java.lang.String, java.lang.String)" justification: "Removing deprecations for 1.3.0" - org.apache.iceberg:iceberg-parquet: - - code: "java.class.removed" - old: "class org.apache.iceberg.parquet.ParquetCodecFactory" - justification: "This class was part of a workaround that is being reverted" apache-iceberg-0.14.0: org.apache.iceberg:iceberg-api: - code: "java.class.defaultSerializationChanged" From ea248466a4aad130b552736d7e94f71302dd5f86 Mon Sep 17 00:00:00 2001 From: Bryan Keller Date: Fri, 19 May 2023 16:57:55 -0700 Subject: [PATCH 4/4] add deprecation --- .../iceberg/parquet/ParquetCodecFactory.java | 105 ++++++++++++++++++ 1 file changed, 105 insertions(+) create mode 100644 parquet/src/main/java/org/apache/iceberg/parquet/ParquetCodecFactory.java diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetCodecFactory.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetCodecFactory.java new file mode 100644 index 000000000000..5df566d464a0 --- /dev/null +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetCodecFactory.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.parquet; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.compress.CodecPool; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.Decompressor; +import org.apache.parquet.bytes.BytesInput; +import org.apache.parquet.hadoop.CodecFactory; +import org.apache.parquet.hadoop.codec.ZstandardCodec; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; + +/** + * This class implements a codec factory that is used when reading from Parquet. It adds a + * workaround for memory issues encountered when reading from zstd-compressed files. This is no + * longer used, as Parquet 1.13 includes this fix. + * + * @deprecated will be removed in 1.4.0 + */ +@Deprecated +public class ParquetCodecFactory extends CodecFactory { + + public ParquetCodecFactory(Configuration configuration, int pageSize) { + super(configuration, pageSize); + } + + /** Copied and modified from CodecFactory.HeapBytesDecompressor */ + class HeapBytesDecompressor extends BytesDecompressor { + + private final CompressionCodec codec; + private final Decompressor decompressor; + + HeapBytesDecompressor(CompressionCodecName codecName) { + this.codec = getCodec(codecName); + if (codec != null) { + decompressor = CodecPool.getDecompressor(codec); + } else { + decompressor = null; + } + } + + @Override + public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException { + final BytesInput decompressed; + if (codec != null) { + if (decompressor != null) { + decompressor.reset(); + } + if (codec instanceof ZstandardCodec) { + // we need to close the zstd input stream ASAP to free up native resources, so + // read everything into a buffer and then close it + try (InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor)) { + decompressed = BytesInput.copy(BytesInput.from(is, uncompressedSize)); + } + } else { + InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor); + decompressed = BytesInput.from(is, uncompressedSize); + } + } else { + decompressed = bytes; + } + return decompressed; + } + + @Override + public void decompress( + ByteBuffer input, int compressedSize, ByteBuffer output, int uncompressedSize) + throws IOException { + ByteBuffer decompressed = decompress(BytesInput.from(input), uncompressedSize).toByteBuffer(); + output.put(decompressed); + } + + @Override + public void release() { + if (decompressor != null) { + CodecPool.returnDecompressor(decompressor); + } + } + } + + @Override + protected BytesDecompressor createDecompressor(CompressionCodecName codecName) { + return new HeapBytesDecompressor(codecName); + } +}