diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
index ba26b57567e6..4aa3070d29df 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
@@ -580,10 +580,7 @@ private void initDataReader(Encoding dataEncoding, ByteBufferInputStream in) thr
       this.dataColumn = new VectorizedRleValuesReader();
       this.isCurrentPageDictionaryEncoded = true;
     } else {
-      if (dataEncoding != Encoding.PLAIN) {
-        throw new UnsupportedOperationException("Unsupported encoding: " + dataEncoding);
-      }
-      this.dataColumn = new VectorizedPlainValuesReader();
+      this.dataColumn = getValuesReader(dataEncoding);
       this.isCurrentPageDictionaryEncoded = false;
     }
 
@@ -594,6 +591,19 @@ private void initDataReader(Encoding dataEncoding, ByteBufferInputStream in) thr
     }
   }
 
+  private ValuesReader getValuesReader(Encoding encoding) {
+    switch (encoding) {
+      case PLAIN:
+        return new VectorizedPlainValuesReader();
+      case DELTA_BYTE_ARRAY:
+        return new VectorizedDeltaByteArrayReader();
+      case DELTA_BINARY_PACKED:
+        return new VectorizedDeltaBinaryPackedReader();
+      default:
+        throw new UnsupportedOperationException("Unsupported encoding: " + encoding);
+    }
+  }
+
   private void readPageV1(DataPageV1 page) throws IOException {
     this.pageValueCount = page.getValueCount();
     ValuesReader rlReader = page.getRlEncoding().getValuesReader(descriptor, REPETITION_LEVEL);
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaBinaryPackedReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaBinaryPackedReader.java
new file mode 100644
index 000000000000..b573dadd2fe0
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaBinaryPackedReader.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.parquet;
+
+import org.apache.parquet.bytes.ByteBufferInputStream;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesReader;
+import org.apache.parquet.io.api.Binary;
+import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
+
+import java.io.IOException;
+
+/**
+ * An implementation of the Parquet DELTA_BINARY_PACKED decoder
+ * that supports the vectorized interface.
+ */
+public class VectorizedDeltaBinaryPackedReader extends ValuesReader
+    implements VectorizedValuesReader {
+  private final DeltaBinaryPackingValuesReader valuesReader = new DeltaBinaryPackingValuesReader();
+
+  @Override
+  public void initFromPage(int valueCount, ByteBufferInputStream in) throws IOException {
+    valuesReader.initFromPage(valueCount, in);
+  }
+
+  @Override
+  public void skip() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public byte readByte() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Binary readBinary(int len) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void readBooleans(int total, WritableColumnVector c, int rowId) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void readBytes(int total, WritableColumnVector c, int rowId) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void readIntegers(int total, WritableColumnVector c, int rowId) {
+    for (int i = 0; i < total; i++) {
+      c.putInt(rowId + i, valuesReader.readInteger());
+    }
+  }
+
+  @Override
+  public void readLongs(int total, WritableColumnVector c, int rowId) {
+    for (int i = 0; i < total; i++) {
+      c.putLong(rowId + i, valuesReader.readLong());
+    }
+  }
+
+  @Override
+  public void readFloats(int total, WritableColumnVector c, int rowId) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void readDoubles(int total, WritableColumnVector c, int rowId) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void readBinary(int total, WritableColumnVector c, int rowId) {
+    throw new UnsupportedOperationException();
+  }
+}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaByteArrayReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaByteArrayReader.java
new file mode 100644
index 000000000000..f4bdd6897b7d
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaByteArrayReader.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.parquet;
+
+import org.apache.parquet.bytes.ByteBufferInputStream;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.column.values.deltastrings.DeltaByteArrayReader;
+import org.apache.parquet.io.api.Binary;
+import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * An implementation of the Parquet DELTA_BYTE_ARRAY decoder that supports the vectorized interface.
+ */
+public class VectorizedDeltaByteArrayReader extends ValuesReader implements VectorizedValuesReader {
+  private final DeltaByteArrayReader deltaByteArrayReader = new DeltaByteArrayReader();
+
+  @Override
+  public void initFromPage(int valueCount, ByteBufferInputStream in) throws IOException {
+    deltaByteArrayReader.initFromPage(valueCount, in);
+  }
+
+  @Override
+  public void skip() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public byte readByte() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Binary readBinary(int len) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void readBooleans(int total, WritableColumnVector c, int rowId) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void readBytes(int total, WritableColumnVector c, int rowId) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void readIntegers(int total, WritableColumnVector c, int rowId) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void readLongs(int total, WritableColumnVector c, int rowId) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void readFloats(int total, WritableColumnVector c, int rowId) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void readDoubles(int total, WritableColumnVector c, int rowId) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void readBinary(int total, WritableColumnVector c, int rowId) {
+    for (int i = 0; i < total; i++) {
+      Binary binary = deltaByteArrayReader.readBytes();
+      ByteBuffer buffer = binary.toByteBuffer();
+      if (buffer.hasArray()) {
+        c.putByteArray(rowId + i, buffer.array(), buffer.arrayOffset() + buffer.position(),
+          binary.length());
+      } else {
+        byte[] bytes = new byte[binary.length()];
+        buffer.get(bytes);
+        c.putByteArray(rowId + i, bytes);
+      }
+    }
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
index 6d681afd23b1..0133da1fdc29 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
@@ -18,8 +18,12 @@ package org.apache.spark.sql.execution.datasources.parquet
 
 import scala.collection.JavaConverters._
 
+import org.apache.hadoop.fs.Path
+import org.apache.parquet.column.{Encoding, ParquetProperties}
 import org.apache.parquet.hadoop.ParquetOutputFormat
 
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 
 // TODO: this needs a lot more testing but it's currently not easy to test with the parquet
@@ -114,4 +118,40 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest with SharedSparkSess
       }
     }
   }
+
+  test("parquet v2 pages - delta encoding") {
+    val extraOptions = Map[String, String](
+      ParquetOutputFormat.WRITER_VERSION -> ParquetProperties.WriterVersion.PARQUET_2_0.toString,
+      ParquetOutputFormat.ENABLE_DICTIONARY -> "false"
+    )
+
+    val hadoopConf = spark.sessionState.newHadoopConfWithOptions(extraOptions)
+    withSQLConf(
+      SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true",
+      ParquetOutputFormat.JOB_SUMMARY_LEVEL -> "ALL") {
+      withTempPath { dir =>
+        val path = s"${dir.getCanonicalPath}/test.parquet"
+        val data = (1 to 3).map { i =>
+          (i, i.toLong, Array[Byte](i.toByte), s"test_${i}")
+        }
+
+        spark.createDataFrame(data).write.options(extraOptions).mode("overwrite").parquet(path)
+
+        val blockMetadata = readFooter(new Path(path), hadoopConf).getBlocks.asScala.head
+        val columnChunkMetadataList = blockMetadata.getColumns.asScala
+
+        // Verify that delta encoding is indeed used for each column
+        assert(columnChunkMetadataList.length === 4)
+        assert(columnChunkMetadataList(0).getEncodings.contains(Encoding.DELTA_BINARY_PACKED))
+        assert(columnChunkMetadataList(1).getEncodings.contains(Encoding.DELTA_BINARY_PACKED))
+        // Both fixed-length byte arrays and variable-length byte arrays (also called BINARY)
+        // use DELTA_BYTE_ARRAY for encoding
+        assert(columnChunkMetadataList(2).getEncodings.contains(Encoding.DELTA_BYTE_ARRAY))
+        assert(columnChunkMetadataList(3).getEncodings.contains(Encoding.DELTA_BYTE_ARRAY))
+
+        val actual = spark.read.parquet(path).collect()
+        assert(actual.sortBy(_.getInt(0)) === data.map(Row.fromTuple))
+      }
+    }
+  }
 }
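
The test above exercises the new readers through a write/read round trip inside the test harness. As a quick way to try the same code path in a standalone job, here is a minimal sketch. It reuses the writer options from ParquetEncodingSuite (PARQUET_2_0 pages, dictionary encoding disabled); the object name, output path, and column names are illustrative only and are not part of this change.

import org.apache.parquet.column.ParquetProperties
import org.apache.parquet.hadoop.ParquetOutputFormat
import org.apache.spark.sql.SparkSession

object DeltaEncodingRoundTrip {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("delta-encoding-round-trip")
      .getOrCreate()
    import spark.implicits._

    // Force Parquet v2 pages and disable dictionary encoding so the writer picks
    // DELTA_BINARY_PACKED for ints/longs and DELTA_BYTE_ARRAY for strings/binary,
    // mirroring extraOptions in the test above.
    val out = "/tmp/delta_encoded_example"  // illustrative path
    (1 to 1000).map(i => (i, i.toLong, s"test_$i")).toDF("i", "l", "s")
      .write
      .option(ParquetOutputFormat.WRITER_VERSION,
        ParquetProperties.WriterVersion.PARQUET_2_0.toString)
      .option(ParquetOutputFormat.ENABLE_DICTIONARY, "false")
      .mode("overwrite")
      .parquet(out)

    // Reading back goes through VectorizedColumnReader, which now dispatches
    // delta-encoded pages to the new VectorizedDelta*Reader classes.
    spark.read.parquet(out).show(5)

    spark.stop()
  }
}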