diff --git a/.travis.yml b/.travis.yml index 662fe9b1..5842eb9f 100644 --- a/.travis.yml +++ b/.travis.yml @@ -18,7 +18,7 @@ script: else mvn verify fi - - cat testroot/jdk-testing.log + - head -n -0 testroot/*.log after_success: - | diff --git a/pom.xml b/pom.xml index a8fc63a2..3c1a5c97 100644 --- a/pom.xml +++ b/pom.xml @@ -62,6 +62,9 @@ + + false + org.jacoco diff --git a/src/main/java/org/tarantool/RoundRobinSocketProviderImpl.java b/src/main/java/org/tarantool/RoundRobinSocketProviderImpl.java new file mode 100644 index 00000000..d16c6bf4 --- /dev/null +++ b/src/main/java/org/tarantool/RoundRobinSocketProviderImpl.java @@ -0,0 +1,182 @@ +package org.tarantool; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.channels.SocketChannel; +import java.util.Arrays; + +/** + * Basic reconnection strategy that changes addresses in a round-robin fashion. + * To be used with {@link TarantoolClientImpl}. + */ +public class RoundRobinSocketProviderImpl implements SocketChannelProvider { + /** Timeout to establish socket connection with an individual server. */ + private int timeout; // 0 is infinite. + /** Limit of retries. */ + private int retriesLimit = -1; // No-limit. + /** Server addresses as configured. */ + private final String[] addrs; + /** Socket addresses. */ + private final InetSocketAddress[] sockAddrs; + /** Current position within {@link #sockAddrs} array. */ + private int pos; + + /** + * Constructs an instance. + * + * @param addrs Array of addresses in a form of [host]:[port]. + */ + public RoundRobinSocketProviderImpl(String... addrs) { + if (addrs == null || addrs.length == 0) + throw new IllegalArgumentException("addrs is null or empty."); + + this.addrs = Arrays.copyOf(addrs, addrs.length); + + sockAddrs = new InetSocketAddress[this.addrs.length]; + + for (int i = 0; i < this.addrs.length; i++) { + sockAddrs[i] = parseAddress(this.addrs[i]); + } + } + + /** + * @return Configured addresses in a form of [host]:[port]. + */ + public String[] getAddresses() { + return this.addrs; + } + + /** + * Sets maximum amount of time to wait for a socket connection establishment + * with an individual server. + * + * Zero means infinite timeout. + * + * @param timeout Timeout value, ms. + * @return {@code this}. + * @throws IllegalArgumentException If timeout is negative. + */ + public RoundRobinSocketProviderImpl setTimeout(int timeout) { + if (timeout < 0) + throw new IllegalArgumentException("timeout is negative."); + + this.timeout = timeout; + + return this; + } + + /** + * @return Maximum amount of time to wait for a socket connection establishment + * with an individual server. + */ + public int getTimeout() { + return timeout; + } + + /** + * Sets maximum amount of reconnect attempts to be made before an exception is raised. + * The retry count is maintained by a {@link #get(int, Throwable)} caller + * when a socket level connection was established. + * + * Negative value means unlimited. + * + * @param retriesLimit Limit of retries to use. + * @return {@code this}. + */ + public RoundRobinSocketProviderImpl setRetriesLimit(int retriesLimit) { + this.retriesLimit = retriesLimit; + + return this; + } + + /** + * @return Maximum reconnect attempts to make before raising exception. + */ + public int getRetriesLimit() { + return retriesLimit; + } + + /** {@inheritDoc} */ + @Override + public SocketChannel get(int retryNumber, Throwable lastError) { + if (areRetriesExhausted(retryNumber)) { + throw new CommunicationException("Connection retries exceeded.", lastError); + } + int attempts = getAddressCount(); + long deadline = System.currentTimeMillis() + timeout * attempts; + while (!Thread.currentThread().isInterrupted()) { + SocketChannel channel = null; + try { + channel = SocketChannel.open(); + InetSocketAddress addr = getNextSocketAddress(); + channel.socket().connect(addr, timeout); + return channel; + } catch (IOException e) { + if (channel != null) { + try { + channel.close(); + } catch (IOException ignored) { + // No-op. + } + } + long now = System.currentTimeMillis(); + if (deadline <= now) { + throw new CommunicationException("Connection time out.", e); + } + if (--attempts == 0) { + // Tried all addresses without any lack, but still have time. + attempts = getAddressCount(); + try { + Thread.sleep((deadline - now) / attempts); + } catch (InterruptedException ignored) { + Thread.currentThread().interrupt(); + } + } + } + } + throw new CommunicationException("Thread interrupted.", new InterruptedException()); + } + + /** + * @return Number of configured addresses. + */ + protected int getAddressCount() { + return sockAddrs.length; + } + + /** + * @return Socket address to use for the next reconnection attempt. + */ + protected InetSocketAddress getNextSocketAddress() { + InetSocketAddress res = sockAddrs[pos]; + pos = (pos + 1) % sockAddrs.length; + return res; + } + + /** + * Parse a string address in the form of [host]:[port] + * and builds a socket address. + * + * @param addr Server address. + * @return Socket address. + */ + protected InetSocketAddress parseAddress(String addr) { + int idx = addr.indexOf(':'); + String host = (idx < 0) ? addr : addr.substring(0, idx); + int port = (idx < 0) ? 3301 : Integer.parseInt(addr.substring(idx + 1)); + return new InetSocketAddress(host, port); + } + + /** + * Provides a decision on whether retries limit is hit. + * + * @param retries Current count of retries. + * @return {@code true} if retries are exhausted. + */ + private boolean areRetriesExhausted(int retries) { + int limit = getRetriesLimit(); + if (limit < 0) + return false; + return retries >= limit; + } +} diff --git a/src/main/java/org/tarantool/TarantoolClientImpl.java b/src/main/java/org/tarantool/TarantoolClientImpl.java index 13553493..38435a8d 100644 --- a/src/main/java/org/tarantool/TarantoolClientImpl.java +++ b/src/main/java/org/tarantool/TarantoolClientImpl.java @@ -246,14 +246,15 @@ protected synchronized void die(String message, Exception cause) { if (thumbstone != null) { return; } - this.thumbstone = new CommunicationException(message, cause); + final CommunicationException err = new CommunicationException(message, cause); + this.thumbstone = err; while (!futures.isEmpty()) { Iterator>> iterator = futures.entrySet().iterator(); while (iterator.hasNext()) { Map.Entry> elem = iterator.next(); if (elem != null) { FutureImpl future = elem.getValue(); - fail(future, cause); + fail(future, err); } iterator.remove(); } @@ -606,6 +607,14 @@ protected boolean isDead(FutureImpl q) { return false; } + /** + * A subclass may use this as a trigger to start retries. + * This method is called when state becomes ALIVE. + */ + protected void onReconnect() { + // No-op, override. + } + public Exception getThumbstone() { return thumbstone; } @@ -679,6 +688,7 @@ protected boolean compareAndSet(int expect, int update) { if (update == ALIVE) { CountDownLatch latch = nextAliveLatch.getAndSet(new CountDownLatch(1)); latch.countDown(); + onReconnect(); } else if (update == CLOSED) { closedLatch.countDown(); } @@ -706,7 +716,9 @@ private CountDownLatch getStateLatch(int state) { throw new IllegalStateException("State is CLOSED."); } CountDownLatch latch = nextAliveLatch.get(); - return (getState() == ALIVE) ? null : latch; + /* It may happen so that an error is detected but the state is still alive. + Wait for the 'next' alive state in such cases. */ + return (getState() == ALIVE && thumbstone == null) ? null : latch; } return null; } diff --git a/src/main/java/org/tarantool/TarantoolClusterClient.java b/src/main/java/org/tarantool/TarantoolClusterClient.java new file mode 100644 index 00000000..b0b67968 --- /dev/null +++ b/src/main/java/org/tarantool/TarantoolClusterClient.java @@ -0,0 +1,188 @@ +package org.tarantool; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import static org.tarantool.TarantoolClientImpl.StateHelper.CLOSED; + +/** + * Basic implementation of a client that may work with the cluster + * of tarantool instances in fault-tolerant way. + * + * Failed operations will be retried once connection is re-established + * unless the configured expiration time is over. + */ +public class TarantoolClusterClient extends TarantoolClientImpl { + /* Need some execution context to retry writes. */ + private Executor executor; + + /* Collection of operations to be retried. */ + private ConcurrentHashMap> retries = new ConcurrentHashMap>(); + + /** + * @param config Configuration. + * @param addrs Array of addresses in the form of [host]:[port]. + */ + public TarantoolClusterClient(TarantoolClusterClientConfig config, String... addrs) { + this(config, new RoundRobinSocketProviderImpl(addrs).setTimeout(config.operationExpiryTimeMillis)); + } + + /** + * @param provider Socket channel provider. + * @param config Configuration. + */ + public TarantoolClusterClient(TarantoolClusterClientConfig config, SocketChannelProvider provider) { + super(provider, config); + + this.executor = config.executor == null ? + Executors.newSingleThreadExecutor() : config.executor; + } + + @Override + protected boolean isDead(FutureImpl q) { + if ((state.getState() & CLOSED) != 0) { + q.setError(new CommunicationException("Connection is dead", thumbstone)); + return true; + } + Exception err = thumbstone; + if (err != null) { + return checkFail(q, err); + } + return false; + } + + @Override + public Future exec(Code code, Object... args) { + validateArgs(args); + FutureImpl q = makeFuture(syncId.incrementAndGet(), code, args); + if (isDead(q)) { + return q; + } + futures.put(q.getId(), q); + if (isDead(q)) { + futures.remove(q.getId()); + return q; + } + try { + write(code, q.getId(), null, args); + } catch (Exception e) { + futures.remove(q.getId()); + fail(q, e); + } + return q; + } + + @Override + protected void fail(FutureImpl q, Exception e) { + checkFail(q, e); + } + + protected boolean checkFail(FutureImpl q, Exception e) { + assert q instanceof ExpirableOp; + if (!isTransientError(e) || ((ExpirableOp)q).hasExpired(System.currentTimeMillis())) { + q.setError(e); + return true; + } else { + assert retries != null; + retries.put(q.id, (ExpirableOp)q); + return false; + } + } + + @Override + protected void close(Exception e) { + super.close(e); + + if (retries == null) { + // May happen within constructor. + return; + } + + for (ExpirableOp op : retries.values()) { + op.setError(e); + } + } + + protected boolean isTransientError(Exception e) { + if (e instanceof CommunicationException) { + return true; + } + if (e instanceof TarantoolException) { + return ((TarantoolException)e).isTransient(); + } + return false; + } + + protected FutureImpl makeFuture(long id, Code code, Object...args) { + return new ExpirableOp(id, + ((TarantoolClusterClientConfig)config).operationExpiryTimeMillis, + code, + args); + } + + /** + * Reconnect is over, schedule retries. + */ + @Override + protected void onReconnect() { + if (retries == null || executor == null) { + // First call is before the constructor finished. Skip it. + return; + } + Collection> futsToRetry = new ArrayList>(retries.values()); + retries.clear(); + long now = System.currentTimeMillis(); + for (final ExpirableOp fut : futsToRetry) { + if (!fut.hasExpired(now)) { + executor.execute(new Runnable() { + @Override + public void run() { + futures.put(fut.getId(), fut); + try { + write(fut.getCode(), fut.getId(), null, fut.getArgs()); + } catch (Exception e) { + futures.remove(fut.getId()); + fail(fut, e); + } + } + }); + } + } + } + + /** + * Holds operation code and arguments for retry. + */ + private class ExpirableOp extends FutureImpl { + /** Moment in time when operation is not considered for retry. */ + final private long deadline; + + /** Arguments of operation. */ + final private Object[] args; + + /** + * + * @param id Sync. + * @param expireTime Expiration time (relative) in ms. + * @param code Tarantool operation code. + * @param args Operation arguments. + */ + ExpirableOp(long id, int expireTime, Code code, Object...args) { + super(id, code); + this.deadline = System.currentTimeMillis() + expireTime; + this.args = args; + } + + boolean hasExpired(long now) { + return now > deadline; + } + + public Object[] getArgs() { + return args; + } + } +} diff --git a/src/main/java/org/tarantool/TarantoolClusterClientConfig.java b/src/main/java/org/tarantool/TarantoolClusterClientConfig.java new file mode 100644 index 00000000..423896b3 --- /dev/null +++ b/src/main/java/org/tarantool/TarantoolClusterClientConfig.java @@ -0,0 +1,14 @@ +package org.tarantool; + +import java.util.concurrent.Executor; + +/** + * Configuration for the {@link TarantoolClusterClient}. + */ +public class TarantoolClusterClientConfig extends TarantoolClientConfig { + /* Amount of time (in milliseconds) the operation is eligible for retry. */ + public int operationExpiryTimeMillis = 500; + + /* Executor service that will be used as a thread of execution to retry writes. */ + public Executor executor = null; +} diff --git a/src/main/java/org/tarantool/TarantoolException.java b/src/main/java/org/tarantool/TarantoolException.java index 5570e825..3778bccf 100644 --- a/src/main/java/org/tarantool/TarantoolException.java +++ b/src/main/java/org/tarantool/TarantoolException.java @@ -7,6 +7,11 @@ * @version $Id: $ */ public class TarantoolException extends RuntimeException { + /* taken from src/box/errcode.h */ + public final static int ERR_READONLY = 7; + public final static int ERR_TIMEOUT = 78; + public final static int ERR_LOADING = 116; + public final static int ERR_LOCAL_INSTANCE_ID_IS_READ_ONLY = 128; private static final long serialVersionUID = 1L; long code; @@ -56,4 +61,19 @@ public TarantoolException(long code, String message) { } + /** + * + * @return {@code true} if retry can possibly help to overcome this error. + */ + boolean isTransient() { + switch ((int)code) { + case ERR_READONLY: + case ERR_TIMEOUT: + case ERR_LOADING: + case ERR_LOCAL_INSTANCE_ID_IS_READ_ONLY: + return true; + default: + return false; + } + } } diff --git a/src/test/java/org/tarantool/AbstractTarantoolConnectorIT.java b/src/test/java/org/tarantool/AbstractTarantoolConnectorIT.java index cff735eb..b8ef5a50 100644 --- a/src/test/java/org/tarantool/AbstractTarantoolConnectorIT.java +++ b/src/test/java/org/tarantool/AbstractTarantoolConnectorIT.java @@ -18,6 +18,8 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.tarantool.TestUtils.makeInstanceEnv; + /** * Abstract test. Provides environment control and frequently used functions. */ @@ -28,6 +30,9 @@ public abstract class AbstractTarantoolConnectorIT { protected static final String username = System.getProperty("tntUser", "test_admin"); protected static final String password = System.getProperty("tntPass", "4pWBZmLEgkmKK5WP"); + protected static final String LUA_FILE = "jdk-testing.lua"; + protected static final int LISTEN = 3301; + protected static final int ADMIN = 3313; protected static final int TIMEOUT = 500; protected static final int RESTART_TIMEOUT = 2000; @@ -82,6 +87,7 @@ public abstract class AbstractTarantoolConnectorIT { @BeforeAll public static void setupEnv() { control = new TarantoolControl(); + control.createInstance("jdk-testing", LUA_FILE, makeInstanceEnv(LISTEN, ADMIN)); startTarantool("jdk-testing"); console = openConsole(); @@ -128,13 +134,23 @@ protected TarantoolClient makeClient() { return new TarantoolClientImpl(socketChannelProvider, makeClientConfig()); } - protected TarantoolClientConfig makeClientConfig() { - TarantoolClientConfig config = new TarantoolClientConfig(); + protected static TarantoolClientConfig makeClientConfig() { + return fillClientConfig(new TarantoolClientConfig()); + } + + protected static TarantoolClusterClientConfig makeClusterClientConfig() { + TarantoolClusterClientConfig config = fillClientConfig(new TarantoolClusterClientConfig()); + config.executor = null; + config.operationExpiryTimeMillis = TIMEOUT; + return config; + } + + private static T fillClientConfig(TarantoolClientConfig config) { config.username = username; config.password = password; config.initTimeoutMillis = RESTART_TIMEOUT; config.sharedBufferSize = 128; - return config; + return (T)config; } protected static TarantoolConsole openConsole() { diff --git a/src/test/java/org/tarantool/ClientReconnectClusterIT.java b/src/test/java/org/tarantool/ClientReconnectClusterIT.java new file mode 100644 index 00000000..35737a32 --- /dev/null +++ b/src/test/java/org/tarantool/ClientReconnectClusterIT.java @@ -0,0 +1,117 @@ +package org.tarantool; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.function.Executable; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.tarantool.AbstractTarantoolConnectorIT.makeClusterClientConfig; +import static org.tarantool.TestUtils.makeInstanceEnv; + +public class ClientReconnectClusterIT { + private static final int TIMEOUT = 500; + private static final String LUA_FILE = "jdk-testing.lua"; + private static final String SRV1 = "replica1"; + private static final String SRV2 = "replica2"; + private static final String SRV3 = "replica3"; + private static final int[] PORTS = {3302, 3303, 3304}; + private static final int[] CONSOLE_PORTS = {3312, 3313, 3314}; + private static TarantoolControl control; + + private static String REPLICATION_CONFIG = TestUtils.makeReplicationString( + AbstractTarantoolConnectorIT.username, + AbstractTarantoolConnectorIT.password, + "localhost:" + PORTS[0], + "localhost:" + PORTS[1], + "localhost:" + PORTS[2]); + + // Resume replication faster in case of temporary failure to fit TIMEOUT. + private static double REPLICATION_TIMEOUT = 0.1; + + @BeforeAll + public static void setupEnv() { + control = new TarantoolControl(); + int idx = 0; + for (String name: Arrays.asList(SRV1, SRV2, SRV3)) { + control.createInstance(name, LUA_FILE, + makeInstanceEnv(PORTS[idx], CONSOLE_PORTS[idx], REPLICATION_CONFIG, + REPLICATION_TIMEOUT)); + idx++; + } + } + + @AfterAll + public static void tearDownEnv() { + for (String name : Arrays.asList(SRV1, SRV2, SRV3)) { + control.stop(name); + /* + * Don't cleanup instance directory to allow further investigation + * of xlog / snap files in case of the test failure. + */ + } + } + + @Test + public void testRoundRobinReconnect() { + control.start(SRV1); + control.start(SRV2); + control.start(SRV3); + + control.waitStarted(SRV1); + control.waitStarted(SRV2); + control.waitStarted(SRV3); + + final TarantoolClientImpl client = makeClient( + "localhost:" + PORTS[0], + "127.0.0.1:" + PORTS[1], + "localhost:" + PORTS[2]); + + List ids = client.syncOps().eval( + "return box.schema.space.create('rr_test').id, " + + "box.space.rr_test:create_index('primary').id"); + + final int spaceId = ((Number)ids.get(0)).intValue(); + final int pkId = ((Number)ids.get(1)).intValue(); + + final List key = Collections.singletonList(1); + final List tuple = Arrays.asList(1, 1); + + client.syncOps().insert(spaceId, tuple); + control.waitReplication(SRV1, TIMEOUT); + + List res = client.syncOps().select(spaceId, pkId, key, 0, 1, Iterator.EQ); + assertEquals(res.get(0), tuple); + + control.stop(SRV1); + + res = client.syncOps().select(spaceId, pkId, key, 0, 1, Iterator.EQ); + assertEquals(res.get(0), Arrays.asList(1, 1)); + + control.stop(SRV2); + + res = client.syncOps().select(spaceId, pkId, key, 0, 1, Iterator.EQ); + assertEquals(res.get(0), Arrays.asList(1, 1)); + + control.stop(SRV3); + + CommunicationException e = assertThrows(CommunicationException.class, new Executable() { + @Override + public void execute() throws Throwable { + client.syncOps().select(spaceId, pkId, key, 0, 1, Iterator.EQ); + } + }); + + assertEquals("Connection time out.", e.getMessage()); + } + + private TarantoolClientImpl makeClient(String...addrs) { + TarantoolClusterClientConfig config = makeClusterClientConfig(); + return new TarantoolClusterClient(config, addrs); + } +} diff --git a/src/test/java/org/tarantool/TarantoolConsole.java b/src/test/java/org/tarantool/TarantoolConsole.java index 9ae1106b..f664f86b 100644 --- a/src/test/java/org/tarantool/TarantoolConsole.java +++ b/src/test/java/org/tarantool/TarantoolConsole.java @@ -122,13 +122,17 @@ public void exec(String expr) { } public T eval(String expr) { + List list = evalList(expr); + return list.get(0); + } + + public List evalList(String expr) { suppressPrompt(); write(expr); suppressEcho(expr); Matcher m = expect(REPLY_PATTERN); Yaml yaml = new Yaml(); - List result = yaml.load(m.group(0)); - return result.get(0); + return yaml.load(m.group(0)); } /** diff --git a/src/test/java/org/tarantool/TarantoolControl.java b/src/test/java/org/tarantool/TarantoolControl.java index 3b8f9261..38028eba 100644 --- a/src/test/java/org/tarantool/TarantoolControl.java +++ b/src/test/java/org/tarantool/TarantoolControl.java @@ -5,6 +5,7 @@ import java.nio.channels.FileChannel; import java.io.FileInputStream; import java.io.FileOutputStream; +import java.util.HashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.Map; @@ -35,6 +36,8 @@ public class TarantoolControlException extends RuntimeException { protected static final String instanceDir = new File("src/test").getAbsolutePath(); protected static final String tarantoolCtlConfig = new File("src/test/.tarantoolctl").getAbsolutePath(); protected static final int RESTART_TIMEOUT = 2000; + // Per-instance environment. + protected final Map> instanceEnv = new HashMap>(); static { try { @@ -125,8 +128,7 @@ protected void executeCommand(String command, String instanceName) { ProcessBuilder builder = new ProcessBuilder("env", "tarantoolctl", command, instanceName); builder.directory(new File(tntCtlWorkDir)); Map env = builder.environment(); - env.put("PWD", tntCtlWorkDir); - env.put("TEST_WORKDIR", tntCtlWorkDir); + env.putAll(buildInstanceEnvironment(instanceName)); final Process process; try { @@ -137,7 +139,7 @@ protected void executeCommand(String command, String instanceName) { final CountDownLatch latch = new CountDownLatch(1); // The thread below is necessary to organize timed wait on the process. - // We cannot use Process.waitFor(long, TimeUnit) because we on java 6. + // We cannot use Process.waitFor(long, TimeUnit) because we're on java 6. Thread thread = new Thread(new Runnable() { @Override public void run() { @@ -188,9 +190,6 @@ public void run() { * * Then test the instance with TarantoolTcpConsole (ADMIN environment * variable is set) or TarantoolLocalConsole. - * - * XXX: Now TarantoolLocalConsole is used unconditionally, see - * openConsole(). */ public void waitStarted(String instanceName) { while (status(instanceName) != 0) @@ -244,13 +243,74 @@ public int status(String instanceName) { return 0; } + public Map buildInstanceEnvironment(String instanceName) { + Map env = new HashMap(); + env.put("PWD", tntCtlWorkDir); + env.put("TEST_WORKDIR", tntCtlWorkDir); + + Map instanceEnv = this.instanceEnv.get(instanceName); + if (instanceEnv != null) { + env.putAll(instanceEnv); + } + return env; + } + + public void createInstance(String instanceName, String luaFile, Map env) { + File src = new File(instanceDir, luaFile.endsWith(".lua") ? luaFile : luaFile.concat(".lua")); + if (!src.exists()) + throw new RuntimeException("Lua file " + src + " doesn't exist."); + + File dst = new File(tntCtlWorkDir, instanceName + ".lua"); + try { + copyFile(src, dst); + } catch (IOException e) { + throw new RuntimeException(e); + } + + instanceEnv.put(instanceName, env); + } + + public void cleanupInstance(String instanceName) { + instanceEnv.remove(instanceName); + + File dst = new File(tntCtlWorkDir, instanceName + ".lua"); + dst.delete(); + + try { + rmdir(new File(tntCtlWorkDir, instanceName)); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public void waitReplication(String instanceName, int timeout) { + TarantoolConsole console = openConsole(instanceName); + try { + TestUtils.waitReplication(console, timeout); + } finally { + console.close(); + } + } + /* - * XXX: This function is planned to use text console (from ADMIN - * environment variable) when it is available for the instance and - * fallback to TarantoolLocalConsole. + * Open a console to the instance. + * + * Use text console (from ADMIN environment variable) when it is available + * for the instance or fallback to TarantoolLocalConsole. */ public TarantoolConsole openConsole(String instanceName) { - return TarantoolConsole.open(tntCtlWorkDir, instanceName); + Map env = instanceEnv.get(instanceName); + if (env == null) + throw new RuntimeException("No such instance '" + instanceName +"'."); + + String admin = env.get("ADMIN"); + if (admin == null) { + return TarantoolConsole.open(tntCtlWorkDir, instanceName); + } else { + int idx = admin.indexOf(':'); + return TarantoolConsole.open(idx < 0 ? "localhost" : admin.substring(0, idx), + Integer.valueOf(idx < 0 ? admin : admin.substring(idx + 1))); + } } public static void sleep() { diff --git a/src/test/java/org/tarantool/TestUtils.java b/src/test/java/org/tarantool/TestUtils.java new file mode 100644 index 00000000..86a71dc0 --- /dev/null +++ b/src/test/java/org/tarantool/TestUtils.java @@ -0,0 +1,138 @@ +package org.tarantool; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class TestUtils { + final static String replicationInfoRequest = "return " + + "box.info.id, " + + "box.info.lsn, " + + "box.info.replication"; + + public static String makeReplicationString(String user, String pass, String... addrs) { + StringBuilder sb = new StringBuilder(); + for (int idx = 0; idx < addrs.length; idx++) { + if (sb.length() > 0) { + sb.append(';'); + } + sb.append(user); + sb.append(':'); + sb.append(pass); + sb.append('@'); + sb.append(addrs[idx]); + } + return sb.toString(); + } + + public static Map makeInstanceEnv(int port, int consolePort) { + Map env = new HashMap(); + env.put("LISTEN", Integer.toString(port)); + env.put("ADMIN", Integer.toString(consolePort)); + return env; + } + + public static Map makeInstanceEnv(int port, int consolePort, String replicationConfig, + double replicationTimeout) { + Map env = makeInstanceEnv(port, consolePort); + env.put("MASTER", replicationConfig); + env.put("REPLICATION_TIMEOUT", Double.toString(replicationTimeout)); + return env; + } + + /** + * Wait until all replicas will be in sync with master's log. + * + * It is useful to wait until the last data modification performed on + * **that** instance will be applied on its replicas. It does not take care + * to modifications performed on instances, which are master's of that one. + */ + public static void waitReplication(TarantoolClientImpl client, int timeout) { + long deadline = System.currentTimeMillis() + timeout; + for (;;) { + List v; + try { + v = client.syncOps().eval(replicationInfoRequest); + } catch (TarantoolException ignored) { + continue; + } + + if (parseAndCheckReplicationStatus(v)) + return; + + if (deadline < System.currentTimeMillis()) + throw new RuntimeException("Test failure: timeout waiting for replication."); + + try { + Thread.sleep(100); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + + /** + * See waitReplication(TarantoolClientImpl client, int timeout). + */ + protected static void waitReplication(TarantoolConsole console, int timeout) { + long deadline = System.currentTimeMillis() + timeout; + for (;;) { + List v = console.evalList(replicationInfoRequest); + + if (parseAndCheckReplicationStatus(v)) + return; + + if (deadline < System.currentTimeMillis()) + throw new RuntimeException("Test failure: timeout waiting for replication."); + + try { + Thread.sleep(100); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + + private static boolean parseAndCheckReplicationStatus(List data) { + if (data == null || data.size() != 3) + throw new IllegalStateException("Unexpected format of replication status."); + + Number masterId = ensureType(Number.class, data.get(0)); + Number masterLsn = ensureType(Number.class, data.get(1)); + Map replInfo = ensureTypeOrNull(Map.class, data.get(2)); + + if (replInfo == null || replInfo.size() < 2) + return false; + + for (Object info : replInfo.values()) { + Map replItems = ensureTypeOrNull(Map.class, info); + + Map downstreamInfo = ensureTypeOrNull(Map.class, replItems.get("downstream")); + if (downstreamInfo != null) { + Map replica_vclock = ensureTypeOrNull(Map.class, downstreamInfo.get("vclock")); + + if (replica_vclock == null) + return false; + + Number replicaLsn = ensureTypeOrNull(Number.class, replica_vclock.get(masterId)); + + if (replicaLsn == null || replicaLsn.longValue() < masterLsn.longValue()) { + return false; + } + } + } + return true; + } + + private static T ensureTypeOrNull(Class cls, Object v) { + return v == null ? null : ensureType(cls, v); + } + + private static T ensureType(Class cls, Object v) { + if (v == null || !cls.isAssignableFrom(v.getClass())) { + throw new IllegalArgumentException(String.format("Wrong value type '%s', expected '%s'.", + v == null ? "null" : v.getClass().getName(), cls.getName())); + } + return cls.cast(v); + } +} diff --git a/src/test/java/org/tarantool/jdbc/AbstractJdbcIT.java b/src/test/java/org/tarantool/jdbc/AbstractJdbcIT.java index c0c00c07..83eaadd3 100644 --- a/src/test/java/org/tarantool/jdbc/AbstractJdbcIT.java +++ b/src/test/java/org/tarantool/jdbc/AbstractJdbcIT.java @@ -22,6 +22,7 @@ import java.util.List; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.tarantool.TestUtils.makeInstanceEnv; import org.tarantool.TarantoolControl; @@ -33,6 +34,10 @@ public abstract class AbstractJdbcIT { private static final String pass = System.getProperty("tntPass", "4pWBZmLEgkmKK5WP"); private static String URL = String.format("tarantool://%s:%d?user=%s&password=%s", host, port, user, pass); + protected static final String LUA_FILE = "jdk-testing.lua"; + protected static final int LISTEN = 3301; + protected static final int ADMIN = 3313; + private static String[] initSql = new String[] { "CREATE TABLE test(id INT PRIMARY KEY, val VARCHAR(100))", "INSERT INTO test VALUES (1, 'one'), (2, 'two'), (3, 'three')", @@ -116,6 +121,7 @@ public abstract class AbstractJdbcIT { @BeforeAll public static void setupEnv() throws Exception { control = new TarantoolControl(); + control.createInstance("jdk-testing", LUA_FILE, makeInstanceEnv(LISTEN, ADMIN)); control.start("jdk-testing"); control.waitStarted("jdk-testing"); diff --git a/src/test/jdk-testing.lua b/src/test/jdk-testing.lua index fa6615d9..b2eba40a 100644 --- a/src/test/jdk-testing.lua +++ b/src/test/jdk-testing.lua @@ -1,5 +1,7 @@ box.cfg { - listen = 3301, + listen = os.getenv('LISTEN') or 3301, + replication = os.getenv('MASTER') and string.split(os.getenv('MASTER'), ";") or nil, + replication_timeout = tonumber(os.getenv('REPLICATION_TIMEOUT')), } box.once('init', function() @@ -21,4 +23,4 @@ end) -- Java has no internal support for unix domain sockets, -- so we will use tcp for console communication. console = require('console') -console.listen(3313) +console.listen(os.getenv('ADMIN') or 3313)