Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 110 additions & 0 deletions aws/src/main/java/org/apache/iceberg/aws/HttpClientProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.iceberg.common.DynMethods;
import org.apache.iceberg.relocated.com.google.common.base.Strings;
import org.apache.iceberg.util.PropertyUtil;
import software.amazon.awssdk.awscore.client.builder.AwsAsyncClientBuilder;
import software.amazon.awssdk.awscore.client.builder.AwsSyncClientBuilder;

public class HttpClientProperties implements Serializable {
Expand Down Expand Up @@ -167,6 +168,90 @@ public class HttpClientProperties implements Serializable {
public static final String APACHE_USE_IDLE_CONNECTION_REAPER_ENABLED =
"http-client.apache.use-idle-connection-reaper-enabled";

/**
* If this is set under {@link #CLIENT_TYPE}, {@link
* software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient} will be used as the HTTP Client
*/
public static final String HTTP_CLIENT_TYPE_NETTYNIO = "nettynio";

/**
* Used to configure connection maximum idle time {@link
* software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient.Builder}. This flag only works
* when {@link #CLIENT_TYPE} is set to {@link #HTTP_CLIENT_TYPE_NETTYNIO}.
*
* <p>For more details, see
* https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/nio/netty/NettyNioAsyncHttpClient.Builder.html
*/
public static final String NETTYNIO_CONNECTION_MAX_IDLE_TIME_MS =
"http-client.nettynio.connection-max-idle-time-ms";
/**
* Used to configure connection acquisition timeout {@link
* software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient.Builder}. This flag only works
* when {@link #CLIENT_TYPE} is set to {@link #HTTP_CLIENT_TYPE_NETTYNIO}.
*
* <p>For more details, see
* https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/nio/netty/NettyNioAsyncHttpClient.Builder.html
*/
public static final String NETTYNIO_ACQUISITION_TIMEOUT_MS =
"http-client.nettynio.connection-acquisition-timeout-ms";
/**
* Used to configure connection timeout {@link
* software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient.Builder}. This flag only works
* when {@link #CLIENT_TYPE} is set to {@link #HTTP_CLIENT_TYPE_NETTYNIO}.
*
* <p>For more details, see
* https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/nio/netty/NettyNioAsyncHttpClient.Builder.html
*/
public static final String NETTYNIO_CONNECTION_TIMEOUT_MS =
"http-client.nettynio.connection-timeout-ms";
/**
* Used to configure connection time to live {@link
* software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient.Builder}. This flag only works
* when {@link #CLIENT_TYPE} is set to {@link #HTTP_CLIENT_TYPE_NETTYNIO}.
*
* <p>For more details, see
* https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/nio/netty/NettyNioAsyncHttpClient.Builder.html
*/
public static final String NETTYNIO_CONNECTION_TIME_TO_LIVE_MS =
"http-client.nettynio.connection-time-to-live-ms";
/**
* Used to configure maximum concurrency {@link
* software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient.Builder}. This flag only works
* when {@link #CLIENT_TYPE} is set to {@link #HTTP_CLIENT_TYPE_NETTYNIO}.
*
* <p>For more details, see
* https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/nio/netty/NettyNioAsyncHttpClient.Builder.html
*/
public static final String NETTYNIO_MAX_CONCURRENCY = "http-client.nettynio.max-concurrency";
/**
* Used to configure read timeout {@link
* software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient.Builder}. This flag only works
* when {@link #CLIENT_TYPE} is set to {@link #HTTP_CLIENT_TYPE_NETTYNIO}.
*
* <p>For more details, see
* https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/nio/netty/NettyNioAsyncHttpClient.Builder.html
*/
public static final String NETTYNIO_READ_TIMEOUT = "http-client.nettynio.read-timeout";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can you add time units to this, like -ms?

/**
* Used to configure maximum pending connection acquires {@link
* software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient.Builder}. This flag only works
* when {@link #CLIENT_TYPE} is set to {@link #HTTP_CLIENT_TYPE_NETTYNIO}.
*
* <p>For more details, see
* https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/nio/netty/NettyNioAsyncHttpClient.Builder.html
*/
public static final String NETTYNIO_MAX_PENDING_CONNECTION_ACQUIRES =
"http-client.nettynio.max-pending-connection-acquires";
/**
* Used to configure write timeout {@link
* software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient.Builder}. This flag only works
* when {@link #CLIENT_TYPE} is set to {@link #HTTP_CLIENT_TYPE_NETTYNIO}.
*
* <p>For more details, see
* https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/nio/netty/NettyNioAsyncHttpClient.Builder.html
*/
public static final String NETTYNIO_WRITE_TIMEOUT = "http-client.nettynio.write-timeout";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can you add time units to this, like -ms?


private String httpClientType;
private final Map<String, String> httpClientProperties;

Expand Down Expand Up @@ -213,6 +298,31 @@ public <T extends AwsSyncClientBuilder> void applyHttpClientConfigurations(T bui
}
}

/**
* Configure the httpClient for a client according to the HttpClientType. The supported
* HttpClientTypes are nettynio
*
* <p>Sample usage:
*
* <pre>
* S3Client.builder().applyMutation(awsProperties::applyAsyncHttpClientConfigurations)
* </pre>
*/
public <T extends AwsAsyncClientBuilder> void applyAsyncHttpClientConfigurations(T builder) {
if (Strings.isNullOrEmpty(httpClientType)) {
httpClientType = HTTP_CLIENT_TYPE_NETTYNIO;
}
switch (httpClientType) {
case HTTP_CLIENT_TYPE_NETTYNIO:
NettyNioAsyncHttpClientConfigurations nettyNioAsyncHttpClientConfigurations =
loadHttpClientConfigurations(NettyNioAsyncHttpClientConfigurations.class.getName());
nettyNioAsyncHttpClientConfigurations.configureHttpClientBuilder(builder);
break;
default:
throw new IllegalArgumentException("Unrecognized HTTP client type " + httpClientType);
}
}

/**
* Dynamically load the http client builder to avoid runtime deps requirements of both {@link
* software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient} and {@link
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.aws;

import java.time.Duration;
import java.util.Map;
import org.apache.iceberg.util.PropertyUtil;
import software.amazon.awssdk.awscore.client.builder.AwsAsyncClientBuilder;
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;

class NettyNioAsyncHttpClientConfigurations {

private Long connectionMaxIdleTime;
private Long connectionAcquisitionTimeout;
private Long connectionTimeout;
private Long connectionTimeToLive;
private Integer maxConcurrency;
private Long readTimeout;
private Integer maxPendingConnectionAcquires;
private Long writeTimeout;

private NettyNioAsyncHttpClientConfigurations() {}

public <T extends AwsAsyncClientBuilder> void configureHttpClientBuilder(T awsClientBuilder) {
NettyNioAsyncHttpClient.Builder nettyNioAsyncHttpClientBuilder =
NettyNioAsyncHttpClient.builder();
configureNettyNioAsyncHttpClientBuilder(nettyNioAsyncHttpClientBuilder);
awsClientBuilder.httpClientBuilder(nettyNioAsyncHttpClientBuilder);
}

private void initialize(Map<String, String> httpClientProperties) {
this.connectionMaxIdleTime =
PropertyUtil.propertyAsNullableLong(
httpClientProperties, HttpClientProperties.NETTYNIO_CONNECTION_MAX_IDLE_TIME_MS);
this.connectionAcquisitionTimeout =
PropertyUtil.propertyAsNullableLong(
httpClientProperties, HttpClientProperties.NETTYNIO_ACQUISITION_TIMEOUT_MS);
this.connectionTimeout =
PropertyUtil.propertyAsNullableLong(
httpClientProperties, HttpClientProperties.NETTYNIO_CONNECTION_TIMEOUT_MS);
this.connectionTimeToLive =
PropertyUtil.propertyAsNullableLong(
httpClientProperties, HttpClientProperties.NETTYNIO_CONNECTION_TIME_TO_LIVE_MS);
this.maxConcurrency =
PropertyUtil.propertyAsNullableInt(
httpClientProperties, HttpClientProperties.NETTYNIO_MAX_CONCURRENCY);
this.readTimeout =
PropertyUtil.propertyAsNullableLong(
httpClientProperties, HttpClientProperties.NETTYNIO_READ_TIMEOUT);
this.maxPendingConnectionAcquires =
PropertyUtil.propertyAsNullableInt(
httpClientProperties, HttpClientProperties.NETTYNIO_MAX_PENDING_CONNECTION_ACQUIRES);
this.writeTimeout =
PropertyUtil.propertyAsNullableLong(
httpClientProperties, HttpClientProperties.NETTYNIO_WRITE_TIMEOUT);
}

void configureNettyNioAsyncHttpClientBuilder(
NettyNioAsyncHttpClient.Builder nettyNioHttpClientBuilder) {
if (connectionMaxIdleTime != null) {
nettyNioHttpClientBuilder.connectionMaxIdleTime(Duration.ofMillis(connectionMaxIdleTime));
}
if (connectionAcquisitionTimeout != null) {
nettyNioHttpClientBuilder.connectionAcquisitionTimeout(
Duration.ofMillis(connectionAcquisitionTimeout));
}
if (connectionTimeout != null) {
nettyNioHttpClientBuilder.connectionTimeout(Duration.ofMillis(connectionTimeout));
}
if (connectionTimeToLive != null) {
nettyNioHttpClientBuilder.connectionTimeToLive(Duration.ofMillis(connectionTimeToLive));
}
if (maxConcurrency != null) {
nettyNioHttpClientBuilder.maxConcurrency(maxConcurrency);
}
if (readTimeout != null) {
nettyNioHttpClientBuilder.readTimeout(Duration.ofMillis(readTimeout));
}
if (maxPendingConnectionAcquires != null) {
nettyNioHttpClientBuilder.maxPendingConnectionAcquires(maxPendingConnectionAcquires);
}
if (writeTimeout != null) {
nettyNioHttpClientBuilder.writeTimeout(Duration.ofMillis(writeTimeout));
}
}

public static NettyNioAsyncHttpClientConfigurations create(
Map<String, String> httpClientProperties) {
NettyNioAsyncHttpClientConfigurations configurations =
new NettyNioAsyncHttpClientConfigurations();
configurations.initialize(httpClientProperties);
return configurations;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.aws;

import java.util.Map;
import org.apache.iceberg.aws.metrics.DefaultSqsMetricsReporterAwsClientFactory;
import org.apache.iceberg.aws.metrics.SqsMetricsReporterAwsClientFactory;
import org.apache.iceberg.aws.metrics.SqsMetricsReporterProperties;
import org.apache.iceberg.common.DynConstructors;
import org.apache.iceberg.util.PropertyUtil;

public class SqsMetricsReporterAwsClientFactories {

private SqsMetricsReporterAwsClientFactories() {}

/**
* Attempts to load an AWS client factory class for SQS Metrics Reporter defined in the catalog
* property {@link SqsMetricsReporterProperties#METRICS_REPORTER_SQS_CLIENT_FACTORY_IMPL}. If the
* property wasn't set, fallback to {@link AwsClientFactories#from(Map) to intialize an AWS client
* factory class}
*
* @param properties catalog properties
* @return an instance of a factory class
*/
@SuppressWarnings("unchecked")
public static <T> T initialize(Map<String, String> properties) {
String factoryImpl =
PropertyUtil.propertyAsString(
properties,
SqsMetricsReporterProperties.METRICS_REPORTER_SQS_CLIENT_FACTORY_IMPL,
DefaultSqsMetricsReporterAwsClientFactory.class.getName());
Copy link
Contributor

@jackye1995 jackye1995 Jun 8, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

similar to the old client factory, we should be able to make DefaultSqsMetricsReporterAwsClientFactory a singleton, and directly return that if METRICS_REPORTER_SQS_CLIENT_FACTORY_IMPL is not set.

return (T) loadClientFactory(factoryImpl, properties);
}

private static SqsMetricsReporterAwsClientFactory loadClientFactory(
String impl, Map<String, String> properties) {
DynConstructors.Ctor<SqsMetricsReporterAwsClientFactory> ctor;
try {
ctor =
DynConstructors.builder(SqsMetricsReporterAwsClientFactory.class)
.loader(SqsMetricsReporterAwsClientFactories.class.getClassLoader())
.hiddenImpl(impl)
.buildChecked();
} catch (NoSuchMethodException e) {
throw new IllegalArgumentException(
String.format(
"Cannot initialize SqsMetricsReporterAwsClientFactory, missing no-arg constructor: %s",
impl),
e);
}

SqsMetricsReporterAwsClientFactory factory;
try {
factory = ctor.newInstance();
} catch (ClassCastException e) {
throw new IllegalArgumentException(
String.format(
"Cannot initialize SqsMetricsReporterAwsClientFactory, %s does not implement SqsMetricsReporterAwsClientFactory.",
impl),
e);
}

factory.initialize(properties);
return factory;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.aws.metrics;

import java.util.Map;
import org.apache.iceberg.aws.AwsClientProperties;
import org.apache.iceberg.aws.HttpClientProperties;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;

public class DefaultSqsMetricsReporterAwsClientFactory
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this can be an inner package protected class in the factories class.

implements SqsMetricsReporterAwsClientFactory {

private SqsMetricsReporterProperties sqsMetricsReporterProperties;
private HttpClientProperties httpClientProperties;
private AwsClientProperties awsClientProperties;

DefaultSqsMetricsReporterAwsClientFactory() {
this.sqsMetricsReporterProperties = new SqsMetricsReporterProperties();
this.httpClientProperties = new HttpClientProperties();
this.awsClientProperties = new AwsClientProperties();
}

@Override
public void initialize(Map<String, String> properties) {
Map<String, String> catalogProperties = Maps.newHashMap(properties);
catalogProperties.put(
HttpClientProperties.CLIENT_TYPE, HttpClientProperties.HTTP_CLIENT_TYPE_NETTYNIO);
this.sqsMetricsReporterProperties = new SqsMetricsReporterProperties(catalogProperties);
this.awsClientProperties = new AwsClientProperties(catalogProperties);
this.httpClientProperties = new HttpClientProperties(catalogProperties);
}

@Override
public SqsAsyncClient sqs() {
return SqsAsyncClient.builder()
.applyMutation(awsClientProperties::applyClientRegionConfiguration)
.applyMutation(httpClientProperties::applyAsyncHttpClientConfigurations)
.build();
}
}
Loading