From 382d39d47ed373116d18afa125475060b0b48827 Mon Sep 17 00:00:00 2001 From: Yan Yan Date: Tue, 24 Nov 2020 21:28:15 -0800 Subject: [PATCH 1/7] Refactor/rename metrics related classes for NaN support --- .../apache/iceberg/NaNOnlyFieldMetrics.java | 23 +++++---- .../java/org/apache/iceberg/MetricsUtil.java | 48 +++++++++++++++++++ .../apache/iceberg/TestMergingMetrics.java | 14 ++++++ ...cs.java => TestGenericMergingMetrics.java} | 8 +++- .../flink/source/TestFlinkMergingMetrics.java | 6 ++- .../apache/iceberg/parquet/ParquetUtil.java | 22 ++------- .../iceberg/parquet/ParquetValueWriters.java | 5 +- .../spark/source/StructInternalRow.java | 31 +++++++++++- ...rics.java => TestSparkMergingMetrics.java} | 8 +++- 9 files changed, 125 insertions(+), 40 deletions(-) rename parquet/src/main/java/org/apache/iceberg/parquet/ParquetFieldMetrics.java => api/src/main/java/org/apache/iceberg/NaNOnlyFieldMetrics.java (62%) create mode 100644 core/src/main/java/org/apache/iceberg/MetricsUtil.java rename data/src/test/java/org/apache/iceberg/parquet/{TestParquetMergingMetrics.java => TestGenericMergingMetrics.java} (86%) rename spark/src/test/java/org/apache/iceberg/spark/source/{TestSparkParquetMergingMetrics.java => TestSparkMergingMetrics.java} (90%) diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFieldMetrics.java b/api/src/main/java/org/apache/iceberg/NaNOnlyFieldMetrics.java similarity index 62% rename from parquet/src/main/java/org/apache/iceberg/parquet/ParquetFieldMetrics.java rename to api/src/main/java/org/apache/iceberg/NaNOnlyFieldMetrics.java index 708785a5b627..3b6f9c47686c 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFieldMetrics.java +++ b/api/src/main/java/org/apache/iceberg/NaNOnlyFieldMetrics.java @@ -17,26 +17,25 @@ * under the License. */ -package org.apache.iceberg.parquet; +package org.apache.iceberg; import java.nio.ByteBuffer; -import org.apache.iceberg.FieldMetrics; /** - * Iceberg internally tracked field level metrics, used by Parquet writer only. + * Iceberg internally tracked field level metrics, used by Parquet and ORC writers only. *

- * Parquet keeps track of most metrics in its footer, and only NaN counter is actually tracked by writers. - * This wrapper ensures that metrics not being updated by Parquet writers will not be incorrectly used, by throwing + * Parquet/ORC keeps track of most metrics in file statistics, and only NaN counter is actually tracked by writers. + * This wrapper ensures that metrics not being updated by those writers will not be incorrectly used, by throwing * exceptions when they are accessed. */ -public class ParquetFieldMetrics extends FieldMetrics { +public class NaNOnlyFieldMetrics extends FieldMetrics { /** - * Constructor for creating a Parquet-specific FieldMetrics. + * Constructor for creating a FieldMetrics with only NaN counter. * @param id field id being tracked by the writer * @param nanValueCount number of NaN values, will only be non-0 for double or float field. */ - public ParquetFieldMetrics(int id, + public NaNOnlyFieldMetrics(int id, long nanValueCount) { super(id, 0L, 0L, nanValueCount, null, null); } @@ -44,24 +43,24 @@ public ParquetFieldMetrics(int id, @Override public long valueCount() { throw new IllegalStateException( - "Shouldn't access valueCount() within ParquetFieldMetrics, as this metric is tracked by Parquet footer. "); + "Shouldn't access valueCount() within NaNOnlyFieldMetrics, as this metric is tracked in file statistics. "); } @Override public long nullValueCount() { throw new IllegalStateException( - "Shouldn't access nullValueCount() within ParquetFieldMetrics, as this metric is tracked by Parquet footer. "); + "Shouldn't access nullValueCount() within NaNOnlyFieldMetrics, as this metric is tracked in file statistics. "); } @Override public ByteBuffer lowerBound() { throw new IllegalStateException( - "Shouldn't access lowerBound() within ParquetFieldMetrics, as this metric is tracked by Parquet footer. "); + "Shouldn't access lowerBound() within NaNOnlyFieldMetrics, as this metric is tracked in file statistics. "); } @Override public ByteBuffer upperBound() { throw new IllegalStateException( - "Shouldn't access upperBound() within ParquetFieldMetrics, as this metric is tracked by Parquet footer. "); + "Shouldn't access upperBound() within NaNOnlyFieldMetrics, as this metric is tracked in file statistics. "); } } diff --git a/core/src/main/java/org/apache/iceberg/MetricsUtil.java b/core/src/main/java/org/apache/iceberg/MetricsUtil.java new file mode 100644 index 000000000000..61982d617e5f --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/MetricsUtil.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg; + +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; + +public class MetricsUtil { + + private MetricsUtil() { + } + + public static Map getNanValueCounts( + Stream fieldMetrics, MetricsConfig metricsConfig, Schema inputSchema) { + if (fieldMetrics == null || inputSchema == null) { + return Maps.newHashMap(); + } + + return fieldMetrics + .filter(metrics -> getMetricsMode(inputSchema, metricsConfig, metrics.id()) != MetricsModes.None.get()) + .collect(Collectors.toMap(FieldMetrics::id, FieldMetrics::nanValueCount)); + } + + public static MetricsModes.MetricsMode getMetricsMode(Schema inputSchema, MetricsConfig metricsConfig, int fieldId) { + String columnName = inputSchema.findColumnName(fieldId); + return metricsConfig.columnMode(columnName); + } + +} diff --git a/data/src/test/java/org/apache/iceberg/TestMergingMetrics.java b/data/src/test/java/org/apache/iceberg/TestMergingMetrics.java index 5258d8b2997f..96217229d879 100644 --- a/data/src/test/java/org/apache/iceberg/TestMergingMetrics.java +++ b/data/src/test/java/org/apache/iceberg/TestMergingMetrics.java @@ -32,10 +32,13 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import static org.apache.iceberg.types.Types.NestedField.optional; import static org.apache.iceberg.types.Types.NestedField.required; +@RunWith(Parameterized.class) public abstract class TestMergingMetrics { // all supported fields, except for UUID which is on deprecation path: see https://github.com/apache/iceberg/pull/1611 @@ -83,6 +86,17 @@ public abstract class TestMergingMetrics { STRUCT_FIELD ); + protected final FileFormat fileFormat; + + @Parameterized.Parameters(name = "fileFormat = {0}") + public static Object[] parameters() { + return new Object[] {FileFormat.PARQUET }; + } + + public TestMergingMetrics(FileFormat fileFormat) { + this.fileFormat = fileFormat; + } + protected abstract FileAppender writeAndGetAppender(List records) throws Exception; @Rule diff --git a/data/src/test/java/org/apache/iceberg/parquet/TestParquetMergingMetrics.java b/data/src/test/java/org/apache/iceberg/parquet/TestGenericMergingMetrics.java similarity index 86% rename from data/src/test/java/org/apache/iceberg/parquet/TestParquetMergingMetrics.java rename to data/src/test/java/org/apache/iceberg/parquet/TestGenericMergingMetrics.java index 03118e3a65f2..72db36a9908c 100644 --- a/data/src/test/java/org/apache/iceberg/parquet/TestParquetMergingMetrics.java +++ b/data/src/test/java/org/apache/iceberg/parquet/TestGenericMergingMetrics.java @@ -27,12 +27,16 @@ import org.apache.iceberg.data.Record; import org.apache.iceberg.io.FileAppender; -public class TestParquetMergingMetrics extends TestMergingMetrics { +public class TestGenericMergingMetrics extends TestMergingMetrics { + + public TestGenericMergingMetrics(FileFormat fileFormat) { + super(fileFormat); + } @Override protected FileAppender writeAndGetAppender(List records) throws IOException { FileAppender appender = new GenericAppenderFactory(SCHEMA).newAppender( - org.apache.iceberg.Files.localOutput(temp.newFile()), FileFormat.PARQUET); + org.apache.iceberg.Files.localOutput(temp.newFile()), fileFormat); try (FileAppender fileAppender = appender) { records.forEach(fileAppender::add); } diff --git a/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMergingMetrics.java b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMergingMetrics.java index cd0a89ba7c76..1670ed733421 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMergingMetrics.java +++ b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMergingMetrics.java @@ -35,13 +35,17 @@ public class TestFlinkMergingMetrics extends TestMergingMetrics { + public TestFlinkMergingMetrics(FileFormat fileFormat) { + super(fileFormat); + } + @Override protected FileAppender writeAndGetAppender(List records) throws IOException { RowType flinkSchema = FlinkSchemaUtil.convert(SCHEMA); FileAppender appender = new FlinkAppenderFactory(SCHEMA, flinkSchema, ImmutableMap.of(), PartitionSpec.unpartitioned()) - .newAppender(org.apache.iceberg.Files.localOutput(temp.newFile()), FileFormat.PARQUET); + .newAppender(org.apache.iceberg.Files.localOutput(temp.newFile()), fileFormat); try (FileAppender fileAppender = appender) { records.stream().map(r -> RowDataConverter.convert(SCHEMA, r)).forEach(fileAppender::add); } diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java index 9bc3404dcf1b..4cefa8ff146f 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java @@ -28,13 +28,13 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.iceberg.FieldMetrics; import org.apache.iceberg.Metrics; import org.apache.iceberg.MetricsConfig; import org.apache.iceberg.MetricsModes; import org.apache.iceberg.MetricsModes.MetricsMode; +import org.apache.iceberg.MetricsUtil; import org.apache.iceberg.Schema; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.expressions.Literal; @@ -113,8 +113,7 @@ public static Metrics footerMetrics(ParquetMetadata metadata, Stream getNanValueCounts( - Stream fieldMetrics, MetricsConfig metricsConfig, Schema inputSchema) { - if (fieldMetrics == null || inputSchema == null) { - return Maps.newHashMap(); - } - - return fieldMetrics - .filter(metrics -> { - String columnName = inputSchema.findColumnName(metrics.id()); - MetricsMode metricsMode = metricsConfig.columnMode(columnName); - return metricsMode != MetricsModes.None.get(); - }) - .collect(Collectors.toMap(FieldMetrics::id, FieldMetrics::nanValueCount)); - } - private static MessageType getParquetTypeWithIds(ParquetMetadata metadata, NameMapping nameMapping) { MessageType type = metadata.getFileMetaData().getSchema(); diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java index 7d77996e9968..f6775630b39d 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java @@ -31,6 +31,7 @@ import java.util.stream.Stream; import org.apache.avro.util.Utf8; import org.apache.iceberg.FieldMetrics; +import org.apache.iceberg.NaNOnlyFieldMetrics; import org.apache.iceberg.deletes.PositionDelete; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; @@ -188,7 +189,7 @@ public void write(int repetitionLevel, Float value) { @Override public Stream metrics() { - return Stream.of(new ParquetFieldMetrics(id, nanCount)); + return Stream.of(new NaNOnlyFieldMetrics(id, nanCount)); } } @@ -212,7 +213,7 @@ public void write(int repetitionLevel, Double value) { @Override public Stream metrics() { - return Stream.of(new ParquetFieldMetrics(id, nanCount)); + return Stream.of(new NaNOnlyFieldMetrics(id, nanCount)); } } diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/StructInternalRow.java b/spark/src/main/java/org/apache/iceberg/spark/source/StructInternalRow.java index 964c1293098f..23208a8b126c 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/source/StructInternalRow.java +++ b/spark/src/main/java/org/apache/iceberg/spark/source/StructInternalRow.java @@ -21,9 +21,12 @@ import java.math.BigDecimal; import java.nio.ByteBuffer; +import java.time.LocalDate; +import java.time.OffsetDateTime; import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import java.util.function.Function; import org.apache.iceberg.StructLike; @@ -42,6 +45,7 @@ import org.apache.spark.sql.types.BooleanType; import org.apache.spark.sql.types.ByteType; import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.DateType; import org.apache.spark.sql.types.Decimal; import org.apache.spark.sql.types.DecimalType; import org.apache.spark.sql.types.DoubleType; @@ -52,6 +56,7 @@ import org.apache.spark.sql.types.ShortType; import org.apache.spark.sql.types.StringType; import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.types.TimestampType; import org.apache.spark.unsafe.types.CalendarInterval; import org.apache.spark.unsafe.types.UTF8String; @@ -115,12 +120,30 @@ public short getShort(int ordinal) { @Override public int getInt(int ordinal) { - return struct.get(ordinal, Integer.class); + Object integer = struct.get(ordinal, Object.class); + + if (integer instanceof Integer) { + return (int) integer; + } else if (integer instanceof LocalDate) { + return (int) ((LocalDate) integer).toEpochDay(); + } else { + throw new IllegalStateException("Unknown type for int field. Type name: " + integer.getClass().getName()); + } } @Override public long getLong(int ordinal) { - return struct.get(ordinal, Long.class); + Object longVal = struct.get(ordinal, Object.class); + + if (longVal instanceof Long) { + return (long) longVal; + } else if (longVal instanceof OffsetDateTime) { + return TimeUnit.SECONDS.toDays(((OffsetDateTime) longVal).toEpochSecond()); + } else if (longVal instanceof LocalDate) { + return ((LocalDate) longVal).toEpochDay(); + } else { + throw new IllegalStateException("Unknown type for long field. Type name: " + longVal.getClass().getName()); + } } @Override @@ -240,6 +263,10 @@ public Object get(int ordinal, DataType dataType) { return getByte(ordinal); } else if (dataType instanceof ShortType) { return getShort(ordinal); + } else if (dataType instanceof DateType) { + return getInt(ordinal); + } else if (dataType instanceof TimestampType) { + return getLong(ordinal); } return null; } diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkParquetMergingMetrics.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMergingMetrics.java similarity index 90% rename from spark/src/test/java/org/apache/iceberg/spark/source/TestSparkParquetMergingMetrics.java rename to spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMergingMetrics.java index e49a18be5338..16f25cc17ddc 100644 --- a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkParquetMergingMetrics.java +++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMergingMetrics.java @@ -29,13 +29,17 @@ import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.spark.sql.catalyst.InternalRow; -public class TestSparkParquetMergingMetrics extends TestMergingMetrics { +public class TestSparkMergingMetrics extends TestMergingMetrics { + + public TestSparkMergingMetrics(FileFormat fileFormat) { + super(fileFormat); + } @Override protected FileAppender writeAndGetAppender(List records) throws IOException { FileAppender appender = new SparkAppenderFactory(new HashMap<>(), SCHEMA, SparkSchemaUtil.convert(SCHEMA)).newAppender( - org.apache.iceberg.Files.localOutput(temp.newFile()), FileFormat.PARQUET); + org.apache.iceberg.Files.localOutput(temp.newFile()), fileFormat); try (FileAppender fileAppender = appender) { records.stream().map(r -> new StructInternalRow(SCHEMA.asStruct()).setStruct(r)).forEach(fileAppender::add); } From 6ee1853f3bc641af0833d38125434a81d1fa577c Mon Sep 17 00:00:00 2001 From: Yan Yan Date: Wed, 25 Nov 2020 15:30:02 -0800 Subject: [PATCH 2/7] add javadoc and null check --- core/src/main/java/org/apache/iceberg/MetricsUtil.java | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/core/src/main/java/org/apache/iceberg/MetricsUtil.java b/core/src/main/java/org/apache/iceberg/MetricsUtil.java index 61982d617e5f..5d3830239e0e 100644 --- a/core/src/main/java/org/apache/iceberg/MetricsUtil.java +++ b/core/src/main/java/org/apache/iceberg/MetricsUtil.java @@ -22,6 +22,7 @@ import java.util.Map; import java.util.stream.Collectors; import java.util.stream.Stream; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Maps; public class MetricsUtil { @@ -29,6 +30,9 @@ public class MetricsUtil { private MetricsUtil() { } + /** + * Construct mapping relationship between column id to NaN value counts from input metrics and metrics config. + */ public static Map getNanValueCounts( Stream fieldMetrics, MetricsConfig metricsConfig, Schema inputSchema) { if (fieldMetrics == null || inputSchema == null) { @@ -40,7 +44,13 @@ public static Map getNanValueCounts( .collect(Collectors.toMap(FieldMetrics::id, FieldMetrics::nanValueCount)); } + /** + * Extract MetricsMode for the given field id from metrics config. + */ public static MetricsModes.MetricsMode getMetricsMode(Schema inputSchema, MetricsConfig metricsConfig, int fieldId) { + Preconditions.checkNotNull(inputSchema, "inputSchema is required"); + Preconditions.checkNotNull(metricsConfig, "metricsConfig is required"); + String columnName = inputSchema.findColumnName(fieldId); return metricsConfig.columnMode(columnName); } From ee0ae1d7f93702f4acc74fe80f9c0d69f0491bdc Mon Sep 17 00:00:00 2001 From: Yan Yan Date: Tue, 1 Dec 2020 18:01:54 -0800 Subject: [PATCH 3/7] update based on comments in #1790 --- .../java/org/apache/iceberg/FieldMetrics.java | 0 .../java/org/apache/iceberg/MetricsUtil.java | 16 +++------------- .../java/org/apache/iceberg/NaNFieldMetrics.java | 6 +++--- .../iceberg/parquet/ParquetValueWriters.java | 6 +++--- 4 files changed, 9 insertions(+), 19 deletions(-) rename {api => core}/src/main/java/org/apache/iceberg/FieldMetrics.java (100%) rename api/src/main/java/org/apache/iceberg/NaNOnlyFieldMetrics.java => core/src/main/java/org/apache/iceberg/NaNFieldMetrics.java (94%) diff --git a/api/src/main/java/org/apache/iceberg/FieldMetrics.java b/core/src/main/java/org/apache/iceberg/FieldMetrics.java similarity index 100% rename from api/src/main/java/org/apache/iceberg/FieldMetrics.java rename to core/src/main/java/org/apache/iceberg/FieldMetrics.java diff --git a/core/src/main/java/org/apache/iceberg/MetricsUtil.java b/core/src/main/java/org/apache/iceberg/MetricsUtil.java index 5d3830239e0e..97a43fac657b 100644 --- a/core/src/main/java/org/apache/iceberg/MetricsUtil.java +++ b/core/src/main/java/org/apache/iceberg/MetricsUtil.java @@ -22,7 +22,6 @@ import java.util.Map; import java.util.stream.Collectors; import java.util.stream.Stream; -import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Maps; public class MetricsUtil { @@ -30,27 +29,18 @@ public class MetricsUtil { private MetricsUtil() { } - /** - * Construct mapping relationship between column id to NaN value counts from input metrics and metrics config. - */ - public static Map getNanValueCounts( + public static Map createNanValueCounts( Stream fieldMetrics, MetricsConfig metricsConfig, Schema inputSchema) { if (fieldMetrics == null || inputSchema == null) { return Maps.newHashMap(); } return fieldMetrics - .filter(metrics -> getMetricsMode(inputSchema, metricsConfig, metrics.id()) != MetricsModes.None.get()) + .filter(metrics -> metricsMode(inputSchema, metricsConfig, metrics.id()) != MetricsModes.None.get()) .collect(Collectors.toMap(FieldMetrics::id, FieldMetrics::nanValueCount)); } - /** - * Extract MetricsMode for the given field id from metrics config. - */ - public static MetricsModes.MetricsMode getMetricsMode(Schema inputSchema, MetricsConfig metricsConfig, int fieldId) { - Preconditions.checkNotNull(inputSchema, "inputSchema is required"); - Preconditions.checkNotNull(metricsConfig, "metricsConfig is required"); - + public static MetricsModes.MetricsMode metricsMode(Schema inputSchema, MetricsConfig metricsConfig, int fieldId) { String columnName = inputSchema.findColumnName(fieldId); return metricsConfig.columnMode(columnName); } diff --git a/api/src/main/java/org/apache/iceberg/NaNOnlyFieldMetrics.java b/core/src/main/java/org/apache/iceberg/NaNFieldMetrics.java similarity index 94% rename from api/src/main/java/org/apache/iceberg/NaNOnlyFieldMetrics.java rename to core/src/main/java/org/apache/iceberg/NaNFieldMetrics.java index 3b6f9c47686c..efb966f4ee8c 100644 --- a/api/src/main/java/org/apache/iceberg/NaNOnlyFieldMetrics.java +++ b/core/src/main/java/org/apache/iceberg/NaNFieldMetrics.java @@ -28,15 +28,15 @@ * This wrapper ensures that metrics not being updated by those writers will not be incorrectly used, by throwing * exceptions when they are accessed. */ -public class NaNOnlyFieldMetrics extends FieldMetrics { +public class NaNFieldMetrics extends FieldMetrics { /** * Constructor for creating a FieldMetrics with only NaN counter. * @param id field id being tracked by the writer * @param nanValueCount number of NaN values, will only be non-0 for double or float field. */ - public NaNOnlyFieldMetrics(int id, - long nanValueCount) { + public NaNFieldMetrics(int id, + long nanValueCount) { super(id, 0L, 0L, nanValueCount, null, null); } diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java index f6775630b39d..93874702fc17 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java @@ -31,7 +31,7 @@ import java.util.stream.Stream; import org.apache.avro.util.Utf8; import org.apache.iceberg.FieldMetrics; -import org.apache.iceberg.NaNOnlyFieldMetrics; +import org.apache.iceberg.NaNFieldMetrics; import org.apache.iceberg.deletes.PositionDelete; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; @@ -189,7 +189,7 @@ public void write(int repetitionLevel, Float value) { @Override public Stream metrics() { - return Stream.of(new NaNOnlyFieldMetrics(id, nanCount)); + return Stream.of(new NaNFieldMetrics(id, nanCount)); } } @@ -213,7 +213,7 @@ public void write(int repetitionLevel, Double value) { @Override public Stream metrics() { - return Stream.of(new NaNOnlyFieldMetrics(id, nanCount)); + return Stream.of(new NaNFieldMetrics(id, nanCount)); } } From e2da2becdb948a116fa814a9b13e031cb559200e Mon Sep 17 00:00:00 2001 From: Yan Yan Date: Tue, 1 Dec 2020 18:06:12 -0800 Subject: [PATCH 4/7] add forgot file --- .../src/main/java/org/apache/iceberg/parquet/ParquetUtil.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java index 4cefa8ff146f..3337d2f78ff5 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java @@ -113,7 +113,7 @@ public static Metrics footerMetrics(ParquetMetadata metadata, Stream Date: Mon, 7 Dec 2020 11:45:21 -0800 Subject: [PATCH 5/7] address comments --- ...ieldMetrics.java => FloatFieldMetrics.java} | 18 +++++++----------- .../java/org/apache/iceberg/MetricsUtil.java | 12 ++++++++++++ .../iceberg/parquet/ParquetValueWriters.java | 6 +++--- 3 files changed, 22 insertions(+), 14 deletions(-) rename core/src/main/java/org/apache/iceberg/{NaNFieldMetrics.java => FloatFieldMetrics.java} (69%) diff --git a/core/src/main/java/org/apache/iceberg/NaNFieldMetrics.java b/core/src/main/java/org/apache/iceberg/FloatFieldMetrics.java similarity index 69% rename from core/src/main/java/org/apache/iceberg/NaNFieldMetrics.java rename to core/src/main/java/org/apache/iceberg/FloatFieldMetrics.java index efb966f4ee8c..873138750685 100644 --- a/core/src/main/java/org/apache/iceberg/NaNFieldMetrics.java +++ b/core/src/main/java/org/apache/iceberg/FloatFieldMetrics.java @@ -28,39 +28,35 @@ * This wrapper ensures that metrics not being updated by those writers will not be incorrectly used, by throwing * exceptions when they are accessed. */ -public class NaNFieldMetrics extends FieldMetrics { +public class FloatFieldMetrics extends FieldMetrics { /** * Constructor for creating a FieldMetrics with only NaN counter. * @param id field id being tracked by the writer * @param nanValueCount number of NaN values, will only be non-0 for double or float field. */ - public NaNFieldMetrics(int id, - long nanValueCount) { + public FloatFieldMetrics(int id, + long nanValueCount) { super(id, 0L, 0L, nanValueCount, null, null); } @Override public long valueCount() { - throw new IllegalStateException( - "Shouldn't access valueCount() within NaNOnlyFieldMetrics, as this metric is tracked in file statistics. "); + throw new IllegalStateException("Shouldn't access this method, as this metric is tracked in file statistics. "); } @Override public long nullValueCount() { - throw new IllegalStateException( - "Shouldn't access nullValueCount() within NaNOnlyFieldMetrics, as this metric is tracked in file statistics. "); + throw new IllegalStateException("Shouldn't access this method, as this metric is tracked in file statistics. "); } @Override public ByteBuffer lowerBound() { - throw new IllegalStateException( - "Shouldn't access lowerBound() within NaNOnlyFieldMetrics, as this metric is tracked in file statistics. "); + throw new IllegalStateException("Shouldn't access this method, as this metric is tracked in file statistics. "); } @Override public ByteBuffer upperBound() { - throw new IllegalStateException( - "Shouldn't access upperBound() within NaNOnlyFieldMetrics, as this metric is tracked in file statistics. "); + throw new IllegalStateException("Shouldn't access this method, as this metric is tracked in file statistics. "); } } diff --git a/core/src/main/java/org/apache/iceberg/MetricsUtil.java b/core/src/main/java/org/apache/iceberg/MetricsUtil.java index 97a43fac657b..5f5b1c659920 100644 --- a/core/src/main/java/org/apache/iceberg/MetricsUtil.java +++ b/core/src/main/java/org/apache/iceberg/MetricsUtil.java @@ -22,6 +22,7 @@ import java.util.Map; import java.util.stream.Collectors; import java.util.stream.Stream; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Maps; public class MetricsUtil { @@ -29,8 +30,13 @@ public class MetricsUtil { private MetricsUtil() { } + /** + * Construct mapping relationship between column id to NaN value counts from input metrics and metrics config. + */ public static Map createNanValueCounts( Stream fieldMetrics, MetricsConfig metricsConfig, Schema inputSchema) { + Preconditions.checkNotNull(metricsConfig, "metricsConfig is required"); + if (fieldMetrics == null || inputSchema == null) { return Maps.newHashMap(); } @@ -40,7 +46,13 @@ public static Map createNanValueCounts( .collect(Collectors.toMap(FieldMetrics::id, FieldMetrics::nanValueCount)); } + /** + * Extract MetricsMode for the given field id from metrics config. + */ public static MetricsModes.MetricsMode metricsMode(Schema inputSchema, MetricsConfig metricsConfig, int fieldId) { + Preconditions.checkNotNull(inputSchema, "inputSchema is required"); + Preconditions.checkNotNull(metricsConfig, "metricsConfig is required"); + String columnName = inputSchema.findColumnName(fieldId); return metricsConfig.columnMode(columnName); } diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java index 93874702fc17..5413b3379387 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java @@ -31,7 +31,7 @@ import java.util.stream.Stream; import org.apache.avro.util.Utf8; import org.apache.iceberg.FieldMetrics; -import org.apache.iceberg.NaNFieldMetrics; +import org.apache.iceberg.FloatFieldMetrics; import org.apache.iceberg.deletes.PositionDelete; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; @@ -189,7 +189,7 @@ public void write(int repetitionLevel, Float value) { @Override public Stream metrics() { - return Stream.of(new NaNFieldMetrics(id, nanCount)); + return Stream.of(new FloatFieldMetrics(id, nanCount)); } } @@ -213,7 +213,7 @@ public void write(int repetitionLevel, Double value) { @Override public Stream metrics() { - return Stream.of(new NaNFieldMetrics(id, nanCount)); + return Stream.of(new FloatFieldMetrics(id, nanCount)); } } From 55b1e5bbf1002ab82f63623acc4e44e9f35b1e5d Mon Sep 17 00:00:00 2001 From: Yan Yan Date: Mon, 14 Dec 2020 15:24:23 -0800 Subject: [PATCH 6/7] move generic testing outside of parquet folder --- .../iceberg/{parquet => }/TestGenericMergingMetrics.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) rename data/src/test/java/org/apache/iceberg/{parquet => }/TestGenericMergingMetrics.java (92%) diff --git a/data/src/test/java/org/apache/iceberg/parquet/TestGenericMergingMetrics.java b/data/src/test/java/org/apache/iceberg/TestGenericMergingMetrics.java similarity index 92% rename from data/src/test/java/org/apache/iceberg/parquet/TestGenericMergingMetrics.java rename to data/src/test/java/org/apache/iceberg/TestGenericMergingMetrics.java index 72db36a9908c..e7181feadaf5 100644 --- a/data/src/test/java/org/apache/iceberg/parquet/TestGenericMergingMetrics.java +++ b/data/src/test/java/org/apache/iceberg/TestGenericMergingMetrics.java @@ -17,12 +17,10 @@ * under the License. */ -package org.apache.iceberg.parquet; +package org.apache.iceberg; import java.io.IOException; import java.util.List; -import org.apache.iceberg.FileFormat; -import org.apache.iceberg.TestMergingMetrics; import org.apache.iceberg.data.GenericAppenderFactory; import org.apache.iceberg.data.Record; import org.apache.iceberg.io.FileAppender; From 37a86810f26dbeb7693f78d40f10cec4694165e0 Mon Sep 17 00:00:00 2001 From: Yan Yan Date: Tue, 15 Dec 2020 22:21:15 -0800 Subject: [PATCH 7/7] fix timestamp type to use microseconds instead --- .../org/apache/iceberg/spark/source/StructInternalRow.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/StructInternalRow.java b/spark/src/main/java/org/apache/iceberg/spark/source/StructInternalRow.java index 23208a8b126c..13f7595a74cf 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/source/StructInternalRow.java +++ b/spark/src/main/java/org/apache/iceberg/spark/source/StructInternalRow.java @@ -21,12 +21,13 @@ import java.math.BigDecimal; import java.nio.ByteBuffer; +import java.time.Duration; +import java.time.Instant; import java.time.LocalDate; import java.time.OffsetDateTime; import java.util.Collection; import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import java.util.function.Function; import org.apache.iceberg.StructLike; @@ -138,7 +139,7 @@ public long getLong(int ordinal) { if (longVal instanceof Long) { return (long) longVal; } else if (longVal instanceof OffsetDateTime) { - return TimeUnit.SECONDS.toDays(((OffsetDateTime) longVal).toEpochSecond()); + return Duration.between(Instant.EPOCH, (OffsetDateTime) longVal).toNanos() / 1000; } else if (longVal instanceof LocalDate) { return ((LocalDate) longVal).toEpochDay(); } else {