diff --git a/api/src/main/java/org/apache/iceberg/ContentFile.java b/api/src/main/java/org/apache/iceberg/ContentFile.java
index dca2fec57e61..3190f7b08ba4 100644
--- a/api/src/main/java/org/apache/iceberg/ContentFile.java
+++ b/api/src/main/java/org/apache/iceberg/ContentFile.java
@@ -84,6 +84,11 @@ public interface ContentFile<F> {
    */
   Map<Integer, Long> nullValueCounts();
 
+  /**
+   * Returns if collected, map from column ID to its NaN value count, null otherwise.
+   */
+  Map<Integer, Long> nanValueCounts();
+
   /**
    * Returns if collected, map from column ID to value lower bounds, null otherwise.
    */
@@ -132,7 +137,8 @@ public interface ContentFile<F> {
    * Copies this file without file stats. Manifest readers can reuse file instances; use
    * this method to copy data without stats when collecting files.
    *
-   * @return a copy of this data file, without lower bounds, upper bounds, value counts, or null value counts
+   * @return a copy of this data file, without lower bounds, upper bounds, value counts,
+   *         null value counts, or NaN value counts
    */
   F copyWithoutStats();
 }
diff --git a/api/src/main/java/org/apache/iceberg/DataFile.java b/api/src/main/java/org/apache/iceberg/DataFile.java
index 1b8092639b9b..a763f9565631 100644
--- a/api/src/main/java/org/apache/iceberg/DataFile.java
+++ b/api/src/main/java/org/apache/iceberg/DataFile.java
@@ -50,6 +50,8 @@ public interface DataFile extends ContentFile<DataFile> {
       IntegerType.get(), LongType.get()), "Map of column id to total count, including null and NaN");
   Types.NestedField NULL_VALUE_COUNTS = optional(110, "null_value_counts", MapType.ofRequired(121, 122,
       IntegerType.get(), LongType.get()), "Map of column id to null value count");
