@@ -580,10 +580,7 @@ private void initDataReader(Encoding dataEncoding, ByteBufferInputStream in) thr
       this.dataColumn = new VectorizedRleValuesReader();
       this.isCurrentPageDictionaryEncoded = true;
     } else {
-      if (dataEncoding != Encoding.PLAIN) {
-        throw new UnsupportedOperationException("Unsupported encoding: " + dataEncoding);
-      }
-      this.dataColumn = new VectorizedPlainValuesReader();
+      this.dataColumn = getValuesReader(dataEncoding);
       this.isCurrentPageDictionaryEncoded = false;
     }

@@ -594,6 +591,19 @@ private void initDataReader(Encoding dataEncoding, ByteBufferInputStream in) thr
}
}

+  private ValuesReader getValuesReader(Encoding encoding) {
+    switch (encoding) {
+      case PLAIN:
+        return new VectorizedPlainValuesReader();
+      case DELTA_BYTE_ARRAY:
+        return new VectorizedDeltaByteArrayReader();
+      case DELTA_BINARY_PACKED:
+        return new VectorizedDeltaBinaryPackedReader();
+      default:
+        throw new UnsupportedOperationException("Unsupported encoding: " + encoding);
+    }
+  }

private void readPageV1(DataPageV1 page) throws IOException {
this.pageValueCount = page.getValueCount();
ValuesReader rlReader = page.getRlEncoding().getValuesReader(descriptor, REPETITION_LEVEL);
@@ -0,0 +1,93 @@ (new file: VectorizedDeltaBinaryPackedReader.java)
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.datasources.parquet;

import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesReader;
import org.apache.parquet.io.api.Binary;
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;

import java.io.IOException;

/**
* An implementation of the Parquet DELTA_BINARY_PACKED decoder
* that supports the vectorized interface.
*/
public class VectorizedDeltaBinaryPackedReader extends ValuesReader
implements VectorizedValuesReader {
private final DeltaBinaryPackingValuesReader valuesReader = new DeltaBinaryPackingValuesReader();
[Review thread on this line]

@kiszk (Member) commented on Feb 2, 2020:
  Can we reuse DeltaBinaryPackingValuesReader multiple times? (I am not familiar with Parquet.)
  When DeltaBinaryPackingValuesReader.initFromPage is called, valuesBuffer is allocated every time here; valuesBuffer is not initialized outside of initFromPage.
  I am curious about what happens if initFromPage is called multiple times.

The PR author (Member) replied:
  Thanks for the comment. The current usage follows the same pattern used inside Parquet itself: DeltaBinaryPackingValuesReader is likewise reused by DeltaByteArrayReader and DeltaLengthByteArrayValuesReader.
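As a minimal sketch (not from the PR) of that reuse pattern — a single long-lived reader re-initialized once per page — where the page array and value counts are hypothetical stand-ins for what Spark's column reader actually supplies:

import java.io.IOException;

import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesReader;

class ReuseSketch {
  // One reader instance outlives many pages, mirroring the valuesReader field above.
  private final DeltaBinaryPackingValuesReader reader = new DeltaBinaryPackingValuesReader();

  // 'pages' and 'counts' are hypothetical per-page inputs, for illustration only.
  void readAllPages(ByteBufferInputStream[] pages, int[] counts) throws IOException {
    for (int p = 0; p < pages.length; p++) {
      // initFromPage rebuilds the reader's internal state for each new page
      // (allocating a fresh valuesBuffer on every call, per the discussion above).
      reader.initFromPage(counts[p], pages[p]);
      for (int i = 0; i < counts[p]; i++) {
        System.out.println(reader.readInteger());
      }
    }
  }
}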


@Override
public void initFromPage(int valueCount, ByteBufferInputStream in) throws IOException {
valuesReader.initFromPage(valueCount, in);
}

@Override
public void skip() {
throw new UnsupportedOperationException();
}

@Override
public byte readByte() {
throw new UnsupportedOperationException();
}

@Override
public Binary readBinary(int len) {
throw new UnsupportedOperationException();
}

@Override
public void readBooleans(int total, WritableColumnVector c, int rowId) {
throw new UnsupportedOperationException();
}

@Override
public void readBytes(int total, WritableColumnVector c, int rowId) {
throw new UnsupportedOperationException();
}

@Override
public void readIntegers(int total, WritableColumnVector c, int rowId) {
for (int i = 0; i < total; i++) {
c.putInt(rowId + i, valuesReader.readInteger());
}
}

@Override
public void readLongs(int total, WritableColumnVector c, int rowId) {
for (int i = 0; i < total; i++) {
c.putLong(rowId + i, valuesReader.readLong());
}
}

@Override
public void readFloats(int total, WritableColumnVector c, int rowId) {
throw new UnsupportedOperationException();
}

@Override
public void readDoubles(int total, WritableColumnVector c, int rowId) {
throw new UnsupportedOperationException();
}

@Override
public void readBinary(int total, WritableColumnVector c, int rowId) {
throw new UnsupportedOperationException();
}
}
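For orientation, a hedged usage sketch (not part of the PR) of how this reader fills a Spark column vector; OnHeapColumnVector and the page inputs are assumptions for illustration:

import java.io.IOException;

import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
import org.apache.spark.sql.types.DataTypes;

class DeltaBinaryPackedUsageSketch {
  // 'pageValueCount' and 'pageStream' stand in for a real Parquet v2 data page.
  static void decodeIntPage(int pageValueCount, ByteBufferInputStream pageStream)
      throws IOException {
    WritableColumnVector vector = new OnHeapColumnVector(pageValueCount, DataTypes.IntegerType);
    VectorizedDeltaBinaryPackedReader reader = new VectorizedDeltaBinaryPackedReader();
    reader.initFromPage(pageValueCount, pageStream);
    // Decode the whole page into the vector, starting at row 0.
    reader.readIntegers(pageValueCount, vector, 0);
  }
}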
@@ -0,0 +1,99 @@ (new file: VectorizedDeltaByteArrayReader.java)
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.datasources.parquet;

import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.column.values.deltastrings.DeltaByteArrayReader;
import org.apache.parquet.io.api.Binary;
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;

import java.io.IOException;
import java.nio.ByteBuffer;

/**
* An implementation of the Parquet DELTA_BYTE_ARRAY decoder that supports the vectorized interface.
*/
public class VectorizedDeltaByteArrayReader extends ValuesReader implements VectorizedValuesReader {
private final DeltaByteArrayReader deltaByteArrayReader = new DeltaByteArrayReader();

@Override
public void initFromPage(int valueCount, ByteBufferInputStream in) throws IOException {
deltaByteArrayReader.initFromPage(valueCount, in);
}

@Override
public void skip() {
throw new UnsupportedOperationException();
}

@Override
public byte readByte() {
throw new UnsupportedOperationException();
}

@Override
public Binary readBinary(int len) {
throw new UnsupportedOperationException();
}

@Override
public void readBooleans(int total, WritableColumnVector c, int rowId) {
throw new UnsupportedOperationException();
}

@Override
public void readBytes(int total, WritableColumnVector c, int rowId) {
throw new UnsupportedOperationException();
}

@Override
public void readIntegers(int total, WritableColumnVector c, int rowId) {
throw new UnsupportedOperationException();
}

@Override
public void readLongs(int total, WritableColumnVector c, int rowId) {
throw new UnsupportedOperationException();
}

@Override
public void readFloats(int total, WritableColumnVector c, int rowId) {
throw new UnsupportedOperationException();
}

@Override
public void readDoubles(int total, WritableColumnVector c, int rowId) {
throw new UnsupportedOperationException();
}

@Override
public void readBinary(int total, WritableColumnVector c, int rowId) {
for (int i = 0; i < total; i++) {
Binary binary = deltaByteArrayReader.readBytes();
ByteBuffer buffer = binary.toByteBuffer();
if (buffer.hasArray()) {
c.putByteArray(rowId + i, buffer.array(), buffer.arrayOffset() + buffer.position(),
binary.length());
} else {
byte[] bytes = new byte[binary.length()];
buffer.get(bytes);
c.putByteArray(rowId + i, bytes);
}
}
}
}
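For context, a small worked example of the layout DELTA_BYTE_ARRAY stores (per the Parquet format spec), which is what readBinary above reconstructs value by value:

  values:         "Hello", "Help", "Helm"
  prefix lengths: 0, 3, 3                    (DELTA_BINARY_PACKED)
  suffixes:       "Hello", "p", "m"          (DELTA_LENGTH_BYTE_ARRAY)

Each value is the prefix it shares with the previous value plus its own suffix, so deltaByteArrayReader.readBytes() yields the fully reassembled byte array.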
@@ -18,8 +18,12 @@ package org.apache.spark.sql.execution.datasources.parquet (ParquetEncodingSuite.scala)

import scala.collection.JavaConverters._

+import org.apache.hadoop.fs.Path
+import org.apache.parquet.column.{Encoding, ParquetProperties}
+import org.apache.parquet.hadoop.ParquetOutputFormat

+import org.apache.spark.sql.Row
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession

// TODO: this needs a lot more testing but it's currently not easy to test with the parquet
@@ -114,4 +118,40 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest with SharedSparkSess
}
}
}

test("parquet v2 pages - delta encoding") {
val extraOptions = Map[String, String](
ParquetOutputFormat.WRITER_VERSION -> ParquetProperties.WriterVersion.PARQUET_2_0.toString,
ParquetOutputFormat.ENABLE_DICTIONARY -> "false"
)

val hadoopConf = spark.sessionState.newHadoopConfWithOptions(extraOptions)
withSQLConf(
SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true",
ParquetOutputFormat.JOB_SUMMARY_LEVEL -> "ALL") {
withTempPath { dir =>
val path = s"${dir.getCanonicalPath}/test.parquet"
val data = (1 to 3).map { i =>
(i, i.toLong, Array[Byte](i.toByte), s"test_${i}")
}

spark.createDataFrame(data).write.options(extraOptions).mode("overwrite").parquet(path)

val blockMetadata = readFooter(new Path(path), hadoopConf).getBlocks.asScala.head
val columnChunkMetadataList = blockMetadata.getColumns.asScala

// Verify that indeed delta encoding is used for each column
assert(columnChunkMetadataList.length === 4)
assert(columnChunkMetadataList(0).getEncodings.contains(Encoding.DELTA_BINARY_PACKED))
assert(columnChunkMetadataList(1).getEncodings.contains(Encoding.DELTA_BINARY_PACKED))
// Both fixed-length byte array and variable-length byte array (also called BINARY)
// are use DELTA_BYTE_ARRAY for encoding
assert(columnChunkMetadataList(2).getEncodings.contains(Encoding.DELTA_BYTE_ARRAY))
assert(columnChunkMetadataList(3).getEncodings.contains(Encoding.DELTA_BYTE_ARRAY))

val actual = spark.read.parquet(path).collect()
assert(actual.sortBy(_.getInt(0)) === data.map(Row.fromTuple));
}
}
}
}
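To run just this suite locally, the standard Spark invocation should work (the exact command is an assumption, not from the PR): build/sbt "sql/testOnly *ParquetEncodingSuite".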