Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions api/src/main/java/org/apache/iceberg/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.LocationProvider;
import org.apache.iceberg.metrics.MetricsReporter;

/** Represents a table. */
public interface Table {
Expand Down Expand Up @@ -346,4 +347,13 @@ default Snapshot snapshot(String name) {

return null;
}

/**
* Returns the metrics reporter for this table.
*
* @return the metrics reporter for this table.
*/
default MetricsReporter metricsReporter() {
throw new UnsupportedOperationException("Accessing metrics reporter is not supported");
}
}
10 changes: 10 additions & 0 deletions api/src/main/java/org/apache/iceberg/metrics/MetricsReporter.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.iceberg.metrics;

import java.util.Map;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

/** This interface defines the basic API for reporting metrics for operations to a Table. */
@FunctionalInterface
Expand All @@ -33,6 +34,15 @@ public interface MetricsReporter {
*/
default void initialize(Map<String, String> properties) {}

/**
* Return the properties for this metrics reporter
*
* @return the properties for this metrics reporter
*/
default Map<String, String> properties() {
return ImmutableMap.of();
}

/**
* Indicates that an operation is done by reporting a {@link MetricsReport}. A {@link
* MetricsReport} is usually directly derived from a {@link MetricsReport} instance.
Expand Down
6 changes: 6 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.LocationProvider;
import org.apache.iceberg.metrics.MetricsReporter;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -213,4 +214,9 @@ public String toString() {
final Object writeReplace() {
return SerializableTable.copyOf(this);
}

@Override
public MetricsReporter metricsReporter() {
return table().metricsReporter();
}
}
5 changes: 5 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -262,4 +262,9 @@ public String toString() {
Object writeReplace() {
return SerializableTable.copyOf(this);
}

@Override
public MetricsReporter metricsReporter() {
return reporter;
}
}
5 changes: 5 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -777,6 +777,11 @@ public String toString() {
Object writeReplace() {
return SerializableTable.copyOf(this);
}

@Override
public MetricsReporter metricsReporter() {
return BaseTransaction.this.reporter;
}
}

@VisibleForTesting
Expand Down
19 changes: 18 additions & 1 deletion core/src/main/java/org/apache/iceberg/SerializableTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.iceberg.hadoop.HadoopConfigurable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.LocationProvider;
import org.apache.iceberg.metrics.MetricsReporter;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.SerializableMap;
import org.apache.iceberg.util.SerializableSupplier;
Expand Down Expand Up @@ -63,10 +64,13 @@ public class SerializableTable implements Table, Serializable {
private final LocationProvider locationProvider;
private final Map<String, SnapshotRef> refs;

private final Map<String, String> metricsReporterProperties;

private transient volatile Table lazyTable = null;
private transient volatile Schema lazySchema = null;
private transient volatile Map<Integer, PartitionSpec> lazySpecs = null;
private transient volatile SortOrder lazySortOrder = null;
private transient volatile MetricsReporter lazyMetricsReporter = null;

protected SerializableTable(Table table) {
this.name = table.name();
Expand All @@ -83,6 +87,7 @@ protected SerializableTable(Table table) {
this.encryption = table.encryption();
this.locationProvider = table.locationProvider();
this.refs = SerializableMap.copyOf(table.refs());
this.metricsReporterProperties = SerializableMap.copyOf(table.metricsReporter().properties());
}

/**
Expand Down Expand Up @@ -136,7 +141,7 @@ private Table lazyTable() {
}

protected Table newTable(TableOperations ops, String tableName) {
return new BaseTable(ops, tableName);
return new BaseTable(ops, tableName, metricsReporter());
}

@Override
Expand Down Expand Up @@ -247,6 +252,18 @@ public Map<String, SnapshotRef> refs() {
return refs;
}

@Override
public MetricsReporter metricsReporter() {
Copy link
Contributor

@nastra nastra Mar 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add a test that makes sure the metrics reporter is preserved with the right properties?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PTAL

if (lazyMetricsReporter == null) {
synchronized (this) {
if (lazyMetricsReporter == null) {
lazyMetricsReporter = CatalogUtil.loadMetricsReporter(this.metricsReporterProperties);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

metrics reporter is configured by catalog properties, not table properties. I add the interface method MetricsReporter.properties() to return the properties which used to load and initialize this MetricsReporter, therefor the SerializableTable can use this properties to load and initialize MetricsReporter agagin

}
}
}
return lazyMetricsReporter;
}

@Override
public void refresh() {
throw new UnsupportedOperationException(errorMsg("refresh"));
Expand Down
98 changes: 98 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestSerializableTable.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;

import static org.apache.iceberg.TestHelpers.roundTripSerialize;
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;

import java.io.File;
import java.io.IOException;
import java.util.Map;
import org.apache.iceberg.metrics.MetricsReport;
import org.apache.iceberg.metrics.MetricsReporter;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.types.Types;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

public class TestSerializableTable {

private static final Schema SCHEMA =
new Schema(
required(1, "id", Types.LongType.get()),
optional(2, "data", Types.StringType.get()),
required(3, "date", Types.StringType.get()),
optional(4, "double", Types.DoubleType.get()));

private static final PartitionSpec SPEC =
PartitionSpec.builderFor(SCHEMA).identity("date").build();

private static final SortOrder SORT_ORDER = SortOrder.builderFor(SCHEMA).asc("id").build();

@TempDir private File temp;

@AfterAll
public static void clean() {
TestTables.clearTables();
}

@Test
public void testSerializableTableWithMetricsReporter()
throws IOException, ClassNotFoundException {
Map<String, String> properties =
ImmutableMap.of(
CatalogProperties.METRICS_REPORTER_IMPL,
TestMetricsReporter.class.getName(),
"key1",
"value1");
MetricsReporter reporter = CatalogUtil.loadMetricsReporter(properties);
Table table = TestTables.create(temp, "tbl_A", SCHEMA, SPEC, SORT_ORDER, 2, reporter);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see where we're testing that the reporter itself has the correct properties.

Shouldn't this do

Map<String, String> reporterProperties = ImmutableMap.of("a", "1", "b", "2");
reporter.initialize(reporterProperties);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reporter.initialize(reporterProperties) is called in the method CatalogUtil.loadMetricsReporter(properties)

Table serializableTable = roundTripSerialize(SerializableTable.copyOf(table));
assertSerializedMetricsReporter(reporter, serializableTable.metricsReporter());

serializableTable = TestHelpers.KryoHelpers.roundTripSerialize(SerializableTable.copyOf(table));
assertSerializedMetricsReporter(reporter, serializableTable.metricsReporter());
}

private void assertSerializedMetricsReporter(MetricsReporter expected, MetricsReporter actual) {
Assertions.assertThat(actual).isNotNull().isInstanceOf(TestMetricsReporter.class);
Assertions.assertThat(actual.properties()).isEqualTo(expected.properties());
Assertions.assertThat(actual.properties()).containsEntry("key1", "value1");
}

public static class TestMetricsReporter implements MetricsReporter {
private Map<String, String> properties;

@Override
public void initialize(Map<String, String> props) {
this.properties = props;
}

@Override
public Map<String, String> properties() {
return properties;
}

@Override
public void report(MetricsReport report) {}
}
}