diff --git a/src/main/java/com/rabbitmq/client/AddressResolver.java b/src/main/java/com/rabbitmq/client/AddressResolver.java new file mode 100644 index 0000000000..f16672e4b9 --- /dev/null +++ b/src/main/java/com/rabbitmq/client/AddressResolver.java @@ -0,0 +1,12 @@ +package com.rabbitmq.client; + +import java.util.List; + +/** + * Strategy interface to get the potential servers to connect to. + */ +public interface AddressResolver { + + List
getAddresses(); + +} diff --git a/src/main/java/com/rabbitmq/client/ConnectionFactory.java b/src/main/java/com/rabbitmq/client/ConnectionFactory.java index a0251f1b7c..3743d200ba 100644 --- a/src/main/java/com/rabbitmq/client/ConnectionFactory.java +++ b/src/main/java/com/rabbitmq/client/ConnectionFactory.java @@ -661,6 +661,23 @@ public Connection newConnection(Address[] addrs) throws IOException, TimeoutExce return newConnection(this.sharedExecutor, Arrays.asList(addrs), null); } + /** + * Create a new broker connection, picking the first available address from + * the list provided by the {@link AddressResolver}. + * + * If automatic connection recovery + * is enabled, the connection returned by this method will be {@link Recoverable}. Future + * reconnection attempts will pick a random accessible address provided by the {@link AddressResolver}. + * + * @param addressResolver discovery service to list potential addresses (hostname/port pairs) to connect to + * @return an interface to the connection + * @throws IOException if it encounters a problem + * @see Automatic Recovery + */ + public Connection newConnection(AddressResolver addressResolver) throws IOException, TimeoutException { + return newConnection(this.sharedExecutor, addressResolver, null); + } + /** * Create a new broker connection with a client-provided name, picking the first available address from @@ -780,6 +797,24 @@ public Connection newConnection(ExecutorService executor, List addrs) t return newConnection(executor, addrs, null); } + /** + * Create a new broker connection, picking the first available address from + * the list provided by the {@link AddressResolver}. + * + * If automatic connection recovery + * is enabled, the connection returned by this method will be {@link Recoverable}. Future + * reconnection attempts will pick a random accessible address provided by the {@link AddressResolver}. + * + * @param executor thread execution service for consumers on the connection + * @param addressResolver discovery service to list potential addresses (hostname/port pairs) to connect to + * @return an interface to the connection + * @throws java.io.IOException if it encounters a problem + * @see Automatic Recovery + */ + public Connection newConnection(ExecutorService executor, AddressResolver addressResolver) throws IOException, TimeoutException { + return newConnection(executor, addressResolver, null); + } + /** * Create a new broker connection with a client-provided name, picking the first available address from * the list. @@ -801,6 +836,30 @@ public Connection newConnection(ExecutorService executor, List addrs) t */ public Connection newConnection(ExecutorService executor, List addrs, String clientProvidedName) throws IOException, TimeoutException { + return newConnection(executor, new ListAddressResolver(addrs), clientProvidedName); + } + + /** + * Create a new broker connection with a client-provided name, picking the first available address from + * the list provided by the {@link AddressResolver}. + * + * If automatic connection recovery + * is enabled, the connection returned by this method will be {@link Recoverable}. Future + * reconnection attempts will pick a random accessible address provided by the {@link AddressResolver}. + * + * @param executor thread execution service for consumers on the connection + * @param addressResolver discovery service to list potential addresses (hostname/port pairs) to connect to + * @param clientProvidedName application-specific connection name, will be displayed + * in the management UI if RabbitMQ server supports it. + * This value doesn't have to be unique and cannot be used + * as a connection identifier e.g. in HTTP API requests. + * This value is supposed to be human-readable. + * @return an interface to the connection + * @throws java.io.IOException if it encounters a problem + * @see Automatic Recovery + */ + public Connection newConnection(ExecutorService executor, AddressResolver addressResolver, String clientProvidedName) + throws IOException, TimeoutException { // make sure we respect the provided thread factory FrameHandlerFactory fhFactory = createFrameHandlerFactory(); ConnectionParams params = params(executor); @@ -813,11 +872,12 @@ public Connection newConnection(ExecutorService executor, List addrs, S if (isAutomaticRecoveryEnabled()) { // see com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory#newConnection - AutorecoveringConnection conn = new AutorecoveringConnection(params, fhFactory, addrs); + AutorecoveringConnection conn = new AutorecoveringConnection(params, fhFactory, addressResolver); conn.init(); return conn; } else { + List addrs = addressResolver.getAddresses(); IOException lastException = null; for (Address addr : addrs) { try { diff --git a/src/main/java/com/rabbitmq/client/ListAddressResolver.java b/src/main/java/com/rabbitmq/client/ListAddressResolver.java new file mode 100644 index 0000000000..5dc0c70a58 --- /dev/null +++ b/src/main/java/com/rabbitmq/client/ListAddressResolver.java @@ -0,0 +1,20 @@ +package com.rabbitmq.client; + +import java.util.List; + +/** + * Simple implementation of {@link AddressResolver} that returns a fixed list. + */ +public class ListAddressResolver implements AddressResolver { + + private final List addresses; + + public ListAddressResolver(List addresses) { + this.addresses = addresses; + } + + @Override + public List getAddresses() { + return addresses; + } +} diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java b/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java index 99c331e699..aac212b683 100644 --- a/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java +++ b/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java @@ -15,21 +15,9 @@ package com.rabbitmq.client.impl.recovery; -import com.rabbitmq.client.AMQP; -import com.rabbitmq.client.Address; -import com.rabbitmq.client.BlockedListener; -import com.rabbitmq.client.Channel; -import com.rabbitmq.client.Connection; -import com.rabbitmq.client.ConnectionFactory; -import com.rabbitmq.client.MissedHeartbeatException; -import com.rabbitmq.client.Recoverable; -import com.rabbitmq.client.RecoveryListener; -import com.rabbitmq.client.ShutdownListener; -import com.rabbitmq.client.ShutdownSignalException; -import com.rabbitmq.client.TopologyRecoveryException; +import com.rabbitmq.client.*; import com.rabbitmq.client.impl.AMQConnection; import com.rabbitmq.client.impl.ConnectionParams; -import com.rabbitmq.client.ExceptionHandler; import com.rabbitmq.client.impl.FrameHandlerFactory; import com.rabbitmq.client.impl.NetworkConnection; @@ -94,7 +82,11 @@ public class AutorecoveringConnection implements Connection, Recoverable, Networ private final Object recoveryLock = new Object(); public AutorecoveringConnection(ConnectionParams params, FrameHandlerFactory f, List addrs) { - this.cf = new RecoveryAwareAMQConnectionFactory(params, f, addrs); + this(params, f, new ListAddressResolver(addrs)); + } + + public AutorecoveringConnection(ConnectionParams params, FrameHandlerFactory f, AddressResolver addressResolver) { + this.cf = new RecoveryAwareAMQConnectionFactory(params, f, addressResolver); this.params = params; this.channels = new ConcurrentHashMap