From e7689a3f78cff9a7120515b2a5fb1ddb1e30082b Mon Sep 17 00:00:00 2001
From: Nandor Kollar
Date: Wed, 6 Mar 2019 14:11:58 +0100
Subject: [PATCH 1/3] [SPARK-26509][SQL] Parquet DELTA_BYTE_ARRAY is not
 supported in Spark 2.x's Vectorized Reader

---
 .../parquet/VectorizedColumnReader.java       | 18 +++-
 .../VectorizedDeltaBinaryPackedReader.java    | 91 +++++++++++++++++
 .../VectorizedDeltaByteArrayReader.java       | 98 +++++++++++++++++++
 .../parquet/ParquetEncodingSuite.scala        | 36 +++++++
 4 files changed, 239 insertions(+), 4 deletions(-)
 create mode 100644 sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaBinaryPackedReader.java
 create mode 100644 sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaByteArrayReader.java

diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
index ba26b57567e6..4aa3070d29df 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
@@ -580,10 +580,7 @@ private void initDataReader(Encoding dataEncoding, ByteBufferInputStream in) thr
       this.dataColumn = new VectorizedRleValuesReader();
       this.isCurrentPageDictionaryEncoded = true;
     } else {
-      if (dataEncoding != Encoding.PLAIN) {
-        throw new UnsupportedOperationException("Unsupported encoding: " + dataEncoding);
-      }
-      this.dataColumn = new VectorizedPlainValuesReader();
+      this.dataColumn = getValuesReader(dataEncoding);
       this.isCurrentPageDictionaryEncoded = false;
     }
 
@@ -594,6 +591,19 @@ private void initDataReader(Encoding dataEncoding, ByteBufferInputStream in) thr
     }
   }
 
+  private ValuesReader getValuesReader(Encoding encoding) {
+    switch (encoding) {
+      case PLAIN:
+        return new VectorizedPlainValuesReader();
+      case DELTA_BYTE_ARRAY:
+        return new VectorizedDeltaByteArrayReader();
+      case DELTA_BINARY_PACKED:
+        return new VectorizedDeltaBinaryPackedReader();
+      default:
+        throw new UnsupportedOperationException("Unsupported encoding: " + encoding);
+    }
+  }
+
   private void readPageV1(DataPageV1 page) throws IOException {
     this.pageValueCount = page.getValueCount();
     ValuesReader rlReader = page.getRlEncoding().getValuesReader(descriptor, REPETITION_LEVEL);
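A note for reviewers: the getValuesReader() dispatch above is the hook the rest of this
patch plugs into. A quick way to produce a file that exercises the v2 delta encodings from
spark-shell (the session, column names, and output path here are illustrative, not part of
the patch):

    import org.apache.parquet.column.ParquetProperties
    import org.apache.parquet.hadoop.ParquetOutputFormat

    // Write with the Parquet v2 writer and dictionary encoding disabled, so integer/long
    // columns get DELTA_BINARY_PACKED and string columns get DELTA_BYTE_ARRAY.
    spark.range(10)
      .selectExpr("cast(id as int) as i", "id as l", "cast(id as string) as s")
      .write
      .option(ParquetOutputFormat.WRITER_VERSION,
        ParquetProperties.WriterVersion.PARQUET_2_0.toString)
      .option(ParquetOutputFormat.ENABLE_DICTIONARY, "false")
      .parquet("/tmp/delta_encoded")

    // Before this patch, reading the file back with the vectorized reader enabled
    // (spark.sql.parquet.enableVectorizedReader=true) failed with
    // "UnsupportedOperationException: Unsupported encoding: DELTA_BYTE_ARRAY".
    spark.read.parquet("/tmp/delta_encoded").show()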
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaBinaryPackedReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaBinaryPackedReader.java
new file mode 100644
index 000000000000..f6885214489e
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaBinaryPackedReader.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.parquet;
+
+import org.apache.parquet.bytes.ByteBufferInputStream;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesReader;
+import org.apache.parquet.io.api.Binary;
+import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
+
+import java.io.IOException;
+
+/**
+ * An implementation of the Parquet DELTA_BINARY_PACKED decoder that supports the vectorized interface.
+ */
+public class VectorizedDeltaBinaryPackedReader extends ValuesReader implements VectorizedValuesReader {
+  private final DeltaBinaryPackingValuesReader deltaBinaryPackingValuesReader = new DeltaBinaryPackingValuesReader();
+
+  @Override
+  public void initFromPage(int valueCount, ByteBufferInputStream in) throws IOException {
+    deltaBinaryPackingValuesReader.initFromPage(valueCount, in);
+  }
+
+  @Override
+  public void skip() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public byte readByte() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Binary readBinary(int len) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void readBooleans(int total, WritableColumnVector c, int rowId) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void readBytes(int total, WritableColumnVector c, int rowId) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void readIntegers(int total, WritableColumnVector c, int rowId) {
+    for (int i =0; i < total; i++) {
+      c.putInt(rowId + i, deltaBinaryPackingValuesReader.readInteger());
+    }
+  }
+
+  @Override
+  public void readLongs(int total, WritableColumnVector c, int rowId) {
+    for (int i =0; i < total; i++) {
+      c.putLong(rowId + i, deltaBinaryPackingValuesReader.readLong());
+    }
+  }

[... gap in this copy of the patch: the remaining methods of this file, the entire new
VectorizedDeltaByteArrayReader.java hunk, and the ParquetEncodingSuite.scala hunk header
and test declaration ...]

+    val extraOptions = Map(
+      ParquetOutputFormat.WRITER_VERSION -> ParquetProperties.WriterVersion.PARQUET_2_0.toString,
+      ParquetOutputFormat.ENABLE_DICTIONARY -> "false"
+    )
+
+    val hadoopConf = spark.sessionState.newHadoopConfWithOptions(extraOptions)
+    withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true",
+      ParquetOutputFormat.JOB_SUMMARY_LEVEL -> "ALL") {
+      withTempPath { dir =>
+        val path = s"${dir.getCanonicalPath}/test.parquet"
+        val data = (1 to 3).map { i =>
+          (i, i.toLong, s"test_${i}")
+        }
+
+        spark.createDataFrame(data).write.options(extraOptions).mode("overwrite").parquet(path)
+
+        val blockMetadata = readFooter(new Path(path), hadoopConf).getBlocks.asScala.head
+        val columnChunkMetadataList = blockMetadata.getColumns.asScala
+
+        // Verify that indeed delta encoding is used for each column
+        assert(columnChunkMetadataList.length === 3)
+        assert(columnChunkMetadataList(0).getEncodings.contains(Encoding.DELTA_BINARY_PACKED))
+        assert(columnChunkMetadataList(1).getEncodings.contains(Encoding.DELTA_BINARY_PACKED))
+        assert(columnChunkMetadataList(2).getEncodings.contains(Encoding.DELTA_BYTE_ARRAY))
+
+        val actual = spark.read.parquet(path).collect
+        assert(actual.sortBy(_.getInt(0)) === data.map(Row.fromTuple));
+      }
+    }
+  }
 }
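Both new readers delegate the actual decoding to parquet-mr and only adapt it to Spark's
VectorizedValuesReader interface. For intuition about what DeltaBinaryPackingValuesReader
is undoing: DELTA_BINARY_PACKED stores a first value followed by bit-packed deltas, and
the decoder rebuilds the column with a running sum. A toy Scala sketch of that idea
(illustrative only; the real format groups deltas into blocks and miniblocks):

    val values = Seq(100L, 101L, 103L, 106L)
    // Encoder side: keep the first value and the consecutive differences.
    val deltas = values.zip(values.tail).map { case (prev, next) => next - prev }
    // Decoder side: a running sum seeded with the first value restores the column.
    val decoded = deltas.scanLeft(values.head)(_ + _)
    assert(decoded == values)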
From 9e4c765d7c9982744490b50f022faa5ac6e04ae5 Mon Sep 17 00:00:00 2001
From: Nandor Kollar
Date: Wed, 6 Mar 2019 17:36:26 +0100
Subject: [PATCH 2/3] Fix style errors

---
 .../VectorizedDeltaBinaryPackedReader.java    | 18 ++++++++++--------
 .../VectorizedDeltaByteArrayReader.java       |  3 ++-
 2 files changed, 12 insertions(+), 9 deletions(-)

diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaBinaryPackedReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaBinaryPackedReader.java
index f6885214489e..b573dadd2fe0 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaBinaryPackedReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaBinaryPackedReader.java
@@ -25,14 +25,16 @@
 import java.io.IOException;
 
 /**
- * An implementation of the Parquet DELTA_BINARY_PACKED decoder that supports the vectorized interface.
+ * An implementation of the Parquet DELTA_BINARY_PACKED decoder
+ * that supports the vectorized interface.
  */
-public class VectorizedDeltaBinaryPackedReader extends ValuesReader implements VectorizedValuesReader {
-  private final DeltaBinaryPackingValuesReader deltaBinaryPackingValuesReader = new DeltaBinaryPackingValuesReader();
+public class VectorizedDeltaBinaryPackedReader extends ValuesReader
+    implements VectorizedValuesReader {
+  private final DeltaBinaryPackingValuesReader valuesReader = new DeltaBinaryPackingValuesReader();
 
   @Override
   public void initFromPage(int valueCount, ByteBufferInputStream in) throws IOException {
-    deltaBinaryPackingValuesReader.initFromPage(valueCount, in);
+    valuesReader.initFromPage(valueCount, in);
   }
 
   @Override
@@ -62,15 +64,15 @@ public void readBytes(int total, WritableColumnVector c, int rowId) {
 
   @Override
   public void readIntegers(int total, WritableColumnVector c, int rowId) {
-    for (int i =0; i < total; i++) {
-      c.putInt(rowId + i, deltaBinaryPackingValuesReader.readInteger());
+    for (int i = 0; i < total; i++) {
+      c.putInt(rowId + i, valuesReader.readInteger());
     }
   }
 
   @Override
   public void readLongs(int total, WritableColumnVector c, int rowId) {
-    for (int i =0; i < total; i++) {
-      c.putLong(rowId + i, deltaBinaryPackingValuesReader.readLong());
+    for (int i = 0; i < total; i++) {
+      c.putLong(rowId + i, valuesReader.readLong());
     }
   }

[... gap in this copy of the patch: the VectorizedDeltaByteArrayReader.java hunk of this
patch ...]
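The encodings that the suite asserts on can also be dumped outside the test, which is
handy when reviewing this series; the readFooter helper used by the test presumably wraps
the same parquet-mr footer API. A sketch (the path is illustrative):

    import scala.collection.JavaConverters._
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.parquet.format.converter.ParquetMetadataConverter
    import org.apache.parquet.hadoop.ParquetFileReader

    val footer = ParquetFileReader.readFooter(
      new Configuration(), new Path("/tmp/delta_encoded"), ParquetMetadataConverter.NO_FILTER)
    // Print the encodings recorded for every column chunk of the first row group.
    footer.getBlocks.asScala.head.getColumns.asScala.foreach { col =>
      println(s"${col.getPath} -> ${col.getEncodings.asScala.mkString(", ")}")
    }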
From [...] Mon Sep 17 00:00:00 2001
From: [...]
Date: Wed, 22 Jan 2020 10:17:36 +0800
Subject: [PATCH 3/3] address comment

---
 .../datasources/parquet/ParquetEncodingSuite.scala | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
index a60cc90f2a78..0133da1fdc29 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
@@ -126,12 +126,13 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest with SharedSparkSess
     )
 
     val hadoopConf = spark.sessionState.newHadoopConfWithOptions(extraOptions)
-    withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true",
+    withSQLConf(
+      SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true",
       ParquetOutputFormat.JOB_SUMMARY_LEVEL -> "ALL") {
       withTempPath { dir =>
         val path = s"${dir.getCanonicalPath}/test.parquet"
         val data = (1 to 3).map { i =>
-          (i, i.toLong, s"test_${i}")
+          (i, i.toLong, Array[Byte](i.toByte), s"test_${i}")
         }
 
         spark.createDataFrame(data).write.options(extraOptions).mode("overwrite").parquet(path)
@@ -140,12 +141,15 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest with SharedSparkSess
         val columnChunkMetadataList = blockMetadata.getColumns.asScala
 
         // Verify that indeed delta encoding is used for each column
-        assert(columnChunkMetadataList.length === 3)
+        assert(columnChunkMetadataList.length === 4)
         assert(columnChunkMetadataList(0).getEncodings.contains(Encoding.DELTA_BINARY_PACKED))
         assert(columnChunkMetadataList(1).getEncodings.contains(Encoding.DELTA_BINARY_PACKED))
+        // Both fixed-length byte array and variable-length byte array (also called BINARY)
+        // use DELTA_BYTE_ARRAY for encoding
         assert(columnChunkMetadataList(2).getEncodings.contains(Encoding.DELTA_BYTE_ARRAY))
+        assert(columnChunkMetadataList(3).getEncodings.contains(Encoding.DELTA_BYTE_ARRAY))
 
-        val actual = spark.read.parquet(path).collect
+        val actual = spark.read.parquet(path).collect()
         assert(actual.sortBy(_.getInt(0)) === data.map(Row.fromTuple));
       }
     }
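The extra Array[Byte] column added by this last patch leans on Spark's type mapping:
Array[Byte] is written as Parquet binary data, just like String, so with the v2 writer
both columns go through the DELTA_BYTE_ARRAY decoder. A small illustrative check of that
mapping (the tuple fields get the default names _1 to _4):

    val data = (1 to 3).map(i => (i, i.toLong, Array[Byte](i.toByte), s"test_$i"))
    val df = spark.createDataFrame(data)
    // Int -> integer, Long -> long, Array[Byte] -> binary, String -> string;
    // the binary and string columns are the two asserted as DELTA_BYTE_ARRAY above.
    assert(df.schema.map(_.dataType.typeName) == Seq("integer", "long", "binary", "string"))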