diff --git a/api/src/main/java/org/apache/iceberg/FieldMetrics.java b/core/src/main/java/org/apache/iceberg/FieldMetrics.java
similarity index 100%
rename from api/src/main/java/org/apache/iceberg/FieldMetrics.java
rename to core/src/main/java/org/apache/iceberg/FieldMetrics.java
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFieldMetrics.java b/core/src/main/java/org/apache/iceberg/FloatFieldMetrics.java
similarity index 54%
rename from parquet/src/main/java/org/apache/iceberg/parquet/ParquetFieldMetrics.java
rename to core/src/main/java/org/apache/iceberg/FloatFieldMetrics.java
index 708785a5b627..873138750685 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFieldMetrics.java
+++ b/core/src/main/java/org/apache/iceberg/FloatFieldMetrics.java
@@ -17,51 +17,46 @@
* under the License.
*/
-package org.apache.iceberg.parquet;
+package org.apache.iceberg;
import java.nio.ByteBuffer;
-import org.apache.iceberg.FieldMetrics;
/**
- * Iceberg internally tracked field level metrics, used by Parquet writer only.
+ * Iceberg internally tracked field level metrics, used by Parquet and ORC writers only.
*
- * Parquet keeps track of most metrics in its footer, and only NaN counter is actually tracked by writers.
- * This wrapper ensures that metrics not being updated by Parquet writers will not be incorrectly used, by throwing
+ * Parquet/ORC keeps track of most metrics in file statistics, and only NaN counter is actually tracked by writers.
+ * This wrapper ensures that metrics not being updated by those writers will not be incorrectly used, by throwing
* exceptions when they are accessed.
*/
-public class ParquetFieldMetrics extends FieldMetrics {
+public class FloatFieldMetrics extends FieldMetrics {
/**
- * Constructor for creating a Parquet-specific FieldMetrics.
+ * Constructor for creating a FieldMetrics with only NaN counter.
* @param id field id being tracked by the writer
* @param nanValueCount number of NaN values, will only be non-0 for double or float field.
*/
- public ParquetFieldMetrics(int id,
- long nanValueCount) {
+ public FloatFieldMetrics(int id,
+ long nanValueCount) {
super(id, 0L, 0L, nanValueCount, null, null);
}
@Override
public long valueCount() {
- throw new IllegalStateException(
- "Shouldn't access valueCount() within ParquetFieldMetrics, as this metric is tracked by Parquet footer. ");
+ throw new IllegalStateException("Shouldn't access this method, as this metric is tracked in file statistics. ");
}
@Override
public long nullValueCount() {
- throw new IllegalStateException(
- "Shouldn't access nullValueCount() within ParquetFieldMetrics, as this metric is tracked by Parquet footer. ");
+ throw new IllegalStateException("Shouldn't access this method, as this metric is tracked in file statistics. ");
}
@Override
public ByteBuffer lowerBound() {
- throw new IllegalStateException(
- "Shouldn't access lowerBound() within ParquetFieldMetrics, as this metric is tracked by Parquet footer. ");
+ throw new IllegalStateException("Shouldn't access this method, as this metric is tracked in file statistics. ");
}
@Override
public ByteBuffer upperBound() {
- throw new IllegalStateException(
- "Shouldn't access upperBound() within ParquetFieldMetrics, as this metric is tracked by Parquet footer. ");
+ throw new IllegalStateException("Shouldn't access this method, as this metric is tracked in file statistics. ");
}
}
diff --git a/core/src/main/java/org/apache/iceberg/MetricsUtil.java b/core/src/main/java/org/apache/iceberg/MetricsUtil.java
new file mode 100644
index 000000000000..5f5b1c659920
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/MetricsUtil.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+public class MetricsUtil {
+
+ private MetricsUtil() {
+ }
+
+ /**
+ * Construct mapping relationship between column id to NaN value counts from input metrics and metrics config.
+ */
+ public static Map createNanValueCounts(
+ Stream fieldMetrics, MetricsConfig metricsConfig, Schema inputSchema) {
+ Preconditions.checkNotNull(metricsConfig, "metricsConfig is required");
+
+ if (fieldMetrics == null || inputSchema == null) {
+ return Maps.newHashMap();
+ }
+
+ return fieldMetrics
+ .filter(metrics -> metricsMode(inputSchema, metricsConfig, metrics.id()) != MetricsModes.None.get())
+ .collect(Collectors.toMap(FieldMetrics::id, FieldMetrics::nanValueCount));
+ }
+
+ /**
+ * Extract MetricsMode for the given field id from metrics config.
+ */
+ public static MetricsModes.MetricsMode metricsMode(Schema inputSchema, MetricsConfig metricsConfig, int fieldId) {
+ Preconditions.checkNotNull(inputSchema, "inputSchema is required");
+ Preconditions.checkNotNull(metricsConfig, "metricsConfig is required");
+
+ String columnName = inputSchema.findColumnName(fieldId);
+ return metricsConfig.columnMode(columnName);
+ }
+
+}
diff --git a/data/src/test/java/org/apache/iceberg/parquet/TestParquetMergingMetrics.java b/data/src/test/java/org/apache/iceberg/TestGenericMergingMetrics.java
similarity index 83%
rename from data/src/test/java/org/apache/iceberg/parquet/TestParquetMergingMetrics.java
rename to data/src/test/java/org/apache/iceberg/TestGenericMergingMetrics.java
index 03118e3a65f2..e7181feadaf5 100644
--- a/data/src/test/java/org/apache/iceberg/parquet/TestParquetMergingMetrics.java
+++ b/data/src/test/java/org/apache/iceberg/TestGenericMergingMetrics.java
@@ -17,22 +17,24 @@
* under the License.
*/
-package org.apache.iceberg.parquet;
+package org.apache.iceberg;
import java.io.IOException;
import java.util.List;
-import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.TestMergingMetrics;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.FileAppender;
-public class TestParquetMergingMetrics extends TestMergingMetrics {
+public class TestGenericMergingMetrics extends TestMergingMetrics {
+
+ public TestGenericMergingMetrics(FileFormat fileFormat) {
+ super(fileFormat);
+ }
@Override
protected FileAppender writeAndGetAppender(List records) throws IOException {
FileAppender appender = new GenericAppenderFactory(SCHEMA).newAppender(
- org.apache.iceberg.Files.localOutput(temp.newFile()), FileFormat.PARQUET);
+ org.apache.iceberg.Files.localOutput(temp.newFile()), fileFormat);
try (FileAppender fileAppender = appender) {
records.forEach(fileAppender::add);
}
diff --git a/data/src/test/java/org/apache/iceberg/TestMergingMetrics.java b/data/src/test/java/org/apache/iceberg/TestMergingMetrics.java
index 5258d8b2997f..96217229d879 100644
--- a/data/src/test/java/org/apache/iceberg/TestMergingMetrics.java
+++ b/data/src/test/java/org/apache/iceberg/TestMergingMetrics.java
@@ -32,10 +32,13 @@
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;
+@RunWith(Parameterized.class)
public abstract class TestMergingMetrics {
// all supported fields, except for UUID which is on deprecation path: see https://github.com/apache/iceberg/pull/1611
@@ -83,6 +86,17 @@ public abstract class TestMergingMetrics {
STRUCT_FIELD
);
+ protected final FileFormat fileFormat;
+
+ @Parameterized.Parameters(name = "fileFormat = {0}")
+ public static Object[] parameters() {
+ return new Object[] {FileFormat.PARQUET };
+ }
+
+ public TestMergingMetrics(FileFormat fileFormat) {
+ this.fileFormat = fileFormat;
+ }
+
protected abstract FileAppender writeAndGetAppender(List records) throws Exception;
@Rule
diff --git a/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMergingMetrics.java b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMergingMetrics.java
index cd0a89ba7c76..1670ed733421 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMergingMetrics.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMergingMetrics.java
@@ -35,13 +35,17 @@
public class TestFlinkMergingMetrics extends TestMergingMetrics {
+ public TestFlinkMergingMetrics(FileFormat fileFormat) {
+ super(fileFormat);
+ }
+
@Override
protected FileAppender writeAndGetAppender(List records) throws IOException {
RowType flinkSchema = FlinkSchemaUtil.convert(SCHEMA);
FileAppender appender =
new FlinkAppenderFactory(SCHEMA, flinkSchema, ImmutableMap.of(), PartitionSpec.unpartitioned())
- .newAppender(org.apache.iceberg.Files.localOutput(temp.newFile()), FileFormat.PARQUET);
+ .newAppender(org.apache.iceberg.Files.localOutput(temp.newFile()), fileFormat);
try (FileAppender fileAppender = appender) {
records.stream().map(r -> RowDataConverter.convert(SCHEMA, r)).forEach(fileAppender::add);
}
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java
index 9bc3404dcf1b..3337d2f78ff5 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java
@@ -28,13 +28,13 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.iceberg.FieldMetrics;
import org.apache.iceberg.Metrics;
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.MetricsModes;
import org.apache.iceberg.MetricsModes.MetricsMode;
+import org.apache.iceberg.MetricsUtil;
import org.apache.iceberg.Schema;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.expressions.Literal;
@@ -113,8 +113,7 @@ public static Metrics footerMetrics(ParquetMetadata metadata, Stream getNanValueCounts(
- Stream fieldMetrics, MetricsConfig metricsConfig, Schema inputSchema) {
- if (fieldMetrics == null || inputSchema == null) {
- return Maps.newHashMap();
- }
-
- return fieldMetrics
- .filter(metrics -> {
- String columnName = inputSchema.findColumnName(metrics.id());
- MetricsMode metricsMode = metricsConfig.columnMode(columnName);
- return metricsMode != MetricsModes.None.get();
- })
- .collect(Collectors.toMap(FieldMetrics::id, FieldMetrics::nanValueCount));
- }
-
private static MessageType getParquetTypeWithIds(ParquetMetadata metadata, NameMapping nameMapping) {
MessageType type = metadata.getFileMetaData().getSchema();
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java
index 7d77996e9968..5413b3379387 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java
@@ -31,6 +31,7 @@
import java.util.stream.Stream;
import org.apache.avro.util.Utf8;
import org.apache.iceberg.FieldMetrics;
+import org.apache.iceberg.FloatFieldMetrics;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -188,7 +189,7 @@ public void write(int repetitionLevel, Float value) {
@Override
public Stream metrics() {
- return Stream.of(new ParquetFieldMetrics(id, nanCount));
+ return Stream.of(new FloatFieldMetrics(id, nanCount));
}
}
@@ -212,7 +213,7 @@ public void write(int repetitionLevel, Double value) {
@Override
public Stream metrics() {
- return Stream.of(new ParquetFieldMetrics(id, nanCount));
+ return Stream.of(new FloatFieldMetrics(id, nanCount));
}
}
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/StructInternalRow.java b/spark/src/main/java/org/apache/iceberg/spark/source/StructInternalRow.java
index 964c1293098f..13f7595a74cf 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/StructInternalRow.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/StructInternalRow.java
@@ -21,6 +21,10 @@
import java.math.BigDecimal;
import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.OffsetDateTime;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -42,6 +46,7 @@
import org.apache.spark.sql.types.BooleanType;
import org.apache.spark.sql.types.ByteType;
import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DateType;
import org.apache.spark.sql.types.Decimal;
import org.apache.spark.sql.types.DecimalType;
import org.apache.spark.sql.types.DoubleType;
@@ -52,6 +57,7 @@
import org.apache.spark.sql.types.ShortType;
import org.apache.spark.sql.types.StringType;
import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.TimestampType;
import org.apache.spark.unsafe.types.CalendarInterval;
import org.apache.spark.unsafe.types.UTF8String;
@@ -115,12 +121,30 @@ public short getShort(int ordinal) {
@Override
public int getInt(int ordinal) {
- return struct.get(ordinal, Integer.class);
+ Object integer = struct.get(ordinal, Object.class);
+
+ if (integer instanceof Integer) {
+ return (int) integer;
+ } else if (integer instanceof LocalDate) {
+ return (int) ((LocalDate) integer).toEpochDay();
+ } else {
+ throw new IllegalStateException("Unknown type for int field. Type name: " + integer.getClass().getName());
+ }
}
@Override
public long getLong(int ordinal) {
- return struct.get(ordinal, Long.class);
+ Object longVal = struct.get(ordinal, Object.class);
+
+ if (longVal instanceof Long) {
+ return (long) longVal;
+ } else if (longVal instanceof OffsetDateTime) {
+ return Duration.between(Instant.EPOCH, (OffsetDateTime) longVal).toNanos() / 1000;
+ } else if (longVal instanceof LocalDate) {
+ return ((LocalDate) longVal).toEpochDay();
+ } else {
+ throw new IllegalStateException("Unknown type for long field. Type name: " + longVal.getClass().getName());
+ }
}
@Override
@@ -240,6 +264,10 @@ public Object get(int ordinal, DataType dataType) {
return getByte(ordinal);
} else if (dataType instanceof ShortType) {
return getShort(ordinal);
+ } else if (dataType instanceof DateType) {
+ return getInt(ordinal);
+ } else if (dataType instanceof TimestampType) {
+ return getLong(ordinal);
}
return null;
}
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkParquetMergingMetrics.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMergingMetrics.java
similarity index 90%
rename from spark/src/test/java/org/apache/iceberg/spark/source/TestSparkParquetMergingMetrics.java
rename to spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMergingMetrics.java
index e49a18be5338..16f25cc17ddc 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkParquetMergingMetrics.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMergingMetrics.java
@@ -29,13 +29,17 @@
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.spark.sql.catalyst.InternalRow;
-public class TestSparkParquetMergingMetrics extends TestMergingMetrics {
+public class TestSparkMergingMetrics extends TestMergingMetrics {
+
+ public TestSparkMergingMetrics(FileFormat fileFormat) {
+ super(fileFormat);
+ }
@Override
protected FileAppender writeAndGetAppender(List records) throws IOException {
FileAppender appender =
new SparkAppenderFactory(new HashMap<>(), SCHEMA, SparkSchemaUtil.convert(SCHEMA)).newAppender(
- org.apache.iceberg.Files.localOutput(temp.newFile()), FileFormat.PARQUET);
+ org.apache.iceberg.Files.localOutput(temp.newFile()), fileFormat);
try (FileAppender fileAppender = appender) {
records.stream().map(r -> new StructInternalRow(SCHEMA.asStruct()).setStruct(r)).forEach(fileAppender::add);
}