Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions .palantir/revapi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,28 @@ acceptedBreaks:
new: "parameter void org.apache.iceberg.actions.BaseCommitService<T>::offer(===T===)\
\ @ org.apache.iceberg.actions.RewriteDataFilesCommitManager.CommitService"
justification: "Backwards compatible parameterization of argument"
- code: "java.method.parameterTypeChanged"
old: "parameter void org.apache.iceberg.rest.RESTCatalog::<init>(===java.util.function.Function<java.util.Map<java.lang.String,\
\ java.lang.String>, org.apache.iceberg.rest.RESTClient>===)"
new: "parameter void org.apache.iceberg.rest.RESTCatalog::<init>(===org.apache.iceberg.util.SerializableFunction<java.util.Map<java.lang.String,\
\ java.lang.String>, org.apache.iceberg.rest.RESTClient>===)"
justification: "Switching to SerializableFunction"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we support both? I don't think we can make this change if it is a breaking one.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we can support both, because there is no place where we could simply cast from Function to SerializableFunction.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rdblue if we don't want to break the API with the Function -> SerializableFunction change, then the other alternative would be to make HTTPClient serializable. I've opened #8032 to do that

- code: "java.method.parameterTypeChanged"
old: "parameter void org.apache.iceberg.rest.RESTCatalog::<init>(org.apache.iceberg.catalog.SessionCatalog.SessionContext,\
\ ===java.util.function.Function<java.util.Map<java.lang.String, java.lang.String>,\
\ org.apache.iceberg.rest.RESTClient>===)"
new: "parameter void org.apache.iceberg.rest.RESTCatalog::<init>(org.apache.iceberg.catalog.SessionCatalog.SessionContext,\
\ ===org.apache.iceberg.util.SerializableFunction<java.util.Map<java.lang.String,\
\ java.lang.String>, org.apache.iceberg.rest.RESTClient>===)"
justification: "Switching to SerializableFunction"
- code: "java.method.parameterTypeChanged"
old: "parameter void org.apache.iceberg.rest.RESTSessionCatalog::<init>(===java.util.function.Function<java.util.Map<java.lang.String,\
\ java.lang.String>, org.apache.iceberg.rest.RESTClient>===, java.util.function.BiFunction<org.apache.iceberg.catalog.SessionCatalog.SessionContext,\
\ java.util.Map<java.lang.String, java.lang.String>, org.apache.iceberg.io.FileIO>)"
new: "parameter void org.apache.iceberg.rest.RESTSessionCatalog::<init>(===org.apache.iceberg.util.SerializableFunction<java.util.Map<java.lang.String,\
\ java.lang.String>, org.apache.iceberg.rest.RESTClient>===, java.util.function.BiFunction<org.apache.iceberg.catalog.SessionCatalog.SessionContext,\
\ java.util.Map<java.lang.String, java.lang.String>, org.apache.iceberg.io.FileIO>)"
justification: "Switching to SerializableFunction"
- code: "java.method.removed"
old: "method ThisT org.apache.iceberg.BaseScan<ThisT, T extends org.apache.iceberg.ScanTask,\
\ G extends org.apache.iceberg.ScanTaskGroup<T extends org.apache.iceberg.ScanTask>>::newRefinedScan(org.apache.iceberg.TableOperations,\
Expand Down
10 changes: 10 additions & 0 deletions api/src/main/java/org/apache/iceberg/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.LocationProvider;
import org.apache.iceberg.metrics.MetricsReporter;

