diff --git a/README.md b/README.md
index 9600bfb2..07943436 100644
--- a/README.md
+++ b/README.md
@@ -17,6 +17,7 @@ To get the Java connector for Tarantool 1.6.9, visit
## Table of contents
* [Getting started](#getting-started)
+* [Cluster support](#cluster-support)
* [Where to get help](#where-to-get-help)
## Getting started
@@ -156,6 +157,102 @@ System.out.println(template.query("select * from hello_world where hello=:id", C
For more implementation details, see [API documentation](http://tarantool.github.io/tarantool-java/apidocs/index.html).
+## Cluster support
+
+To be more fault-tolerant the connector provides cluster extensions. In
+particular `TarantoolClusterClient` and built-in `RoundRobinSocketProviderImpl`
+used as a default `SocketProvider` implementation. When currently connected
+instance is down then the client will try to reconnect to the first available
+instance using strategy defined in a socket provider. You need to supply
+a list of nodes which will be used by the cluster client to provide such
+ability. Also you can prefer to use a [discovery mechanism](#auto-discovery)
+in order to dynamically fetch and apply the node list.
+
+### Basic cluster client usage
+
+1. Configure `TarantoolClusterClientConfig`:
+
+```java
+TarantoolClusterClientConfig config = new TarantoolClusterClientConfig();
+// fill other settings
+config.operationExpiryTimeMillis = 2000;
+config.executor = Executors.newSingleThreadExecutor();
+```
+
+2. Create an instance of `TarantoolClusterClientImpl`. You need to provide
+an initial list of nodes:
+
+```java
+String[] nodes = new String[] { "myHost1:3301", "myHost2:3302", "myHost3:3301" };
+TarantoolClusterClient client = new TarantoolClusterClient(config, nodes);
+```
+
+3. Work with the client using same API as defined in `TarantoolClient`:
+
+```java
+client.syncOps().insert(23, Arrays.asList(1, 1));
+```
+
+### Auto-discovery
+
+Auto-discovery feature allows a cluster client to fetch addresses of
+cluster nodes to reflect changes related to the cluster topology. To achieve
+this you have to create a Lua function on the server side which returns
+a single array result. Client periodically pools the server to obtain a
+fresh list and apply it if its content changes.
+
+1. On the server side create a function which returns nodes:
+
+```bash
+tarantool> function get_cluster_nodes() return { 'host1:3301', 'host2:3302', 'host3:3301' } end
+```
+
+You need to pay attention to a function contract we are currently supporting:
+* The client never passes any arguments to a discovery function.
+* A discovery function _should_ return a single result of strings (i.e. single
+ string `return 'host:3301'` or array of strings `return {'host1:3301', 'host2:3301'}`).
+* A discovery function _may_ return multi-results but the client takes
+ into account only first of them (i.e. `return {'host:3301'}, discovery_delay`, where
+ the second result is unused). Even more, any extra results __are reserved__ by the client
+ in order to extend its contract with a backward compatibility.
+* A discovery function _should NOT_ return no results, empty result, wrong type result,
+ and Lua errors. The client discards such kinds of results but it does not affect the discovery
+ process for next scheduled tasks.
+
+2. On the client side configure discovery settings in `TarantoolClusterClientConfig`:
+
+```java
+TarantoolClusterClientConfig config = new TarantoolClusterClientConfig();
+// fill other settings
+config.clusterDiscoveryEntryFunction = "get_cluster_nodes"; // discovery function used to fetch nodes
+config.clusterDiscoveryDelayMillis = 60_000; // how often client polls the discovery server
+```
+
+3. Create a client using the config made above.
+
+```java
+TarantoolClusterClient client = new TarantoolClusterClient(config);
+client.syncOps().insert(45, Arrays.asList(1, 1));
+```
+
+### Auto-discovery caveats
+
+* You need to set _not empty_ value to `clusterDiscoveryEntryFunction` to enable auto-discovery feature.
+* There are only two cases when a discovery task runs: just after initialization of the cluster
+ client and a periodical scheduler timeout defined in `TarantoolClusterClientConfig.clusterDiscoveryDelayMillis`.
+* A discovery task always uses an active client connection to get the nodes list.
+ It's in your responsibility to provide a function availability as well as a consistent
+ nodes list on all instances you initially set or obtain from the task.
+* If some error occurs while a discovery task is running then this task
+ will be aborted without any after-effects for next task executions. These cases, for instance, are
+ a wrong function result (see discovery function contract) or a broken connection.
+ There is an exception if the client is closed then discovery process will stop permanently.
+* It's possible to obtain a list which does NOT contain the node we are currently
+ connected to. It leads the client to try to reconnect to another node from the
+ new list. It may take some time to graceful disconnect from the current node.
+ The client does its best to catch the moment when there are no pending responses
+ and perform a reconnection.
+
## Where to get help
Got problems or questions? Post them on
@@ -164,6 +261,7 @@ Got problems or questions? Post them on
base for possible answers and solutions.
## Building
+
To run tests
```
./mvnw clean test
diff --git a/src/main/java/org/tarantool/BaseSocketChannelProvider.java b/src/main/java/org/tarantool/BaseSocketChannelProvider.java
new file mode 100644
index 00000000..a532bcdf
--- /dev/null
+++ b/src/main/java/org/tarantool/BaseSocketChannelProvider.java
@@ -0,0 +1,169 @@
+package org.tarantool;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.channels.SocketChannel;
+
+public abstract class BaseSocketChannelProvider implements SocketChannelProvider {
+
+ /**
+ * Limit of retries.
+ */
+ private int retriesLimit = RETRY_NO_LIMIT;
+
+ /**
+ * Timeout to establish socket connection with an individual server.
+ */
+ private int timeout = NO_TIMEOUT;
+
+ /**
+ * Tries to establish a new connection to the Tarantool instances.
+ *
+ * @param retryNumber number of current retry. Reset after successful connect.
+ * @param lastError the last error occurs when reconnecting
+ *
+ * @return connected socket channel
+ *
+ * @throws CommunicationException if any I/O errors happen or there are
+ * no addresses available
+ */
+ @Override
+ public final SocketChannel get(int retryNumber, Throwable lastError) {
+ if (areRetriesExhausted(retryNumber)) {
+ throw new CommunicationException("Connection retries exceeded.", lastError);
+ }
+
+ long deadline = System.currentTimeMillis() + timeout;
+ while (!Thread.currentThread().isInterrupted()) {
+ try {
+ InetSocketAddress address = getAddress(retryNumber, lastError);
+ return openChannel(address);
+ } catch (IOException e) {
+ checkTimeout(deadline, e);
+ }
+ }
+ throw new CommunicationException("Thread interrupted.", new InterruptedException());
+ }
+
+ private void checkTimeout(long deadline, Exception e) {
+ long timeLeft = deadline - System.currentTimeMillis();
+ if (timeLeft <= 0) {
+ throw new CommunicationException("Connection time out.", e);
+ }
+ try {
+ Thread.sleep(timeLeft / 10);
+ } catch (InterruptedException ignored) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ /**
+ * Gets address to be used to establish a new connection
+ * Address can be null.
+ *
+ * @param retryNumber reconnection attempt number
+ * @param lastError reconnection reason
+ *
+ * @return available address which is depended on implementation
+ *
+ * @throws IOException if any I/O errors occur
+ */
+ protected abstract InetSocketAddress getAddress(int retryNumber, Throwable lastError) throws IOException;
+
+ /**
+ * Sets maximum amount of reconnect attempts to be made before an exception is raised.
+ * The retry count is maintained by a {@link #get(int, Throwable)} caller
+ * when a socket level connection was established.
+ *
+ * Negative value means unlimited attempts.
+ *
+ * @param retriesLimit Limit of retries to use.
+ */
+ public void setRetriesLimit(int retriesLimit) {
+ this.retriesLimit = retriesLimit;
+ }
+
+ /**
+ * Gets limit of attempts to establish connection.
+ *
+ * @return Maximum reconnect attempts to make before raising exception.
+ */
+ public int getRetriesLimit() {
+ return retriesLimit;
+ }
+
+ /**
+ * Parse a string address in the form of host[:port]
+ * and builds a socket address.
+ *
+ * @param address Server address.
+ *
+ * @return Socket address.
+ */
+ protected InetSocketAddress parseAddress(String address) {
+ int separatorPosition = address.indexOf(':');
+ String host = (separatorPosition < 0) ? address : address.substring(0, separatorPosition);
+ int port = (separatorPosition < 0) ? 3301 : Integer.parseInt(address.substring(separatorPosition + 1));
+ return new InetSocketAddress(host, port);
+ }
+
+ protected SocketChannel openChannel(InetSocketAddress socketAddress) throws IOException {
+ SocketChannel channel = null;
+ try {
+ channel = SocketChannel.open();
+ channel.socket().connect(socketAddress, timeout);
+ return channel;
+ } catch (IOException e) {
+ if (channel != null) {
+ try {
+ channel.close();
+ } catch (IOException ignored) {
+ // No-op.
+ }
+ }
+ throw e;
+ }
+ }
+
+ /**
+ * Sets maximum amount of time to wait for a socket connection establishment
+ * with an individual server.
+ *
+ * Zero means infinite timeout.
+ *
+ * @param timeout timeout value, ms.
+ *
+ * @throws IllegalArgumentException if timeout is negative.
+ */
+ public void setTimeout(int timeout) {
+ if (timeout < 0) {
+ throw new IllegalArgumentException("timeout is negative.");
+ }
+ this.timeout = timeout;
+ }
+
+ /**
+ * Gest maximum amount of time to wait for a socket
+ * connection establishment with an individual server.
+ *
+ * @return timeout
+ */
+ public int getTimeout() {
+ return timeout;
+ }
+
+ /**
+ * Provides a decision on whether retries limit is hit.
+ *
+ * @param retries Current count of retries.
+ *
+ * @return {@code true} if retries are exhausted.
+ */
+ private boolean areRetriesExhausted(int retries) {
+ int limit = getRetriesLimit();
+ if (limit < 0) {
+ return false;
+ }
+ return retries >= limit;
+ }
+}
diff --git a/src/main/java/org/tarantool/RefreshableSocketProvider.java b/src/main/java/org/tarantool/RefreshableSocketProvider.java
new file mode 100644
index 00000000..a3ccc56f
--- /dev/null
+++ b/src/main/java/org/tarantool/RefreshableSocketProvider.java
@@ -0,0 +1,12 @@
+package org.tarantool;
+
+import java.net.SocketAddress;
+import java.util.Collection;
+
+public interface RefreshableSocketProvider {
+
+ Collection getAddresses();
+
+ void refreshAddresses(Collection addresses);
+
+}
diff --git a/src/main/java/org/tarantool/RoundRobinSocketProviderImpl.java b/src/main/java/org/tarantool/RoundRobinSocketProviderImpl.java
index 4cb9c5f0..24820a03 100644
--- a/src/main/java/org/tarantool/RoundRobinSocketProviderImpl.java
+++ b/src/main/java/org/tarantool/RoundRobinSocketProviderImpl.java
@@ -2,163 +2,123 @@
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.nio.channels.SocketChannel;
+import java.net.SocketAddress;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
/**
* Basic reconnection strategy that changes addresses in a round-robin fashion.
* To be used with {@link TarantoolClientImpl}.
*/
-public class RoundRobinSocketProviderImpl implements SocketChannelProvider {
- /**
- * Timeout to establish socket connection with an individual server.
- * 0 is infinite.
- */
- private int timeout;
-
- /**
- * Limit of retries (-1 = no limit).
- */
- private int retriesLimit = -1;
-
- /**
- * Server addresses as configured.
- */
- private final String[] addrs;
+public class RoundRobinSocketProviderImpl extends BaseSocketChannelProvider implements RefreshableSocketProvider {
- /**
- * Socket addresses.
- */
- private final InetSocketAddress[] sockAddrs;
+ private static final int UNSET_POSITION = -1;
/**
- * Current position within {@link #sockAddrs} array.
+ * Socket addresses pool.
*/
- private int pos;
+ private final List socketAddresses = new ArrayList<>();
/**
- * Constructs an instance.
+ * Current position within {@link #socketAddresses} list.
+ *
+ * It is {@link #UNSET_POSITION} when no addresses from
+ * the {@link #socketAddresses} pool have been processed yet.
+ *
+ * When this provider receives new addresses it tries
+ * to look for a new position for the last used address or
+ * sets the position to {@link #UNSET_POSITION} otherwise.
*
- * @param addrs Array of addresses in a form of [host]:[port].
+ * @see #getLastObtainedAddress()
+ * @see #refreshAddresses(Collection)
*/
- public RoundRobinSocketProviderImpl(String... addrs) {
- if (addrs == null || addrs.length == 0) {
- throw new IllegalArgumentException("addrs is null or empty.");
- }
-
- this.addrs = Arrays.copyOf(addrs, addrs.length);
-
- sockAddrs = new InetSocketAddress[this.addrs.length];
-
- for (int i = 0; i < this.addrs.length; i++) {
- sockAddrs[i] = parseAddress(this.addrs[i]);
- }
- }
+ private AtomicInteger currentPosition = new AtomicInteger(UNSET_POSITION);
/**
- * Gets raw addresses list.
+ * Address list lock for a thread-safe access to it
+ * when a refresh operation occurs.
*
- * @return Configured addresses in a form of [host]:[port].
+ * @see RefreshableSocketProvider#refreshAddresses(Collection)
*/
- public String[] getAddresses() {
- return this.addrs;
- }
+ private ReadWriteLock addressListLock = new ReentrantReadWriteLock();
/**
- * Sets maximum amount of time to wait for a socket connection establishment
- * with an individual server.
+ * Constructs an instance.
*
- * Zero means infinite timeout.
+ * @param addresses optional array of addresses in a form of host[:port]
*
- * @param timeout Timeout value, ms.
- * @return {@code this}.
- * @throws IllegalArgumentException If timeout is negative.
+ * @throws IllegalArgumentException if addresses aren't provided
*/
- public RoundRobinSocketProviderImpl setTimeout(int timeout) {
- if (timeout < 0) {
- throw new IllegalArgumentException("timeout is negative.");
- }
-
- this.timeout = timeout;
-
- return this;
+ public RoundRobinSocketProviderImpl(String... addresses) {
+ updateAddressList(Arrays.asList(addresses));
}
- /**
- * Gets maximum amount of time to wait for a socket connection establishment
- * with an individual server.
- *
- * @return timeout
- */
- public int getTimeout() {
- return timeout;
+ private void updateAddressList(Collection addresses) {
+ if (addresses == null || addresses.isEmpty()) {
+ throw new IllegalArgumentException("At least one address must be provided");
+ }
+ Lock writeLock = addressListLock.writeLock();
+ writeLock.lock();
+ try {
+ InetSocketAddress lastAddress = getLastObtainedAddress();
+ socketAddresses.clear();
+ addresses.stream()
+ .map(this::parseAddress)
+ .collect(Collectors.toCollection(() -> socketAddresses));
+ if (lastAddress != null) {
+ int recoveredPosition = socketAddresses.indexOf(lastAddress);
+ currentPosition.set(recoveredPosition);
+ } else {
+ currentPosition.set(UNSET_POSITION);
+ }
+ } finally {
+ writeLock.unlock();
+ }
}
/**
- * Sets maximum amount of reconnect attempts to be made before an exception is raised.
- * The retry count is maintained by a {@link #get(int, Throwable)} caller
- * when a socket level connection was established.
- *
- * Negative value means unlimited.
+ * Gets parsed and resolved internet addresses.
*
- * @param retriesLimit Limit of retries to use.
- * @return {@code this}.
- */
- public RoundRobinSocketProviderImpl setRetriesLimit(int retriesLimit) {
- this.retriesLimit = retriesLimit;
- return this;
+ * @return socket addresses
+ */
+ public List getAddresses() {
+ Lock readLock = addressListLock.readLock();
+ readLock.lock();
+ try {
+ return Collections.unmodifiableList(this.socketAddresses);
+ } finally {
+ readLock.unlock();
+ }
}
/**
- * Gets number of maximum attempts to establish connection.
+ * Gets last used address from the pool if it exists.
*
- * @return max attempts number.
- */
- public int getRetriesLimit() {
- return retriesLimit;
+ * @return last obtained address or null
+ * if {@link #currentPosition} has {@link #UNSET_POSITION} value
+ */
+ protected InetSocketAddress getLastObtainedAddress() {
+ Lock readLock = addressListLock.readLock();
+ readLock.lock();
+ try {
+ int index = currentPosition.get();
+ return index != UNSET_POSITION ? socketAddresses.get(index) : null;
+ } finally {
+ readLock.unlock();
+ }
}
- /**
- * {@inheritDoc}
- */
@Override
- public SocketChannel get(int retryNumber, Throwable lastError) {
- if (areRetriesExhausted(retryNumber)) {
- throw new CommunicationException("Connection retries exceeded.", lastError);
- }
- int attempts = getAddressCount();
- long deadline = System.currentTimeMillis() + timeout * attempts;
- while (!Thread.currentThread().isInterrupted()) {
- SocketChannel channel = null;
- try {
- channel = SocketChannel.open();
- InetSocketAddress addr = getNextSocketAddress();
- channel.socket().connect(addr, timeout);
- return channel;
- } catch (IOException e) {
- if (channel != null) {
- try {
- channel.close();
- } catch (IOException ignored) {
- // No-op.
- }
- }
- long now = System.currentTimeMillis();
- if (deadline <= now) {
- throw new CommunicationException("Connection time out.", e);
- }
- if (--attempts == 0) {
- // Tried all addresses without any lack, but still have time.
- attempts = getAddressCount();
- try {
- Thread.sleep((deadline - now) / attempts);
- } catch (InterruptedException ignored) {
- Thread.currentThread().interrupt();
- }
- }
- }
- }
- throw new CommunicationException("Thread interrupted.", new InterruptedException());
+ protected InetSocketAddress getAddress(int retryNumber, Throwable lastError) throws IOException {
+ return getNextSocketAddress();
}
/**
@@ -167,45 +127,40 @@ public SocketChannel get(int retryNumber, Throwable lastError) {
* @return Number of configured addresses.
*/
protected int getAddressCount() {
- return sockAddrs.length;
+ Lock readLock = addressListLock.readLock();
+ readLock.lock();
+ try {
+ return socketAddresses.size();
+ } finally {
+ readLock.unlock();
+ }
}
/**
* Gets next address from the pool to be used to connect.
*
- * @return Socket address to use for the next reconnection attempt.
+ * @return Socket address to use for the next reconnection attempt
*/
protected InetSocketAddress getNextSocketAddress() {
- InetSocketAddress res = sockAddrs[pos];
- pos = (pos + 1) % sockAddrs.length;
- return res;
+ Lock readLock = addressListLock.readLock();
+ readLock.lock();
+ try {
+ int position = currentPosition.updateAndGet(i -> (i + 1) % socketAddresses.size());
+ return socketAddresses.get(position);
+ } finally {
+ readLock.unlock();
+ }
}
/**
- * Parse a string address in the form of [host]:[port]
- * and builds a socket address.
+ * Update addresses pool by new list.
*
- * @param addr Server address.
- * @return Socket address.
- */
- protected InetSocketAddress parseAddress(String addr) {
- int idx = addr.indexOf(':');
- String host = (idx < 0) ? addr : addr.substring(0, idx);
- int port = (idx < 0) ? 3301 : Integer.parseInt(addr.substring(idx + 1));
- return new InetSocketAddress(host, port);
- }
-
- /**
- * Provides a decision on whether retries limit is hit.
+ * @param addresses list of addresses to be applied
*
- * @param retries Current count of retries.
- * @return {@code true} if retries are exhausted.
+ * @throws IllegalArgumentException if addresses list is empty
*/
- private boolean areRetriesExhausted(int retries) {
- int limit = getRetriesLimit();
- if (limit < 0) {
- return false;
- }
- return retries >= limit;
+ public void refreshAddresses(Collection addresses) {
+ updateAddressList(addresses);
}
+
}
diff --git a/src/main/java/org/tarantool/SingleSocketChannelProviderImpl.java b/src/main/java/org/tarantool/SingleSocketChannelProviderImpl.java
new file mode 100644
index 00000000..0829bdc2
--- /dev/null
+++ b/src/main/java/org/tarantool/SingleSocketChannelProviderImpl.java
@@ -0,0 +1,43 @@
+package org.tarantool;
+
+import org.tarantool.util.StringUtils;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+
+/**
+ * Simple provider that produces a single connection.
+ * To be used with {@link TarantoolClientImpl}.
+ */
+public class SingleSocketChannelProviderImpl extends BaseSocketChannelProvider {
+
+ private InetSocketAddress address;
+
+ /**
+ * Creates a simple provider.
+ *
+ * @param address instance address
+ */
+ public SingleSocketChannelProviderImpl(String address) {
+ setAddress(address);
+ }
+
+ public SocketAddress getAddress() {
+ return address;
+ }
+
+ @Override
+ protected InetSocketAddress getAddress(int retryNumber, Throwable lastError) throws IOException {
+ return address;
+ }
+
+ public void setAddress(String address) {
+ if (StringUtils.isBlank(address)) {
+ throw new IllegalArgumentException("address must not be empty");
+ }
+
+ this.address = parseAddress(address);
+ }
+
+}
diff --git a/src/main/java/org/tarantool/SocketChannelProvider.java b/src/main/java/org/tarantool/SocketChannelProvider.java
index 8ae52476..e4f26d79 100644
--- a/src/main/java/org/tarantool/SocketChannelProvider.java
+++ b/src/main/java/org/tarantool/SocketChannelProvider.java
@@ -3,6 +3,10 @@
import java.nio.channels.SocketChannel;
public interface SocketChannelProvider {
+
+ int RETRY_NO_LIMIT = -1;
+ int NO_TIMEOUT = 0;
+
/**
* Provides socket channel to init restore connection.
* You could change hosts on fail and sleep between retries in this method
diff --git a/src/main/java/org/tarantool/TarantoolClientConfig.java b/src/main/java/org/tarantool/TarantoolClientConfig.java
index a863304c..43f06746 100644
--- a/src/main/java/org/tarantool/TarantoolClientConfig.java
+++ b/src/main/java/org/tarantool/TarantoolClientConfig.java
@@ -3,47 +3,49 @@
public class TarantoolClientConfig {
/**
- * Username and password for authorization.
+ * Auth-related data.
*/
public String username;
public String password;
/**
- * Default ByteArrayOutputStream size when make query serialization.
+ * Default request size when make query serialization.
*/
public int defaultRequestSize = 4096;
/**
- * Initial size for map which holds futures of sent request.
+ * Initial capacity for the map which holds futures of sent request.
*/
public int predictedFutures = (int) ((1024 * 1024) / 0.75) + 1;
-
public int writerThreadPriority = Thread.NORM_PRIORITY;
-
public int readerThreadPriority = Thread.NORM_PRIORITY;
-
/**
- * shared buffer is place where client collect requests when socket is busy on write.
+ * Shared buffer size (place where client collects requests
+ * when socket is busy on write).
*/
public int sharedBufferSize = 8 * 1024 * 1024;
+
/**
- * not put request into the shared buffer if request size is ge directWriteFactor * sharedBufferSize.
+ * Factor to calculate a threshold whether request will be accommodated
+ * in the shared buffer.
+ *
+ * if request size exceeds directWriteFactor * sharedBufferSize
+ * request is sent directly.
*/
public double directWriteFactor = 0.5d;
/**
- * Use old call command https://github.com/tarantool/doc/issues/54,
- * please ensure that you server supports new call command.
+ * Use old call command https://github.com/tarantool/doc/issues/54,
+ * please ensure that you server supports new call command.
*/
public boolean useNewCall = false;
/**
- * Any blocking ops timeout.
+ * Limits for synchronous operations.
*/
public long initTimeoutMillis = 60 * 1000L;
-
public long writeTimeoutMillis = 60 * 1000L;
}
diff --git a/src/main/java/org/tarantool/TarantoolClientImpl.java b/src/main/java/org/tarantool/TarantoolClientImpl.java
index c1d40b79..12d8f676 100644
--- a/src/main/java/org/tarantool/TarantoolClientImpl.java
+++ b/src/main/java/org/tarantool/TarantoolClientImpl.java
@@ -25,6 +25,7 @@
import java.util.concurrent.locks.ReentrantLock;
public class TarantoolClientImpl extends TarantoolBase> implements TarantoolClient {
+
public static final CommunicationException NOT_INIT_EXCEPTION
= new CommunicationException("Not connected, initializing connection");
@@ -34,22 +35,23 @@ public class TarantoolClientImpl extends TarantoolBase> implements Tar
* External.
*/
protected SocketChannelProvider socketProvider;
+ protected SocketChannel channel;
+ protected ReadableViaSelectorChannel readChannel;
+
protected volatile Exception thumbstone;
protected Map> futures;
- protected AtomicInteger wait = new AtomicInteger();
+ protected AtomicInteger pendingResponsesCount = new AtomicInteger();
/**
* Write properties.
*/
- protected SocketChannel channel;
- protected ReadableViaSelectorChannel readChannel;
-
protected ByteBuffer sharedBuffer;
- protected ByteBuffer writerBuffer;
protected ReentrantLock bufferLock = new ReentrantLock(false);
protected Condition bufferNotEmpty = bufferLock.newCondition();
protected Condition bufferEmpty = bufferLock.newCondition();
+
+ protected ByteBuffer writerBuffer;
protected ReentrantLock writeLock = new ReentrantLock(true);
/**
@@ -81,6 +83,10 @@ public void run() {
}
});
+ public TarantoolClientImpl(String address, TarantoolClientConfig config) {
+ this(new SingleSocketChannelProviderImpl(address), config);
+ }
+
public TarantoolClientImpl(SocketChannelProvider socketProvider, TarantoolClientConfig config) {
super();
this.thumbstone = NOT_INIT_EXCEPTION;
@@ -102,6 +108,11 @@ public TarantoolClientImpl(SocketChannelProvider socketProvider, TarantoolClient
this.fireAndForgetOps.setCallCode(Code.CALL);
this.composableAsyncOps.setCallCode(Code.CALL);
}
+
+ startConnector(config);
+ }
+
+ private void startConnector(TarantoolClientConfig config) {
connector.start();
try {
if (!waitAlive(config.initTimeoutMillis, TimeUnit.MILLISECONDS)) {
@@ -254,6 +265,7 @@ protected synchronized void die(String message, Exception cause) {
iterator.remove();
}
}
+ pendingResponsesCount.set(0);
bufferLock.lock();
try {
@@ -306,7 +318,7 @@ protected void sharedWrite(ByteBuffer buffer) throws InterruptedException, Timeo
}
}
sharedBuffer.put(buffer);
- wait.incrementAndGet();
+ pendingResponsesCount.incrementAndGet();
bufferNotEmpty.signalAll();
stats.buffered++;
} finally {
@@ -333,7 +345,7 @@ private boolean directWrite(ByteBuffer buffer) throws InterruptedException, IOEx
}
writeFully(channel, buffer);
stats.directWrite++;
- wait.incrementAndGet();
+ pendingResponsesCount.incrementAndGet();
} finally {
writeLock.unlock();
}
@@ -360,7 +372,7 @@ protected void readThread() {
Long syncId = (Long) headers.get(Key.SYNC.getId());
TarantoolOp> future = futures.remove(syncId);
stats.received++;
- wait.decrementAndGet();
+ pendingResponsesCount.decrementAndGet();
complete(packet, future);
} catch (Exception e) {
die("Cant read answer", e);
@@ -431,9 +443,9 @@ protected void completeSql(CompletableFuture> future, TarantoolPacket pack) {
}
}
- protected T syncGet(Future r) {
+ protected T syncGet(Future result) {
try {
- return r.get();
+ return result.get();
} catch (ExecutionException e) {
if (e.getCause() instanceof CommunicationException) {
throw (CommunicationException) e.getCause();
@@ -464,7 +476,6 @@ public void close() {
protected void close(Exception e) {
if (state.close()) {
connector.interrupt();
-
die(e.getMessage(), e);
}
}
@@ -564,9 +575,11 @@ public List exec(Code code, Object... args) {
public void close() {
throw new IllegalStateException("You should close TarantoolClient instead.");
}
+
}
protected class FireAndForgetOps extends AbstractTarantoolOps, Object, Long> {
+
@Override
public Long exec(Code code, Object... args) {
if (thumbstone == null) {
@@ -586,6 +599,7 @@ public Long exec(Code code, Object... args) {
public void close() {
throw new IllegalStateException("You should close TarantoolClient instead.");
}
+
}
protected class ComposableAsyncOps
@@ -600,10 +614,11 @@ public CompletionStage> exec(Code code, Object... args) {
public void close() {
TarantoolClientImpl.this.close();
}
+
}
protected boolean isDead(CompletableFuture> q) {
- if (TarantoolClientImpl.this.thumbstone != null) {
+ if (this.thumbstone != null) {
fail(q, new CommunicationException("Connection is dead", thumbstone));
return true;
}
@@ -630,6 +645,7 @@ public TarantoolClientStats getStats() {
* Manages state changes.
*/
protected final class StateHelper {
+
static final int UNINITIALIZED = 0;
static final int READING = 1;
static final int WRITING = 2;
@@ -640,14 +656,14 @@ protected final class StateHelper {
private final AtomicInteger state;
private final AtomicReference nextAliveLatch =
- new AtomicReference<>(new CountDownLatch(1));
+ new AtomicReference<>(new CountDownLatch(1));
private final CountDownLatch closedLatch = new CountDownLatch(1);
/**
* The condition variable to signal a reconnection is needed from reader /
* writer threads and waiting for that signal from the reconnection thread.
- *
+ *
* The lock variable to access this condition.
*
* @see #awaitReconnection()
@@ -685,7 +701,7 @@ protected boolean close() {
/**
* Move from a current state to a give one.
- *
+ *
* Some moves are forbidden.
*/
protected boolean acquire(int mask) {
@@ -805,6 +821,7 @@ private void trySignalForReconnection() {
}
}
}
+
}
protected static class TarantoolOp extends CompletableFuture {
diff --git a/src/main/java/org/tarantool/TarantoolClusterClient.java b/src/main/java/org/tarantool/TarantoolClusterClient.java
index 4b200fcc..49fdb4af 100644
--- a/src/main/java/org/tarantool/TarantoolClusterClient.java
+++ b/src/main/java/org/tarantool/TarantoolClusterClient.java
@@ -1,13 +1,23 @@
package org.tarantool;
-import static org.tarantool.TarantoolClientImpl.StateHelper.CLOSED;
+import org.tarantool.cluster.TarantoolClusterDiscoverer;
+import org.tarantool.cluster.TarantoolClusterStoredFunctionDiscoverer;
+import org.tarantool.protocol.TarantoolPacket;
+import org.tarantool.util.StringUtils;
+import java.io.IOException;
+import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Objects;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.StampedLock;
/**
* Basic implementation of a client that may work with the cluster
@@ -17,20 +27,32 @@
* unless the configured expiration time is over.
*/
public class TarantoolClusterClient extends TarantoolClientImpl {
- /* Need some execution context to retry writes. */
+
+ /**
+ * Need some execution context to retry writes.
+ */
private Executor executor;
- /* Collection of operations to be retried. */
- private ConcurrentHashMap> retries = new ConcurrentHashMap>();
+ /**
+ * Discovery activity.
+ */
+ private ScheduledExecutorService instancesDiscoveryExecutor;
+ private Runnable instancesDiscovererTask;
+ private StampedLock discoveryLock = new StampedLock();
+
+ /**
+ * Collection of operations to be retried.
+ */
+ private ConcurrentHashMap> retries = new ConcurrentHashMap<>();
/**
* Constructs a new cluster client.
*
- * @param config Configuration.
- * @param addrs Array of addresses in the form of [host]:[port].
+ * @param config Configuration.
+ * @param addresses Array of addresses in the form of host[:port].
*/
- public TarantoolClusterClient(TarantoolClusterClientConfig config, String... addrs) {
- this(config, new RoundRobinSocketProviderImpl(addrs).setTimeout(config.operationExpiryTimeMillis));
+ public TarantoolClusterClient(TarantoolClusterClientConfig config, String... addresses) {
+ this(config, makeClusterSocketProvider(addresses, config.operationExpiryTimeMillis));
}
/**
@@ -41,12 +63,33 @@ public TarantoolClusterClient(TarantoolClusterClientConfig config, String... add
*/
public TarantoolClusterClient(TarantoolClusterClientConfig config, SocketChannelProvider provider) {
super(provider, config);
- this.executor = config.executor == null ? Executors.newSingleThreadExecutor() : config.executor;
+
+ this.executor = config.executor == null
+ ? Executors.newSingleThreadExecutor()
+ : config.executor;
+
+ if (StringUtils.isNotBlank(config.clusterDiscoveryEntryFunction)) {
+ this.instancesDiscovererTask =
+ createDiscoveryTask(new TarantoolClusterStoredFunctionDiscoverer(config, this));
+ this.instancesDiscoveryExecutor
+ = Executors.newSingleThreadScheduledExecutor(new TarantoolThreadDaemonFactory("tarantoolDiscoverer"));
+ int delay = config.clusterDiscoveryDelayMillis > 0
+ ? config.clusterDiscoveryDelayMillis
+ : TarantoolClusterClientConfig.DEFAULT_CLUSTER_DISCOVERY_DELAY_MILLIS;
+
+ // todo: it's better to start a job later (out of ctor)
+ this.instancesDiscoveryExecutor.scheduleWithFixedDelay(
+ this.instancesDiscovererTask,
+ 0,
+ delay,
+ TimeUnit.MILLISECONDS
+ );
+ }
}
@Override
protected boolean isDead(CompletableFuture> q) {
- if ((state.getState() & CLOSED) != 0) {
+ if ((state.getState() & StateHelper.CLOSED) != 0) {
q.completeExceptionally(new CommunicationException("Connection is dead", thumbstone));
return true;
}
@@ -62,22 +105,41 @@ protected CompletableFuture> doExec(Code code, Object[] args) {
validateArgs(args);
long sid = syncId.incrementAndGet();
ExpirableOp> future = makeFuture(sid, code, args);
+ return registerOperation(future);
+ }
- if (isDead(future)) {
- return future;
- }
- futures.put(sid, future);
- if (isDead(future)) {
- futures.remove(sid);
- return future;
- }
+ /**
+ * Registers a new async operation which will be resolved later.
+ * Registration is discovery-aware in term of synchronization and
+ * it may be blocked util the discovery finishes its work.
+ *
+ * @param future operation to be performed
+ *
+ * @return registered operation
+ */
+ private CompletableFuture> registerOperation(ExpirableOp> future) {
+ long stamp = discoveryLock.readLock();
try {
- write(code, sid, null, args);
- } catch (Exception e) {
- futures.remove(sid);
- fail(future, e);
+ if (isDead(future)) {
+ return future;
+ }
+ futures.put(future.getId(), future);
+ if (isDead(future)) {
+ futures.remove(future.getId());
+ return future;
+ }
+
+ try {
+ write(future.getCode(), future.getId(), null, future.getArgs());
+ } catch (Exception e) {
+ futures.remove(future.getId());
+ fail(future, e);
+ }
+
+ return future;
+ } finally {
+ discoveryLock.unlock(stamp);
}
- return future;
}
@Override
@@ -101,6 +163,10 @@ protected boolean checkFail(CompletableFuture> q, Exception e) {
protected void close(Exception e) {
super.close(e);
+ if (instancesDiscoveryExecutor != null) {
+ instancesDiscoveryExecutor.shutdownNow();
+ }
+
if (retries == null) {
// May happen within constructor.
return;
@@ -135,27 +201,100 @@ protected void onReconnect() {
// First call is before the constructor finished. Skip it.
return;
}
- Collection> futuresToRetry = new ArrayList>(retries.values());
+ Collection> futuresToRetry = new ArrayList<>(retries.values());
retries.clear();
long now = System.currentTimeMillis();
for (final ExpirableOp> future : futuresToRetry) {
if (!future.hasExpired(now)) {
- executor.execute(new Runnable() {
- @Override
- public void run() {
- futures.put(future.getId(), future);
- try {
- write(future.getCode(), future.getId(), null, future.getArgs());
- } catch (Exception e) {
- futures.remove(future.getId());
- fail(future, e);
- }
- }
- });
+ executor.execute(() -> registerOperation(future));
}
}
}
+ @Override
+ protected void complete(TarantoolPacket packet, TarantoolOp> future) {
+ super.complete(packet, future);
+ RefreshableSocketProvider provider = getRefreshableSocketProvider();
+ if (provider != null) {
+ renewConnectionIfRequired(provider.getAddresses());
+ }
+ }
+
+ protected void onInstancesRefreshed(Set instances) {
+ RefreshableSocketProvider provider = getRefreshableSocketProvider();
+ if (provider != null) {
+ provider.refreshAddresses(instances);
+ renewConnectionIfRequired(provider.getAddresses());
+ }
+ }
+
+ private RefreshableSocketProvider getRefreshableSocketProvider() {
+ return socketProvider instanceof RefreshableSocketProvider
+ ? (RefreshableSocketProvider) socketProvider
+ : null;
+ }
+
+ private void renewConnectionIfRequired(Collection addresses) {
+ if (pendingResponsesCount.get() > 0 || !isAlive()) {
+ return;
+ }
+ SocketAddress addressInUse = getCurrentAddressOrNull();
+ if (!(addressInUse == null || addresses.contains(addressInUse))) {
+ long stamp = discoveryLock.tryWriteLock();
+ if (!discoveryLock.validate(stamp)) {
+ return;
+ }
+ try {
+ if (pendingResponsesCount.get() == 0) {
+ stopIO();
+ }
+ } finally {
+ discoveryLock.unlock(stamp);
+ }
+ }
+ }
+
+ private SocketAddress getCurrentAddressOrNull() {
+ try {
+ return channel.getRemoteAddress();
+ } catch (IOException ignored) {
+ return null;
+ }
+ }
+
+ public void refreshInstances() {
+ if (instancesDiscovererTask != null) {
+ instancesDiscovererTask.run();
+ }
+ }
+
+ private static RoundRobinSocketProviderImpl makeClusterSocketProvider(String[] addresses,
+ int connectionTimeout) {
+ RoundRobinSocketProviderImpl socketProvider = new RoundRobinSocketProviderImpl(addresses);
+ socketProvider.setTimeout(connectionTimeout);
+ return socketProvider;
+ }
+
+ private Runnable createDiscoveryTask(TarantoolClusterDiscoverer serviceDiscoverer) {
+ return new Runnable() {
+
+ private Set lastInstances;
+
+ @Override
+ public synchronized void run() {
+ try {
+ Set freshInstances = serviceDiscoverer.getInstances();
+ if (!(freshInstances.isEmpty() || Objects.equals(lastInstances, freshInstances))) {
+ lastInstances = freshInstances;
+ onInstancesRefreshed(lastInstances);
+ }
+ } catch (Exception ignored) {
+ // no-op
+ }
+ }
+ };
+ }
+
/**
* Holds operation code and arguments for retry.
*/
@@ -202,6 +341,6 @@ public long getId() {
public Object[] getArgs() {
return args;
}
-
}
+
}
diff --git a/src/main/java/org/tarantool/TarantoolClusterClientConfig.java b/src/main/java/org/tarantool/TarantoolClusterClientConfig.java
index 423896b3..81f67cbb 100644
--- a/src/main/java/org/tarantool/TarantoolClusterClientConfig.java
+++ b/src/main/java/org/tarantool/TarantoolClusterClientConfig.java
@@ -6,9 +6,30 @@
* Configuration for the {@link TarantoolClusterClient}.
*/
public class TarantoolClusterClientConfig extends TarantoolClientConfig {
- /* Amount of time (in milliseconds) the operation is eligible for retry. */
- public int operationExpiryTimeMillis = 500;
- /* Executor service that will be used as a thread of execution to retry writes. */
- public Executor executor = null;
+ public static final int DEFAULT_OPERATION_EXPIRY_TIME_MILLIS = 500;
+ public static final int DEFAULT_CLUSTER_DISCOVERY_DELAY_MILLIS = 60_000;
+
+ /**
+ * Period for the operation is eligible for retry.
+ */
+ public int operationExpiryTimeMillis = DEFAULT_OPERATION_EXPIRY_TIME_MILLIS;
+
+ /**
+ * Executor that will be used as a thread of
+ * execution to retry writes.
+ */
+ public Executor executor;
+
+ /**
+ * Gets a name of the stored function to be used
+ * to fetch list of instances.
+ */
+ public String clusterDiscoveryEntryFunction;
+
+ /**
+ * Scan period for refreshing a new list of instances.
+ */
+ public int clusterDiscoveryDelayMillis = DEFAULT_CLUSTER_DISCOVERY_DELAY_MILLIS;
+
}
diff --git a/src/main/java/org/tarantool/TarantoolThreadDaemonFactory.java b/src/main/java/org/tarantool/TarantoolThreadDaemonFactory.java
new file mode 100644
index 00000000..f1b4dfb5
--- /dev/null
+++ b/src/main/java/org/tarantool/TarantoolThreadDaemonFactory.java
@@ -0,0 +1,23 @@
+package org.tarantool;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class TarantoolThreadDaemonFactory implements ThreadFactory {
+
+ private static final AtomicInteger POOL_NUMBER = new AtomicInteger(1);
+ private final AtomicInteger threadNumber = new AtomicInteger(1);
+ private final String namePrefix;
+
+ public TarantoolThreadDaemonFactory(String namePrefix) {
+ this.namePrefix = namePrefix + "-" + POOL_NUMBER.incrementAndGet() + "-thread-";
+ }
+
+ @Override
+ public Thread newThread(Runnable runnable) {
+ Thread thread = new Thread(runnable, namePrefix + threadNumber.incrementAndGet());
+ thread.setDaemon(true);
+
+ return thread;
+ }
+}
diff --git a/src/main/java/org/tarantool/cluster/IllegalDiscoveryFunctionResult.java b/src/main/java/org/tarantool/cluster/IllegalDiscoveryFunctionResult.java
new file mode 100644
index 00000000..41f8dd82
--- /dev/null
+++ b/src/main/java/org/tarantool/cluster/IllegalDiscoveryFunctionResult.java
@@ -0,0 +1,13 @@
+package org.tarantool.cluster;
+
+/**
+ * Raised when {@link TarantoolClusterStoredFunctionDiscoverer} validates
+ * a function result as unsupported.
+ */
+public class IllegalDiscoveryFunctionResult extends RuntimeException {
+
+ public IllegalDiscoveryFunctionResult(String message) {
+ super(message);
+ }
+
+}
diff --git a/src/main/java/org/tarantool/cluster/TarantoolClusterDiscoverer.java b/src/main/java/org/tarantool/cluster/TarantoolClusterDiscoverer.java
new file mode 100644
index 00000000..26e5da3c
--- /dev/null
+++ b/src/main/java/org/tarantool/cluster/TarantoolClusterDiscoverer.java
@@ -0,0 +1,21 @@
+package org.tarantool.cluster;
+
+import java.util.Set;
+
+/**
+ * Discovery strategy to obtain a list of the cluster nodes.
+ * This one can be used by {@link org.tarantool.RefreshableSocketProvider}
+ * to provide support for fault tolerance property.
+ *
+ * @see org.tarantool.RefreshableSocketProvider
+ */
+public interface TarantoolClusterDiscoverer {
+
+ /**
+ * Gets nodes addresses in host[:port] format.
+ *
+ * @return list of the cluster nodes
+ */
+ Set getInstances();
+
+}
diff --git a/src/main/java/org/tarantool/cluster/TarantoolClusterStoredFunctionDiscoverer.java b/src/main/java/org/tarantool/cluster/TarantoolClusterStoredFunctionDiscoverer.java
new file mode 100644
index 00000000..c25b578d
--- /dev/null
+++ b/src/main/java/org/tarantool/cluster/TarantoolClusterStoredFunctionDiscoverer.java
@@ -0,0 +1,63 @@
+package org.tarantool.cluster;
+
+import org.tarantool.TarantoolClient;
+import org.tarantool.TarantoolClientOps;
+import org.tarantool.TarantoolClusterClientConfig;
+
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * A cluster nodes discoverer based on calling a predefined function
+ * which returns list of nodes.
+ *
+ * The function has to have no arguments and return list of
+ * the strings which follow host[:port] format
+ */
+public class TarantoolClusterStoredFunctionDiscoverer implements TarantoolClusterDiscoverer {
+
+ private TarantoolClient client;
+ private String entryFunction;
+
+ public TarantoolClusterStoredFunctionDiscoverer(TarantoolClusterClientConfig clientConfig, TarantoolClient client) {
+ this.client = client;
+ this.entryFunction = clientConfig.clusterDiscoveryEntryFunction;
+ }
+
+ @Override
+ public Set getInstances() {
+ TarantoolClientOps, Object, List>> syncOperations = client.syncOps();
+
+ List> list = syncOperations.call(entryFunction);
+ // discoverer expects a single array result from the function now;
+ // in order to protect this contract the discoverer does a strict
+ // validation against the data returned;
+ // this strict-mode allows us to extend the contract in a non-breaking
+ // way for old clients just reserve an extra return value in
+ // terms of LUA multi-result support.
+ checkResult(list);
+
+ List