From 25b90364303ba7b478334e6ce9ff44b8b2ec3334 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Mon, 29 Jan 2024 15:13:42 +0100 Subject: [PATCH] Start spike on fluent ConnectionFactory configuration API The ConnectionFactory class has dozens of parameters. It can be overwhelming to configure, especially for tricky topics like TLS where the parameters are among the other dozens, without clear way to find them. This commit introduces an API to simplify the configuration of ConnectionFactory. It is fluent, uses modern API (e.g. Duration for timeout), and groups common settings in sub-API. The configuration API will be introduced in 5.x, marked as experimental, and refined in minor releases. The traditional setter-based API will be marked deprecated 6.x and removed in 7.x. Benefits of the new configuration API: * fluent, the method calls can be chained and formatted in a logical way (it does not have to be 1 line = 1 parameter). * the different timeout settings use the Duration type, instead of int. It is no longer necessary to know the unit (seconds or milliseconds). * parameters for the same topic are grouped into dedicated configuration API (TLS, NIO, OAuth2, recovery, etc). It makes it much easier to configure those parts, as available settings will show up automatically in the IDE auto-completion, and not among the other dozens of settings. * more opinionated but easier configuration. OAuth2 is an example: no need to use builder classes with very long names, the refresh service part is also in the #oauth2() sub-configuration, whereas it's a separate setter in ConnectionFactory. References #608, #1139 --- .../rabbitmq/client/ConnectionFactory.java | 16 +- .../ConnectionFactoryConfiguration.java | 207 ++++++++++++++++++ .../ConnectionFactoryConfigurationDemo.java | 78 +++++++ 3 files changed, 294 insertions(+), 7 deletions(-) create mode 100644 src/main/java/com/rabbitmq/client/ConnectionFactoryConfiguration.java create mode 100644 src/test/java/com/rabbitmq/client/ConnectionFactoryConfigurationDemo.java diff --git a/src/main/java/com/rabbitmq/client/ConnectionFactory.java b/src/main/java/com/rabbitmq/client/ConnectionFactory.java index dc7ea88c72..f6f43f8eb0 100644 --- a/src/main/java/com/rabbitmq/client/ConnectionFactory.java +++ b/src/main/java/com/rabbitmq/client/ConnectionFactory.java @@ -322,9 +322,9 @@ public ConnectionFactory setVirtualHost(String virtualHost) { public ConnectionFactory setUri(URI uri) throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException { - if ("amqp".equals(uri.getScheme().toLowerCase())) { + if ("amqp".equalsIgnoreCase(uri.getScheme())) { // nothing special to do - } else if ("amqps".equals(uri.getScheme().toLowerCase())) { + } else if ("amqps".equalsIgnoreCase(uri.getScheme())) { setPort(DEFAULT_AMQP_OVER_SSL_PORT); // SSL context factory not set yet, we use the default one if (this.sslContextFactory == null) { @@ -1253,7 +1253,7 @@ public Connection newConnection(ExecutorService executor, AddressResolver addres ConnectionParams params = params(executor); // set client-provided via a client property if (clientProvidedName != null) { - Map properties = new HashMap(params.getClientProperties()); + Map properties = new HashMap<>(params.getClientProperties()); properties.put("connection_name", clientProvidedName); params.setClientProperties(properties); } @@ -1277,16 +1277,14 @@ public Connection newConnection(ExecutorService executor, AddressResolver addres conn.start(); this.metricsCollector.newConnection(conn); return conn; - } catch (IOException e) { + } catch (IOException | TimeoutException e) { lastException = e; - } catch (TimeoutException te) { - lastException = te; } } if (lastException != null) { if (lastException instanceof IOException) { throw (IOException) lastException; - } else if (lastException instanceof TimeoutException) { + } else { throw (TimeoutException) lastException; } } @@ -1762,4 +1760,8 @@ public ConnectionFactory setTrafficListener(TrafficListener trafficListener) { this.trafficListener = trafficListener; return this; } + + public static ConnectionFactoryConfiguration configure() { + return null; + } } diff --git a/src/main/java/com/rabbitmq/client/ConnectionFactoryConfiguration.java b/src/main/java/com/rabbitmq/client/ConnectionFactoryConfiguration.java new file mode 100644 index 0000000000..853253e77b --- /dev/null +++ b/src/main/java/com/rabbitmq/client/ConnectionFactoryConfiguration.java @@ -0,0 +1,207 @@ +package com.rabbitmq.client; + +import com.rabbitmq.client.impl.CredentialsProvider; +import com.rabbitmq.client.impl.CredentialsRefreshService; +import com.rabbitmq.client.impl.ErrorOnWriteListener; +import com.rabbitmq.client.impl.nio.ByteBufferFactory; +import com.rabbitmq.client.impl.nio.NioContext; +import com.rabbitmq.client.impl.nio.NioQueue; +import com.rabbitmq.client.impl.recovery.RecoveredQueueNameSupplier; +import com.rabbitmq.client.impl.recovery.TopologyRecoveryFilter; +import com.rabbitmq.client.observation.ObservationCollector; +import java.net.HttpURLConnection; +import java.net.URI; +import java.time.Duration; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Predicate; +import javax.net.SocketFactory; +import javax.net.ssl.HostnameVerifier; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLSocketFactory; +import javax.net.ssl.TrustManager; + +public interface ConnectionFactoryConfiguration { + + ConnectionFactoryConfiguration host(String name); + ConnectionFactoryConfiguration port(int port); + ConnectionFactoryConfiguration username(String username); + ConnectionFactoryConfiguration password(String username); + ConnectionFactoryConfiguration virtualHost(String virtualHost); + ConnectionFactoryConfiguration uri(URI uri); + ConnectionFactoryConfiguration uri(String uri); + + ConnectionFactoryConfiguration requestedChannelMax(int requestedChannelMax); + ConnectionFactoryConfiguration requestedFrameMax(int requestedFrameMax); + ConnectionFactoryConfiguration requestedHeartbeat(Duration heartbeat); + ConnectionFactoryConfiguration connectionTimeout(Duration timeout); + ConnectionFactoryConfiguration handshakeTimeout(Duration timeout); + ConnectionFactoryConfiguration shutdownTimeout(Duration timeout); + ConnectionFactoryConfiguration channelRpcTimeout(Duration timeout); + + ConnectionFactoryConfiguration maxInboundMessageBodySize(int maxInboundMessageBodySize); + ConnectionFactoryConfiguration channelShouldCheckRpcResponseType(boolean channelShouldCheckRpcResponseType); + ConnectionFactoryConfiguration workPoolTimeout(Duration timeout); + + ConnectionFactoryConfiguration errorOnWriteListener(ErrorOnWriteListener errorOnWriteListener); + + ConnectionFactoryConfiguration trafficListener(TrafficListener trafficListener); + + // TODO provide helper for client properties + ConnectionFactoryConfiguration clientProperties(Map clientProperties); + ConnectionFactoryConfiguration clientProperty(String name, Object value); + + ConnectionFactoryConfiguration saslConfig(SaslConfig saslConfig); + + ConnectionFactoryConfiguration socketFactory(SocketFactory socketFactory); + + ConnectionFactoryConfiguration socketConfigurator(SocketConfigurator socketConfigurator); + + ConnectionFactoryConfiguration sharedExecutor(ExecutorService executorService); + ConnectionFactoryConfiguration shutdownExecutor(ExecutorService executorService); + ConnectionFactoryConfiguration heartbeatExecutor(ExecutorService executorService); + ConnectionFactoryConfiguration threadFactory(ThreadFactory threadFactory); + + ConnectionFactoryConfiguration exceptionHandler(ExceptionHandler exceptionHandler); + + ConnectionFactoryConfiguration metricsCollector(MetricsCollector metricsCollector); + ConnectionFactoryConfiguration observationCollector(ObservationCollector observationCollector); + + // TODO special configuration for credentials, especially for OAuth? + ConnectionFactoryConfiguration credentialsProvider(CredentialsProvider credentialsProvider); + ConnectionFactoryConfiguration credentialsRefreshService(CredentialsRefreshService credentialsRefreshService); + + NioConfiguration nio(); + + TlsConfiguration tls(); + + OAuth2Configuration oauth2(); + + ConnectionFactory create(); + + interface NioConfiguration { + + NioConfiguration readByteBufferSize(int readByteBufferSize); + + NioConfiguration writeByteBufferSize(int writeByteBufferSize); + + NioConfiguration nbIoThreads(int nbIoThreads); + + NioConfiguration writeEnqueuingTimeout(Duration writeEnqueuingTimeout); + + NioConfiguration writeQueueCapacity(int writeQueueCapacity); + + NioConfiguration executor(ExecutorService executorService); + + NioConfiguration threadFactory(ThreadFactory threadFactory); + + NioConfiguration socketChannelConfigurator(SocketChannelConfigurator configurator); + + NioConfiguration sslEngineConfigurator(SslEngineConfigurator configurator); + + NioConfiguration connectionShutdownExecutor(ExecutorService executorService); + + NioConfiguration byteBufferFactory(ByteBufferFactory byteBufferFactory); + + NioConfiguration writeQueueFactory(Function writeQueueFactory); + + ConnectionFactoryConfiguration configuration(); + + + } + + interface TlsConfiguration { + + TlsConfiguration hostnameVerification(); + + TlsConfiguration hostnameVerification(boolean hostnameVerification); + + TlsConfiguration sslContextFactory(SslContextFactory sslContextFactory); + + TlsConfiguration protocol(String protocol); + + TlsConfiguration trustManager(TrustManager trustManager); + + TlsConfiguration trustEverything(); + + TlsConfiguration sslContext(SSLContext sslContext); + + ConnectionFactoryConfiguration configuration(); + + } + + interface RecoveryConfiguration { + + RecoveryConfiguration enableConnectionRecovery(); + RecoveryConfiguration enableConnectionRecovery(boolean connectionRecovery); + + RecoveryConfiguration enableTopologyRecovery(); + RecoveryConfiguration enableTopologyRecovery(boolean connectionRecovery); + + RecoveryConfiguration topologyRecoveryExecutor(ExecutorService executorService); + + RecoveryConfiguration recoveryInterval(Duration interval); + + RecoveryConfiguration recoveryDelayHandler(RecoveryDelayHandler recoveryDelayHandler); + + RecoveryConfiguration topologyRecoveryFilter(TopologyRecoveryFilter topologyRecoveryFilter); + + RecoveryConfiguration recoveryTriggeringCondition(Predicate connectionRecoveryTriggeringCondition); + + RecoveryConfiguration recoveredQueueNameSupplier(RecoveredQueueNameSupplier recoveredQueueNameSupplier); + + ConnectionFactoryConfiguration configuration(); + } + + interface OAuth2Configuration { + + OAuth2Configuration tokenEndpointUri(String tokenEndpointUri); + + OAuth2Configuration clientId(String clientId); + + OAuth2Configuration clientSecret(String clientSecret); + + OAuth2Configuration grantType(String grantType); + + OAuth2Configuration parameter(String name, String value); + + OAuth2Configuration connectionConfigurator(Consumer connectionConfigurator); + + OAuth2TlsConfiguration tls(); + + OAuth2CredentialsRefreshConfiguration refresh(); + + ConnectionFactoryConfiguration configuration(); + } + + interface OAuth2TlsConfiguration { + + OAuth2TlsConfiguration hostnameVerifier(HostnameVerifier hostnameVerifier); + + OAuth2TlsConfiguration sslSocketFactory(SSLSocketFactory sslSocketFactory); + + OAuth2TlsConfiguration sslContext(SSLContext sslContext); + + OAuth2TlsConfiguration trustEverything(); + + OAuth2Configuration oauth2(); + + } + + interface OAuth2CredentialsRefreshConfiguration { + + OAuth2CredentialsRefreshConfiguration refreshDelayStrategy(Function refreshDelayStrategy); + + OAuth2CredentialsRefreshConfiguration approachingExpirationStrategy(Function approachingExpirationStrategy); + + OAuth2CredentialsRefreshConfiguration scheduler(ScheduledThreadPoolExecutor scheduler); + + OAuth2Configuration oauth2(); + + } + +} diff --git a/src/test/java/com/rabbitmq/client/ConnectionFactoryConfigurationDemo.java b/src/test/java/com/rabbitmq/client/ConnectionFactoryConfigurationDemo.java new file mode 100644 index 0000000000..1db2c7e514 --- /dev/null +++ b/src/test/java/com/rabbitmq/client/ConnectionFactoryConfigurationDemo.java @@ -0,0 +1,78 @@ +package com.rabbitmq.client; + +import com.rabbitmq.client.impl.CredentialsProvider; +import com.rabbitmq.client.impl.CredentialsRefreshService; +import com.rabbitmq.client.impl.DefaultCredentialsRefreshService; +import com.rabbitmq.client.impl.OAuth2ClientCredentialsGrantCredentialsProvider; +import com.rabbitmq.client.impl.nio.NioParams; + +import javax.net.ssl.SSLContext; +import java.time.Duration; +import java.util.Collections; +import java.util.Map; + +import static com.rabbitmq.client.impl.DefaultCredentialsRefreshService.ratioRefreshDelayStrategy; + +public class ConnectionFactoryConfigurationDemo { + + public static void main(String[] args) throws Exception { + SSLContext sslContext = SSLContext.getDefault(); + + // historical configuration with ConnectionFactory setters + ConnectionFactory cf = new ConnectionFactory(); + cf.setUri("amqp://rabbitmq-1:5672/foo"); + cf.setChannelRpcTimeout(10_000); // unit? + Map clientProperties = Collections.singletonMap("foo", "bar"); + cf.setClientProperties(clientProperties); + cf.useSslProtocol("TLSv1.3", new TrustEverythingTrustManager()); + NioParams nioParams = new NioParams(); + nioParams.setNbIoThreads(4); + cf.setNioParams(nioParams); + + CredentialsProvider credentialsProvider = + new OAuth2ClientCredentialsGrantCredentialsProvider.OAuth2ClientCredentialsGrantCredentialsProviderBuilder() + .tokenEndpointUri("http://localhost:8080/uaa/oauth/token/") + .clientId("rabbit_client").clientSecret("rabbit_secret") + .grantType("password") + .parameter("username", "rabbit_super") + .parameter("password", "rabbit_super") + .tls() + .sslContext(sslContext) + .builder() + .build(); + cf.setCredentialsProvider(credentialsProvider); + CredentialsRefreshService refreshService = + new DefaultCredentialsRefreshService.DefaultCredentialsRefreshServiceBuilder() + .refreshDelayStrategy(ratioRefreshDelayStrategy(0.8)) + .build(); + cf.setCredentialsRefreshService(refreshService); + + // configuration with new configuration API + ConnectionFactory.configure() + .uri("amqp://rabbitmq-1:5672/foo") + .channelRpcTimeout(Duration.ofSeconds(10)) // Duration class instead of int + .clientProperty("foo", "bar") + .tls() // TLS configuration API + .protocol("TLSv1.3") + .trustEverything() + .configuration() // back to main configuration + .nio() // NIO configuration API + .nbIoThreads(4) + .configuration() // back to main configuration + .oauth2() // OAuth 2 configuration API + .tokenEndpointUri("http://localhost:8080/uaa/oauth/token/") + .clientId("rabbit_client").clientSecret("rabbit_secret") + .grantType("password") + .parameter("username", "rabbit_super") + .parameter("password", "rabbit_super") + .tls() // OAuth 2 TLS + .sslContext(sslContext) + .oauth2() + .refresh() // OAuth refresh configuration + .refreshDelayStrategy(ratioRefreshDelayStrategy(0.8)) + .oauth2() + .configuration() // back to main configuration + .create(); + } + +}