diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderImpl.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderImpl.java
index 09b1bdfe0c..c53977f091 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderImpl.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderImpl.java
@@ -57,7 +57,7 @@
  * @author Julien Le Dem
  *
  */
-class ColumnReaderImpl implements ColumnReader {
+public class ColumnReaderImpl implements ColumnReader {
   private static final Log LOG = Log.getLog(ColumnReaderImpl.class);
 
   /**
@@ -149,8 +149,8 @@ public double getDouble() {
   private int dictionaryId;
 
   private long endOfPageValueCount;
-  private int readValues;
-  private int pageValueCount;
+  private int readValues = 0;
+  private int pageValueCount = 0;
 
   private final PrimitiveConverter converter;
   private Binding binding;
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/statistics/RandomValues.java b/parquet-hadoop/src/test/java/org/apache/parquet/statistics/RandomValues.java
new file mode 100644
index 0000000000..fa73099f76
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/statistics/RandomValues.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.statistics;
+
+import org.apache.parquet.io.api.Binary;
+
+import java.math.BigInteger;
+import java.util.Random;
+
+public class RandomValues {
+  private static final String ALPHABET = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz1234567890";
+
+  private static abstract class RandomValueGenerator<T extends Comparable<T>> {
+    private final Random random;
+
+    protected RandomValueGenerator(long seed) {
+      this.random = new Random(seed);
+    }
+
+    public boolean shouldGenerateNull() {
+      return (random.nextInt(10) == 0);
+    }
+
+    public int randomInt() { return randomInt(Integer.MAX_VALUE - 1); }
+    public int randomInt(int maximum) {
+      // Maximum may be a random number (which may be negative).
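+      // Math.abs(maximum) + 1 keeps the bound positive for Random.nextInt, so the
+      // result is uniform in [0, |maximum|]; callers keep |maximum| below
+      // Integer.MAX_VALUE, so the +1 cannot overflow.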
+      return random.nextInt(Math.abs(maximum) + 1);
+    }
+
+    public long randomLong() { return random.nextLong(); }
+    public long randomLong(long maximum) { return randomLong() % maximum; }
+
+    public float randomFloat() { return random.nextFloat(); }
+    public float randomFloat(float maximum) { return random.nextFloat() % maximum; }
+
+    public double randomDouble() { return random.nextDouble(); }
+    public double randomDouble(double maximum) { return random.nextDouble() % maximum; }
+
+    public BigInteger randomInt96() {
+      return new BigInteger(95, random);
+    }
+
+    public BigInteger randomInt96(BigInteger maximum) {
+      BigInteger result;
+      while ((result = randomInt96()).compareTo(maximum) > 0);
+      return result;
+    }
+
+    public char randomLetter() {
+      return ALPHABET.charAt(randomInt() % ALPHABET.length());
+    }
+
+    public String randomString(int maxLength) {
+      return randomFixedLengthString(randomInt(maxLength));
+    }
+
+    public String randomFixedLengthString(int length) {
+      StringBuilder builder = new StringBuilder();
+      for (int index = 0; index < length; index++) {
+        builder.append(randomLetter());
+      }
+
+      return builder.toString();
+    }
+
+    protected abstract T nextValue();
+  }
+
+  private static abstract class RandomBinaryBase<T extends Comparable<T>> extends RandomValueGenerator<T> {
+    protected final int bufferLength;
+    protected final byte[] buffer;
+
+    public RandomBinaryBase(long seed, int bufferLength) {
+      super(seed);
+
+      this.bufferLength = bufferLength;
+      this.buffer = new byte[bufferLength];
+    }
+
+    public abstract Binary nextBinaryValue();
+
+    public Binary asReusedBinary(byte[] data) {
+      int length = Math.min(data.length, bufferLength);
+      System.arraycopy(data, 0, buffer, 0, length);
+      // Return a Binary backed by the shared, reused buffer (not by the fresh
+      // input array), so statistics must copy any value they want to retain.
+      return Binary.fromReusedByteArray(buffer, 0, length);
+    }
+  }
+
+  public static class IntGenerator extends RandomValueGenerator<Integer> {
+    private final RandomRange<Integer> randomRange = new RandomRange<Integer>(randomInt(), randomInt());
+    private final int minimum = randomRange.minimum();
+    private final int maximum = randomRange.maximum();
+    private final int range = (maximum - minimum);
+
+    public IntGenerator(long seed) {
+      super(seed);
+    }
+
+    @Override
+    protected Integer nextValue() {
+      return (minimum + randomInt(range));
+    }
+  }
+
+  public static class LongGenerator extends RandomValueGenerator<Long> {
+    private final RandomRange<Long> randomRange = new RandomRange<Long>(randomLong(), randomLong());
+    private final long minimum = randomRange.minimum();
+    private final long maximum = randomRange.maximum();
+    private final long range = (maximum - minimum);
+
+    public LongGenerator(long seed) {
+      super(seed);
+    }
+
+    @Override
+    protected Long nextValue() {
+      return (minimum + randomLong(range));
+    }
+  }
+
+  public static class Int96Generator extends RandomBinaryBase<BigInteger> {
+    private final RandomRange<BigInteger> randomRange = new RandomRange<BigInteger>(randomInt96(), randomInt96());
+    private final BigInteger minimum = randomRange.minimum();
+    private final BigInteger maximum = randomRange.maximum();
+    private final BigInteger range = maximum.subtract(minimum);
+
+    private static final int INT_96_LENGTH = 12;
+
+    public Int96Generator(long seed) {
+      super(seed, INT_96_LENGTH);
+    }
+
+    @Override
+    protected BigInteger nextValue() {
+      return (minimum.add(randomInt96(range)));
+    }
+
+    @Override
+    public Binary nextBinaryValue() {
+      return asReusedBinary(nextValue().toByteArray());
+    }
+  }
+
+  public static class FloatGenerator extends RandomValueGenerator<Float> {
+    private final RandomRange<Float> randomRange = new RandomRange<Float>(randomFloat(), randomFloat());
+    private final float minimum = randomRange.minimum();
+    private final float maximum = randomRange.maximum();
+    private final float range = (maximum - minimum);
+
+    public FloatGenerator(long seed) {
+      super(seed);
+    }
+
+    @Override
+    protected Float nextValue() {
+      return (minimum + randomFloat(range));
+    }
+  }
+
+  public static class DoubleGenerator extends RandomValueGenerator<Double> {
+    private final RandomRange<Double> randomRange = new RandomRange<Double>(randomDouble(), randomDouble());
+    private final double minimum = randomRange.minimum();
+    private final double maximum = randomRange.maximum();
+    private final double range = (maximum - minimum);
+
+    public DoubleGenerator(long seed) {
+      super(seed);
+    }
+
+    @Override
+    protected Double nextValue() {
+      return (minimum + randomDouble(range));
+    }
+  }
+
+  public static class StringGenerator extends RandomBinaryBase<String> {
+    private static final int MAX_STRING_LENGTH = 16;
+
+    public StringGenerator(long seed) {
+      super(seed, MAX_STRING_LENGTH);
+    }
+
+    @Override
+    protected String nextValue() {
+      int stringLength = randomInt(15) + 1;
+      return randomString(stringLength);
+    }
+
+    @Override
+    public Binary nextBinaryValue() {
+      return asReusedBinary(nextValue().getBytes());
+    }
+  }
+
+  public static class BinaryGenerator extends RandomBinaryBase<Binary> {
+    private static final int MAX_BINARY_LENGTH = 16;
+
+    public BinaryGenerator(long seed) {
+      super(seed, MAX_BINARY_LENGTH);
+    }
+
+    @Override
+    protected Binary nextValue() {
+      // use a random length, but ensure it is at least a few bytes
+      int length = 5 + randomInt(buffer.length - 5);
+      for (int index = 0; index < length; index++) {
+        buffer[index] = (byte) randomInt();
+      }
+
+      return Binary.fromReusedByteArray(buffer, 0, length);
+    }
+
+    @Override
+    public Binary nextBinaryValue() {
+      return nextValue();
+    }
+  }
+
+  public static class FixedGenerator extends RandomBinaryBase<Binary> {
+    public FixedGenerator(long seed, int length) {
+      super(seed, length);
+    }
+
+    @Override
+    protected Binary nextValue() {
+      for (int index = 0; index < buffer.length; index++) {
+        buffer[index] = (byte) randomInt();
+      }
+
+      return Binary.fromReusedByteArray(buffer);
+    }
+
+    @Override
+    public Binary nextBinaryValue() {
+      return nextValue();
+    }
+  }
+
+  private static class RandomRange<T extends Comparable<T>> {
+    private T minimum;
+    private T maximum;
+
+    public RandomRange(T lhs, T rhs) {
+      this.minimum = lhs;
+      this.maximum = rhs;
+
+      if (minimum.compareTo(rhs) > 0) {
+        T temporary = minimum;
+        minimum = maximum;
+        maximum = temporary;
+      }
+    }
+
+    public T minimum() { return this.minimum; }
+    public T maximum() { return this.maximum; }
+  }
+}
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestStatistics.java b/parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestStatistics.java
new file mode 100644
index 0000000000..5bc060d2fa
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestStatistics.java
@@ -0,0 +1,449 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.statistics;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.impl.ColumnReaderImpl;
+import org.apache.parquet.column.page.DataPage;
+import org.apache.parquet.column.page.DataPageV1;
+import org.apache.parquet.column.page.DataPageV2;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroup;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.io.api.PrimitiveConverter;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.*;
+import static org.apache.parquet.schema.Type.Repetition.OPTIONAL;
+import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
+import static org.junit.Assert.assertTrue;
+
+public class TestStatistics {
+  private static final int MEGABYTE = 1 << 20;
+  private static final long RANDOM_SEED = 1441990701846L; // System.currentTimeMillis();
+
+  public static class DataGenerationContext {
+    public static abstract class WriteContext {
+      protected final File path;
+      protected final Path fsPath;
+      protected final MessageType schema;
+      protected final int blockSize;
+      protected final int pageSize;
+      protected final boolean enableDictionary;
+      protected final boolean enableValidation;
+      protected final ParquetProperties.WriterVersion version;
+
+      public WriteContext(File path, MessageType schema, int blockSize, int pageSize,
+          boolean enableDictionary, boolean enableValidation,
+          ParquetProperties.WriterVersion version) throws IOException {
+        this.path = path;
+        this.fsPath = new Path(path.toString());
+        this.schema = schema;
+        this.blockSize = blockSize;
+        this.pageSize = pageSize;
+        this.enableDictionary = enableDictionary;
+        this.enableValidation = enableValidation;
+        this.version = version;
+      }
+
+      public abstract void write(ParquetWriter<Group> writer) throws IOException;
+      public abstract void test() throws IOException;
+    }
+
+    public static void writeAndTest(WriteContext context) throws IOException {
+      // Create the configuration and apply the schema to it.
+      Configuration configuration = new Configuration();
+      GroupWriteSupport.setSchema(context.schema, configuration);
+      GroupWriteSupport groupWriteSupport = new GroupWriteSupport();
+
+      // Create the writer properties
+      final int blockSize = context.blockSize;
+      final int pageSize = context.pageSize;
+      final int dictionaryPageSize = pageSize;
+      final boolean enableDictionary = context.enableDictionary;
+      final boolean enableValidation = context.enableValidation;
+      ParquetProperties.WriterVersion writerVersion = context.version;
+      CompressionCodecName codec = CompressionCodecName.UNCOMPRESSED;
+
+      ParquetWriter<Group> writer = new ParquetWriter<Group>(context.fsPath,
+          groupWriteSupport, codec, blockSize, pageSize, dictionaryPageSize,
+          enableDictionary, enableValidation, writerVersion, configuration);
+
+      context.write(writer);
+      writer.close();
+
+      context.test();
+
+      context.path.delete();
+    }
+  }
+
+  public static class SingletonPageReader implements PageReader {
+    private final DictionaryPage dict;
+    private final DataPage data;
+
+    public SingletonPageReader(DictionaryPage dict, DataPage data) {
+      this.dict = dict;
+      this.data = data;
+    }
+
+    @Override
+    public DictionaryPage readDictionaryPage() {
+      return dict;
+    }
+
+    @Override
+    public long getTotalValueCount() {
+      return data.getValueCount();
+    }
+
+    @Override
+    public DataPage readPage() {
+      return data;
+    }
+  }
+
+  private static <T extends Comparable<T>> Statistics<T> getStatisticsFromPageHeader(DataPage page) {
+    return page.accept(new DataPage.Visitor<Statistics<T>>() {
+      @Override
+      @SuppressWarnings("unchecked")
+      public Statistics<T> visit(DataPageV1 dataPageV1) {
+        return (Statistics<T>) dataPageV1.getStatistics();
+      }
+
+      @Override
+      @SuppressWarnings("unchecked")
+      public Statistics<T> visit(DataPageV2 dataPageV2) {
+        return (Statistics<T>) dataPageV2.getStatistics();
+      }
+    });
+  }
+
+  private static class StatsValidator<T extends Comparable<T>> {
+    private final boolean hasNonNull;
+    private final T min;
+    private final T max;
+
+    public StatsValidator(DataPage page) {
+      Statistics<T> stats = getStatisticsFromPageHeader(page);
+      this.hasNonNull = stats.hasNonNullValue();
+      if (hasNonNull) {
+        this.min = stats.genericGetMin();
+        this.max = stats.genericGetMax();
+      } else {
+        this.min = null;
+        this.max = null;
+      }
+    }
+
+    public void validate(T value) {
+      if (hasNonNull) {
+        assertTrue("min should be <= all values", min.compareTo(value) <= 0);
+        assertTrue("max should be >= all values", max.compareTo(value) >= 0);
+      }
+    }
+  }
+
+  private static PrimitiveConverter getValidatingConverter(
+      final DataPage page, PrimitiveTypeName type) {
+    return type.convert(new PrimitiveType.PrimitiveTypeNameConverter<PrimitiveConverter, RuntimeException>() {
+      @Override
+      public PrimitiveConverter convertFLOAT(PrimitiveTypeName primitiveTypeName) {
+        final StatsValidator<Float> validator = new StatsValidator<Float>(page);
+        return new PrimitiveConverter() {
+          @Override
+          public void addFloat(float value) {
+            validator.validate(value);
+          }
+        };
+      }
+
+      @Override
+      public PrimitiveConverter convertDOUBLE(PrimitiveTypeName primitiveTypeName) {
+        final StatsValidator<Double> validator = new StatsValidator<Double>(page);
+        return new PrimitiveConverter() {
+          @Override
+          public void addDouble(double value) {
+            validator.validate(value);
+          }
+        };
+      }
+
+      @Override
+      public PrimitiveConverter convertINT32(PrimitiveTypeName primitiveTypeName) {
+        final StatsValidator<Integer> validator = new StatsValidator<Integer>(page);
+        return new PrimitiveConverter() {
+          @Override
+          public void addInt(int value) {
+            validator.validate(value);
+          }
+        };
+      }
+
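+      // The remaining converters follow the same pattern: each wraps a
+      // StatsValidator typed to the column, so every materialized value is
+      // checked against the min/max recorded in the page header.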
+      @Override
+      public PrimitiveConverter convertINT64(PrimitiveTypeName primitiveTypeName) {
+        final StatsValidator<Long> validator = new StatsValidator<Long>(page);
+        return new PrimitiveConverter() {
+          @Override
+          public void addLong(long value) {
+            validator.validate(value);
+          }
+        };
+      }
+
+      @Override
+      public PrimitiveConverter convertBOOLEAN(PrimitiveTypeName primitiveTypeName) {
+        final StatsValidator<Boolean> validator = new StatsValidator<Boolean>(page);
+        return new PrimitiveConverter() {
+          @Override
+          public void addBoolean(boolean value) {
+            validator.validate(value);
+          }
+        };
+      }
+
+      @Override
+      public PrimitiveConverter convertINT96(PrimitiveTypeName primitiveTypeName) {
+        return convertBINARY(primitiveTypeName);
+      }
+
+      @Override
+      public PrimitiveConverter convertFIXED_LEN_BYTE_ARRAY(PrimitiveTypeName primitiveTypeName) {
+        return convertBINARY(primitiveTypeName);
+      }
+
+      @Override
+      public PrimitiveConverter convertBINARY(PrimitiveTypeName primitiveTypeName) {
+        final StatsValidator<Binary> validator = new StatsValidator<Binary>(page);
+        return new PrimitiveConverter() {
+          @Override
+          public void addBinary(Binary value) {
+            validator.validate(value);
+          }
+        };
+      }
+    });
+  }
+
+  public static class PageStatsValidator {
+    public void validate(MessageType schema, PageReadStore store) {
+      for (ColumnDescriptor desc : schema.getColumns()) {
+        PageReader reader = store.getPageReader(desc);
+        DictionaryPage dict = reader.readDictionaryPage();
+        DataPage page;
+        while ((page = reader.readPage()) != null) {
+          validateStatsForPage(page, dict, desc);
+        }
+      }
+    }
+
+    private void validateStatsForPage(DataPage page, DictionaryPage dict, ColumnDescriptor desc) {
+      SingletonPageReader reader = new SingletonPageReader(dict, page);
+      PrimitiveConverter converter = getValidatingConverter(page, desc.getType());
+      Statistics stats = getStatisticsFromPageHeader(page);
+
+      long numNulls = 0;
+      ColumnReaderImpl column = new ColumnReaderImpl(desc, reader, converter, null);
+      for (int i = 0; i < reader.getTotalValueCount(); i += 1) {
+        if (column.getCurrentDefinitionLevel() >= desc.getMaxDefinitionLevel()) {
+          column.writeCurrentValueToConverter();
+        } else {
+          numNulls += 1;
+        }
+        column.consume();
+      }
+
+      Assert.assertEquals(numNulls, stats.getNumNulls());
+
+      System.err.println(String.format(
+          "Validated stats min=%s max=%s nulls=%d for page=%s col=%s",
+          String.valueOf(stats.genericGetMin()),
+          String.valueOf(stats.genericGetMax()), stats.getNumNulls(), page,
+          Arrays.toString(desc.getPath())));
+    }
+  }
+
+  public static class DataContext extends DataGenerationContext.WriteContext {
+    private static final int MAX_TOTAL_ROWS = 1000000;
+
+    private final long seed;
+    private final Random random;
+    private final int recordCount;
+
+    private final int fixedLength;
+    private final RandomValues.IntGenerator intGenerator;
+    private final RandomValues.LongGenerator longGenerator;
+    private final RandomValues.Int96Generator int96Generator;
+    private final RandomValues.FloatGenerator floatGenerator;
+    private final RandomValues.DoubleGenerator doubleGenerator;
+    private final RandomValues.StringGenerator stringGenerator;
+    private final RandomValues.BinaryGenerator binaryGenerator;
+    private final RandomValues.FixedGenerator fixedBinaryGenerator;
+
+    public DataContext(long seed, File path, int blockSize, int pageSize,
+        boolean enableDictionary, ParquetProperties.WriterVersion version) throws IOException {
+      super(path, buildSchema(seed), blockSize, pageSize, enableDictionary, true, version);
+
+      this.seed = seed;
+      this.random = new Random(seed);
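+      // Derive the record count and every generator's seed from the master
+      // seed, so a failing run can be reproduced from the printed RANDOM SEED.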
+      this.recordCount = random.nextInt(MAX_TOTAL_ROWS);
+
+      this.fixedLength = schema.getType("fixed-binary").asPrimitiveType().getTypeLength();
+      this.intGenerator = new RandomValues.IntGenerator(random.nextLong());
+      this.longGenerator = new RandomValues.LongGenerator(random.nextLong());
+      this.int96Generator = new RandomValues.Int96Generator(random.nextLong());
+      this.floatGenerator = new RandomValues.FloatGenerator(random.nextLong());
+      this.doubleGenerator = new RandomValues.DoubleGenerator(random.nextLong());
+      this.stringGenerator = new RandomValues.StringGenerator(random.nextLong());
+      this.binaryGenerator = new RandomValues.BinaryGenerator(random.nextLong());
+      this.fixedBinaryGenerator = new RandomValues.FixedGenerator(random.nextLong(), fixedLength);
+    }
+
+    private static MessageType buildSchema(long seed) {
+      Random random = new Random(seed);
+      int fixedBinaryLength = random.nextInt(21) + 1;
+
+      return new MessageType("schema",
+          new PrimitiveType(OPTIONAL, INT32, "i32"),
+          new PrimitiveType(OPTIONAL, INT64, "i64"),
+          new PrimitiveType(OPTIONAL, INT96, "i96"),
+          new PrimitiveType(OPTIONAL, FLOAT, "sngl"),
+          new PrimitiveType(OPTIONAL, DOUBLE, "dbl"),
+          new PrimitiveType(OPTIONAL, BINARY, "strings"),
+          new PrimitiveType(OPTIONAL, BINARY, "binary"),
+          new PrimitiveType(OPTIONAL, FIXED_LEN_BYTE_ARRAY, fixedBinaryLength, "fixed-binary"),
+          new PrimitiveType(REQUIRED, INT32, "unconstrained-i32"),
+          new PrimitiveType(REQUIRED, INT64, "unconstrained-i64"),
+          new PrimitiveType(REQUIRED, FLOAT, "unconstrained-sngl"),
+          new PrimitiveType(REQUIRED, DOUBLE, "unconstrained-dbl")
+      );
+    }
+
+    @Override
+    public void write(ParquetWriter<Group> writer) throws IOException {
+      for (int index = 0; index < recordCount; index++) {
+        Group group = new SimpleGroup(super.schema);
+
+        if (!intGenerator.shouldGenerateNull()) {
+          group.append("i32", intGenerator.nextValue());
+        }
+        if (!longGenerator.shouldGenerateNull()) {
+          group.append("i64", longGenerator.nextValue());
+        }
+        if (!int96Generator.shouldGenerateNull()) {
+          group.append("i96", int96Generator.nextBinaryValue());
+        }
+        if (!floatGenerator.shouldGenerateNull()) {
+          group.append("sngl", floatGenerator.nextValue());
+        }
+        if (!doubleGenerator.shouldGenerateNull()) {
+          group.append("dbl", doubleGenerator.nextValue());
+        }
+        if (!stringGenerator.shouldGenerateNull()) {
+          group.append("strings", stringGenerator.nextBinaryValue());
+        }
+        if (!binaryGenerator.shouldGenerateNull()) {
+          group.append("binary", binaryGenerator.nextBinaryValue());
+        }
+        if (!fixedBinaryGenerator.shouldGenerateNull()) {
+          group.append("fixed-binary", fixedBinaryGenerator.nextBinaryValue());
+        }
+        group.append("unconstrained-i32", random.nextInt());
+        group.append("unconstrained-i64", random.nextLong());
+        group.append("unconstrained-sngl", random.nextFloat());
+        group.append("unconstrained-dbl", random.nextDouble());
+
+        writer.write(group);
+      }
+    }
+
+    @Override
+    public void test() throws IOException {
+      Configuration configuration = new Configuration();
+      ParquetMetadata metadata = ParquetFileReader.readFooter(configuration,
+          super.fsPath, ParquetMetadataConverter.NO_FILTER);
+      ParquetFileReader reader = new ParquetFileReader(configuration,
+          metadata.getFileMetaData(),
+          super.fsPath,
+          metadata.getBlocks(),
+          metadata.getFileMetaData().getSchema().getColumns());
+
+      PageStatsValidator validator = new PageStatsValidator();
+
+      PageReadStore pageReadStore;
+      while ((pageReadStore = reader.readNextRowGroup()) != null) {
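+        // Check every page of every column in this row group against its
+        // header statistics.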
+        validator.validate(metadata.getFileMetaData().getSchema(), pageReadStore);
+      }
+    }
+  }
+
+  @Rule
+  public final TemporaryFolder folder = new TemporaryFolder();
+
+  @Test
+  public void testStatistics() throws IOException {
+    File file = folder.newFile("test_file.parquet");
+    // ParquetWriter refuses to overwrite an existing file, so delete the one
+    // TemporaryFolder just created and keep only its path.
+    file.delete();
+
+    System.out.println(String.format("RANDOM SEED: %s", RANDOM_SEED));
+
+    Random random = new Random(RANDOM_SEED);
+
+    int blockSize = (random.nextInt(54) + 10) * MEGABYTE;
+    int pageSize = (random.nextInt(10) + 1) * MEGABYTE;
+
+    List<DataContext> contexts = Arrays.asList(
+        new DataContext(random.nextLong(), file, blockSize,
+            pageSize, false, ParquetProperties.WriterVersion.PARQUET_1_0),
+        new DataContext(random.nextLong(), file, blockSize,
+            pageSize, true, ParquetProperties.WriterVersion.PARQUET_1_0),
+        new DataContext(random.nextLong(), file, blockSize,
+            pageSize, false, ParquetProperties.WriterVersion.PARQUET_2_0),
+        new DataContext(random.nextLong(), file, blockSize,
+            pageSize, true, ParquetProperties.WriterVersion.PARQUET_2_0)
+    );
+
+    for (DataContext test : contexts) {
+      DataGenerationContext.writeAndTest(test);
+    }
+  }
+}
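
Usage note: the DataGenerationContext harness above is reusable by other tests. A minimal sketch of plugging in a different data set follows; the class name ExampleWriteContext, the one-column schema, and the block/page sizes are illustrative assumptions, not part of this patch:

  import java.io.File;
  import java.io.IOException;

  import org.apache.parquet.column.ParquetProperties;
  import org.apache.parquet.example.data.Group;
  import org.apache.parquet.example.data.simple.SimpleGroup;
  import org.apache.parquet.hadoop.ParquetWriter;
  import org.apache.parquet.schema.MessageTypeParser;
  import org.apache.parquet.statistics.TestStatistics.DataGenerationContext;

  public class ExampleWriteContext extends DataGenerationContext.WriteContext {
    public ExampleWriteContext(File path) throws IOException {
      // one-column schema, 64 MB blocks, 1 MB pages, dictionary and validation on
      super(path,
          MessageTypeParser.parseMessageType("message example { required int32 id; }"),
          64 * 1024 * 1024, 1024 * 1024, true, true,
          ParquetProperties.WriterVersion.PARQUET_1_0);
    }

    @Override
    public void write(ParquetWriter<Group> writer) throws IOException {
      Group group = new SimpleGroup(schema); // schema is a protected WriteContext field
      group.append("id", 1);
      writer.write(group);
    }

    @Override
    public void test() throws IOException {
      // open the file written at fsPath and assert on its contents here
    }
  }

  // driven the same way as the contexts in testStatistics():
  //   DataGenerationContext.writeAndTest(new ExampleWriteContext(file));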