Skip to content

Commit 09445b5

Browse files
authored
PARQUET-2451: Add BYTE_STREAM_SPLIT support for FIXED_LEN_BYTE_ARRAY, INT32 and INT64 (#1291)
1 parent 1ae7da3 commit 09445b5

18 files changed

+1128
-204
lines changed

parquet-column/src/main/java/org/apache/parquet/column/Encoding.java

+9
Original file line number | Diff line number | Diff line change
@@ -31,7 +31,10 @@
3131
import org.apache.parquet.column.values.ValuesReader;
3232
import org.apache.parquet.column.values.bitpacking.ByteBitPackingValuesReader;
3333
import org.apache.parquet.column.values.bytestreamsplit.ByteStreamSplitValuesReaderForDouble;
34+
import org.apache.parquet.column.values.bytestreamsplit.ByteStreamSplitValuesReaderForFLBA;
3435
import org.apache.parquet.column.values.bytestreamsplit.ByteStreamSplitValuesReaderForFloat;
36+
import org.apache.parquet.column.values.bytestreamsplit.ByteStreamSplitValuesReaderForInteger;
37+
import org.apache.parquet.column.values.bytestreamsplit.ByteStreamSplitValuesReaderForLong;
3538
import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesReader;
3639
import org.apache.parquet.column.values.deltalengthbytearray.DeltaLengthByteArrayValuesReader;
3740
import org.apache.parquet.column.values.deltastrings.DeltaByteArrayReader;
@@ -129,6 +132,12 @@ public ValuesReader getValuesReader(ColumnDescriptor descriptor, ValuesType valu
129132
return new ByteStreamSplitValuesReaderForFloat();
130133
case DOUBLE:
131134
return new ByteStreamSplitValuesReaderForDouble();
135+
case INT32:
136+
return new ByteStreamSplitValuesReaderForInteger();
137+
case INT64:
138+
return new ByteStreamSplitValuesReaderForLong();
139+
case FIXED_LEN_BYTE_ARRAY:
140+
return new ByteStreamSplitValuesReaderForFLBA(descriptor.getTypeLength());
132141
default:
133142
throw new ParquetDecodingException("no byte stream split reader for type " + descriptor.getType());
134143
}

parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java

+79-21
Original file line number | Diff line number | Diff line change
@@ -70,6 +70,12 @@ public class ParquetProperties {
7070

7171
private static final int MIN_SLAB_SIZE = 64;
7272

73+
private enum ByteStreamSplitMode {
74+
NONE,
75+
FLOATING_POINT,
76+
EXTENDED
77+
}
78+
7379
public enum WriterVersion {
7480
PARQUET_1_0("v1"),
7581
PARQUET_2_0("v2");
@@ -114,7 +120,7 @@ public static WriterVersion fromString(String name) {
114120
private final ColumnProperty<Integer> numBloomFilterCandidates;
115121
private final int pageRowCountLimit;
116122
private final boolean pageWriteChecksumEnabled;
117-
private final boolean enableByteStreamSplit;
123+
private final ColumnProperty<ByteStreamSplitMode> byteStreamSplitEnabled;
118124
private final Map<String, String> extraMetaData;
119125

120126
private ParquetProperties(Builder builder) {
@@ -141,10 +147,18 @@ private ParquetProperties(Builder builder) {
141147
this.numBloomFilterCandidates = builder.numBloomFilterCandidates.build();
142148
this.pageRowCountLimit = builder.pageRowCountLimit;
143149
this.pageWriteChecksumEnabled = builder.pageWriteChecksumEnabled;
144-
this.enableByteStreamSplit = builder.enableByteStreamSplit;
150+
this.byteStreamSplitEnabled = builder.byteStreamSplitEnabled.build();
145151
this.extraMetaData = builder.extraMetaData;
146152
}
147153

154+
public static Builder builder() {
155+
return new Builder();
156+
}
157+
158+
public static Builder copy(ParquetProperties toCopy) {
159+
return new Builder(toCopy);
160+
}
161+
148162
public ValuesWriter newRepetitionLevelWriter(ColumnDescriptor path) {
149163
return newColumnDescriptorValuesWriter(path.getMaxRepetitionLevel());
150164
}
@@ -208,8 +222,23 @@ public boolean isDictionaryEnabled(ColumnDescriptor column) {
208222
return dictionaryEnabled.getValue(column);
209223
}
210224

225+
@Deprecated()
211226
public boolean isByteStreamSplitEnabled() {
212-
return enableByteStreamSplit;
227+
return byteStreamSplitEnabled.getDefaultValue() != ByteStreamSplitMode.NONE;
228+
}
229+
230+
public boolean isByteStreamSplitEnabled(ColumnDescriptor column) {
231+
switch (column.getPrimitiveType().getPrimitiveTypeName()) {
232+
case FLOAT:
233+
case DOUBLE:
234+
return byteStreamSplitEnabled.getValue(column) != ByteStreamSplitMode.NONE;
235+
case INT32:
236+
case INT64:
237+
case FIXED_LEN_BYTE_ARRAY:
238+
return byteStreamSplitEnabled.getValue(column) == ByteStreamSplitMode.EXTENDED;
239+
default:
240+
return false;
241+
}
213242
}
214243

215244
public ByteBufferAllocator getAllocator() {
@@ -301,14 +330,6 @@ public Map<String, String> getExtraMetaData() {
301330
return extraMetaData;
302331
}
303332

304-
public static Builder builder() {
305-
return new Builder();
306-
}
307-
308-
public static Builder copy(ParquetProperties toCopy) {
309-
return new Builder(toCopy);
310-
}
311-
312333
@Override
313334
public String toString() {
314335
return "Parquet page size to " + getPageSizeThreshold() + '\n'
@@ -349,11 +370,16 @@ public static class Builder {
349370
private final ColumnProperty.Builder<Boolean> bloomFilterEnabled;
350371
private int pageRowCountLimit = DEFAULT_PAGE_ROW_COUNT_LIMIT;
351372
private boolean pageWriteChecksumEnabled = DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED;
352-
private boolean enableByteStreamSplit = DEFAULT_IS_BYTE_STREAM_SPLIT_ENABLED;
373+
private final ColumnProperty.Builder<ByteStreamSplitMode> byteStreamSplitEnabled;
353374
private Map<String, String> extraMetaData = new HashMap<>();
354375

355376
private Builder() {
356377
enableDict = ColumnProperty.<Boolean>builder().withDefaultValue(DEFAULT_IS_DICTIONARY_ENABLED);
378+
byteStreamSplitEnabled = ColumnProperty.<ByteStreamSplitMode>builder()
379+
.withDefaultValue(
380+
DEFAULT_IS_BYTE_STREAM_SPLIT_ENABLED
381+
? ByteStreamSplitMode.FLOATING_POINT
382+
: ByteStreamSplitMode.NONE);
357383
bloomFilterEnabled = ColumnProperty.<Boolean>builder().withDefaultValue(DEFAULT_BLOOM_FILTER_ENABLED);
358384
bloomFilterNDVs = ColumnProperty.<Long>builder().withDefaultValue(null);
359385
bloomFilterFPPs = ColumnProperty.<Double>builder().withDefaultValue(DEFAULT_BLOOM_FILTER_FPP);
@@ -365,7 +391,7 @@ private Builder() {
365391

366392
private Builder(ParquetProperties toCopy) {
367393
this.pageSize = toCopy.pageSizeThreshold;
368-
this.enableDict = ColumnProperty.<Boolean>builder(toCopy.dictionaryEnabled);
394+
this.enableDict = ColumnProperty.builder(toCopy.dictionaryEnabled);
369395
this.dictPageSize = toCopy.dictionaryPageSizeThreshold;
370396
this.writerVersion = toCopy.writerVersion;
371397
this.minRowCountForPageSizeCheck = toCopy.minRowCountForPageSizeCheck;
@@ -375,13 +401,13 @@ private Builder(ParquetProperties toCopy) {
375401
this.allocator = toCopy.allocator;
376402
this.pageRowCountLimit = toCopy.pageRowCountLimit;
377403
this.pageWriteChecksumEnabled = toCopy.pageWriteChecksumEnabled;
378-
this.bloomFilterNDVs = ColumnProperty.<Long>builder(toCopy.bloomFilterNDVs);
379-
this.bloomFilterFPPs = ColumnProperty.<Double>builder(toCopy.bloomFilterFPPs);
380-
this.bloomFilterEnabled = ColumnProperty.<Boolean>builder(toCopy.bloomFilterEnabled);
381-
this.adaptiveBloomFilterEnabled = ColumnProperty.<Boolean>builder(toCopy.adaptiveBloomFilterEnabled);
382-
this.numBloomFilterCandidates = ColumnProperty.<Integer>builder(toCopy.numBloomFilterCandidates);
404+
this.bloomFilterNDVs = ColumnProperty.builder(toCopy.bloomFilterNDVs);
405+
this.bloomFilterFPPs = ColumnProperty.builder(toCopy.bloomFilterFPPs);
406+
this.bloomFilterEnabled = ColumnProperty.builder(toCopy.bloomFilterEnabled);
407+
this.adaptiveBloomFilterEnabled = ColumnProperty.builder(toCopy.adaptiveBloomFilterEnabled);
408+
this.numBloomFilterCandidates = ColumnProperty.builder(toCopy.numBloomFilterCandidates);
383409
this.maxBloomFilterBytes = toCopy.maxBloomFilterBytes;
384-
this.enableByteStreamSplit = toCopy.enableByteStreamSplit;
410+
this.byteStreamSplitEnabled = ColumnProperty.builder(toCopy.byteStreamSplitEnabled);
385411
this.extraMetaData = toCopy.extraMetaData;
386412
}
387413

@@ -420,8 +446,40 @@ public Builder withDictionaryEncoding(String columnPath, boolean enableDictionar
420446
return this;
421447
}
422448

423-
public Builder withByteStreamSplitEncoding(boolean enableByteStreamSplit) {
424-
this.enableByteStreamSplit = enableByteStreamSplit;
449+
/**
450+
* Enable or disable BYTE_STREAM_SPLIT encoding for FLOAT and DOUBLE columns.
451+
*
452+
* @param enable whether BYTE_STREAM_SPLIT encoding should be enabled
453+
* @return this builder for method chaining.
454+
*/
455+
public Builder withByteStreamSplitEncoding(boolean enable) {
456+
this.byteStreamSplitEnabled.withDefaultValue(
457+
enable ? ByteStreamSplitMode.FLOATING_POINT : ByteStreamSplitMode.NONE);
458+
return this;
459+
}
460+
461+
/**
462+
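* Enable or disable BYTE_STREAM_SPLIT encoding for the specified column. When enabled, the encoding applies to the column if its type is FLOAT, DOUBLE, INT32, INT64 or FIXED_LEN_BYTE_ARRAY.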
463+
*
464+
* @param columnPath the path of the column (dot-string)
465+
* @param enable whether BYTE_STREAM_SPLIT encoding should be enabled
466+
* @return this builder for method chaining.
467+
*/
468+
public Builder withByteStreamSplitEncoding(String columnPath, boolean enable) {
469+
this.byteStreamSplitEnabled.withValue(
470+
columnPath, enable ? ByteStreamSplitMode.EXTENDED : ByteStreamSplitMode.NONE);
471+
return this;
472+
}
473+
474+
/**
475+
* Enable or disable BYTE_STREAM_SPLIT encoding for FLOAT, DOUBLE, INT32, INT64 and FIXED_LEN_BYTE_ARRAY columns.
476+
*
477+
* @param enable whether BYTE_STREAM_SPLIT encoding should be enabled
478+
* @return this builder for method chaining.
479+
*/
480+
public Builder withExtendedByteStreamSplitEncoding(boolean enable) {
481+
this.byteStreamSplitEnabled.withDefaultValue(
482+
enable ? ByteStreamSplitMode.EXTENDED : ByteStreamSplitMode.NONE);
425483
return this;
426484
}
427485

parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReader.java

+26-21
Original file line number | Diff line number | Diff line change
@@ -19,17 +19,18 @@
1919
package org.apache.parquet.column.values.bytestreamsplit;
2020

2121
import java.io.IOException;
22+
import java.nio.ByteBuffer;
23+
import java.nio.ByteOrder;
2224
import org.apache.parquet.bytes.ByteBufferInputStream;
2325
import org.apache.parquet.column.values.ValuesReader;
2426
import org.apache.parquet.io.ParquetDecodingException;
2527
import org.slf4j.Logger;
2628
import org.slf4j.LoggerFactory;
2729

2830
public abstract class ByteStreamSplitValuesReader extends ValuesReader {
29-
3031
private static final Logger LOG = LoggerFactory.getLogger(ByteStreamSplitValuesReader.class);
31-
private final int elementSizeInBytes;
32-
private byte[] byteStreamData;
32+
protected final int elementSizeInBytes;
33+
protected ByteBuffer decodedDataBuffer;
3334
private int indexInStream;
3435
private int valuesCount;
3536

@@ -39,17 +40,27 @@ protected ByteStreamSplitValuesReader(int elementSizeInBytes) {
3940
this.valuesCount = 0;
4041
}
4142

42-
protected void gatherElementDataFromStreams(byte[] gatheredData) throws ParquetDecodingException {
43-
if (gatheredData.length != elementSizeInBytes) {
44-
throw new ParquetDecodingException("gatherData buffer is not of the expected size.");
45-
}
43+
protected int nextElementByteOffset() {
4644
if (indexInStream >= valuesCount) {
4745
throw new ParquetDecodingException("Byte-stream data was already exhausted.");
4846
}
49-
for (int i = 0; i < elementSizeInBytes; ++i) {
50-
gatheredData[i] = byteStreamData[i * valuesCount + indexInStream];
51-
}
47+
int offset = indexInStream * elementSizeInBytes;
5248
++indexInStream;
49+
return offset;
50+
}
51+
52+
// Decode an entire data page
53+
private byte[] decodeData(ByteBuffer encoded, int valuesCount) {
54+
assert encoded.limit() == valuesCount * elementSizeInBytes;
55+
byte[] decoded = new byte[encoded.limit()];
56+
int destByteIndex = 0;
57+
for (int srcValueIndex = 0; srcValueIndex < valuesCount; ++srcValueIndex) {
58+
for (int stream = 0; stream < elementSizeInBytes; ++stream, ++destByteIndex) {
59+
decoded[destByteIndex] = encoded.get(srcValueIndex + stream * valuesCount);
60+
}
61+
}
62+
assert destByteIndex == decoded.length;
63+
return decoded;
5364
}
5465

5566
@Override
@@ -76,18 +87,12 @@ public void initFromPage(int valuesCount, ByteBufferInputStream stream)
7687
throw new ParquetDecodingException(errorMessage);
7788
}
7889

79-
// Allocate buffer for all of the byte stream data.
90+
// Eagerly read and decode the data. This allows returning stable
91+
// Binary views into the internal decode buffer for FIXED_LEN_BYTE_ARRAY.
8092
final int totalSizeInBytes = stream.available();
81-
byteStreamData = new byte[totalSizeInBytes];
82-
83-
// Eagerly read the data for each stream.
84-
final int numRead = stream.read(byteStreamData, 0, totalSizeInBytes);
85-
if (numRead != totalSizeInBytes) {
86-
String errorMessage = String.format(
87-
"Failed to read requested number of bytes. Expected: %d. Read %d.", totalSizeInBytes, numRead);
88-
throw new ParquetDecodingException(errorMessage);
89-
}
90-
93+
final ByteBuffer encodedData = stream.slice(totalSizeInBytes).slice(); // possibly zero-copy
94+
final byte[] decodedData = decodeData(encodedData, this.valuesCount);
95+
decodedDataBuffer = ByteBuffer.wrap(decodedData).order(ByteOrder.LITTLE_ENDIAN);
9196
indexInStream = 0;
9297
}
9398

parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReaderForDouble.java

+1-8
Original file line number | Diff line number | Diff line change
@@ -18,20 +18,13 @@
1818
*/
1919
package org.apache.parquet.column.values.bytestreamsplit;
2020

21-
import org.apache.parquet.bytes.BytesUtils;
22-
2321
public class ByteStreamSplitValuesReaderForDouble extends ByteStreamSplitValuesReader {
24-
25-
private final byte[] valueByteBuffer;
26-
2722
public ByteStreamSplitValuesReaderForDouble() {
2823
super(Double.BYTES);
29-
valueByteBuffer = new byte[Double.BYTES];
3024
}
3125

3226
@Override
3327
public double readDouble() {
34-
gatherElementDataFromStreams(valueByteBuffer);
35-
return Double.longBitsToDouble(BytesUtils.bytesToLong(valueByteBuffer));
28+
return decodedDataBuffer.getDouble(nextElementByteOffset());
3629
}
3730
}
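parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReaderForFLBA.java

+33
Original file line number | Diff line number | Diff line change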
@@ -0,0 +1,33 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.parquet.column.values.bytestreamsplit;
20+
21+
import org.apache.parquet.io.api.Binary;
22+
23+
public class ByteStreamSplitValuesReaderForFLBA extends ByteStreamSplitValuesReader {
24+
// Trivial, but overridden for clarity
25+
public ByteStreamSplitValuesReaderForFLBA(int length) {
26+
super(length);
27+
}
28+
29+
@Override
30+
public Binary readBytes() {
31+
return Binary.fromConstantByteBuffer(decodedDataBuffer, nextElementByteOffset(), elementSizeInBytes);
32+
}
33+
}

parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReaderForFloat.java

+1-8
Original file line number | Diff line number | Diff line change
@@ -18,20 +18,13 @@
1818
*/
1919
package org.apache.parquet.column.values.bytestreamsplit;
2020

21-
import org.apache.parquet.bytes.BytesUtils;
22-
2321
public class ByteStreamSplitValuesReaderForFloat extends ByteStreamSplitValuesReader {
24-
25-
private final byte[] valueByteBuffer;
26-
2722
public ByteStreamSplitValuesReaderForFloat() {
2823
super(Float.BYTES);
29-
valueByteBuffer = new byte[Float.BYTES];
3024
}
3125

3226
@Override
3327
public float readFloat() {
34-
gatherElementDataFromStreams(valueByteBuffer);
35-
return Float.intBitsToFloat(BytesUtils.bytesToInt(valueByteBuffer));
28+
return decodedDataBuffer.getFloat(nextElementByteOffset());
3629
}
3730
}
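parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReaderForInteger.java

+30
Original file line number | Diff line number | Diff line change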
@@ -0,0 +1,30 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.parquet.column.values.bytestreamsplit;
20+
21+
public class ByteStreamSplitValuesReaderForInteger extends ByteStreamSplitValuesReader {
22+
public ByteStreamSplitValuesReaderForInteger() {
23+
super(4);
24+
}
25+
26+
@Override
27+
public int readInteger() {
28+
return decodedDataBuffer.getInt(nextElementByteOffset());
29+
}
30+
}

0 commit comments

Comments
 (0)