From d7980da089a7717ff120d5584b4fe2138e74c13c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Mon, 22 Aug 2016 16:08:35 +0200 Subject: [PATCH 1/2] Add strategy interface to resolve list of hosts Issue #153. Could help for #104 and #138. --- .../com/rabbitmq/client/AddressResolver.java | 12 +++++++++ .../rabbitmq/client/ConnectionFactory.java | 27 ++++++++++++++++++- .../rabbitmq/client/ListAddressResolver.java | 20 ++++++++++++++ .../recovery/AutorecoveringConnection.java | 20 +++++--------- .../RecoveryAwareAMQConnectionFactory.java | 12 ++++++--- 5 files changed, 73 insertions(+), 18 deletions(-) create mode 100644 src/main/java/com/rabbitmq/client/AddressResolver.java create mode 100644 src/main/java/com/rabbitmq/client/ListAddressResolver.java diff --git a/src/main/java/com/rabbitmq/client/AddressResolver.java b/src/main/java/com/rabbitmq/client/AddressResolver.java new file mode 100644 index 0000000000..f16672e4b9 --- /dev/null +++ b/src/main/java/com/rabbitmq/client/AddressResolver.java @@ -0,0 +1,12 @@ +package com.rabbitmq.client; + +import java.util.List; + +/** + * Strategy interface to get the potential servers to connect to. + */ +public interface AddressResolver { + + List
getAddresses(); + +} diff --git a/src/main/java/com/rabbitmq/client/ConnectionFactory.java b/src/main/java/com/rabbitmq/client/ConnectionFactory.java index a0251f1b7c..868768b7be 100644 --- a/src/main/java/com/rabbitmq/client/ConnectionFactory.java +++ b/src/main/java/com/rabbitmq/client/ConnectionFactory.java @@ -801,6 +801,30 @@ public Connection newConnection(ExecutorService executor, List
addrs) t */ public Connection newConnection(ExecutorService executor, List
addrs, String clientProvidedName) throws IOException, TimeoutException { + return newConnection(executor, new ListAddressResolver(addrs), clientProvidedName); + } + + /** + * Create a new broker connection with a client-provided name, picking the first available address from + * the list provided by the {@link AddressResolver}. + * + * If automatic connection recovery + * is enabled, the connection returned by this method will be {@link Recoverable}. Future + * reconnection attempts will pick a random accessible address provided by the {@link AddressResolver}. + * + * @param executor thread execution service for consumers on the connection + * @param addressResolver discovery service to list potential addresses (hostname/port pairs) to connect to + * @param clientProvidedName application-specific connection name, will be displayed + * in the management UI if RabbitMQ server supports it. + * This value doesn't have to be unique and cannot be used + * as a connection identifier e.g. in HTTP API requests. + * This value is supposed to be human-readable. + * @return + * @throws IOException + * @throws TimeoutException + */ + public Connection newConnection(ExecutorService executor, AddressResolver addressResolver, String clientProvidedName) + throws IOException, TimeoutException { // make sure we respect the provided thread factory FrameHandlerFactory fhFactory = createFrameHandlerFactory(); ConnectionParams params = params(executor); @@ -813,11 +837,12 @@ public Connection newConnection(ExecutorService executor, List
addrs, S if (isAutomaticRecoveryEnabled()) { // see com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory#newConnection - AutorecoveringConnection conn = new AutorecoveringConnection(params, fhFactory, addrs); + AutorecoveringConnection conn = new AutorecoveringConnection(params, fhFactory, addressResolver); conn.init(); return conn; } else { + List
addrs = addressResolver.getAddresses(); IOException lastException = null; for (Address addr : addrs) { try { diff --git a/src/main/java/com/rabbitmq/client/ListAddressResolver.java b/src/main/java/com/rabbitmq/client/ListAddressResolver.java new file mode 100644 index 0000000000..5dc0c70a58 --- /dev/null +++ b/src/main/java/com/rabbitmq/client/ListAddressResolver.java @@ -0,0 +1,20 @@ +package com.rabbitmq.client; + +import java.util.List; + +/** + * Simple implementation of {@link AddressResolver} that returns a fixed list. + */ +public class ListAddressResolver implements AddressResolver { + + private final List
addresses; + + public ListAddressResolver(List
addresses) { + this.addresses = addresses; + } + + @Override + public List
getAddresses() { + return addresses; + } +} diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java b/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java index 99c331e699..aac212b683 100644 --- a/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java +++ b/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java @@ -15,21 +15,9 @@ package com.rabbitmq.client.impl.recovery; -import com.rabbitmq.client.AMQP; -import com.rabbitmq.client.Address; -import com.rabbitmq.client.BlockedListener; -import com.rabbitmq.client.Channel; -import com.rabbitmq.client.Connection; -import com.rabbitmq.client.ConnectionFactory; -import com.rabbitmq.client.MissedHeartbeatException; -import com.rabbitmq.client.Recoverable; -import com.rabbitmq.client.RecoveryListener; -import com.rabbitmq.client.ShutdownListener; -import com.rabbitmq.client.ShutdownSignalException; -import com.rabbitmq.client.TopologyRecoveryException; +import com.rabbitmq.client.*; import com.rabbitmq.client.impl.AMQConnection; import com.rabbitmq.client.impl.ConnectionParams; -import com.rabbitmq.client.ExceptionHandler; import com.rabbitmq.client.impl.FrameHandlerFactory; import com.rabbitmq.client.impl.NetworkConnection; @@ -94,7 +82,11 @@ public class AutorecoveringConnection implements Connection, Recoverable, Networ private final Object recoveryLock = new Object(); public AutorecoveringConnection(ConnectionParams params, FrameHandlerFactory f, List
addrs) { - this.cf = new RecoveryAwareAMQConnectionFactory(params, f, addrs); + this(params, f, new ListAddressResolver(addrs)); + } + + public AutorecoveringConnection(ConnectionParams params, FrameHandlerFactory f, AddressResolver addressResolver) { + this.cf = new RecoveryAwareAMQConnectionFactory(params, f, addressResolver); this.params = params; this.channels = new ConcurrentHashMap(); diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/RecoveryAwareAMQConnectionFactory.java b/src/main/java/com/rabbitmq/client/impl/recovery/RecoveryAwareAMQConnectionFactory.java index d4e87fbf1c..5c8c6250dd 100644 --- a/src/main/java/com/rabbitmq/client/impl/recovery/RecoveryAwareAMQConnectionFactory.java +++ b/src/main/java/com/rabbitmq/client/impl/recovery/RecoveryAwareAMQConnectionFactory.java @@ -16,6 +16,8 @@ package com.rabbitmq.client.impl.recovery; import com.rabbitmq.client.Address; +import com.rabbitmq.client.AddressResolver; +import com.rabbitmq.client.ListAddressResolver; import com.rabbitmq.client.impl.ConnectionParams; import com.rabbitmq.client.impl.FrameHandler; import com.rabbitmq.client.impl.FrameHandlerFactory; @@ -29,12 +31,16 @@ public class RecoveryAwareAMQConnectionFactory { private final ConnectionParams params; private final FrameHandlerFactory factory; - private final List
addrs; + private final AddressResolver addressResolver; public RecoveryAwareAMQConnectionFactory(ConnectionParams params, FrameHandlerFactory factory, List
addrs) { + this(params, factory, new ListAddressResolver(addrs)); + } + + public RecoveryAwareAMQConnectionFactory(ConnectionParams params, FrameHandlerFactory factory, AddressResolver addressResolver) { this.params = params; this.factory = factory; - this.addrs = addrs; + this.addressResolver = addressResolver; } /** @@ -43,7 +49,7 @@ public RecoveryAwareAMQConnectionFactory(ConnectionParams params, FrameHandlerFa */ RecoveryAwareAMQConnection newConnection() throws IOException, TimeoutException { IOException lastException = null; - List
shuffled = shuffle(addrs); + List
shuffled = shuffle(addressResolver.getAddresses()); for (Address addr : shuffled) { try { From 7fa68ec2c34ad63eb9e9c089d6c1a680aace18bb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Mon, 22 Aug 2016 16:49:40 +0200 Subject: [PATCH 2/2] Add 2 constructors with AddressResolver parameter Issue #153 --- .../rabbitmq/client/ConnectionFactory.java | 41 +++++++++++++++++-- 1 file changed, 38 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/rabbitmq/client/ConnectionFactory.java b/src/main/java/com/rabbitmq/client/ConnectionFactory.java index 868768b7be..3743d200ba 100644 --- a/src/main/java/com/rabbitmq/client/ConnectionFactory.java +++ b/src/main/java/com/rabbitmq/client/ConnectionFactory.java @@ -661,6 +661,23 @@ public Connection newConnection(Address[] addrs) throws IOException, TimeoutExce return newConnection(this.sharedExecutor, Arrays.asList(addrs), null); } + /** + * Create a new broker connection, picking the first available address from + * the list provided by the {@link AddressResolver}. + * + * If automatic connection recovery + * is enabled, the connection returned by this method will be {@link Recoverable}. Future + * reconnection attempts will pick a random accessible address provided by the {@link AddressResolver}. + * + * @param addressResolver discovery service to list potential addresses (hostname/port pairs) to connect to + * @return an interface to the connection + * @throws IOException if it encounters a problem + * @see Automatic Recovery + */ + public Connection newConnection(AddressResolver addressResolver) throws IOException, TimeoutException { + return newConnection(this.sharedExecutor, addressResolver, null); + } + /** * Create a new broker connection with a client-provided name, picking the first available address from @@ -780,6 +797,24 @@ public Connection newConnection(ExecutorService executor, List
addrs) t return newConnection(executor, addrs, null); } + /** + * Create a new broker connection, picking the first available address from + * the list provided by the {@link AddressResolver}. + * + * If automatic connection recovery + * is enabled, the connection returned by this method will be {@link Recoverable}. Future + * reconnection attempts will pick a random accessible address provided by the {@link AddressResolver}. + * + * @param executor thread execution service for consumers on the connection + * @param addressResolver discovery service to list potential addresses (hostname/port pairs) to connect to + * @return an interface to the connection + * @throws java.io.IOException if it encounters a problem + * @see Automatic Recovery + */ + public Connection newConnection(ExecutorService executor, AddressResolver addressResolver) throws IOException, TimeoutException { + return newConnection(executor, addressResolver, null); + } + /** * Create a new broker connection with a client-provided name, picking the first available address from * the list. @@ -819,9 +854,9 @@ public Connection newConnection(ExecutorService executor, List
addrs, S * This value doesn't have to be unique and cannot be used * as a connection identifier e.g. in HTTP API requests. * This value is supposed to be human-readable. - * @return - * @throws IOException - * @throws TimeoutException + * @return an interface to the connection + * @throws java.io.IOException if it encounters a problem + * @see Automatic Recovery */ public Connection newConnection(ExecutorService executor, AddressResolver addressResolver, String clientProvidedName) throws IOException, TimeoutException {