From 9e649b44fcac28545468a02dc6e59c3161a85068 Mon Sep 17 00:00:00 2001 From: Bryan Keller Date: Wed, 31 Aug 2022 10:03:02 -0700 Subject: [PATCH 1/2] Parquet: close zstd input stream early to avoid memory pressure --- .../org/apache/iceberg/parquet/Parquet.java | 2 + .../iceberg/parquet/ParquetCodecFactory.java | 101 ++++++++++++++++++ 2 files changed, 103 insertions(+) create mode 100644 parquet/src/main/java/org/apache/iceberg/parquet/ParquetCodecFactory.java diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java index 856c8089b8a8..27c75c42abf4 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java @@ -1011,8 +1011,10 @@ public CloseableIterable build() { conf.unset(property); } optionsBuilder = HadoopReadOptions.builder(conf); + optionsBuilder.withCodecFactory(new ParquetCodecFactory(conf, 0)); } else { optionsBuilder = ParquetReadOptions.builder(); + optionsBuilder.withCodecFactory(new ParquetCodecFactory(new Configuration(), 0)); } for (Map.Entry entry : properties.entrySet()) { diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetCodecFactory.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetCodecFactory.java new file mode 100644 index 000000000000..b61a31f04cd8 --- /dev/null +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetCodecFactory.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.parquet; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.compress.CodecPool; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.Decompressor; +import org.apache.parquet.bytes.BytesInput; +import org.apache.parquet.hadoop.CodecFactory; +import org.apache.parquet.hadoop.codec.ZstandardCodec; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; + +/** + * This class implements a codec factory that is used when reading from Parquet. It adds a + * workaround for memory issues encountered when reading from zstd-compressed files. 
+ */
+public class ParquetCodecFactory extends CodecFactory {
+
+  /**
+   * Creates the factory by passing both arguments straight through to the {@link CodecFactory}
+   * constructor.
+   *
+   * @param configuration Hadoop configuration handed to the superclass
+   * @param pageSize page size handed to the superclass
+   */
+  public ParquetCodecFactory(Configuration configuration, int pageSize) {
+    super(configuration, pageSize);
+  }
+
+  /**
+   * Copied and modified from CodecFactory.HeapBytesDecompressor
+   *
+   * <p>Decompresses page bytes through the Hadoop codec resolved for a codec name. The
+   * modification is the {@link ZstandardCodec} branch in {@link #decompress(BytesInput, int)},
+   * which copies the decompressed bytes into a buffer so that the codec input stream can be
+   * closed immediately.
+   */
+  class HeapBytesDecompressor extends BytesDecompressor {
+
+    // codec resolved by getCodec(codecName); null means the input is passed through unchanged
+    private final CompressionCodec codec;
+    // decompressor obtained from CodecPool for the codec; null when there is no codec
+    private final Decompressor decompressor;
+
+    /**
+     * Resolves the codec for {@code codecName} and, when one is found, obtains a decompressor
+     * for it from {@link CodecPool}.
+     *
+     * @param codecName compression codec name to resolve via {@code getCodec}
+     */
+    HeapBytesDecompressor(CompressionCodecName codecName) {
+      this.codec = getCodec(codecName);
+      if (codec != null) {
+        decompressor = CodecPool.getDecompressor(codec);
+      } else {
+        decompressor = null;
+      }
+    }
+
+    /**
+     * Decompresses {@code bytes}.
+     *
+     * <p>When no codec was resolved, the input is returned as is. Otherwise the decompressor (if
+     * any) is reset and the input is wrapped in the codec's input stream.
+     *
+     * @param bytes compressed input
+     * @param uncompressedSize size passed to {@code BytesInput.from} when reading the stream
+     * @return the decompressed bytes, or {@code bytes} itself when there is no codec
+     * @throws IOException declared for the stream operations performed here
+     */
+    @Override
+    public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException {
+      final BytesInput decompressed;
+      if (codec != null) {
+        if (decompressor != null) {
+          // the decompressor instance is reused across calls, so reset it before each use
+          decompressor.reset();
+        }
+        if (codec instanceof ZstandardCodec) {
+          // we need to close the zstd input stream ASAP to free up native resources, so
+          // read everything into a buffer and then close it
+          try (InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor)) {
+            decompressed = BytesInput.copy(BytesInput.from(is, uncompressedSize));
+          }
+        } else {
+          // the stream is not closed in this method; the returned BytesInput wraps it
+          // NOTE(review): presumably the stream is read later by the caller - confirm before
+          // adding an early close here
+          InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor);
+          decompressed = BytesInput.from(is, uncompressedSize);
+        }
+      } else {
+        decompressed = bytes;
+      }
+      return decompressed;
+    }
+
+    /**
+     * Decompresses {@code input} by delegating to {@link #decompress(BytesInput, int)} and puts
+     * the result into {@code output}.
+     *
+     * @param input buffer holding the compressed bytes
+     * @param compressedSize not used by this implementation
+     * @param output buffer that receives the decompressed bytes
+     * @param uncompressedSize size forwarded to the {@code BytesInput} overload
+     * @throws IOException if the delegated decompression fails
+     */
+    @Override
+    public void decompress(
+        ByteBuffer input, int compressedSize, ByteBuffer output, int uncompressedSize)
+        throws IOException {
+      ByteBuffer decompressed = decompress(BytesInput.from(input), uncompressedSize).toByteBuffer();
+      output.put(decompressed);
+    }
+
+    /** Returns the decompressor to {@link CodecPool}, if one was obtained. */
+    @Override
+    public void release() {
+      if (decompressor != null) {
+        CodecPool.returnDecompressor(decompressor);
+      }
+    }
+  }
+
+  /**
+   * Creates a new {@link HeapBytesDecompressor} for the given codec name.
+   *
+   * @param codecName compression codec name
+   * @return a new decompressor instance for {@code codecName}
+   */
+  @Override
+  protected BytesDecompressor createDecompressor(CompressionCodecName codecName) {
+    return new HeapBytesDecompressor(codecName);
+  }
+}
From 81bf1b8b3f9aee80d7cad7c576a708a78f59197c Mon Sep 17 00:00:00
2001 From: Bryan Keller Date: Wed, 31 Aug 2022 11:44:27 -0700 Subject: [PATCH 2/2] added comments --- parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java index 27c75c42abf4..8b1e6c056403 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java @@ -1011,9 +1011,11 @@ public CloseableIterable build() { conf.unset(property); } optionsBuilder = HadoopReadOptions.builder(conf); + // page size not used by decompressors optionsBuilder.withCodecFactory(new ParquetCodecFactory(conf, 0)); } else { optionsBuilder = ParquetReadOptions.builder(); + // page size not used by decompressors optionsBuilder.withCodecFactory(new ParquetCodecFactory(new Configuration(), 0)); }