@@ -17,51 +17,46 @@
* under the License.
*/

package org.apache.iceberg.parquet;
package org.apache.iceberg;

import java.nio.ByteBuffer;
import org.apache.iceberg.FieldMetrics;

/**
* Iceberg internally tracked field level metrics, used by Parquet writer only.
* Iceberg internally tracked field level metrics, used by Parquet and ORC writers only.
* <p>
* Parquet keeps track of most metrics in its footer, and only NaN counter is actually tracked by writers.
* This wrapper ensures that metrics not being updated by Parquet writers will not be incorrectly used, by throwing
* Parquet/ORC keeps track of most metrics in file statistics, and only NaN counter is actually tracked by writers.
* This wrapper ensures that metrics not being updated by those writers will not be incorrectly used, by throwing
* exceptions when they are accessed.
*/
public class ParquetFieldMetrics extends FieldMetrics {
public class FloatFieldMetrics extends FieldMetrics {

/**
* Constructor for creating a Parquet-specific FieldMetrics.
* Constructor for creating a FieldMetrics with only NaN counter.
* @param id field id being tracked by the writer
* @param nanValueCount number of NaN values, will only be non-0 for double or float field.
*/
public ParquetFieldMetrics(int id,
long nanValueCount) {
public FloatFieldMetrics(int id,
long nanValueCount) {
super(id, 0L, 0L, nanValueCount, null, null);
}

@Override
public long valueCount() {
throw new IllegalStateException(
"Shouldn't access valueCount() within ParquetFieldMetrics, as this metric is tracked by Parquet footer. ");
throw new IllegalStateException("Shouldn't access this method, as this metric is tracked in file statistics. ");
}

@Override
public long nullValueCount() {
throw new IllegalStateException(
"Shouldn't access nullValueCount() within ParquetFieldMetrics, as this metric is tracked by Parquet footer. ");
throw new IllegalStateException("Shouldn't access this method, as this metric is tracked in file statistics. ");
}

@Override
public ByteBuffer lowerBound() {
throw new IllegalStateException(
"Shouldn't access lowerBound() within ParquetFieldMetrics, as this metric is tracked by Parquet footer. ");
throw new IllegalStateException("Shouldn't access this method, as this metric is tracked in file statistics. ");
}

@Override
public ByteBuffer upperBound() {
throw new IllegalStateException(
"Shouldn't access upperBound() within ParquetFieldMetrics, as this metric is tracked by Parquet footer. ");
throw new IllegalStateException("Shouldn't access this method, as this metric is tracked in file statistics. ");
}
}
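
Note: a minimal sketch of the renamed class's contract (hypothetical usage, not part of this diff). Only the NaN counter is readable; the other accessors throw because those metrics come from Parquet/ORC file statistics rather than the writer.

import org.apache.iceberg.FieldMetrics;
import org.apache.iceberg.FloatFieldMetrics;

public class FloatFieldMetricsExample {
  public static void main(String[] args) {
    FieldMetrics metrics = new FloatFieldMetrics(1 /* field id */, 3L /* NaN count */);

    // The NaN counter is the only metric this wrapper actually tracks.
    System.out.println(metrics.nanValueCount()); // prints 3

    // Everything else lives in file statistics, so the wrapper fails fast
    // rather than returning a misleading zero or null.
    try {
      metrics.valueCount();
    } catch (IllegalStateException expected) {
      System.out.println("valueCount() is tracked in file statistics instead");
    }
  }
}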
60 changes: 60 additions & 0 deletions core/src/main/java/org/apache/iceberg/MetricsUtil.java
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg;

import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;

public class MetricsUtil {

private MetricsUtil() {
}

/**
* Construct mapping relationship between column id to NaN value counts from input metrics and metrics config.
*/
public static Map<Integer, Long> createNanValueCounts(
Stream<FieldMetrics> fieldMetrics, MetricsConfig metricsConfig, Schema inputSchema) {
Preconditions.checkNotNull(metricsConfig, "metricsConfig is required");

if (fieldMetrics == null || inputSchema == null) {
return Maps.newHashMap();
}

return fieldMetrics
.filter(metrics -> metricsMode(inputSchema, metricsConfig, metrics.id()) != MetricsModes.None.get())
.collect(Collectors.toMap(FieldMetrics::id, FieldMetrics::nanValueCount));
}

/**
* Extract MetricsMode for the given field id from metrics config.
*/
public static MetricsModes.MetricsMode metricsMode(Schema inputSchema, MetricsConfig metricsConfig, int fieldId) {
Preconditions.checkNotNull(inputSchema, "inputSchema is required");
Preconditions.checkNotNull(metricsConfig, "metricsConfig is required");

String columnName = inputSchema.findColumnName(fieldId);

Review thread:

Reviewer: inputSchema can be null at this point.
Contributor Author: I'll add a validation check to ensure they are not null.
Contributor: I agree. Let's add checks to fail when metricsConfig or inputSchema is null. This is a useful method so I think it is worth keeping it public, but we would ideally not fail with a NullPointerException. This is not called in a tight loop, so it should be fine to add the checks on each call.
Contributor Author: Yes, sorry I overwrote my own earlier change that addressed this when I renamed this file...

return metricsConfig.columnMode(columnName);

Reviewer: same for metrics config.

}

}
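
Note: a hedged sketch of how a writer-side caller might use the new helpers; the schema and field metrics below are illustrative assumptions, not code from this PR.

import java.util.Map;
import java.util.stream.Stream;
import org.apache.iceberg.FieldMetrics;
import org.apache.iceberg.FloatFieldMetrics;
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.MetricsUtil;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

public class MetricsUtilExample {
  public static void main(String[] args) {
    Schema schema = new Schema(
        Types.NestedField.required(1, "temperature", Types.FloatType.get()));
    MetricsConfig config = MetricsConfig.getDefault();

    // One FloatFieldMetrics per float/double column, as emitted by the writers.
    Stream<FieldMetrics> fieldMetrics = Stream.of(new FloatFieldMetrics(1, 2L));

    // Columns whose metrics mode is None are filtered out of the result.
    Map<Integer, Long> nanCounts =
        MetricsUtil.createNanValueCounts(fieldMetrics, config, schema);
    System.out.println(nanCounts); // {1=2}
  }
}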
@@ -17,22 +17,24 @@
* under the License.
*/

package org.apache.iceberg.parquet;
package org.apache.iceberg;

import java.io.IOException;
import java.util.List;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.TestMergingMetrics;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.FileAppender;

public class TestParquetMergingMetrics extends TestMergingMetrics<Record> {
public class TestGenericMergingMetrics extends TestMergingMetrics<Record> {

public TestGenericMergingMetrics(FileFormat fileFormat) {
super(fileFormat);
}

@Override
protected FileAppender<Record> writeAndGetAppender(List<Record> records) throws IOException {
FileAppender<Record> appender = new GenericAppenderFactory(SCHEMA).newAppender(
org.apache.iceberg.Files.localOutput(temp.newFile()), FileFormat.PARQUET);
org.apache.iceberg.Files.localOutput(temp.newFile()), fileFormat);
try (FileAppender<Record> fileAppender = appender) {
records.forEach(fileAppender::add);
}
14 changes: 14 additions & 0 deletions data/src/test/java/org/apache/iceberg/TestMergingMetrics.java
@@ -32,10 +32,13 @@
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;

@RunWith(Parameterized.class)
public abstract class TestMergingMetrics<T> {

// all supported fields, except for UUID which is on deprecation path: see https://github.com/apache/iceberg/pull/1611
@@ -83,6 +86,17 @@ public abstract class TestMergingMetrics<T> {
STRUCT_FIELD
);

protected final FileFormat fileFormat;

@Parameterized.Parameters(name = "fileFormat = {0}")
public static Object[] parameters() {
return new Object[] {FileFormat.PARQUET };
}
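
Note: only the Parquet writers report NaN counts so far, hence the single parameter. If ORC writer support lands later (an assumption, not something this diff does), the same suite could presumably cover both formats by extending the list:

// Hypothetical future extension, not part of this PR:
@Parameterized.Parameters(name = "fileFormat = {0}")
public static Object[] parameters() {
  return new Object[] { FileFormat.PARQUET, FileFormat.ORC };
}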

public TestMergingMetrics(FileFormat fileFormat) {
this.fileFormat = fileFormat;
}

protected abstract FileAppender<T> writeAndGetAppender(List<Record> records) throws Exception;

@Rule
@@ -35,13 +35,17 @@

public class TestFlinkMergingMetrics extends TestMergingMetrics<RowData> {

public TestFlinkMergingMetrics(FileFormat fileFormat) {
super(fileFormat);
}

@Override
protected FileAppender<RowData> writeAndGetAppender(List<Record> records) throws IOException {
RowType flinkSchema = FlinkSchemaUtil.convert(SCHEMA);

FileAppender<RowData> appender =
new FlinkAppenderFactory(SCHEMA, flinkSchema, ImmutableMap.of(), PartitionSpec.unpartitioned())
.newAppender(org.apache.iceberg.Files.localOutput(temp.newFile()), FileFormat.PARQUET);
.newAppender(org.apache.iceberg.Files.localOutput(temp.newFile()), fileFormat);
try (FileAppender<RowData> fileAppender = appender) {
records.stream().map(r -> RowDataConverter.convert(SCHEMA, r)).forEach(fileAppender::add);
}
22 changes: 3 additions & 19 deletions parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java
@@ -28,13 +28,13 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.iceberg.FieldMetrics;
import org.apache.iceberg.Metrics;
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.MetricsModes;
import org.apache.iceberg.MetricsModes.MetricsMode;
import org.apache.iceberg.MetricsUtil;
import org.apache.iceberg.Schema;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.expressions.Literal;
@@ -113,8 +113,7 @@ public static Metrics footerMetrics(ParquetMetadata metadata, Stream<FieldMetric

increment(columnSizes, fieldId, column.getTotalSize());

String columnName = fileSchema.findColumnName(fieldId);
MetricsMode metricsMode = metricsConfig.columnMode(columnName);
MetricsMode metricsMode = MetricsUtil.metricsMode(fileSchema, metricsConfig, fieldId);
if (metricsMode == MetricsModes.None.get()) {
continue;
}
@@ -149,25 +148,10 @@
}

return new Metrics(rowCount, columnSizes, valueCounts, nullValueCounts,
getNanValueCounts(fieldMetrics, metricsConfig, fileSchema),
MetricsUtil.createNanValueCounts(fieldMetrics, metricsConfig, fileSchema),
toBufferMap(fileSchema, lowerBounds), toBufferMap(fileSchema, upperBounds));
}

private static Map<Integer, Long> getNanValueCounts(
Stream<FieldMetrics> fieldMetrics, MetricsConfig metricsConfig, Schema inputSchema) {
if (fieldMetrics == null || inputSchema == null) {
return Maps.newHashMap();
}

return fieldMetrics
.filter(metrics -> {
String columnName = inputSchema.findColumnName(metrics.id());
MetricsMode metricsMode = metricsConfig.columnMode(columnName);
return metricsMode != MetricsModes.None.get();
})
.collect(Collectors.toMap(FieldMetrics::id, FieldMetrics::nanValueCount));
}

private static MessageType getParquetTypeWithIds(ParquetMetadata metadata, NameMapping nameMapping) {
MessageType type = metadata.getFileMetaData().getSchema();

@@ -31,6 +31,7 @@
import java.util.stream.Stream;
import org.apache.avro.util.Utf8;
import org.apache.iceberg.FieldMetrics;
import org.apache.iceberg.FloatFieldMetrics;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -188,7 +189,7 @@ public void write(int repetitionLevel, Float value) {

@Override
public Stream<FieldMetrics> metrics() {
return Stream.of(new ParquetFieldMetrics(id, nanCount));
return Stream.of(new FloatFieldMetrics(id, nanCount));
}
}

@@ -212,7 +213,7 @@ public void write(int repetitionLevel, Double value) {

@Override
public Stream<FieldMetrics> metrics() {
return Stream.of(new ParquetFieldMetrics(id, nanCount));
return Stream.of(new FloatFieldMetrics(id, nanCount));
}
}
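
Note: for context, a simplified sketch of the NaN-counting pattern these writers use. Names are illustrative; the real ParquetValueWriters classes also forward each value to Parquet's column writer.

import java.util.stream.Stream;
import org.apache.iceberg.FieldMetrics;
import org.apache.iceberg.FloatFieldMetrics;

class NanCountingFloatWriter {
  private final int id;
  private long nanCount = 0L;

  NanCountingFloatWriter(int id) {
    this.id = id;
  }

  void write(float value) {
    if (Float.isNaN(value)) {
      nanCount += 1;  // the only metric tracked writer-side
    }
    // the real implementation also writes the value to the Parquet column
  }

  Stream<FieldMetrics> metrics() {
    return Stream.of(new FloatFieldMetrics(id, nanCount));
  }
}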

@@ -21,6 +21,10 @@

import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDate;
import java.time.OffsetDateTime;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -42,6 +46,7 @@
import org.apache.spark.sql.types.BooleanType;
import org.apache.spark.sql.types.ByteType;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DateType;
import org.apache.spark.sql.types.Decimal;
import org.apache.spark.sql.types.DecimalType;
import org.apache.spark.sql.types.DoubleType;
@@ -52,6 +57,7 @@
import org.apache.spark.sql.types.ShortType;
import org.apache.spark.sql.types.StringType;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.TimestampType;
import org.apache.spark.unsafe.types.CalendarInterval;
import org.apache.spark.unsafe.types.UTF8String;

@@ -115,12 +121,30 @@ public short getShort(int ordinal) {

@Override
public int getInt(int ordinal) {
return struct.get(ordinal, Integer.class);
Object integer = struct.get(ordinal, Object.class);

Contributor Author: The reason for changing this class is discussed in this thread.

if (integer instanceof Integer) {
return (int) integer;
} else if (integer instanceof LocalDate) {
return (int) ((LocalDate) integer).toEpochDay();
} else {
throw new IllegalStateException("Unknown type for int field. Type name: " + integer.getClass().getName());
}
}

@Override
public long getLong(int ordinal) {
return struct.get(ordinal, Long.class);
Object longVal = struct.get(ordinal, Object.class);

if (longVal instanceof Long) {
return (long) longVal;
} else if (longVal instanceof OffsetDateTime) {
return Duration.between(Instant.EPOCH, (OffsetDateTime) longVal).toNanos() / 1000;
} else if (longVal instanceof LocalDate) {
return ((LocalDate) longVal).toEpochDay();
} else {
throw new IllegalStateException("Unknown type for long field. Type name: " + longVal.getClass().getName());
}
}

@Override
Expand Down Expand Up @@ -240,6 +264,10 @@ public Object get(int ordinal, DataType dataType) {
return getByte(ordinal);
} else if (dataType instanceof ShortType) {
return getShort(ordinal);
} else if (dataType instanceof DateType) {
return getInt(ordinal);
} else if (dataType instanceof TimestampType) {
return getLong(ordinal);
}
return null;
}
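
Note: the new branches mirror Iceberg's internal value representation, where dates are days since the Unix epoch and timestamps are microseconds since the epoch. A standalone sketch of the same arithmetic (illustrative, not code from this PR):

import java.time.Duration;
import java.time.Instant;
import java.time.LocalDate;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;

public class EpochConversionExample {
  public static void main(String[] args) {
    // Dates: days since 1970-01-01, matching getInt above.
    LocalDate date = LocalDate.of(2020, 1, 2);
    System.out.println((int) date.toEpochDay()); // 18263

    // Timestamps: microseconds since the epoch, matching getLong above.
    OffsetDateTime ts = OffsetDateTime.of(2020, 1, 2, 3, 4, 5, 0, ZoneOffset.UTC);
    long micros = Duration.between(Instant.EPOCH, ts).toNanos() / 1000;
    System.out.println(micros); // 1577934245000000
  }
}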
@@ -29,13 +29,17 @@
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.spark.sql.catalyst.InternalRow;

public class TestSparkParquetMergingMetrics extends TestMergingMetrics<InternalRow> {
public class TestSparkMergingMetrics extends TestMergingMetrics<InternalRow> {

public TestSparkMergingMetrics(FileFormat fileFormat) {
super(fileFormat);
}

@Override
protected FileAppender<InternalRow> writeAndGetAppender(List<Record> records) throws IOException {
FileAppender<InternalRow> appender =
new SparkAppenderFactory(new HashMap<>(), SCHEMA, SparkSchemaUtil.convert(SCHEMA)).newAppender(
org.apache.iceberg.Files.localOutput(temp.newFile()), FileFormat.PARQUET);
org.apache.iceberg.Files.localOutput(temp.newFile()), fileFormat);
try (FileAppender<InternalRow> fileAppender = appender) {
records.stream().map(r -> new StructInternalRow(SCHEMA.asStruct()).setStruct(r)).forEach(fileAppender::add);
}