diff --git a/aws/src/main/java/org/apache/iceberg/aws/HttpClientProperties.java b/aws/src/main/java/org/apache/iceberg/aws/HttpClientProperties.java
index 2a5ca2ece8e0..986742e48a7e 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/HttpClientProperties.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/HttpClientProperties.java
@@ -24,6 +24,7 @@
import org.apache.iceberg.common.DynMethods;
import org.apache.iceberg.relocated.com.google.common.base.Strings;
import org.apache.iceberg.util.PropertyUtil;
+import software.amazon.awssdk.awscore.client.builder.AwsAsyncClientBuilder;
import software.amazon.awssdk.awscore.client.builder.AwsSyncClientBuilder;
public class HttpClientProperties implements Serializable {
@@ -167,6 +168,90 @@ public class HttpClientProperties implements Serializable {
public static final String APACHE_USE_IDLE_CONNECTION_REAPER_ENABLED =
"http-client.apache.use-idle-connection-reaper-enabled";
+ /**
+ * If this is set under {@link #CLIENT_TYPE}, {@link
+ * software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient} will be used as the HTTP Client
+ */
+ public static final String HTTP_CLIENT_TYPE_NETTYNIO = "nettynio";
+
+ /**
+ * Used to configure connection maximum idle time {@link
+ * software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient.Builder}. This flag only works
+ * when {@link #CLIENT_TYPE} is set to {@link #HTTP_CLIENT_TYPE_NETTYNIO}.
+ *
+ *
For more details, see
+ * https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/nio/netty/NettyNioAsyncHttpClient.Builder.html
+ */
+ public static final String NETTYNIO_CONNECTION_MAX_IDLE_TIME_MS =
+ "http-client.nettynio.connection-max-idle-time-ms";
+ /**
+ * Used to configure connection acquisition timeout {@link
+ * software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient.Builder}. This flag only works
+ * when {@link #CLIENT_TYPE} is set to {@link #HTTP_CLIENT_TYPE_NETTYNIO}.
+ *
+ *
For more details, see
+ * https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/nio/netty/NettyNioAsyncHttpClient.Builder.html
+ */
+ public static final String NETTYNIO_ACQUISITION_TIMEOUT_MS =
+ "http-client.nettynio.connection-acquisition-timeout-ms";
+ /**
+ * Used to configure connection timeout {@link
+ * software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient.Builder}. This flag only works
+ * when {@link #CLIENT_TYPE} is set to {@link #HTTP_CLIENT_TYPE_NETTYNIO}.
+ *
+ *
For more details, see
+ * https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/nio/netty/NettyNioAsyncHttpClient.Builder.html
+ */
+ public static final String NETTYNIO_CONNECTION_TIMEOUT_MS =
+ "http-client.nettynio.connection-timeout-ms";
+ /**
+ * Used to configure connection time to live {@link
+ * software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient.Builder}. This flag only works
+ * when {@link #CLIENT_TYPE} is set to {@link #HTTP_CLIENT_TYPE_NETTYNIO}.
+ *
+ *
For more details, see
+ * https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/nio/netty/NettyNioAsyncHttpClient.Builder.html
+ */
+ public static final String NETTYNIO_CONNECTION_TIME_TO_LIVE_MS =
+ "http-client.nettynio.connection-time-to-live-ms";
+ /**
+ * Used to configure maximum concurrency {@link
+ * software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient.Builder}. This flag only works
+ * when {@link #CLIENT_TYPE} is set to {@link #HTTP_CLIENT_TYPE_NETTYNIO}.
+ *
+ *
For more details, see
+ * https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/nio/netty/NettyNioAsyncHttpClient.Builder.html
+ */
+ public static final String NETTYNIO_MAX_CONCURRENCY = "http-client.nettynio.max-concurrency";
+ /**
+ * Used to configure read timeout {@link
+ * software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient.Builder}. This flag only works
+ * when {@link #CLIENT_TYPE} is set to {@link #HTTP_CLIENT_TYPE_NETTYNIO}.
+ *
+ *
For more details, see
+ * https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/nio/netty/NettyNioAsyncHttpClient.Builder.html
+ */
+ public static final String NETTYNIO_READ_TIMEOUT = "http-client.nettynio.read-timeout";
+ /**
+ * Used to configure maximum pending connection acquires {@link
+ * software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient.Builder}. This flag only works
+ * when {@link #CLIENT_TYPE} is set to {@link #HTTP_CLIENT_TYPE_NETTYNIO}.
+ *
+ *
For more details, see
+ * https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/nio/netty/NettyNioAsyncHttpClient.Builder.html
+ */
+ public static final String NETTYNIO_MAX_PENDING_CONNECTION_ACQUIRES =
+ "http-client.nettynio.max-pending-connection-acquires";
+ /**
+ * Used to configure write timeout {@link
+ * software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient.Builder}. This flag only works
+ * when {@link #CLIENT_TYPE} is set to {@link #HTTP_CLIENT_TYPE_NETTYNIO}.
+ *
+ *
For more details, see
+ * https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/nio/netty/NettyNioAsyncHttpClient.Builder.html
+ */
+ public static final String NETTYNIO_WRITE_TIMEOUT = "http-client.nettynio.write-timeout";
+
private String httpClientType;
private final Map httpClientProperties;
@@ -213,6 +298,31 @@ public void applyHttpClientConfigurations(T bui
}
}
+ /**
+ * Configure the httpClient for a client according to the HttpClientType. The supported
+ * HttpClientTypes are nettynio
+ *
+ * Sample usage:
+ *
+ *
+ * S3Client.builder().applyMutation(awsProperties::applyAsyncHttpClientConfigurations)
+ *
+ */
+ public void applyAsyncHttpClientConfigurations(T builder) {
+ if (Strings.isNullOrEmpty(httpClientType)) {
+ httpClientType = HTTP_CLIENT_TYPE_NETTYNIO;
+ }
+ switch (httpClientType) {
+ case HTTP_CLIENT_TYPE_NETTYNIO:
+ NettyNioAsyncHttpClientConfigurations nettyNioAsyncHttpClientConfigurations =
+ loadHttpClientConfigurations(NettyNioAsyncHttpClientConfigurations.class.getName());
+ nettyNioAsyncHttpClientConfigurations.configureHttpClientBuilder(builder);
+ break;
+ default:
+ throw new IllegalArgumentException("Unrecognized HTTP client type " + httpClientType);
+ }
+ }
+
/**
* Dynamically load the http client builder to avoid runtime deps requirements of both {@link
* software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient} and {@link
diff --git a/aws/src/main/java/org/apache/iceberg/aws/NettyNioAsyncHttpClientConfigurations.java b/aws/src/main/java/org/apache/iceberg/aws/NettyNioAsyncHttpClientConfigurations.java
new file mode 100644
index 000000000000..4ab5f4698c23
--- /dev/null
+++ b/aws/src/main/java/org/apache/iceberg/aws/NettyNioAsyncHttpClientConfigurations.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws;
+
+import java.time.Duration;
+import java.util.Map;
+import org.apache.iceberg.util.PropertyUtil;
+import software.amazon.awssdk.awscore.client.builder.AwsAsyncClientBuilder;
+import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
+
+class NettyNioAsyncHttpClientConfigurations {
+
+ private Long connectionMaxIdleTime;
+ private Long connectionAcquisitionTimeout;
+ private Long connectionTimeout;
+ private Long connectionTimeToLive;
+ private Integer maxConcurrency;
+ private Long readTimeout;
+ private Integer maxPendingConnectionAcquires;
+ private Long writeTimeout;
+
+ private NettyNioAsyncHttpClientConfigurations() {}
+
+ public void configureHttpClientBuilder(T awsClientBuilder) {
+ NettyNioAsyncHttpClient.Builder nettyNioAsyncHttpClientBuilder =
+ NettyNioAsyncHttpClient.builder();
+ configureNettyNioAsyncHttpClientBuilder(nettyNioAsyncHttpClientBuilder);
+ awsClientBuilder.httpClientBuilder(nettyNioAsyncHttpClientBuilder);
+ }
+
+ private void initialize(Map httpClientProperties) {
+ this.connectionMaxIdleTime =
+ PropertyUtil.propertyAsNullableLong(
+ httpClientProperties, HttpClientProperties.NETTYNIO_CONNECTION_MAX_IDLE_TIME_MS);
+ this.connectionAcquisitionTimeout =
+ PropertyUtil.propertyAsNullableLong(
+ httpClientProperties, HttpClientProperties.NETTYNIO_ACQUISITION_TIMEOUT_MS);
+ this.connectionTimeout =
+ PropertyUtil.propertyAsNullableLong(
+ httpClientProperties, HttpClientProperties.NETTYNIO_CONNECTION_TIMEOUT_MS);
+ this.connectionTimeToLive =
+ PropertyUtil.propertyAsNullableLong(
+ httpClientProperties, HttpClientProperties.NETTYNIO_CONNECTION_TIME_TO_LIVE_MS);
+ this.maxConcurrency =
+ PropertyUtil.propertyAsNullableInt(
+ httpClientProperties, HttpClientProperties.NETTYNIO_MAX_CONCURRENCY);
+ this.readTimeout =
+ PropertyUtil.propertyAsNullableLong(
+ httpClientProperties, HttpClientProperties.NETTYNIO_READ_TIMEOUT);
+ this.maxPendingConnectionAcquires =
+ PropertyUtil.propertyAsNullableInt(
+ httpClientProperties, HttpClientProperties.NETTYNIO_MAX_PENDING_CONNECTION_ACQUIRES);
+ this.writeTimeout =
+ PropertyUtil.propertyAsNullableLong(
+ httpClientProperties, HttpClientProperties.NETTYNIO_WRITE_TIMEOUT);
+ }
+
+ void configureNettyNioAsyncHttpClientBuilder(
+ NettyNioAsyncHttpClient.Builder nettyNioHttpClientBuilder) {
+ if (connectionMaxIdleTime != null) {
+ nettyNioHttpClientBuilder.connectionMaxIdleTime(Duration.ofMillis(connectionMaxIdleTime));
+ }
+ if (connectionAcquisitionTimeout != null) {
+ nettyNioHttpClientBuilder.connectionAcquisitionTimeout(
+ Duration.ofMillis(connectionAcquisitionTimeout));
+ }
+ if (connectionTimeout != null) {
+ nettyNioHttpClientBuilder.connectionTimeout(Duration.ofMillis(connectionTimeout));
+ }
+ if (connectionTimeToLive != null) {
+ nettyNioHttpClientBuilder.connectionTimeToLive(Duration.ofMillis(connectionTimeToLive));
+ }
+ if (maxConcurrency != null) {
+ nettyNioHttpClientBuilder.maxConcurrency(maxConcurrency);
+ }
+ if (readTimeout != null) {
+ nettyNioHttpClientBuilder.readTimeout(Duration.ofMillis(readTimeout));
+ }
+ if (maxPendingConnectionAcquires != null) {
+ nettyNioHttpClientBuilder.maxPendingConnectionAcquires(maxPendingConnectionAcquires);
+ }
+ if (writeTimeout != null) {
+ nettyNioHttpClientBuilder.writeTimeout(Duration.ofMillis(writeTimeout));
+ }
+ }
+
+ public static NettyNioAsyncHttpClientConfigurations create(
+ Map httpClientProperties) {
+ NettyNioAsyncHttpClientConfigurations configurations =
+ new NettyNioAsyncHttpClientConfigurations();
+ configurations.initialize(httpClientProperties);
+ return configurations;
+ }
+}
diff --git a/aws/src/main/java/org/apache/iceberg/aws/SqsMetricsReporterAwsClientFactories.java b/aws/src/main/java/org/apache/iceberg/aws/SqsMetricsReporterAwsClientFactories.java
new file mode 100644
index 000000000000..b18564be513b
--- /dev/null
+++ b/aws/src/main/java/org/apache/iceberg/aws/SqsMetricsReporterAwsClientFactories.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws;
+
+import java.util.Map;
+import org.apache.iceberg.aws.metrics.DefaultSqsMetricsReporterAwsClientFactory;
+import org.apache.iceberg.aws.metrics.SqsMetricsReporterAwsClientFactory;
+import org.apache.iceberg.aws.metrics.SqsMetricsReporterProperties;
+import org.apache.iceberg.common.DynConstructors;
+import org.apache.iceberg.util.PropertyUtil;
+
+public class SqsMetricsReporterAwsClientFactories {
+
+ private SqsMetricsReporterAwsClientFactories() {}
+
+ /**
+ * Attempts to load an AWS client factory class for SQS Metrics Reporter defined in the catalog
+ * property {@link SqsMetricsReporterProperties#METRICS_REPORTER_SQS_CLIENT_FACTORY_IMPL}. If the
+ * property wasn't set, fallback to {@link AwsClientFactories#from(Map) to intialize an AWS client
+ * factory class}
+ *
+ * @param properties catalog properties
+ * @return an instance of a factory class
+ */
+ @SuppressWarnings("unchecked")
+ public static T initialize(Map properties) {
+ String factoryImpl =
+ PropertyUtil.propertyAsString(
+ properties,
+ SqsMetricsReporterProperties.METRICS_REPORTER_SQS_CLIENT_FACTORY_IMPL,
+ DefaultSqsMetricsReporterAwsClientFactory.class.getName());
+ return (T) loadClientFactory(factoryImpl, properties);
+ }
+
+ private static SqsMetricsReporterAwsClientFactory loadClientFactory(
+ String impl, Map properties) {
+ DynConstructors.Ctor ctor;
+ try {
+ ctor =
+ DynConstructors.builder(SqsMetricsReporterAwsClientFactory.class)
+ .loader(SqsMetricsReporterAwsClientFactories.class.getClassLoader())
+ .hiddenImpl(impl)
+ .buildChecked();
+ } catch (NoSuchMethodException e) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Cannot initialize SqsMetricsReporterAwsClientFactory, missing no-arg constructor: %s",
+ impl),
+ e);
+ }
+
+ SqsMetricsReporterAwsClientFactory factory;
+ try {
+ factory = ctor.newInstance();
+ } catch (ClassCastException e) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Cannot initialize SqsMetricsReporterAwsClientFactory, %s does not implement SqsMetricsReporterAwsClientFactory.",
+ impl),
+ e);
+ }
+
+ factory.initialize(properties);
+ return factory;
+ }
+}
diff --git a/aws/src/main/java/org/apache/iceberg/aws/metrics/DefaultSqsMetricsReporterAwsClientFactory.java b/aws/src/main/java/org/apache/iceberg/aws/metrics/DefaultSqsMetricsReporterAwsClientFactory.java
new file mode 100644
index 000000000000..91143b65d4b7
--- /dev/null
+++ b/aws/src/main/java/org/apache/iceberg/aws/metrics/DefaultSqsMetricsReporterAwsClientFactory.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws.metrics;
+
+import java.util.Map;
+import org.apache.iceberg.aws.AwsClientProperties;
+import org.apache.iceberg.aws.HttpClientProperties;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import software.amazon.awssdk.services.sqs.SqsAsyncClient;
+
+public class DefaultSqsMetricsReporterAwsClientFactory
+ implements SqsMetricsReporterAwsClientFactory {
+
+ private SqsMetricsReporterProperties sqsMetricsReporterProperties;
+ private HttpClientProperties httpClientProperties;
+ private AwsClientProperties awsClientProperties;
+
+ DefaultSqsMetricsReporterAwsClientFactory() {
+ this.sqsMetricsReporterProperties = new SqsMetricsReporterProperties();
+ this.httpClientProperties = new HttpClientProperties();
+ this.awsClientProperties = new AwsClientProperties();
+ }
+
+ @Override
+ public void initialize(Map properties) {
+ Map catalogProperties = Maps.newHashMap(properties);
+ catalogProperties.put(
+ HttpClientProperties.CLIENT_TYPE, HttpClientProperties.HTTP_CLIENT_TYPE_NETTYNIO);
+ this.sqsMetricsReporterProperties = new SqsMetricsReporterProperties(catalogProperties);
+ this.awsClientProperties = new AwsClientProperties(catalogProperties);
+ this.httpClientProperties = new HttpClientProperties(catalogProperties);
+ }
+
+ @Override
+ public SqsAsyncClient sqs() {
+ return SqsAsyncClient.builder()
+ .applyMutation(awsClientProperties::applyClientRegionConfiguration)
+ .applyMutation(httpClientProperties::applyAsyncHttpClientConfigurations)
+ .build();
+ }
+}
diff --git a/aws/src/main/java/org/apache/iceberg/aws/metrics/SqsMetricsReporter.java b/aws/src/main/java/org/apache/iceberg/aws/metrics/SqsMetricsReporter.java
new file mode 100644
index 000000000000..b9946c02ab20
--- /dev/null
+++ b/aws/src/main/java/org/apache/iceberg/aws/metrics/SqsMetricsReporter.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws.metrics;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.apache.iceberg.aws.SqsMetricsReporterAwsClientFactories;
+import org.apache.iceberg.metrics.CommitReport;
+import org.apache.iceberg.metrics.CommitReportParser;
+import org.apache.iceberg.metrics.MetricsReport;
+import org.apache.iceberg.metrics.MetricsReporter;
+import org.apache.iceberg.metrics.ScanReport;
+import org.apache.iceberg.metrics.ScanReportParser;
+import org.apache.iceberg.util.SerializableSupplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.sqs.SqsAsyncClient;
+import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
+import software.amazon.awssdk.services.sqs.model.SendMessageResponse;
+
+/** An implementation of {@link MetricsReporter} which reports {@link MetricsReport} to SQS */
+public class SqsMetricsReporter implements MetricsReporter {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SqsMetricsReporter.class);
+
+ private SerializableSupplier sqs;
+
+ private transient volatile SqsAsyncClient client;
+
+ private String sqsQueueUrl;
+
+ public SqsMetricsReporter() {}
+
+ public SqsMetricsReporter(SerializableSupplier sqs, String sqsQueueUrl) {
+ this.sqs = sqs;
+ this.client = sqs.get();
+ this.sqsQueueUrl = sqsQueueUrl;
+ }
+
+ @Override
+ public void initialize(Map properties) {
+ SqsMetricsReporterProperties sqsMetricsReporterProperties =
+ new SqsMetricsReporterProperties(properties);
+ Object clientFactory = SqsMetricsReporterAwsClientFactories.initialize(properties);
+ if (clientFactory instanceof SqsMetricsReporterAwsClientFactory) {
+ this.sqs = ((SqsMetricsReporterAwsClientFactory) clientFactory)::sqs;
+ }
+ this.client = sqs.get();
+ this.sqsQueueUrl = sqsMetricsReporterProperties.sqsQueueUrl();
+ }
+
+ @Override
+ public void report(MetricsReport report) {
+ if (null == report) {
+ LOG.warn("Received invalid metrics report: null");
+ return;
+ }
+
+ try {
+ String message = null;
+ if (report instanceof CommitReport) {
+ message = CommitReportParser.toJson((CommitReport) report);
+ } else if (report instanceof ScanReport) {
+ message = ScanReportParser.toJson((ScanReport) report);
+ }
+
+ if (null == message) {
+ LOG.warn("Received unknown MetricsReport type");
+ return;
+ }
+
+ CompletableFuture future =
+ client.sendMessage(
+ SendMessageRequest.builder().messageBody(message).queueUrl(sqsQueueUrl).build());
+ future.whenComplete(
+ (response, error) -> {
+ if (response != null) {
+ LOG.info("Metrics {} reported to: {}", response, sqsQueueUrl);
+ } else {
+ if (error != null) {
+ LOG.error("Failed to report metrics to SQS queue: {}", error.getMessage());
+ }
+ }
+ });
+ } catch (Exception e) {
+ LOG.warn("Failed to report metrics to SQS queue {}", sqsQueueUrl, e);
+ }
+ }
+}
diff --git a/aws/src/main/java/org/apache/iceberg/aws/metrics/SqsMetricsReporterAwsClientFactory.java b/aws/src/main/java/org/apache/iceberg/aws/metrics/SqsMetricsReporterAwsClientFactory.java
new file mode 100644
index 000000000000..07008a629cc1
--- /dev/null
+++ b/aws/src/main/java/org/apache/iceberg/aws/metrics/SqsMetricsReporterAwsClientFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws.metrics;
+
+import java.io.Serializable;
+import java.util.Map;
+import software.amazon.awssdk.services.sqs.SqsAsyncClient;
+
+public interface SqsMetricsReporterAwsClientFactory extends Serializable {
+ /**
+ * create a Amazon SQS Async client
+ *
+ * @return SQS Async client
+ */
+ SqsAsyncClient sqs();
+ /**
+ * Initialize AWS client factory from catalog properties.
+ *
+ * @param properties catalog properties
+ */
+ void initialize(Map properties);
+}
diff --git a/aws/src/main/java/org/apache/iceberg/aws/metrics/SqsMetricsReporterProperties.java b/aws/src/main/java/org/apache/iceberg/aws/metrics/SqsMetricsReporterProperties.java
new file mode 100644
index 000000000000..4dac83783f1f
--- /dev/null
+++ b/aws/src/main/java/org/apache/iceberg/aws/metrics/SqsMetricsReporterProperties.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws.metrics;
+
+import java.util.Map;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/** Class to store the properties used by {@link SqsMetricsReporter} */
+public class SqsMetricsReporterProperties {
+
+ /**
+ * This property is used to pass in the aws client factory implementation class for SQS Metrics
+ * Reporter. The class should implement {@link SqsMetricsReporterAwsClientFactory}. For example,
+ * {@link DefaultSqsMetricsReporterAwsClientFactory} implements {@link
+ * SqsMetricsReporterAwsClientFactory}. If this property wasn't set, will load one of {@link
+ * org.apache.iceberg.aws.AwsClientFactory} factory classes to provide backward compatibility.
+ */
+ public static final String METRICS_REPORTER_SQS_CLIENT_FACTORY_IMPL =
+ "metrics-reporter.sqs.client-factory-impl";
+
+ public static final String METRICS_REPORTER_SQS_QUEUE_URL = "metrics-reporter.sqs.queue-url";
+
+ private String sqsQueueUrl;
+
+ public SqsMetricsReporterProperties() {
+ this.sqsQueueUrl = null;
+ }
+
+ public SqsMetricsReporterProperties(Map properties) {
+ this.sqsQueueUrl = properties.get(METRICS_REPORTER_SQS_QUEUE_URL);
+ Preconditions.checkArgument(
+ null != sqsQueueUrl, "%s should be be set", METRICS_REPORTER_SQS_QUEUE_URL);
+ }
+
+ public String sqsQueueUrl() {
+ return sqsQueueUrl;
+ }
+
+ public void setSqsQueueUrl(String sqsQueueUrl) {
+ this.sqsQueueUrl = sqsQueueUrl;
+ }
+}
diff --git a/aws/src/test/java/org/apache/iceberg/aws/metrics/SqsMetricsReporterTest.java b/aws/src/test/java/org/apache/iceberg/aws/metrics/SqsMetricsReporterTest.java
new file mode 100644
index 000000000000..d0bd2c3cefec
--- /dev/null
+++ b/aws/src/test/java/org/apache/iceberg/aws/metrics/SqsMetricsReporterTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws.metrics;
+
+import static org.mockito.ArgumentMatchers.isA;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.aws.HttpClientProperties;
+import org.apache.iceberg.metrics.CommitMetrics;
+import org.apache.iceberg.metrics.CommitMetricsResult;
+import org.apache.iceberg.metrics.CommitReport;
+import org.apache.iceberg.metrics.DefaultMetricsContext;
+import org.apache.iceberg.metrics.ImmutableCommitReport;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.SerializableSupplier;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import software.amazon.awssdk.services.sqs.SqsAsyncClient;
+import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
+import software.amazon.awssdk.services.sqs.model.SendMessageResponse;
+
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class SqsMetricsReporterTest {
+
+ public SerializableSupplier sqs;
+
+ private SqsMetricsReporter sqsMetricsReporter;
+
+ private CommitReport commitReport;
+
+ @BeforeAll
+ public void before() {
+ Map properties = Maps.newHashMap();
+ properties.put("metrics-reporter.sqs.queue-url", "test-sqs-queue");
+ properties.put(
+ HttpClientProperties.CLIENT_TYPE, HttpClientProperties.HTTP_CLIENT_TYPE_NETTYNIO);
+ sqs = () -> mock(SqsAsyncClient.class);
+ sqsMetricsReporter = new SqsMetricsReporter(sqs, "test-queue-url");
+ commitReport =
+ ImmutableCommitReport.builder()
+ .tableName("tableName")
+ .snapshotId(123L)
+ .operation(DataOperations.APPEND)
+ .sequenceNumber(123L)
+ .commitMetrics(
+ CommitMetricsResult.from(
+ CommitMetrics.of(new DefaultMetricsContext()), Maps.newHashMap()))
+ .build();
+ }
+
+ @Test
+ public void report() {
+ sqsMetricsReporter.report(commitReport);
+ when(sqs.get().sendMessage(isA(SendMessageRequest.class)))
+ .thenReturn(
+ CompletableFuture.completedFuture(
+ SendMessageResponse.builder().messageId("test-id").build()));
+ }
+}
diff --git a/build.gradle b/build.gradle
index a8951c1dfe9f..73546c185853 100644
--- a/build.gradle
+++ b/build.gradle
@@ -473,6 +473,7 @@ project(':iceberg-aws') {
compileOnly 'software.amazon.awssdk:url-connection-client'
compileOnly 'software.amazon.awssdk:apache-client'
+ compileOnly 'software.amazon.awssdk:netty-nio-client'
compileOnly 'software.amazon.awssdk:auth'
compileOnly 'software.amazon.awssdk:s3'
compileOnly 'software.amazon.awssdk:kms'
@@ -480,6 +481,7 @@ project(':iceberg-aws') {
compileOnly 'software.amazon.awssdk:sts'
compileOnly 'software.amazon.awssdk:dynamodb'
compileOnly 'software.amazon.awssdk:lakeformation'
+ compileOnly 'software.amazon.awssdk:sqs'
compileOnly("org.apache.hadoop:hadoop-common") {
exclude group: 'org.apache.avro', module: 'avro'