diff --git a/src/main/java/org/tarantool/ReconnectingSocketProvider.java b/src/main/java/org/tarantool/ReconnectingSocketProvider.java new file mode 100644 index 00000000..895886c9 --- /dev/null +++ b/src/main/java/org/tarantool/ReconnectingSocketProvider.java @@ -0,0 +1,101 @@ +package org.tarantool; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketTimeoutException; +import java.nio.channels.SocketChannel; + +/** + * Basic SocketChannelProvider implementation with the ability of reconnecting after failure. + * To be used with {@link TarantoolClientImpl}. + */ +public abstract class ReconnectingSocketProvider implements SocketChannelProvider { + /** Timeout to establish socket connection with an individual server. */ + private int timeout; // 0 is infinite. + /** Limit of retries. */ + private int retriesLimit = -1; // No-limit. + + /** + * @return Maximum amount of time to wait for a socket connection establishment + * with an individual server. + */ + public int getTimeout() { + return timeout; + } + + /** + * Sets maximum amount of time to wait for a socket connection establishment + * with an individual server. + * + * Zero means infinite timeout. + * + * @param timeout Timeout value, ms. + * @return {@code this}. + * @throws IllegalArgumentException If timeout is negative. + */ + public ReconnectingSocketProvider setTimeout(int timeout) { + if (timeout < 0) { + throw new IllegalArgumentException("timeout is negative"); + } + this.timeout = timeout; + return this; + } + + /** + * @return Maximum reconnect attempts to make before raising exception. + */ + public int getRetriesLimit() { + return retriesLimit; + } + + /** + * Sets maximum amount of reconnect attempts to be made before an exception is raised. + * The retry count is maintained by a {@link #get(int, Throwable)} caller + * when a socket level connection was established. + * + * Negative value means unlimited. + * + * @param retriesLimit Limit of retries to use. + * @return {@code this}. + */ + public ReconnectingSocketProvider setRetriesLimit(int retriesLimit) { + this.retriesLimit = retriesLimit; + return this; + } + + /** + * Provides a decision on whether retries limit is hit. + * + * @param retries Current count of retries. + * @return {@code true} if retries are exhausted. + */ + private boolean areRetriesExhausted(int retries) { + int limit = getRetriesLimit(); + if (limit < 0) + return false; + return retries >= limit; + } + + /** + * Return a configured socket address where a Tarantool instance is listening to + * @return {@link java.net.InetSocketAddress} + */ + abstract InetSocketAddress getSocketAddress(); + + /** {@inheritDoc} */ + @Override + public SocketChannel get(int retryNumber, Throwable lastError) { + if (areRetriesExhausted(retryNumber)) { + throw new CommunicationException("Connection retries exceeded.", lastError); + } + try (SocketChannel channel = SocketChannel.open()) { + InetSocketAddress addr = getSocketAddress(); + channel.socket().connect(addr, timeout); + return channel; + } catch (SocketTimeoutException e) { + throw new CommunicationException("Connection timed out", e); + } catch (IOException e) { + throw new CommunicationException("Failed to establish a connection", e); + } + } +} diff --git a/src/main/java/org/tarantool/ReconnectingSocketProviderImpl.java b/src/main/java/org/tarantool/ReconnectingSocketProviderImpl.java new file mode 100644 index 00000000..c0ccb81e --- /dev/null +++ b/src/main/java/org/tarantool/ReconnectingSocketProviderImpl.java @@ -0,0 +1,59 @@ +package org.tarantool; + +import java.net.InetSocketAddress; + +public class ReconnectingSocketProviderImpl extends ReconnectingSocketProvider { + + private String host; + private int port; + + /** + * Returns the IP address or hostname of a Tarantool server + * @return {@link java.lang.String} + */ + public String getHost() { + return host; + } + + public ReconnectingSocketProviderImpl setHost(String host) { + if (host == null || host.isEmpty()) { + throw new IllegalArgumentException("Tarantool server host is empty"); + } + this.host = host; + return this; + } + + /** + * Returns the Tarantool server port + * @return {@code int} + */ + public int getPort() { + return port; + } + + public void setPort(int port) { + if (port <= 0) { + throw new IllegalArgumentException("Tarantool server port is less or equal to 0"); + } + this.port = port; + } + + public ReconnectingSocketProviderImpl(String host, int port) { + if (host == null || host.isEmpty()) { + throw new IllegalArgumentException("Tarantool server host is empty"); + } + if (port <= 0) { + throw new IllegalArgumentException("Tarantool server port is less or equal to 0"); + } + this.host = host; + this.port = port; + } + + /** + * {@inheritDoc} + */ + @Override + public InetSocketAddress getSocketAddress() { + return new InetSocketAddress(this.host, this.port); + } +} diff --git a/src/main/java/org/tarantool/RoundRobinSocketProviderImpl.java b/src/main/java/org/tarantool/RoundRobinSocketProviderImpl.java index d16c6bf4..5ffecb1e 100644 --- a/src/main/java/org/tarantool/RoundRobinSocketProviderImpl.java +++ b/src/main/java/org/tarantool/RoundRobinSocketProviderImpl.java @@ -1,19 +1,13 @@ package org.tarantool; -import java.io.IOException; import java.net.InetSocketAddress; -import java.nio.channels.SocketChannel; import java.util.Arrays; /** * Basic reconnection strategy that changes addresses in a round-robin fashion. * To be used with {@link TarantoolClientImpl}. */ -public class RoundRobinSocketProviderImpl implements SocketChannelProvider { - /** Timeout to establish socket connection with an individual server. */ - private int timeout; // 0 is infinite. - /** Limit of retries. */ - private int retriesLimit = -1; // No-limit. +public class RoundRobinSocketProviderImpl extends ReconnectingSocketProvider { /** Server addresses as configured. */ private final String[] addrs; /** Socket addresses. */ @@ -47,107 +41,24 @@ public String[] getAddresses() { } /** - * Sets maximum amount of time to wait for a socket connection establishment - * with an individual server. - * - * Zero means infinite timeout. - * - * @param timeout Timeout value, ms. - * @return {@code this}. - * @throws IllegalArgumentException If timeout is negative. - */ - public RoundRobinSocketProviderImpl setTimeout(int timeout) { - if (timeout < 0) - throw new IllegalArgumentException("timeout is negative."); - - this.timeout = timeout; - - return this; - } - - /** - * @return Maximum amount of time to wait for a socket connection establishment - * with an individual server. - */ - public int getTimeout() { - return timeout; - } - - /** - * Sets maximum amount of reconnect attempts to be made before an exception is raised. - * The retry count is maintained by a {@link #get(int, Throwable)} caller - * when a socket level connection was established. - * - * Negative value means unlimited. - * - * @param retriesLimit Limit of retries to use. - * @return {@code this}. + * @return Number of configured addresses. */ - public RoundRobinSocketProviderImpl setRetriesLimit(int retriesLimit) { - this.retriesLimit = retriesLimit; - - return this; + protected int getAddressCount() { + return sockAddrs.length; } /** - * @return Maximum reconnect attempts to make before raising exception. + * {@inheritDoc} */ - public int getRetriesLimit() { - return retriesLimit; - } - - /** {@inheritDoc} */ @Override - public SocketChannel get(int retryNumber, Throwable lastError) { - if (areRetriesExhausted(retryNumber)) { - throw new CommunicationException("Connection retries exceeded.", lastError); - } - int attempts = getAddressCount(); - long deadline = System.currentTimeMillis() + timeout * attempts; - while (!Thread.currentThread().isInterrupted()) { - SocketChannel channel = null; - try { - channel = SocketChannel.open(); - InetSocketAddress addr = getNextSocketAddress(); - channel.socket().connect(addr, timeout); - return channel; - } catch (IOException e) { - if (channel != null) { - try { - channel.close(); - } catch (IOException ignored) { - // No-op. - } - } - long now = System.currentTimeMillis(); - if (deadline <= now) { - throw new CommunicationException("Connection time out.", e); - } - if (--attempts == 0) { - // Tried all addresses without any lack, but still have time. - attempts = getAddressCount(); - try { - Thread.sleep((deadline - now) / attempts); - } catch (InterruptedException ignored) { - Thread.currentThread().interrupt(); - } - } - } - } - throw new CommunicationException("Thread interrupted.", new InterruptedException()); - } - - /** - * @return Number of configured addresses. - */ - protected int getAddressCount() { - return sockAddrs.length; + public int getRetriesLimit() { + return super.getRetriesLimit() * getAddressCount(); } /** * @return Socket address to use for the next reconnection attempt. */ - protected InetSocketAddress getNextSocketAddress() { + protected InetSocketAddress getSocketAddress() { InetSocketAddress res = sockAddrs[pos]; pos = (pos + 1) % sockAddrs.length; return res; @@ -166,17 +77,4 @@ protected InetSocketAddress parseAddress(String addr) { int port = (idx < 0) ? 3301 : Integer.parseInt(addr.substring(idx + 1)); return new InetSocketAddress(host, port); } - - /** - * Provides a decision on whether retries limit is hit. - * - * @param retries Current count of retries. - * @return {@code true} if retries are exhausted. - */ - private boolean areRetriesExhausted(int retries) { - int limit = getRetriesLimit(); - if (limit < 0) - return false; - return retries >= limit; - } }