diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/TransCompressionCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/TransCompressionCommand.java
new file mode 100644
index 0000000000..cae68107da
--- /dev/null
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/TransCompressionCommand.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.cli.commands;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.HadoopReadOptions;
+import org.apache.parquet.cli.BaseCommand;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.CompressionConverter;
+import org.apache.parquet.hadoop.util.CompressionConverter.TransParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+
+@Parameters(commandDescription = "Translate the compression from one codec to another")
+public class TransCompressionCommand extends BaseCommand {
+
+  private CompressionConverter compressionConverter;
+
+  public TransCompressionCommand(Logger console) {
+    super(console);
+    compressionConverter = new CompressionConverter();
+  }
+
+  @Parameter(description = "<input parquet file path>")
+  String input;
+
+  @Parameter(description = "<output parquet file path>")
+  String output;
+
+  @Parameter(description = "<new compression codec>")
+  String codec;
+
+  @Override
+  public int run() throws IOException {
+    Preconditions.checkArgument(input != null && output != null,
+      "Both input and output parquet file paths are required.");
+    Preconditions.checkArgument(codec != null,
+      "The codec name is required.");
+
+    Path inPath = new Path(input);
+    Path outPath = new Path(output);
+    CompressionCodecName codecName = CompressionCodecName.valueOf(codec);
+
+    ParquetMetadata metaData = ParquetFileReader.readFooter(getConf(), inPath, NO_FILTER);
+    MessageType schema = metaData.getFileMetaData().getSchema();
+    ParquetFileWriter writer = new ParquetFileWriter(getConf(), schema, outPath, ParquetFileWriter.Mode.CREATE);
+    writer.start();
+
+    try (TransParquetFileReader reader = new TransParquetFileReader(HadoopInputFile.fromPath(inPath, getConf()), HadoopReadOptions.builder(getConf()).build())) {
+      compressionConverter.processBlocks(reader, writer, metaData, schema, metaData.getFileMetaData().getCreatedBy(), codecName);
+    } finally {
+      writer.end(metaData.getFileMetaData().getKeyValueMetaData());
+    }
+    return 0;
+  }
+
+  @Override
+  public List<String> getExamples() {
+    return Lists.newArrayList(
+        "# Translate the compression from one codec to another",
+        " input.parquet output.parquet ZSTD"
+    );
+  }
+}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index 18fbf6d8c5..987b603337 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -613,8 +613,8 @@ public static ParquetFileReader open(InputFile file, ParquetReadOptions options)
     return new ParquetFileReader(file, options);
   }
 
+  protected final SeekableInputStream f;
   private final InputFile file;
