From d0cf386e7b51b1a2488df7c07edab509e288cd8e Mon Sep 17 00:00:00 2001
From: Xinli shang
Date: Thu, 11 Jun 2020 13:23:45 -0700
Subject: [PATCH 1/3] PARQUET-1872: Add TransCompression command to parquet-tools

---
 .../cli/commands/TransCompressionCommand.java |  302 +++++++++++++++++
 .../commands/TransCompressionCommandTest.java |  271 ++++++++++++++++
 .../parquet/hadoop/ParquetFileReader.java     |    2 +-
 .../command/TransCompressionCommand.java      |  306 ++++++++++++++++++
 .../command/TestTransCompressionCommand.java  |  273 ++++++++++++++++
 5 files changed, 1153 insertions(+), 1 deletion(-)
 create mode 100644 parquet-cli/src/main/java/org/apache/parquet/cli/commands/TransCompressionCommand.java
 create mode 100644 parquet-cli/src/test/java/org/apache/parquet/cli/commands/TransCompressionCommandTest.java
 create mode 100644 parquet-tools/src/main/java/org/apache/parquet/tools/command/TransCompressionCommand.java
 create mode 100644 parquet-tools/src/test/java/org/apache/parquet/tools/command/TestTransCompressionCommand.java

diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/TransCompressionCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/TransCompressionCommand.java
new file mode 100644
index 0000000000..b7a836e5a1
--- /dev/null
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/TransCompressionCommand.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ + +package org.apache.parquet.cli.commands; + +import com.beust.jcommander.Parameter; +import com.beust.jcommander.Parameters; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.HadoopReadOptions; +import org.apache.parquet.ParquetReadOptions; +import org.apache.parquet.bytes.BytesInput; +import org.apache.parquet.cli.BaseCommand; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.column.impl.ColumnReadStoreImpl; +import org.apache.parquet.column.page.DictionaryPage; +import org.apache.parquet.column.page.PageReadStore; +import org.apache.parquet.column.statistics.Statistics; +import org.apache.parquet.compression.CompressionCodecFactory; +import org.apache.parquet.compression.CompressionCodecFactory.BytesInputCompressor; +import org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor; +import org.apache.parquet.format.DataPageHeader; +import org.apache.parquet.format.DataPageHeaderV2; +import org.apache.parquet.format.DictionaryPageHeader; +import org.apache.parquet.format.PageHeader; +import org.apache.parquet.format.Util; +import org.apache.parquet.format.converter.ParquetMetadataConverter; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.ParquetFileWriter; +import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.metadata.ColumnPath; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.hadoop.util.HadoopCodecs; +import org.apache.parquet.hadoop.util.HadoopInputFile; +import org.apache.parquet.internal.column.columnindex.ColumnIndex; +import org.apache.parquet.internal.column.columnindex.OffsetIndex; +import org.apache.parquet.io.InputFile; +import org.apache.parquet.io.ParquetEncodingException; +import org.apache.parquet.io.api.Converter; +import org.apache.parquet.io.api.GroupConverter; +import org.apache.parquet.io.api.PrimitiveConverter; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; +import org.slf4j.Logger; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER; + +@Parameters(commandDescription="Translate the compression from one to another") +public class TransCompressionCommand extends BaseCommand { + + public TransCompressionCommand(Logger console) { + super(console); + } + + @Parameter(description = "") + String input; + + @Parameter(description = "") + String output; + + @Parameter(description = " getExamples() { + return Lists.newArrayList( + "# Translate the compression from one to another", + " input.parquet output.parquet ZSTD" + ); + } + private void processBlocks(TransParquetFileReader reader, ParquetFileWriter writer, ParquetMetadata meta, MessageType schema, + String createdBy, CompressionCodecName codecName) throws IOException { + int blockIndex = 0; + PageReadStore store = reader.readNextRowGroup(); + while (store != null) { + writer.startBlock(store.getRowCount()); + BlockMetaData blockMetaData = meta.getBlocks().get(blockIndex); + List columnsInOrder = blockMetaData.getColumns(); + Map descriptorsMap = schema.getColumns().stream().collect( + Collectors.toMap(x -> 
ColumnPath.get(x.getPath()), x -> x)); + for (int i = 0; i < columnsInOrder.size(); i += 1) { + ColumnChunkMetaData chunk = columnsInOrder.get(i); + ColumnReadStoreImpl crstore = new ColumnReadStoreImpl(store, new DumpGroupConverter(), schema, createdBy); + ColumnDescriptor columnDescriptor = descriptorsMap.get(chunk.getPath()); + writer.startColumn(columnDescriptor, crstore.getColumnReader(columnDescriptor).getTotalValueCount(), codecName); + processChunk(reader, writer, chunk, createdBy, codecName); + writer.endColumn(); + } + writer.endBlock(); + store = reader.readNextRowGroup(); + blockIndex++; + } + } + + private void processChunk(TransParquetFileReader reader, ParquetFileWriter writer, ColumnChunkMetaData chunk, + String createdBy, CompressionCodecName codecName) throws IOException { + CompressionCodecFactory codecFactory = HadoopCodecs.newFactory(0); + BytesInputDecompressor decompressor = codecFactory.getDecompressor(chunk.getCodec()); + BytesInputCompressor compressor = codecFactory.getCompressor(codecName); + ColumnIndex columnIndex = reader.readColumnIndex(chunk); + OffsetIndex offsetIndex = reader.readOffsetIndex(chunk); + + reader.setStreamPosition(chunk.getStartingPos()); + DictionaryPage dictionaryPage = null; + long readValues = 0; + Statistics statistics = null; + ParquetMetadataConverter converter = new ParquetMetadataConverter(); + int pageIndex = 0; + long totalChunkValues = chunk.getValueCount(); + while (readValues < totalChunkValues) { + PageHeader pageHeader = reader.readPageHeader(); + byte[] pageLoad; + switch (pageHeader.type) { + case DICTIONARY_PAGE: + if (dictionaryPage != null) { + throw new IOException("has more than one dictionary page in column chunk"); + } + DictionaryPageHeader dictPageHeader = pageHeader.dictionary_page_header; + pageLoad = translatePageLoad(reader, true, compressor, decompressor, pageHeader.getCompressed_page_size(), pageHeader.getUncompressed_page_size()); + writer.writeDictionaryPage(new DictionaryPage(BytesInput.from(pageLoad), + pageHeader.getUncompressed_page_size(), + converter.getEncoding(dictPageHeader.getEncoding()))); + break; + case DATA_PAGE: + DataPageHeader headerV1 = pageHeader.data_page_header; + pageLoad = translatePageLoad(reader, true, compressor, decompressor, pageHeader.getCompressed_page_size(), pageHeader.getUncompressed_page_size()); + statistics = convertStatistics(createdBy, chunk.getPrimitiveType(), headerV1.getStatistics(), columnIndex, pageIndex, converter); + readValues += headerV1.getNum_values(); + if (offsetIndex != null) { + long rowCount = 1 + offsetIndex.getLastRowIndex(pageIndex, totalChunkValues) - offsetIndex.getFirstRowIndex(pageIndex); + writer.writeDataPage(toIntWithCheck(headerV1.getNum_values()), + pageHeader.uncompressed_page_size, + BytesInput.from(pageLoad), + statistics, + toIntWithCheck(rowCount), + converter.getEncoding(headerV1.getRepetition_level_encoding()), + converter.getEncoding(headerV1.getDefinition_level_encoding()), + converter.getEncoding(headerV1.getEncoding())); + } else { + writer.writeDataPage(toIntWithCheck(headerV1.getNum_values()), + pageHeader.uncompressed_page_size, + BytesInput.from(pageLoad), + statistics, + converter.getEncoding(headerV1.getRepetition_level_encoding()), + converter.getEncoding(headerV1.getDefinition_level_encoding()), + converter.getEncoding(headerV1.getEncoding())); + } + pageIndex++; + break; + case DATA_PAGE_V2: + DataPageHeaderV2 headerV2 = pageHeader.data_page_header_v2; + int rlLength = headerV2.getRepetition_levels_byte_length(); + 
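+ // V2 data pages store the repetition/definition levels uncompressed ahead of the page payload,
+ // so the level bytes read below are copied through unchanged and only the remaining payload
+ // is decompressed with the source codec and recompressed with the target codec.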
BytesInput rlLevels = readBlock(rlLength, reader); + int dlLength = headerV2.getDefinition_levels_byte_length(); + BytesInput dlLevels = readBlock(dlLength, reader); + int payLoadLength = pageHeader.getCompressed_page_size() - rlLength - dlLength; + int rawDataLength = pageHeader.getUncompressed_page_size() - rlLength - dlLength; + pageLoad = translatePageLoad(reader, headerV2.is_compressed, compressor, decompressor, payLoadLength, rawDataLength); + statistics = convertStatistics(createdBy, chunk.getPrimitiveType(), headerV2.getStatistics(), columnIndex, pageIndex, converter); + readValues += headerV2.getNum_values(); + writer.writeDataPageV2(headerV2.getNum_rows(), + headerV2.getNum_nulls(), + headerV2.getNum_values(), + rlLevels, + dlLevels, + converter.getEncoding(headerV2.getEncoding()), + BytesInput.from(pageLoad), + pageHeader.uncompressed_page_size - rlLength - dlLength, + statistics); + pageIndex++; + break; + default: + break; + } + } + } + + private Statistics convertStatistics(String createdBy, PrimitiveType type, org.apache.parquet.format.Statistics pageStatistics, + ColumnIndex columnIndex, int pageIndex, ParquetMetadataConverter converter) throws IOException { + if (pageStatistics != null) { + return converter.fromParquetStatistics(createdBy, pageStatistics, type); + } else if (columnIndex != null) { + if (columnIndex.getNullPages() == null) { + throw new IOException("columnIndex has null variable 'nullPages' which indicates corrupted data for type: " + type.getName()); + } + if (pageIndex > columnIndex.getNullPages().size()) { + throw new IOException("There are more pages " + pageIndex + " found in the column than in the columnIndex " + columnIndex.getNullPages().size()); + } + org.apache.parquet.column.statistics.Statistics.Builder statsBuilder = org.apache.parquet.column.statistics.Statistics.getBuilderForReading(type); + statsBuilder.withNumNulls(columnIndex.getNullCounts().get(pageIndex)); + + if (!columnIndex.getNullPages().get(pageIndex)) { + statsBuilder.withMin(columnIndex.getMinValues().get(pageIndex).array().clone()); + statsBuilder.withMax(columnIndex.getMaxValues().get(pageIndex).array().clone()); + } + return statsBuilder.build(); + } else { + return null; + } + } + + private byte[] translatePageLoad(TransParquetFileReader reader, boolean isCompressed, BytesInputCompressor compressor, + BytesInputDecompressor decompressor, int payloadLength, int rawDataLength) throws IOException { + BytesInput data = readBlock(payloadLength, reader); + if (isCompressed) { + data = decompressor.decompress(data, rawDataLength); + } + BytesInput newCompressedData = compressor.compress(data); + return newCompressedData.toByteArray(); + } + + private BytesInput readBlock(int length, TransParquetFileReader reader) throws IOException { + byte[] data = new byte[length]; + reader.blockRead(data); + return BytesInput.from(data); + } + + private int toIntWithCheck(long size) { + if ((int)size != size) { + throw new ParquetEncodingException("size is bigger than " + Integer.MAX_VALUE + " bytes: " + size); + } + return (int)size; + } + + private static final class DumpGroupConverter extends GroupConverter { + @Override public void start() {} + @Override public void end() {} + @Override public Converter getConverter(int fieldIndex) { return new DumpConverter(); } + } + + private static final class DumpConverter extends PrimitiveConverter { + @Override public GroupConverter asGroupConverter() { return new DumpGroupConverter(); } + } + + private static final class TransParquetFileReader extends 
ParquetFileReader { + + public TransParquetFileReader(InputFile file, ParquetReadOptions options) throws IOException { + super(file, options); + } + + public void setStreamPosition(long newPos) throws IOException { + f.seek(newPos); + } + + public void blockRead(byte[] data) throws IOException { + f.readFully(data); + } + + public PageHeader readPageHeader() throws IOException { + return Util.readPageHeader(f); + } + } +} diff --git a/parquet-cli/src/test/java/org/apache/parquet/cli/commands/TransCompressionCommandTest.java b/parquet-cli/src/test/java/org/apache/parquet/cli/commands/TransCompressionCommandTest.java new file mode 100644 index 0000000000..d6803f7b0a --- /dev/null +++ b/parquet-cli/src/test/java/org/apache/parquet/cli/commands/TransCompressionCommandTest.java @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.cli.commands; + +import com.google.common.collect.ImmutableMap; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.HadoopReadOptions; +import org.apache.parquet.ParquetReadOptions; +import org.apache.parquet.column.ParquetProperties; +import org.apache.parquet.column.page.PageReadStore; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.example.data.simple.SimpleGroup; +import org.apache.parquet.example.data.simple.convert.GroupRecordConverter; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.ParquetReader; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.example.ExampleParquetWriter; +import org.apache.parquet.hadoop.example.GroupReadSupport; +import org.apache.parquet.hadoop.example.GroupWriteSupport; +import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.hadoop.util.HadoopInputFile; +import org.apache.parquet.internal.column.columnindex.ColumnIndex; +import org.apache.parquet.internal.column.columnindex.OffsetIndex; +import org.apache.parquet.io.ColumnIOFactory; +import org.apache.parquet.io.MessageColumnIO; +import org.apache.parquet.io.RecordReader; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.nio.file.Files; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.ThreadLocalRandom; + +import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER; +import static 
org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64; +import static org.apache.parquet.schema.Type.Repetition.OPTIONAL; +import static org.apache.parquet.schema.Type.Repetition.REPEATED; +import static org.apache.parquet.schema.Type.Repetition.REQUIRED; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TransCompressionCommandTest extends ParquetFileTest { + + private Configuration conf = new Configuration(); + private Map extraMeta + = ImmutableMap.of("key1", "value1", "key2", "value2"); + + @Test + public void testTransCompression() throws Exception { + String[] codecs = {"UNCOMPRESSED", "SNAPPY", "GZIP", "ZSTD"}; + for (int i = 0; i < codecs.length; i++) { + for (int j = 0; j reader = ParquetReader.builder(new GroupReadSupport(), new Path(inputFile)).withConf(conf).build(); + for (int i = 0; i < numRecord; i++) { + Group group = reader.read(); + assertTrue(group.getLong("DocId", 0) < 1000); + assertEquals(group.getBinary("Name", 0).length(), 100); + assertEquals(group.getBinary("Gender", 0).length(), 100); + Group subGroup = group.getGroup("Links", 0); + assertEquals(subGroup.getBinary("Backward", 0).length(), 100); + assertEquals(subGroup.getBinary("Forward", 0).length(), 100); + } + reader.close(); + } + + private void validMeta(String inputFile, String outFile) throws Exception { + ParquetMetadata inMetaData = ParquetFileReader.readFooter(conf, new Path(inputFile), NO_FILTER); + ParquetMetadata outMetaData = ParquetFileReader.readFooter(conf, new Path(outFile), NO_FILTER); + Assert.assertEquals(inMetaData.getFileMetaData().getSchema(), outMetaData.getFileMetaData().getSchema()); + Assert.assertEquals(inMetaData.getFileMetaData().getKeyValueMetaData(), outMetaData.getFileMetaData().getKeyValueMetaData()); + } + + private void validColumnIndex(String inputFile, String outFile) throws Exception { + ParquetMetadata inMetaData = ParquetFileReader.readFooter(conf, new Path(inputFile), NO_FILTER); + ParquetMetadata outMetaData = ParquetFileReader.readFooter(conf, new Path(outFile), NO_FILTER); + Assert.assertEquals(inMetaData.getBlocks().size(), outMetaData.getBlocks().size()); + try (ParquetFileReader inReader = new ParquetFileReader(HadoopInputFile.fromPath(new Path(inputFile), conf), HadoopReadOptions.builder(conf).build()); + ParquetFileReader outReader = new ParquetFileReader(HadoopInputFile.fromPath(new Path(outFile), conf), HadoopReadOptions.builder(conf).build())) { + for (int i = 0; i < inMetaData.getBlocks().size(); i++) { + BlockMetaData inBlockMetaData = inMetaData.getBlocks().get(i); + BlockMetaData outBlockMetaData = outMetaData.getBlocks().get(i); + Assert.assertEquals(inBlockMetaData.getColumns().size(), outBlockMetaData.getColumns().size()); + for (int j = 0; j < inBlockMetaData.getColumns().size(); j++) { + ColumnChunkMetaData inChunk = inBlockMetaData.getColumns().get(j); + ColumnIndex inColumnIndex = inReader.readColumnIndex(inChunk); + OffsetIndex inOffsetIndex = inReader.readOffsetIndex(inChunk); + ColumnChunkMetaData outChunk = outBlockMetaData.getColumns().get(j); + ColumnIndex outColumnIndex = outReader.readColumnIndex(outChunk); + OffsetIndex outOffsetIndex = outReader.readOffsetIndex(outChunk); + if (inColumnIndex != null) { + Assert.assertEquals(inColumnIndex.getBoundaryOrder(), outColumnIndex.getBoundaryOrder()); + Assert.assertEquals(inColumnIndex.getMaxValues(), outColumnIndex.getMaxValues()); + 
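+ // Codec translation must not alter the column index, so the min values and null counts are
+ // also expected to match the source file exactly.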
Assert.assertEquals(inColumnIndex.getMinValues(), outColumnIndex.getMinValues()); + Assert.assertEquals(inColumnIndex.getNullCounts(), outColumnIndex.getNullCounts()); + } + if (inOffsetIndex != null) { + Assert.assertEquals(inOffsetIndex.getPageCount(), outOffsetIndex.getPageCount()); + for (int k = 0; k < inOffsetIndex.getPageCount(); k++) { + Assert.assertEquals(inOffsetIndex.getFirstRowIndex(k), outOffsetIndex.getFirstRowIndex(k)); + Assert.assertEquals(inOffsetIndex.getLastRowIndex(k, inChunk.getValueCount()), + outOffsetIndex.getLastRowIndex(k, outChunk.getValueCount())); + } + } + } + } + } + } + + private String createParquetFile(String prefix, String codec, int numRecord, ParquetProperties.WriterVersion writerVersion, int pageSize) throws IOException { + MessageType schema = new MessageType("schema", + new PrimitiveType(REQUIRED, INT64, "DocId"), + new PrimitiveType(REQUIRED, BINARY, "Name"), + new PrimitiveType(REQUIRED, BINARY, "Gender"), + new GroupType(OPTIONAL, "Links", + new PrimitiveType(REPEATED, BINARY, "Backward"), + new PrimitiveType(REPEATED, BINARY, "Forward"))); + + conf.set(GroupWriteSupport.PARQUET_EXAMPLE_SCHEMA, schema.toString()); + + String file = createTempFile(prefix); + ExampleParquetWriter.Builder builder = ExampleParquetWriter.builder(new Path(file)) + .withConf(conf) + .withWriterVersion(writerVersion) + .withExtraMetaData(extraMeta) + .withDictionaryEncoding("DocId", true) + .withValidation(true) + .enablePageWriteChecksum() + .withPageSize(pageSize) + .withCompressionCodec(CompressionCodecName.valueOf(codec)); + try (ParquetWriter writer = builder.build()) { + for (int i = 0; i < numRecord; i++) { + SimpleGroup g = new SimpleGroup(schema); + g.add("DocId", getLong()); + g.add("Name", getString()); + g.add("Gender", getString()); + Group links = g.addGroup("Links"); + links.add(0, getString()); + links.add(1, getString()); + writer.write(g); + } + } + + return file; + } + + private static long getLong() { + return ThreadLocalRandom.current().nextLong(1000); + } + + private static String getString() { + char[] chars = {'a', 'b', 'c', 'd', 'e', 'f', 'g', 'x', 'z', 'y'}; + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < 100; i++) { + sb.append(chars[new Random().nextInt(10)]); + } + return sb.toString(); + } + + private static String createTempFile(String prefix) { + try { + return Files.createTempDirectory(prefix).toAbsolutePath().toString() + "/test.parquet"; + } catch (IOException e) { + throw new AssertionError("Unable to create temporary file", e); + } + } +} diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java index 18fbf6d8c5..987b603337 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java @@ -613,8 +613,8 @@ public static ParquetFileReader open(InputFile file, ParquetReadOptions options) return new ParquetFileReader(file, options); } + protected final SeekableInputStream f; private final InputFile file; - private final SeekableInputStream f; private final ParquetReadOptions options; private final Map paths = new HashMap<>(); private final FileMetaData fileMetaData; // may be null diff --git a/parquet-tools/src/main/java/org/apache/parquet/tools/command/TransCompressionCommand.java b/parquet-tools/src/main/java/org/apache/parquet/tools/command/TransCompressionCommand.java new file mode 100644 index 
0000000000..afcc380202 --- /dev/null +++ b/parquet-tools/src/main/java/org/apache/parquet/tools/command/TransCompressionCommand.java @@ -0,0 +1,306 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.tools.command; + +import org.apache.commons.cli.CommandLine; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.HadoopReadOptions; +import org.apache.parquet.ParquetReadOptions; +import org.apache.parquet.bytes.BytesInput; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.column.impl.ColumnReadStoreImpl; +import org.apache.parquet.column.page.DictionaryPage; +import org.apache.parquet.column.page.PageReadStore; +import org.apache.parquet.column.statistics.Statistics; +import org.apache.parquet.compression.CompressionCodecFactory; +import org.apache.parquet.compression.CompressionCodecFactory.BytesInputCompressor; +import org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor; +import org.apache.parquet.format.DataPageHeader; +import org.apache.parquet.format.DataPageHeaderV2; +import org.apache.parquet.format.DictionaryPageHeader; +import org.apache.parquet.format.PageHeader; +import org.apache.parquet.format.Util; +import org.apache.parquet.format.converter.ParquetMetadataConverter; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.ParquetFileWriter; +import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.metadata.ColumnPath; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.hadoop.util.HadoopCodecs; +import org.apache.parquet.hadoop.util.HadoopInputFile; +import org.apache.parquet.internal.column.columnindex.ColumnIndex; +import org.apache.parquet.internal.column.columnindex.OffsetIndex; +import org.apache.parquet.io.InputFile; +import org.apache.parquet.io.ParquetEncodingException; +import org.apache.parquet.io.api.Converter; +import org.apache.parquet.io.api.GroupConverter; +import org.apache.parquet.io.api.PrimitiveConverter; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER; + +public class TransCompressionCommand extends ArgsOnlyCommand { + + public static final String[] USAGE = new String[] { + " ", + + "where is the source parquet file", + " is the 
destination parquet file," + + " is the codec name in the case sensitive format to be translated to, e.g. SNAPPY, GZIP, ZSTD, LZO, LZ4, BROTLI, UNCOMPRESSED" + }; + + private static final Logger LOG = LoggerFactory.getLogger(TransCompressionCommand.class); + + private Configuration conf; + + public TransCompressionCommand() { + super(3, 3); + this.conf = new Configuration(); + } + + public TransCompressionCommand(Configuration conf) { + super(3, 3); + this.conf = conf; + } + + @Override + public String[] getUsageDescription() { + return USAGE; + } + + @Override + public String getCommandDescription() { + return "Translate the compression of a given Parquet file to a new compression one to a new Parquet file."; + } + + @Override + public void execute(CommandLine options) throws Exception { + super.execute(options); + List args = options.getArgList(); + Path inPath = new Path(args.get(0)); + Path outPath = new Path(args.get(1)); + CompressionCodecName codecName = CompressionCodecName.valueOf(args.get(2)); + + ParquetMetadata metaData = ParquetFileReader.readFooter(conf, inPath, NO_FILTER); + MessageType schema = metaData.getFileMetaData().getSchema(); + + try (TransParquetFileReader reader = new TransParquetFileReader(HadoopInputFile.fromPath(inPath, conf), HadoopReadOptions.builder(conf).build())) { + ParquetFileWriter writer = new ParquetFileWriter(conf, schema, outPath, ParquetFileWriter.Mode.CREATE); + writer.start(); + processBlocks(reader, writer, metaData, schema, metaData.getFileMetaData().getCreatedBy(), codecName); + writer.end(metaData.getFileMetaData().getKeyValueMetaData()); + } + } + + private void processBlocks(TransParquetFileReader reader, ParquetFileWriter writer, ParquetMetadata meta, MessageType schema, + String createdBy, CompressionCodecName codecName) throws IOException { + int blockIndex = 0; + PageReadStore store = reader.readNextRowGroup(); + while (store != null) { + writer.startBlock(store.getRowCount()); + BlockMetaData blockMetaData = meta.getBlocks().get(blockIndex); + List columnsInOrder = blockMetaData.getColumns(); + Map descriptorsMap = schema.getColumns().stream().collect( + Collectors.toMap(x -> ColumnPath.get(x.getPath()), x -> x)); + for (int i = 0; i < columnsInOrder.size(); i += 1) { + ColumnChunkMetaData chunk = columnsInOrder.get(i); + ColumnReadStoreImpl crstore = new ColumnReadStoreImpl(store, new DumpGroupConverter(), schema, createdBy); + ColumnDescriptor columnDescriptor = descriptorsMap.get(chunk.getPath()); + writer.startColumn(columnDescriptor, crstore.getColumnReader(columnDescriptor).getTotalValueCount(), codecName); + processChunk(reader, writer, chunk, createdBy, codecName); + writer.endColumn(); + } + writer.endBlock(); + store = reader.readNextRowGroup(); + blockIndex++; + } + } + + private void processChunk(TransParquetFileReader reader, ParquetFileWriter writer, ColumnChunkMetaData chunk, + String createdBy, CompressionCodecName codecName) throws IOException { + CompressionCodecFactory codecFactory = HadoopCodecs.newFactory(0); + BytesInputDecompressor decompressor = codecFactory.getDecompressor(chunk.getCodec()); + BytesInputCompressor compressor = codecFactory.getCompressor(codecName); + ColumnIndex columnIndex = reader.readColumnIndex(chunk); + OffsetIndex offsetIndex = reader.readOffsetIndex(chunk); + + reader.setStreamPosition(chunk.getStartingPos()); + DictionaryPage dictionaryPage = null; + long readValues = 0; + Statistics statistics = null; + ParquetMetadataConverter converter = new ParquetMetadataConverter(); + int 
pageIndex = 0; + long totalChunkValues = chunk.getValueCount(); + while (readValues < totalChunkValues) { + PageHeader pageHeader = reader.readPageHeader(); + int compressedPageSize = pageHeader.getCompressed_page_size(); + byte[] pageLoad; + switch (pageHeader.type) { + case DICTIONARY_PAGE: + if (dictionaryPage != null) { + throw new IOException("has more than one dictionary page in column chunk"); + } + DictionaryPageHeader dictPageHeader = pageHeader.dictionary_page_header; + pageLoad = translatePageLoad(reader, true, compressor, decompressor, pageHeader.getCompressed_page_size(), pageHeader.getUncompressed_page_size()); + writer.writeDictionaryPage(new DictionaryPage(BytesInput.from(pageLoad), + pageHeader.getUncompressed_page_size(), + converter.getEncoding(dictPageHeader.getEncoding()))); + break; + case DATA_PAGE: + DataPageHeader headerV1 = pageHeader.data_page_header; + pageLoad = translatePageLoad(reader, true, compressor, decompressor, pageHeader.getCompressed_page_size(), pageHeader.getUncompressed_page_size()); + statistics = convertStatistics(createdBy, chunk.getPrimitiveType(), headerV1.getStatistics(), columnIndex, pageIndex, converter); + readValues += headerV1.getNum_values(); + if (offsetIndex != null) { + long rowCount = 1 + offsetIndex.getLastRowIndex(pageIndex, totalChunkValues) - offsetIndex.getFirstRowIndex(pageIndex); + writer.writeDataPage(toIntWithCheck(headerV1.getNum_values()), + pageHeader.uncompressed_page_size, + BytesInput.from(pageLoad), + statistics, + toIntWithCheck(rowCount), + converter.getEncoding(headerV1.getRepetition_level_encoding()), + converter.getEncoding(headerV1.getDefinition_level_encoding()), + converter.getEncoding(headerV1.getEncoding())); + } else { + writer.writeDataPage(toIntWithCheck(headerV1.getNum_values()), + pageHeader.uncompressed_page_size, + BytesInput.from(pageLoad), + statistics, + converter.getEncoding(headerV1.getRepetition_level_encoding()), + converter.getEncoding(headerV1.getDefinition_level_encoding()), + converter.getEncoding(headerV1.getEncoding())); + } + pageIndex++; + break; + case DATA_PAGE_V2: + DataPageHeaderV2 headerV2 = pageHeader.data_page_header_v2; + int rlLength = headerV2.getRepetition_levels_byte_length(); + BytesInput rlLevels = readBlock(rlLength, reader); + int dlLength = headerV2.getDefinition_levels_byte_length(); + BytesInput dlLevels = readBlock(dlLength, reader); + int payLoadLength = pageHeader.getCompressed_page_size() - rlLength - dlLength; + int rawDataLength = pageHeader.getUncompressed_page_size() - rlLength - dlLength; + pageLoad = translatePageLoad(reader, headerV2.is_compressed, compressor, decompressor, payLoadLength, rawDataLength); + statistics = convertStatistics(createdBy, chunk.getPrimitiveType(), headerV2.getStatistics(), columnIndex, pageIndex, converter); + readValues += headerV2.getNum_values(); + writer.writeDataPageV2(headerV2.getNum_rows(), + headerV2.getNum_nulls(), + headerV2.getNum_values(), + rlLevels, + dlLevels, + converter.getEncoding(headerV2.getEncoding()), + BytesInput.from(pageLoad), + pageHeader.uncompressed_page_size - rlLength - dlLength, + statistics); + pageIndex++; + break; + default: + LOG.debug("skipping page of type {} of size {}", pageHeader.getType(), compressedPageSize); + break; + } + } + } + + private Statistics convertStatistics(String createdBy, PrimitiveType type, org.apache.parquet.format.Statistics pageStatistics, + ColumnIndex columnIndex, int pageIndex, ParquetMetadataConverter converter) throws IOException { + if (pageStatistics != null) { + 
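+ // Statistics carried in the page header take precedence; they are rebuilt from the column
+ // index only when the page header has none, and are null when neither source is available.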
return converter.fromParquetStatistics(createdBy, pageStatistics, type); + } else if (columnIndex != null) { + if (columnIndex.getNullPages() == null) { + throw new IOException("columnIndex has null variable 'nullPages' which indicates corrupted data for type: " + type.getName()); + } + if (pageIndex > columnIndex.getNullPages().size()) { + throw new IOException("There are more pages " + pageIndex + " found in the column than in the columnIndex " + columnIndex.getNullPages().size()); + } + org.apache.parquet.column.statistics.Statistics.Builder statsBuilder = org.apache.parquet.column.statistics.Statistics.getBuilderForReading(type); + statsBuilder.withNumNulls(columnIndex.getNullCounts().get(pageIndex)); + + if (!columnIndex.getNullPages().get(pageIndex)) { + statsBuilder.withMin(columnIndex.getMinValues().get(pageIndex).array().clone()); + statsBuilder.withMax(columnIndex.getMaxValues().get(pageIndex).array().clone()); + } + return statsBuilder.build(); + } else { + return null; + } + } + + private byte[] translatePageLoad(TransParquetFileReader reader, boolean isCompressed, BytesInputCompressor compressor, + BytesInputDecompressor decompressor, int payloadLength, int rawDataLength) throws IOException { + BytesInput data = readBlock(payloadLength, reader); + if (isCompressed) { + data = decompressor.decompress(data, rawDataLength); + } + BytesInput newCompressedData = compressor.compress(data); + return newCompressedData.toByteArray(); + } + + private BytesInput readBlock(int length, TransParquetFileReader reader) throws IOException { + byte[] data = new byte[length]; + reader.blockRead(data); + return BytesInput.from(data); + } + + private int toIntWithCheck(long size) { + if ((int)size != size) { + throw new ParquetEncodingException("size is bigger than " + Integer.MAX_VALUE + " bytes: " + size); + } + return (int)size; + } + + private static final class DumpGroupConverter extends GroupConverter { + @Override public void start() {} + @Override public void end() {} + @Override public Converter getConverter(int fieldIndex) { return new DumpConverter(); } + } + + private static final class DumpConverter extends PrimitiveConverter { + @Override public GroupConverter asGroupConverter() { return new DumpGroupConverter(); } + } + + private static final class TransParquetFileReader extends ParquetFileReader { + + public TransParquetFileReader(InputFile file, ParquetReadOptions options) throws IOException { + super(file, options); + } + + public void setStreamPosition(long newPos) throws IOException { + f.seek(newPos); + } + + public void blockRead(byte[] data) throws IOException { + f.readFully(data); + } + + public PageHeader readPageHeader() throws IOException { + return Util.readPageHeader(f); + } + } +} diff --git a/parquet-tools/src/test/java/org/apache/parquet/tools/command/TestTransCompressionCommand.java b/parquet-tools/src/test/java/org/apache/parquet/tools/command/TestTransCompressionCommand.java new file mode 100644 index 0000000000..3999d4903d --- /dev/null +++ b/parquet-tools/src/test/java/org/apache/parquet/tools/command/TestTransCompressionCommand.java @@ -0,0 +1,273 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.parquet.tools.command; + +import com.google.common.collect.ImmutableMap; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.HadoopReadOptions; +import org.apache.parquet.ParquetReadOptions; +import org.apache.parquet.column.ParquetProperties; +import org.apache.parquet.column.page.PageReadStore; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.example.data.simple.SimpleGroup; +import org.apache.parquet.example.data.simple.convert.GroupRecordConverter; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.ParquetReader; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.example.ExampleParquetWriter; +import org.apache.parquet.hadoop.example.GroupReadSupport; +import org.apache.parquet.hadoop.example.GroupWriteSupport; +import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.hadoop.util.HadoopInputFile; +import org.apache.parquet.internal.column.columnindex.ColumnIndex; +import org.apache.parquet.internal.column.columnindex.OffsetIndex; +import org.apache.parquet.io.ColumnIOFactory; +import org.apache.parquet.io.MessageColumnIO; +import org.apache.parquet.io.RecordReader; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.nio.file.Files; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.ThreadLocalRandom; + +import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64; +import static org.apache.parquet.schema.Type.Repetition.OPTIONAL; +import static org.apache.parquet.schema.Type.Repetition.REPEATED; +import static org.apache.parquet.schema.Type.Repetition.REQUIRED; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TestTransCompressionCommand { + + private TransCompressionCommand command = new TransCompressionCommand(); + private Configuration conf = new Configuration(); + private Map extraMeta + = ImmutableMap.of("key1", "value1", "key2", "value2"); + + @Test + public void testTransCompression() throws Exception { + String[] codecs = {"UNCOMPRESSED", "SNAPPY", "GZIP", "ZSTD"}; + for (int i = 0; i < codecs.length; i++) { + for (int j = 0; j reader = ParquetReader.builder(new GroupReadSupport(), new Path(inputFile)).withConf(conf).build(); + for (int i = 0; i < numRecord; 
i++) { + Group group = reader.read(); + assertTrue(group.getLong("DocId", 0) < 1000); + assertEquals(group.getBinary("Name", 0).length(), 100); + assertEquals(group.getBinary("Gender", 0).length(), 100); + Group subGroup = group.getGroup("Links", 0); + assertEquals(subGroup.getBinary("Backward", 0).length(), 100); + assertEquals(subGroup.getBinary("Forward", 0).length(), 100); + } + reader.close(); + } + + private void validMeta(String inputFile, String outFile) throws Exception { + ParquetMetadata inMetaData = ParquetFileReader.readFooter(conf, new Path(inputFile), NO_FILTER); + ParquetMetadata outMetaData = ParquetFileReader.readFooter(conf, new Path(outFile), NO_FILTER); + Assert.assertEquals(inMetaData.getFileMetaData().getSchema(), outMetaData.getFileMetaData().getSchema()); + Assert.assertEquals(inMetaData.getFileMetaData().getKeyValueMetaData(), outMetaData.getFileMetaData().getKeyValueMetaData()); + } + + private void validColumnIndex(String inputFile, String outFile) throws Exception { + ParquetMetadata inMetaData = ParquetFileReader.readFooter(conf, new Path(inputFile), NO_FILTER); + ParquetMetadata outMetaData = ParquetFileReader.readFooter(conf, new Path(outFile), NO_FILTER); + Assert.assertEquals(inMetaData.getBlocks().size(), outMetaData.getBlocks().size()); + try (ParquetFileReader inReader = new ParquetFileReader(HadoopInputFile.fromPath(new Path(inputFile), conf), HadoopReadOptions.builder(conf).build()); + ParquetFileReader outReader = new ParquetFileReader(HadoopInputFile.fromPath(new Path(outFile), conf), HadoopReadOptions.builder(conf).build())) { + for (int i = 0; i < inMetaData.getBlocks().size(); i++) { + BlockMetaData inBlockMetaData = inMetaData.getBlocks().get(i); + BlockMetaData outBlockMetaData = outMetaData.getBlocks().get(i); + Assert.assertEquals(inBlockMetaData.getColumns().size(), outBlockMetaData.getColumns().size()); + for (int j = 0; j < inBlockMetaData.getColumns().size(); j++) { + ColumnChunkMetaData inChunk = inBlockMetaData.getColumns().get(j); + ColumnIndex inColumnIndex = inReader.readColumnIndex(inChunk); + OffsetIndex inOffsetIndex = inReader.readOffsetIndex(inChunk); + ColumnChunkMetaData outChunk = outBlockMetaData.getColumns().get(j); + ColumnIndex outColumnIndex = outReader.readColumnIndex(outChunk); + OffsetIndex outOffsetIndex = outReader.readOffsetIndex(outChunk); + if (inColumnIndex != null) { + Assert.assertEquals(inColumnIndex.getBoundaryOrder(), outColumnIndex.getBoundaryOrder()); + Assert.assertEquals(inColumnIndex.getMaxValues(), outColumnIndex.getMaxValues()); + Assert.assertEquals(inColumnIndex.getMinValues(), outColumnIndex.getMinValues()); + Assert.assertEquals(inColumnIndex.getNullCounts(), outColumnIndex.getNullCounts()); + } + if (inOffsetIndex != null) { + Assert.assertEquals(inOffsetIndex.getPageCount(), outOffsetIndex.getPageCount()); + for (int k = 0; k < inOffsetIndex.getPageCount(); k++) { + Assert.assertEquals(inOffsetIndex.getFirstRowIndex(k), outOffsetIndex.getFirstRowIndex(k)); + Assert.assertEquals(inOffsetIndex.getLastRowIndex(k, inChunk.getValueCount()), + outOffsetIndex.getLastRowIndex(k, outChunk.getValueCount())); + } + } + } + } + } + } + + private void executeCommandLine(String[] cargs) throws Exception { + CommandLineParser parser = new PosixParser(); + CommandLine cmd = parser.parse(new Options(), cargs, command.supportsExtraArgs()); + command.execute(cmd); + } + + private String createParquetFile(String prefix, String codec, int numRecord, ParquetProperties.WriterVersion writerVersion, int pageSize) 
throws IOException { + MessageType schema = new MessageType("schema", + new PrimitiveType(REQUIRED, INT64, "DocId"), + new PrimitiveType(REQUIRED, BINARY, "Name"), + new PrimitiveType(REQUIRED, BINARY, "Gender"), + new GroupType(OPTIONAL, "Links", + new PrimitiveType(REPEATED, BINARY, "Backward"), + new PrimitiveType(REPEATED, BINARY, "Forward"))); + + conf.set(GroupWriteSupport.PARQUET_EXAMPLE_SCHEMA, schema.toString()); + + String file = createTempFile(prefix); + ExampleParquetWriter.Builder builder = ExampleParquetWriter.builder(new Path(file)) + .withConf(conf) + .withWriterVersion(writerVersion) + .withExtraMetaData(extraMeta) + .withDictionaryEncoding("DocId", true) + .withValidation(true) + .enablePageWriteChecksum() + .withPageSize(pageSize) + .withCompressionCodec(CompressionCodecName.valueOf(codec)); + try (ParquetWriter writer = builder.build()) { + for (int i = 0; i < numRecord; i++) { + SimpleGroup g = new SimpleGroup(schema); + g.add("DocId", getLong()); + g.add("Name", getString()); + g.add("Gender", getString()); + Group links = g.addGroup("Links"); + links.add(0, getString()); + links.add(1, getString()); + writer.write(g); + } + } + + return file; + } + + private static long getLong() { + return ThreadLocalRandom.current().nextLong(1000); + } + + private static String getString() { + char[] chars = {'a', 'b', 'c', 'd', 'e', 'f', 'g', 'x', 'z', 'y'}; + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < 100; i++) { + sb.append(chars[new Random().nextInt(10)]); + } + return sb.toString(); + } + + private static String createTempFile(String prefix) { + try { + return Files.createTempDirectory(prefix).toAbsolutePath().toString() + "/test.parquet"; + } catch (IOException e) { + throw new AssertionError("Unable to create temporary file", e); + } + } +} From 64827a3ee339614b432c7424c13c03fbc8e91413 Mon Sep 17 00:00:00 2001 From: Xinli shang Date: Fri, 19 Jun 2020 22:46:49 -0700 Subject: [PATCH 2/3] Address feedback --- .../cli/commands/TransCompressionCommand.java | 224 +-------------- .../commands/TransCompressionCommandTest.java | 271 ------------------ .../hadoop/util/CompressionConverter.java | 271 ++++++++++++++++++ .../hadoop/util/H1SeekableInputStream.java | 2 +- .../hadoop/util/CompressionConveterTest.java | 203 ++++++++----- .../command/TransCompressionCommand.java | 232 +-------------- 6 files changed, 414 insertions(+), 789 deletions(-) delete mode 100644 parquet-cli/src/test/java/org/apache/parquet/cli/commands/TransCompressionCommandTest.java create mode 100644 parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/CompressionConverter.java rename parquet-tools/src/test/java/org/apache/parquet/tools/command/TestTransCompressionCommand.java => parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/CompressionConveterTest.java (59%) diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/TransCompressionCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/TransCompressionCommand.java index b7a836e5a1..cae68107da 100644 --- a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/TransCompressionCommand.java +++ b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/TransCompressionCommand.java @@ -25,55 +25,30 @@ import com.google.common.collect.Lists; import org.apache.hadoop.fs.Path; import org.apache.parquet.HadoopReadOptions; -import org.apache.parquet.ParquetReadOptions; -import org.apache.parquet.bytes.BytesInput; import org.apache.parquet.cli.BaseCommand; -import 
org.apache.parquet.column.ColumnDescriptor; -import org.apache.parquet.column.impl.ColumnReadStoreImpl; -import org.apache.parquet.column.page.DictionaryPage; -import org.apache.parquet.column.page.PageReadStore; -import org.apache.parquet.column.statistics.Statistics; -import org.apache.parquet.compression.CompressionCodecFactory; -import org.apache.parquet.compression.CompressionCodecFactory.BytesInputCompressor; -import org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor; -import org.apache.parquet.format.DataPageHeader; -import org.apache.parquet.format.DataPageHeaderV2; -import org.apache.parquet.format.DictionaryPageHeader; -import org.apache.parquet.format.PageHeader; -import org.apache.parquet.format.Util; -import org.apache.parquet.format.converter.ParquetMetadataConverter; import org.apache.parquet.hadoop.ParquetFileReader; import org.apache.parquet.hadoop.ParquetFileWriter; -import org.apache.parquet.hadoop.metadata.BlockMetaData; -import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; -import org.apache.parquet.hadoop.metadata.ColumnPath; import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.hadoop.metadata.ParquetMetadata; -import org.apache.parquet.hadoop.util.HadoopCodecs; +import org.apache.parquet.hadoop.util.CompressionConverter; +import org.apache.parquet.hadoop.util.CompressionConverter.TransParquetFileReader; import org.apache.parquet.hadoop.util.HadoopInputFile; -import org.apache.parquet.internal.column.columnindex.ColumnIndex; -import org.apache.parquet.internal.column.columnindex.OffsetIndex; -import org.apache.parquet.io.InputFile; -import org.apache.parquet.io.ParquetEncodingException; -import org.apache.parquet.io.api.Converter; -import org.apache.parquet.io.api.GroupConverter; -import org.apache.parquet.io.api.PrimitiveConverter; import org.apache.parquet.schema.MessageType; -import org.apache.parquet.schema.PrimitiveType; import org.slf4j.Logger; import java.io.IOException; import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER; @Parameters(commandDescription="Translate the compression from one to another") public class TransCompressionCommand extends BaseCommand { + private CompressionConverter compressionConverter; + public TransCompressionCommand(Logger console) { super(console); + compressionConverter = new CompressionConverter(); } @Parameter(description = "") @@ -100,11 +75,12 @@ public int run() throws IOException { ParquetMetadata metaData = ParquetFileReader.readFooter(getConf(), inPath, NO_FILTER); MessageType schema = metaData.getFileMetaData().getSchema(); + ParquetFileWriter writer = new ParquetFileWriter(getConf(), schema, outPath, ParquetFileWriter.Mode.CREATE); + writer.start(); try (TransParquetFileReader reader = new TransParquetFileReader(HadoopInputFile.fromPath(inPath, getConf()), HadoopReadOptions.builder(getConf()).build())) { - ParquetFileWriter writer = new ParquetFileWriter(getConf(), schema, outPath, ParquetFileWriter.Mode.CREATE); - writer.start(); - processBlocks(reader, writer, metaData, schema, metaData.getFileMetaData().getCreatedBy(), codecName); + compressionConverter.processBlocks(reader, writer, metaData, schema, metaData.getFileMetaData().getCreatedBy(), codecName); + } finally { writer.end(metaData.getFileMetaData().getKeyValueMetaData()); } return 0; @@ -117,186 +93,4 @@ public List getExamples() { " input.parquet output.parquet ZSTD" ); } - 
private void processBlocks(TransParquetFileReader reader, ParquetFileWriter writer, ParquetMetadata meta, MessageType schema, - String createdBy, CompressionCodecName codecName) throws IOException { - int blockIndex = 0; - PageReadStore store = reader.readNextRowGroup(); - while (store != null) { - writer.startBlock(store.getRowCount()); - BlockMetaData blockMetaData = meta.getBlocks().get(blockIndex); - List columnsInOrder = blockMetaData.getColumns(); - Map descriptorsMap = schema.getColumns().stream().collect( - Collectors.toMap(x -> ColumnPath.get(x.getPath()), x -> x)); - for (int i = 0; i < columnsInOrder.size(); i += 1) { - ColumnChunkMetaData chunk = columnsInOrder.get(i); - ColumnReadStoreImpl crstore = new ColumnReadStoreImpl(store, new DumpGroupConverter(), schema, createdBy); - ColumnDescriptor columnDescriptor = descriptorsMap.get(chunk.getPath()); - writer.startColumn(columnDescriptor, crstore.getColumnReader(columnDescriptor).getTotalValueCount(), codecName); - processChunk(reader, writer, chunk, createdBy, codecName); - writer.endColumn(); - } - writer.endBlock(); - store = reader.readNextRowGroup(); - blockIndex++; - } - } - - private void processChunk(TransParquetFileReader reader, ParquetFileWriter writer, ColumnChunkMetaData chunk, - String createdBy, CompressionCodecName codecName) throws IOException { - CompressionCodecFactory codecFactory = HadoopCodecs.newFactory(0); - BytesInputDecompressor decompressor = codecFactory.getDecompressor(chunk.getCodec()); - BytesInputCompressor compressor = codecFactory.getCompressor(codecName); - ColumnIndex columnIndex = reader.readColumnIndex(chunk); - OffsetIndex offsetIndex = reader.readOffsetIndex(chunk); - - reader.setStreamPosition(chunk.getStartingPos()); - DictionaryPage dictionaryPage = null; - long readValues = 0; - Statistics statistics = null; - ParquetMetadataConverter converter = new ParquetMetadataConverter(); - int pageIndex = 0; - long totalChunkValues = chunk.getValueCount(); - while (readValues < totalChunkValues) { - PageHeader pageHeader = reader.readPageHeader(); - byte[] pageLoad; - switch (pageHeader.type) { - case DICTIONARY_PAGE: - if (dictionaryPage != null) { - throw new IOException("has more than one dictionary page in column chunk"); - } - DictionaryPageHeader dictPageHeader = pageHeader.dictionary_page_header; - pageLoad = translatePageLoad(reader, true, compressor, decompressor, pageHeader.getCompressed_page_size(), pageHeader.getUncompressed_page_size()); - writer.writeDictionaryPage(new DictionaryPage(BytesInput.from(pageLoad), - pageHeader.getUncompressed_page_size(), - converter.getEncoding(dictPageHeader.getEncoding()))); - break; - case DATA_PAGE: - DataPageHeader headerV1 = pageHeader.data_page_header; - pageLoad = translatePageLoad(reader, true, compressor, decompressor, pageHeader.getCompressed_page_size(), pageHeader.getUncompressed_page_size()); - statistics = convertStatistics(createdBy, chunk.getPrimitiveType(), headerV1.getStatistics(), columnIndex, pageIndex, converter); - readValues += headerV1.getNum_values(); - if (offsetIndex != null) { - long rowCount = 1 + offsetIndex.getLastRowIndex(pageIndex, totalChunkValues) - offsetIndex.getFirstRowIndex(pageIndex); - writer.writeDataPage(toIntWithCheck(headerV1.getNum_values()), - pageHeader.uncompressed_page_size, - BytesInput.from(pageLoad), - statistics, - toIntWithCheck(rowCount), - converter.getEncoding(headerV1.getRepetition_level_encoding()), - converter.getEncoding(headerV1.getDefinition_level_encoding()), - 
converter.getEncoding(headerV1.getEncoding())); - } else { - writer.writeDataPage(toIntWithCheck(headerV1.getNum_values()), - pageHeader.uncompressed_page_size, - BytesInput.from(pageLoad), - statistics, - converter.getEncoding(headerV1.getRepetition_level_encoding()), - converter.getEncoding(headerV1.getDefinition_level_encoding()), - converter.getEncoding(headerV1.getEncoding())); - } - pageIndex++; - break; - case DATA_PAGE_V2: - DataPageHeaderV2 headerV2 = pageHeader.data_page_header_v2; - int rlLength = headerV2.getRepetition_levels_byte_length(); - BytesInput rlLevels = readBlock(rlLength, reader); - int dlLength = headerV2.getDefinition_levels_byte_length(); - BytesInput dlLevels = readBlock(dlLength, reader); - int payLoadLength = pageHeader.getCompressed_page_size() - rlLength - dlLength; - int rawDataLength = pageHeader.getUncompressed_page_size() - rlLength - dlLength; - pageLoad = translatePageLoad(reader, headerV2.is_compressed, compressor, decompressor, payLoadLength, rawDataLength); - statistics = convertStatistics(createdBy, chunk.getPrimitiveType(), headerV2.getStatistics(), columnIndex, pageIndex, converter); - readValues += headerV2.getNum_values(); - writer.writeDataPageV2(headerV2.getNum_rows(), - headerV2.getNum_nulls(), - headerV2.getNum_values(), - rlLevels, - dlLevels, - converter.getEncoding(headerV2.getEncoding()), - BytesInput.from(pageLoad), - pageHeader.uncompressed_page_size - rlLength - dlLength, - statistics); - pageIndex++; - break; - default: - break; - } - } - } - - private Statistics convertStatistics(String createdBy, PrimitiveType type, org.apache.parquet.format.Statistics pageStatistics, - ColumnIndex columnIndex, int pageIndex, ParquetMetadataConverter converter) throws IOException { - if (pageStatistics != null) { - return converter.fromParquetStatistics(createdBy, pageStatistics, type); - } else if (columnIndex != null) { - if (columnIndex.getNullPages() == null) { - throw new IOException("columnIndex has null variable 'nullPages' which indicates corrupted data for type: " + type.getName()); - } - if (pageIndex > columnIndex.getNullPages().size()) { - throw new IOException("There are more pages " + pageIndex + " found in the column than in the columnIndex " + columnIndex.getNullPages().size()); - } - org.apache.parquet.column.statistics.Statistics.Builder statsBuilder = org.apache.parquet.column.statistics.Statistics.getBuilderForReading(type); - statsBuilder.withNumNulls(columnIndex.getNullCounts().get(pageIndex)); - - if (!columnIndex.getNullPages().get(pageIndex)) { - statsBuilder.withMin(columnIndex.getMinValues().get(pageIndex).array().clone()); - statsBuilder.withMax(columnIndex.getMaxValues().get(pageIndex).array().clone()); - } - return statsBuilder.build(); - } else { - return null; - } - } - - private byte[] translatePageLoad(TransParquetFileReader reader, boolean isCompressed, BytesInputCompressor compressor, - BytesInputDecompressor decompressor, int payloadLength, int rawDataLength) throws IOException { - BytesInput data = readBlock(payloadLength, reader); - if (isCompressed) { - data = decompressor.decompress(data, rawDataLength); - } - BytesInput newCompressedData = compressor.compress(data); - return newCompressedData.toByteArray(); - } - - private BytesInput readBlock(int length, TransParquetFileReader reader) throws IOException { - byte[] data = new byte[length]; - reader.blockRead(data); - return BytesInput.from(data); - } - - private int toIntWithCheck(long size) { - if ((int)size != size) { - throw new 
ParquetEncodingException("size is bigger than " + Integer.MAX_VALUE + " bytes: " + size); - } - return (int)size; - } - - private static final class DumpGroupConverter extends GroupConverter { - @Override public void start() {} - @Override public void end() {} - @Override public Converter getConverter(int fieldIndex) { return new DumpConverter(); } - } - - private static final class DumpConverter extends PrimitiveConverter { - @Override public GroupConverter asGroupConverter() { return new DumpGroupConverter(); } - } - - private static final class TransParquetFileReader extends ParquetFileReader { - - public TransParquetFileReader(InputFile file, ParquetReadOptions options) throws IOException { - super(file, options); - } - - public void setStreamPosition(long newPos) throws IOException { - f.seek(newPos); - } - - public void blockRead(byte[] data) throws IOException { - f.readFully(data); - } - - public PageHeader readPageHeader() throws IOException { - return Util.readPageHeader(f); - } - } } diff --git a/parquet-cli/src/test/java/org/apache/parquet/cli/commands/TransCompressionCommandTest.java b/parquet-cli/src/test/java/org/apache/parquet/cli/commands/TransCompressionCommandTest.java deleted file mode 100644 index d6803f7b0a..0000000000 --- a/parquet-cli/src/test/java/org/apache/parquet/cli/commands/TransCompressionCommandTest.java +++ /dev/null @@ -1,271 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.parquet.cli.commands; - -import com.google.common.collect.ImmutableMap; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.parquet.HadoopReadOptions; -import org.apache.parquet.ParquetReadOptions; -import org.apache.parquet.column.ParquetProperties; -import org.apache.parquet.column.page.PageReadStore; -import org.apache.parquet.example.data.Group; -import org.apache.parquet.example.data.simple.SimpleGroup; -import org.apache.parquet.example.data.simple.convert.GroupRecordConverter; -import org.apache.parquet.hadoop.ParquetFileReader; -import org.apache.parquet.hadoop.ParquetReader; -import org.apache.parquet.hadoop.ParquetWriter; -import org.apache.parquet.hadoop.example.ExampleParquetWriter; -import org.apache.parquet.hadoop.example.GroupReadSupport; -import org.apache.parquet.hadoop.example.GroupWriteSupport; -import org.apache.parquet.hadoop.metadata.BlockMetaData; -import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; -import org.apache.parquet.hadoop.metadata.CompressionCodecName; -import org.apache.parquet.hadoop.metadata.ParquetMetadata; -import org.apache.parquet.hadoop.util.HadoopInputFile; -import org.apache.parquet.internal.column.columnindex.ColumnIndex; -import org.apache.parquet.internal.column.columnindex.OffsetIndex; -import org.apache.parquet.io.ColumnIOFactory; -import org.apache.parquet.io.MessageColumnIO; -import org.apache.parquet.io.RecordReader; -import org.apache.parquet.schema.GroupType; -import org.apache.parquet.schema.MessageType; -import org.apache.parquet.schema.PrimitiveType; -import org.junit.Assert; -import org.junit.Test; - -import java.io.IOException; -import java.nio.file.Files; -import java.util.Map; -import java.util.Random; -import java.util.concurrent.ThreadLocalRandom; - -import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER; -import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; -import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64; -import static org.apache.parquet.schema.Type.Repetition.OPTIONAL; -import static org.apache.parquet.schema.Type.Repetition.REPEATED; -import static org.apache.parquet.schema.Type.Repetition.REQUIRED; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -public class TransCompressionCommandTest extends ParquetFileTest { - - private Configuration conf = new Configuration(); - private Map extraMeta - = ImmutableMap.of("key1", "value1", "key2", "value2"); - - @Test - public void testTransCompression() throws Exception { - String[] codecs = {"UNCOMPRESSED", "SNAPPY", "GZIP", "ZSTD"}; - for (int i = 0; i < codecs.length; i++) { - for (int j = 0; j reader = ParquetReader.builder(new GroupReadSupport(), new Path(inputFile)).withConf(conf).build(); - for (int i = 0; i < numRecord; i++) { - Group group = reader.read(); - assertTrue(group.getLong("DocId", 0) < 1000); - assertEquals(group.getBinary("Name", 0).length(), 100); - assertEquals(group.getBinary("Gender", 0).length(), 100); - Group subGroup = group.getGroup("Links", 0); - assertEquals(subGroup.getBinary("Backward", 0).length(), 100); - assertEquals(subGroup.getBinary("Forward", 0).length(), 100); - } - reader.close(); - } - - private void validMeta(String inputFile, String outFile) throws Exception { - ParquetMetadata inMetaData = ParquetFileReader.readFooter(conf, new Path(inputFile), NO_FILTER); - ParquetMetadata outMetaData = ParquetFileReader.readFooter(conf, new 
Path(outFile), NO_FILTER); - Assert.assertEquals(inMetaData.getFileMetaData().getSchema(), outMetaData.getFileMetaData().getSchema()); - Assert.assertEquals(inMetaData.getFileMetaData().getKeyValueMetaData(), outMetaData.getFileMetaData().getKeyValueMetaData()); - } - - private void validColumnIndex(String inputFile, String outFile) throws Exception { - ParquetMetadata inMetaData = ParquetFileReader.readFooter(conf, new Path(inputFile), NO_FILTER); - ParquetMetadata outMetaData = ParquetFileReader.readFooter(conf, new Path(outFile), NO_FILTER); - Assert.assertEquals(inMetaData.getBlocks().size(), outMetaData.getBlocks().size()); - try (ParquetFileReader inReader = new ParquetFileReader(HadoopInputFile.fromPath(new Path(inputFile), conf), HadoopReadOptions.builder(conf).build()); - ParquetFileReader outReader = new ParquetFileReader(HadoopInputFile.fromPath(new Path(outFile), conf), HadoopReadOptions.builder(conf).build())) { - for (int i = 0; i < inMetaData.getBlocks().size(); i++) { - BlockMetaData inBlockMetaData = inMetaData.getBlocks().get(i); - BlockMetaData outBlockMetaData = outMetaData.getBlocks().get(i); - Assert.assertEquals(inBlockMetaData.getColumns().size(), outBlockMetaData.getColumns().size()); - for (int j = 0; j < inBlockMetaData.getColumns().size(); j++) { - ColumnChunkMetaData inChunk = inBlockMetaData.getColumns().get(j); - ColumnIndex inColumnIndex = inReader.readColumnIndex(inChunk); - OffsetIndex inOffsetIndex = inReader.readOffsetIndex(inChunk); - ColumnChunkMetaData outChunk = outBlockMetaData.getColumns().get(j); - ColumnIndex outColumnIndex = outReader.readColumnIndex(outChunk); - OffsetIndex outOffsetIndex = outReader.readOffsetIndex(outChunk); - if (inColumnIndex != null) { - Assert.assertEquals(inColumnIndex.getBoundaryOrder(), outColumnIndex.getBoundaryOrder()); - Assert.assertEquals(inColumnIndex.getMaxValues(), outColumnIndex.getMaxValues()); - Assert.assertEquals(inColumnIndex.getMinValues(), outColumnIndex.getMinValues()); - Assert.assertEquals(inColumnIndex.getNullCounts(), outColumnIndex.getNullCounts()); - } - if (inOffsetIndex != null) { - Assert.assertEquals(inOffsetIndex.getPageCount(), outOffsetIndex.getPageCount()); - for (int k = 0; k < inOffsetIndex.getPageCount(); k++) { - Assert.assertEquals(inOffsetIndex.getFirstRowIndex(k), outOffsetIndex.getFirstRowIndex(k)); - Assert.assertEquals(inOffsetIndex.getLastRowIndex(k, inChunk.getValueCount()), - outOffsetIndex.getLastRowIndex(k, outChunk.getValueCount())); - } - } - } - } - } - } - - private String createParquetFile(String prefix, String codec, int numRecord, ParquetProperties.WriterVersion writerVersion, int pageSize) throws IOException { - MessageType schema = new MessageType("schema", - new PrimitiveType(REQUIRED, INT64, "DocId"), - new PrimitiveType(REQUIRED, BINARY, "Name"), - new PrimitiveType(REQUIRED, BINARY, "Gender"), - new GroupType(OPTIONAL, "Links", - new PrimitiveType(REPEATED, BINARY, "Backward"), - new PrimitiveType(REPEATED, BINARY, "Forward"))); - - conf.set(GroupWriteSupport.PARQUET_EXAMPLE_SCHEMA, schema.toString()); - - String file = createTempFile(prefix); - ExampleParquetWriter.Builder builder = ExampleParquetWriter.builder(new Path(file)) - .withConf(conf) - .withWriterVersion(writerVersion) - .withExtraMetaData(extraMeta) - .withDictionaryEncoding("DocId", true) - .withValidation(true) - .enablePageWriteChecksum() - .withPageSize(pageSize) - .withCompressionCodec(CompressionCodecName.valueOf(codec)); - try (ParquetWriter writer = builder.build()) { - for (int i = 0; i < 
numRecord; i++) { - SimpleGroup g = new SimpleGroup(schema); - g.add("DocId", getLong()); - g.add("Name", getString()); - g.add("Gender", getString()); - Group links = g.addGroup("Links"); - links.add(0, getString()); - links.add(1, getString()); - writer.write(g); - } - } - - return file; - } - - private static long getLong() { - return ThreadLocalRandom.current().nextLong(1000); - } - - private static String getString() { - char[] chars = {'a', 'b', 'c', 'd', 'e', 'f', 'g', 'x', 'z', 'y'}; - StringBuilder sb = new StringBuilder(); - for (int i = 0; i < 100; i++) { - sb.append(chars[new Random().nextInt(10)]); - } - return sb.toString(); - } - - private static String createTempFile(String prefix) { - try { - return Files.createTempDirectory(prefix).toAbsolutePath().toString() + "/test.parquet"; - } catch (IOException e) { - throw new AssertionError("Unable to create temporary file", e); - } - } -} diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/CompressionConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/CompressionConverter.java new file mode 100644 index 0000000000..79d9a6aa86 --- /dev/null +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/CompressionConverter.java @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.parquet.hadoop.util; + +import org.apache.parquet.ParquetReadOptions; +import org.apache.parquet.bytes.BytesInput; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.column.ParquetProperties; +import org.apache.parquet.column.impl.ColumnReadStoreImpl; +import org.apache.parquet.column.page.DictionaryPage; +import org.apache.parquet.column.page.PageReadStore; +import org.apache.parquet.column.statistics.Statistics; +import org.apache.parquet.compression.CompressionCodecFactory; +import org.apache.parquet.format.DataPageHeader; +import org.apache.parquet.format.DataPageHeaderV2; +import org.apache.parquet.format.DictionaryPageHeader; +import org.apache.parquet.format.PageHeader; +import org.apache.parquet.format.Util; +import org.apache.parquet.format.converter.ParquetMetadataConverter; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.ParquetFileWriter; +import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.metadata.ColumnPath; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.internal.column.columnindex.ColumnIndex; +import org.apache.parquet.internal.column.columnindex.OffsetIndex; +import org.apache.parquet.io.InputFile; +import org.apache.parquet.io.ParquetEncodingException; +import org.apache.parquet.io.api.Converter; +import org.apache.parquet.io.api.GroupConverter; +import org.apache.parquet.io.api.PrimitiveConverter; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class CompressionConverter { + + private static final Logger LOG = LoggerFactory.getLogger(CompressionConverter.class); + + private final int pageBufferSize = ParquetProperties.DEFAULT_PAGE_SIZE * 2; + private byte[] pageBuffer; + + public CompressionConverter() { + this.pageBuffer = new byte[pageBufferSize]; + } + + public void processBlocks(TransParquetFileReader reader, ParquetFileWriter writer, ParquetMetadata meta, MessageType schema, + String createdBy, CompressionCodecName codecName) throws IOException { + int blockIndex = 0; + PageReadStore store = reader.readNextRowGroup(); + while (store != null) { + writer.startBlock(store.getRowCount()); + BlockMetaData blockMetaData = meta.getBlocks().get(blockIndex); + List columnsInOrder = blockMetaData.getColumns(); + Map descriptorsMap = schema.getColumns().stream().collect( + Collectors.toMap(x -> ColumnPath.get(x.getPath()), x -> x)); + for (int i = 0; i < columnsInOrder.size(); i += 1) { + ColumnChunkMetaData chunk = columnsInOrder.get(i); + ColumnReadStoreImpl crstore = new ColumnReadStoreImpl(store, new DummyGroupConverter(), schema, createdBy); + ColumnDescriptor columnDescriptor = descriptorsMap.get(chunk.getPath()); + writer.startColumn(columnDescriptor, crstore.getColumnReader(columnDescriptor).getTotalValueCount(), codecName); + processChunk(reader, writer, chunk, createdBy, codecName); + writer.endColumn(); + } + writer.endBlock(); + store = reader.readNextRowGroup(); + blockIndex++; + } + } + + private void processChunk(TransParquetFileReader reader, ParquetFileWriter writer, ColumnChunkMetaData chunk, + String createdBy, CompressionCodecName 
codecName) throws IOException { + CompressionCodecFactory codecFactory = HadoopCodecs.newFactory(0); + CompressionCodecFactory.BytesInputDecompressor decompressor = codecFactory.getDecompressor(chunk.getCodec()); + CompressionCodecFactory.BytesInputCompressor compressor = codecFactory.getCompressor(codecName); + ColumnIndex columnIndex = reader.readColumnIndex(chunk); + OffsetIndex offsetIndex = reader.readOffsetIndex(chunk); + + reader.setStreamPosition(chunk.getStartingPos()); + DictionaryPage dictionaryPage = null; + long readValues = 0; + Statistics statistics = null; + ParquetMetadataConverter converter = new ParquetMetadataConverter(); + int pageIndex = 0; + long totalChunkValues = chunk.getValueCount(); + while (readValues < totalChunkValues) { + PageHeader pageHeader = reader.readPageHeader(); + int compressedPageSize = pageHeader.getCompressed_page_size(); + byte[] pageLoad; + switch (pageHeader.type) { + case DICTIONARY_PAGE: + if (dictionaryPage != null) { + throw new IOException("has more than one dictionary page in column chunk"); + } + DictionaryPageHeader dictPageHeader = pageHeader.dictionary_page_header; + pageLoad = translatePageLoad(reader, true, compressor, decompressor, pageHeader.getCompressed_page_size(), pageHeader.getUncompressed_page_size()); + writer.writeDictionaryPage(new DictionaryPage(BytesInput.from(pageLoad), + pageHeader.getUncompressed_page_size(), + dictPageHeader.getNum_values(), + converter.getEncoding(dictPageHeader.getEncoding()))); + break; + case DATA_PAGE: + DataPageHeader headerV1 = pageHeader.data_page_header; + pageLoad = translatePageLoad(reader, true, compressor, decompressor, pageHeader.getCompressed_page_size(), pageHeader.getUncompressed_page_size()); + statistics = convertStatistics(createdBy, chunk.getPrimitiveType(), headerV1.getStatistics(), columnIndex, pageIndex, converter); + readValues += headerV1.getNum_values(); + if (offsetIndex != null) { + long rowCount = 1 + offsetIndex.getLastRowIndex(pageIndex, totalChunkValues) - offsetIndex.getFirstRowIndex(pageIndex); + writer.writeDataPage(toIntWithCheck(headerV1.getNum_values()), + pageHeader.getUncompressed_page_size(), + BytesInput.from(pageLoad), + statistics, + toIntWithCheck(rowCount), + converter.getEncoding(headerV1.getRepetition_level_encoding()), + converter.getEncoding(headerV1.getDefinition_level_encoding()), + converter.getEncoding(headerV1.getEncoding())); + } else { + writer.writeDataPage(toIntWithCheck(headerV1.getNum_values()), + pageHeader.getUncompressed_page_size(), + BytesInput.from(pageLoad), + statistics, + converter.getEncoding(headerV1.getRepetition_level_encoding()), + converter.getEncoding(headerV1.getDefinition_level_encoding()), + converter.getEncoding(headerV1.getEncoding())); + } + pageIndex++; + break; + case DATA_PAGE_V2: + DataPageHeaderV2 headerV2 = pageHeader.data_page_header_v2; + int rlLength = headerV2.getRepetition_levels_byte_length(); + BytesInput rlLevels = readBlockAllocate(rlLength, reader); + int dlLength = headerV2.getDefinition_levels_byte_length(); + BytesInput dlLevels = readBlockAllocate(dlLength, reader); + int payLoadLength = pageHeader.getCompressed_page_size() - rlLength - dlLength; + int rawDataLength = pageHeader.getUncompressed_page_size() - rlLength - dlLength; + pageLoad = translatePageLoad(reader, headerV2.is_compressed, compressor, decompressor, payLoadLength, rawDataLength); + statistics = convertStatistics(createdBy, chunk.getPrimitiveType(), headerV2.getStatistics(), columnIndex, pageIndex, converter); + readValues += 
headerV2.getNum_values(); + writer.writeDataPageV2(headerV2.getNum_rows(), + headerV2.getNum_nulls(), + headerV2.getNum_values(), + rlLevels, + dlLevels, + converter.getEncoding(headerV2.getEncoding()), + BytesInput.from(pageLoad), + rawDataLength, + statistics); + pageIndex++; + break; + default: + LOG.debug("skipping page of type {} of size {}", pageHeader.getType(), compressedPageSize); + break; + } + } + } + + private Statistics convertStatistics(String createdBy, PrimitiveType type, org.apache.parquet.format.Statistics pageStatistics, + ColumnIndex columnIndex, int pageIndex, ParquetMetadataConverter converter) throws IOException { + if (pageStatistics != null) { + return converter.fromParquetStatistics(createdBy, pageStatistics, type); + } else if (columnIndex != null) { + if (columnIndex.getNullPages() == null) { + throw new IOException("columnIndex has null variable 'nullPages' which indicates corrupted data for type: " + type.getName()); + } + if (pageIndex > columnIndex.getNullPages().size()) { + throw new IOException("There are more pages " + pageIndex + " found in the column than in the columnIndex " + columnIndex.getNullPages().size()); + } + org.apache.parquet.column.statistics.Statistics.Builder statsBuilder = org.apache.parquet.column.statistics.Statistics.getBuilderForReading(type); + statsBuilder.withNumNulls(columnIndex.getNullCounts().get(pageIndex)); + + if (!columnIndex.getNullPages().get(pageIndex)) { + statsBuilder.withMin(columnIndex.getMinValues().get(pageIndex).array().clone()); + statsBuilder.withMax(columnIndex.getMaxValues().get(pageIndex).array().clone()); + } + return statsBuilder.build(); + } else { + return null; + } + } + + private byte[] translatePageLoad(TransParquetFileReader reader, boolean isCompressed, CompressionCodecFactory.BytesInputCompressor compressor, + CompressionCodecFactory.BytesInputDecompressor decompressor, int payloadLength, int rawDataLength) throws IOException { + BytesInput data = readBlock(payloadLength, reader); + if (isCompressed) { + data = decompressor.decompress(data, rawDataLength); + } + BytesInput newCompressedData = compressor.compress(data); + return newCompressedData.toByteArray(); + } + + public BytesInput readBlock(int length, TransParquetFileReader reader) throws IOException { + byte[] data; + if (length > pageBufferSize) { + data = new byte[length]; + } else { + data = pageBuffer; + } + reader.blockRead(data, 0, length); + return BytesInput.from(data, 0, length); + } + + public BytesInput readBlockAllocate(int length, TransParquetFileReader reader) throws IOException { + byte[] data = new byte[length]; + reader.blockRead(data, 0, length); + return BytesInput.from(data, 0, length); + } + + private int toIntWithCheck(long size) { + if ((int)size != size) { + throw new ParquetEncodingException("size is bigger than " + Integer.MAX_VALUE + " bytes: " + size); + } + return (int)size; + } + + private static final class DummyGroupConverter extends GroupConverter { + @Override public void start() {} + @Override public void end() {} + @Override public Converter getConverter(int fieldIndex) { return new DummyConverter(); } + } + + private static final class DummyConverter extends PrimitiveConverter { + @Override public GroupConverter asGroupConverter() { return new DummyGroupConverter(); } + } + + public static final class TransParquetFileReader extends ParquetFileReader { + + public TransParquetFileReader(InputFile file, ParquetReadOptions options) throws IOException { + super(file, options); + } + + public void 
setStreamPosition(long newPos) throws IOException { + f.seek(newPos); + } + + public void blockRead(byte[] data, int start, int len) throws IOException { + f.readFully(data, start, len); + } + + public PageHeader readPageHeader() throws IOException { + return Util.readPageHeader(f); + } + + public long getPos() throws IOException { + return f.getPos(); + } + } +} diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H1SeekableInputStream.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H1SeekableInputStream.java index 876a1f372f..0f8cdbb551 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H1SeekableInputStream.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H1SeekableInputStream.java @@ -53,7 +53,7 @@ public void readFully(byte[] bytes) throws IOException { @Override public void readFully(byte[] bytes, int start, int len) throws IOException { - stream.readFully(bytes); + stream.readFully(bytes, start, len); } } diff --git a/parquet-tools/src/test/java/org/apache/parquet/tools/command/TestTransCompressionCommand.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/CompressionConveterTest.java similarity index 59% rename from parquet-tools/src/test/java/org/apache/parquet/tools/command/TestTransCompressionCommand.java rename to parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/CompressionConveterTest.java index 3999d4903d..01b4a30f1c 100644 --- a/parquet-tools/src/test/java/org/apache/parquet/tools/command/TestTransCompressionCommand.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/CompressionConveterTest.java @@ -16,14 +16,9 @@ * specific language governing permissions and limitations * under the License. */ - -package org.apache.parquet.tools.command; +package org.apache.parquet.hadoop.util; import com.google.common.collect.ImmutableMap; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.CommandLineParser; -import org.apache.commons.cli.Options; -import org.apache.commons.cli.PosixParser; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.parquet.HadoopReadOptions; @@ -33,7 +28,12 @@ import org.apache.parquet.example.data.Group; import org.apache.parquet.example.data.simple.SimpleGroup; import org.apache.parquet.example.data.simple.convert.GroupRecordConverter; +import org.apache.parquet.format.DataPageHeader; +import org.apache.parquet.format.DataPageHeaderV2; +import org.apache.parquet.format.DictionaryPageHeader; +import org.apache.parquet.format.PageHeader; import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.ParquetFileWriter; import org.apache.parquet.hadoop.ParquetReader; import org.apache.parquet.hadoop.ParquetWriter; import org.apache.parquet.hadoop.example.ExampleParquetWriter; @@ -43,7 +43,7 @@ import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.hadoop.metadata.ParquetMetadata; -import org.apache.parquet.hadoop.util.HadoopInputFile; +import org.apache.parquet.hadoop.util.CompressionConverter.TransParquetFileReader; import org.apache.parquet.internal.column.columnindex.ColumnIndex; import org.apache.parquet.internal.column.columnindex.OffsetIndex; import org.apache.parquet.io.ColumnIOFactory; @@ -57,6 +57,8 @@ import java.io.IOException; import java.nio.file.Files; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import 
java.util.Random; import java.util.concurrent.ThreadLocalRandom; @@ -67,15 +69,15 @@ import static org.apache.parquet.schema.Type.Repetition.OPTIONAL; import static org.apache.parquet.schema.Type.Repetition.REPEATED; import static org.apache.parquet.schema.Type.Repetition.REQUIRED; -import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertTrue; -public class TestTransCompressionCommand { - - private TransCompressionCommand command = new TransCompressionCommand(); +public class CompressionConveterTest { + private Configuration conf = new Configuration(); private Map extraMeta = ImmutableMap.of("key1", "value1", "key2", "value2"); + private CompressionConverter compressionConverter = new CompressionConverter(); @Test public void testTransCompression() throws Exception { @@ -91,75 +93,46 @@ public void testTransCompression() throws Exception { } } - @Test - public void testSpeed() throws Exception { - String inputFile = createParquetFile("input", "GZIP", 100000, - ParquetProperties.WriterVersion.PARQUET_1_0, ParquetProperties.DEFAULT_PAGE_SIZE); - String outputFile = createTempFile("output_trans"); - String cargs[] = {inputFile, outputFile, "ZSTD"}; - - long start = System.currentTimeMillis(); - executeCommandLine(cargs); - long durationTrans = System.currentTimeMillis() - start; - - outputFile = createTempFile("output_record"); - start = System.currentTimeMillis(); - convertRecordByRecord(CompressionCodecName.valueOf("ZSTD"), new Path(inputFile), new Path(outputFile)); - long durationRecord = System.currentTimeMillis() - start; - - // The TransCompressionCommand is ~5 times faster than translating record by record - Assert.assertTrue(durationTrans < durationRecord); - } - private void testInternal(String srcCodec, String destCodec, ParquetProperties.WriterVersion writerVersion, int pageSize) throws Exception { int numRecord = 1000; - String inputFile = createParquetFile("input", srcCodec, numRecord, writerVersion, pageSize); + TestDocs testDocs = new TestDocs(numRecord); + String inputFile = createParquetFile(conf, extraMeta, numRecord, "input", srcCodec, writerVersion, pageSize, testDocs); String outputFile = createTempFile("output_trans"); - String cargs[] = {inputFile, outputFile, destCodec}; - executeCommandLine(cargs); - validateColumns(inputFile, numRecord); + + convertCompression(conf, inputFile, outputFile, destCodec); + + validateColumns(outputFile, numRecord, testDocs); validMeta(inputFile, outputFile); validColumnIndex(inputFile, outputFile); } - private void convertRecordByRecord(CompressionCodecName codecName, Path inpath, Path outpath) throws Exception { - ParquetMetadata metaData = ParquetFileReader.readFooter(conf, inpath, NO_FILTER); - MessageType schema = metaData.getFileMetaData().getSchema(); - HadoopInputFile inputFile = HadoopInputFile.fromPath(inpath, conf); - ParquetReadOptions readOptions = HadoopReadOptions.builder(conf).build(); - - conf.set(GroupWriteSupport.PARQUET_EXAMPLE_SCHEMA, schema.toString()); - ExampleParquetWriter.Builder builder = ExampleParquetWriter.builder(outpath).withConf(conf).withCompressionCodec(codecName); - - ParquetWriter parquetWriter = builder.build(); + private void convertCompression(Configuration conf, String inputFile, String outputFile, String codec) throws IOException { + Path inPath = new Path(inputFile); + Path outPath = new Path(outputFile); + CompressionCodecName codecName = CompressionCodecName.valueOf(codec); - PageReadStore pages; - ParquetFileReader 
reader = new ParquetFileReader(inputFile, readOptions); - - while ((pages = reader.readNextRowGroup()) != null) { - long rows = pages.getRowCount(); - MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema); - RecordReader recordReader = columnIO.getRecordReader(pages, new GroupRecordConverter(schema)); + ParquetMetadata metaData = ParquetFileReader.readFooter(conf, inPath, NO_FILTER); + MessageType schema = metaData.getFileMetaData().getSchema(); + ParquetFileWriter writer = new ParquetFileWriter(conf, schema, outPath, ParquetFileWriter.Mode.CREATE); + writer.start(); - for (int i = 0; i < rows; i++) { - SimpleGroup simpleGroup = (SimpleGroup) recordReader.read(); - parquetWriter.write(simpleGroup); - } + try (TransParquetFileReader reader = new TransParquetFileReader(HadoopInputFile.fromPath(inPath, conf), HadoopReadOptions.builder(conf).build())) { + compressionConverter.processBlocks(reader, writer, metaData, schema, metaData.getFileMetaData().getCreatedBy(), codecName); + } finally { + writer.end(metaData.getFileMetaData().getKeyValueMetaData()); } - - parquetWriter.close(); } - private void validateColumns(String inputFile, int numRecord) throws IOException { - ParquetReader reader = ParquetReader.builder(new GroupReadSupport(), new Path(inputFile)).withConf(conf).build(); + private void validateColumns(String file, int numRecord, TestDocs testDocs) throws IOException { + ParquetReader reader = ParquetReader.builder(new GroupReadSupport(), new Path(file)).withConf(conf).build(); for (int i = 0; i < numRecord; i++) { Group group = reader.read(); - assertTrue(group.getLong("DocId", 0) < 1000); - assertEquals(group.getBinary("Name", 0).length(), 100); - assertEquals(group.getBinary("Gender", 0).length(), 100); + assertTrue(group.getLong("DocId", 0) == testDocs.docId[i]); + assertArrayEquals(group.getBinary("Name", 0).getBytes(), testDocs.name[i].getBytes()); + assertArrayEquals(group.getBinary("Gender", 0).getBytes(), testDocs.gender[i].getBytes()); Group subGroup = group.getGroup("Links", 0); - assertEquals(subGroup.getBinary("Backward", 0).length(), 100); - assertEquals(subGroup.getBinary("Forward", 0).length(), 100); + assertArrayEquals(subGroup.getBinary("Backward", 0).getBytes(), testDocs.linkBackward[i].getBytes()); + assertArrayEquals(subGroup.getBinary("Forward", 0).getBytes(), testDocs.linkForward[i].getBytes()); } reader.close(); } @@ -175,8 +148,8 @@ private void validColumnIndex(String inputFile, String outFile) throws Exception ParquetMetadata inMetaData = ParquetFileReader.readFooter(conf, new Path(inputFile), NO_FILTER); ParquetMetadata outMetaData = ParquetFileReader.readFooter(conf, new Path(outFile), NO_FILTER); Assert.assertEquals(inMetaData.getBlocks().size(), outMetaData.getBlocks().size()); - try (ParquetFileReader inReader = new ParquetFileReader(HadoopInputFile.fromPath(new Path(inputFile), conf), HadoopReadOptions.builder(conf).build()); - ParquetFileReader outReader = new ParquetFileReader(HadoopInputFile.fromPath(new Path(outFile), conf), HadoopReadOptions.builder(conf).build())) { + try (TransParquetFileReader inReader = new TransParquetFileReader(HadoopInputFile.fromPath(new Path(inputFile), conf), HadoopReadOptions.builder(conf).build()); + TransParquetFileReader outReader = new TransParquetFileReader(HadoopInputFile.fromPath(new Path(outFile), conf), HadoopReadOptions.builder(conf).build())) { for (int i = 0; i < inMetaData.getBlocks().size(); i++) { BlockMetaData inBlockMetaData = inMetaData.getBlocks().get(i); BlockMetaData outBlockMetaData 
= outMetaData.getBlocks().get(i); @@ -195,11 +168,17 @@ private void validColumnIndex(String inputFile, String outFile) throws Exception Assert.assertEquals(inColumnIndex.getNullCounts(), outColumnIndex.getNullCounts()); } if (inOffsetIndex != null) { + List inOffsets = getOffsets(inReader, inChunk); + List outOffsets = getOffsets(outReader, outChunk); + Assert.assertEquals(inOffsets.size(), outOffsets.size()); + Assert.assertEquals(inOffsets.size(), inOffsetIndex.getPageCount()); Assert.assertEquals(inOffsetIndex.getPageCount(), outOffsetIndex.getPageCount()); for (int k = 0; k < inOffsetIndex.getPageCount(); k++) { Assert.assertEquals(inOffsetIndex.getFirstRowIndex(k), outOffsetIndex.getFirstRowIndex(k)); Assert.assertEquals(inOffsetIndex.getLastRowIndex(k, inChunk.getValueCount()), outOffsetIndex.getLastRowIndex(k, outChunk.getValueCount())); + Assert.assertEquals(inOffsetIndex.getOffset(k), (long)inOffsets.get(k)); + Assert.assertEquals(outOffsetIndex.getOffset(k), (long)outOffsets.get(k)); } } } @@ -207,13 +186,44 @@ private void validColumnIndex(String inputFile, String outFile) throws Exception } } - private void executeCommandLine(String[] cargs) throws Exception { - CommandLineParser parser = new PosixParser(); - CommandLine cmd = parser.parse(new Options(), cargs, command.supportsExtraArgs()); - command.execute(cmd); + private List getOffsets(TransParquetFileReader reader, ColumnChunkMetaData chunk) throws IOException { + List offsets = new ArrayList<>(); + reader.setStreamPosition(chunk.getStartingPos()); + long readValues = 0; + long totalChunkValues = chunk.getValueCount(); + while (readValues < totalChunkValues) { + long curOffset = reader.getPos(); + PageHeader pageHeader = reader.readPageHeader(); + switch (pageHeader.type) { + case DICTIONARY_PAGE: + compressionConverter.readBlock(pageHeader.getCompressed_page_size(), reader); + break; + case DATA_PAGE: + DataPageHeader headerV1 = pageHeader.data_page_header; + offsets.add(curOffset); + compressionConverter.readBlock(pageHeader.getCompressed_page_size(), reader); + readValues += headerV1.getNum_values(); + break; + case DATA_PAGE_V2: + DataPageHeaderV2 headerV2 = pageHeader.data_page_header_v2; + offsets.add(curOffset); + int rlLength = headerV2.getRepetition_levels_byte_length(); + compressionConverter.readBlock(rlLength, reader); + int dlLength = headerV2.getDefinition_levels_byte_length(); + compressionConverter.readBlock(dlLength, reader); + int payLoadLength = pageHeader.getCompressed_page_size() - rlLength - dlLength; + compressionConverter.readBlock(payLoadLength, reader); + readValues += headerV2.getNum_values(); + break; + default: + throw new IOException("Not recognized page type"); + } + } + return offsets; } - private String createParquetFile(String prefix, String codec, int numRecord, ParquetProperties.WriterVersion writerVersion, int pageSize) throws IOException { + private String createParquetFile(Configuration conf, Map extraMeta, int numRecord, String prefix, String codec, + ParquetProperties.WriterVersion writerVersion, int pageSize, TestDocs testDocs) throws IOException { MessageType schema = new MessageType("schema", new PrimitiveType(REQUIRED, INT64, "DocId"), new PrimitiveType(REQUIRED, BINARY, "Name"), @@ -237,12 +247,12 @@ private String createParquetFile(String prefix, String codec, int numRecord, Par try (ParquetWriter writer = builder.build()) { for (int i = 0; i < numRecord; i++) { SimpleGroup g = new SimpleGroup(schema); - g.add("DocId", getLong()); - g.add("Name", getString()); - 
g.add("Gender", getString()); + g.add("DocId", testDocs.docId[i]); + g.add("Name", testDocs.name[i]); + g.add("Gender", testDocs.gender[i]); Group links = g.addGroup("Links"); - links.add(0, getString()); - links.add(1, getString()); + links.add(0, testDocs.linkBackward[i]); + links.add(1, testDocs.linkForward[i]); writer.write(g); } } @@ -254,7 +264,7 @@ private static long getLong() { return ThreadLocalRandom.current().nextLong(1000); } - private static String getString() { + private String getString() { char[] chars = {'a', 'b', 'c', 'd', 'e', 'f', 'g', 'x', 'z', 'y'}; StringBuilder sb = new StringBuilder(); for (int i = 0; i < 100; i++) { @@ -263,11 +273,46 @@ private static String getString() { return sb.toString(); } - private static String createTempFile(String prefix) { + private String createTempFile(String prefix) { try { return Files.createTempDirectory(prefix).toAbsolutePath().toString() + "/test.parquet"; } catch (IOException e) { throw new AssertionError("Unable to create temporary file", e); } } + + private class TestDocs { + public long[] docId; + public String[] name; + public String[] gender; + public String[] linkBackward; + public String[] linkForward; + + public TestDocs(int numRecord) { + docId = new long[numRecord]; + for (int i = 0; i < numRecord; i++) { + docId[i] = getLong(); + } + + name = new String[numRecord]; + for (int i = 0; i < numRecord; i++) { + name[i] = getString(); + } + + gender = new String[numRecord]; + for (int i = 0; i < numRecord; i++) { + gender[i] = getString(); + } + + linkBackward = new String[numRecord]; + for (int i = 0; i < numRecord; i++) { + linkBackward[i] = getString(); + } + + linkForward = new String[numRecord]; + for (int i = 0; i < numRecord; i++) { + linkForward[i] = getString(); + } + } + } } diff --git a/parquet-tools/src/main/java/org/apache/parquet/tools/command/TransCompressionCommand.java b/parquet-tools/src/main/java/org/apache/parquet/tools/command/TransCompressionCommand.java index afcc380202..1348a63ff5 100644 --- a/parquet-tools/src/main/java/org/apache/parquet/tools/command/TransCompressionCommand.java +++ b/parquet-tools/src/main/java/org/apache/parquet/tools/command/TransCompressionCommand.java @@ -22,47 +22,16 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.parquet.HadoopReadOptions; -import org.apache.parquet.ParquetReadOptions; -import org.apache.parquet.bytes.BytesInput; -import org.apache.parquet.column.ColumnDescriptor; -import org.apache.parquet.column.impl.ColumnReadStoreImpl; -import org.apache.parquet.column.page.DictionaryPage; -import org.apache.parquet.column.page.PageReadStore; -import org.apache.parquet.column.statistics.Statistics; -import org.apache.parquet.compression.CompressionCodecFactory; -import org.apache.parquet.compression.CompressionCodecFactory.BytesInputCompressor; -import org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor; -import org.apache.parquet.format.DataPageHeader; -import org.apache.parquet.format.DataPageHeaderV2; -import org.apache.parquet.format.DictionaryPageHeader; -import org.apache.parquet.format.PageHeader; -import org.apache.parquet.format.Util; -import org.apache.parquet.format.converter.ParquetMetadataConverter; import org.apache.parquet.hadoop.ParquetFileReader; import org.apache.parquet.hadoop.ParquetFileWriter; -import org.apache.parquet.hadoop.metadata.BlockMetaData; -import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; -import org.apache.parquet.hadoop.metadata.ColumnPath; 
import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.hadoop.metadata.ParquetMetadata; -import org.apache.parquet.hadoop.util.HadoopCodecs; +import org.apache.parquet.hadoop.util.CompressionConverter; +import org.apache.parquet.hadoop.util.CompressionConverter.TransParquetFileReader; import org.apache.parquet.hadoop.util.HadoopInputFile; -import org.apache.parquet.internal.column.columnindex.ColumnIndex; -import org.apache.parquet.internal.column.columnindex.OffsetIndex; -import org.apache.parquet.io.InputFile; -import org.apache.parquet.io.ParquetEncodingException; -import org.apache.parquet.io.api.Converter; -import org.apache.parquet.io.api.GroupConverter; -import org.apache.parquet.io.api.PrimitiveConverter; import org.apache.parquet.schema.MessageType; -import org.apache.parquet.schema.PrimitiveType; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import java.io.IOException; import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER; @@ -76,18 +45,19 @@ public class TransCompressionCommand extends ArgsOnlyCommand { " is the codec name in the case sensitive format to be translated to, e.g. SNAPPY, GZIP, ZSTD, LZO, LZ4, BROTLI, UNCOMPRESSED" }; - private static final Logger LOG = LoggerFactory.getLogger(TransCompressionCommand.class); - private Configuration conf; + private CompressionConverter compressionConverter; public TransCompressionCommand() { super(3, 3); this.conf = new Configuration(); + compressionConverter = new CompressionConverter(); } public TransCompressionCommand(Configuration conf) { super(3, 3); this.conf = conf; + compressionConverter = new CompressionConverter(); } @Override @@ -110,197 +80,13 @@ public void execute(CommandLine options) throws Exception { ParquetMetadata metaData = ParquetFileReader.readFooter(conf, inPath, NO_FILTER); MessageType schema = metaData.getFileMetaData().getSchema(); + ParquetFileWriter writer = new ParquetFileWriter(conf, schema, outPath, ParquetFileWriter.Mode.CREATE); + writer.start(); try (TransParquetFileReader reader = new TransParquetFileReader(HadoopInputFile.fromPath(inPath, conf), HadoopReadOptions.builder(conf).build())) { - ParquetFileWriter writer = new ParquetFileWriter(conf, schema, outPath, ParquetFileWriter.Mode.CREATE); - writer.start(); - processBlocks(reader, writer, metaData, schema, metaData.getFileMetaData().getCreatedBy(), codecName); + compressionConverter.processBlocks(reader, writer, metaData, schema, metaData.getFileMetaData().getCreatedBy(), codecName); + } finally { writer.end(metaData.getFileMetaData().getKeyValueMetaData()); } } - - private void processBlocks(TransParquetFileReader reader, ParquetFileWriter writer, ParquetMetadata meta, MessageType schema, - String createdBy, CompressionCodecName codecName) throws IOException { - int blockIndex = 0; - PageReadStore store = reader.readNextRowGroup(); - while (store != null) { - writer.startBlock(store.getRowCount()); - BlockMetaData blockMetaData = meta.getBlocks().get(blockIndex); - List columnsInOrder = blockMetaData.getColumns(); - Map descriptorsMap = schema.getColumns().stream().collect( - Collectors.toMap(x -> ColumnPath.get(x.getPath()), x -> x)); - for (int i = 0; i < columnsInOrder.size(); i += 1) { - ColumnChunkMetaData chunk = columnsInOrder.get(i); - ColumnReadStoreImpl crstore = new ColumnReadStoreImpl(store, new DumpGroupConverter(), schema, createdBy); - ColumnDescriptor columnDescriptor 
= descriptorsMap.get(chunk.getPath()); - writer.startColumn(columnDescriptor, crstore.getColumnReader(columnDescriptor).getTotalValueCount(), codecName); - processChunk(reader, writer, chunk, createdBy, codecName); - writer.endColumn(); - } - writer.endBlock(); - store = reader.readNextRowGroup(); - blockIndex++; - } - } - - private void processChunk(TransParquetFileReader reader, ParquetFileWriter writer, ColumnChunkMetaData chunk, - String createdBy, CompressionCodecName codecName) throws IOException { - CompressionCodecFactory codecFactory = HadoopCodecs.newFactory(0); - BytesInputDecompressor decompressor = codecFactory.getDecompressor(chunk.getCodec()); - BytesInputCompressor compressor = codecFactory.getCompressor(codecName); - ColumnIndex columnIndex = reader.readColumnIndex(chunk); - OffsetIndex offsetIndex = reader.readOffsetIndex(chunk); - - reader.setStreamPosition(chunk.getStartingPos()); - DictionaryPage dictionaryPage = null; - long readValues = 0; - Statistics statistics = null; - ParquetMetadataConverter converter = new ParquetMetadataConverter(); - int pageIndex = 0; - long totalChunkValues = chunk.getValueCount(); - while (readValues < totalChunkValues) { - PageHeader pageHeader = reader.readPageHeader(); - int compressedPageSize = pageHeader.getCompressed_page_size(); - byte[] pageLoad; - switch (pageHeader.type) { - case DICTIONARY_PAGE: - if (dictionaryPage != null) { - throw new IOException("has more than one dictionary page in column chunk"); - } - DictionaryPageHeader dictPageHeader = pageHeader.dictionary_page_header; - pageLoad = translatePageLoad(reader, true, compressor, decompressor, pageHeader.getCompressed_page_size(), pageHeader.getUncompressed_page_size()); - writer.writeDictionaryPage(new DictionaryPage(BytesInput.from(pageLoad), - pageHeader.getUncompressed_page_size(), - converter.getEncoding(dictPageHeader.getEncoding()))); - break; - case DATA_PAGE: - DataPageHeader headerV1 = pageHeader.data_page_header; - pageLoad = translatePageLoad(reader, true, compressor, decompressor, pageHeader.getCompressed_page_size(), pageHeader.getUncompressed_page_size()); - statistics = convertStatistics(createdBy, chunk.getPrimitiveType(), headerV1.getStatistics(), columnIndex, pageIndex, converter); - readValues += headerV1.getNum_values(); - if (offsetIndex != null) { - long rowCount = 1 + offsetIndex.getLastRowIndex(pageIndex, totalChunkValues) - offsetIndex.getFirstRowIndex(pageIndex); - writer.writeDataPage(toIntWithCheck(headerV1.getNum_values()), - pageHeader.uncompressed_page_size, - BytesInput.from(pageLoad), - statistics, - toIntWithCheck(rowCount), - converter.getEncoding(headerV1.getRepetition_level_encoding()), - converter.getEncoding(headerV1.getDefinition_level_encoding()), - converter.getEncoding(headerV1.getEncoding())); - } else { - writer.writeDataPage(toIntWithCheck(headerV1.getNum_values()), - pageHeader.uncompressed_page_size, - BytesInput.from(pageLoad), - statistics, - converter.getEncoding(headerV1.getRepetition_level_encoding()), - converter.getEncoding(headerV1.getDefinition_level_encoding()), - converter.getEncoding(headerV1.getEncoding())); - } - pageIndex++; - break; - case DATA_PAGE_V2: - DataPageHeaderV2 headerV2 = pageHeader.data_page_header_v2; - int rlLength = headerV2.getRepetition_levels_byte_length(); - BytesInput rlLevels = readBlock(rlLength, reader); - int dlLength = headerV2.getDefinition_levels_byte_length(); - BytesInput dlLevels = readBlock(dlLength, reader); - int payLoadLength = pageHeader.getCompressed_page_size() - 
rlLength - dlLength; - int rawDataLength = pageHeader.getUncompressed_page_size() - rlLength - dlLength; - pageLoad = translatePageLoad(reader, headerV2.is_compressed, compressor, decompressor, payLoadLength, rawDataLength); - statistics = convertStatistics(createdBy, chunk.getPrimitiveType(), headerV2.getStatistics(), columnIndex, pageIndex, converter); - readValues += headerV2.getNum_values(); - writer.writeDataPageV2(headerV2.getNum_rows(), - headerV2.getNum_nulls(), - headerV2.getNum_values(), - rlLevels, - dlLevels, - converter.getEncoding(headerV2.getEncoding()), - BytesInput.from(pageLoad), - pageHeader.uncompressed_page_size - rlLength - dlLength, - statistics); - pageIndex++; - break; - default: - LOG.debug("skipping page of type {} of size {}", pageHeader.getType(), compressedPageSize); - break; - } - } - } - - private Statistics convertStatistics(String createdBy, PrimitiveType type, org.apache.parquet.format.Statistics pageStatistics, - ColumnIndex columnIndex, int pageIndex, ParquetMetadataConverter converter) throws IOException { - if (pageStatistics != null) { - return converter.fromParquetStatistics(createdBy, pageStatistics, type); - } else if (columnIndex != null) { - if (columnIndex.getNullPages() == null) { - throw new IOException("columnIndex has null variable 'nullPages' which indicates corrupted data for type: " + type.getName()); - } - if (pageIndex > columnIndex.getNullPages().size()) { - throw new IOException("There are more pages " + pageIndex + " found in the column than in the columnIndex " + columnIndex.getNullPages().size()); - } - org.apache.parquet.column.statistics.Statistics.Builder statsBuilder = org.apache.parquet.column.statistics.Statistics.getBuilderForReading(type); - statsBuilder.withNumNulls(columnIndex.getNullCounts().get(pageIndex)); - - if (!columnIndex.getNullPages().get(pageIndex)) { - statsBuilder.withMin(columnIndex.getMinValues().get(pageIndex).array().clone()); - statsBuilder.withMax(columnIndex.getMaxValues().get(pageIndex).array().clone()); - } - return statsBuilder.build(); - } else { - return null; - } - } - - private byte[] translatePageLoad(TransParquetFileReader reader, boolean isCompressed, BytesInputCompressor compressor, - BytesInputDecompressor decompressor, int payloadLength, int rawDataLength) throws IOException { - BytesInput data = readBlock(payloadLength, reader); - if (isCompressed) { - data = decompressor.decompress(data, rawDataLength); - } - BytesInput newCompressedData = compressor.compress(data); - return newCompressedData.toByteArray(); - } - - private BytesInput readBlock(int length, TransParquetFileReader reader) throws IOException { - byte[] data = new byte[length]; - reader.blockRead(data); - return BytesInput.from(data); - } - - private int toIntWithCheck(long size) { - if ((int)size != size) { - throw new ParquetEncodingException("size is bigger than " + Integer.MAX_VALUE + " bytes: " + size); - } - return (int)size; - } - - private static final class DumpGroupConverter extends GroupConverter { - @Override public void start() {} - @Override public void end() {} - @Override public Converter getConverter(int fieldIndex) { return new DumpConverter(); } - } - - private static final class DumpConverter extends PrimitiveConverter { - @Override public GroupConverter asGroupConverter() { return new DumpGroupConverter(); } - } - - private static final class TransParquetFileReader extends ParquetFileReader { - - public TransParquetFileReader(InputFile file, ParquetReadOptions options) throws IOException { - super(file, 
options); - } - - public void setStreamPosition(long newPos) throws IOException { - f.seek(newPos); - } - - public void blockRead(byte[] data) throws IOException { - f.readFully(data); - } - - public PageHeader readPageHeader() throws IOException { - return Util.readPageHeader(f); - } - } } From 99d1a08975b83d0df93c730edc2d7b007535fd70 Mon Sep 17 00:00:00 2001 From: Xinli shang Date: Mon, 22 Jun 2020 16:19:28 -0700 Subject: [PATCH 3/3] Address more feedbacks --- .../apache/parquet/hadoop/util/CompressionConverter.java | 6 +++--- .../apache/parquet/hadoop/util/CompressionConveterTest.java | 3 ++- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/CompressionConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/CompressionConverter.java index 79d9a6aa86..922699f486 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/CompressionConverter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/CompressionConverter.java @@ -180,9 +180,7 @@ private void processChunk(TransParquetFileReader reader, ParquetFileWriter write private Statistics convertStatistics(String createdBy, PrimitiveType type, org.apache.parquet.format.Statistics pageStatistics, ColumnIndex columnIndex, int pageIndex, ParquetMetadataConverter converter) throws IOException { - if (pageStatistics != null) { - return converter.fromParquetStatistics(createdBy, pageStatistics, type); - } else if (columnIndex != null) { + if (columnIndex != null) { if (columnIndex.getNullPages() == null) { throw new IOException("columnIndex has null variable 'nullPages' which indicates corrupted data for type: " + type.getName()); } @@ -197,6 +195,8 @@ private Statistics convertStatistics(String createdBy, PrimitiveType type, org.a statsBuilder.withMax(columnIndex.getMaxValues().get(pageIndex).array().clone()); } return statsBuilder.build(); + } else if (pageStatistics != null) { + return converter.fromParquetStatistics(createdBy, pageStatistics, type); } else { return null; } diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/CompressionConveterTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/CompressionConveterTest.java index 01b4a30f1c..fefa5e4f08 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/CompressionConveterTest.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/CompressionConveterTest.java @@ -78,6 +78,7 @@ public class CompressionConveterTest { private Map extraMeta = ImmutableMap.of("key1", "value1", "key2", "value2"); private CompressionConverter compressionConverter = new CompressionConverter(); + private Random rnd = new Random(5); @Test public void testTransCompression() throws Exception { @@ -268,7 +269,7 @@ private String getString() { char[] chars = {'a', 'b', 'c', 'd', 'e', 'f', 'g', 'x', 'z', 'y'}; StringBuilder sb = new StringBuilder(); for (int i = 0; i < 100; i++) { - sb.append(chars[new Random().nextInt(10)]); + sb.append(chars[rnd.nextInt(10)]); } return sb.toString(); }
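
The hunks above collapse several moving pieces (the page-level copy loop, the statistics fallback, and the raw-stream reader), so here is a minimal, self-contained sketch of how the new parquet-hadoop API added by this patch can be driven end to end. It mirrors the rewritten parquet-tools TransCompressionCommand and the convertCompression helper in CompressionConveterTest; the input/output paths, the ZSTD target codec, and the class name TransCompressionExample are illustrative placeholders, not part of the patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.HadoopReadOptions;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.CompressionConverter;
import org.apache.parquet.hadoop.util.CompressionConverter.TransParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.schema.MessageType;

import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;

public class TransCompressionExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path inPath = new Path("input.parquet");   // placeholder input file
    Path outPath = new Path("output.parquet"); // placeholder output file
    CompressionCodecName codecName = CompressionCodecName.ZSTD;

    // Reuse the source file's schema and created-by string; only the codec changes.
    ParquetMetadata metaData = ParquetFileReader.readFooter(conf, inPath, NO_FILTER);
    MessageType schema = metaData.getFileMetaData().getSchema();

    ParquetFileWriter writer = new ParquetFileWriter(conf, schema, outPath, ParquetFileWriter.Mode.CREATE);
    writer.start();

    CompressionConverter compressionConverter = new CompressionConverter();
    try (TransParquetFileReader reader = new TransParquetFileReader(
        HadoopInputFile.fromPath(inPath, conf), HadoopReadOptions.builder(conf).build())) {
      // Copies each row group page by page: pages are decompressed with the source
      // codec and recompressed with the target codec, without decoding the values.
      compressionConverter.processBlocks(reader, writer, metaData, schema,
          metaData.getFileMetaData().getCreatedBy(), codecName);
    } finally {
      // Write the footer, carrying over the original key/value metadata.
      writer.end(metaData.getFileMetaData().getKeyValueMetaData());
    }
  }
}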
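
On the command-line side, the parquet-cli and parquet-tools entry points in this patch wrap the same flow shown in the sketch above: they take three positional arguments (input file, output file, and the case-sensitive target codec name, e.g. SNAPPY, GZIP, ZSTD, LZO, LZ4, BROTLI, UNCOMPRESSED), read the source footer, and delegate the page-level transcoding to CompressionConverter.processBlocks, so the file is rewritten without decoding and re-encoding individual records.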