Skip to content

Commit 868b377

Browse files
committed
Propagate client thumbstone when a client creation fails
Closes: #30
1 parent 08e37a2 commit 868b377

File tree

4 files changed

+176
-9
lines changed

4 files changed

+176
-9
lines changed

src/main/java/org/tarantool/TarantoolClientImpl.java

+3-7
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,6 @@
2828

2929
public class TarantoolClientImpl extends TarantoolBase<Future<?>> implements TarantoolClient {
3030

31-
public static final CommunicationException NOT_INIT_EXCEPTION
32-
= new CommunicationException("Not connected, initializing connection");
33-
3431
protected TarantoolClientConfig config;
3532
protected long operationTimeout;
3633

@@ -101,7 +98,6 @@ public TarantoolClientImpl(SocketChannelProvider socketProvider, TarantoolClient
10198
}
10299

103100
private void initClient(SocketChannelProvider socketProvider, TarantoolClientConfig config) {
104-
this.thumbstone = NOT_INIT_EXCEPTION;
105101
this.config = config;
106102
this.initialRequestSize = config.defaultRequestSize;
107103
this.operationTimeout = config.operationExpiryTimeMillis;
@@ -130,8 +126,8 @@ private void startConnector(long initTimeoutMillis) {
130126
CommunicationException e = new CommunicationException(
131127
initTimeoutMillis +
132128
"ms is exceeded when waiting for client initialization. " +
133-
"You could configure init timeout in TarantoolConfig"
134-
);
129+
"You could configure init timeout in TarantoolConfig",
130+
thumbstone);
135131

136132
close(e);
137133
throw e;
@@ -147,7 +143,7 @@ protected void reconnect(Throwable lastError) {
147143
int retryNumber = 0;
148144
while (!Thread.currentThread().isInterrupted()) {
149145
try {
150-
channel = socketProvider.get(retryNumber++, lastError == NOT_INIT_EXCEPTION ? null : lastError);
146+
channel = socketProvider.get(retryNumber++, lastError);
151147
} catch (Exception e) {
152148
closeChannel(channel);
153149
lastError = e;

src/test/java/org/tarantool/ClientReconnectClusterIT.java

+49
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
import static org.junit.jupiter.api.Assertions.assertEquals;
44
import static org.junit.jupiter.api.Assertions.assertThrows;
5+
import static org.junit.jupiter.api.Assertions.assertTrue;
6+
import static org.tarantool.TestUtils.findCause;
57
import static org.tarantool.TestUtils.makeDefaultClusterClientConfig;
68
import static org.tarantool.TestUtils.makeDiscoveryFunction;
79

@@ -393,6 +395,53 @@ void testDelayFunctionResultFetch() {
393395
expectDisconnected(client, spaceId, pkId);
394396
}
395397

398+
@Test
399+
void testRoundRobinSocketProviderRefused() {
400+
stopInstancesAndAwait(SRV1);
401+
stopInstancesAndAwait(SRV2);
402+
stopInstancesAndAwait(SRV3);
403+
404+
RuntimeException error = new RuntimeException("Fake error");
405+
TarantoolClusterClientConfig config = makeDefaultClusterClientConfig();
406+
config.initTimeoutMillis = 1000;
407+
Throwable exception = assertThrows(
408+
CommunicationException.class,
409+
() -> {
410+
new TarantoolClusterClient(
411+
config,
412+
TestUtils.wrapByErroredProvider(new RoundRobinSocketProviderImpl(
413+
"localhost:" + PORTS[0],
414+
"localhost:" + PORTS[1],
415+
"localhost:" + PORTS[2]
416+
), error)
417+
);
418+
}
419+
);
420+
assertTrue(findCause(exception, error));
421+
}
422+
423+
@Test
424+
void testRoundRobinSocketProviderRefusedAfterConnect() {
425+
final TarantoolClientImpl client = makeClusterClient(
426+
"localhost:" + PORTS[0],
427+
"localhost:" + PORTS[1],
428+
"localhost:" + PORTS[2]
429+
);
430+
431+
client.ping();
432+
stopInstancesAndAwait(SRV1);
433+
434+
client.ping();
435+
stopInstancesAndAwait(SRV2);
436+
437+
client.ping();
438+
stopInstancesAndAwait(SRV3);
439+
440+
CommunicationException exception = assertThrows(CommunicationException.class, client::ping);
441+
Throwable origin = exception.getCause();
442+
assertEquals(origin, client.getThumbstone());
443+
}
444+
396445
private void tryAwait(CyclicBarrier barrier) {
397446
try {
398447
barrier.await(6000, TimeUnit.MILLISECONDS);

src/test/java/org/tarantool/ClientReconnectIT.java

+77-2
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
88
import static org.junit.jupiter.api.Assertions.assertTrue;
99
import static org.junit.jupiter.api.Assertions.fail;
10+
import static org.tarantool.TestUtils.findCause;
1011
import static org.tarantool.TestUtils.makeDefaultClientConfig;
1112
import static org.tarantool.TestUtils.makeTestClient;
1213

@@ -27,6 +28,7 @@
2728
import java.util.concurrent.TimeUnit;
2829
import java.util.concurrent.atomic.AtomicBoolean;
2930
import java.util.concurrent.atomic.AtomicInteger;
31+
import java.util.concurrent.atomic.AtomicReference;
3032
import java.util.concurrent.atomic.AtomicReferenceArray;
3133
import java.util.concurrent.locks.LockSupport;
3234

@@ -412,13 +414,86 @@ public void testReconnectWrongAuth() throws Exception {
412414
client.close();
413415
}
414416

415-
private TestSocketChannelProvider makeZeroLingerProvider() {
417+
@Test
418+
void testFirstConnectionRefused() {
419+
RuntimeException error = new RuntimeException("Fake error");
420+
TarantoolClientConfig config = makeDefaultClientConfig();
421+
config.initTimeoutMillis = 100;
422+
Throwable exception = assertThrows(
423+
CommunicationException.class,
424+
() -> new TarantoolClientImpl(makeErroredProvider(error), config)
425+
);
426+
assertTrue(findCause(exception, error));
427+
}
428+
429+
@Test
430+
void testConnectionRefusedAfterConnect() {
431+
TarantoolClientImpl client = new TarantoolClientImpl(makeErroredProvider(null), makeDefaultClientConfig());
432+
client.ping();
433+
434+
testHelper.stopInstance();
435+
CommunicationException exception = assertThrows(CommunicationException.class, client::ping);
436+
437+
Throwable origin = exception.getCause();
438+
assertEquals(origin, client.getThumbstone());
439+
440+
testHelper.startInstance();
441+
}
442+
443+
@Test
444+
void testSingleSocketProviderRefused() {
445+
TarantoolClientConfig config = makeDefaultClientConfig();
446+
RuntimeException error = new RuntimeException("Fake error");
447+
config.initTimeoutMillis = 1000;
448+
449+
SingleSocketChannelProviderImpl socketProvider = new SingleSocketChannelProviderImpl("localhost:3301");
450+
451+
testHelper.stopInstance();
452+
Throwable exception = assertThrows(
453+
CommunicationException.class,
454+
() -> new TarantoolClientImpl(TestUtils.wrapByErroredProvider(socketProvider, error), config)
455+
);
456+
testHelper.startInstance();
457+
assertTrue(findCause(exception, error));
458+
}
459+
460+
@Test
461+
void testSingleSocketProviderRefusedAfterConnect() {
462+
TarantoolClientImpl client = new TarantoolClientImpl(socketChannelProvider, makeDefaultClientConfig());
463+
464+
client.ping();
465+
testHelper.stopInstance();
466+
467+
CommunicationException exception = assertThrows(CommunicationException.class, client::ping);
468+
Throwable origin = exception.getCause();
469+
assertEquals(origin, client.getThumbstone());
470+
471+
testHelper.startInstance();
472+
}
473+
474+
private SocketChannelProvider makeZeroLingerProvider() {
416475
return new TestSocketChannelProvider(
417476
TarantoolTestHelper.HOST, TarantoolTestHelper.PORT, RESTART_TIMEOUT
418477
).setSoLinger(0);
419478
}
420479

421-
TarantoolClient makeClient(SocketChannelProvider provider) {
480+
private SocketChannelProvider makeErroredProvider(RuntimeException error) {
481+
return new SocketChannelProvider() {
482+
private final SocketChannelProvider delegate = makeZeroLingerProvider();
483+
private AtomicReference<RuntimeException> errorReference = new AtomicReference<>(error);
484+
485+
@Override
486+
public SocketChannel get(int retryNumber, Throwable lastError) {
487+
RuntimeException rawError = errorReference.get();
488+
if (rawError != null) {
489+
throw rawError;
490+
}
491+
return delegate.get(retryNumber, lastError);
492+
}
493+
};
494+
}
495+
496+
private TarantoolClient makeClient(SocketChannelProvider provider) {
422497
return new TarantoolClientImpl(provider, makeDefaultClientConfig());
423498
}
424499

src/test/java/org/tarantool/TestUtils.java

+47
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import java.net.InetSocketAddress;
66
import java.net.Socket;
77
import java.net.SocketAddress;
8+
import java.nio.channels.SocketChannel;
89
import java.util.Collection;
910
import java.util.HashMap;
1011
import java.util.List;
@@ -290,4 +291,50 @@ public static TarantoolClientConfig makeDefaultClientConfig() {
290291
return config;
291292
}
292293

294+
/**
295+
* Wraps a socket channel provider
296+
* {@link SocketChannelProvider#get(int, Throwable)} method.
297+
* When an error is raised the wrapper substitutes
298+
* this error by the predefined one. The original value is
299+
* still accessible as a cause of the injected error.
300+
*
301+
* @param provider provider to be wrapped
302+
* @param error error to be thrown instead of original
303+
*
304+
* @return wrapped provider
305+
*/
306+
public static SocketChannelProvider wrapByErroredProvider(SocketChannelProvider provider, RuntimeException error) {
307+
return new SocketChannelProvider() {
308+
private final SocketChannelProvider delegate = provider;
309+
310+
@Override
311+
public SocketChannel get(int retryNumber, Throwable lastError) {
312+
try {
313+
return delegate.get(retryNumber, lastError);
314+
} catch (Exception e) {
315+
error.initCause(e);
316+
throw error;
317+
}
318+
}
319+
};
320+
}
321+
322+
/**
323+
* Searches recursively the given cause for a root error.
324+
*
325+
* @param error root error
326+
* @param cause cause to be found
327+
*
328+
* @return {@literal true} if cause is found within a cause chain
329+
*/
330+
public static boolean findCause(Throwable error, Throwable cause) {
331+
while (error.getCause() != null) {
332+
error = error.getCause();
333+
if (cause.equals(error)) {
334+
return true;
335+
}
336+
}
337+
return false;
338+
}
339+
293340
}

0 commit comments

Comments
 (0)