Skip to content

Propagate client thumbstone when a client creation fails #221

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 27, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions src/main/java/org/tarantool/RoundRobinSocketProviderImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ protected InetSocketAddress getLastObtainedAddress() {
@Override
protected SocketChannel makeAttempt(int retryNumber, Throwable lastError) throws IOException {
if (retryNumber > getAddressCount()) {
throwFatalError("No more connection addresses are left.");
throwFatalError("No more connection addresses are left.", lastError);
}

int retriesLimit = getRetriesLimit();
Expand All @@ -165,7 +165,7 @@ protected SocketChannel makeAttempt(int retryNumber, Throwable lastError) throws
@Override
public void setRetriesLimit(int retriesLimit) {
if (retriesLimit == 0) {
throwFatalError("Retries count should be at least 1 or more");
throwFatalError("Retries count should be at least 1 or more", null);
}
super.setRetriesLimit(retriesLimit);
}
Expand Down Expand Up @@ -212,8 +212,8 @@ public void refreshAddresses(Collection<String> addresses) {
updateAddressList(addresses);
}

private void throwFatalError(String message) {
throw new CommunicationException(message);
private void throwFatalError(String message, Throwable lastError) {
throw new CommunicationException(message, lastError);
}

}
10 changes: 3 additions & 7 deletions src/main/java/org/tarantool/TarantoolClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,6 @@

public class TarantoolClientImpl extends TarantoolBase<Future<?>> implements TarantoolClient {

public static final CommunicationException NOT_INIT_EXCEPTION
= new CommunicationException("Not connected, initializing connection");

protected TarantoolClientConfig config;
protected long operationTimeout;

Expand Down Expand Up @@ -101,7 +98,6 @@ public TarantoolClientImpl(SocketChannelProvider socketProvider, TarantoolClient
}

private void initClient(SocketChannelProvider socketProvider, TarantoolClientConfig config) {
this.thumbstone = NOT_INIT_EXCEPTION;
this.config = config;
this.initialRequestSize = config.defaultRequestSize;
this.operationTimeout = config.operationExpiryTimeMillis;
Expand Down Expand Up @@ -130,8 +126,8 @@ private void startConnector(long initTimeoutMillis) {
CommunicationException e = new CommunicationException(
initTimeoutMillis +
"ms is exceeded when waiting for client initialization. " +
"You could configure init timeout in TarantoolConfig"
);
"You could configure init timeout in TarantoolConfig",
thumbstone);

close(e);
throw e;
Expand All @@ -147,7 +143,7 @@ protected void reconnect(Throwable lastError) {
int retryNumber = 0;
while (!Thread.currentThread().isInterrupted()) {
try {
channel = socketProvider.get(retryNumber++, lastError == NOT_INIT_EXCEPTION ? null : lastError);
channel = socketProvider.get(retryNumber++, lastError);
} catch (Exception e) {
closeChannel(channel);
lastError = e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,14 @@ public int read(ByteBuffer buffer) throws IOException {
count = n = channel.read(buffer);

if (n < 0) {
throw new CommunicationException("Channel read failed " + n);
throw new CommunicationException("Channel read failed " + formatReadBytes(n));
}

while (buffer.remaining() > 0) {
selector.select();
n = channel.read(buffer);
if (n < 0) {
throw new CommunicationException("Channel read failed: " + n);
throw new CommunicationException("Channel read failed: " + formatReadBytes(n));
}
count += n;
}
Expand All @@ -61,4 +61,15 @@ public void close() throws IOException {
selector.close();
channel.close();
}

/**
* Formats the bytes count to a human readable message.
*
* @param bytes number of bytes
*
* @return formatted message
*/
private String formatReadBytes(int bytes) {
return bytes < 0 ? "EOF" : bytes + " bytes";
}
}
70 changes: 70 additions & 0 deletions src/test/java/org/tarantool/ClientReconnectClusterIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.tarantool.TestUtils.findCause;
import static org.tarantool.TestUtils.makeDefaultClusterClientConfig;
import static org.tarantool.TestUtils.makeDiscoveryFunction;

Expand All @@ -13,6 +15,7 @@
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;

import java.net.ConnectException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -393,6 +396,73 @@ void testDelayFunctionResultFetch() {
expectDisconnected(client, spaceId, pkId);
}

@Test
void testRoundRobinSocketProviderRefusedByFakeReason() {
stopInstancesAndAwait(SRV1);
stopInstancesAndAwait(SRV2);
stopInstancesAndAwait(SRV3);

RuntimeException error = new RuntimeException("Fake error");
TarantoolClusterClientConfig config = makeDefaultClusterClientConfig();
config.initTimeoutMillis = 1000;
Throwable exception = assertThrows(
CommunicationException.class,
() -> {
new TarantoolClusterClient(
config,
TestUtils.wrapByErroredProvider(new RoundRobinSocketProviderImpl(
"localhost:" + PORTS[0],
"localhost:" + PORTS[1],
"localhost:" + PORTS[2]
), error)
);
}
);
assertTrue(findCause(exception, error));
}

@Test
void testRoundRobinSocketProviderRefused() {
stopInstancesAndAwait(SRV1);
stopInstancesAndAwait(SRV2);
stopInstancesAndAwait(SRV3);

TarantoolClusterClientConfig config = makeDefaultClusterClientConfig();
config.initTimeoutMillis = 1000;
Throwable exception = assertThrows(
CommunicationException.class,
() -> {
new TarantoolClusterClient(
config,
new RoundRobinSocketProviderImpl("localhost:" + PORTS[0])
);
}
);
assertTrue(findCause(exception, ConnectException.class));
}

@Test
void testRoundRobinSocketProviderRefusedAfterConnect() {
final TarantoolClientImpl client = makeClusterClient(
"localhost:" + PORTS[0],
"localhost:" + PORTS[1],
"localhost:" + PORTS[2]
);

client.ping();
stopInstancesAndAwait(SRV1);

client.ping();
stopInstancesAndAwait(SRV2);

client.ping();
stopInstancesAndAwait(SRV3);

CommunicationException exception = assertThrows(CommunicationException.class, client::ping);
Throwable origin = exception.getCause();
assertEquals(origin, client.getThumbstone());
}

private void tryAwait(CyclicBarrier barrier) {
try {
barrier.await(6000, TimeUnit.MILLISECONDS);
Expand Down
99 changes: 96 additions & 3 deletions src/test/java/org/tarantool/ClientReconnectIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.tarantool.TestUtils.findCause;
import static org.tarantool.TestUtils.makeDefaultClientConfig;
import static org.tarantool.TestUtils.makeTestClient;

Expand All @@ -16,6 +17,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.function.Executable;

import java.net.ConnectException;
import java.nio.channels.SocketChannel;
import java.time.Duration;
import java.util.Collections;
Expand All @@ -27,6 +29,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.concurrent.locks.LockSupport;

Expand Down Expand Up @@ -377,7 +380,7 @@ public void run() {
/**
* Verify that we don't exceed a file descriptor limit (and so likely don't
* leak file descriptors) when trying to connect to an existing node with
* wrong authentification credentials.
* wrong authentication credentials.
* <p>
* The test sets SO_LINGER to 0 for outgoing connections to avoid producing
* many TIME_WAIT sockets, because an available port range can be
Expand Down Expand Up @@ -412,13 +415,103 @@ public void testReconnectWrongAuth() throws Exception {
client.close();
}

private TestSocketChannelProvider makeZeroLingerProvider() {
@Test
void testFirstConnectionRefused() {
RuntimeException error = new RuntimeException("Fake error");
TarantoolClientConfig config = makeDefaultClientConfig();
config.initTimeoutMillis = 100;
Throwable exception = assertThrows(
CommunicationException.class,
() -> new TarantoolClientImpl(makeErroredProvider(error), config)
);
assertTrue(findCause(exception, error));
}

@Test
void testConnectionRefusedAfterConnect() {
TarantoolClientImpl client = new TarantoolClientImpl(makeErroredProvider(null), makeDefaultClientConfig());
client.ping();

testHelper.stopInstance();
CommunicationException exception = assertThrows(CommunicationException.class, client::ping);

Throwable origin = exception.getCause();
assertEquals(origin, client.getThumbstone());

testHelper.startInstance();
}

@Test
void testSocketProviderRefusedByFakeReason() {
TarantoolClientConfig config = makeDefaultClientConfig();
RuntimeException error = new RuntimeException("Fake error");
config.initTimeoutMillis = 1000;

SingleSocketChannelProviderImpl socketProvider = new SingleSocketChannelProviderImpl("localhost:3301");

testHelper.stopInstance();
Throwable exception = assertThrows(
CommunicationException.class,
() -> new TarantoolClientImpl(TestUtils.wrapByErroredProvider(socketProvider, error), config)
);
testHelper.startInstance();
assertTrue(findCause(exception, error));
}

@Test
void testSingleSocketProviderRefused() {
testHelper.stopInstance();

TarantoolClientConfig config = makeDefaultClientConfig();
config.initTimeoutMillis = 1000;

SingleSocketChannelProviderImpl socketProvider = new SingleSocketChannelProviderImpl("localhost:3301");

Throwable exception = assertThrows(
CommunicationException.class,
() -> new TarantoolClientImpl(socketProvider, config)
);
testHelper.startInstance();
assertTrue(findCause(exception, ConnectException.class));
}

@Test
void testSingleSocketProviderRefusedAfterConnect() {
TarantoolClientImpl client = new TarantoolClientImpl(socketChannelProvider, makeDefaultClientConfig());

client.ping();
testHelper.stopInstance();

CommunicationException exception = assertThrows(CommunicationException.class, client::ping);
Throwable origin = exception.getCause();
assertEquals(origin, client.getThumbstone());

testHelper.startInstance();
}

private SocketChannelProvider makeZeroLingerProvider() {
return new TestSocketChannelProvider(
TarantoolTestHelper.HOST, TarantoolTestHelper.PORT, RESTART_TIMEOUT
).setSoLinger(0);
}

TarantoolClient makeClient(SocketChannelProvider provider) {
private SocketChannelProvider makeErroredProvider(RuntimeException error) {
return new SocketChannelProvider() {
private final SocketChannelProvider delegate = makeZeroLingerProvider();
private AtomicReference<RuntimeException> errorReference = new AtomicReference<>(error);

@Override
public SocketChannel get(int retryNumber, Throwable lastError) {
RuntimeException rawError = errorReference.get();
if (rawError != null) {
throw rawError;
}
return delegate.get(retryNumber, lastError);
}
};
}

private TarantoolClient makeClient(SocketChannelProvider provider) {
return new TarantoolClientImpl(provider, makeDefaultClientConfig());
}

Expand Down
Loading