diff --git a/api/src/main/java/org/apache/iceberg/Table.java b/api/src/main/java/org/apache/iceberg/Table.java index 02db808417a4..111ca9a7a40c 100644 --- a/api/src/main/java/org/apache/iceberg/Table.java +++ b/api/src/main/java/org/apache/iceberg/Table.java @@ -23,6 +23,7 @@ import org.apache.iceberg.encryption.EncryptionManager; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.LocationProvider; +import org.apache.iceberg.metrics.MetricsReporter; /** Represents a table. */ public interface Table { @@ -346,4 +347,13 @@ default Snapshot snapshot(String name) { return null; } + + /** + * Returns the metrics reporter for this table. + * + * @return the metrics reporter for this table. + */ + default MetricsReporter metricsReporter() { + throw new UnsupportedOperationException("Accessing metrics reporter is not supported"); + } } diff --git a/api/src/main/java/org/apache/iceberg/metrics/MetricsReporter.java b/api/src/main/java/org/apache/iceberg/metrics/MetricsReporter.java index 365f7f99d6f0..5f57a520d484 100644 --- a/api/src/main/java/org/apache/iceberg/metrics/MetricsReporter.java +++ b/api/src/main/java/org/apache/iceberg/metrics/MetricsReporter.java @@ -19,6 +19,7 @@ package org.apache.iceberg.metrics; import java.util.Map; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; /** This interface defines the basic API for reporting metrics for operations to a Table. */ @FunctionalInterface @@ -33,6 +34,15 @@ public interface MetricsReporter { */ default void initialize(Map properties) {} + /** + * Return the properties for this metrics reporter + * + * @return the properties for this metrics reporter + */ + default Map properties() { + return ImmutableMap.of(); + } + /** * Indicates that an operation is done by reporting a {@link MetricsReport}. A {@link * MetricsReport} is usually directly derived from a {@link MetricsReport} instance. diff --git a/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java b/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java index b974b3548f45..73280323ba70 100644 --- a/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java +++ b/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java @@ -25,6 +25,7 @@ import org.apache.iceberg.encryption.EncryptionManager; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.LocationProvider; +import org.apache.iceberg.metrics.MetricsReporter; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; @@ -213,4 +214,9 @@ public String toString() { final Object writeReplace() { return SerializableTable.copyOf(this); } + + @Override + public MetricsReporter metricsReporter() { + return table().metricsReporter(); + } } diff --git a/core/src/main/java/org/apache/iceberg/BaseTable.java b/core/src/main/java/org/apache/iceberg/BaseTable.java index bb2e534ae739..03f6fd39bab9 100644 --- a/core/src/main/java/org/apache/iceberg/BaseTable.java +++ b/core/src/main/java/org/apache/iceberg/BaseTable.java @@ -262,4 +262,9 @@ public String toString() { Object writeReplace() { return SerializableTable.copyOf(this); } + + @Override + public MetricsReporter metricsReporter() { + return reporter; + } } diff --git a/core/src/main/java/org/apache/iceberg/BaseTransaction.java b/core/src/main/java/org/apache/iceberg/BaseTransaction.java index cef487931b0e..b15c40f1e638 100644 --- a/core/src/main/java/org/apache/iceberg/BaseTransaction.java +++ b/core/src/main/java/org/apache/iceberg/BaseTransaction.java @@ -777,6 +777,11 @@ public String toString() { Object writeReplace() { return SerializableTable.copyOf(this); } + + @Override + public MetricsReporter metricsReporter() { + return BaseTransaction.this.reporter; + } } @VisibleForTesting diff --git a/core/src/main/java/org/apache/iceberg/SerializableTable.java b/core/src/main/java/org/apache/iceberg/SerializableTable.java index cf8e1b3fbaa7..e1bf261fab6a 100644 --- a/core/src/main/java/org/apache/iceberg/SerializableTable.java +++ b/core/src/main/java/org/apache/iceberg/SerializableTable.java @@ -26,6 +26,7 @@ import org.apache.iceberg.hadoop.HadoopConfigurable; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.LocationProvider; +import org.apache.iceberg.metrics.MetricsReporter; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.util.SerializableMap; import org.apache.iceberg.util.SerializableSupplier; @@ -63,10 +64,13 @@ public class SerializableTable implements Table, Serializable { private final LocationProvider locationProvider; private final Map refs; + private final Map metricsReporterProperties; + private transient volatile Table lazyTable = null; private transient volatile Schema lazySchema = null; private transient volatile Map lazySpecs = null; private transient volatile SortOrder lazySortOrder = null; + private transient volatile MetricsReporter lazyMetricsReporter = null; protected SerializableTable(Table table) { this.name = table.name(); @@ -83,6 +87,7 @@ protected SerializableTable(Table table) { this.encryption = table.encryption(); this.locationProvider = table.locationProvider(); this.refs = SerializableMap.copyOf(table.refs()); + this.metricsReporterProperties = SerializableMap.copyOf(table.metricsReporter().properties()); } /** @@ -136,7 +141,7 @@ private Table lazyTable() { } protected Table newTable(TableOperations ops, String tableName) { - return new BaseTable(ops, tableName); + return new BaseTable(ops, tableName, metricsReporter()); } @Override @@ -247,6 +252,18 @@ public Map refs() { return refs; } + @Override + public MetricsReporter metricsReporter() { + if (lazyMetricsReporter == null) { + synchronized (this) { + if (lazyMetricsReporter == null) { + lazyMetricsReporter = CatalogUtil.loadMetricsReporter(this.metricsReporterProperties); + } + } + } + return lazyMetricsReporter; + } + @Override public void refresh() { throw new UnsupportedOperationException(errorMsg("refresh")); diff --git a/core/src/test/java/org/apache/iceberg/TestSerializableTable.java b/core/src/test/java/org/apache/iceberg/TestSerializableTable.java new file mode 100644 index 000000000000..01a053fddf8f --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/TestSerializableTable.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import static org.apache.iceberg.TestHelpers.roundTripSerialize; +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.iceberg.types.Types.NestedField.required; + +import java.io.File; +import java.io.IOException; +import java.util.Map; +import org.apache.iceberg.metrics.MetricsReport; +import org.apache.iceberg.metrics.MetricsReporter; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.types.Types; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +public class TestSerializableTable { + + private static final Schema SCHEMA = + new Schema( + required(1, "id", Types.LongType.get()), + optional(2, "data", Types.StringType.get()), + required(3, "date", Types.StringType.get()), + optional(4, "double", Types.DoubleType.get())); + + private static final PartitionSpec SPEC = + PartitionSpec.builderFor(SCHEMA).identity("date").build(); + + private static final SortOrder SORT_ORDER = SortOrder.builderFor(SCHEMA).asc("id").build(); + + @TempDir private File temp; + + @AfterAll + public static void clean() { + TestTables.clearTables(); + } + + @Test + public void testSerializableTableWithMetricsReporter() + throws IOException, ClassNotFoundException { + Map properties = + ImmutableMap.of( + CatalogProperties.METRICS_REPORTER_IMPL, + TestMetricsReporter.class.getName(), + "key1", + "value1"); + MetricsReporter reporter = CatalogUtil.loadMetricsReporter(properties); + Table table = TestTables.create(temp, "tbl_A", SCHEMA, SPEC, SORT_ORDER, 2, reporter); + Table serializableTable = roundTripSerialize(SerializableTable.copyOf(table)); + assertSerializedMetricsReporter(reporter, serializableTable.metricsReporter()); + + serializableTable = TestHelpers.KryoHelpers.roundTripSerialize(SerializableTable.copyOf(table)); + assertSerializedMetricsReporter(reporter, serializableTable.metricsReporter()); + } + + private void assertSerializedMetricsReporter(MetricsReporter expected, MetricsReporter actual) { + Assertions.assertThat(actual).isNotNull().isInstanceOf(TestMetricsReporter.class); + Assertions.assertThat(actual.properties()).isEqualTo(expected.properties()); + Assertions.assertThat(actual.properties()).containsEntry("key1", "value1"); + } + + public static class TestMetricsReporter implements MetricsReporter { + private Map properties; + + @Override + public void initialize(Map props) { + this.properties = props; + } + + @Override + public Map properties() { + return properties; + } + + @Override + public void report(MetricsReport report) {} + } +}