+  Types.NestedField NAN_VALUE_COUNTS = optional(137, "nan_value_counts", MapType.ofRequired(138, 139,
+      IntegerType.get(), LongType.get()), "Map of column id to number of NaN values in the column");
   Types.NestedField LOWER_BOUNDS = optional(125, "lower_bounds", MapType.ofRequired(126, 127,
       IntegerType.get(), BinaryType.get()), "Map of column id to lower bound");
   Types.NestedField UPPER_BOUNDS = optional(128, "upper_bounds", MapType.ofRequired(129, 130,
@@ -59,10 +61,11 @@ public interface DataFile extends ContentFile<DataFile> {
       "Splittable offsets");
   Types.NestedField EQUALITY_IDS = optional(135, "equality_ids", ListType.ofRequired(136, IntegerType.get()),
       "Equality comparison field IDs");
+
   int PARTITION_ID = 102;
   String PARTITION_NAME = "partition";
   String PARTITION_DOC = "Partition data tuple, schema based on the partition spec";
-  // NEXT ID TO ASSIGN: 137
+  // NEXT ID TO ASSIGN: 140
 
   static StructType getType(StructType partitionType) {
     // IDs start at 100 to leave room for changes to ManifestEntry
@@ -76,6 +79,7 @@ static StructType getType(StructType partitionType) {
         COLUMN_SIZES,
         VALUE_COUNTS,
         NULL_VALUE_COUNTS,
+        NAN_VALUE_COUNTS,
         LOWER_BOUNDS,
         UPPER_BOUNDS,
         KEY_METADATA,
diff --git a/api/src/test/java/org/apache/iceberg/TestHelpers.java b/api/src/test/java/org/apache/iceberg/TestHelpers.java
index efdceba0806d..92d87ac66045 100644
--- a/api/src/test/java/org/apache/iceberg/TestHelpers.java
+++ b/api/src/test/java/org/apache/iceberg/TestHelpers.java
@@ -382,6 +382,11 @@ public Map<Integer, Long> nullValueCounts() {
       return nullValueCounts;
     }
 
+    @Override
+    public Map<Integer, Long> nanValueCounts() {
+      return null; // will be updated in a separate PR soon
+    }
+
     @Override
     public Map<Integer, ByteBuffer> lowerBounds() {
       return lowerBounds;
diff --git a/core/src/main/java/org/apache/iceberg/BaseFile.java b/core/src/main/java/org/apache/iceberg/BaseFile.java
index 9fb79c807f0a..1ca5346e31ca 100644
--- a/core/src/main/java/org/apache/iceberg/BaseFile.java
+++ b/core/src/main/java/org/apache/iceberg/BaseFile.java
@@ -67,6 +67,7 @@ public PartitionData copy() {
   private Map<Integer, Long> columnSizes = null;
   private Map<Integer, Long> valueCounts = null;
   private Map<Integer, Long> nullValueCounts = null;
+  private Map<Integer, Long> nanValueCounts = null;
   private Map<Integer, ByteBuffer> lowerBounds = null;
   private Map<Integer, ByteBuffer> upperBounds = null;
   private long[] splitOffsets = null;
@@ -117,7 +118,8 @@ public PartitionData copy() {
   BaseFile(int specId, FileContent content, String filePath, FileFormat format,
            PartitionData partition, long fileSizeInBytes, long recordCount,
-           Map<Integer, Long> columnSizes, Map<Integer, Long> valueCounts, Map<Integer, Long> nullValueCounts,
+           Map<Integer, Long> columnSizes, Map<Integer, Long> valueCounts,
+           Map<Integer, Long> nullValueCounts, Map<Integer, Long> nanValueCounts,
            Map<Integer, ByteBuffer> lowerBounds, Map<Integer, ByteBuffer> upperBounds, List<Long> splitOffsets,
            int[] equalityFieldIds, ByteBuffer keyMetadata) {
     this.partitionSpecId = specId;
@@ -140,6 +142,7 @@ public PartitionData copy() {
     this.columnSizes = columnSizes;
     this.valueCounts = valueCounts;
     this.nullValueCounts = nullValueCounts;
+    this.nanValueCounts = nanValueCounts;
     this.lowerBounds = SerializableByteBufferMap.wrap(lowerBounds);
     this.upperBounds = SerializableByteBufferMap.wrap(upperBounds);
     this.splitOffsets = ArrayUtil.toLongArray(splitOffsets);
@@ -168,12 +171,14 @@ public PartitionData copy() {
       this.columnSizes = copy(toCopy.columnSizes);
       this.valueCounts = copy(toCopy.valueCounts);
       this.nullValueCounts = copy(toCopy.nullValueCounts);
+      this.nanValueCounts = copy(toCopy.nanValueCounts);
       this.lowerBounds = SerializableByteBufferMap.wrap(copy(toCopy.lowerBounds));
       this.upperBounds = SerializableByteBufferMap.wrap(copy(toCopy.upperBounds));
     } else {
       this.columnSizes = null;
       this.valueCounts = null;
       this.nullValueCounts = null;
+      this.nanValueCounts = null;
       this.lowerBounds = null;
       this.upperBounds = null;
     }
@@ -247,21 +252,24 @@ public void put(int i, Object value) {
         this.nullValueCounts = (Map<Integer, Long>) value;
         return;
       case 9:
-        this.lowerBounds = SerializableByteBufferMap.wrap((Map<Integer, ByteBuffer>) value);
+        this.nanValueCounts = (Map<Integer, Long>) value;
         return;
       case 10:
-        this.upperBounds = SerializableByteBufferMap.wrap((Map<Integer, ByteBuffer>) value);
+        this.lowerBounds = SerializableByteBufferMap.wrap((Map<Integer, ByteBuffer>) value);
         return;
       case 11:
-        this.keyMetadata = ByteBuffers.toByteArray((ByteBuffer) value);
+        this.upperBounds = SerializableByteBufferMap.wrap((Map<Integer, ByteBuffer>) value);
         return;
       case 12:
-        this.splitOffsets = ArrayUtil.toLongArray((List<Long>) value);
+        this.keyMetadata = ByteBuffers.toByteArray((ByteBuffer) value);
         return;
       case 13:
-        this.equalityIds = ArrayUtil.toIntArray((List<Integer>) value);
+        this.splitOffsets = ArrayUtil.toLongArray((List<Long>) value);
         return;
       case 14:
+        this.equalityIds = ArrayUtil.toIntArray((List<Integer>) value);
+        return;
+      case 15:
         this.fileOrdinal = (long) value;
         return;
       default:
@@ -301,16 +309,18 @@ public Object get(int i) {
       case 8:
         return nullValueCounts;
       case 9:
-        return lowerBounds;
+        return nanValueCounts;
       case 10:
-        return upperBounds;
+        return lowerBounds;
       case 11:
-        return keyMetadata();
+        return upperBounds;
       case 12:
-        return splitOffsets();
+        return keyMetadata();
       case 13:
-        return equalityFieldIds();
+        return splitOffsets();
       case 14:
+        return equalityFieldIds();
+      case 15:
         return pos;
       default:
         throw new UnsupportedOperationException("Unknown field ordinal: " + pos);
@@ -377,6 +387,11 @@ public Map<Integer, Long> nullValueCounts() {
     return nullValueCounts;
   }
 
+  @Override
+  public Map<Integer, Long> nanValueCounts() {
+    return nanValueCounts;
+  }
+
   @Override
   public Map<Integer, ByteBuffer> lowerBounds() {
     return lowerBounds;
@@ -423,6 +438,7 @@ public String toString() {
         .add("column_sizes", columnSizes)
.add("value_counts", valueCounts) .add("null_value_counts", nullValueCounts) + .add("nan_value_counts", nanValueCounts) .add("lower_bounds", lowerBounds) .add("upper_bounds", upperBounds) .add("key_metadata", keyMetadata == null ? "null" : "(redacted)") diff --git a/core/src/main/java/org/apache/iceberg/DataFiles.java b/core/src/main/java/org/apache/iceberg/DataFiles.java index 5268cac28eda..3753c41093fd 100644 --- a/core/src/main/java/org/apache/iceberg/DataFiles.java +++ b/core/src/main/java/org/apache/iceberg/DataFiles.java @@ -126,6 +126,7 @@ public static class Builder { private Map columnSizes = null; private Map valueCounts = null; private Map nullValueCounts = null; + private Map nanValueCounts = null; private Map lowerBounds = null; private Map upperBounds = null; private ByteBuffer keyMetadata = null; @@ -149,6 +150,7 @@ public void clear() { this.columnSizes = null; this.valueCounts = null; this.nullValueCounts = null; + this.nanValueCounts = null; this.lowerBounds = null; this.upperBounds = null; this.splitOffsets = null; @@ -166,6 +168,7 @@ public Builder copy(DataFile toCopy) { this.columnSizes = toCopy.columnSizes(); this.valueCounts = toCopy.valueCounts(); this.nullValueCounts = toCopy.nullValueCounts(); + this.nanValueCounts = toCopy.nanValueCounts(); this.lowerBounds = toCopy.lowerBounds(); this.upperBounds = toCopy.upperBounds(); this.keyMetadata = toCopy.keyMetadata() == null ? null @@ -241,6 +244,7 @@ public Builder withMetrics(Metrics metrics) { this.columnSizes = metrics.columnSizes(); this.valueCounts = metrics.valueCounts(); this.nullValueCounts = metrics.nullValueCounts(); + this.nanValueCounts = metrics.nanValueCounts(); this.lowerBounds = metrics.lowerBounds(); this.upperBounds = metrics.upperBounds(); return this; @@ -276,7 +280,7 @@ public DataFile build() { return new GenericDataFile( specId, filePath, format, isPartitioned ? 
          specId, filePath, format, isPartitioned ? partitionData.copy() : null,
           fileSizeInBytes, new Metrics(
-              recordCount, columnSizes, valueCounts, nullValueCounts, lowerBounds, upperBounds),
+              recordCount, columnSizes, valueCounts, nullValueCounts, nanValueCounts, lowerBounds, upperBounds),
           keyMetadata, splitOffsets);
     }
   }
diff --git a/core/src/main/java/org/apache/iceberg/DataTableScan.java b/core/src/main/java/org/apache/iceberg/DataTableScan.java
index 9a6b85f652b8..cf83e70f84b7 100644
--- a/core/src/main/java/org/apache/iceberg/DataTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/DataTableScan.java
@@ -32,7 +32,7 @@ public class DataTableScan extends BaseTableScan {
   );
   static final ImmutableList<String> SCAN_WITH_STATS_COLUMNS = ImmutableList.<String>builder()
       .addAll(SCAN_COLUMNS)
-      .add("value_counts", "null_value_counts", "lower_bounds", "upper_bounds", "column_sizes")
+      .add("value_counts", "null_value_counts", "nan_value_counts", "lower_bounds", "upper_bounds", "column_sizes")
       .build();
   static final boolean PLAN_SCANS_WITH_WORKER_POOL =
       SystemProperties.getBoolean(SystemProperties.SCAN_THREAD_POOL_ENABLED, true);
diff --git a/core/src/main/java/org/apache/iceberg/FileMetadata.java b/core/src/main/java/org/apache/iceberg/FileMetadata.java
index 72d3ad5775bc..2fb992f7f053 100644
--- a/core/src/main/java/org/apache/iceberg/FileMetadata.java
+++ b/core/src/main/java/org/apache/iceberg/FileMetadata.java
@@ -54,6 +54,7 @@ public static class Builder {
     private Map<Integer, Long> columnSizes = null;
     private Map<Integer, Long> valueCounts = null;
     private Map<Integer, Long> nullValueCounts = null;
+    private Map<Integer, Long> nanValueCounts = null;
     private Map<Integer, ByteBuffer> lowerBounds = null;
     private Map<Integer, ByteBuffer> upperBounds = null;
     private ByteBuffer keyMetadata = null;
@@ -76,6 +77,7 @@ public void clear() {
       this.columnSizes = null;
       this.valueCounts = null;
       this.nullValueCounts = null;
+      this.nanValueCounts = null;
       this.lowerBounds = null;
       this.upperBounds = null;
     }
@@ -93,6 +95,7 @@ public Builder copy(DeleteFile toCopy) {
       this.columnSizes = toCopy.columnSizes();
       this.valueCounts = toCopy.valueCounts();
       this.nullValueCounts = toCopy.nullValueCounts();
+      this.nanValueCounts = toCopy.nanValueCounts();
       this.lowerBounds = toCopy.lowerBounds();
       this.upperBounds = toCopy.upperBounds();
       this.keyMetadata = toCopy.keyMetadata() == null ? null
@@ -179,6 +182,7 @@ public Builder withMetrics(Metrics metrics) {
       this.columnSizes = metrics.columnSizes();
       this.valueCounts = metrics.valueCounts();
       this.nullValueCounts = metrics.nullValueCounts();
+      this.nanValueCounts = metrics.nanValueCounts();
       this.lowerBounds = metrics.lowerBounds();
       this.upperBounds = metrics.upperBounds();
       return this;
@@ -206,7 +210,7 @@ public DeleteFile build() {
       return new GenericDeleteFile(
          specId, content, filePath, format, isPartitioned ? DataFiles.copy(spec, partitionData) : null,
           fileSizeInBytes, new Metrics(
-              recordCount, columnSizes, valueCounts, nullValueCounts, lowerBounds, upperBounds),
+              recordCount, columnSizes, valueCounts, nullValueCounts, nanValueCounts, lowerBounds, upperBounds),
           equalityFieldIds, keyMetadata);
     }
   }
diff --git a/core/src/main/java/org/apache/iceberg/GenericDataFile.java b/core/src/main/java/org/apache/iceberg/GenericDataFile.java
index 92f662eee446..dbdbe6d727df 100644
--- a/core/src/main/java/org/apache/iceberg/GenericDataFile.java
+++ b/core/src/main/java/org/apache/iceberg/GenericDataFile.java
@@ -38,7 +38,7 @@ class GenericDataFile extends BaseFile<DataFile> implements DataFile {
                   long fileSizeInBytes, Metrics metrics,
                   ByteBuffer keyMetadata, List<Long> splitOffsets) {
     super(specId, FileContent.DATA, filePath, format, partition, fileSizeInBytes, metrics.recordCount(),
-        metrics.columnSizes(), metrics.valueCounts(), metrics.nullValueCounts(),
+        metrics.columnSizes(), metrics.valueCounts(), metrics.nullValueCounts(), metrics.nanValueCounts(),
         metrics.lowerBounds(), metrics.upperBounds(), splitOffsets, null, keyMetadata);
   }
diff --git a/core/src/main/java/org/apache/iceberg/GenericDeleteFile.java b/core/src/main/java/org/apache/iceberg/GenericDeleteFile.java
index 73a8df7a8a38..b9900c5ac2ec 100644
--- a/core/src/main/java/org/apache/iceberg/GenericDeleteFile.java
+++ b/core/src/main/java/org/apache/iceberg/GenericDeleteFile.java
@@ -37,7 +37,7 @@ class GenericDeleteFile extends BaseFile<DeleteFile> implements DeleteFile {
   GenericDeleteFile(int specId, FileContent content, String filePath, FileFormat format,
                     PartitionData partition, long fileSizeInBytes, Metrics metrics,
                     int[] equalityFieldIds, ByteBuffer keyMetadata) {
     super(specId, content, filePath, format, partition, fileSizeInBytes, metrics.recordCount(),
-        metrics.columnSizes(), metrics.valueCounts(), metrics.nullValueCounts(),
+        metrics.columnSizes(), metrics.valueCounts(), metrics.nullValueCounts(), metrics.nanValueCounts(),
         metrics.lowerBounds(), metrics.upperBounds(), null, equalityFieldIds, keyMetadata);
   }
diff --git a/core/src/main/java/org/apache/iceberg/ManifestReader.java b/core/src/main/java/org/apache/iceberg/ManifestReader.java
index f7910799593e..c6751f381099 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestReader.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestReader.java
@@ -53,7 +53,7 @@ public class ManifestReader<F extends ContentFile<F>> extends CloseableGroup implements CloseableIterable<F> {
   static final ImmutableList<String> ALL_COLUMNS = ImmutableList.of("*");
   static final Set<String> STATS_COLUMNS = Sets.newHashSet(
-      "value_counts", "null_value_counts", "lower_bounds", "upper_bounds");
+      "value_counts", "null_value_counts", "nan_value_counts", "lower_bounds", "upper_bounds");
 
   protected enum FileType {
     DATA_FILES(GenericDataFile.class.getName()),
diff --git a/core/src/main/java/org/apache/iceberg/MetricsModes.java b/core/src/main/java/org/apache/iceberg/MetricsModes.java
index db767433fb5e..cc805f75200c 100644
--- a/core/src/main/java/org/apache/iceberg/MetricsModes.java
+++ b/core/src/main/java/org/apache/iceberg/MetricsModes.java
@@ -29,7 +29,7 @@
 
 /**
  * This class defines different metrics modes, which allow users to control the collection of
- * value_counts, null_value_counts, lower_bounds, upper_bounds for different columns in metadata.
+ * value_counts, null_value_counts, nan_value_counts, lower_bounds, upper_bounds for different columns in metadata.
  */
 public class MetricsModes {
 
@@ -60,7 +60,7 @@ public interface MetricsMode extends Serializable {
   }
 
   /**
-   * Under this mode, value_counts, null_value_counts, lower_bounds, upper_bounds are not persisted.
+   * Under this mode, value_counts, null_value_counts, nan_value_counts, lower_bounds, upper_bounds are not persisted.
    */
   public static class None extends ProxySerializableMetricsMode {
     private static final None INSTANCE = new None();
@@ -76,7 +76,7 @@ public String toString() {
   }
 
   /**
-   * Under this mode, only value_counts, null_value_counts are persisted.
+   * Under this mode, only value_counts, null_value_counts, nan_value_counts are persisted.
    */
   public static class Counts extends ProxySerializableMetricsMode {
     private static final Counts INSTANCE = new Counts();
@@ -92,7 +92,8 @@ public String toString() {
   }
 
   /**
-   * Under this mode, value_counts, null_value_counts and truncated lower_bounds, upper_bounds are persisted.
+   * Under this mode, value_counts, null_value_counts, nan_value_counts
+   * and truncated lower_bounds, upper_bounds are persisted.
    */
   public static class Truncate extends ProxySerializableMetricsMode {
     private final int length;
@@ -133,7 +134,8 @@ public int hashCode() {
   }
 
   /**
-   * Under this mode, value_counts, null_value_counts and full lower_bounds, upper_bounds are persisted.
+   * Under this mode, value_counts, null_value_counts, nan_value_counts
+   * and full lower_bounds, upper_bounds are persisted.
    */
   public static class Full extends ProxySerializableMetricsMode {
     private static final Full INSTANCE = new Full();
diff --git a/core/src/main/java/org/apache/iceberg/V1Metadata.java b/core/src/main/java/org/apache/iceberg/V1Metadata.java
index 4b1186074267..4b035830decf 100644
--- a/core/src/main/java/org/apache/iceberg/V1Metadata.java
+++ b/core/src/main/java/org/apache/iceberg/V1Metadata.java
@@ -210,6 +210,7 @@ static Types.StructType dataFileSchema(Types.StructType partitionType) {
         DataFile.COLUMN_SIZES,
         DataFile.VALUE_COUNTS,
         DataFile.NULL_VALUE_COUNTS,
+        DataFile.NAN_VALUE_COUNTS,
         DataFile.LOWER_BOUNDS,
         DataFile.UPPER_BOUNDS,
         DataFile.KEY_METADATA,
@@ -343,12 +344,14 @@ public Object get(int pos) {
         case 8:
           return wrapped.nullValueCounts();
         case 9:
-          return wrapped.lowerBounds();
+          return wrapped.nanValueCounts();
         case 10:
-          return wrapped.upperBounds();
+          return wrapped.lowerBounds();
         case 11:
-          return wrapped.keyMetadata();
+          return wrapped.upperBounds();
         case 12:
+          return wrapped.keyMetadata();
+        case 13:
           return wrapped.splitOffsets();
       }
       throw new IllegalArgumentException("Unknown field ordinal: " + pos);
@@ -419,6 +422,11 @@ public Map<Integer, Long> nullValueCounts() {
       return wrapped.nullValueCounts();
     }
 
+    @Override
+    public Map<Integer, Long> nanValueCounts() {
+      return wrapped.nanValueCounts();
+    }
+
     @Override
     public Map<Integer, ByteBuffer> lowerBounds() {
       return wrapped.lowerBounds();
diff --git a/core/src/main/java/org/apache/iceberg/V2Metadata.java b/core/src/main/java/org/apache/iceberg/V2Metadata.java
index 55a91c95f182..a468632f4159 100644
--- a/core/src/main/java/org/apache/iceberg/V2Metadata.java
+++ b/core/src/main/java/org/apache/iceberg/V2Metadata.java
@@ -250,6 +250,7 @@ static Types.StructType fileType(Types.StructType partitionType) {
         DataFile.COLUMN_SIZES,
         DataFile.VALUE_COUNTS,
         DataFile.NULL_VALUE_COUNTS,
+        DataFile.NAN_VALUE_COUNTS,
         DataFile.LOWER_BOUNDS,
         DataFile.UPPER_BOUNDS,
         DataFile.KEY_METADATA,
@@ -397,14 +398,16 @@ public Object get(int pos) {
         case 8:
           return wrapped.nullValueCounts();
         case 9:
-          return wrapped.lowerBounds();
+          return wrapped.nanValueCounts();
         case 10:
-          return wrapped.upperBounds();
+          return wrapped.lowerBounds();
         case 11:
-          return wrapped.keyMetadata();
+          return wrapped.upperBounds();
         case 12:
-          return wrapped.splitOffsets();
+          return wrapped.keyMetadata();
         case 13:
+          return wrapped.splitOffsets();
+        case 14:
           return wrapped.equalityFieldIds();
       }
       throw new IllegalArgumentException("Unknown field ordinal: " + pos);
@@ -470,6 +473,11 @@ public Map<Integer, Long> nullValueCounts() {
       return wrapped.nullValueCounts();
     }
 
+    @Override
+    public Map<Integer, Long> nanValueCounts() {
+      return wrapped.nanValueCounts();
+    }
+
     @Override
     public Map<Integer, ByteBuffer> lowerBounds() {
       return wrapped.lowerBounds();
diff --git a/core/src/test/java/org/apache/iceberg/TestManifestReaderStats.java b/core/src/test/java/org/apache/iceberg/TestManifestReaderStats.java
new file mode 100644
index 000000000000..8d5f8b63046d
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/TestManifestReaderStats.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.types.Conversions;
+import org.apache.iceberg.types.Types;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestManifestReaderStats extends TableTestBase {
+  @Parameterized.Parameters(name = "formatVersion = {0}")
+  public static Object[] parameters() {
+    return new Object[] { 1, 2 };
+  }
+
+  public TestManifestReaderStats(int formatVersion) {
+    super(formatVersion);
+  }
+
+  private static final Map<Integer, Long> VALUE_COUNT = ImmutableMap.of(3, 3L);
+  private static final Map<Integer, Long> NULL_VALUE_COUNTS = ImmutableMap.of(3, 0L);
+  private static final Map<Integer, Long> NAN_VALUE_COUNTS = ImmutableMap.of(3, 1L);
+  private static final Map<Integer, ByteBuffer> LOWER_BOUNDS =
+      ImmutableMap.of(3, Conversions.toByteBuffer(Types.IntegerType.get(), 2));
+  private static final Map<Integer, ByteBuffer> UPPER_BOUNDS =
+      ImmutableMap.of(3, Conversions.toByteBuffer(Types.IntegerType.get(), 4));
+
+  private static final Metrics METRICS = new Metrics(3L, null,
+      VALUE_COUNT, NULL_VALUE_COUNTS, NAN_VALUE_COUNTS, LOWER_BOUNDS, UPPER_BOUNDS);
+
+  private static final DataFile FILE = DataFiles.builder(SPEC)
+      .withPath("/path/to/data-a.parquet")
+      .withFileSizeInBytes(10)
+      .withPartitionPath("data_bucket=0") // easy way to set partition data for now
+      .withRecordCount(3)
+      .withMetrics(METRICS)
+      .build();
+
+  @Test
+  public void testReadIncludesFullStats() throws IOException {
+    ManifestFile manifest = writeManifest(1000L, FILE);
+    try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest, FILE_IO)) {
+      CloseableIterable<ManifestEntry<DataFile>> entries = reader.entries();
+      ManifestEntry<DataFile> entry = entries.iterator().next();
+      assertFullStats(entry.file());
+    }
+  }
+
+  @Test
+  public void testReadWithFilterIncludesFullStats() throws IOException {
+    ManifestFile manifest = writeManifest(1000L, FILE);
+    try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest, FILE_IO)
+        .filterRows(Expressions.equal("id", 3))) {
+      CloseableIterable<ManifestEntry<DataFile>> entries = reader.entries();
+      ManifestEntry<DataFile> entry = entries.iterator().next();
+      assertFullStats(entry.file());
+    }
+  }
+
+  @Test
+  public void testReadEntriesWithFilterAndSelectIncludesFullStats() throws IOException {
+    ManifestFile manifest = writeManifest(1000L, FILE);
+    try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest, FILE_IO)
+        .select(ImmutableSet.of("record_count"))
+        .filterRows(Expressions.equal("id", 3))) {
+      CloseableIterable<ManifestEntry<DataFile>> entries = reader.entries();
+      ManifestEntry<DataFile> entry = entries.iterator().next();
+      assertFullStats(entry.file());
+    }
+  }
+
+  @Test
+  public void testReadIteratorWithFilterAndSelectDropsStats() throws IOException {
+    ManifestFile manifest = writeManifest(1000L, FILE);
+    try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest, FILE_IO)
+        .select(ImmutableSet.of("record_count"))
+        .filterRows(Expressions.equal("id", 3))) {
+      DataFile entry = reader.iterator().next();
+      assertStatsDropped(entry);
+    }
+  }
+  @Test
+  public void testReadIteratorWithFilterAndSelectStatsIncludesFullStats() throws IOException {
+    ManifestFile manifest = writeManifest(1000L, FILE);
+    try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest, FILE_IO)
+        .select(ImmutableSet.of("record_count", "value_counts"))
+        .filterRows(Expressions.equal("id", 3))) {
+      DataFile entry = reader.iterator().next();
+      assertFullStats(entry);
+    }
+  }
+
+  @Test
+  public void testReadEntriesWithSelectNotIncludeFullStats() throws IOException {
+    ManifestFile manifest = writeManifest(1000L, FILE);
+    try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest, FILE_IO)
+        .select(ImmutableSet.of("record_count"))) {
+      CloseableIterable<ManifestEntry<DataFile>> entries = reader.entries();
+      ManifestEntry<DataFile> entry = entries.iterator().next();
+      assertStatsDropped(entry.file());
+    }
+  }
+  @Test
+  public void testReadEntriesWithSelectCertainStatNotIncludeFullStats() throws IOException {
+    ManifestFile manifest = writeManifest(1000L, FILE);
+    try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest, FILE_IO)
+        .select(ImmutableSet.of("record_count", "value_counts"))) {
+      DataFile dataFile = reader.iterator().next();
+
+      Assert.assertEquals(3, dataFile.recordCount());
+      Assert.assertNull(dataFile.columnSizes());
+      Assert.assertEquals(VALUE_COUNT, dataFile.valueCounts());
+      Assert.assertNull(dataFile.nullValueCounts());
+      Assert.assertNull(dataFile.lowerBounds());
+      Assert.assertNull(dataFile.upperBounds());
+      Assert.assertNull(dataFile.nanValueCounts());
+    }
+  }
+
+  private void assertFullStats(DataFile dataFile) {
+    Assert.assertEquals(3, dataFile.recordCount());
+    Assert.assertNull(dataFile.columnSizes());
+    Assert.assertEquals(VALUE_COUNT, dataFile.valueCounts());
+    Assert.assertEquals(NULL_VALUE_COUNTS, dataFile.nullValueCounts());
+    Assert.assertEquals(LOWER_BOUNDS, dataFile.lowerBounds());
+    Assert.assertEquals(UPPER_BOUNDS, dataFile.upperBounds());
+    Assert.assertEquals(NAN_VALUE_COUNTS, dataFile.nanValueCounts());
+  }
+
+  private void assertStatsDropped(DataFile dataFile) {
+    Assert.assertEquals(3, dataFile.recordCount()); // always select record count in all test cases
+    Assert.assertNull(dataFile.columnSizes());
+    Assert.assertNull(dataFile.valueCounts());
+    Assert.assertNull(dataFile.nullValueCounts());
+    Assert.assertNull(dataFile.lowerBounds());
+    Assert.assertNull(dataFile.upperBounds());
+    Assert.assertNull(dataFile.nanValueCounts());
+  }
+
+}
diff --git a/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java b/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java
index c8ede073722e..f2cd7bb7fd4a 100644
--- a/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java
+++ b/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java
@@ -45,7 +45,8 @@ public class TestManifestWriterVersions {
       required(1, "id", Types.LongType.get()),
       required(2, "timestamp", Types.TimestampType.withZone()),
       required(3, "category", Types.StringType.get()),
-      required(4, "data", Types.StringType.get()));
+      required(4, "data", Types.StringType.get()),
+      required(5, "double", Types.DoubleType.get()));
 
   private static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA)
       .identity("category")
@@ -60,9 +61,10 @@ public class TestManifestWriterVersions {
   private static final PartitionData PARTITION = DataFiles.data(SPEC, "category=cheesy/timestamp_hour=10/id_bucket=3");
   private static final Metrics METRICS = new Metrics(
       1587L,
-      ImmutableMap.of(1, 15L, 2, 122L, 3, 4021L, 4, 9411L), // sizes
-      ImmutableMap.of(1, 100L, 2, 100L, 3, 100L, 4, 100L), // value counts
-      ImmutableMap.of(1, 0L, 2, 0L, 3, 0L, 4, 0L), // null value counts
+      ImmutableMap.of(1, 15L, 2, 122L, 3, 4021L, 4, 9411L, 5, 15L), // sizes
+      ImmutableMap.of(1, 100L, 2, 100L, 3, 100L, 4, 100L, 5, 100L), // value counts
+      ImmutableMap.of(1, 0L, 2, 0L, 3, 0L, 4, 0L, 5, 0L), // null value counts
+      ImmutableMap.of(5, 10L), // nan value counts
       ImmutableMap.of(1, Conversions.toByteBuffer(Types.IntegerType.get(), 1)), // lower bounds
       ImmutableMap.of(1, Conversions.toByteBuffer(Types.IntegerType.get(), 1))); // upper bounds
   private static final List<Long> OFFSETS = ImmutableList.of(4L);
@@ -104,7 +106,7 @@ public void testV1WriteWithInheritance() throws IOException {
 
   @Test
   public void testV2Write() throws IOException {
-    ManifestFile manifest = writeManifest(1);
+    ManifestFile manifest = writeManifest(2);
     checkManifest(manifest, ManifestWriter.UNASSIGNED_SEQ);
     Assert.assertEquals("Content", ManifestContent.DATA, manifest.content());
     checkEntry(readManifest(manifest), ManifestWriter.UNASSIGNED_SEQ, FileContent.DATA);
@@ -196,6 +198,7 @@ void checkDataFile(ContentFile<?> dataFile, FileContent content) {
     Assert.assertEquals("Column sizes", METRICS.columnSizes(), dataFile.columnSizes());
     Assert.assertEquals("Value counts", METRICS.valueCounts(), dataFile.valueCounts());
     Assert.assertEquals("Null value counts", METRICS.nullValueCounts(), dataFile.nullValueCounts());
+    Assert.assertEquals("NaN value counts", METRICS.nanValueCounts(), dataFile.nanValueCounts());
     Assert.assertEquals("Lower bounds", METRICS.lowerBounds(), dataFile.lowerBounds());
     Assert.assertEquals("Upper bounds", METRICS.upperBounds(), dataFile.upperBounds());
     if (dataFile.content() == FileContent.EQUALITY_DELETES) {
diff --git a/core/src/test/java/org/apache/iceberg/TestMetrics.java b/core/src/test/java/org/apache/iceberg/TestMetrics.java
index 6cd7f88ddf27..f1a65f6202fd 100644
--- a/core/src/test/java/org/apache/iceberg/TestMetrics.java
+++ b/core/src/test/java/org/apache/iceberg/TestMetrics.java
@@ -73,7 +73,8 @@ public abstract class TestMetrics {
 
   private static final StructType NESTED_STRUCT_TYPE = StructType.of(
       required(3, "longCol", LongType.get()),
-      required(4, "leafStructCol", LEAF_STRUCT_TYPE)
+      required(4, "leafStructCol", LEAF_STRUCT_TYPE),
+      required(7, "doubleCol", DoubleType.get())
   );
 
   private static final Schema NESTED_SCHEMA = new Schema(
@@ -272,6 +273,8 @@ public void testMetricsForNestedStructFields() throws IOException {
     assertCounts(6, 1L, 0L, metrics);
     assertBounds(6, BinaryType.get(),
         ByteBuffer.wrap("A".getBytes()), ByteBuffer.wrap("A".getBytes()), metrics);
+    assertCounts(7, 1L, 0L, 1L, metrics);
+    assertBounds(7, DoubleType.get(), Double.NaN, Double.NaN, metrics);
   }
 
   private Record buildNestedTestRecord() {
@@ -281,6 +284,7 @@ private Record buildNestedTestRecord() {
     Record nestedStruct = GenericRecord.create(NESTED_STRUCT_TYPE);
     nestedStruct.setField("longCol", 100L);
     nestedStruct.setField("leafStructCol", leafStruct);
+    nestedStruct.setField("doubleCol", Double.NaN);
     Record record = GenericRecord.create(NESTED_SCHEMA);
     record.setField("intCol", Integer.MAX_VALUE);
     record.setField("nestedStructCol", nestedStruct);
@@ -475,6 +479,7 @@ public void testMetricsForNestedStructFieldsWithMultipleRowGroup() throws IOExce
       Record newNestedStruct = GenericRecord.create(NESTED_STRUCT_TYPE);
       newNestedStruct.setField("longCol", i + 1L);
       newNestedStruct.setField("leafStructCol", newLeafStruct);
+      newNestedStruct.setField("doubleCol", Double.NaN);
       Record newRecord = GenericRecord.create(NESTED_SCHEMA);
       newRecord.setField("intCol", i + 1);
       newRecord.setField("nestedStructCol", newNestedStruct);
@@ -500,6 +505,8 @@ public void testMetricsForNestedStructFieldsWithMultipleRowGroup() throws IOExce
     assertCounts(6, 201L, 0L, metrics);
     assertBounds(6, BinaryType.get(),
         ByteBuffer.wrap("A".getBytes()), ByteBuffer.wrap("A".getBytes()), metrics);
+    assertCounts(7, 201L, 0L, 201L, metrics);
+    assertBounds(7, DoubleType.get(), Double.NaN, Double.NaN, metrics);
   }
 
   @Test
@@ -518,6 +525,8 @@ public void testNoneMetricsMode() throws IOException {
     assertBounds(5, Types.LongType.get(), null, null, metrics);
     assertCounts(6, null, null, metrics);
     assertBounds(6, Types.BinaryType.get(), null, null, metrics);
+    assertCounts(7, null, null, metrics);
+    assertBounds(7, Types.DoubleType.get(), null, null, metrics);
   }
 
   @Test
@@ -536,6 +545,8 @@ public void testCountsMetricsMode() throws IOException {
     assertBounds(5, Types.LongType.get(), null, null, metrics);
     assertCounts(6, 1L, 0L, metrics);
     assertBounds(6, Types.BinaryType.get(), null, null, metrics);
+    assertCounts(7, 1L, 0L, 1L, metrics);
+    assertBounds(7, Types.DoubleType.get(), null, null, metrics);
   }
 
   @Test
@@ -555,6 +566,8 @@ public void testFullMetricsMode() throws IOException {
     assertCounts(6, 1L, 0L, metrics);
     assertBounds(6, Types.BinaryType.get(),
         ByteBuffer.wrap("A".getBytes()), ByteBuffer.wrap("A".getBytes()), metrics);
+    assertCounts(7, 1L, 0L, 1L, metrics);
+    assertBounds(7, Types.DoubleType.get(), Double.NaN, Double.NaN, metrics);
   }
 
   @Test
diff --git a/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java b/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
index 14233c949815..5f8f22721160 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
@@ -42,6 +42,7 @@ public class SparkDataFile implements DataFile {
   private final int columnSizesPosition;
   private final int valueCountsPosition;
   private final int nullValueCountsPosition;
+  private final int nanValueCountsPosition;
   private final int lowerBoundsPosition;
   private final int upperBoundsPosition;
   private final int keyMetadataPosition;
@@ -73,6 +74,7 @@ public SparkDataFile(Types.StructType type, StructType sparkType) {
     columnSizesPosition = positions.get("column_sizes");
     valueCountsPosition = positions.get("value_counts");
     nullValueCountsPosition = positions.get("null_value_counts");
+    nanValueCountsPosition = positions.get("nan_value_counts");
     lowerBoundsPosition = positions.get("lower_bounds");
     upperBoundsPosition = positions.get("upper_bounds");
     keyMetadataPosition = positions.get("key_metadata");
@@ -138,6 +140,11 @@ public Map<Integer, Long> nullValueCounts() {
     return wrapped.isNullAt(nullValueCountsPosition) ? null : wrapped.getJavaMap(nullValueCountsPosition);
   }
 
+  @Override
+  public Map<Integer, Long> nanValueCounts() {
+    return wrapped.isNullAt(nanValueCountsPosition) ? null : wrapped.getJavaMap(nanValueCountsPosition);
+  }
+
   @Override
   public Map<Integer, ByteBuffer> lowerBounds() {
     Map<Integer, byte[]> lowerBounds = wrapped.isNullAt(lowerBoundsPosition) ? null : wrapped.getJavaMap(lowerBoundsPosition);
diff --git a/spark/src/test/java/org/apache/iceberg/TestDataFileSerialization.java b/spark/src/test/java/org/apache/iceberg/TestDataFileSerialization.java
index 2c06372087ed..783a62099c11 100644
--- a/spark/src/test/java/org/apache/iceberg/TestDataFileSerialization.java
+++ b/spark/src/test/java/org/apache/iceberg/TestDataFileSerialization.java
@@ -59,7 +59,8 @@ public class TestDataFileSerialization {
   private static final Schema DATE_SCHEMA = new Schema(
       required(1, "id", Types.LongType.get()),
       optional(2, "data", Types.StringType.get()),
-      required(3, "date", Types.StringType.get()));
+      required(3, "date", Types.StringType.get()),
+      optional(4, "double", Types.DoubleType.get()));
 
   private static final PartitionSpec PARTITION_SPEC = PartitionSpec
       .builderFor(DATE_SCHEMA)
@@ -68,14 +69,17 @@ public class TestDataFileSerialization {
 
   private static final Map<Integer, Long> VALUE_COUNTS = Maps.newHashMap();
   private static final Map<Integer, Long> NULL_VALUE_COUNTS = Maps.newHashMap();
+  private static final Map<Integer, Long> NAN_VALUE_COUNTS = Maps.newHashMap();
   private static final Map<Integer, ByteBuffer> LOWER_BOUNDS = Maps.newHashMap();
   private static final Map<Integer, ByteBuffer> UPPER_BOUNDS = Maps.newHashMap();
 
   static {
     VALUE_COUNTS.put(1, 5L);
     VALUE_COUNTS.put(2, 3L);
+    VALUE_COUNTS.put(4, 2L);
     NULL_VALUE_COUNTS.put(1, 0L);
     NULL_VALUE_COUNTS.put(2, 2L);
+    NAN_VALUE_COUNTS.put(4, 1L);
     LOWER_BOUNDS.put(1, longToBuffer(0L));
     UPPER_BOUNDS.put(1, longToBuffer(4L));
   }
@@ -85,7 +89,8 @@ public class TestDataFileSerialization {
       .withPath("/path/to/data-1.parquet")
       .withFileSizeInBytes(1234)
       .withPartitionPath("date=2018-06-08")
-      .withMetrics(new Metrics(5L, null, VALUE_COUNTS, NULL_VALUE_COUNTS, LOWER_BOUNDS, UPPER_BOUNDS))
+      .withMetrics(new Metrics(
+          5L, null, VALUE_COUNTS, NULL_VALUE_COUNTS, NAN_VALUE_COUNTS, LOWER_BOUNDS, UPPER_BOUNDS))
       .withSplitOffsets(ImmutableList.of(4L))
       .withEncryptionKeyMetadata(ByteBuffer.allocate(4).putInt(34))
       .build();
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java
index 126d0455c326..3269db51caf7 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java
@@ -193,6 +193,7 @@ private void checkDataFile(DataFile expected, DataFile actual) {
     Assert.assertEquals("Size must match", expected.fileSizeInBytes(), actual.fileSizeInBytes());
must match", expected.valueCounts(), actual.valueCounts()); Assert.assertEquals("Record null value counts must match", expected.nullValueCounts(), actual.nullValueCounts()); + Assert.assertEquals("Record nan value counts must match", expected.nanValueCounts(), actual.nanValueCounts()); Assert.assertEquals("Lower bounds must match", expected.lowerBounds(), actual.lowerBounds()); Assert.assertEquals("Upper bounds must match", expected.upperBounds(), actual.upperBounds()); Assert.assertEquals("Key metadata must match", expected.keyMetadata(), actual.keyMetadata());