From 3f0d3aee242b91aaf27faf3341430920051768fe Mon Sep 17 00:00:00 2001 From: Yan Yan Date: Fri, 20 Nov 2020 18:50:46 -0800 Subject: [PATCH 1/3] Add NaN value count to content file --- .../java/org/apache/iceberg/ContentFile.java | 8 +- .../java/org/apache/iceberg/DataFile.java | 14 +- .../java/org/apache/iceberg/BaseFile.java | 19 +- .../java/org/apache/iceberg/DataFiles.java | 6 +- .../org/apache/iceberg/DataTableScan.java | 2 +- .../java/org/apache/iceberg/FileMetadata.java | 6 +- .../org/apache/iceberg/GenericDataFile.java | 2 +- .../org/apache/iceberg/GenericDeleteFile.java | 2 +- .../org/apache/iceberg/ManifestReader.java | 2 +- .../java/org/apache/iceberg/MetricsModes.java | 12 +- .../java/org/apache/iceberg/V1Metadata.java | 2 +- .../java/org/apache/iceberg/V2Metadata.java | 12 +- .../iceberg/TestManifestReaderStats.java | 172 ++++++++++++++++++ .../iceberg/TestManifestWriterVersions.java | 46 +++-- .../java/org/apache/iceberg/TestMetrics.java | 15 +- .../apache/iceberg/spark/SparkDataFile.java | 7 + .../iceberg/TestDataFileSerialization.java | 10 +- .../spark/source/TestSparkDataFile.java | 1 + 18 files changed, 297 insertions(+), 41 deletions(-) create mode 100644 core/src/test/java/org/apache/iceberg/TestManifestReaderStats.java diff --git a/api/src/main/java/org/apache/iceberg/ContentFile.java b/api/src/main/java/org/apache/iceberg/ContentFile.java index dca2fec57e61..d2ab9ea43895 100644 --- a/api/src/main/java/org/apache/iceberg/ContentFile.java +++ b/api/src/main/java/org/apache/iceberg/ContentFile.java @@ -84,6 +84,11 @@ public interface ContentFile { */ Map nullValueCounts(); + /** + * Returns if collected, map from column ID to its NaN value count, null otherwise. + */ + Map nanValueCounts(); + /** * Returns if collected, map from column ID to value lower bounds, null otherwise. */ @@ -132,7 +137,8 @@ public interface ContentFile { * Copies this file without file stats. 
Manifest readers can reuse file instances; use * this method to copy data without stats when collecting files. * - * @return a copy of this data file, without lower bounds, upper bounds, value counts, or null value counts + * @return a copy of this data file, without lower bounds, upper bounds, value counts, + * null value counts, or nan value counts */ F copyWithoutStats(); } diff --git a/api/src/main/java/org/apache/iceberg/DataFile.java b/api/src/main/java/org/apache/iceberg/DataFile.java index 1b8092639b9b..51195533fc1f 100644 --- a/api/src/main/java/org/apache/iceberg/DataFile.java +++ b/api/src/main/java/org/apache/iceberg/DataFile.java @@ -20,6 +20,7 @@ package org.apache.iceberg; import java.util.List; +import java.util.Map; import org.apache.iceberg.types.Types; import org.apache.iceberg.types.Types.BinaryType; import org.apache.iceberg.types.Types.IntegerType; @@ -59,10 +60,13 @@ public interface DataFile extends ContentFile { "Splittable offsets"); Types.NestedField EQUALITY_IDS = optional(135, "equality_ids", ListType.ofRequired(136, IntegerType.get()), "Equality comparison field IDs"); + Types.NestedField NAN_VALUE_COUNTS = optional(137, "nan_value_counts", MapType.ofRequired(138, 139, + IntegerType.get(), LongType.get()), "Map of column id to number of NaN values in the column"); + int PARTITION_ID = 102; String PARTITION_NAME = "partition"; String PARTITION_DOC = "Partition data tuple, schema based on the partition spec"; - // NEXT ID TO ASSIGN: 137 + // NEXT ID TO ASSIGN: 140 static StructType getType(StructType partitionType) { // IDs start at 100 to leave room for changes to ManifestEntry @@ -80,7 +84,8 @@ static StructType getType(StructType partitionType) { UPPER_BOUNDS, KEY_METADATA, SPLIT_OFFSETS, - EQUALITY_IDS + EQUALITY_IDS, + NAN_VALUE_COUNTS ); } @@ -96,4 +101,9 @@ default FileContent content() { default List equalityFieldIds() { return null; } + + @Override + default Map nanValueCounts() { + return null; + } } diff --git 
a/core/src/main/java/org/apache/iceberg/BaseFile.java b/core/src/main/java/org/apache/iceberg/BaseFile.java index 9fb79c807f0a..5aa1c965f251 100644 --- a/core/src/main/java/org/apache/iceberg/BaseFile.java +++ b/core/src/main/java/org/apache/iceberg/BaseFile.java @@ -67,6 +67,7 @@ public PartitionData copy() { private Map columnSizes = null; private Map valueCounts = null; private Map nullValueCounts = null; + private Map nanValueCounts = null; private Map lowerBounds = null; private Map upperBounds = null; private long[] splitOffsets = null; @@ -116,8 +117,8 @@ public PartitionData copy() { } BaseFile(int specId, FileContent content, String filePath, FileFormat format, - PartitionData partition, long fileSizeInBytes, long recordCount, - Map columnSizes, Map valueCounts, Map nullValueCounts, + PartitionData partition, long fileSizeInBytes, long recordCount, Map columnSizes, + Map valueCounts, Map nullValueCounts, Map nanValueCounts, Map lowerBounds, Map upperBounds, List splitOffsets, int[] equalityFieldIds, ByteBuffer keyMetadata) { this.partitionSpecId = specId; @@ -140,6 +141,7 @@ public PartitionData copy() { this.columnSizes = columnSizes; this.valueCounts = valueCounts; this.nullValueCounts = nullValueCounts; + this.nanValueCounts = nanValueCounts; this.lowerBounds = SerializableByteBufferMap.wrap(lowerBounds); this.upperBounds = SerializableByteBufferMap.wrap(upperBounds); this.splitOffsets = ArrayUtil.toLongArray(splitOffsets); @@ -168,12 +170,14 @@ public PartitionData copy() { this.columnSizes = copy(toCopy.columnSizes); this.valueCounts = copy(toCopy.valueCounts); this.nullValueCounts = copy(toCopy.nullValueCounts); + this.nanValueCounts = copy(toCopy.nanValueCounts); this.lowerBounds = SerializableByteBufferMap.wrap(copy(toCopy.lowerBounds)); this.upperBounds = SerializableByteBufferMap.wrap(copy(toCopy.upperBounds)); } else { this.columnSizes = null; this.valueCounts = null; this.nullValueCounts = null; + this.nanValueCounts = null; this.lowerBounds = 
null; this.upperBounds = null; } @@ -262,6 +266,9 @@ public void put(int i, Object value) { this.equalityIds = ArrayUtil.toIntArray((List) value); return; case 14: + this.nanValueCounts = (Map) value; + return; + case 15: this.fileOrdinal = (long) value; return; default: @@ -311,6 +318,8 @@ public Object get(int i) { case 13: return equalityFieldIds(); case 14: + return nanValueCounts; + case 15: return pos; default: throw new UnsupportedOperationException("Unknown field ordinal: " + pos); @@ -377,6 +386,11 @@ public Map nullValueCounts() { return nullValueCounts; } + @Override + public Map nanValueCounts() { + return nanValueCounts; + } + @Override public Map lowerBounds() { return lowerBounds; @@ -423,6 +437,7 @@ public String toString() { .add("column_sizes", columnSizes) .add("value_counts", valueCounts) .add("null_value_counts", nullValueCounts) + .add("nan_value_counts", nanValueCounts) .add("lower_bounds", lowerBounds) .add("upper_bounds", upperBounds) .add("key_metadata", keyMetadata == null ? 
"null" : "(redacted)") diff --git a/core/src/main/java/org/apache/iceberg/DataFiles.java b/core/src/main/java/org/apache/iceberg/DataFiles.java index 5268cac28eda..3753c41093fd 100644 --- a/core/src/main/java/org/apache/iceberg/DataFiles.java +++ b/core/src/main/java/org/apache/iceberg/DataFiles.java @@ -126,6 +126,7 @@ public static class Builder { private Map columnSizes = null; private Map valueCounts = null; private Map nullValueCounts = null; + private Map nanValueCounts = null; private Map lowerBounds = null; private Map upperBounds = null; private ByteBuffer keyMetadata = null; @@ -149,6 +150,7 @@ public void clear() { this.columnSizes = null; this.valueCounts = null; this.nullValueCounts = null; + this.nanValueCounts = null; this.lowerBounds = null; this.upperBounds = null; this.splitOffsets = null; @@ -166,6 +168,7 @@ public Builder copy(DataFile toCopy) { this.columnSizes = toCopy.columnSizes(); this.valueCounts = toCopy.valueCounts(); this.nullValueCounts = toCopy.nullValueCounts(); + this.nanValueCounts = toCopy.nanValueCounts(); this.lowerBounds = toCopy.lowerBounds(); this.upperBounds = toCopy.upperBounds(); this.keyMetadata = toCopy.keyMetadata() == null ? null @@ -241,6 +244,7 @@ public Builder withMetrics(Metrics metrics) { this.columnSizes = metrics.columnSizes(); this.valueCounts = metrics.valueCounts(); this.nullValueCounts = metrics.nullValueCounts(); + this.nanValueCounts = metrics.nanValueCounts(); this.lowerBounds = metrics.lowerBounds(); this.upperBounds = metrics.upperBounds(); return this; @@ -276,7 +280,7 @@ public DataFile build() { return new GenericDataFile( specId, filePath, format, isPartitioned ? 
partitionData.copy() : null, fileSizeInBytes, new Metrics( - recordCount, columnSizes, valueCounts, nullValueCounts, lowerBounds, upperBounds), + recordCount, columnSizes, valueCounts, nullValueCounts, nanValueCounts, lowerBounds, upperBounds), keyMetadata, splitOffsets); } } diff --git a/core/src/main/java/org/apache/iceberg/DataTableScan.java b/core/src/main/java/org/apache/iceberg/DataTableScan.java index 9a6b85f652b8..cf83e70f84b7 100644 --- a/core/src/main/java/org/apache/iceberg/DataTableScan.java +++ b/core/src/main/java/org/apache/iceberg/DataTableScan.java @@ -32,7 +32,7 @@ public class DataTableScan extends BaseTableScan { ); static final ImmutableList SCAN_WITH_STATS_COLUMNS = ImmutableList.builder() .addAll(SCAN_COLUMNS) - .add("value_counts", "null_value_counts", "lower_bounds", "upper_bounds", "column_sizes") + .add("value_counts", "null_value_counts", "nan_value_counts", "lower_bounds", "upper_bounds", "column_sizes") .build(); static final boolean PLAN_SCANS_WITH_WORKER_POOL = SystemProperties.getBoolean(SystemProperties.SCAN_THREAD_POOL_ENABLED, true); diff --git a/core/src/main/java/org/apache/iceberg/FileMetadata.java b/core/src/main/java/org/apache/iceberg/FileMetadata.java index 72d3ad5775bc..2fb992f7f053 100644 --- a/core/src/main/java/org/apache/iceberg/FileMetadata.java +++ b/core/src/main/java/org/apache/iceberg/FileMetadata.java @@ -54,6 +54,7 @@ public static class Builder { private Map columnSizes = null; private Map valueCounts = null; private Map nullValueCounts = null; + private Map nanValueCounts = null; private Map lowerBounds = null; private Map upperBounds = null; private ByteBuffer keyMetadata = null; @@ -76,6 +77,7 @@ public void clear() { this.columnSizes = null; this.valueCounts = null; this.nullValueCounts = null; + this.nanValueCounts = null; this.lowerBounds = null; this.upperBounds = null; } @@ -93,6 +95,7 @@ public Builder copy(DeleteFile toCopy) { this.columnSizes = toCopy.columnSizes(); this.valueCounts = 
toCopy.valueCounts(); this.nullValueCounts = toCopy.nullValueCounts(); + this.nanValueCounts = toCopy.nanValueCounts(); this.lowerBounds = toCopy.lowerBounds(); this.upperBounds = toCopy.upperBounds(); this.keyMetadata = toCopy.keyMetadata() == null ? null @@ -179,6 +182,7 @@ public Builder withMetrics(Metrics metrics) { this.columnSizes = metrics.columnSizes(); this.valueCounts = metrics.valueCounts(); this.nullValueCounts = metrics.nullValueCounts(); + this.nanValueCounts = metrics.nanValueCounts(); this.lowerBounds = metrics.lowerBounds(); this.upperBounds = metrics.upperBounds(); return this; @@ -206,7 +210,7 @@ public DeleteFile build() { return new GenericDeleteFile( specId, content, filePath, format, isPartitioned ? DataFiles.copy(spec, partitionData) : null, fileSizeInBytes, new Metrics( - recordCount, columnSizes, valueCounts, nullValueCounts, lowerBounds, upperBounds), + recordCount, columnSizes, valueCounts, nullValueCounts, nanValueCounts, lowerBounds, upperBounds), equalityFieldIds, keyMetadata); } } diff --git a/core/src/main/java/org/apache/iceberg/GenericDataFile.java b/core/src/main/java/org/apache/iceberg/GenericDataFile.java index 92f662eee446..dbdbe6d727df 100644 --- a/core/src/main/java/org/apache/iceberg/GenericDataFile.java +++ b/core/src/main/java/org/apache/iceberg/GenericDataFile.java @@ -38,7 +38,7 @@ class GenericDataFile extends BaseFile implements DataFile { long fileSizeInBytes, Metrics metrics, ByteBuffer keyMetadata, List splitOffsets) { super(specId, FileContent.DATA, filePath, format, partition, fileSizeInBytes, metrics.recordCount(), - metrics.columnSizes(), metrics.valueCounts(), metrics.nullValueCounts(), + metrics.columnSizes(), metrics.valueCounts(), metrics.nullValueCounts(), metrics.nanValueCounts(), metrics.lowerBounds(), metrics.upperBounds(), splitOffsets, null, keyMetadata); } diff --git a/core/src/main/java/org/apache/iceberg/GenericDeleteFile.java b/core/src/main/java/org/apache/iceberg/GenericDeleteFile.java index 
73a8df7a8a38..b9900c5ac2ec 100644 --- a/core/src/main/java/org/apache/iceberg/GenericDeleteFile.java +++ b/core/src/main/java/org/apache/iceberg/GenericDeleteFile.java @@ -37,7 +37,7 @@ class GenericDeleteFile extends BaseFile implements DeleteFile { GenericDeleteFile(int specId, FileContent content, String filePath, FileFormat format, PartitionData partition, long fileSizeInBytes, Metrics metrics, int[] equalityFieldIds, ByteBuffer keyMetadata) { super(specId, content, filePath, format, partition, fileSizeInBytes, metrics.recordCount(), - metrics.columnSizes(), metrics.valueCounts(), metrics.nullValueCounts(), + metrics.columnSizes(), metrics.valueCounts(), metrics.nullValueCounts(), metrics.nanValueCounts(), metrics.lowerBounds(), metrics.upperBounds(), null, equalityFieldIds, keyMetadata); } diff --git a/core/src/main/java/org/apache/iceberg/ManifestReader.java b/core/src/main/java/org/apache/iceberg/ManifestReader.java index f7910799593e..c6751f381099 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestReader.java +++ b/core/src/main/java/org/apache/iceberg/ManifestReader.java @@ -53,7 +53,7 @@ public class ManifestReader> extends CloseableGroup implements CloseableIterable { static final ImmutableList ALL_COLUMNS = ImmutableList.of("*"); static final Set STATS_COLUMNS = Sets.newHashSet( - "value_counts", "null_value_counts", "lower_bounds", "upper_bounds"); + "value_counts", "null_value_counts", "nan_value_counts", "lower_bounds", "upper_bounds"); protected enum FileType { DATA_FILES(GenericDataFile.class.getName()), diff --git a/core/src/main/java/org/apache/iceberg/MetricsModes.java b/core/src/main/java/org/apache/iceberg/MetricsModes.java index db767433fb5e..cc805f75200c 100644 --- a/core/src/main/java/org/apache/iceberg/MetricsModes.java +++ b/core/src/main/java/org/apache/iceberg/MetricsModes.java @@ -29,7 +29,7 @@ /** * This class defines different metrics modes, which allow users to control the collection of - * value_counts, null_value_counts, 
lower_bounds, upper_bounds for different columns in metadata. + * value_counts, null_value_counts, nan_value_counts, lower_bounds, upper_bounds for different columns in metadata. */ public class MetricsModes { @@ -60,7 +60,7 @@ public interface MetricsMode extends Serializable { } /** - * Under this mode, value_counts, null_value_counts, lower_bounds, upper_bounds are not persisted. + * Under this mode, value_counts, null_value_counts, nan_value_counts, lower_bounds, upper_bounds are not persisted. */ public static class None extends ProxySerializableMetricsMode { private static final None INSTANCE = new None(); @@ -76,7 +76,7 @@ public String toString() { } /** - * Under this mode, only value_counts, null_value_counts are persisted. + * Under this mode, only value_counts, null_value_counts, nan_value_counts are persisted. */ public static class Counts extends ProxySerializableMetricsMode { private static final Counts INSTANCE = new Counts(); @@ -92,7 +92,8 @@ public String toString() { } /** - * Under this mode, value_counts, null_value_counts and truncated lower_bounds, upper_bounds are persisted. + * Under this mode, value_counts, null_value_counts, nan_value_counts + * and truncated lower_bounds, upper_bounds are persisted. */ public static class Truncate extends ProxySerializableMetricsMode { private final int length; @@ -133,7 +134,8 @@ public int hashCode() { } /** - * Under this mode, value_counts, null_value_counts and full lower_bounds, upper_bounds are persisted. + * Under this mode, value_counts, null_value_counts, nan_value_counts + * and full lower_bounds, upper_bounds are persisted. 
*/ public static class Full extends ProxySerializableMetricsMode { private static final Full INSTANCE = new Full(); diff --git a/core/src/main/java/org/apache/iceberg/V1Metadata.java b/core/src/main/java/org/apache/iceberg/V1Metadata.java index 4b1186074267..8e5f5fb02372 100644 --- a/core/src/main/java/org/apache/iceberg/V1Metadata.java +++ b/core/src/main/java/org/apache/iceberg/V1Metadata.java @@ -356,7 +356,7 @@ public Object get(int pos) { @Override public void put(int i, Object v) { - throw new UnsupportedOperationException("Cannot read into IndexedDataFile"); + throw new UnsupportedOperationException("Cannot write into IndexedDataFile"); } @Override diff --git a/core/src/main/java/org/apache/iceberg/V2Metadata.java b/core/src/main/java/org/apache/iceberg/V2Metadata.java index 55a91c95f182..fee4bc3609e2 100644 --- a/core/src/main/java/org/apache/iceberg/V2Metadata.java +++ b/core/src/main/java/org/apache/iceberg/V2Metadata.java @@ -254,7 +254,8 @@ static Types.StructType fileType(Types.StructType partitionType) { DataFile.UPPER_BOUNDS, DataFile.KEY_METADATA, DataFile.SPLIT_OFFSETS, - DataFile.EQUALITY_IDS + DataFile.EQUALITY_IDS, + DataFile.NAN_VALUE_COUNTS ); } @@ -406,13 +407,15 @@ public Object get(int pos) { return wrapped.splitOffsets(); case 13: return wrapped.equalityFieldIds(); + case 14: + return wrapped.nanValueCounts(); } throw new IllegalArgumentException("Unknown field ordinal: " + pos); } @Override public void put(int i, Object v) { - throw new UnsupportedOperationException("Cannot read into IndexedDataFile"); + throw new UnsupportedOperationException("Cannot write into IndexedDataFile"); } @Override @@ -470,6 +473,11 @@ public Map nullValueCounts() { return wrapped.nullValueCounts(); } + @Override + public Map nanValueCounts() { + return wrapped.nanValueCounts(); + } + @Override public Map lowerBounds() { return wrapped.lowerBounds(); diff --git a/core/src/test/java/org/apache/iceberg/TestManifestReaderStats.java 
b/core/src/test/java/org/apache/iceberg/TestManifestReaderStats.java new file mode 100644 index 000000000000..4cb0260dc06c --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/TestManifestReaderStats.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Map; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.apache.iceberg.types.Conversions; +import org.apache.iceberg.types.Types; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class TestManifestReaderStats extends TableTestBase { + @Parameterized.Parameters(name = "formatVersion = {0}") + public static Object[] parameters() { + return new Object[] { 1, 2 }; + } + + public TestManifestReaderStats(int formatVersion) { + super(formatVersion); + } + + private static final Map VALUE_COUNT = ImmutableMap.of(3, 3L); + private static final Map NULL_VALUE_COUNTS = ImmutableMap.of(3, 0L); + private static final Map NAN_VALUE_COUNTS = ImmutableMap.of(3, 1L); + private static final Map LOWER_BOUNDS = + ImmutableMap.of(3, Conversions.toByteBuffer(Types.IntegerType.get(), 2)); + private static final Map UPPER_BOUNDS = + ImmutableMap.of(3, Conversions.toByteBuffer(Types.IntegerType.get(), 4)); + + private static final Metrics METRICS = new Metrics(3L, null, + VALUE_COUNT, NULL_VALUE_COUNTS, NAN_VALUE_COUNTS, LOWER_BOUNDS, UPPER_BOUNDS); + + private static final DataFile FILE = DataFiles.builder(SPEC) + .withPath("/path/to/data-a.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("data_bucket=0") // easy way to set partition data for now + .withRecordCount(3) + .withMetrics(METRICS) + .build(); + + @Test + public void testReadIncludesFullStats() throws IOException { + ManifestFile manifest = writeManifest(1000L, FILE); + try (ManifestReader reader = ManifestFiles.read(manifest, FILE_IO)) { + CloseableIterable> entries = 
reader.entries(); + ManifestEntry entry = entries.iterator().next(); + assertFullStats(entry.file()); + } + } + + @Test + public void testReadWithFilterIncludesFullStats() throws IOException { + ManifestFile manifest = writeManifest(1000L, FILE); + try (ManifestReader reader = ManifestFiles.read(manifest, FILE_IO) + .filterRows(Expressions.equal("id", 3))) { + CloseableIterable> entries = reader.entries(); + ManifestEntry entry = entries.iterator().next(); + assertFullStats(entry.file()); + } + } + + @Test + public void testReadEntriesWithFilterAndSelectIncludesFullStats() throws IOException { + ManifestFile manifest = writeManifest(1000L, FILE); + try (ManifestReader reader = ManifestFiles.read(manifest, FILE_IO) + .select(ImmutableSet.of("record_count")) + .filterRows(Expressions.equal("id", 3))) { + CloseableIterable> entries = reader.entries(); + ManifestEntry entry = entries.iterator().next(); + assertFullStats(entry.file()); + } + } + + @Test + public void testReadIteratorWithFilterAndSelectDropsStats() throws IOException { + ManifestFile manifest = writeManifest(1000L, FILE); + try (ManifestReader reader = ManifestFiles.read(manifest, FILE_IO) + .select(ImmutableSet.of("record_count")) + .filterRows(Expressions.equal("id", 3))) { + DataFile entry = reader.iterator().next(); + assertStatsDropped(entry); + } + } + @Test + public void testReadIteratorWithFilterAndSelectStatsIncludesFullStats() throws IOException { + ManifestFile manifest = writeManifest(1000L, FILE); + try (ManifestReader reader = ManifestFiles.read(manifest, FILE_IO) + .select(ImmutableSet.of("record_count", "value_counts")) + .filterRows(Expressions.equal("id", 3))) { + DataFile entry = reader.iterator().next(); + assertFullStats(entry); + } + } + + @Test + public void testReadEntriesWithSelectNotIncludeFullStats() throws IOException { + ManifestFile manifest = writeManifest(1000L, FILE); + try (ManifestReader reader = ManifestFiles.read(manifest, FILE_IO) + 
.select(ImmutableSet.of("record_count"))) { + CloseableIterable> entries = reader.entries(); + ManifestEntry entry = entries.iterator().next(); + assertStatsDropped(entry.file()); + } + } + @Test + public void testReadEntriesWithSelectCertainStatNotIncludeFullStats() throws IOException { + ManifestFile manifest = writeManifest(1000L, FILE); + try (ManifestReader reader = ManifestFiles.read(manifest, FILE_IO) + .select(ImmutableSet.of("record_count", "value_counts"))) { + DataFile dataFile = reader.iterator().next(); + + Assert.assertEquals(3, dataFile.recordCount()); + Assert.assertNull(dataFile.columnSizes()); + Assert.assertEquals(VALUE_COUNT, dataFile.valueCounts()); + Assert.assertNull(dataFile.nullValueCounts()); + Assert.assertNull(dataFile.lowerBounds()); + Assert.assertNull(dataFile.upperBounds()); + Assert.assertNull(dataFile.nanValueCounts()); + } + } + + private void assertFullStats(DataFile dataFile) { + Assert.assertEquals(3, dataFile.recordCount()); + Assert.assertNull(dataFile.columnSizes()); + Assert.assertEquals(VALUE_COUNT, dataFile.valueCounts()); + Assert.assertEquals(NULL_VALUE_COUNTS, dataFile.nullValueCounts()); + Assert.assertEquals(LOWER_BOUNDS, dataFile.lowerBounds()); + Assert.assertEquals(UPPER_BOUNDS, dataFile.upperBounds()); + + if (formatVersion == 1) { + Assert.assertNull(dataFile.nanValueCounts()); + } else { + Assert.assertEquals(NAN_VALUE_COUNTS, dataFile.nanValueCounts()); + } + } + + private void assertStatsDropped(DataFile dataFile) { + Assert.assertEquals(3, dataFile.recordCount()); // always select record count in all test cases + Assert.assertNull(dataFile.columnSizes()); + Assert.assertNull(dataFile.valueCounts()); + Assert.assertNull(dataFile.nullValueCounts()); + Assert.assertNull(dataFile.lowerBounds()); + Assert.assertNull(dataFile.upperBounds()); + Assert.assertNull(dataFile.nanValueCounts()); + } + +} diff --git a/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java 
b/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java index c8ede073722e..35519313870f 100644 --- a/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java +++ b/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java @@ -45,7 +45,8 @@ public class TestManifestWriterVersions { required(1, "id", Types.LongType.get()), required(2, "timestamp", Types.TimestampType.withZone()), required(3, "category", Types.StringType.get()), - required(4, "data", Types.StringType.get())); + required(4, "data", Types.StringType.get()), + required(5, "double", Types.DoubleType.get())); private static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA) .identity("category") @@ -60,9 +61,10 @@ public class TestManifestWriterVersions { private static final PartitionData PARTITION = DataFiles.data(SPEC, "category=cheesy/timestamp_hour=10/id_bucket=3"); private static final Metrics METRICS = new Metrics( 1587L, - ImmutableMap.of(1, 15L, 2, 122L, 3, 4021L, 4, 9411L), // sizes - ImmutableMap.of(1, 100L, 2, 100L, 3, 100L, 4, 100L), // value counts - ImmutableMap.of(1, 0L, 2, 0L, 3, 0L, 4, 0L), // null value counts + ImmutableMap.of(1, 15L, 2, 122L, 3, 4021L, 4, 9411L, 5, 15L), // sizes + ImmutableMap.of(1, 100L, 2, 100L, 3, 100L, 4, 100L, 5, 100L), // value counts + ImmutableMap.of(1, 0L, 2, 0L, 3, 0L, 4, 0L, 5, 0L), // null value counts + ImmutableMap.of(5, 10L), // nan value counts ImmutableMap.of(1, Conversions.toByteBuffer(Types.IntegerType.get(), 1)), // lower bounds ImmutableMap.of(1, Conversions.toByteBuffer(Types.IntegerType.get(), 1))); // upper bounds private static final List OFFSETS = ImmutableList.of(4L); @@ -83,7 +85,7 @@ public class TestManifestWriterVersions { public void testV1Write() throws IOException { ManifestFile manifest = writeManifest(1); checkManifest(manifest, ManifestWriter.UNASSIGNED_SEQ); - checkEntry(readManifest(manifest), ManifestWriter.UNASSIGNED_SEQ, FileContent.DATA); + checkEntry(readManifest(manifest), 
ManifestWriter.UNASSIGNED_SEQ, FileContent.DATA, false); } @Test @@ -99,15 +101,15 @@ public void testV1WriteWithInheritance() throws IOException { checkManifest(manifest, 0L); // v1 should be read using sequence number 0 because it was missing from the manifest list file - checkEntry(readManifest(manifest), 0L, FileContent.DATA); + checkEntry(readManifest(manifest), 0L, FileContent.DATA, false); } @Test public void testV2Write() throws IOException { - ManifestFile manifest = writeManifest(1); + ManifestFile manifest = writeManifest(2); checkManifest(manifest, ManifestWriter.UNASSIGNED_SEQ); Assert.assertEquals("Content", ManifestContent.DATA, manifest.content()); - checkEntry(readManifest(manifest), ManifestWriter.UNASSIGNED_SEQ, FileContent.DATA); + checkEntry(readManifest(manifest), ManifestWriter.UNASSIGNED_SEQ, FileContent.DATA, true); } @Test @@ -117,7 +119,7 @@ public void testV2WriteWithInheritance() throws IOException { Assert.assertEquals("Content", ManifestContent.DATA, manifest.content()); // v2 should use the correct sequence number by inheriting it - checkEntry(readManifest(manifest), SEQUENCE_NUMBER, FileContent.DATA); + checkEntry(readManifest(manifest), SEQUENCE_NUMBER, FileContent.DATA, true); } @Test @@ -125,7 +127,7 @@ public void testV2WriteDelete() throws IOException { ManifestFile manifest = writeDeleteManifest(2); checkManifest(manifest, ManifestWriter.UNASSIGNED_SEQ); Assert.assertEquals("Content", ManifestContent.DELETES, manifest.content()); - checkEntry(readDeleteManifest(manifest), ManifestWriter.UNASSIGNED_SEQ, FileContent.EQUALITY_DELETES); + checkEntry(readDeleteManifest(manifest), ManifestWriter.UNASSIGNED_SEQ, FileContent.EQUALITY_DELETES, true); } @Test @@ -135,7 +137,7 @@ public void testV2WriteDeleteWithInheritance() throws IOException { Assert.assertEquals("Content", ManifestContent.DELETES, manifest.content()); // v2 should use the correct sequence number by inheriting it - checkEntry(readDeleteManifest(manifest), 
SEQUENCE_NUMBER, FileContent.EQUALITY_DELETES); + checkEntry(readDeleteManifest(manifest), SEQUENCE_NUMBER, FileContent.EQUALITY_DELETES, true); } @Test @@ -150,7 +152,8 @@ public void testV2ManifestListRewriteWithInheritance() throws IOException { checkManifest(manifest2, 0L); // should not inherit the v2 sequence number because it was a rewrite - checkEntry(readManifest(manifest2), 0L, FileContent.DATA); + // NaN count also won't be present since v1 manifest doesn't have this information + checkEntry(readManifest(manifest2), 0L, FileContent.DATA, false); } @Test @@ -169,24 +172,26 @@ public void testV2ManifestRewriteWithInheritance() throws IOException { checkRewrittenManifest(manifest2, SEQUENCE_NUMBER, 0L); // should not inherit the v2 sequence number because it was written into the v2 manifest - checkRewrittenEntry(readManifest(manifest2), 0L, FileContent.DATA); + // NaN count also won't be present since v1 manifest doesn't have this information + checkRewrittenEntry(readManifest(manifest2), 0L, FileContent.DATA, false); } - void checkEntry(ManifestEntry entry, Long expectedSequenceNumber, FileContent content) { + void checkEntry(ManifestEntry entry, Long expectedSequenceNumber, FileContent content, boolean hasNaNCount) { Assert.assertEquals("Status", ManifestEntry.Status.ADDED, entry.status()); Assert.assertEquals("Snapshot ID", (Long) SNAPSHOT_ID, entry.snapshotId()); Assert.assertEquals("Sequence number", expectedSequenceNumber, entry.sequenceNumber()); - checkDataFile(entry.file(), content); + checkDataFile(entry.file(), content, hasNaNCount); } - void checkRewrittenEntry(ManifestEntry entry, Long expectedSequenceNumber, FileContent content) { + void checkRewrittenEntry(ManifestEntry entry, Long expectedSequenceNumber, + FileContent content, boolean hasNaNCount) { Assert.assertEquals("Status", ManifestEntry.Status.EXISTING, entry.status()); Assert.assertEquals("Snapshot ID", (Long) SNAPSHOT_ID, entry.snapshotId()); Assert.assertEquals("Sequence number", 
expectedSequenceNumber, entry.sequenceNumber()); - checkDataFile(entry.file(), content); + checkDataFile(entry.file(), content, hasNaNCount); } - void checkDataFile(ContentFile dataFile, FileContent content) { + void checkDataFile(ContentFile dataFile, FileContent content, boolean hasNaNCount) { // DataFile is the superclass of DeleteFile, so this method can check both Assert.assertEquals("Content", content, dataFile.content()); Assert.assertEquals("Path", PATH, dataFile.path()); @@ -203,6 +208,11 @@ void checkDataFile(ContentFile dataFile, FileContent content) { } else { Assert.assertNull(dataFile.equalityFieldIds()); } + if (hasNaNCount) { + Assert.assertEquals("NaN", METRICS.nanValueCounts(), dataFile.nanValueCounts()); + } else { + Assert.assertNull("NaN", dataFile.nanValueCounts()); + } } void checkManifest(ManifestFile manifest, long expectedSequenceNumber) { diff --git a/core/src/test/java/org/apache/iceberg/TestMetrics.java b/core/src/test/java/org/apache/iceberg/TestMetrics.java index 6cd7f88ddf27..f1a65f6202fd 100644 --- a/core/src/test/java/org/apache/iceberg/TestMetrics.java +++ b/core/src/test/java/org/apache/iceberg/TestMetrics.java @@ -73,7 +73,8 @@ public abstract class TestMetrics { private static final StructType NESTED_STRUCT_TYPE = StructType.of( required(3, "longCol", LongType.get()), - required(4, "leafStructCol", LEAF_STRUCT_TYPE) + required(4, "leafStructCol", LEAF_STRUCT_TYPE), + required(7, "doubleCol", DoubleType.get()) ); private static final Schema NESTED_SCHEMA = new Schema( @@ -272,6 +273,8 @@ public void testMetricsForNestedStructFields() throws IOException { assertCounts(6, 1L, 0L, metrics); assertBounds(6, BinaryType.get(), ByteBuffer.wrap("A".getBytes()), ByteBuffer.wrap("A".getBytes()), metrics); + assertCounts(7, 1L, 0L, 1L, metrics); + assertBounds(7, DoubleType.get(), Double.NaN, Double.NaN, metrics); } private Record buildNestedTestRecord() { @@ -281,6 +284,7 @@ private Record buildNestedTestRecord() { Record nestedStruct = 
GenericRecord.create(NESTED_STRUCT_TYPE); nestedStruct.setField("longCol", 100L); nestedStruct.setField("leafStructCol", leafStruct); + nestedStruct.setField("doubleCol", Double.NaN); Record record = GenericRecord.create(NESTED_SCHEMA); record.setField("intCol", Integer.MAX_VALUE); record.setField("nestedStructCol", nestedStruct); @@ -475,6 +479,7 @@ public void testMetricsForNestedStructFieldsWithMultipleRowGroup() throws IOExce Record newNestedStruct = GenericRecord.create(NESTED_STRUCT_TYPE); newNestedStruct.setField("longCol", i + 1L); newNestedStruct.setField("leafStructCol", newLeafStruct); + newNestedStruct.setField("doubleCol", Double.NaN); Record newRecord = GenericRecord.create(NESTED_SCHEMA); newRecord.setField("intCol", i + 1); newRecord.setField("nestedStructCol", newNestedStruct); @@ -500,6 +505,8 @@ public void testMetricsForNestedStructFieldsWithMultipleRowGroup() throws IOExce assertCounts(6, 201L, 0L, metrics); assertBounds(6, BinaryType.get(), ByteBuffer.wrap("A".getBytes()), ByteBuffer.wrap("A".getBytes()), metrics); + assertCounts(7, 201L, 0L, 201L, metrics); + assertBounds(7, DoubleType.get(), Double.NaN, Double.NaN, metrics); } @Test @@ -518,6 +525,8 @@ public void testNoneMetricsMode() throws IOException { assertBounds(5, Types.LongType.get(), null, null, metrics); assertCounts(6, null, null, metrics); assertBounds(6, Types.BinaryType.get(), null, null, metrics); + assertCounts(7, null, null, metrics); + assertBounds(7, Types.DoubleType.get(), null, null, metrics); } @Test @@ -536,6 +545,8 @@ public void testCountsMetricsMode() throws IOException { assertBounds(5, Types.LongType.get(), null, null, metrics); assertCounts(6, 1L, 0L, metrics); assertBounds(6, Types.BinaryType.get(), null, null, metrics); + assertCounts(7, 1L, 0L, 1L, metrics); + assertBounds(7, Types.DoubleType.get(), null, null, metrics); } @Test @@ -555,6 +566,8 @@ public void testFullMetricsMode() throws IOException { assertCounts(6, 1L, 0L, metrics); assertBounds(6, 
Types.BinaryType.get(), ByteBuffer.wrap("A".getBytes()), ByteBuffer.wrap("A".getBytes()), metrics); + assertCounts(7, 1L, 0L, 1L, metrics); + assertBounds(7, Types.DoubleType.get(), Double.NaN, Double.NaN, metrics); } @Test diff --git a/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java b/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java index 14233c949815..5f8f22721160 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java +++ b/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java @@ -42,6 +42,7 @@ public class SparkDataFile implements DataFile { private final int columnSizesPosition; private final int valueCountsPosition; private final int nullValueCountsPosition; + private final int nanValueCountsPosition; private final int lowerBoundsPosition; private final int upperBoundsPosition; private final int keyMetadataPosition; @@ -73,6 +74,7 @@ public SparkDataFile(Types.StructType type, StructType sparkType) { columnSizesPosition = positions.get("column_sizes"); valueCountsPosition = positions.get("value_counts"); nullValueCountsPosition = positions.get("null_value_counts"); + nanValueCountsPosition = positions.get("nan_value_counts"); lowerBoundsPosition = positions.get("lower_bounds"); upperBoundsPosition = positions.get("upper_bounds"); keyMetadataPosition = positions.get("key_metadata"); @@ -138,6 +140,11 @@ public Map nullValueCounts() { return wrapped.isNullAt(nullValueCountsPosition) ? null : wrapped.getJavaMap(nullValueCountsPosition); } + @Override + public Map nanValueCounts() { + return wrapped.isNullAt(nanValueCountsPosition) ? null : wrapped.getJavaMap(nanValueCountsPosition); + } + @Override public Map lowerBounds() { Map lowerBounds = wrapped.isNullAt(lowerBoundsPosition) ? 
null : wrapped.getJavaMap(lowerBoundsPosition); diff --git a/spark/src/test/java/org/apache/iceberg/TestDataFileSerialization.java b/spark/src/test/java/org/apache/iceberg/TestDataFileSerialization.java index 2c06372087ed..fa7dcb0a393f 100644 --- a/spark/src/test/java/org/apache/iceberg/TestDataFileSerialization.java +++ b/spark/src/test/java/org/apache/iceberg/TestDataFileSerialization.java @@ -59,7 +59,8 @@ public class TestDataFileSerialization { private static final Schema DATE_SCHEMA = new Schema( required(1, "id", Types.LongType.get()), optional(2, "data", Types.StringType.get()), - required(3, "date", Types.StringType.get())); + required(3, "date", Types.StringType.get()), + optional(4, "double", Types.DoubleType.get())); private static final PartitionSpec PARTITION_SPEC = PartitionSpec .builderFor(DATE_SCHEMA) @@ -68,14 +69,16 @@ public class TestDataFileSerialization { private static final Map VALUE_COUNTS = Maps.newHashMap(); private static final Map NULL_VALUE_COUNTS = Maps.newHashMap(); + private static final Map NAN_VALUE_COUNTS = Maps.newHashMap(); private static final Map LOWER_BOUNDS = Maps.newHashMap(); private static final Map UPPER_BOUNDS = Maps.newHashMap(); static { VALUE_COUNTS.put(1, 5L); VALUE_COUNTS.put(2, 3L); + VALUE_COUNTS.put(4, 2L); NULL_VALUE_COUNTS.put(1, 0L); - NULL_VALUE_COUNTS.put(2, 2L); + NAN_VALUE_COUNTS.put(4, 1L); LOWER_BOUNDS.put(1, longToBuffer(0L)); UPPER_BOUNDS.put(1, longToBuffer(4L)); } @@ -85,7 +88,8 @@ public class TestDataFileSerialization { .withPath("/path/to/data-1.parquet") .withFileSizeInBytes(1234) .withPartitionPath("date=2018-06-08") - .withMetrics(new Metrics(5L, null, VALUE_COUNTS, NULL_VALUE_COUNTS, LOWER_BOUNDS, UPPER_BOUNDS)) + .withMetrics(new Metrics( + 5L, null, VALUE_COUNTS, NULL_VALUE_COUNTS, NAN_VALUE_COUNTS, LOWER_BOUNDS, UPPER_BOUNDS)) .withSplitOffsets(ImmutableList.of(4L)) .withEncryptionKeyMetadata(ByteBuffer.allocate(4).putInt(34)) .build(); diff --git 
a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java index 126d0455c326..3269db51caf7 100644 --- a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java +++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java @@ -193,6 +193,7 @@ private void checkDataFile(DataFile expected, DataFile actual) { Assert.assertEquals("Size must match", expected.fileSizeInBytes(), actual.fileSizeInBytes()); Assert.assertEquals("Record value counts must match", expected.valueCounts(), actual.valueCounts()); Assert.assertEquals("Record null value counts must match", expected.nullValueCounts(), actual.nullValueCounts()); + Assert.assertEquals("Record nan value counts must match", expected.nanValueCounts(), actual.nanValueCounts()); Assert.assertEquals("Lower bounds must match", expected.lowerBounds(), actual.lowerBounds()); Assert.assertEquals("Upper bounds must match", expected.upperBounds(), actual.upperBounds()); Assert.assertEquals("Key metadata must match", expected.keyMetadata(), actual.keyMetadata()); From 505a67112d8f359c00ae61975b4c710930c2738e Mon Sep 17 00:00:00 2001 From: Yan Yan Date: Mon, 23 Nov 2020 11:33:51 -0800 Subject: [PATCH 2/3] minor update --- api/src/main/java/org/apache/iceberg/ContentFile.java | 2 +- core/src/main/java/org/apache/iceberg/BaseFile.java | 5 +++-- .../java/org/apache/iceberg/TestDataFileSerialization.java | 1 + 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/api/src/main/java/org/apache/iceberg/ContentFile.java b/api/src/main/java/org/apache/iceberg/ContentFile.java index d2ab9ea43895..3190f7b08ba4 100644 --- a/api/src/main/java/org/apache/iceberg/ContentFile.java +++ b/api/src/main/java/org/apache/iceberg/ContentFile.java @@ -138,7 +138,7 @@ public interface ContentFile { * this method to copy data without stats when collecting files. 
* * @return a copy of this data file, without lower bounds, upper bounds, value counts, - * null value counts, or nan value counts + * null value counts, or nan value counts */ F copyWithoutStats(); } diff --git a/core/src/main/java/org/apache/iceberg/BaseFile.java b/core/src/main/java/org/apache/iceberg/BaseFile.java index 5aa1c965f251..53d688a84434 100644 --- a/core/src/main/java/org/apache/iceberg/BaseFile.java +++ b/core/src/main/java/org/apache/iceberg/BaseFile.java @@ -117,8 +117,9 @@ public PartitionData copy() { } BaseFile(int specId, FileContent content, String filePath, FileFormat format, - PartitionData partition, long fileSizeInBytes, long recordCount, Map columnSizes, - Map valueCounts, Map nullValueCounts, Map nanValueCounts, + PartitionData partition, long fileSizeInBytes, long recordCount, + Map columnSizes, Map valueCounts, + Map nullValueCounts, Map nanValueCounts, Map lowerBounds, Map upperBounds, List splitOffsets, int[] equalityFieldIds, ByteBuffer keyMetadata) { this.partitionSpecId = specId; diff --git a/spark/src/test/java/org/apache/iceberg/TestDataFileSerialization.java b/spark/src/test/java/org/apache/iceberg/TestDataFileSerialization.java index fa7dcb0a393f..783a62099c11 100644 --- a/spark/src/test/java/org/apache/iceberg/TestDataFileSerialization.java +++ b/spark/src/test/java/org/apache/iceberg/TestDataFileSerialization.java @@ -78,6 +78,7 @@ public class TestDataFileSerialization { VALUE_COUNTS.put(2, 3L); VALUE_COUNTS.put(4, 2L); NULL_VALUE_COUNTS.put(1, 0L); + NULL_VALUE_COUNTS.put(2, 2L); NAN_VALUE_COUNTS.put(4, 1L); LOWER_BOUNDS.put(1, longToBuffer(0L)); UPPER_BOUNDS.put(1, longToBuffer(4L)); From 4fc80679c7517184250e62510e1a04284ce5b80c Mon Sep 17 00:00:00 2001 From: Yan Yan Date: Tue, 24 Nov 2020 15:10:40 -0800 Subject: [PATCH 3/3] update attributes order, support nan in v1 metadata --- .../java/org/apache/iceberg/DataFile.java | 14 ++----- .../java/org/apache/iceberg/TestHelpers.java | 5 +++ 
.../java/org/apache/iceberg/BaseFile.java | 24 ++++++------ .../java/org/apache/iceberg/V1Metadata.java | 16 ++++++-- .../java/org/apache/iceberg/V2Metadata.java | 18 ++++----- .../iceberg/TestManifestReaderStats.java | 7 +--- .../iceberg/TestManifestWriterVersions.java | 37 ++++++++----------- 7 files changed, 58 insertions(+), 63 deletions(-) diff --git a/api/src/main/java/org/apache/iceberg/DataFile.java b/api/src/main/java/org/apache/iceberg/DataFile.java index 51195533fc1f..a763f9565631 100644 --- a/api/src/main/java/org/apache/iceberg/DataFile.java +++ b/api/src/main/java/org/apache/iceberg/DataFile.java @@ -20,7 +20,6 @@ package org.apache.iceberg; import java.util.List; -import java.util.Map; import org.apache.iceberg.types.Types; import org.apache.iceberg.types.Types.BinaryType; import org.apache.iceberg.types.Types.IntegerType; @@ -51,6 +50,8 @@ public interface DataFile extends ContentFile { IntegerType.get(), LongType.get()), "Map of column id to total count, including null and NaN"); Types.NestedField NULL_VALUE_COUNTS = optional(110, "null_value_counts", MapType.ofRequired(121, 122, IntegerType.get(), LongType.get()), "Map of column id to null value count"); + Types.NestedField NAN_VALUE_COUNTS = optional(137, "nan_value_counts", MapType.ofRequired(138, 139, + IntegerType.get(), LongType.get()), "Map of column id to number of NaN values in the column"); Types.NestedField LOWER_BOUNDS = optional(125, "lower_bounds", MapType.ofRequired(126, 127, IntegerType.get(), BinaryType.get()), "Map of column id to lower bound"); Types.NestedField UPPER_BOUNDS = optional(128, "upper_bounds", MapType.ofRequired(129, 130, @@ -60,8 +61,6 @@ public interface DataFile extends ContentFile { "Splittable offsets"); Types.NestedField EQUALITY_IDS = optional(135, "equality_ids", ListType.ofRequired(136, IntegerType.get()), "Equality comparison field IDs"); - Types.NestedField NAN_VALUE_COUNTS = optional(137, "nan_value_counts", MapType.ofRequired(138, 139, - 
IntegerType.get(), LongType.get()), "Map of column id to number of NaN values in the column"); int PARTITION_ID = 102; String PARTITION_NAME = "partition"; @@ -80,12 +79,12 @@ static StructType getType(StructType partitionType) { COLUMN_SIZES, VALUE_COUNTS, NULL_VALUE_COUNTS, + NAN_VALUE_COUNTS, LOWER_BOUNDS, UPPER_BOUNDS, KEY_METADATA, SPLIT_OFFSETS, - EQUALITY_IDS, - NAN_VALUE_COUNTS + EQUALITY_IDS ); } @@ -101,9 +100,4 @@ default FileContent content() { default List equalityFieldIds() { return null; } - - @Override - default Map nanValueCounts() { - return null; - } } diff --git a/api/src/test/java/org/apache/iceberg/TestHelpers.java b/api/src/test/java/org/apache/iceberg/TestHelpers.java index efdceba0806d..92d87ac66045 100644 --- a/api/src/test/java/org/apache/iceberg/TestHelpers.java +++ b/api/src/test/java/org/apache/iceberg/TestHelpers.java @@ -382,6 +382,11 @@ public Map nullValueCounts() { return nullValueCounts; } + @Override + public Map nanValueCounts() { + return null; // will be updated in a separate pr soon + } + @Override public Map lowerBounds() { return lowerBounds; diff --git a/core/src/main/java/org/apache/iceberg/BaseFile.java b/core/src/main/java/org/apache/iceberg/BaseFile.java index 53d688a84434..1ca5346e31ca 100644 --- a/core/src/main/java/org/apache/iceberg/BaseFile.java +++ b/core/src/main/java/org/apache/iceberg/BaseFile.java @@ -252,22 +252,22 @@ public void put(int i, Object value) { this.nullValueCounts = (Map) value; return; case 9: - this.lowerBounds = SerializableByteBufferMap.wrap((Map) value); + this.nanValueCounts = (Map) value; return; case 10: - this.upperBounds = SerializableByteBufferMap.wrap((Map) value); + this.lowerBounds = SerializableByteBufferMap.wrap((Map) value); return; case 11: - this.keyMetadata = ByteBuffers.toByteArray((ByteBuffer) value); + this.upperBounds = SerializableByteBufferMap.wrap((Map) value); return; case 12: - this.splitOffsets = ArrayUtil.toLongArray((List) value); + this.keyMetadata = 
ByteBuffers.toByteArray((ByteBuffer) value); return; case 13: - this.equalityIds = ArrayUtil.toIntArray((List) value); + this.splitOffsets = ArrayUtil.toLongArray((List) value); return; case 14: - this.nanValueCounts = (Map) value; + this.equalityIds = ArrayUtil.toIntArray((List) value); return; case 15: this.fileOrdinal = (long) value; @@ -309,17 +309,17 @@ public Object get(int i) { case 8: return nullValueCounts; case 9: - return lowerBounds; + return nanValueCounts; case 10: - return upperBounds; + return lowerBounds; case 11: - return keyMetadata(); + return upperBounds; case 12: - return splitOffsets(); + return keyMetadata(); case 13: - return equalityFieldIds(); + return splitOffsets(); case 14: - return nanValueCounts; + return equalityFieldIds(); case 15: return pos; default: diff --git a/core/src/main/java/org/apache/iceberg/V1Metadata.java b/core/src/main/java/org/apache/iceberg/V1Metadata.java index 8e5f5fb02372..4b035830decf 100644 --- a/core/src/main/java/org/apache/iceberg/V1Metadata.java +++ b/core/src/main/java/org/apache/iceberg/V1Metadata.java @@ -210,6 +210,7 @@ static Types.StructType dataFileSchema(Types.StructType partitionType) { DataFile.COLUMN_SIZES, DataFile.VALUE_COUNTS, DataFile.NULL_VALUE_COUNTS, + DataFile.NAN_VALUE_COUNTS, DataFile.LOWER_BOUNDS, DataFile.UPPER_BOUNDS, DataFile.KEY_METADATA, @@ -343,12 +344,14 @@ public Object get(int pos) { case 8: return wrapped.nullValueCounts(); case 9: - return wrapped.lowerBounds(); + return wrapped.nanValueCounts(); case 10: - return wrapped.upperBounds(); + return wrapped.lowerBounds(); case 11: - return wrapped.keyMetadata(); + return wrapped.upperBounds(); case 12: + return wrapped.keyMetadata(); + case 13: return wrapped.splitOffsets(); } throw new IllegalArgumentException("Unknown field ordinal: " + pos); @@ -356,7 +359,7 @@ public Object get(int pos) { @Override public void put(int i, Object v) { - throw new UnsupportedOperationException("Cannot write into IndexedDataFile"); + throw new 
UnsupportedOperationException("Cannot read into IndexedDataFile"); } @Override @@ -419,6 +422,11 @@ public Map nullValueCounts() { return wrapped.nullValueCounts(); } + @Override + public Map nanValueCounts() { + return wrapped.nanValueCounts(); + } + @Override public Map lowerBounds() { return wrapped.lowerBounds(); diff --git a/core/src/main/java/org/apache/iceberg/V2Metadata.java b/core/src/main/java/org/apache/iceberg/V2Metadata.java index fee4bc3609e2..a468632f4159 100644 --- a/core/src/main/java/org/apache/iceberg/V2Metadata.java +++ b/core/src/main/java/org/apache/iceberg/V2Metadata.java @@ -250,12 +250,12 @@ static Types.StructType fileType(Types.StructType partitionType) { DataFile.COLUMN_SIZES, DataFile.VALUE_COUNTS, DataFile.NULL_VALUE_COUNTS, + DataFile.NAN_VALUE_COUNTS, DataFile.LOWER_BOUNDS, DataFile.UPPER_BOUNDS, DataFile.KEY_METADATA, DataFile.SPLIT_OFFSETS, - DataFile.EQUALITY_IDS, - DataFile.NAN_VALUE_COUNTS + DataFile.EQUALITY_IDS ); } @@ -398,24 +398,24 @@ public Object get(int pos) { case 8: return wrapped.nullValueCounts(); case 9: - return wrapped.lowerBounds(); + return wrapped.nanValueCounts(); case 10: - return wrapped.upperBounds(); + return wrapped.lowerBounds(); case 11: - return wrapped.keyMetadata(); + return wrapped.upperBounds(); case 12: - return wrapped.splitOffsets(); + return wrapped.keyMetadata(); case 13: - return wrapped.equalityFieldIds(); + return wrapped.splitOffsets(); case 14: - return wrapped.nanValueCounts(); + return wrapped.equalityFieldIds(); } throw new IllegalArgumentException("Unknown field ordinal: " + pos); } @Override public void put(int i, Object v) { - throw new UnsupportedOperationException("Cannot write into IndexedDataFile"); + throw new UnsupportedOperationException("Cannot read into IndexedDataFile"); } @Override diff --git a/core/src/test/java/org/apache/iceberg/TestManifestReaderStats.java b/core/src/test/java/org/apache/iceberg/TestManifestReaderStats.java index 4cb0260dc06c..8d5f8b63046d 100644 --- 
a/core/src/test/java/org/apache/iceberg/TestManifestReaderStats.java +++ b/core/src/test/java/org/apache/iceberg/TestManifestReaderStats.java @@ -151,12 +151,7 @@ private void assertFullStats(DataFile dataFile) { Assert.assertEquals(NULL_VALUE_COUNTS, dataFile.nullValueCounts()); Assert.assertEquals(LOWER_BOUNDS, dataFile.lowerBounds()); Assert.assertEquals(UPPER_BOUNDS, dataFile.upperBounds()); - - if (formatVersion == 1) { - Assert.assertNull(dataFile.nanValueCounts()); - } else { - Assert.assertEquals(NAN_VALUE_COUNTS, dataFile.nanValueCounts()); - } + Assert.assertEquals(NAN_VALUE_COUNTS, dataFile.nanValueCounts()); } private void assertStatsDropped(DataFile dataFile) { diff --git a/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java b/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java index 35519313870f..f2cd7bb7fd4a 100644 --- a/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java +++ b/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java @@ -64,7 +64,7 @@ public class TestManifestWriterVersions { ImmutableMap.of(1, 15L, 2, 122L, 3, 4021L, 4, 9411L, 5, 15L), // sizes ImmutableMap.of(1, 100L, 2, 100L, 3, 100L, 4, 100L, 5, 100L), // value counts ImmutableMap.of(1, 0L, 2, 0L, 3, 0L, 4, 0L, 5, 0L), // null value counts - ImmutableMap.of(5, 10L), // null value counts + ImmutableMap.of(5, 10L), // nan value counts ImmutableMap.of(1, Conversions.toByteBuffer(Types.IntegerType.get(), 1)), // lower bounds ImmutableMap.of(1, Conversions.toByteBuffer(Types.IntegerType.get(), 1))); // upper bounds private static final List OFFSETS = ImmutableList.of(4L); @@ -85,7 +85,7 @@ public class TestManifestWriterVersions { public void testV1Write() throws IOException { ManifestFile manifest = writeManifest(1); checkManifest(manifest, ManifestWriter.UNASSIGNED_SEQ); - checkEntry(readManifest(manifest), ManifestWriter.UNASSIGNED_SEQ, FileContent.DATA, false); + checkEntry(readManifest(manifest), 
ManifestWriter.UNASSIGNED_SEQ, FileContent.DATA); } @Test @@ -101,7 +101,7 @@ public void testV1WriteWithInheritance() throws IOException { checkManifest(manifest, 0L); // v1 should be read using sequence number 0 because it was missing from the manifest list file - checkEntry(readManifest(manifest), 0L, FileContent.DATA, false); + checkEntry(readManifest(manifest), 0L, FileContent.DATA); } @Test @@ -109,7 +109,7 @@ public void testV2Write() throws IOException { ManifestFile manifest = writeManifest(2); checkManifest(manifest, ManifestWriter.UNASSIGNED_SEQ); Assert.assertEquals("Content", ManifestContent.DATA, manifest.content()); - checkEntry(readManifest(manifest), ManifestWriter.UNASSIGNED_SEQ, FileContent.DATA, true); + checkEntry(readManifest(manifest), ManifestWriter.UNASSIGNED_SEQ, FileContent.DATA); } @Test @@ -119,7 +119,7 @@ public void testV2WriteWithInheritance() throws IOException { Assert.assertEquals("Content", ManifestContent.DATA, manifest.content()); // v2 should use the correct sequence number by inheriting it - checkEntry(readManifest(manifest), SEQUENCE_NUMBER, FileContent.DATA, true); + checkEntry(readManifest(manifest), SEQUENCE_NUMBER, FileContent.DATA); } @Test @@ -127,7 +127,7 @@ public void testV2WriteDelete() throws IOException { ManifestFile manifest = writeDeleteManifest(2); checkManifest(manifest, ManifestWriter.UNASSIGNED_SEQ); Assert.assertEquals("Content", ManifestContent.DELETES, manifest.content()); - checkEntry(readDeleteManifest(manifest), ManifestWriter.UNASSIGNED_SEQ, FileContent.EQUALITY_DELETES, true); + checkEntry(readDeleteManifest(manifest), ManifestWriter.UNASSIGNED_SEQ, FileContent.EQUALITY_DELETES); } @Test @@ -137,7 +137,7 @@ public void testV2WriteDeleteWithInheritance() throws IOException { Assert.assertEquals("Content", ManifestContent.DELETES, manifest.content()); // v2 should use the correct sequence number by inheriting it - checkEntry(readDeleteManifest(manifest), SEQUENCE_NUMBER, FileContent.EQUALITY_DELETES, 
true); + checkEntry(readDeleteManifest(manifest), SEQUENCE_NUMBER, FileContent.EQUALITY_DELETES); } @Test @@ -152,8 +152,7 @@ public void testV2ManifestListRewriteWithInheritance() throws IOException { checkManifest(manifest2, 0L); // should not inherit the v2 sequence number because it was a rewrite - // NaN count also won't be present since v1 manifest doesn't have this information - checkEntry(readManifest(manifest2), 0L, FileContent.DATA, false); + checkEntry(readManifest(manifest2), 0L, FileContent.DATA); } @Test @@ -172,26 +171,24 @@ public void testV2ManifestRewriteWithInheritance() throws IOException { checkRewrittenManifest(manifest2, SEQUENCE_NUMBER, 0L); // should not inherit the v2 sequence number because it was written into the v2 manifest - // NaN count also won't be present since v1 manifest doesn't have this information - checkRewrittenEntry(readManifest(manifest2), 0L, FileContent.DATA, false); + checkRewrittenEntry(readManifest(manifest2), 0L, FileContent.DATA); } - void checkEntry(ManifestEntry entry, Long expectedSequenceNumber, FileContent content, boolean hasNaNCount) { + void checkEntry(ManifestEntry entry, Long expectedSequenceNumber, FileContent content) { Assert.assertEquals("Status", ManifestEntry.Status.ADDED, entry.status()); Assert.assertEquals("Snapshot ID", (Long) SNAPSHOT_ID, entry.snapshotId()); Assert.assertEquals("Sequence number", expectedSequenceNumber, entry.sequenceNumber()); - checkDataFile(entry.file(), content, hasNaNCount); + checkDataFile(entry.file(), content); } - void checkRewrittenEntry(ManifestEntry entry, Long expectedSequenceNumber, - FileContent content, boolean hasNaNCount) { + void checkRewrittenEntry(ManifestEntry entry, Long expectedSequenceNumber, FileContent content) { Assert.assertEquals("Status", ManifestEntry.Status.EXISTING, entry.status()); Assert.assertEquals("Snapshot ID", (Long) SNAPSHOT_ID, entry.snapshotId()); Assert.assertEquals("Sequence number", expectedSequenceNumber, entry.sequenceNumber()); 
- checkDataFile(entry.file(), content, hasNaNCount); + checkDataFile(entry.file(), content); } - void checkDataFile(ContentFile dataFile, FileContent content, boolean hasNaNCount) { + void checkDataFile(ContentFile dataFile, FileContent content) { // DataFile is the superclass of DeleteFile, so this method can check both Assert.assertEquals("Content", content, dataFile.content()); Assert.assertEquals("Path", PATH, dataFile.path()); @@ -201,6 +198,7 @@ void checkDataFile(ContentFile dataFile, FileContent content, boolean hasNaNC Assert.assertEquals("Column sizes", METRICS.columnSizes(), dataFile.columnSizes()); Assert.assertEquals("Value counts", METRICS.valueCounts(), dataFile.valueCounts()); Assert.assertEquals("Null value counts", METRICS.nullValueCounts(), dataFile.nullValueCounts()); + Assert.assertEquals("NaN value counts", METRICS.nanValueCounts(), dataFile.nanValueCounts()); Assert.assertEquals("Lower bounds", METRICS.lowerBounds(), dataFile.lowerBounds()); Assert.assertEquals("Upper bounds", METRICS.upperBounds(), dataFile.upperBounds()); if (dataFile.content() == FileContent.EQUALITY_DELETES) { @@ -208,11 +206,6 @@ void checkDataFile(ContentFile dataFile, FileContent content, boolean hasNaNC } else { Assert.assertNull(dataFile.equalityFieldIds()); } - if (hasNaNCount) { - Assert.assertEquals("NaN", METRICS.nanValueCounts(), dataFile.nanValueCounts()); - } else { - Assert.assertNull("NaN", dataFile.nanValueCounts()); - } } void checkManifest(ManifestFile manifest, long expectedSequenceNumber) {