Commit 8bbc6cb
PARQUET-787: Limit read allocation size
WIP: This updates the `ParquetFileReader` to use multiple buffers when reading a row group, instead of a single humongous allocation. As a consequence, many classes needed to be updated to accept a stream backed by multiple buffers instead of using a single buffer directly; continuing to assume a single contiguous buffer would have required too many copies.

Author: Ryan Blue <blue@apache.org>

Closes #390 from rdblue/PARQUET-787-limit-read-allocation-size and squashes the following commits:

4abba3e [Ryan Blue] PARQUET-787: Update byte buffer input streams for review comments.
e7c6c5d [Ryan Blue] PARQUET-787: Fix problems from Zoltan's review.
be52b59 [Ryan Blue] PARQUET-787: Update tests for both ByteBufferInputStreams.
b0b6147 [Ryan Blue] PARQUET-787: Update encodings to use ByteBufferInputStream.
a4fa05a [Ryan Blue] Refactor ByteBufferInputStream implementations.
56b22a6 [Ryan Blue] Make allocation size configurable.
103ed3d [Ryan Blue] Add tests for ByteBufferInputStream and fix bugs.
614a2bb [Ryan Blue] Limit allocation size to 8MB chunks for better garbage collection.
1 parent ad80bfe commit 8bbc6cb
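The core of the change is easiest to see in isolation: instead of one contiguous ByteBuffer per row group, the reader allocates several smaller buffers, and decoders consume them through a single stream. Below is a minimal, self-contained sketch of that idea in plain Java; MultiBufferInputStream is an invented name for illustration, not the class this commit adds.

    import java.io.InputStream;
    import java.nio.ByteBuffer;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    class MultiBufferInputStream extends InputStream {
      private final Iterator<ByteBuffer> buffers;
      private ByteBuffer current = ByteBuffer.allocate(0);

      MultiBufferInputStream(List<ByteBuffer> buffers) {
        this.buffers = buffers.iterator();
      }

      @Override
      public int read() {
        // move to the next backing buffer once the current one is exhausted
        while (!current.hasRemaining()) {
          if (!buffers.hasNext()) {
            return -1; // end of stream
          }
          current = buffers.next();
        }
        return current.get() & 0xFF; // return the byte as an unsigned value
      }

      public static void main(String[] args) throws Exception {
        // two small allocations behave like one 3-byte stream
        InputStream in = new MultiBufferInputStream(Arrays.asList(
            ByteBuffer.wrap(new byte[] {1, 2}), ByteBuffer.wrap(new byte[] {3})));
        System.out.println(in.read() + " " + in.read() + " " + in.read() + " " + in.read());
        // prints: 1 2 3 -1
      }
    }

Decoders that used to index into one big buffer now see only a stream, which is why every initFromPage implementation in the diffs below is rewritten to take a ByteBufferInputStream.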

File tree

43 files changed, +1852 −441 lines


parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderImpl.java

Lines changed: 15 additions & 15 deletions
@@ -24,12 +24,11 @@
 import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
 import static org.apache.parquet.column.ValuesType.VALUES;

-import java.io.ByteArrayInputStream;
 import java.io.IOException;
-import java.nio.ByteBuffer;

 import org.apache.parquet.CorruptDeltaByteArrays;
 import org.apache.parquet.VersionParser.ParsedVersion;
+import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.bytes.BytesUtils;
 import org.apache.parquet.column.ColumnDescriptor;
@@ -549,7 +548,7 @@ public Void visit(DataPageV2 dataPageV2) {
     });
   }

-  private void initDataReader(Encoding dataEncoding, ByteBuffer bytes, int offset, int valueCount) {
+  private void initDataReader(Encoding dataEncoding, ByteBufferInputStream in, int valueCount) {
     ValuesReader previousReader = this.dataColumn;

     this.currentEncoding = dataEncoding;
@@ -565,13 +564,15 @@ private void initDataReader(Encoding dataEncoding, ByteBuffer bytes, int offset,
     } else {
       this.dataColumn = dataEncoding.getValuesReader(path, VALUES);
     }
+
     if (dataEncoding.usesDictionary() && converter.hasDictionarySupport()) {
       bindToDictionary(dictionary);
     } else {
       bind(path.getType());
     }
+
     try {
-      dataColumn.initFromPage(pageValueCount, bytes, offset);
+      dataColumn.initFromPage(pageValueCount, in);
     } catch (IOException e) {
       throw new ParquetDecodingException("could not read page in col " + path, e);
     }
@@ -589,16 +590,15 @@ private void readPageV1(DataPageV1 page) {
     this.repetitionLevelColumn = new ValuesReaderIntIterator(rlReader);
     this.definitionLevelColumn = new ValuesReaderIntIterator(dlReader);
     try {
-      ByteBuffer bytes = page.getBytes().toByteBuffer();
-      LOG.debug("page size {} bytes and {} records", bytes.remaining(), pageValueCount);
+      BytesInput bytes = page.getBytes();
+      LOG.debug("page size {} bytes and {} records", bytes.size(), pageValueCount);
       LOG.debug("reading repetition levels at 0");
-      rlReader.initFromPage(pageValueCount, bytes, 0);
-      int next = rlReader.getNextOffset();
-      LOG.debug("reading definition levels at {}", next);
-      dlReader.initFromPage(pageValueCount, bytes, next);
-      next = dlReader.getNextOffset();
-      LOG.debug("reading data at {}", next);
-      initDataReader(page.getValueEncoding(), bytes, next, page.getValueCount());
+      ByteBufferInputStream in = bytes.toInputStream();
+      rlReader.initFromPage(pageValueCount, in);
+      LOG.debug("reading definition levels at {}", in.position());
+      dlReader.initFromPage(pageValueCount, in);
+      LOG.debug("reading data at {}", in.position());
+      initDataReader(page.getValueEncoding(), in, page.getValueCount());
     } catch (IOException e) {
       throw new ParquetDecodingException("could not read page " + page + " in col " + path, e);
     }
@@ -607,9 +607,9 @@ private void readPageV1(DataPageV1 page) {
   private void readPageV2(DataPageV2 page) {
     this.repetitionLevelColumn = newRLEIterator(path.getMaxRepetitionLevel(), page.getRepetitionLevels());
     this.definitionLevelColumn = newRLEIterator(path.getMaxDefinitionLevel(), page.getDefinitionLevels());
+    LOG.debug("page data size {} bytes and {} records", page.getData().size(), pageValueCount);
     try {
-      LOG.debug("page data size {} bytes and {} records", page.getData().size(), pageValueCount);
-      initDataReader(page.getDataEncoding(), page.getData().toByteBuffer(), 0, page.getValueCount());
+      initDataReader(page.getDataEncoding(), page.getData().toInputStream(), page.getValueCount());
     } catch (IOException e) {
       throw new ParquetDecodingException("could not read page " + page + " in col " + path, e);
     }
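With the stream in place, readPageV1 no longer chains offsets through getNextOffset(): each reader consumes its section and leaves the shared stream positioned at the next one. A toy illustration of that contract in plain java.io, with invented section lengths and contents:

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.InputStream;

    class SequentialSections {
      // stands in for a ValuesReader: consumes exactly 'length' bytes
      static void readSection(InputStream in, int length) throws IOException {
        for (int i = 0; i < length; i++) {
          in.read(); // no offset arithmetic; the stream tracks the position
        }
      }

      public static void main(String[] args) throws IOException {
        // one "page": 2 bytes of levels followed by 3 bytes of data
        InputStream page = new ByteArrayInputStream(new byte[] {1, 2, 3, 4, 5});
        readSection(page, 2); // repetition/definition levels
        readSection(page, 3); // data section starts where the levels ended
        System.out.println(page.available()); // prints: 0, the page is consumed
      }
    }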

parquet-column/src/main/java/org/apache/parquet/column/values/ValuesReader.java

Lines changed: 6 additions & 30 deletions
@@ -20,8 +20,7 @@

 import java.io.IOException;

-import java.nio.ByteBuffer;
-import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.io.api.Binary;

 /**
@@ -40,8 +39,9 @@ public abstract class ValuesReader {
   /**
    * Called to initialize the column reader from a part of a page.
    *
-   * The underlying implementation knows how much data to read, so a length
-   * is not provided.
+   * Implementations must consume all bytes from the input stream, leaving the
+   * stream ready to read the next section of data. The underlying
+   * implementation knows how much data to read, so a length is not provided.
    *
    * Each page may contain several sections:
    * <ul>
@@ -50,36 +50,12 @@ public abstract class ValuesReader {
    * <li> data column
    * </ul>
    *
-   * This function is called with 'offset' pointing to the beginning of one of these sections,
-   * and should return the offset to the section following it.
-   *
    * @param valueCount count of values in this page
-   * @param page the array to read from containing the page data (repetition levels, definition levels, data)
-   * @param offset where to start reading from in the page
+   * @param in an input stream containing the page data at the correct offset
    *
    * @throws IOException
    */
-  public abstract void initFromPage(int valueCount, ByteBuffer page, int offset) throws IOException;
-
-  /**
-   * Same functionality as method of the same name that takes a ByteBuffer instead of a byte[].
-   *
-   * This method is only provided for backward compatibility and will be removed in a future release.
-   * Please update any code using it as soon as possible.
-   * @see #initFromPage(int, ByteBuffer, int)
-   */
-  @Deprecated
-  public void initFromPage(int valueCount, byte[] page, int offset) throws IOException {
-    this.initFromPage(valueCount, ByteBuffer.wrap(page), offset);
-  }
-
-  /**
-   * Called to return offset of the next section
-   * @return offset of the next section
-   */
-  public int getNextOffset() {
-    throw new ParquetDecodingException("Unsupported: cannot get offset of the next section.");
-  }
+  public abstract void initFromPage(int valueCount, ByteBufferInputStream in) throws IOException;

   /**
    * usable when the encoding is dictionary based
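For anyone implementing the new contract, here is a minimal sketch of what a subclass now looks like, assuming the patched parquet-column is on the classpath. PlainIntReader is a hypothetical example for illustration, not a class from this commit:

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import org.apache.parquet.bytes.ByteBufferInputStream;
    import org.apache.parquet.column.values.ValuesReader;

    class PlainIntReader extends ValuesReader {
      private ByteBuffer data;

      @Override
      public void initFromPage(int valueCount, ByteBufferInputStream in) throws IOException {
        // take exactly the bytes this section owns; the stream is left
        // positioned at the next section, as the new javadoc requires
        this.data = in.slice(valueCount * 4);
      }

      @Override
      public int readInteger() {
        return data.getInt();
      }

      @Override
      public void skip() {
        data.position(data.position() + 4);
      }
    }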

parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/BitPackingValuesReader.java

Lines changed: 4 additions & 11 deletions
@@ -22,7 +22,6 @@
 import static org.apache.parquet.column.values.bitpacking.BitPacking.createBitPackingReader;

 import java.io.IOException;
-import java.nio.ByteBuffer;

 import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.bytes.BytesUtils;
@@ -44,7 +43,6 @@ public class BitPackingValuesReader extends ValuesReader {
   private ByteBufferInputStream in;
   private BitPackingReader bitPackingReader;
   private final int bitsPerValue;
-  private int nextOffset;

   /**
    * @param bound the maximum value stored by this column
@@ -68,21 +66,16 @@ public int readInteger() {

   /**
    * {@inheritDoc}
-   * @see org.apache.parquet.column.values.ValuesReader#initFromPage(int, ByteBuffer, int)
+   * @see org.apache.parquet.column.values.ValuesReader#initFromPage(int, ByteBufferInputStream)
    */
   @Override
-  public void initFromPage(int valueCount, ByteBuffer in, int offset) throws IOException {
+  public void initFromPage(int valueCount, ByteBufferInputStream stream) throws IOException {
     int effectiveBitLength = valueCount * bitsPerValue;
     int length = BytesUtils.paddedByteCountFromBits(effectiveBitLength);
     LOG.debug("reading {} bytes for {} values of size {} bits.", length, valueCount, bitsPerValue);
-    this.in = new ByteBufferInputStream(in, offset, length);
-    this.bitPackingReader = createBitPackingReader(bitsPerValue, this.in, valueCount);
-    this.nextOffset = offset + length;
-  }

-  @Override
-  public int getNextOffset() {
-    return nextOffset;
+    this.in = stream.sliceStream(length);
+    this.bitPackingReader = createBitPackingReader(bitsPerValue, this.in, valueCount);
   }

   @Override
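Here sliceStream(length) does the work the offset-based constructor and getNextOffset() used to split between them: it carves a bounded sub-stream off the page stream and advances the parent past it. The length comes from rounding the packed bits up to whole bytes; a quick check of that arithmetic in plain Java, mirroring what BytesUtils.paddedByteCountFromBits computes:

    class PaddedLength {
      static int paddedByteCountFromBits(int bitLength) {
        return (bitLength + 7) / 8; // ceil(bits / 8)
      }

      public static void main(String[] args) {
        // 10 values at 3 bits each = 30 bits, padded to 4 bytes
        System.out.println(paddedByteCountFromBits(10 * 3)); // prints: 4
      }
    }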

parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/ByteBitPackingValuesReader.java

Lines changed: 24 additions & 24 deletions
@@ -19,11 +19,12 @@
 package org.apache.parquet.column.values.bitpacking;

 import java.io.IOException;
-import java.util.Arrays;
 import java.nio.ByteBuffer;

+import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.bytes.BytesUtils;
 import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.io.ParquetDecodingException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -36,9 +37,7 @@ public class ByteBitPackingValuesReader extends ValuesReader {
   private final BytePacker packer;
   private final int[] decoded = new int[VALUES_AT_A_TIME];
   private int decodedPosition = VALUES_AT_A_TIME - 1;
-  private ByteBuffer encoded;
-  private int encodedPos;
-  private int nextOffset;
+  private ByteBufferInputStream in;

   public ByteBitPackingValuesReader(int bound, Packer packer) {
     this.bitWidth = BytesUtils.getWidthFromMaxInt(bound);
@@ -49,37 +48,38 @@ public ByteBitPackingValuesReader(int bound, Packer packer) {
   public int readInteger() {
     ++ decodedPosition;
     if (decodedPosition == decoded.length) {
-      encoded.position(encodedPos);
-      if (encodedPos + bitWidth > encoded.limit()) {
-        // unpack8Values needs at least bitWidth bytes to read from,
-        // We have to fill in 0 byte at the end of encoded bytes.
-        byte[] tempEncode = new byte[bitWidth];
-        encoded.get(tempEncode, 0, encoded.limit() - encodedPos);
-        packer.unpack8Values(tempEncode, 0, decoded, 0);
-      } else {
-        packer.unpack8Values(encoded, encodedPos, decoded, 0);
+      try {
+        if (in.available() < bitWidth) {
+          // unpack8Values needs at least bitWidth bytes to read from,
+          // We have to fill in 0 byte at the end of encoded bytes.
+          byte[] tempEncode = new byte[bitWidth];
+          in.read(tempEncode, 0, in.available());
+          packer.unpack8Values(tempEncode, 0, decoded, 0);
+        } else {
+          ByteBuffer encoded = in.slice(bitWidth);
+          packer.unpack8Values(encoded, encoded.position(), decoded, 0);
+        }
+      } catch (IOException e) {
+        throw new ParquetDecodingException("Failed to read packed values", e);
       }
-      encodedPos += bitWidth;
       decodedPosition = 0;
     }
     return decoded[decodedPosition];
   }

   @Override
-  public void initFromPage(int valueCount, ByteBuffer page, int offset)
+  public void initFromPage(int valueCount, ByteBufferInputStream stream)
       throws IOException {
     int effectiveBitLength = valueCount * bitWidth;
     int length = BytesUtils.paddedByteCountFromBits(effectiveBitLength); // ceil
-    LOG.debug("reading {} bytes for {} values of size {} bits.", length, valueCount, bitWidth);
-    this.encoded = page;
-    this.encodedPos = offset;
+    LOG.debug("reading {} bytes for {} values of size {} bits.",
+        length, valueCount, bitWidth);
+    // work-around for null values. this will not happen for repetition or
+    // definition levels (never null), but will happen when valueCount has not
+    // been adjusted for null values in the data.
+    length = Math.min(length, stream.available());
+    this.in = stream.sliceStream(length);
     this.decodedPosition = VALUES_AT_A_TIME - 1;
-    this.nextOffset = offset + length;
-  }
-
-  @Override
-  public int getNextOffset() {
-    return nextOffset;
   }

   @Override
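The in.available() < bitWidth branch above covers a page that ends mid-group: the remaining bytes are copied into a zero-filled array so the packer can still unpack a full group of eight values. The same trick in miniature, with invented values:

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.util.Arrays;

    class ZeroPadTail {
      public static void main(String[] args) throws IOException {
        int bitWidth = 3; // the packer wants 3 bytes per group of 8 values
        ByteArrayInputStream in = new ByteArrayInputStream(new byte[] {7, 9});
        byte[] tempEncode = new byte[bitWidth];      // zero-initialized by the JVM
        in.read(tempEncode, 0, in.available());      // copy the 2 remaining bytes
        System.out.println(Arrays.toString(tempEncode)); // prints: [7, 9, 0]
      }
    }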

parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java

Lines changed: 12 additions & 28 deletions
@@ -18,7 +18,6 @@
  */
 package org.apache.parquet.column.values.delta;

-import java.io.ByteArrayInputStream;
 import java.io.IOException;

 import org.apache.parquet.bytes.ByteBufferInputStream;
@@ -28,7 +27,6 @@
 import org.apache.parquet.column.values.bitpacking.Packer;
 import org.apache.parquet.io.ParquetDecodingException;

-import java.io.IOException;
 import java.nio.ByteBuffer;

 /**
@@ -43,7 +41,7 @@ public class DeltaBinaryPackingValuesReader extends ValuesReader {
    */
   private int valuesRead;
   private long minDeltaInCurrentBlock;
-  private ByteBuffer page;
+
   /**
    * stores the decoded values including the first value which is written to the header
    */
@@ -54,23 +52,16 @@ public class DeltaBinaryPackingValuesReader extends ValuesReader {
    */
   private int valuesBuffered;
   private ByteBufferInputStream in;
-  private int nextOffset;
   private DeltaBinaryPackingConfig config;
   private int[] bitWidths;

   /**
-   * eagerly load all the data into memory
-   *
-   * @param valueCount count of values in this page
-   * @param page the array to read from containing the page data (repetition levels, definition levels, data)
-   * @param offset where to start reading from in the page
-   * @throws IOException
+   * eagerly loads all the data into memory
    */
   @Override
-  public void initFromPage(int valueCount, ByteBuffer page, int offset) throws IOException {
-    in = new ByteBufferInputStream(page, offset, page.limit() - offset);
+  public void initFromPage(int valueCount, ByteBufferInputStream stream) throws IOException {
+    this.in = stream;
     this.config = DeltaBinaryPackingConfig.readConfig(in);
-    this.page = page;
     this.totalValueCount = BytesUtils.readUnsignedVarInt(in);
     allocateValuesBuffer();
     bitWidths = new int[config.miniBlockNumInABlock];
@@ -81,14 +72,8 @@ public void initFromPage(int valueCount, ByteBuffer page, int offset) throws IOE
     while (valuesBuffered < totalValueCount) { //values Buffered could be more than totalValueCount, since we flush on a mini block basis
       loadNewBlockToBuffer();
     }
-    this.nextOffset = page.limit() - in.available();
   }
-
-  @Override
-  public int getNextOffset() {
-    return nextOffset;
-  }
-
+
   /**
    * the value buffer is allocated so that the size of it is multiple of mini block
    * because when writing, data is flushed on a mini block basis
@@ -123,7 +108,7 @@ private void checkRead() {
     }
   }

-  private void loadNewBlockToBuffer() {
+  private void loadNewBlockToBuffer() throws IOException {
     try {
       minDeltaInCurrentBlock = BytesUtils.readZigZagVarLong(in);
     } catch (IOException e) {
@@ -152,19 +137,18 @@ private void loadNewBlockToBuffer() {
    *
    * @param packer the packer created from bitwidth of current mini block
    */
-  private void unpackMiniBlock(BytePackerForLong packer) {
+  private void unpackMiniBlock(BytePackerForLong packer) throws IOException {
     for (int j = 0; j < config.miniBlockSizeInValues; j += 8) {
       unpack8Values(packer);
     }
   }

-  private void unpack8Values(BytePackerForLong packer) {
-    //calculate the pos because the packer api uses array not stream
-    int pos = page.limit() - in.available();
-    packer.unpack8Values(page, pos, valuesBuffer, valuesBuffered);
+  private void unpack8Values(BytePackerForLong packer) throws IOException {
+    // get a single buffer of 8 values. most of the time, this won't require a copy
+    // TODO: update the packer to consume from an InputStream
+    ByteBuffer buffer = in.slice(packer.getBitWidth());
+    packer.unpack8Values(buffer, buffer.position(), valuesBuffer, valuesBuffered);
     this.valuesBuffered += 8;
-    //sync the pos in stream
-    in.skip(packer.getBitWidth());
   }

   private void readBitWidthsForMiniBlocks() {
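The rewritten unpack8Values leans on in.slice(n), which returns the next n bytes as a ByteBuffer and advances the stream in one call, replacing the old position arithmetic plus skip(). A rough single-buffer equivalent in plain NIO (the real stream also handles slices that cross buffer boundaries):

    import java.nio.ByteBuffer;

    class SliceSketch {
      // expose the next 'length' bytes without copying, then advance 'in'
      static ByteBuffer slice(ByteBuffer in, int length) {
        ByteBuffer out = in.duplicate();         // shares the backing bytes
        out.limit(out.position() + length);
        in.position(in.position() + length);     // the "stream" moves forward
        return out;
      }

      public static void main(String[] args) {
        ByteBuffer page = ByteBuffer.wrap(new byte[] {1, 2, 3, 4, 5});
        ByteBuffer first = slice(page, 2);
        System.out.println(first.remaining() + ", next read at " + page.position());
        // prints: 2, next read at 2
      }
    }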
