Skip to content

Commit 2e46847

Browse files
authored
Core: Fix float and double metrics for Parquet and ORC (#2464)
This replaces the metrics from Parquet and ORC with metrics that are accumulated by Iceberg writers to ensure that the metrics do not include NaN values.
1 parent 111fe81 commit 2e46847

File tree

19 files changed

+438
-212
lines changed

19 files changed

+438
-212
lines changed
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iceberg;
21+
22+
/**
23+
* Iceberg internally tracked field level metrics, used by Parquet and ORC writers only.
24+
* <p>
25+
* Parquet/ORC file statistics may include NaN values in bounds, so the value count, NaN count and lower/upper bounds
26+
* of a double field are accumulated by the writer through {@link Builder}. NaN values are counted but excluded from
27+
* the bounds, and both bounds are null when the field has no non-NaN value.
28+
*/
29+
public class DoubleFieldMetrics extends FieldMetrics<Double> {
30+
31+
private DoubleFieldMetrics(int id, long valueCount, long nanValueCount, Double lowerBound, Double upperBound) {
32+
super(id, valueCount, 0L, nanValueCount, lowerBound, upperBound);
33+
}
34+
35+
public static class Builder {
36+
private final int id;
37+
private long valueCount = 0;
38+
private long nanValueCount = 0;
39+
private double lowerBound = Double.POSITIVE_INFINITY;
40+
private double upperBound = Double.NEGATIVE_INFINITY;
41+
42+
public Builder(int id) {
43+
this.id = id;
44+
}
45+
46+
public void addValue(double value) {
47+
this.valueCount++;
48+
if (Double.isNaN(value)) {
49+
this.nanValueCount++;
50+
} else {
51+
if (Double.compare(value, lowerBound) < 0) {
52+
this.lowerBound = value;
53+
}
54+
if (Double.compare(value, upperBound) > 0) {
55+
this.upperBound = value;
56+
}
57+
}
58+
}
59+
60+
public DoubleFieldMetrics build() {
61+
boolean hasBound = valueCount - nanValueCount > 0;
62+
return new DoubleFieldMetrics(id, valueCount, nanValueCount,
63+
hasBound ? lowerBound : null, hasBound ? upperBound : null);
64+
}
65+
}
66+
}

core/src/main/java/org/apache/iceberg/FieldMetrics.java

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,25 +20,23 @@
2020
package org.apache.iceberg;
2121

2222

23-
import java.nio.ByteBuffer;
24-
2523
/**
2624
* Iceberg internally tracked field level metrics.
2725
*/
28-
public class FieldMetrics {
26+
public class FieldMetrics<T> {
2927
private final int id;
3028
private final long valueCount;
3129
private final long nullValueCount;
3230
private final long nanValueCount;
33-
private final ByteBuffer lowerBound;
34-
private final ByteBuffer upperBound;
31+
private final T lowerBound;
32+
private final T upperBound;
3533

3634
public FieldMetrics(int id,
3735
long valueCount,
3836
long nullValueCount,
3937
long nanValueCount,
40-
ByteBuffer lowerBound,
41-
ByteBuffer upperBound) {
38+
T lowerBound,
39+
T upperBound) {
4240
this.id = id;
4341
this.valueCount = valueCount;
4442
this.nullValueCount = nullValueCount;
@@ -78,14 +76,21 @@ public long nanValueCount() {
7876
/**
7977
* Returns the lower bound value of this field.
8078
*/
81-
public ByteBuffer lowerBound() {
79+
public T lowerBound() {
8280
return lowerBound;
8381
}
8482

8583
/**
8684
* Returns the upper bound value of this field.
8785
*/
88-
public ByteBuffer upperBound() {
86+
public T upperBound() {
8987
return upperBound;
9088
}
89+
90+
/**
91+
* Returns whether the metrics have bounds (i.e. there is at least one non-null value for this field).
92+
*/
93+
public boolean hasBounds() {
94+
return upperBound != null;
95+
}
9196
}

core/src/main/java/org/apache/iceberg/FloatFieldMetrics.java

Lines changed: 34 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -19,44 +19,52 @@
1919

2020
package org.apache.iceberg;
2121

22-
import java.nio.ByteBuffer;
23-
2422
/**
2523
* Iceberg internally tracked field level metrics, used by Parquet and ORC writers only.
2624
* <p>
2725
* Parquet/ORC file statistics may include NaN values in bounds, so the value count, NaN count and lower/upper bounds
2826
* of a float field are accumulated by the writer through {@link Builder}. NaN values are counted but excluded from
2927
* the bounds, and both bounds are null when the field has no non-NaN value.
3028
*/
31-
public class FloatFieldMetrics extends FieldMetrics {
32-
33-
/**
34-
* Constructor for creating a FieldMetrics with only NaN counter.
35-
* @param id field id being tracked by the writer
36-
* @param nanValueCount number of NaN values, will only be non-0 for double or float field.
37-
*/
38-
public FloatFieldMetrics(int id,
39-
long nanValueCount) {
40-
super(id, 0L, 0L, nanValueCount, null, null);
41-
}
29+
public class FloatFieldMetrics extends FieldMetrics<Float> {
4230

43-
@Override
44-
public long valueCount() {
45-
throw new IllegalStateException("Shouldn't access this method, as this metric is tracked in file statistics. ");
31+
private FloatFieldMetrics(int id, long valueCount, long nanValueCount, Float lowerBound, Float upperBound) {
32+
super(id, valueCount, 0L, nanValueCount, lowerBound, upperBound);
4633
}
4734

48-
@Override
49-
public long nullValueCount() {
50-
throw new IllegalStateException("Shouldn't access this method, as this metric is tracked in file statistics. ");
35+
public Builder builderFor(int id) {
36+
return new Builder(id);
5137
}
5238

53-
@Override
54-
public ByteBuffer lowerBound() {
55-
throw new IllegalStateException("Shouldn't access this method, as this metric is tracked in file statistics. ");
56-
}
39+
public static class Builder {
40+
private final int id;
41+
private long valueCount = 0;
42+
private long nanValueCount = 0;
43+
private float lowerBound = Float.POSITIVE_INFINITY;
44+
private float upperBound = Float.NEGATIVE_INFINITY;
45+
46+
public Builder(int id) {
47+
this.id = id;
48+
}
49+
50+
public void addValue(float value) {
51+
this.valueCount++;
52+
if (Float.isNaN(value)) {
53+
this.nanValueCount++;
54+
} else {
55+
if (Float.compare(value, lowerBound) < 0) {
56+
this.lowerBound = value;
57+
}
58+
if (Float.compare(value, upperBound) > 0) {
59+
this.upperBound = value;
60+
}
61+
}
62+
}
5763

58-
@Override
59-
public ByteBuffer upperBound() {
60-
throw new IllegalStateException("Shouldn't access this method, as this metric is tracked in file statistics. ");
64+
public FloatFieldMetrics build() {
65+
boolean hasBound = valueCount - nanValueCount > 0;
66+
return new FloatFieldMetrics(id, valueCount, nanValueCount,
67+
hasBound ? lowerBound : null, hasBound ? upperBound : null);
68+
}
6169
}
6270
}

core/src/main/java/org/apache/iceberg/MetricsUtil.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ private MetricsUtil() {
3434
* Construct a mapping from column id to NaN value count, based on the input metrics and metrics config.
3535
*/
3636
public static Map<Integer, Long> createNanValueCounts(
37-
Stream<FieldMetrics> fieldMetrics, MetricsConfig metricsConfig, Schema inputSchema) {
37+
Stream<FieldMetrics<?>> fieldMetrics, MetricsConfig metricsConfig, Schema inputSchema) {
3838
Preconditions.checkNotNull(metricsConfig, "metricsConfig is required");
3939

4040
if (fieldMetrics == null || inputSchema == null) {

core/src/test/java/org/apache/iceberg/TestMetrics.java

Lines changed: 12 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,7 @@ public void testMetricsForNestedStructFields() throws IOException {
274274
assertBounds(6, BinaryType.get(),
275275
ByteBuffer.wrap("A".getBytes()), ByteBuffer.wrap("A".getBytes()), metrics);
276276
assertCounts(7, 1L, 0L, 1L, metrics);
277-
assertBounds(7, DoubleType.get(), Double.NaN, Double.NaN, metrics);
277+
assertBounds(7, DoubleType.get(), null, null, metrics);
278278
}
279279

280280
private Record buildNestedTestRecord() {
@@ -354,9 +354,9 @@ public void testMetricsForNaNColumns() throws IOException {
354354
Assert.assertEquals(2L, (long) metrics.recordCount());
355355
assertCounts(1, 2L, 0L, 2L, metrics);
356356
assertCounts(2, 2L, 0L, 2L, metrics);
357-
// below: current behavior; will be null once NaN is excluded from upper/lower bound
358-
assertBounds(1, FloatType.get(), Float.NaN, Float.NaN, metrics);
359-
assertBounds(2, DoubleType.get(), Double.NaN, Double.NaN, metrics);
357+
358+
assertBounds(1, FloatType.get(), null, null, metrics);
359+
assertBounds(2, DoubleType.get(), null, null, metrics);
360360
}
361361

362362
@Test
@@ -367,15 +367,8 @@ public void testColumnBoundsWithNaNValueAtFront() throws IOException {
367367
assertCounts(1, 3L, 0L, 1L, metrics);
368368
assertCounts(2, 3L, 0L, 1L, metrics);
369369

370-
// below: current behavior; will be non-NaN values once NaN is excluded from upper/lower bound. ORC and Parquet's
371-
// behaviors differ due to their implementation of comparison being different.
372-
if (fileFormat() == FileFormat.ORC) {
373-
assertBounds(1, FloatType.get(), Float.NaN, Float.NaN, metrics);
374-
assertBounds(2, DoubleType.get(), Double.NaN, Double.NaN, metrics);
375-
} else {
376-
assertBounds(1, FloatType.get(), 1.2F, Float.NaN, metrics);
377-
assertBounds(2, DoubleType.get(), 3.4D, Double.NaN, metrics);
378-
}
370+
assertBounds(1, FloatType.get(), 1.2F, 5.6F, metrics);
371+
assertBounds(2, DoubleType.get(), 3.4D, 7.8D, metrics);
379372
}
380373

381374
@Test
@@ -386,15 +379,8 @@ public void testColumnBoundsWithNaNValueInMiddle() throws IOException {
386379
assertCounts(1, 3L, 0L, 1L, metrics);
387380
assertCounts(2, 3L, 0L, 1L, metrics);
388381

389-
// below: current behavior; will be non-NaN values once NaN is excluded from upper/lower bound. ORC and Parquet's
390-
// behaviors differ due to their implementation of comparison being different.
391-
if (fileFormat() == FileFormat.ORC) {
392-
assertBounds(1, FloatType.get(), 1.2F, 5.6F, metrics);
393-
assertBounds(2, DoubleType.get(), 3.4D, 7.8D, metrics);
394-
} else {
395-
assertBounds(1, FloatType.get(), 1.2F, Float.NaN, metrics);
396-
assertBounds(2, DoubleType.get(), 3.4D, Double.NaN, metrics);
397-
}
382+
assertBounds(1, FloatType.get(), 1.2F, 5.6F, metrics);
383+
assertBounds(2, DoubleType.get(), 3.4D, 7.8D, metrics);
398384
}
399385

400386
@Test
@@ -405,15 +391,8 @@ public void testColumnBoundsWithNaNValueAtEnd() throws IOException {
405391
assertCounts(1, 3L, 0L, 1L, metrics);
406392
assertCounts(2, 3L, 0L, 1L, metrics);
407393

408-
// below: current behavior; will be non-NaN values once NaN is excluded from upper/lower bound. ORC and Parquet's
409-
// behaviors differ due to their implementation of comparison being different.
410-
if (fileFormat() == FileFormat.ORC) {
411-
assertBounds(1, FloatType.get(), 1.2F, 5.6F, metrics);
412-
assertBounds(2, DoubleType.get(), 3.4D, 7.8D, metrics);
413-
} else {
414-
assertBounds(1, FloatType.get(), 1.2F, Float.NaN, metrics);
415-
assertBounds(2, DoubleType.get(), 3.4D, Double.NaN, metrics);
416-
}
394+
assertBounds(1, FloatType.get(), 1.2F, 5.6F, metrics);
395+
assertBounds(2, DoubleType.get(), 3.4D, 7.8D, metrics);
417396
}
418397

419398
@Test
@@ -506,7 +485,7 @@ public void testMetricsForNestedStructFieldsWithMultipleRowGroup() throws IOExce
506485
assertBounds(6, BinaryType.get(),
507486
ByteBuffer.wrap("A".getBytes()), ByteBuffer.wrap("A".getBytes()), metrics);
508487
assertCounts(7, 201L, 0L, 201L, metrics);
509-
assertBounds(7, DoubleType.get(), Double.NaN, Double.NaN, metrics);
488+
assertBounds(7, DoubleType.get(), null, null, metrics);
510489
}
511490

512491
@Test
@@ -567,7 +546,7 @@ public void testFullMetricsMode() throws IOException {
567546
assertBounds(6, Types.BinaryType.get(),
568547
ByteBuffer.wrap("A".getBytes()), ByteBuffer.wrap("A".getBytes()), metrics);
569548
assertCounts(7, 1L, 0L, 1L, metrics);
570-
assertBounds(7, Types.DoubleType.get(), Double.NaN, Double.NaN, metrics);
549+
assertBounds(7, Types.DoubleType.get(), null, null, metrics);
571550
}
572551

573552
@Test

data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ public void write(Record value, VectorizedRowBatch output) {
129129
}
130130

131131
@Override
132-
public Stream<FieldMetrics> metrics() {
132+
public Stream<FieldMetrics<?>> metrics() {
133133
return writer.metrics();
134134
}
135135

@@ -160,7 +160,7 @@ public void nonNullWrite(int rowId, Record data, ColumnVector output) {
160160
}
161161

162162
@Override
163-
public Stream<FieldMetrics> metrics() {
163+
public Stream<FieldMetrics<?>> metrics() {
164164
return writers.stream().flatMap(OrcValueWriter::metrics);
165165
}
166166
}

0 commit comments

Comments
 (0)