/** Represents a table. */
public interface Table {
Expand Down Expand Up @@ -346,4 +347,13 @@ default Snapshot snapshot(String name) {

return null;
}

/**
 * Returns the metrics reporter for this table.
 *
 * <p>The default implementation throws; table implementations that carry a reporter (e.g.
 * BaseTable) are expected to override this.
 *
 * @return the metrics reporter for this table.
 * @throws UnsupportedOperationException if this table implementation does not expose a reporter
 */
default MetricsReporter metricsReporter() {
throw new UnsupportedOperationException("Accessing metrics reporter is not supported");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@
*/
package org.apache.iceberg.metrics;

import java.io.Serializable;
import java.util.Map;

/** This interface defines the basic API for reporting metrics for operations to a Table. */
@FunctionalInterface
public interface MetricsReporter {
public interface MetricsReporter extends Serializable {

/**
* A custom MetricsReporter implementation must have a no-arg constructor, which will be called
Expand Down
6 changes: 6 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.LocationProvider;
import org.apache.iceberg.metrics.MetricsReporter;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -204,6 +205,11 @@ public String toString() {
return name();
}

@Override
public MetricsReporter metricsReporter() {
// metadata tables report through the reporter of the table they are derived from
return table().metricsReporter();
}

final Object writeReplace() {
return SerializableTable.copyOf(this);
}
Expand Down
5 changes: 5 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -263,4 +263,9 @@ public String toString() {
Object writeReplace() {
return SerializableTable.copyOf(this);
}

@Override
public MetricsReporter metricsReporter() {
// expose the reporter this table was constructed with
return reporter;
}
}
5 changes: 5 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -773,6 +773,11 @@ public Map<String, SnapshotRef> refs() {
return current.refs();
}

@Override
public MetricsReporter metricsReporter() {
// the transaction's table view shares the enclosing transaction's reporter
return BaseTransaction.this.reporter;
}

@Override
public String toString() {
return name();
Expand Down
10 changes: 9 additions & 1 deletion core/src/main/java/org/apache/iceberg/SerializableTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.iceberg.hadoop.HadoopConfigurable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.LocationProvider;
import org.apache.iceberg.metrics.MetricsReporter;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.SerializableMap;
import org.apache.iceberg.util.SerializableSupplier;
Expand Down Expand Up @@ -62,6 +63,7 @@ public class SerializableTable implements Table, Serializable {
private final EncryptionManager encryption;
private final LocationProvider locationProvider;
private final Map<String, SnapshotRef> refs;
private final MetricsReporter metricsReporter;

private transient volatile Table lazyTable = null;
private transient volatile Schema lazySchema = null;
Expand All @@ -83,6 +85,7 @@ protected SerializableTable(Table table) {
this.encryption = table.encryption();
this.locationProvider = table.locationProvider();
this.refs = SerializableMap.copyOf(table.refs());
this.metricsReporter = table.metricsReporter();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we expose this through BaseTable instead of the Table interface? I'm not sure that we want this in the Table interface and it would be best to avoid it.

}

/**
Expand Down Expand Up @@ -136,7 +139,7 @@ private Table lazyTable() {
}

// Rebuilds the underlying table after deserialization, propagating the serialized
// metrics reporter so the restored table keeps reporting.
protected Table newTable(TableOperations ops, String tableName) {
return new BaseTable(ops, tableName, metricsReporter());
}

@Override
Expand Down Expand Up @@ -247,6 +250,11 @@ public Map<String, SnapshotRef> refs() {
return refs;
}

@Override
public MetricsReporter metricsReporter() {
// captured from the source table at construction time; MetricsReporter is Serializable
return metricsReporter;
}

@Override
public void refresh() {
throw new UnsupportedOperationException(errorMsg("refresh"));
Expand Down
4 changes: 4 additions & 0 deletions core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.hadoop.HadoopConfigurable;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.hadoop.SerializableConfiguration;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -131,6 +132,9 @@ private FileIO io(String location) {
String impl = implFromLocation(location);
FileIO io = ioInstances.get(impl);
if (io != null) {
if (io instanceof HadoopFileIO && ((HadoopFileIO) io).conf() == null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does this need to change IO classes?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, forgot to mention why I added this. It appears that the underlying conf of HadoopFileIO wasn't initialized when calling StaticTableOperations.current():

hadoop_io_conf_missing

((HadoopFileIO) io).setConf(hadoopConf.get());
}
return io;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@
*/
package org.apache.iceberg.metrics;

import java.util.Collections;
import java.util.Set;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.SerializableSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -52,14 +52,14 @@ public static MetricsReporter combine(MetricsReporter first, MetricsReporter sec
reporters.add(second);
}

return new CompositeMetricsReporter(reporters);
return new CompositeMetricsReporter(SerializableSet.copyOf(reporters));
}

@VisibleForTesting
static class CompositeMetricsReporter implements MetricsReporter {
private final Set<MetricsReporter> reporters;
private final SerializableSet<MetricsReporter> reporters;

private CompositeMetricsReporter(Set<MetricsReporter> reporters) {
private CompositeMetricsReporter(SerializableSet<MetricsReporter> reporters) {
this.reporters = reporters;
}

Expand All @@ -79,7 +79,7 @@ public void report(MetricsReport report) {
}

Set<MetricsReporter> reporters() {
return Collections.unmodifiableSet(reporters);
return reporters.immutableSet();
}
}
}
6 changes: 3 additions & 3 deletions core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
Expand All @@ -40,6 +39,7 @@
import org.apache.iceberg.hadoop.Configurable;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.util.SerializableFunction;

public class RESTCatalog implements Catalog, SupportsNamespaces, Configurable<Object>, Closeable {
private final RESTSessionCatalog sessionCatalog;
Expand All @@ -53,13 +53,13 @@ public RESTCatalog() {
config -> HTTPClient.builder(config).uri(config.get(CatalogProperties.URI)).build());
}

public RESTCatalog(Function<Map<String, String>, RESTClient> clientBuilder) {
public RESTCatalog(SerializableFunction<Map<String, String>, RESTClient> clientBuilder) {
this(SessionCatalog.SessionContext.createEmpty(), clientBuilder);
}

public RESTCatalog(
SessionCatalog.SessionContext context,
Function<Map<String, String>, RESTClient> clientBuilder) {
SerializableFunction<Map<String, String>, RESTClient> clientBuilder) {
this.sessionCatalog = new RESTSessionCatalog(clientBuilder, null);
this.delegate = sessionCatalog.asCatalog(context);
this.nsDelegate = (SupportsNamespaces) delegate;
Expand Down
52 changes: 42 additions & 10 deletions core/src/main/java/org/apache/iceberg/rest/RESTMetricsReporter.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,35 @@
package org.apache.iceberg.rest;

import java.util.Map;
import java.util.function.Supplier;
import org.apache.iceberg.metrics.MetricsReport;
import org.apache.iceberg.metrics.MetricsReporter;
import org.apache.iceberg.rest.requests.ReportMetricsRequest;
import org.apache.iceberg.util.SerializableFunction;
import org.apache.iceberg.util.SerializableMap;
import org.apache.iceberg.util.SerializableSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class RESTMetricsReporter implements MetricsReporter {
private static final Logger LOG = LoggerFactory.getLogger(RESTMetricsReporter.class);

private final RESTClient client;
private transient volatile RESTClient client;
private final String metricsEndpoint;
private final Supplier<Map<String, String>> headers;
private final SerializableSupplier<Map<String, String>> headers;
private final SerializableFunction<Map<String, String>, RESTClient> clientBuilder;
private final SerializableMap<String, String> properties;

/**
 * Creates a reporter that POSTs metrics reports to a REST endpoint.
 *
 * @param client REST client used to send reports; transient, rebuilt via {@code clientBuilder}
 *     after deserialization
 * @param metricsEndpoint endpoint path the reports are posted to
 * @param headers serializable supplier of request headers (e.g. auth headers)
 * @param clientBuilder serializable factory used to lazily recreate the client
 * @param properties serializable catalog properties passed to {@code clientBuilder}
 */
RESTMetricsReporter(
RESTClient client,
String metricsEndpoint,
SerializableSupplier<Map<String, String>> headers,
SerializableFunction<Map<String, String>, RESTClient> clientBuilder,
SerializableMap<String, String> properties) {
this.client = client;
this.metricsEndpoint = metricsEndpoint;
this.headers = headers;
this.clientBuilder = clientBuilder;
this.properties = properties;
}

@Override
Expand All @@ -48,14 +58,36 @@ public void report(MetricsReport report) {
}

try {
client.post(
metricsEndpoint,
ReportMetricsRequest.of(report),
null,
headers,
ErrorHandlers.defaultErrorHandler());
client()
.post(
metricsEndpoint,
ReportMetricsRequest.of(report),
null,
headers,
ErrorHandlers.defaultErrorHandler());
} catch (Exception e) {
LOG.warn("Failed to report metrics to REST endpoint {}", metricsEndpoint, e);
}
}

/**
 * Returns the REST client, lazily rebuilding it after deserialization.
 *
 * <p>Uses the standard double-checked locking idiom with a local variable so the volatile field
 * is read only once on the fast path, and the reference that was null-checked is the one
 * returned.
 *
 * @return the (possibly newly built) REST client
 */
private RESTClient client() {
// lazy init the client in case RESTMetricsReporter was deserialized
RESTClient result = client;
if (null == result) {
synchronized (this) {
result = client;
if (null == result) {
result = clientBuilder.apply(properties);
client = result;
}
}
}

return result;
}

// Serialization hook: replaces this instance with a copy whose header supplier captures a
// snapshot of the current auth headers.
Object writeReplace() {
// fetch the latest headers from the AuthSession and carry them over in a separate supplier so
// that AuthSession doesn't have to be Serializable
Map<String, String> authHeaders = headers.get();
// the client field is transient, so the deserialized copy rebuilds it lazily via clientBuilder
return new RESTMetricsReporter(
client, metricsEndpoint, () -> authHeaders, clientBuilder, properties);
}
}
20 changes: 14 additions & 6 deletions core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.CatalogProperties;
Expand Down Expand Up @@ -84,6 +83,9 @@
import org.apache.iceberg.rest.responses.UpdateNamespacePropertiesResponse;
import org.apache.iceberg.util.EnvironmentUtil;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.SerializableFunction;
import org.apache.iceberg.util.SerializableMap;
import org.apache.iceberg.util.SerializableSupplier;
import org.apache.iceberg.util.ThreadPools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -102,7 +104,7 @@ public class RESTSessionCatalog extends BaseSessionCatalog
OAuth2Properties.SAML2_TOKEN_TYPE,
OAuth2Properties.SAML1_TOKEN_TYPE);

private final Function<Map<String, String>, RESTClient> clientBuilder;
private final SerializableFunction<Map<String, String>, RESTClient> clientBuilder;
private final BiFunction<SessionContext, Map<String, String>, FileIO> ioBuilder;
private Cache<String, AuthSession> sessions = null;
private Cache<TableOperations, FileIO> fileIOCloser;
Expand Down Expand Up @@ -134,7 +136,7 @@ public RESTSessionCatalog() {
}

public RESTSessionCatalog(
Function<Map<String, String>, RESTClient> clientBuilder,
SerializableFunction<Map<String, String>, RESTClient> clientBuilder,
BiFunction<SessionContext, Map<String, String>, FileIO> ioBuilder) {
Preconditions.checkNotNull(clientBuilder, "Invalid client builder: null");
this.clientBuilder = clientBuilder;
Expand Down Expand Up @@ -382,11 +384,17 @@ private void trackFileIO(RESTTableOperations ops) {
}

private MetricsReporter metricsReporter(
String metricsEndpoint, Supplier<Map<String, String>> headers) {
String metricsEndpoint, SerializableSupplier<Map<String, String>> headers) {
if (reportingViaRestEnabled) {
RESTMetricsReporter restMetricsReporter =
new RESTMetricsReporter(client, metricsEndpoint, headers);
return MetricsReporters.combine(reporter, restMetricsReporter);
new RESTMetricsReporter(
client,
metricsEndpoint,
headers,
clientBuilder,
SerializableMap.copyOf(super.properties()));

return MetricsReporters.combine(this.reporter, restMetricsReporter);
} else {
return this.reporter;
}
Expand Down
Loading