diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml index 4bfde0b516c1..40e1c7a63b93 100644 --- a/.palantir/revapi.yml +++ b/.palantir/revapi.yml @@ -460,6 +460,28 @@ acceptedBreaks: new: "parameter void org.apache.iceberg.actions.BaseCommitService::offer(===T===)\ \ @ org.apache.iceberg.actions.RewriteDataFilesCommitManager.CommitService" justification: "Backwards compatible parameterization of argument" + - code: "java.method.parameterTypeChanged" + old: "parameter void org.apache.iceberg.rest.RESTCatalog::(===java.util.function.Function, org.apache.iceberg.rest.RESTClient>===)" + new: "parameter void org.apache.iceberg.rest.RESTCatalog::(===org.apache.iceberg.util.SerializableFunction, org.apache.iceberg.rest.RESTClient>===)" + justification: "Switching to SerializableFunction" + - code: "java.method.parameterTypeChanged" + old: "parameter void org.apache.iceberg.rest.RESTCatalog::(org.apache.iceberg.catalog.SessionCatalog.SessionContext,\ + \ ===java.util.function.Function,\ + \ org.apache.iceberg.rest.RESTClient>===)" + new: "parameter void org.apache.iceberg.rest.RESTCatalog::(org.apache.iceberg.catalog.SessionCatalog.SessionContext,\ + \ ===org.apache.iceberg.util.SerializableFunction, org.apache.iceberg.rest.RESTClient>===)" + justification: "Switching to SerializableFunction" + - code: "java.method.parameterTypeChanged" + old: "parameter void org.apache.iceberg.rest.RESTSessionCatalog::(===java.util.function.Function, org.apache.iceberg.rest.RESTClient>===, java.util.function.BiFunction, org.apache.iceberg.io.FileIO>)" + new: "parameter void org.apache.iceberg.rest.RESTSessionCatalog::(===org.apache.iceberg.util.SerializableFunction, org.apache.iceberg.rest.RESTClient>===, java.util.function.BiFunction, org.apache.iceberg.io.FileIO>)" + justification: "Switching to SerializableFunction" - code: "java.method.removed" old: "method ThisT org.apache.iceberg.BaseScan>::newRefinedScan(org.apache.iceberg.TableOperations,\ diff --git 
a/api/src/main/java/org/apache/iceberg/Table.java b/api/src/main/java/org/apache/iceberg/Table.java index 02db808417a4..111ca9a7a40c 100644 --- a/api/src/main/java/org/apache/iceberg/Table.java +++ b/api/src/main/java/org/apache/iceberg/Table.java @@ -23,6 +23,7 @@ import org.apache.iceberg.encryption.EncryptionManager; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.LocationProvider; +import org.apache.iceberg.metrics.MetricsReporter; /** Represents a table. */ public interface Table { @@ -346,4 +347,13 @@ default Snapshot snapshot(String name) { return null; } + + /** + * Returns the metrics reporter for this table. + * + * @return the metrics reporter for this table. + */ + default MetricsReporter metricsReporter() { + throw new UnsupportedOperationException("Accessing metrics reporter is not supported"); + } } diff --git a/api/src/main/java/org/apache/iceberg/metrics/MetricsReporter.java b/api/src/main/java/org/apache/iceberg/metrics/MetricsReporter.java index 365f7f99d6f0..1ba5d14cdba6 100644 --- a/api/src/main/java/org/apache/iceberg/metrics/MetricsReporter.java +++ b/api/src/main/java/org/apache/iceberg/metrics/MetricsReporter.java @@ -18,11 +18,12 @@ */ package org.apache.iceberg.metrics; +import java.io.Serializable; import java.util.Map; /** This interface defines the basic API for reporting metrics for operations to a Table. 
*/ @FunctionalInterface -public interface MetricsReporter { +public interface MetricsReporter extends Serializable { /** * A custom MetricsReporter implementation must have a no-arg constructor, which will be called diff --git a/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java b/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java index 5f7c48e95867..8850345627cc 100644 --- a/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java +++ b/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java @@ -25,6 +25,7 @@ import org.apache.iceberg.encryption.EncryptionManager; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.LocationProvider; +import org.apache.iceberg.metrics.MetricsReporter; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; @@ -204,6 +205,11 @@ public String toString() { return name(); } + @Override + public MetricsReporter metricsReporter() { + return table().metricsReporter(); + } + final Object writeReplace() { return SerializableTable.copyOf(this); } diff --git a/core/src/main/java/org/apache/iceberg/BaseTable.java b/core/src/main/java/org/apache/iceberg/BaseTable.java index b9ed4f8d67ce..b34532a10c05 100644 --- a/core/src/main/java/org/apache/iceberg/BaseTable.java +++ b/core/src/main/java/org/apache/iceberg/BaseTable.java @@ -263,4 +263,9 @@ public String toString() { Object writeReplace() { return SerializableTable.copyOf(this); } + + @Override + public MetricsReporter metricsReporter() { + return reporter; + } } diff --git a/core/src/main/java/org/apache/iceberg/BaseTransaction.java b/core/src/main/java/org/apache/iceberg/BaseTransaction.java index 61da776f4c44..437bcfe1c8d3 100644 --- a/core/src/main/java/org/apache/iceberg/BaseTransaction.java +++ b/core/src/main/java/org/apache/iceberg/BaseTransaction.java @@ -773,6 +773,11 @@ 
public Map refs() { return current.refs(); } + @Override + public MetricsReporter metricsReporter() { + return BaseTransaction.this.reporter; + } + @Override public String toString() { return name(); diff --git a/core/src/main/java/org/apache/iceberg/SerializableTable.java b/core/src/main/java/org/apache/iceberg/SerializableTable.java index cf8e1b3fbaa7..3835f52648e9 100644 --- a/core/src/main/java/org/apache/iceberg/SerializableTable.java +++ b/core/src/main/java/org/apache/iceberg/SerializableTable.java @@ -26,6 +26,7 @@ import org.apache.iceberg.hadoop.HadoopConfigurable; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.LocationProvider; +import org.apache.iceberg.metrics.MetricsReporter; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.util.SerializableMap; import org.apache.iceberg.util.SerializableSupplier; @@ -62,6 +63,7 @@ public class SerializableTable implements Table, Serializable { private final EncryptionManager encryption; private final LocationProvider locationProvider; private final Map refs; + private final MetricsReporter metricsReporter; private transient volatile Table lazyTable = null; private transient volatile Schema lazySchema = null; @@ -83,6 +85,7 @@ protected SerializableTable(Table table) { this.encryption = table.encryption(); this.locationProvider = table.locationProvider(); this.refs = SerializableMap.copyOf(table.refs()); + this.metricsReporter = table.metricsReporter(); } /** @@ -136,7 +139,7 @@ private Table lazyTable() { } protected Table newTable(TableOperations ops, String tableName) { - return new BaseTable(ops, tableName); + return new BaseTable(ops, tableName, metricsReporter()); } @Override @@ -247,6 +250,11 @@ public Map refs() { return refs; } + @Override + public MetricsReporter metricsReporter() { + return metricsReporter; + } + @Override public void refresh() { throw new UnsupportedOperationException(errorMsg("refresh")); diff --git 
a/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java b/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java index f7369224ee29..bfedb2f9e5b2 100644 --- a/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java +++ b/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java @@ -27,6 +27,7 @@ import org.apache.iceberg.CatalogUtil; import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.hadoop.HadoopConfigurable; +import org.apache.iceberg.hadoop.HadoopFileIO; import org.apache.iceberg.hadoop.SerializableConfiguration; import org.apache.iceberg.relocated.com.google.common.base.Joiner; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; @@ -131,6 +132,9 @@ private FileIO io(String location) { String impl = implFromLocation(location); FileIO io = ioInstances.get(impl); if (io != null) { + if (io instanceof HadoopFileIO && ((HadoopFileIO) io).conf() == null) { + ((HadoopFileIO) io).setConf(hadoopConf.get()); + } return io; } diff --git a/core/src/main/java/org/apache/iceberg/metrics/MetricsReporters.java b/core/src/main/java/org/apache/iceberg/metrics/MetricsReporters.java index 053823fe2abc..f5efba41f575 100644 --- a/core/src/main/java/org/apache/iceberg/metrics/MetricsReporters.java +++ b/core/src/main/java/org/apache/iceberg/metrics/MetricsReporters.java @@ -18,10 +18,10 @@ */ package org.apache.iceberg.metrics; -import java.util.Collections; import java.util.Set; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.util.SerializableSet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,14 +52,14 @@ public static MetricsReporter combine(MetricsReporter first, MetricsReporter sec reporters.add(second); } - return new CompositeMetricsReporter(reporters); + return new CompositeMetricsReporter(SerializableSet.copyOf(reporters)); } @VisibleForTesting 
static class CompositeMetricsReporter implements MetricsReporter { - private final Set reporters; + private final SerializableSet reporters; - private CompositeMetricsReporter(Set reporters) { + private CompositeMetricsReporter(SerializableSet reporters) { this.reporters = reporters; } @@ -79,7 +79,7 @@ public void report(MetricsReport report) { } Set reporters() { - return Collections.unmodifiableSet(reporters); + return reporters.immutableSet(); } } } diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java index 63b660c46aa3..4ad55b28464d 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java @@ -23,7 +23,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.function.Function; import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; @@ -40,6 +39,7 @@ import org.apache.iceberg.hadoop.Configurable; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.util.SerializableFunction; public class RESTCatalog implements Catalog, SupportsNamespaces, Configurable, Closeable { private final RESTSessionCatalog sessionCatalog; @@ -53,13 +53,13 @@ public RESTCatalog() { config -> HTTPClient.builder(config).uri(config.get(CatalogProperties.URI)).build()); } - public RESTCatalog(Function, RESTClient> clientBuilder) { + public RESTCatalog(SerializableFunction, RESTClient> clientBuilder) { this(SessionCatalog.SessionContext.createEmpty(), clientBuilder); } public RESTCatalog( SessionCatalog.SessionContext context, - Function, RESTClient> clientBuilder) { + SerializableFunction, RESTClient> clientBuilder) { this.sessionCatalog = new RESTSessionCatalog(clientBuilder, null); this.delegate = 
sessionCatalog.asCatalog(context); this.nsDelegate = (SupportsNamespaces) delegate; diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTMetricsReporter.java b/core/src/main/java/org/apache/iceberg/rest/RESTMetricsReporter.java index 4ceb86f32c4a..662e0cecc894 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTMetricsReporter.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTMetricsReporter.java @@ -19,25 +19,35 @@ package org.apache.iceberg.rest; import java.util.Map; -import java.util.function.Supplier; import org.apache.iceberg.metrics.MetricsReport; import org.apache.iceberg.metrics.MetricsReporter; import org.apache.iceberg.rest.requests.ReportMetricsRequest; +import org.apache.iceberg.util.SerializableFunction; +import org.apache.iceberg.util.SerializableMap; +import org.apache.iceberg.util.SerializableSupplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; class RESTMetricsReporter implements MetricsReporter { private static final Logger LOG = LoggerFactory.getLogger(RESTMetricsReporter.class); - private final RESTClient client; + private transient volatile RESTClient client; private final String metricsEndpoint; - private final Supplier> headers; + private final SerializableSupplier> headers; + private final SerializableFunction, RESTClient> clientBuilder; + private final SerializableMap properties; RESTMetricsReporter( - RESTClient client, String metricsEndpoint, Supplier> headers) { + RESTClient client, + String metricsEndpoint, + SerializableSupplier> headers, + SerializableFunction, RESTClient> clientBuilder, + SerializableMap properties) { this.client = client; this.metricsEndpoint = metricsEndpoint; this.headers = headers; + this.clientBuilder = clientBuilder; + this.properties = properties; } @Override @@ -48,14 +58,36 @@ public void report(MetricsReport report) { } try { - client.post( - metricsEndpoint, - ReportMetricsRequest.of(report), - null, - headers, - ErrorHandlers.defaultErrorHandler()); + client() + 
.post( + metricsEndpoint, + ReportMetricsRequest.of(report), + null, + headers, + ErrorHandlers.defaultErrorHandler()); } catch (Exception e) { LOG.warn("Failed to report metrics to REST endpoint {}", metricsEndpoint, e); } } + + private RESTClient client() { + // lazy init the client in case RESTMetricsReporter was deserialized + if (null == client) { + synchronized (this) { + if (null == client) { + client = clientBuilder.apply(properties); + } + } + } + + return client; + } + + Object writeReplace() { + // fetch the latest headers from the AuthSession and carry them over in a separate supplier so + // that AuthSession doesn't have to be Serializable + Map authHeaders = headers.get(); + return new RESTMetricsReporter( + client, metricsEndpoint, () -> authHeaders, clientBuilder, properties); + } } diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java index 884e35dd6251..6ee26a60e063 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java @@ -33,7 +33,6 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.function.BiFunction; -import java.util.function.Function; import java.util.function.Supplier; import org.apache.iceberg.BaseTable; import org.apache.iceberg.CatalogProperties; @@ -84,6 +83,9 @@ import org.apache.iceberg.rest.responses.UpdateNamespacePropertiesResponse; import org.apache.iceberg.util.EnvironmentUtil; import org.apache.iceberg.util.PropertyUtil; +import org.apache.iceberg.util.SerializableFunction; +import org.apache.iceberg.util.SerializableMap; +import org.apache.iceberg.util.SerializableSupplier; import org.apache.iceberg.util.ThreadPools; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -102,7 +104,7 @@ public class RESTSessionCatalog extends BaseSessionCatalog 
OAuth2Properties.SAML2_TOKEN_TYPE, OAuth2Properties.SAML1_TOKEN_TYPE); - private final Function, RESTClient> clientBuilder; + private final SerializableFunction, RESTClient> clientBuilder; private final BiFunction, FileIO> ioBuilder; private Cache sessions = null; private Cache fileIOCloser; @@ -134,7 +136,7 @@ public RESTSessionCatalog() { } public RESTSessionCatalog( - Function, RESTClient> clientBuilder, + SerializableFunction, RESTClient> clientBuilder, BiFunction, FileIO> ioBuilder) { Preconditions.checkNotNull(clientBuilder, "Invalid client builder: null"); this.clientBuilder = clientBuilder; @@ -382,11 +384,17 @@ private void trackFileIO(RESTTableOperations ops) { } private MetricsReporter metricsReporter( - String metricsEndpoint, Supplier> headers) { + String metricsEndpoint, SerializableSupplier> headers) { if (reportingViaRestEnabled) { RESTMetricsReporter restMetricsReporter = - new RESTMetricsReporter(client, metricsEndpoint, headers); - return MetricsReporters.combine(reporter, restMetricsReporter); + new RESTMetricsReporter( + client, + metricsEndpoint, + headers, + clientBuilder, + SerializableMap.copyOf(super.properties())); + + return MetricsReporters.combine(this.reporter, restMetricsReporter); } else { return this.reporter; } diff --git a/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Util.java b/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Util.java index 678642830cda..192c11e19a24 100644 --- a/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Util.java +++ b/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Util.java @@ -48,6 +48,7 @@ import org.apache.iceberg.rest.responses.OAuthTokenResponse; import org.apache.iceberg.util.JsonUtil; import org.apache.iceberg.util.Pair; +import org.apache.iceberg.util.SerializableMap; import org.apache.iceberg.util.Tasks; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -354,7 +355,7 @@ public static class AuthSession { private static int tokenRefreshNumRetries = 5; private 
static final long MAX_REFRESH_WINDOW_MILLIS = 300_000; // 5 minutes private static final long MIN_REFRESH_WAIT_MILLIS = 10; - private volatile Map headers; + private volatile SerializableMap headers; private volatile String token; private volatile String tokenType; private volatile Long expiresAtMillis; @@ -368,7 +369,7 @@ public AuthSession( String tokenType, String credential, String scope) { - this.headers = RESTUtil.merge(baseHeaders, authHeaders(token)); + this.headers = SerializableMap.copyOf(RESTUtil.merge(baseHeaders, authHeaders(token))); this.token = token; this.tokenType = tokenType; this.expiresAtMillis = OAuth2Util.expiresAtMillis(token); @@ -377,7 +378,7 @@ public AuthSession( } public Map headers() { - return headers; + return headers.immutableMap(); } public String token() { @@ -454,7 +455,7 @@ public Pair refresh(RESTClient client) { this.token = response.token(); this.tokenType = response.issuedTokenType(); this.expiresAtMillis = OAuth2Util.expiresAtMillis(token); - this.headers = RESTUtil.merge(headers, authHeaders(token)); + this.headers = SerializableMap.copyOf(RESTUtil.merge(headers, authHeaders(token))); if (response.expiresInSeconds() != null) { return Pair.of(response.expiresInSeconds(), TimeUnit.SECONDS); diff --git a/core/src/main/java/org/apache/iceberg/util/SerializableSet.java b/core/src/main/java/org/apache/iceberg/util/SerializableSet.java new file mode 100644 index 000000000000..63d32c2ce2c6 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/util/SerializableSet.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.util; + +import java.io.Serializable; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.Set; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; + +public class SerializableSet implements Set, Serializable { + private final Set copiedSet; + private transient volatile Set immutableSet; + + SerializableSet() { + this.copiedSet = Sets.newHashSet(); + } + + private SerializableSet(Set set) { + this.copiedSet = Sets.newHashSet(); + this.copiedSet.addAll(set); + } + + public static SerializableSet copyOf(Set set) { + return set == null ? 
null : new SerializableSet<>(set); + } + + public Set immutableSet() { + if (null == immutableSet) { + synchronized (this) { + if (null == immutableSet) { + immutableSet = Collections.unmodifiableSet(copiedSet); + } + } + } + + return immutableSet; + } + + @Override + public int size() { + return copiedSet.size(); + } + + @Override + public boolean isEmpty() { + return copiedSet.isEmpty(); + } + + @Override + public boolean contains(Object o) { + return copiedSet.contains(o); + } + + @Override + public Iterator iterator() { + return copiedSet.iterator(); + } + + @Override + public Object[] toArray() { + return copiedSet.toArray(); + } + + @Override + public T[] toArray(T[] a) { + return copiedSet.toArray(a); + } + + @Override + public boolean add(E e) { + return copiedSet.add(e); + } + + @Override + public boolean remove(Object o) { + return copiedSet.remove(o); + } + + @Override + public boolean containsAll(Collection c) { + return copiedSet.containsAll(c); + } + + @Override + public boolean addAll(Collection c) { + return copiedSet.addAll(c); + } + + @Override + public boolean retainAll(Collection c) { + return copiedSet.retainAll(c); + } + + @Override + public boolean removeAll(Collection c) { + return copiedSet.removeAll(c); + } + + @Override + public void clear() { + copiedSet.clear(); + } + + @Override + public boolean equals(Object o) { + return copiedSet.equals(o); + } + + @Override + public int hashCode() { + return copiedSet.hashCode(); + } +} diff --git a/core/src/test/java/org/apache/iceberg/TestSerializableTable.java b/core/src/test/java/org/apache/iceberg/TestSerializableTable.java new file mode 100644 index 000000000000..59097536dc51 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/TestSerializableTable.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.iceberg.types.Types.NestedField.required; +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.File; +import java.io.IOException; +import java.util.Map; +import org.apache.iceberg.metrics.MetricsReport; +import org.apache.iceberg.metrics.MetricsReporter; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.types.Types; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +public class TestSerializableTable { + + private static final Schema SCHEMA = + new Schema( + required(1, "id", Types.LongType.get()), + optional(2, "data", Types.StringType.get()), + required(3, "date", Types.StringType.get()), + optional(4, "double", Types.DoubleType.get())); + + private static final PartitionSpec SPEC = + PartitionSpec.builderFor(SCHEMA).identity("date").build(); + + private static final SortOrder SORT_ORDER = SortOrder.builderFor(SCHEMA).asc("id").build(); + + @TempDir private File temp; + + @AfterAll + public static void clean() { + TestTables.clearTables(); + } + + @Test + public void 
testSerializableTableWithMetricsReporter() + throws IOException, ClassNotFoundException { + Map properties = + ImmutableMap.of( + CatalogProperties.METRICS_REPORTER_IMPL, + TestMetricsReporter.class.getName(), + "key1", + "value1"); + MetricsReporter reporter = CatalogUtil.loadMetricsReporter(properties); + Table table = TestTables.create(temp, "tbl_A", SCHEMA, SPEC, SORT_ORDER, 2, reporter); + Table serializableTable = TestHelpers.roundTripSerialize(SerializableTable.copyOf(table)); + assertThat(serializableTable.metricsReporter()) + .isNotNull() + .isInstanceOf(TestMetricsReporter.class); + + serializableTable = TestHelpers.KryoHelpers.roundTripSerialize(SerializableTable.copyOf(table)); + assertThat(serializableTable.metricsReporter()) + .isNotNull() + .isInstanceOf(TestMetricsReporter.class); + } + + public static class TestMetricsReporter implements MetricsReporter { + + @Override + public void report(MetricsReport report) {} + } +} diff --git a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java index 4695ac1b0f1a..f3958378716a 100644 --- a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java +++ b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java @@ -45,9 +45,11 @@ import org.apache.iceberg.FileScanTask; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; +import org.apache.iceberg.SerializableTable; import org.apache.iceberg.SnapshotRef; import org.apache.iceberg.Table; import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TestHelpers; import org.apache.iceberg.Transaction; import org.apache.iceberg.UpdatePartitionSpec; import org.apache.iceberg.UpdateSchema; @@ -70,6 +72,7 @@ import org.apache.iceberg.rest.auth.AuthSessionUtil; import org.apache.iceberg.rest.auth.OAuth2Properties; import org.apache.iceberg.rest.auth.OAuth2Util; +import org.apache.iceberg.rest.requests.ReportMetricsRequest; import 
org.apache.iceberg.rest.responses.ConfigResponse; import org.apache.iceberg.rest.responses.ErrorResponse; import org.apache.iceberg.rest.responses.LoadTableResponse; @@ -97,6 +100,7 @@ public class TestRESTCatalog extends CatalogTests { private RESTCatalog restCatalog; private JdbcCatalog backendCatalog; + private RESTCatalogAdapter adaptor; private Server httpServer; @BeforeEach @@ -123,34 +127,35 @@ public void createCatalog() throws Exception { Map contextHeaders = ImmutableMap.of("Authorization", "Bearer client-credentials-token:sub=user"); - RESTCatalogAdapter adaptor = - new RESTCatalogAdapter(backendCatalog) { - @Override - public T execute( - RESTCatalogAdapter.HTTPMethod method, - String path, - Map queryParams, - Object body, - Class responseType, - Map headers, - Consumer errorHandler) { - // this doesn't use a Mockito spy because this is used for catalog tests, which have - // different method calls - if (!"v1/oauth/tokens".equals(path)) { - if ("v1/config".equals(path)) { - assertThat(headers).containsAllEntriesOf(catalogHeaders); - } else { - assertThat(headers).containsAllEntriesOf(contextHeaders); + adaptor = + Mockito.spy( + new RESTCatalogAdapter(backendCatalog) { + @Override + public T execute( + RESTCatalogAdapter.HTTPMethod method, + String path, + Map queryParams, + Object body, + Class responseType, + Map headers, + Consumer errorHandler) { + // wrapped in a Mockito spy so individual invocations (e.g. metrics reporting) can be + // verified; catalog tests still exercise their own different method calls + if (!"v1/oauth/tokens".equals(path)) { + if ("v1/config".equals(path)) { + assertThat(headers).containsAllEntriesOf(catalogHeaders); + } else { + assertThat(headers).containsAllEntriesOf(contextHeaders); + } + } + Object request = roundTripSerialize(body, "request"); + T response = + super.execute( + method, path, queryParams, request, responseType, headers, errorHandler); + T responseAfterSerialization = roundTripSerialize(response, "response"); + return responseAfterSerialization; + } - } - 
Object request = roundTripSerialize(body, "request"); - T response = - super.execute( - method, path, queryParams, request, responseType, headers, errorHandler); - T responseAfterSerialization = roundTripSerialize(response, "response"); - return responseAfterSerialization; - } - }; + }); RESTCatalogServlet servlet = new RESTCatalogServlet(adaptor); ServletContextHandler servletContext = @@ -2003,4 +2008,48 @@ public void multipleDiffsAgainstMultipleTablesLastFails() { assertThat(schema2.findField("new-column")).isNull(); assertThat(schema2.columns()).hasSize(1); } + + @Test + public void testScanReportingOnSerializableTable() throws IOException, ClassNotFoundException { + Table table = catalog().buildTable(TABLE, SCHEMA).create(); + table + .newFastAppend() + .appendFile( + DataFiles.builder(PartitionSpec.unpartitioned()) + .withPath("/path/to/data-a.parquet") + .withFileSizeInBytes(10) + .withRecordCount(2) + .build()) + .commit(); + + table = catalog().loadTable(TABLE); + Mockito.reset(adaptor); + + try (CloseableIterable tasks = table.newScan().planFiles()) { + assertThat(tasks.iterator()).hasNext(); + } + + Table serializableTable = + TestHelpers.KryoHelpers.roundTripSerialize(SerializableTable.copyOf(table)); + try (CloseableIterable tasks = serializableTable.newScan().planFiles()) { + assertThat(tasks.iterator()).hasNext(); + } + + serializableTable = TestHelpers.roundTripSerialize(SerializableTable.copyOf(table)); + try (CloseableIterable tasks = serializableTable.newScan().planFiles()) { + assertThat(tasks.iterator()).hasNext(); + } + + // once the table is serialized, it should still be able to send metrics reports via REST + ResourcePaths paths = ResourcePaths.forCatalogProperties(Maps.newHashMap()); + verify(adaptor, times(3)) + .execute( + eq(HTTPMethod.POST), + eq(paths.metrics(TABLE)), + any(), + any(ReportMetricsRequest.class), + any(), + any(), + any()); + } }