Skip to content

Use cluster Tarantool client instead of simple one #240

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 45 additions & 20 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ SocketChannelProvider socketChannelProvider = new SingleSocketChannelProviderImp
TarantoolClient client = new TarantoolClientImpl(socketChannelProvider, config);
```

You could implement your own `SocketChannelProvider`. It should return
You could implement your own `SocketChannelProvider`. It should return
a connected `SocketChannel`. Feel free to implement `get(int retryNumber, Throwable lastError)`
using your appropriate strategy to obtain the channel. The strategy can take into
account current attempt number (retryNumber) and the last transient error occurred on
Expand Down Expand Up @@ -177,7 +177,7 @@ Supported options are follow:
13. `retryCount` is a hint and can be passed to the socket providers which
implement `ConfigurableSocketChannelProvider` interface. This hint should be
interpreter as a maximal number of attempts to connect to Tarantool instance.
Default value is `3`.
Default value is `3`.
14. `operationExpiryTimeMillis` is a default request timeout in ms.
Default value is `1000` (1 second).

Expand Down Expand Up @@ -283,6 +283,31 @@ order in which they were added to the batch"
- The driver continues processing the remaining commands in a batch once execution
of a command fails.

### Connection Fail-over

To enable simple connection fail-over you can specify multiple nodes (host and port pairs) in the connection url. The
driver will try to once connect to each of them in order until the connection succeeds. If none succeed, a normal
connection exception is thrown.

The syntax for the connection url is:

jdbc:tarantool://[user-info@][nodes][?parameters]

where
* `user-info` is an optional colon separated username and password like `admin:secret`;
* `nodes` is a set of comma separated pairs like `host1[:port1][,host2[:port2] ... ]`;
* `parameters` is a set of optional cluster parameters (in addition to other ones) such as
`clusterDiscoveryEntryFunction` and `clusterDiscoveryDelayMillis` (see [Cluster support](#cluster-support) for more
details).

For instance,

jdbc:postgresql://tnt-node-1:3301,tnt-node2,tnt-node-3:3302?clusterDiscoveryEntryFunction=fetchNodes

will try to connect to the Tarantool servers using initial set of nodes in the order they were listed in the URL. Also,
there is `clusterDiscoveryEntryFunction` parameter specified to enable cluster nodes discovery that can refresh the list
of available nodes.

## Cluster support

To be more fault-tolerant the connector provides cluster extensions. In
Expand All @@ -307,7 +332,7 @@ connection to _one instance_ before failing an attempt. The provider requires
positive retry count to work properly. The socket timeout is used to limit
an interval between connections attempts per instance. In other words, the provider
follows a pattern _connection should succeed after N attempts with M interval between
them at max_.
them at max_.

### Basic cluster client usage

Expand All @@ -326,7 +351,7 @@ an initial list of nodes:
```java
String[] nodes = new String[] { "myHost1:3301", "myHost2:3302", "myHost3:3301" };
TarantoolClusterClient client = new TarantoolClusterClient(config, nodes);
```
```

3. Work with the client using same API as defined in `TarantoolClient`:

Expand All @@ -336,10 +361,10 @@ client.syncOps().insert(23, Arrays.asList(1, 1));

### Auto-discovery

Auto-discovery feature allows a cluster client to fetch addresses of
Auto-discovery feature allows a cluster client to fetch addresses of
cluster nodes to reflect changes related to the cluster topology. To achieve
this you have to create a Lua function on the server side which returns
a single array result. Client periodically polls the server to obtain a
this you have to create a Lua function on the server side which returns
a single array result. Client periodically polls the server to obtain a
fresh list and apply it if its content changes.

