diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTMetricsReporter.java b/core/src/main/java/org/apache/iceberg/rest/RESTMetricsReporter.java new file mode 100644 index 000000000000..4ceb86f32c4a --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/RESTMetricsReporter.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest; + +import java.util.Map; +import java.util.function.Supplier; +import org.apache.iceberg.metrics.MetricsReport; +import org.apache.iceberg.metrics.MetricsReporter; +import org.apache.iceberg.rest.requests.ReportMetricsRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class RESTMetricsReporter implements MetricsReporter { + private static final Logger LOG = LoggerFactory.getLogger(RESTMetricsReporter.class); + + private final RESTClient client; + private final String metricsEndpoint; + private final Supplier> headers; + + RESTMetricsReporter( + RESTClient client, String metricsEndpoint, Supplier> headers) { + this.client = client; + this.metricsEndpoint = metricsEndpoint; + this.headers = headers; + } + + @Override + public void report(MetricsReport report) { + if (null == report) { + LOG.warn("Received invalid metrics report: null"); + return; + } + + try { + client.post( + metricsEndpoint, + ReportMetricsRequest.of(report), + null, + headers, + ErrorHandlers.defaultErrorHandler()); + } catch (Exception e) { + LOG.warn("Failed to report metrics to REST endpoint {}", metricsEndpoint, e); + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java index bd36ad154c5a..e819bcd19fa0 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java @@ -57,8 +57,8 @@ import org.apache.iceberg.hadoop.Configurable; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.ResolvingFileIO; -import org.apache.iceberg.metrics.MetricsReport; import org.apache.iceberg.metrics.MetricsReporter; +import org.apache.iceberg.metrics.MetricsReporters; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; @@ -68,7 +68,6 @@ import org.apache.iceberg.rest.requests.CreateNamespaceRequest; import org.apache.iceberg.rest.requests.CreateTableRequest; import org.apache.iceberg.rest.requests.RenameTableRequest; -import org.apache.iceberg.rest.requests.ReportMetricsRequest; import org.apache.iceberg.rest.requests.UpdateNamespacePropertiesRequest; import org.apache.iceberg.rest.responses.ConfigResponse; import org.apache.iceberg.rest.responses.CreateNamespaceResponse; @@ -347,12 +346,11 @@ public Table loadTable(SessionContext context, TableIdentifier identifier) { tableFileIO(context, response.config()), tableMetadata); - TableIdentifier tableIdentifier = loadedIdent; BaseTable table = new BaseTable( ops, fullTableName(loadedIdent), - report -> reportMetrics(tableIdentifier, report, session::headers)); + metricsReporter(paths.metrics(loadedIdent), session::headers)); if (metadataType != null) { return MetadataTableUtils.createMetadataTableInstance(table, metadataType); } @@ -360,22 +358,14 @@ public Table loadTable(SessionContext context, TableIdentifier identifier) { return table; } - private void reportMetrics( - TableIdentifier tableIdentifier, - MetricsReport report, - Supplier> headers) { - try { - reporter.report(report); - if (reportingViaRestEnabled) { - client.post( - paths.metrics(tableIdentifier), - ReportMetricsRequest.of(report), - null, - headers, - ErrorHandlers.defaultErrorHandler()); - } - } catch (Exception e) { - LOG.warn("Failed to report metrics to REST endpoint for table {}", tableIdentifier, e); + private MetricsReporter metricsReporter( + String metricsEndpoint, Supplier> headers) { + if (reportingViaRestEnabled) { + RESTMetricsReporter restMetricsReporter = + new RESTMetricsReporter(client, metricsEndpoint, headers); + return MetricsReporters.combine(reporter, restMetricsReporter); + } else { + return this.reporter; } } @@ -603,7 +593,7 @@ public Table create() { response.tableMetadata()); return new BaseTable( - ops, fullTableName(ident), report -> reportMetrics(ident, report, session::headers)); + ops, fullTableName(ident), metricsReporter(paths.metrics(ident), session::headers)); } @Override @@ -625,7 +615,7 @@ public Transaction createTransaction() { meta); return Transactions.createTableTransaction( - fullName, ops, meta, report -> reportMetrics(ident, report, session::headers)); + fullName, ops, meta, metricsReporter(paths.metrics(ident), session::headers)); } @Override @@ -676,7 +666,7 @@ public Transaction replaceTransaction() { base); return Transactions.replaceTableTransaction( - fullName, ops, replacement, report -> reportMetrics(ident, report, session::headers)); + fullName, ops, replacement, metricsReporter(paths.metrics(ident), session::headers)); } @Override