From c099c35051be1506dee2e4ff4d4febcc8c8995b1 Mon Sep 17 00:00:00 2001
From: Ryan Blue
Date: Fri, 9 Sep 2016 14:02:11 -0700
Subject: [PATCH 1/3] PARQUET-686: Do not return min/max for the wrong order.

Min and max are currently calculated using the default Java ordering that
uses signed comparison for all values. This is not correct for binary types
like strings and decimals, or for unsigned numeric types. This commit
prevents statistics accumulated using the signed ordering from being
returned by ParquetMetadataConverter when the type should use the unsigned
ordering. Because many binary strings (those containing only ASCII
characters) are not affected by using the wrong ordering, this also adds a
property, parquet.strings.use-signed-order, to allow overriding this change.
---
 .../converter/ParquetMetadataConverter.java   | 112 +++++++++++++++++-
 .../parquet/hadoop/ParquetFileReader.java     |  23 ++--
 .../TestParquetMetadataConverter.java         |  52 +++++++-
 .../parquet/hadoop/TestParquetFileWriter.java |   2 +
 .../thrift/TestThriftToParquetFileWriter.java |   2 +
 5 files changed, 178 insertions(+), 13 deletions(-)

diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
index 9eb471f26c..077b96a4e0 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
@@ -36,6 +36,7 @@
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.parquet.CorruptStatistics;
 import org.apache.parquet.Log;
 import org.apache.parquet.format.PageEncodingStats;
@@ -81,6 +82,16 @@ public class ParquetMetadataConverter {
 
   private static final Log LOG = Log.getLog(ParquetMetadataConverter.class);
 
+  private final boolean useSignedStringMinMax;
+
+  public ParquetMetadataConverter() {
+    this.useSignedStringMinMax = false;
+  }
+
+  public ParquetMetadataConverter(Configuration conf) {
+    this.useSignedStringMinMax = conf.getBoolean("parquet.strings.use-signed-order", false);
+  }
+
   // NOTE: this cache is for memory savings, not cpu savings, and is used to de-duplicate
   // sets of encodings. It is important that all collections inserted to this cache be
   // immutable and have thread-safe read-only access. This can be achieved by wrapping
@@ -308,14 +319,25 @@ public static org.apache.parquet.column.statistics.Statistics fromParquetStatist
     return fromParquetStatistics(null, statistics, type);
   }
 
+  /**
+   * @deprecated Use {@link #fromParquetStatistics(String, Statistics, PrimitiveType)} instead.
+ */ + @Deprecated public static org.apache.parquet.column.statistics.Statistics fromParquetStatistics (String createdBy, Statistics statistics, PrimitiveTypeName type) { + return fromParquetStatisticsInternal(createdBy, statistics, type, defaultSortOrder(type)); + } + + // Visible for testing + static org.apache.parquet.column.statistics.Statistics fromParquetStatisticsInternal + (String createdBy, Statistics statistics, PrimitiveTypeName type, SortOrder expectedOrder) { // create stats object based on the column type org.apache.parquet.column.statistics.Statistics stats = org.apache.parquet.column.statistics.Statistics.getStatsBasedOnType(type); // If there was no statistics written to the footer, create an empty Statistics object and return // NOTE: See docs in CorruptStatistics for explanation of why this check is needed - if (statistics != null && !CorruptStatistics.shouldIgnoreStatistics(createdBy, type)) { + if (statistics != null && !CorruptStatistics.shouldIgnoreStatistics(createdBy, type) && + SortOrder.SIGNED == expectedOrder) { if (statistics.isSetMax() && statistics.isSetMin()) { stats.setMinMaxFromBytes(statistics.min.array(), statistics.max.array()); } @@ -324,6 +346,92 @@ public static org.apache.parquet.column.statistics.Statistics fromParquetStatist return stats; } + public org.apache.parquet.column.statistics.Statistics fromParquetStatistics( + String createdBy, Statistics statistics, PrimitiveType type) { + SortOrder expectedOrder = isSignedOrderOkay(type) ? SortOrder.SIGNED : sortOrder(type); + return fromParquetStatisticsInternal( + createdBy, statistics, type.getPrimitiveTypeName(), expectedOrder); + } + + enum SortOrder { + SIGNED, + UNSIGNED, + UNKNOWN + } + + private boolean isSignedOrderOkay(PrimitiveType type) { + if (!useSignedStringMinMax) { + return false; + } + + // even if the override is set, only return stats for string-ish types + if (type.getPrimitiveTypeName() != PrimitiveTypeName.BINARY) { + return false; + } + if (type.getOriginalType() == null) { + return true; // plain binary is okay + } + switch (type.getOriginalType()) { + case UTF8: + case ENUM: + case BSON: + case JSON: + return true; + default: // includes decimal + return false; + } + } + + private static SortOrder defaultSortOrder(PrimitiveTypeName primitive) { + switch (primitive) { + case BOOLEAN: + case INT32: + case INT64: + case FLOAT: + case DOUBLE: + case BINARY: // without a logical type, signed is okay + case FIXED_LEN_BYTE_ARRAY: + return SortOrder.SIGNED; + case INT96: // only used for timestamp, which uses unsigned values + return SortOrder.UNSIGNED; + } + return SortOrder.UNKNOWN; + } + + private static SortOrder sortOrder(PrimitiveType primitive) { + OriginalType annotation = primitive.getOriginalType(); + if (annotation != null) { + switch (annotation) { + case INT_8: + case INT_16: + case INT_32: + case INT_64: + case DATE: + case TIME_MICROS: + case TIME_MILLIS: + case TIMESTAMP_MICROS: + case TIMESTAMP_MILLIS: + return SortOrder.SIGNED; + case UINT_8: + case UINT_16: + case UINT_32: + case UINT_64: + case DECIMAL: + case ENUM: + case UTF8: + case BSON: + case JSON: + return SortOrder.UNSIGNED; + case LIST: + case MAP: + case MAP_KEY_VALUE: + case INTERVAL: + return SortOrder.UNKNOWN; + } + } + return defaultSortOrder(primitive.getPrimitiveTypeName()); + } + public PrimitiveTypeName getPrimitive(Type type) { switch (type) { case BYTE_ARRAY: // TODO: rename BINARY and remove this switch @@ -687,7 +795,7 @@ public ParquetMetadata fromParquetMetadata(FileMetaData 
parquetMetadata) throws fromParquetStatistics( parquetMetadata.getCreated_by(), metaData.statistics, - messageType.getType(path.toArray()).asPrimitiveType().getPrimitiveTypeName()), + messageType.getType(path.toArray()).asPrimitiveType()), metaData.data_page_offset, metaData.dictionary_page_offset, metaData.num_values, diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java index 57cdb7dca8..68b845867b 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java @@ -24,7 +24,6 @@ import static org.apache.parquet.filter2.compat.RowGroupFilter.FilterLevel.STATISTICS; import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER; import static org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS; -import static org.apache.parquet.format.converter.ParquetMetadataConverter.fromParquetStatistics; import static org.apache.parquet.hadoop.ParquetFileWriter.MAGIC; import static org.apache.parquet.hadoop.ParquetFileWriter.PARQUET_COMMON_METADATA_FILE; import static org.apache.parquet.hadoop.ParquetFileWriter.PARQUET_METADATA_FILE; @@ -95,6 +94,7 @@ import org.apache.parquet.hadoop.util.counters.BenchmarkCounter; import org.apache.parquet.io.ParquetDecodingException; import org.apache.parquet.io.InputFile; +import org.apache.parquet.schema.PrimitiveType; /** * Internal implementation of the Parquet file reader as a block container @@ -108,7 +108,7 @@ public class ParquetFileReader implements Closeable { public static String PARQUET_READ_PARALLELISM = "parquet.metadata.read.parallelism"; - private static ParquetMetadataConverter converter = new ParquetMetadataConverter(); + private final ParquetMetadataConverter converter; /** * for files provided, check if there's a summary file. 
@@ -445,8 +445,8 @@ public static final ParquetMetadata readFooter(Configuration configuration, File
   public static final ParquetMetadata readFooter(
       InputFile file, MetadataFilter filter) throws IOException {
     try (SeekableInputStream in = file.newStream()) {
-      return readFooter(converter, file.getLength(), file.toString(),
-          in, filter);
+      return readFooter(new ParquetMetadataConverter(), file.getLength(),
+          file.toString(), in, filter);
     }
   }
 
@@ -538,6 +538,7 @@ public ParquetFileReader(Configuration configuration, Path filePath, List<BlockMetaData> blocks, List<ColumnDescriptor> columns) throws IOException {
+    this.converter = new ParquetMetadataConverter(configuration);
     this.conf = configuration;
     this.fileMetaData = fileMetaData;
     FileSystem fs = filePath.getFileSystem(configuration);
@@ -569,6 +570,7 @@ private ParquetFileReader(Configuration configuration, Path file) throws IOExcep
    * @throws IOException if the file can not be opened
    */
   public ParquetFileReader(Configuration conf, Path file, MetadataFilter filter) throws IOException {
+    this.converter = new ParquetMetadataConverter(conf);
     this.conf = conf;
     FileSystem fs = file.getFileSystem(conf);
     this.fileStatus = fs.getFileStatus(file);
@@ -592,6 +594,7 @@ public ParquetFileReader(Configuration conf, Path file, MetadataFilter filter) t
    * @throws IOException if the file can not be opened
    */
   public ParquetFileReader(Configuration conf, Path file, ParquetMetadata footer) throws IOException {
+    this.converter = new ParquetMetadataConverter(conf);
     this.conf = conf;
     FileSystem fs = file.getFileSystem(conf);
     this.fileStatus = fs.getFileStatus(file);
@@ -781,7 +784,7 @@ DictionaryPage readDictionary(ColumnChunkMetaData meta) throws IOException {
         compressedPage.getEncoding());
   }
 
-  private static DictionaryPage readCompressedDictionary(
+  private DictionaryPage readCompressedDictionary(
       PageHeader pageHeader, SeekableInputStream fin) throws IOException {
     DictionaryPageHeader dictHeader = pageHeader.getDictionary_page_header();
@@ -843,6 +846,8 @@ protected PageHeader readPageHeader() throws IOException {
   public ColumnChunkPageReader readAllPages() throws IOException {
     List<DataPage> pagesInChunk = new ArrayList<DataPage>();
     DictionaryPage dictionaryPage = null;
+    PrimitiveType type = getFileMetaData().getSchema()
+        .getType(descriptor.col.getPath()).asPrimitiveType();
     long valuesCountReadSoFar = 0;
     while (valuesCountReadSoFar < descriptor.metadata.getValueCount()) {
       PageHeader pageHeader = readPageHeader();
@@ -870,10 +875,10 @@ public ColumnChunkPageReader readAllPages() throws IOException {
                 this.readAsBytesInput(compressedPageSize),
                 dataHeaderV1.getNum_values(),
                 uncompressedPageSize,
-                fromParquetStatistics(
+                converter.fromParquetStatistics(
                     getFileMetaData().getCreatedBy(),
                     dataHeaderV1.getStatistics(),
-                    descriptor.col.getType()),
+                    type),
                 converter.getEncoding(dataHeaderV1.getRepetition_level_encoding()),
                 converter.getEncoding(dataHeaderV1.getDefinition_level_encoding()),
                 converter.getEncoding(dataHeaderV1.getEncoding())
@@ -893,10 +898,10 @@ public ColumnChunkPageReader readAllPages() throws IOException {
                 converter.getEncoding(dataHeaderV2.getEncoding()),
                 this.readAsBytesInput(dataSize),
                 uncompressedPageSize,
-                fromParquetStatistics(
+                converter.fromParquetStatistics(
                     getFileMetaData().getCreatedBy(),
                     dataHeaderV2.getStatistics(),
-                    descriptor.col.getType()),
+                    type),
                 dataHeaderV2.isIs_compressed()
                 ));
         valuesCountReadSoFar += dataHeaderV2.getNum_values();
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java
b/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java index 3c888c37d9..b7b9bdb5aa 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java @@ -45,6 +45,7 @@ import java.util.TreeSet; import com.google.common.collect.Sets; +import org.apache.hadoop.conf.Configuration; import org.apache.parquet.Version; import org.apache.parquet.bytes.BytesUtils; import org.apache.parquet.column.statistics.BinaryStatistics; @@ -401,8 +402,9 @@ public void testBinaryStats() { Assert.assertFalse("Num nulls should not be set", formatStats.isSetNull_count()); - Statistics roundTripStats = ParquetMetadataConverter.fromParquetStatistics( - Version.FULL_VERSION, formatStats, PrimitiveTypeName.BINARY); + Statistics roundTripStats = ParquetMetadataConverter.fromParquetStatisticsInternal( + Version.FULL_VERSION, formatStats, PrimitiveTypeName.BINARY, + ParquetMetadataConverter.SortOrder.SIGNED); Assert.assertTrue(roundTripStats.isEmpty()); } @@ -515,4 +517,50 @@ public void testBooleanStats() { Assert.assertEquals("Num nulls should match", 3004, formatStats.getNull_count()); } + + @Test + public void testIgnoreStatsWithSignedSortOrder() { + ParquetMetadataConverter converter = new ParquetMetadataConverter(); + BinaryStatistics stats = new BinaryStatistics(); + stats.incrementNumNulls(); + stats.updateStats(Binary.fromString("A")); + stats.incrementNumNulls(); + stats.updateStats(Binary.fromString("z")); + stats.incrementNumNulls(); + + Statistics convertedStats = converter.fromParquetStatistics( + Version.FULL_VERSION, + ParquetMetadataConverter.toParquetStatistics(stats), + Types.required(PrimitiveTypeName.BINARY) + .as(OriginalType.UTF8).named("b")); + + Assert.assertTrue("Stats should be empty", convertedStats.isEmpty()); + } + + @Test + public void testUseStatsWithSignedSortOrder() { + // override defaults and use stats that were accumulated using signed order + Configuration conf = new Configuration(); + conf.set("parquet.strings.use-signed-order", "true"); + + ParquetMetadataConverter converter = new ParquetMetadataConverter(conf); + BinaryStatistics stats = new BinaryStatistics(); + stats.incrementNumNulls(); + stats.updateStats(Binary.fromString("A")); + stats.incrementNumNulls(); + stats.updateStats(Binary.fromString("z")); + stats.incrementNumNulls(); + + Statistics convertedStats = converter.fromParquetStatistics( + Version.FULL_VERSION, + ParquetMetadataConverter.toParquetStatistics(stats), + Types.required(PrimitiveTypeName.BINARY).named("b")); + + Assert.assertFalse("Stats should not be empty", convertedStats.isEmpty()); + Assert.assertEquals("Should have 3 nulls", 3, convertedStats.getNumNulls()); + Assert.assertEquals("Should have correct min (unsigned sort)", + Binary.fromString("A"), convertedStats.genericGetMin()); + Assert.assertEquals("Should have correct max (unsigned sort)", + Binary.fromString("z"), convertedStats.genericGetMax()); + } } diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java index 597daa801b..5dd4e88b15 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java @@ -452,6 +452,7 @@ public void testWriteReadStatistics() throws 
Exception { Path path = new Path(testFile.toURI()); Configuration configuration = new Configuration(); + configuration.set("parquet.strings.use-signed-order", "true"); MessageType schema = MessageTypeParser.parseMessageType("message m { required group a {required binary b;} required group c { required int64 d; }}"); String[] path1 = {"a", "b"}; @@ -591,6 +592,7 @@ public void testWriteReadStatisticsAllNulls() throws Exception { MessageType schema = MessageTypeParser.parseMessageType(writeSchema); Configuration configuration = new Configuration(); + configuration.set("parquet.strings.use-signed-order", "true"); GroupWriteSupport.setSchema(schema, configuration); ParquetWriter writer = new ParquetWriter(path, configuration, new GroupWriteSupport()); diff --git a/parquet-thrift/src/test/java/org/apache/parquet/hadoop/thrift/TestThriftToParquetFileWriter.java b/parquet-thrift/src/test/java/org/apache/parquet/hadoop/thrift/TestThriftToParquetFileWriter.java index 1c416dd165..5b8c10f249 100644 --- a/parquet-thrift/src/test/java/org/apache/parquet/hadoop/thrift/TestThriftToParquetFileWriter.java +++ b/parquet-thrift/src/test/java/org/apache/parquet/hadoop/thrift/TestThriftToParquetFileWriter.java @@ -114,6 +114,7 @@ public void testWriteStatistics() throws Exception { new RequiredPrimitiveFixture(false, (byte)100, (short)100, 100, 287l, -9.0d, "world"), new RequiredPrimitiveFixture(true, (byte)2, (short)2, 9, -17l, 9.63d, "hello")); final Configuration configuration = new Configuration(); + configuration.set("parquet.strings.use-signed-order", "true"); final FileSystem fs = p.getFileSystem(configuration); FileStatus fileStatus = fs.getFileStatus(p); ParquetMetadata footer = ParquetFileReader.readFooter(configuration, p); @@ -160,6 +161,7 @@ public void testWriteStatistics() throws Exception { // make new configuration and create file with new large stats final Configuration configuration_large = new Configuration(); + configuration_large.set("parquet.strings.use-signed-order", "true"); final FileSystem fs_large = p_large.getFileSystem(configuration_large); FileStatus fileStatus_large = fs_large.getFileStatus(p_large); ParquetMetadata footer_large = ParquetFileReader.readFooter(configuration_large, p_large); From 301bd3a12ae3a87fe64c90883926f54c3899b512 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Wed, 21 Sep 2016 10:49:30 -0700 Subject: [PATCH 2/3] PARQUET-686: Address review comments. 
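This revision renames the override property to
parquet.strings.signed-min-max.enabled and documents the sort-order rules.
As a reviewer aid, a minimal illustrative sketch (not part of the patch;
the variable names are hypothetical) of why the signed byte order is wrong
for UTF-8 data:

    // 'a' is 0x61 (97 signed); the first byte of UTF-8 "ü" is 0xC3 (-61 signed)
    byte a = 'a';
    byte u = (byte) 0xC3;
    boolean signedOrder = a < u;                      // false: puts "ü" before "a"
    boolean unsignedOrder = (a & 0xFF) < (u & 0xFF);  // true: the expected order

ASCII-only values never set the sign bit, so the two orders agree on them;
that is why the override is safe for columns known to contain only ASCII.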
--- .../converter/ParquetMetadataConverter.java | 81 ++++++++++++------- .../TestParquetMetadataConverter.java | 5 +- .../parquet/hadoop/TestParquetFileWriter.java | 10 +-- .../thrift/TestThriftToParquetFileWriter.java | 4 +- 4 files changed, 63 insertions(+), 37 deletions(-) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java index 077b96a4e0..6481b8f7c7 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java @@ -85,11 +85,15 @@ public class ParquetMetadataConverter { private final boolean useSignedStringMinMax; public ParquetMetadataConverter() { - this.useSignedStringMinMax = false; + this(false); } public ParquetMetadataConverter(Configuration conf) { - this.useSignedStringMinMax = conf.getBoolean("parquet.strings.use-signed-order", false); + this(conf.getBoolean("parquet.strings.signed-min-max.enabled", false)); + } + + private ParquetMetadataConverter(boolean useSignedStringMinMax) { + this.useSignedStringMinMax = useSignedStringMinMax; } // NOTE: this cache is for memory savings, not cpu savings, and is used to de-duplicate @@ -330,14 +334,17 @@ public static org.apache.parquet.column.statistics.Statistics fromParquetStatist // Visible for testing static org.apache.parquet.column.statistics.Statistics fromParquetStatisticsInternal - (String createdBy, Statistics statistics, PrimitiveTypeName type, SortOrder expectedOrder) { + (String createdBy, Statistics statistics, PrimitiveTypeName type, SortOrder typeSortOrder) { // create stats object based on the column type org.apache.parquet.column.statistics.Statistics stats = org.apache.parquet.column.statistics.Statistics.getStatsBasedOnType(type); // If there was no statistics written to the footer, create an empty Statistics object and return // NOTE: See docs in CorruptStatistics for explanation of why this check is needed + // The sort order is checked to avoid returning min/max stats that are not + // valid with the type's sort order. Currently, all stats are aggregated + // using a signed ordering, which isn't valid for strings or unsigned ints. if (statistics != null && !CorruptStatistics.shouldIgnoreStatistics(createdBy, type) && - SortOrder.SIGNED == expectedOrder) { + SortOrder.SIGNED == typeSortOrder) { if (statistics.isSetMax() && statistics.isSetMin()) { stats.setMinMaxFromBytes(statistics.min.array(), statistics.max.array()); } @@ -348,40 +355,56 @@ public static org.apache.parquet.column.statistics.Statistics fromParquetStatist public org.apache.parquet.column.statistics.Statistics fromParquetStatistics( String createdBy, Statistics statistics, PrimitiveType type) { - SortOrder expectedOrder = isSignedOrderOkay(type) ? SortOrder.SIGNED : sortOrder(type); + SortOrder expectedOrder = overrideSortOrderToSigned(type) ? + SortOrder.SIGNED : sortOrder(type); return fromParquetStatisticsInternal( createdBy, statistics, type.getPrimitiveTypeName(), expectedOrder); } + /** + * Sort order for page and column statistics. Types are associated with sort + * orders (e.g., UTF8 columns should use UNSIGNED) and column stats are + * aggregated using a sort order. As of parquet-format version 2.3.1, the + * order used to aggregate stats is always SIGNED and is not stored in the + * Parquet file. 
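For example, under the SIGNED order the UTF-8 string "ü" (first byte 0xC3, negative as a signed byte) sorts before "a" (0x61), while the correct UNSIGNED order puts it after "z" (0x7A). 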
These stats are discarded for types that need unsigned.
+   *
+   * See PARQUET-686.
+   */
   enum SortOrder {
     SIGNED,
     UNSIGNED,
     UNKNOWN
   }
 
-  private boolean isSignedOrderOkay(PrimitiveType type) {
-    if (!useSignedStringMinMax) {
-      return false;
-    }
+  private static final Set<OriginalType> STRING_TYPES = Collections
+      .unmodifiableSet(new HashSet<>(Arrays.asList(
+          OriginalType.UTF8, OriginalType.ENUM, OriginalType.JSON
+      )));
 
+  /**
+   * Returns whether to use signed order min and max with a type. It is safe to
+   * use signed min and max when the type is a string type and contains only
+   * ASCII characters (where the sign bit was 0). This checks whether the type
+   * is a string type and uses {@code useSignedStringMinMax} to determine if
+   * only ASCII characters were written.
+   *
+   * @param type a primitive type with a logical type annotation
+   * @return true if signed order min/max can be used with this type
+   */
+  private boolean overrideSortOrderToSigned(PrimitiveType type) {
     // even if the override is set, only return stats for string-ish types
-    if (type.getPrimitiveTypeName() != PrimitiveTypeName.BINARY) {
-      return false;
-    }
-    if (type.getOriginalType() == null) {
-      return true; // plain binary is okay
-    }
-    switch (type.getOriginalType()) {
-      case UTF8:
-      case ENUM:
-      case BSON:
-      case JSON:
-        return true;
-      default: // includes decimal
-        return false;
-    }
+    // a null type annotation is considered string-ish because some writers
+    // failed to use the UTF8 annotation.
+    OriginalType annotation = type.getOriginalType();
+    return useSignedStringMinMax &&
+        PrimitiveTypeName.BINARY == type.getPrimitiveTypeName() &&
+        (annotation == null || STRING_TYPES.contains(annotation));
   }
 
+  /**
+   * @param primitive a primitive physical type
+   * @return the default sort order used when the logical type is not known
+   */
   private static SortOrder defaultSortOrder(PrimitiveTypeName primitive) {
     switch (primitive) {
       case BOOLEAN:
@@ -389,15 +412,19 @@ private static SortOrder defaultSortOrder(PrimitiveTypeName primitive) {
       case INT64:
       case FLOAT:
       case DOUBLE:
-      case BINARY: // without a logical type, signed is okay
-      case FIXED_LEN_BYTE_ARRAY:
         return SortOrder.SIGNED;
+      case BINARY:
+      case FIXED_LEN_BYTE_ARRAY:
       case INT96: // only used for timestamp, which uses unsigned values
         return SortOrder.UNSIGNED;
     }
     return SortOrder.UNKNOWN;
   }
 
+  /**
+   * @param primitive a primitive type with a logical type annotation
+   * @return the "correct" sort order of the type that applications assume
+   */
   private static SortOrder sortOrder(PrimitiveType primitive) {
     OriginalType annotation = primitive.getOriginalType();
     if (annotation != null) {
@@ -416,12 +443,12 @@ private static SortOrder sortOrder(PrimitiveType primitive) {
       case UINT_16:
       case UINT_32:
       case UINT_64:
-      case DECIMAL:
       case ENUM:
       case UTF8:
       case BSON:
       case JSON:
         return SortOrder.UNSIGNED;
+      case DECIMAL:
       case LIST:
       case MAP:
       case MAP_KEY_VALUE:
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java b/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java
index b7b9bdb5aa..35c35c19c6 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java
@@ -541,7 +541,7 @@ public void testIgnoreStatsWithSignedSortOrder() {
   public void testUseStatsWithSignedSortOrder() {
     // override defaults and use stats that were accumulated using signed 
order Configuration conf = new Configuration(); - conf.set("parquet.strings.use-signed-order", "true"); + conf.setBoolean("parquet.strings.signed-min-max.enabled", true); ParquetMetadataConverter converter = new ParquetMetadataConverter(conf); BinaryStatistics stats = new BinaryStatistics(); @@ -554,7 +554,8 @@ public void testUseStatsWithSignedSortOrder() { Statistics convertedStats = converter.fromParquetStatistics( Version.FULL_VERSION, ParquetMetadataConverter.toParquetStatistics(stats), - Types.required(PrimitiveTypeName.BINARY).named("b")); + Types.required(PrimitiveTypeName.BINARY) + .as(OriginalType.UTF8).named("b")); Assert.assertFalse("Stats should not be empty", convertedStats.isEmpty()); Assert.assertEquals("Should have 3 nulls", 3, convertedStats.getNumNulls()); diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java index 5dd4e88b15..c56515fcaf 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java @@ -24,9 +24,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.parquet.CorruptStatistics; import org.apache.parquet.Version; -import org.apache.parquet.VersionParser; import org.apache.parquet.bytes.BytesUtils; import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel; import org.junit.Assume; @@ -452,9 +450,9 @@ public void testWriteReadStatistics() throws Exception { Path path = new Path(testFile.toURI()); Configuration configuration = new Configuration(); - configuration.set("parquet.strings.use-signed-order", "true"); + configuration.setBoolean("parquet.strings.signed-min-max.enabled", true); - MessageType schema = MessageTypeParser.parseMessageType("message m { required group a {required binary b;} required group c { required int64 d; }}"); + MessageType schema = MessageTypeParser.parseMessageType("message m { required group a {required binary b (UTF8);} required group c { required int64 d; }}"); String[] path1 = {"a", "b"}; ColumnDescriptor c1 = schema.getColumnDescription(path1); String[] path2 = {"c", "d"}; @@ -585,14 +583,14 @@ public void testWriteReadStatisticsAllNulls() throws Exception { testFile.delete(); writeSchema = "message example {\n" + - "required binary content;\n" + + "required binary content (UTF8);\n" + "}"; Path path = new Path(testFile.toURI()); MessageType schema = MessageTypeParser.parseMessageType(writeSchema); Configuration configuration = new Configuration(); - configuration.set("parquet.strings.use-signed-order", "true"); + configuration.setBoolean("parquet.strings.signed-min-max.enabled", true); GroupWriteSupport.setSchema(schema, configuration); ParquetWriter writer = new ParquetWriter(path, configuration, new GroupWriteSupport()); diff --git a/parquet-thrift/src/test/java/org/apache/parquet/hadoop/thrift/TestThriftToParquetFileWriter.java b/parquet-thrift/src/test/java/org/apache/parquet/hadoop/thrift/TestThriftToParquetFileWriter.java index 5b8c10f249..2407e6198a 100644 --- a/parquet-thrift/src/test/java/org/apache/parquet/hadoop/thrift/TestThriftToParquetFileWriter.java +++ b/parquet-thrift/src/test/java/org/apache/parquet/hadoop/thrift/TestThriftToParquetFileWriter.java @@ -114,7 +114,7 @@ public void testWriteStatistics() throws Exception { new RequiredPrimitiveFixture(false, (byte)100, (short)100, 
100, 287l, -9.0d, "world"),
                       new RequiredPrimitiveFixture(true, (byte)2, (short)2, 9, -17l, 9.63d, "hello"));
     final Configuration configuration = new Configuration();
-    configuration.set("parquet.strings.use-signed-order", "true");
+    configuration.setBoolean("parquet.strings.signed-min-max.enabled", true);
     final FileSystem fs = p.getFileSystem(configuration);
     FileStatus fileStatus = fs.getFileStatus(p);
     ParquetMetadata footer = ParquetFileReader.readFooter(configuration, p);
@@ -161,7 +161,7 @@ public void testWriteStatistics() throws Exception {
 
     // make new configuration and create file with new large stats
     final Configuration configuration_large = new Configuration();
-    configuration_large.set("parquet.strings.use-signed-order", "true");
+    configuration_large.setBoolean("parquet.strings.signed-min-max.enabled", true);
     final FileSystem fs_large = p_large.getFileSystem(configuration_large);
     FileStatus fileStatus_large = fs_large.getFileStatus(p_large);
     ParquetMetadata footer_large = ParquetFileReader.readFooter(configuration_large, p_large);

From f9d459ff24ab690f3178da67ae0f878b575d22a3 Mon Sep 17 00:00:00 2001
From: Ryan Blue
Date: Tue, 11 Oct 2016 16:29:35 -0700
Subject: [PATCH 3/3] PARQUET-686: Add getConfiguration to HadoopInputFile.

This is used to avoid adding extra public methods to ParquetFileReader.
---
 .../apache/parquet/hadoop/ParquetFileReader.java | 15 +++++++++++++--
 .../parquet/hadoop/util/HadoopInputFile.java     | 12 +++++++++---
 2 files changed, 22 insertions(+), 5 deletions(-)

diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index 68b845867b..9e95535b7c 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -52,6 +52,7 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
+import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -444,9 +445,22 @@ public static final ParquetMetadata readFooter(Configuration configuration, File
    */
   public static final ParquetMetadata readFooter(
       InputFile file, MetadataFilter filter) throws IOException {
+    ParquetMetadataConverter converter;
+    // TODO: remove this temporary work-around.
+    // this is necessary to pass the Configuration to ParquetMetadataConverter
+    // and should be removed when there is a non-Hadoop configuration.
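+    // Illustrative use (hypothetical caller, not part of this patch):
+    //   conf.setBoolean("parquet.strings.signed-min-max.enabled", true);
+    //   readFooter(HadoopInputFile.fromPath(path, conf), NO_FILTER);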
+    if (file instanceof HadoopInputFile) {
+      converter = new ParquetMetadataConverter(
+          ((HadoopInputFile) file).getConfiguration());
+    } else {
+      converter = new ParquetMetadataConverter();
+    }
     try (SeekableInputStream in = file.newStream()) {
-      return readFooter(new ParquetMetadataConverter(), file.getLength(),
-          file.toString(), in, filter);
+
+      return readFooter(converter, file.getLength(), file.toString(), in, filter);
     }
   }
 
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopInputFile.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopInputFile.java
index d5868d3011..fb876a8501 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopInputFile.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopInputFile.java
@@ -31,22 +31,28 @@ public class HadoopInputFile implements InputFile {
 
   private final FileSystem fs;
   private final FileStatus stat;
+  private final Configuration conf;
 
   public static HadoopInputFile fromPath(Path path, Configuration conf)
       throws IOException {
     FileSystem fs = path.getFileSystem(conf);
-    return new HadoopInputFile(fs, fs.getFileStatus(path));
+    return new HadoopInputFile(fs, fs.getFileStatus(path), conf);
   }
 
   public static HadoopInputFile fromStatus(FileStatus stat, Configuration conf)
       throws IOException {
     FileSystem fs = stat.getPath().getFileSystem(conf);
-    return new HadoopInputFile(fs, stat);
+    return new HadoopInputFile(fs, stat, conf);
   }
 
-  private HadoopInputFile(FileSystem fs, FileStatus stat) {
+  private HadoopInputFile(FileSystem fs, FileStatus stat, Configuration conf) {
     this.fs = fs;
     this.stat = stat;
+    this.conf = conf;
+  }
+
+  public Configuration getConfiguration() {
+    return conf;
   }
 
   @Override