-  private final SeekableInputStream f;
   private final ParquetReadOptions options;
   private final Map<ColumnPath, ColumnDescriptor> paths = new HashMap<>();
   private final FileMetaData fileMetaData; // may be null
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/CompressionConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/CompressionConverter.java
new file mode 100644
index 0000000000..922699f486
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/CompressionConverter.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.util;
+
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.impl.ColumnReadStoreImpl;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.compression.CompressionCodecFactory;
+import org.apache.parquet.format.DataPageHeader;
+import org.apache.parquet.format.DataPageHeaderV2;
+import org.apache.parquet.format.DictionaryPageHeader;
+import org.apache.parquet.format.PageHeader;
+import org.apache.parquet.format.Util;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.ParquetEncodingException;
+import org.apache.parquet.io.api.Converter;
+import org.apache.parquet.io.api.GroupConverter;
+import org.apache.parquet.io.api.PrimitiveConverter;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class CompressionConverter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(CompressionConverter.class);
+
+  private final int pageBufferSize = ParquetProperties.DEFAULT_PAGE_SIZE * 2;
+  private byte[] pageBuffer;
+
+  public CompressionConverter() {
+    this.pageBuffer = new byte[pageBufferSize];
+  }
+
+  public void processBlocks(TransParquetFileReader reader, ParquetFileWriter writer, ParquetMetadata meta, MessageType schema,
+                            String createdBy, CompressionCodecName codecName) throws IOException {
+    int blockIndex = 0;
+    PageReadStore store = reader.readNextRowGroup();
+    while (store != null) {
+      writer.startBlock(store.getRowCount());
+      BlockMetaData blockMetaData = meta.getBlocks().get(blockIndex);
+      List<ColumnChunkMetaData> columnsInOrder = blockMetaData.getColumns();
+      Map<ColumnPath, ColumnDescriptor> descriptorsMap = schema.getColumns().stream().collect(
+        Collectors.toMap(x -> ColumnPath.get(x.getPath()), x -> x));
+      for (int i = 0; i < columnsInOrder.size(); i += 1) {
+        ColumnChunkMetaData chunk = columnsInOrder.get(i);
+        ColumnReadStoreImpl crstore = new ColumnReadStoreImpl(store, new DummyGroupConverter(), schema, createdBy);
+        ColumnDescriptor columnDescriptor = descriptorsMap.get(chunk.getPath());
+        writer.startColumn(columnDescriptor, crstore.getColumnReader(columnDescriptor).getTotalValueCount(), codecName);
+        processChunk(reader, writer, chunk, createdBy, codecName);
+        writer.endColumn();
+      }
+      writer.endBlock();
+      store = reader.readNextRowGroup();
+      blockIndex++;
+    }
+  }
+
+  private void processChunk(TransParquetFileReader reader, ParquetFileWriter writer, ColumnChunkMetaData chunk,
+                            String createdBy, CompressionCodecName codecName) throws IOException {
+    CompressionCodecFactory codecFactory = HadoopCodecs.newFactory(0);
+    CompressionCodecFactory.BytesInputDecompressor decompressor = codecFactory.getDecompressor(chunk.getCodec());
+    CompressionCodecFactory.BytesInputCompressor compressor = codecFactory.getCompressor(codecName);
+    ColumnIndex columnIndex = reader.readColumnIndex(chunk);
+    OffsetIndex offsetIndex = reader.readOffsetIndex(chunk);
+
+    reader.setStreamPosition(chunk.getStartingPos());
+    DictionaryPage dictionaryPage = null;
+    long readValues = 0;
+    Statistics statistics = null;
+    ParquetMetadataConverter converter = new ParquetMetadataConverter();
+    int pageIndex = 0;
+    long totalChunkValues = chunk.getValueCount();
+    while (readValues < totalChunkValues) {
+      PageHeader pageHeader = reader.readPageHeader();
+      int compressedPageSize = pageHeader.getCompressed_page_size();
+      byte[] pageLoad;
+      switch (pageHeader.type) {
+        case DICTIONARY_PAGE:
+          if (dictionaryPage != null) {
+            throw new IOException("has more than one dictionary page in column chunk");
+          }
+          DictionaryPageHeader dictPageHeader = pageHeader.dictionary_page_header;
+          pageLoad = translatePageLoad(reader, true, compressor, decompressor, pageHeader.getCompressed_page_size(), pageHeader.getUncompressed_page_size());
+          writer.writeDictionaryPage(new DictionaryPage(BytesInput.from(pageLoad),
+            pageHeader.getUncompressed_page_size(),
+            dictPageHeader.getNum_values(),
+            converter.getEncoding(dictPageHeader.getEncoding())));
+          break;
+        case DATA_PAGE:
+          DataPageHeader headerV1 = pageHeader.data_page_header;
+          pageLoad = translatePageLoad(reader, true, compressor, decompressor, pageHeader.getCompressed_page_size(), pageHeader.getUncompressed_page_size());
+          statistics = convertStatistics(createdBy, chunk.getPrimitiveType(), headerV1.getStatistics(), columnIndex, pageIndex, converter);
+          readValues += headerV1.getNum_values();
+          if (offsetIndex != null) {
+            long rowCount = 1 + offsetIndex.getLastRowIndex(pageIndex, totalChunkValues) - offsetIndex.getFirstRowIndex(pageIndex);
+            writer.writeDataPage(toIntWithCheck(headerV1.getNum_values()),
+              pageHeader.getUncompressed_page_size(),
+              BytesInput.from(pageLoad),
+              statistics,
+              toIntWithCheck(rowCount),
+              converter.getEncoding(headerV1.getRepetition_level_encoding()),
+              converter.getEncoding(headerV1.getDefinition_level_encoding()),
+              converter.getEncoding(headerV1.getEncoding()));
+          } else {
+            writer.writeDataPage(toIntWithCheck(headerV1.getNum_values()),
+              pageHeader.getUncompressed_page_size(),
+              BytesInput.from(pageLoad),
+              statistics,
+              converter.getEncoding(headerV1.getRepetition_level_encoding()),
+              converter.getEncoding(headerV1.getDefinition_level_encoding()),
+              converter.getEncoding(headerV1.getEncoding()));
+          }
+          pageIndex++;
+          break;
+        case DATA_PAGE_V2:
+          DataPageHeaderV2 headerV2 = pageHeader.data_page_header_v2;
+          int rlLength = headerV2.getRepetition_levels_byte_length();
+          BytesInput rlLevels = readBlockAllocate(rlLength, reader);
+          int dlLength = headerV2.getDefinition_levels_byte_length();
+          BytesInput dlLevels = readBlockAllocate(dlLength, reader);
+          int payLoadLength = pageHeader.getCompressed_page_size() - rlLength - dlLength;
+          int rawDataLength = pageHeader.getUncompressed_page_size() - rlLength - dlLength;
+          pageLoad = translatePageLoad(reader, headerV2.is_compressed, compressor, decompressor, payLoadLength, rawDataLength);
+          statistics = convertStatistics(createdBy, chunk.getPrimitiveType(), headerV2.getStatistics(), columnIndex, pageIndex, converter);
+          readValues += headerV2.getNum_values();
+          writer.writeDataPageV2(headerV2.getNum_rows(),
+            headerV2.getNum_nulls(),
+            headerV2.getNum_values(),
+            rlLevels,
+            dlLevels,
+            converter.getEncoding(headerV2.getEncoding()),
+            BytesInput.from(pageLoad),
+            rawDataLength,
+            statistics);
+          pageIndex++;
+          break;
+        default:
+          LOG.debug("skipping page of type {} of size {}", pageHeader.getType(), compressedPageSize);
+          break;
+      }
+    }
+  }
+
+  private Statistics convertStatistics(String createdBy, PrimitiveType type, org.apache.parquet.format.Statistics pageStatistics,
+                                       ColumnIndex columnIndex, int pageIndex, ParquetMetadataConverter converter) throws IOException {
+    if (columnIndex != null) {
+      if (columnIndex.getNullPages() == null) {
+        throw new IOException("columnIndex has null variable 'nullPages' which indicates corrupted data for type: " + type.getName());
+      }
+      if (pageIndex > columnIndex.getNullPages().size()) {
+        throw new IOException("There are more pages " + pageIndex + " found in the column than in the columnIndex " + columnIndex.getNullPages().size());
+      }
+      org.apache.parquet.column.statistics.Statistics.Builder statsBuilder = org.apache.parquet.column.statistics.Statistics.getBuilderForReading(type);
+      statsBuilder.withNumNulls(columnIndex.getNullCounts().get(pageIndex));
+
+      if (!columnIndex.getNullPages().get(pageIndex)) {
+        statsBuilder.withMin(columnIndex.getMinValues().get(pageIndex).array().clone());
+        statsBuilder.withMax(columnIndex.getMaxValues().get(pageIndex).array().clone());
+      }
+      return statsBuilder.build();
+    } else if (pageStatistics != null) {
+      return converter.fromParquetStatistics(createdBy, pageStatistics, type);
+    } else {
+      return null;
+    }
+  }
+
+  private byte[] translatePageLoad(TransParquetFileReader reader, boolean isCompressed, CompressionCodecFactory.BytesInputCompressor compressor,
+                                   CompressionCodecFactory.BytesInputDecompressor decompressor, int payloadLength, int rawDataLength) throws IOException {
+    BytesInput data = readBlock(payloadLength, reader);
+    if (isCompressed) {
+      data = decompressor.decompress(data, rawDataLength);
+    }
+    BytesInput newCompressedData = compressor.compress(data);
+    return newCompressedData.toByteArray();
+  }
+
+  public BytesInput readBlock(int length, TransParquetFileReader reader) throws IOException {
+    byte[] data;
+    if (length > pageBufferSize) {
+      data = new byte[length];
+    } else {
+      data = pageBuffer;
+    }
+    reader.blockRead(data, 0, length);
+    return BytesInput.from(data, 0, length);
+  }
+
+  public BytesInput readBlockAllocate(int length, TransParquetFileReader reader) throws IOException {
+    byte[] data = new byte[length];
+    reader.blockRead(data, 0, length);
+    return BytesInput.from(data, 0, length);
+  }
+
+  private int toIntWithCheck(long size) {
+    if ((int)size != size) {
+      throw new ParquetEncodingException("size is bigger than " + Integer.MAX_VALUE + " bytes: " + size);
+    }
+    return (int)size;
+  }
+
+  private static final class DummyGroupConverter extends GroupConverter {
+    @Override public void start() {}
+    @Override public void end() {}
+    @Override public Converter getConverter(int fieldIndex) { return new DummyConverter(); }
+  }
+
+  private static final class DummyConverter extends PrimitiveConverter {
+    @Override public GroupConverter asGroupConverter() { return new DummyGroupConverter(); }
+  }
+
+  public static final class TransParquetFileReader extends ParquetFileReader {
+
+    public TransParquetFileReader(InputFile file, ParquetReadOptions options) throws IOException {
+      super(file, options);
+    }
+
+    public void setStreamPosition(long newPos) throws IOException {
+      f.seek(newPos);
+    }
+
+    public void blockRead(byte[] data, int start, int len) throws IOException {
+      f.readFully(data, start, len);
+    }
+
+    public PageHeader readPageHeader() throws IOException {
+      return Util.readPageHeader(f);
+    }
+
+    public long getPos() throws IOException {
+      return f.getPos();
+    }
+  }
+}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H1SeekableInputStream.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H1SeekableInputStream.java
index 876a1f372f..0f8cdbb551 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H1SeekableInputStream.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H1SeekableInputStream.java
@@ -53,7 +53,7 @@ public void readFully(byte[] bytes) throws IOException {
 
   @Override
   public void readFully(byte[] bytes, int start, int len) throws IOException {
-    stream.readFully(bytes);
+    stream.readFully(bytes, start, len);
   }
 
 }
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/CompressionConveterTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/CompressionConveterTest.java
new file mode 100644
index 0000000000..fefa5e4f08
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/CompressionConveterTest.java
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.util;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.HadoopReadOptions;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroup;
+import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
+import org.apache.parquet.format.DataPageHeader;
+import org.apache.parquet.format.DataPageHeaderV2;
+import org.apache.parquet.format.DictionaryPageHeader;
+import org.apache.parquet.format.PageHeader;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.CompressionConverter.TransParquetFileReader;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import org.apache.parquet.io.ColumnIOFactory;
+import org.apache.parquet.io.MessageColumnIO;
+import org.apache.parquet.io.RecordReader;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
+import static org.apache.parquet.schema.Type.Repetition.OPTIONAL;
+import static org.apache.parquet.schema.Type.Repetition.REPEATED;
+import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertTrue;
+
+public class CompressionConveterTest {
+
+  private Configuration conf = new Configuration();
+  private Map<String, String> extraMeta
+    = ImmutableMap.of("key1", "value1", "key2", "value2");
+  private CompressionConverter compressionConverter = new CompressionConverter();
+  private Random rnd = new Random(5);
+
+  @Test
+  public void testTransCompression() throws Exception {
+    String[] codecs = {"UNCOMPRESSED", "SNAPPY", "GZIP", "ZSTD"};
+    for (int i = 0; i < codecs.length; i++) {
+      for (int j = 0; j < codecs.length; j++) {
+        // Translating between the same codec is also a valid case
+        testInternal(codecs[i], codecs[j], ParquetProperties.WriterVersion.PARQUET_1_0, ParquetProperties.DEFAULT_PAGE_SIZE);
+        testInternal(codecs[i], codecs[j], ParquetProperties.WriterVersion.PARQUET_2_0, ParquetProperties.DEFAULT_PAGE_SIZE);
+        testInternal(codecs[i], codecs[j], ParquetProperties.WriterVersion.PARQUET_1_0, 64);
+        testInternal(codecs[i], codecs[j], ParquetProperties.WriterVersion.PARQUET_2_0, 64);
+      }
+    }
+  }
+
+  private void testInternal(String srcCodec, String destCodec, ParquetProperties.WriterVersion writerVersion, int pageSize) throws Exception {
+    int numRecord = 1000;
+    TestDocs testDocs = new TestDocs(numRecord);
+    String inputFile = createParquetFile(conf, extraMeta, numRecord, "input", srcCodec, writerVersion, pageSize, testDocs);
+    String outputFile = createTempFile("output_trans");
+    convertCompression(conf, inputFile, outputFile, destCodec);
+
+    validateColumns(outputFile, numRecord, testDocs);
+    validMeta(inputFile, outputFile);
+    validColumnIndex(inputFile, outputFile);
+  }
+
+  private void convertCompression(Configuration conf, String inputFile, String outputFile, String codec) throws IOException {
+    Path inPath = new Path(inputFile);
+    Path outPath = new Path(outputFile);
+    CompressionCodecName codecName = CompressionCodecName.valueOf(codec);
+
+    ParquetMetadata metaData = ParquetFileReader.readFooter(conf, inPath, NO_FILTER);
+    MessageType schema = metaData.getFileMetaData().getSchema();
+    ParquetFileWriter writer = new ParquetFileWriter(conf, schema, outPath, ParquetFileWriter.Mode.CREATE);
+    writer.start();
+
+    try (TransParquetFileReader reader = new TransParquetFileReader(HadoopInputFile.fromPath(inPath, conf), HadoopReadOptions.builder(conf).build())) {
+      compressionConverter.processBlocks(reader, writer, metaData, schema, metaData.getFileMetaData().getCreatedBy(), codecName);
+    } finally {
+      writer.end(metaData.getFileMetaData().getKeyValueMetaData());
+    }
+  }
+
+  private void validateColumns(String file, int numRecord, TestDocs testDocs) throws IOException {
+    ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), new Path(file)).withConf(conf).build();
+    for (int i = 0; i < numRecord; i++) {
+      Group group = reader.read();
+      assertTrue(group.getLong("DocId", 0) == testDocs.docId[i]);
+      assertArrayEquals(group.getBinary("Name", 0).getBytes(), testDocs.name[i].getBytes());
assertArrayEquals(group.getBinary("Gender", 0).getBytes(), testDocs.gender[i].getBytes()); + Group subGroup = group.getGroup("Links", 0); + assertArrayEquals(subGroup.getBinary("Backward", 0).getBytes(), testDocs.linkBackward[i].getBytes()); + assertArrayEquals(subGroup.getBinary("Forward", 0).getBytes(), testDocs.linkForward[i].getBytes()); + } + reader.close(); + } + + private void validMeta(String inputFile, String outFile) throws Exception { + ParquetMetadata inMetaData = ParquetFileReader.readFooter(conf, new Path(inputFile), NO_FILTER); + ParquetMetadata outMetaData = ParquetFileReader.readFooter(conf, new Path(outFile), NO_FILTER); + Assert.assertEquals(inMetaData.getFileMetaData().getSchema(), outMetaData.getFileMetaData().getSchema()); + Assert.assertEquals(inMetaData.getFileMetaData().getKeyValueMetaData(), outMetaData.getFileMetaData().getKeyValueMetaData()); + } + + private void validColumnIndex(String inputFile, String outFile) throws Exception { + ParquetMetadata inMetaData = ParquetFileReader.readFooter(conf, new Path(inputFile), NO_FILTER); + ParquetMetadata outMetaData = ParquetFileReader.readFooter(conf, new Path(outFile), NO_FILTER); + Assert.assertEquals(inMetaData.getBlocks().size(), outMetaData.getBlocks().size()); + try (TransParquetFileReader inReader = new TransParquetFileReader(HadoopInputFile.fromPath(new Path(inputFile), conf), HadoopReadOptions.builder(conf).build()); + TransParquetFileReader outReader = new TransParquetFileReader(HadoopInputFile.fromPath(new Path(outFile), conf), HadoopReadOptions.builder(conf).build())) { + for (int i = 0; i < inMetaData.getBlocks().size(); i++) { + BlockMetaData inBlockMetaData = inMetaData.getBlocks().get(i); + BlockMetaData outBlockMetaData = outMetaData.getBlocks().get(i); + Assert.assertEquals(inBlockMetaData.getColumns().size(), outBlockMetaData.getColumns().size()); + for (int j = 0; j < inBlockMetaData.getColumns().size(); j++) { + ColumnChunkMetaData inChunk = inBlockMetaData.getColumns().get(j); + ColumnIndex inColumnIndex = inReader.readColumnIndex(inChunk); + OffsetIndex inOffsetIndex = inReader.readOffsetIndex(inChunk); + ColumnChunkMetaData outChunk = outBlockMetaData.getColumns().get(j); + ColumnIndex outColumnIndex = outReader.readColumnIndex(outChunk); + OffsetIndex outOffsetIndex = outReader.readOffsetIndex(outChunk); + if (inColumnIndex != null) { + Assert.assertEquals(inColumnIndex.getBoundaryOrder(), outColumnIndex.getBoundaryOrder()); + Assert.assertEquals(inColumnIndex.getMaxValues(), outColumnIndex.getMaxValues()); + Assert.assertEquals(inColumnIndex.getMinValues(), outColumnIndex.getMinValues()); + Assert.assertEquals(inColumnIndex.getNullCounts(), outColumnIndex.getNullCounts()); + } + if (inOffsetIndex != null) { + List inOffsets = getOffsets(inReader, inChunk); + List outOffsets = getOffsets(outReader, outChunk); + Assert.assertEquals(inOffsets.size(), outOffsets.size()); + Assert.assertEquals(inOffsets.size(), inOffsetIndex.getPageCount()); + Assert.assertEquals(inOffsetIndex.getPageCount(), outOffsetIndex.getPageCount()); + for (int k = 0; k < inOffsetIndex.getPageCount(); k++) { + Assert.assertEquals(inOffsetIndex.getFirstRowIndex(k), outOffsetIndex.getFirstRowIndex(k)); + Assert.assertEquals(inOffsetIndex.getLastRowIndex(k, inChunk.getValueCount()), + outOffsetIndex.getLastRowIndex(k, outChunk.getValueCount())); + Assert.assertEquals(inOffsetIndex.getOffset(k), (long)inOffsets.get(k)); + Assert.assertEquals(outOffsetIndex.getOffset(k), (long)outOffsets.get(k)); + } + } + } + } + } + } + + 
+  private List<Long> getOffsets(TransParquetFileReader reader, ColumnChunkMetaData chunk) throws IOException {
+    List<Long> offsets = new ArrayList<>();
+    reader.setStreamPosition(chunk.getStartingPos());
+    long readValues = 0;
+    long totalChunkValues = chunk.getValueCount();
+    while (readValues < totalChunkValues) {
+      long curOffset = reader.getPos();
+      PageHeader pageHeader = reader.readPageHeader();
+      switch (pageHeader.type) {
+        case DICTIONARY_PAGE:
+          compressionConverter.readBlock(pageHeader.getCompressed_page_size(), reader);
+          break;
+        case DATA_PAGE:
+          DataPageHeader headerV1 = pageHeader.data_page_header;
+          offsets.add(curOffset);
+          compressionConverter.readBlock(pageHeader.getCompressed_page_size(), reader);
+          readValues += headerV1.getNum_values();
+          break;
+        case DATA_PAGE_V2:
+          DataPageHeaderV2 headerV2 = pageHeader.data_page_header_v2;
+          offsets.add(curOffset);
+          int rlLength = headerV2.getRepetition_levels_byte_length();
+          compressionConverter.readBlock(rlLength, reader);
+          int dlLength = headerV2.getDefinition_levels_byte_length();
+          compressionConverter.readBlock(dlLength, reader);
+          int payLoadLength = pageHeader.getCompressed_page_size() - rlLength - dlLength;
+          compressionConverter.readBlock(payLoadLength, reader);
+          readValues += headerV2.getNum_values();
+          break;
+        default:
+          throw new IOException("Not recognized page type");
+      }
+    }
+    return offsets;
+  }
+
+  private String createParquetFile(Configuration conf, Map<String, String> extraMeta, int numRecord, String prefix, String codec,
+                                   ParquetProperties.WriterVersion writerVersion, int pageSize, TestDocs testDocs) throws IOException {
+    MessageType schema = new MessageType("schema",
+      new PrimitiveType(REQUIRED, INT64, "DocId"),
+      new PrimitiveType(REQUIRED, BINARY, "Name"),
+      new PrimitiveType(REQUIRED, BINARY, "Gender"),
+      new GroupType(OPTIONAL, "Links",
+        new PrimitiveType(REPEATED, BINARY, "Backward"),
+        new PrimitiveType(REPEATED, BINARY, "Forward")));
+
+    conf.set(GroupWriteSupport.PARQUET_EXAMPLE_SCHEMA, schema.toString());
+
+    String file = createTempFile(prefix);
+    ExampleParquetWriter.Builder builder = ExampleParquetWriter.builder(new Path(file))
+      .withConf(conf)
+      .withWriterVersion(writerVersion)
+      .withExtraMetaData(extraMeta)
+      .withDictionaryEncoding("DocId", true)
+      .withValidation(true)
+      .enablePageWriteChecksum()
+      .withPageSize(pageSize)
+      .withCompressionCodec(CompressionCodecName.valueOf(codec));
+    try (ParquetWriter<Group> writer = builder.build()) {
+      for (int i = 0; i < numRecord; i++) {
+        SimpleGroup g = new SimpleGroup(schema);
+        g.add("DocId", testDocs.docId[i]);
+        g.add("Name", testDocs.name[i]);
+        g.add("Gender", testDocs.gender[i]);
+        Group links = g.addGroup("Links");
+        links.add(0, testDocs.linkBackward[i]);
+        links.add(1, testDocs.linkForward[i]);
+        writer.write(g);
+      }
+    }
+
+    return file;
+  }
+
+  private static long getLong() {
+    return ThreadLocalRandom.current().nextLong(1000);
+  }
+
+  private String getString() {
+    char[] chars = {'a', 'b', 'c', 'd', 'e', 'f', 'g', 'x', 'z', 'y'};
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0; i < 100; i++) {
+      sb.append(chars[rnd.nextInt(10)]);
+    }
+    return sb.toString();
+  }
+
+  private String createTempFile(String prefix) {
+    try {
+      return Files.createTempDirectory(prefix).toAbsolutePath().toString() + "/test.parquet";
+    } catch (IOException e) {
+      throw new AssertionError("Unable to create temporary file", e);
+    }
+  }
+
+  private class TestDocs {
+    public long[] docId;
+    public String[] name;
+    public String[] gender;
+    public String[] linkBackward;
+    public String[] linkForward;
+
+    public TestDocs(int numRecord) {
+      docId = new long[numRecord];
+      for (int i = 0; i < numRecord; i++) {
+        docId[i] = getLong();
+      }
+
+      name = new String[numRecord];
+      for (int i = 0; i < numRecord; i++) {
+        name[i] = getString();
+      }
+
+      gender = new String[numRecord];
+      for (int i = 0; i < numRecord; i++) {
+        gender[i] = getString();
+      }
+
+      linkBackward = new String[numRecord];
+      for (int i = 0; i < numRecord; i++) {
+        linkBackward[i] = getString();
+      }
+
+      linkForward = new String[numRecord];
+      for (int i = 0; i < numRecord; i++) {
+        linkForward[i] = getString();
+      }
+    }
+  }
+}
diff --git a/parquet-tools/src/main/java/org/apache/parquet/tools/command/TransCompressionCommand.java b/parquet-tools/src/main/java/org/apache/parquet/tools/command/TransCompressionCommand.java
new file mode 100644
index 0000000000..1348a63ff5
--- /dev/null
+++ b/parquet-tools/src/main/java/org/apache/parquet/tools/command/TransCompressionCommand.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.tools.command;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.HadoopReadOptions;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.CompressionConverter;
+import org.apache.parquet.hadoop.util.CompressionConverter.TransParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.schema.MessageType;
+
+import java.util.List;
+
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+
+public class TransCompressionCommand extends ArgsOnlyCommand {
+
+  public static final String[] USAGE = new String[] {
+    "<input> <output> <new_codec_name>",
+
+    "where <input> is the source parquet file",
+    "    <output> is the destination parquet file," +
+    "    <new_codec_name> is the codec name in the case sensitive format to be translated to, e.g. SNAPPY, GZIP, ZSTD, LZO, LZ4, BROTLI, UNCOMPRESSED"
+  };
+
+  private Configuration conf;
+  private CompressionConverter compressionConverter;
+
+  public TransCompressionCommand() {
+    super(3, 3);
+    this.conf = new Configuration();
+    compressionConverter = new CompressionConverter();
+  }
+
+  public TransCompressionCommand(Configuration conf) {
+    super(3, 3);
+    this.conf = conf;
+    compressionConverter = new CompressionConverter();
+  }
+
+  @Override
+  public String[] getUsageDescription() {
+    return USAGE;
+  }
+
+  @Override
+  public String getCommandDescription() {
+    return "Translate the compression of a given Parquet file into a new Parquet file with a different compression codec.";
+  }
+
+  @Override
+  public void execute(CommandLine options) throws Exception {
+    super.execute(options);
+    List<String> args = options.getArgList();
+    Path inPath = new Path(args.get(0));
+    Path outPath = new Path(args.get(1));
+    CompressionCodecName codecName = CompressionCodecName.valueOf(args.get(2));
+
+    ParquetMetadata metaData = ParquetFileReader.readFooter(conf, inPath, NO_FILTER);
+    MessageType schema = metaData.getFileMetaData().getSchema();
+    ParquetFileWriter writer = new ParquetFileWriter(conf, schema, outPath, ParquetFileWriter.Mode.CREATE);
+    writer.start();
+
+    try (TransParquetFileReader reader = new TransParquetFileReader(HadoopInputFile.fromPath(inPath, conf), HadoopReadOptions.builder(conf).build())) {
+      compressionConverter.processBlocks(reader, writer, metaData, schema, metaData.getFileMetaData().getCreatedBy(), codecName);
+    } finally {
+      writer.end(metaData.getFileMetaData().getKeyValueMetaData());
+    }
+  }
+}