1. On the server side create a function which returns nodes:
Expand All @@ -356,20 +381,20 @@ You need to pay attention to a function contract we are currently supporting:
and an optional colon followed by digits of the port). Also, the port must be
in a range between 1 and 65535 if one is presented.
* A discovery function _may_ return multi-results but the client takes
into account only first of them (i.e. `return {'host:3301'}, discovery_delay`, where
into account only first of them (i.e. `return {'host:3301'}, discovery_delay`, where
the second result is unused). Even more, any extra results __are reserved__ by the client
in order to extend its contract with a backward compatibility.
* A discovery function _should NOT_ return no results, empty result, wrong type result,
and Lua errors. The client discards such kinds of results but it does not affect the discovery
process for next scheduled tasks.
process for next scheduled tasks.

2. On the client side configure discovery settings in `TarantoolClusterClientConfig`:

```java
TarantoolClusterClientConfig config = new TarantoolClusterClientConfig();
// fill other settings
config.clusterDiscoveryEntryFunction = "get_cluster_nodes"; // discovery function used to fetch nodes
config.clusterDiscoveryDelayMillis = 60_000; // how often client polls the discovery server
config.clusterDiscoveryEntryFunction = "get_cluster_nodes"; // discovery function used to fetch nodes
config.clusterDiscoveryDelayMillis = 60_000; // how often client polls the discovery server
```

3. Create a client using the config made above.
Expand All @@ -383,21 +408,21 @@ client.syncOps().insert(45, Arrays.asList(1, 1));

* You need to set _not empty_ value to `clusterDiscoveryEntryFunction` to enable auto-discovery feature.
* There are only two cases when a discovery task runs: just after initialization of the cluster
client and a periodical scheduler timeout defined in `TarantoolClusterClientConfig.clusterDiscoveryDelayMillis`.
client and a periodical scheduler timeout defined in `TarantoolClusterClientConfig.clusterDiscoveryDelayMillis`.
* A discovery task always uses an active client connection to get the nodes list.
It's in your responsibility to provide a function availability as well as a consistent
nodes list on all instances you initially set or obtain from the task.
* Every address which is unmatched with `host[:port]` pattern will be filtered out from
the target addresses list.
* If some error occurs while a discovery task is running then this task
will be aborted without any after-effects for next task executions. These cases, for instance, are
a wrong function result (see discovery function contract) or a broken connection.
will be aborted without any after-effects for next task executions. These cases, for instance, are
a wrong function result (see discovery function contract) or a broken connection.
There is an exception if the client is closed then discovery process will stop permanently.
* It's possible to obtain a list which does NOT contain the node we are currently
connected to. It leads the client to try to reconnect to another node from the
connected to. It leads the client to try to reconnect to another node from the
new list. It may take some time to graceful disconnect from the current node.
The client does its best to catch the moment when there are no pending responses
and perform a reconnection.
and perform a reconnection.

### Cluster client config options

Expand Down Expand Up @@ -425,7 +450,7 @@ directly via SLF4J interface.
The logging facade offers several ways in integrate its internal logging with foreign one in order:

* Using system property `org.tarantool.logging.provider`. Supported values are *jdk* and *slf4j*
for the java util logging and SLF4J/Logback respectively. For instance, use
for the java util logging and SLF4J/Logback respectively. For instance, use
`java -Dorg.tarantool.logging.provider=slf4j <...>`.

* Using Java SPI mechanism. Implement your own provider org.tarantool.logging.LoggerProvider
Expand All @@ -437,7 +462,7 @@ cat META-INF/services/org.tarantool.logging.LoggerProvider
org.mydomain.MySimpleLoggerProvider
```

* CLASSPATH exploring. Now, the connector will use SLF4J if Logback is also in use.
* CLASSPATH exploring. Now, the connector will use SLF4J if Logback is also in use.

* If nothing is successful JUL will be used by default.

Expand All @@ -452,10 +477,10 @@ org.mydomain.MySimpleLoggerProvider
## Building

To run unit tests use:

```bash
./mvnw clean test
```
```

To run integration tests use:

Expand Down
13 changes: 12 additions & 1 deletion src/main/java/org/tarantool/RoundRobinSocketProviderImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,18 @@ public class RoundRobinSocketProviderImpl extends BaseSocketChannelProvider impl
* @throws IllegalArgumentException if addresses aren't provided
*/
public RoundRobinSocketProviderImpl(String... addresses) {
updateAddressList(Arrays.asList(addresses));
this(Arrays.asList(addresses));
}

/**
* Constructs an instance.
*
* @param addresses optional list of addresses in a form of host[:port]
*
* @throws IllegalArgumentException if addresses aren't provided
*/
public RoundRobinSocketProviderImpl(List<String> addresses) {
updateAddressList(addresses);
setRetriesLimit(DEFAULT_RETRIES_PER_CONNECTION);
}

Expand Down
14 changes: 13 additions & 1 deletion src/main/java/org/tarantool/TarantoolClusterClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@
import java.io.IOException;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -55,6 +57,16 @@ public class TarantoolClusterClient extends TarantoolClientImpl {
* @param addresses Array of addresses in the form of host[:port].
*/
public TarantoolClusterClient(TarantoolClusterClientConfig config, String... addresses) {
this(config, makeClusterSocketProvider(Arrays.asList(addresses)));
}

/**
* Constructs a new cluster client.
*
* @param config Configuration.
* @param addresses List of addresses in the form of host[:port].
*/
public TarantoolClusterClient(TarantoolClusterClientConfig config, List<String> addresses) {
this(config, makeClusterSocketProvider(addresses));
}

Expand Down Expand Up @@ -270,7 +282,7 @@ public void refreshInstances() {
}
}

private static RoundRobinSocketProviderImpl makeClusterSocketProvider(String[] addresses) {
private static RoundRobinSocketProviderImpl makeClusterSocketProvider(List<String> addresses) {
return new RoundRobinSocketProviderImpl(addresses);
}

Expand Down
74 changes: 53 additions & 21 deletions src/main/java/org/tarantool/jdbc/SQLConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@
import org.tarantool.Key;
import org.tarantool.SocketChannelProvider;
import org.tarantool.SqlProtoUtils;
import org.tarantool.TarantoolClientConfig;
import org.tarantool.TarantoolClientImpl;
import org.tarantool.TarantoolClusterClient;
import org.tarantool.TarantoolClusterClientConfig;
import org.tarantool.protocol.TarantoolPacket;
import org.tarantool.util.JdbcConstants;
import org.tarantool.util.NodeSpec;
import org.tarantool.util.SQLStates;

import java.io.IOException;
Expand Down Expand Up @@ -43,6 +44,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
* Tarantool {@link Connection} implementation.
Expand All @@ -60,35 +62,65 @@ public class SQLConnection implements TarantoolConnection {
private DatabaseMetaData cachedMetadata;
private int resultSetHoldability = UNSET_HOLDABILITY;

public SQLConnection(String url, Properties properties) throws SQLException {
this.url = url;
this.properties = properties;
/**
* Creates a new connection to Tarantool server.
*
* @param originUrl raw URL string that was used to parse connection parameters
* @param properties extra parameters to configure a connection
*
* @deprecated use {@link #SQLConnection(String, List, Properties)} instead
*/
@Deprecated
public SQLConnection(String originUrl, Properties properties) throws SQLException {
this(originUrl, Collections.emptyList(), properties);
}

/**
* Creates a new connection to Tarantool server.
*
* @param originUrl raw URL string that was used to parse connection parameters
* @param nodes initial set of Tarantool nodes
* @param properties extra parameters to configure a connection
*
* @throws SQLException if any errors occur during the connecting
*/
public SQLConnection(String originUrl,
List<NodeSpec> nodes,
Properties properties) throws SQLException {
this.url = originUrl;
this.properties = properties;
try {
client = makeSqlClient(makeAddress(properties), makeConfigFromProperties(properties));
client = makeSqlClient(makeAddresses(nodes, properties), makeConfigFromProperties(properties));
} catch (Exception e) {
throw new SQLException("Couldn't initiate connection using " + SQLDriver.diagProperties(properties), e);
}
}

protected SQLTarantoolClientImpl makeSqlClient(String address, TarantoolClientConfig config) {
return new SQLTarantoolClientImpl(address, config);
protected SQLTarantoolClientImpl makeSqlClient(List<String> addresses, TarantoolClusterClientConfig config) {
return new SQLTarantoolClientImpl(addresses, config);
}

private String makeAddress(Properties properties) throws SQLException {
String host = SQLProperty.HOST.getString(properties);
int port = SQLProperty.PORT.getInt(properties);
return host + ":" + port;
private List<String> makeAddresses(List<NodeSpec> nodes, Properties properties) throws SQLException {
List<String> addresses = nodes.stream()
.map(NodeSpec::toString)
.collect(Collectors.toList());
if (addresses.isEmpty()) {
addresses.add(SQLProperty.HOST.getString(properties) + ":" + SQLProperty.PORT.getString(properties));
}
return addresses;
}

private TarantoolClientConfig makeConfigFromProperties(Properties properties) throws SQLException {
TarantoolClientConfig clientConfig = new TarantoolClientConfig();
private TarantoolClusterClientConfig makeConfigFromProperties(Properties properties) throws SQLException {
TarantoolClusterClientConfig clientConfig = new TarantoolClusterClientConfig();
clientConfig.username = SQLProperty.USER.getString(properties);
clientConfig.password = SQLProperty.PASSWORD.getString(properties);

clientConfig.operationExpiryTimeMillis = SQLProperty.QUERY_TIMEOUT.getInt(properties);
clientConfig.initTimeoutMillis = SQLProperty.LOGIN_TIMEOUT.getInt(properties);

clientConfig.clusterDiscoveryEntryFunction = SQLProperty.CLUSTER_DISCOVERY_ENTRY_FUNCTION.getString(properties);
clientConfig.clusterDiscoveryDelayMillis = SQLProperty.CLUSTER_DISCOVERY_DELAY_MILLIS.getInt(properties);

return clientConfig;
}

Expand Down Expand Up @@ -538,8 +570,8 @@ public SQLBatchResultHolder executeBatch(long timeout, List<SQLQueryHolder> quer
checkNotClosed();
SQLTarantoolClientImpl.SQLRawOps sqlOps = client.sqlRawOps();
SQLBatchResultHolder batchResult = useNetworkTimeout(timeout)
? sqlOps.executeBatch(queries)
: sqlOps.executeBatch(timeout, queries);
? sqlOps.executeBatch(queries)
: sqlOps.executeBatch(timeout, queries);

return batchResult;
}
Expand Down Expand Up @@ -731,7 +763,7 @@ private static String formatError(SQLQueryHolder query) {
return "Failed to execute SQL: " + query.getQuery() + ", params: " + query.getParams();
}

static class SQLTarantoolClientImpl extends TarantoolClientImpl {
static class SQLTarantoolClientImpl extends TarantoolClusterClient {

private Future<?> executeQuery(SQLQueryHolder queryHolder) {
return exec(Code.EXECUTE, Key.SQL_TEXT, queryHolder.getQuery(), Key.SQL_BIND, queryHolder.getParams());
Expand Down Expand Up @@ -794,13 +826,13 @@ private SQLBatchResultHolder executeInternal(List<SQLQueryHolder> queries,
}
};

SQLTarantoolClientImpl(String address, TarantoolClientConfig config) {
super(address, config);
SQLTarantoolClientImpl(List<String> addresses, TarantoolClusterClientConfig config) {
super(config, addresses);
msgPackLite = SQLMsgPackLite.INSTANCE;
}

SQLTarantoolClientImpl(SocketChannelProvider socketProvider, TarantoolClientConfig config) {
super(socketProvider, config);
SQLTarantoolClientImpl(SocketChannelProvider socketProvider, TarantoolClusterClientConfig config) {
super(config, socketProvider);
msgPackLite = SQLMsgPackLite.INSTANCE;
}

Expand Down
Loading