Skip to content

Commit

Permalink
Remove support for maxRetryTimeout from low-level REST client
Browse files Browse the repository at this point in the history
We have had various reports of problems caused by the maxRetryTimeout
setting in the low-level REST client. Such setting was initially added
in the attempts to not have requests go through retries if the request
already took longer than the provided timeout.

The implementation was problematic though as such timeout would also
expire in the first request attempt (see elastic#31834), would leave the
request executing after expiration causing memory leaks (see elastic#33342),
and would not take into account the http client internal queuing (see

Given all these issues, my conclusion is that this custom timeout
mechanism gives little benefits while causing a lot of harm. We should
rather rely on connect and socket timeout exposed by the underlying
http client and accept that a request can overall take longer than the
configured timeout, which is the case even with a single retry anyways.

This commit removes the maxRetryTimeout setting and all of its usages.
  • Loading branch information
javanna committed Jan 31, 2019
1 parent cde126d commit 5059a0b
Show file tree
Hide file tree
Showing 17 changed files with 30 additions and 118 deletions.
38 changes: 7 additions & 31 deletions client/rest/src/main/java/org/elasticsearch/client/RestClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

Expand Down Expand Up @@ -103,7 +102,6 @@ public class RestClient implements Closeable {
// We don't rely on default headers supported by HttpAsyncClient as those cannot be replaced.
// These are package private for tests.
final List<Header> defaultHeaders;
private final long maxRetryTimeoutMillis;
private final String pathPrefix;
private final AtomicInteger lastNodeIndex = new AtomicInteger(0);
private final ConcurrentMap<HttpHost, DeadHostState> blacklist = new ConcurrentHashMap<>();
Expand All @@ -112,10 +110,9 @@ public class RestClient implements Closeable {
private volatile NodeTuple<List<Node>> nodeTuple;
private final WarningsHandler warningsHandler;

RestClient(CloseableHttpAsyncClient client, long maxRetryTimeoutMillis, Header[] defaultHeaders, List<Node> nodes, String pathPrefix,
RestClient(CloseableHttpAsyncClient client, Header[] defaultHeaders, List<Node> nodes, String pathPrefix,
FailureListener failureListener, NodeSelector nodeSelector, boolean strictDeprecationMode) {
this.client = client;
this.maxRetryTimeoutMillis = maxRetryTimeoutMillis;
this.defaultHeaders = Collections.unmodifiableList(Arrays.asList(defaultHeaders));
this.failureListener = failureListener;
this.pathPrefix = pathPrefix;
Expand Down Expand Up @@ -213,7 +210,7 @@ public List<Node> getNodes() {
* @throws ResponseException in case Elasticsearch responded with a status code that indicated an error
*/
public Response performRequest(Request request) throws IOException {
SyncResponseListener listener = new SyncResponseListener(maxRetryTimeoutMillis);
SyncResponseListener listener = new SyncResponseListener();
performRequestAsyncNoCatch(request, listener);
return listener.get();
}
Expand Down Expand Up @@ -335,19 +332,10 @@ public void failed(Exception failure) {

private void retryIfPossible(Exception exception) {
if (nodeTuple.nodes.hasNext()) {
//in case we are retrying, check whether maxRetryTimeout has been reached
long timeElapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
long timeout = maxRetryTimeoutMillis - timeElapsedMillis;
if (timeout <= 0) {
IOException retryTimeoutException = new IOException(
"request retries exceeded max retry timeout [" + maxRetryTimeoutMillis + "]", exception);
listener.onDefinitiveFailure(retryTimeoutException);
} else {
listener.trackFailure(exception);
request.reset();
performRequestAsync(startTime, nodeTuple, request, ignoreErrorCodes,
thisWarningsHandler, httpAsyncResponseConsumerFactory, listener);
}
listener.trackFailure(exception);
request.reset();
performRequestAsync(startTime, nodeTuple, request, ignoreErrorCodes,
thisWarningsHandler, httpAsyncResponseConsumerFactory, listener);
} else {
listener.onDefinitiveFailure(exception);
}
Expand Down Expand Up @@ -630,13 +618,6 @@ static class SyncResponseListener implements ResponseListener {
private final AtomicReference<Response> response = new AtomicReference<>();
private final AtomicReference<Exception> exception = new AtomicReference<>();

private final long timeout;

SyncResponseListener(long timeout) {
assert timeout > 0;
this.timeout = timeout;
}

@Override
public void onSuccess(Response response) {
Objects.requireNonNull(response, "response must not be null");
Expand All @@ -663,15 +644,10 @@ public void onFailure(Exception exception) {
*/
Response get() throws IOException {
try {
//providing timeout is just a safety measure to prevent everlasting waits
//the different client timeouts should already do their jobs
if (latch.await(timeout, TimeUnit.MILLISECONDS) == false) {
throw new IOException("listener timeout after waiting for [" + timeout + "] ms");
}
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException("thread waiting for the response was interrupted", e);
}

Exception exception = this.exception.get();
Response response = this.response.get();
if (exception != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,12 @@
public final class RestClientBuilder {
public static final int DEFAULT_CONNECT_TIMEOUT_MILLIS = 1000;
public static final int DEFAULT_SOCKET_TIMEOUT_MILLIS = 30000;
public static final int DEFAULT_MAX_RETRY_TIMEOUT_MILLIS = DEFAULT_SOCKET_TIMEOUT_MILLIS;
public static final int DEFAULT_MAX_CONN_PER_ROUTE = 10;
public static final int DEFAULT_MAX_CONN_TOTAL = 30;

private static final Header[] EMPTY_HEADERS = new Header[0];

private final List<Node> nodes;
private int maxRetryTimeout = DEFAULT_MAX_RETRY_TIMEOUT_MILLIS;
private Header[] defaultHeaders = EMPTY_HEADERS;
private RestClient.FailureListener failureListener;
private HttpClientConfigCallback httpClientConfigCallback;
Expand Down Expand Up @@ -102,20 +100,6 @@ public RestClientBuilder setFailureListener(RestClient.FailureListener failureLi
return this;
}

/**
* Sets the maximum timeout (in milliseconds) to honour in case of multiple retries of the same request.
* {@link #DEFAULT_MAX_RETRY_TIMEOUT_MILLIS} if not specified.
*
* @throws IllegalArgumentException if {@code maxRetryTimeoutMillis} is not greater than 0
*/
public RestClientBuilder setMaxRetryTimeoutMillis(int maxRetryTimeoutMillis) {
if (maxRetryTimeoutMillis <= 0) {
throw new IllegalArgumentException("maxRetryTimeoutMillis must be greater than 0");
}
this.maxRetryTimeout = maxRetryTimeoutMillis;
return this;
}

/**
* Sets the {@link HttpClientConfigCallback} to be used to customize http client configuration
*
Expand Down Expand Up @@ -208,7 +192,7 @@ public CloseableHttpAsyncClient run() {
return createHttpClient();
}
});
RestClient restClient = new RestClient(httpClient, maxRetryTimeout, defaultHeaders, nodes,
RestClient restClient = new RestClient(httpClient, defaultHeaders, nodes,
pathPrefix, failureListener, nodeSelector, strictDeprecationMode);
httpClient.start();
return restClient;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,6 @@ public void testBuild() throws IOException {
assertNotNull(restClient);
}

try {
RestClient.builder(new HttpHost("localhost", 9200))
.setMaxRetryTimeoutMillis(randomIntBetween(Integer.MIN_VALUE, 0));
fail("should have failed");
} catch(IllegalArgumentException e) {
assertEquals("maxRetryTimeoutMillis must be greater than 0", e.getMessage());
}

try {
RestClient.builder(new HttpHost("localhost", 9200)).setDefaultHeaders(null);
fail("should have failed");
Expand Down Expand Up @@ -156,12 +148,9 @@ public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder reques
builder.setDefaultHeaders(headers);
}
if (randomBoolean()) {
builder.setMaxRetryTimeoutMillis(randomIntBetween(1, Integer.MAX_VALUE));
}
if (randomBoolean()) {
String pathPrefix = (randomBoolean() ? "/" : "") + randomAsciiOfLengthBetween(2, 5);
String pathPrefix = (randomBoolean() ? "/" : "") + randomAsciiLettersOfLengthBetween(2, 5);
while (pathPrefix.length() < 20 && randomBoolean()) {
pathPrefix += "/" + randomAsciiOfLengthBetween(3, 6);
pathPrefix += "/" + randomAsciiLettersOfLengthBetween(3, 6);
}
builder.setPathPrefix(pathPrefix + (randomBoolean() ? "/" : ""));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public void run() {
}
nodes = Collections.unmodifiableList(nodes);
failureListener = new HostsTrackingFailureListener();
return new RestClient(httpClient, 10000, new Header[0], nodes, null, failureListener, nodeSelector, false);
return new RestClient(httpClient, new Header[0], nodes, null, failureListener, nodeSelector, false);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ public void run() {
node = new Node(new HttpHost("localhost", 9200));
failureListener = new HostsTrackingFailureListener();
strictDeprecationMode = randomBoolean();
restClient = new RestClient(httpClient, 10000, defaultHeaders,
restClient = new RestClient(httpClient, defaultHeaders,
singletonList(node), null, failureListener, NodeSelector.ANY, strictDeprecationMode);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public class RestClientTests extends RestClientTestCase {
public void testCloseIsIdempotent() throws IOException {
List<Node> nodes = singletonList(new Node(new HttpHost("localhost", 9200)));
CloseableHttpAsyncClient closeableHttpAsyncClient = mock(CloseableHttpAsyncClient.class);
RestClient restClient = new RestClient(closeableHttpAsyncClient, 1_000, new Header[0], nodes, null, null, null, false);
RestClient restClient = new RestClient(closeableHttpAsyncClient, new Header[0], nodes, null, null, null, false);
restClient.close();
verify(closeableHttpAsyncClient, times(1)).close();
restClient.close();
Expand Down Expand Up @@ -353,8 +353,7 @@ private static String assertSelectAllRejected( NodeTuple<List<Node>> nodeTuple,

private static RestClient createRestClient() {
List<Node> nodes = Collections.singletonList(new Node(new HttpHost("localhost", 9200)));
return new RestClient(mock(CloseableHttpAsyncClient.class), randomLongBetween(1_000, 30_000),
new Header[] {}, nodes, null, null, null, false);
return new RestClient(mock(CloseableHttpAsyncClient.class), new Header[] {}, nodes, null, null, null, false);
}

public void testRoundRobin() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@
import org.apache.http.message.BasicRequestLine;
import org.apache.http.message.BasicStatusLine;

import javax.net.ssl.SSLHandshakeException;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.net.SocketTimeoutException;
import java.net.URISyntaxException;
import javax.net.ssl.SSLHandshakeException;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
Expand Down Expand Up @@ -69,7 +69,7 @@ static void assertExceptionStackContainsCallingMethod(Exception e) {
}

public void testOnSuccessNullResponse() {
RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(10000);
RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener();
try {
syncResponseListener.onSuccess(null);
fail("onSuccess should have failed");
Expand All @@ -79,7 +79,7 @@ public void testOnSuccessNullResponse() {
}

public void testOnFailureNullException() {
RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(10000);
RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener();
try {
syncResponseListener.onFailure(null);
fail("onFailure should have failed");
Expand All @@ -89,7 +89,7 @@ public void testOnFailureNullException() {
}

public void testOnSuccess() throws Exception {
RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(10000);
RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener();
Response mockResponse = mockResponse();
syncResponseListener.onSuccess(mockResponse);
Response response = syncResponseListener.get();
Expand All @@ -106,7 +106,7 @@ public void testOnSuccess() throws Exception {
}

public void testOnFailure() throws Exception {
RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(10000);
RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener();
RuntimeException firstException = new RuntimeException("first-test");
syncResponseListener.onFailure(firstException);
try {
Expand Down Expand Up @@ -145,7 +145,7 @@ public void testOnFailure() throws Exception {
}

public void testRuntimeIsBuiltCorrectly() throws Exception {
RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(10000);
RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener();
RuntimeException runtimeException = new RuntimeException();
syncResponseListener.onFailure(runtimeException);
try {
Expand All @@ -162,7 +162,7 @@ public void testRuntimeIsBuiltCorrectly() throws Exception {
}

public void testConnectTimeoutExceptionIsBuiltCorrectly() throws Exception {
RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(10000);
RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener();
ConnectTimeoutException timeoutException = new ConnectTimeoutException();
syncResponseListener.onFailure(timeoutException);
try {
Expand All @@ -179,7 +179,7 @@ public void testConnectTimeoutExceptionIsBuiltCorrectly() throws Exception {
}

public void testSocketTimeoutExceptionIsBuiltCorrectly() throws Exception {
RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(10000);
RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener();
SocketTimeoutException timeoutException = new SocketTimeoutException();
syncResponseListener.onFailure(timeoutException);
try {
Expand All @@ -196,7 +196,7 @@ public void testSocketTimeoutExceptionIsBuiltCorrectly() throws Exception {
}

public void testConnectionClosedExceptionIsWrapped() throws Exception {
RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(10000);
RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener();
ConnectionClosedException closedException = new ConnectionClosedException(randomAsciiAlphanumOfLength(5));
syncResponseListener.onFailure(closedException);
try {
Expand All @@ -213,7 +213,7 @@ public void testConnectionClosedExceptionIsWrapped() throws Exception {
}

public void testSSLHandshakeExceptionIsWrapped() throws Exception {
RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(10000);
RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener();
SSLHandshakeException exception = new SSLHandshakeException(randomAsciiAlphanumOfLength(5));
syncResponseListener.onFailure(exception);
try {
Expand All @@ -230,7 +230,7 @@ public void testSSLHandshakeExceptionIsWrapped() throws Exception {
}

public void testIOExceptionIsBuiltCorrectly() throws Exception {
RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(10000);
RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener();
IOException ioException = new IOException();
syncResponseListener.onFailure(ioException);
try {
Expand All @@ -247,7 +247,7 @@ public void testIOExceptionIsBuiltCorrectly() throws Exception {
}

public void testExceptionIsWrapped() throws Exception {
RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(10000);
RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener();
//we just need any checked exception
URISyntaxException exception = new URISyntaxException("test", "test");
syncResponseListener.onFailure(exception);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,13 +111,6 @@ public void usage() throws IOException, InterruptedException {
builder.setDefaultHeaders(defaultHeaders); // <1>
//end::rest-client-init-default-headers
}
{
//tag::rest-client-init-max-retry-timeout
RestClientBuilder builder = RestClient.builder(
new HttpHost("localhost", 9200, "http"));
builder.setMaxRetryTimeoutMillis(10000); // <1>
//end::rest-client-init-max-retry-timeout
}
{
//tag::rest-client-init-node-selector
RestClientBuilder builder = RestClient.builder(
Expand Down Expand Up @@ -305,8 +298,7 @@ public RequestConfig.Builder customizeRequestConfig(
.setConnectTimeout(5000)
.setSocketTimeout(60000);
}
})
.setMaxRetryTimeoutMillis(60000);
});
//end::rest-client-config-timeouts
}
{
Expand Down
3 changes: 1 addition & 2 deletions docs/java-rest/low-level/configuration.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ https://hc.apache.org/httpcomponents-client-ga/httpclient/apidocs/org/apache/htt
as an argument and has the same return type. The request config builder can
be modified and then returned. In the following example we increase the
connect timeout (defaults to 1 second) and the socket timeout (defaults to 30
seconds). Also we adjust the max retry timeout accordingly (defaults to 30
seconds too).
seconds).

["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
Expand Down
9 changes: 0 additions & 9 deletions docs/java-rest/low-level/usage.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -180,15 +180,6 @@ include-tagged::{doc-tests}/RestClientDocumentation.java[rest-client-init-defaul
<1> Set the default headers that need to be sent with each request, to
prevent having to specify them with each single request

["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/RestClientDocumentation.java[rest-client-init-max-retry-timeout]
--------------------------------------------------
<1> Set the timeout that should be honoured in case multiple attempts are made
for the same request. The default value is 30 seconds, same as the default
socket timeout. In case the socket timeout is customized, the maximum retry
timeout should be adjusted accordingly

["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/RestClientDocumentation.java[rest-client-init-failure-listener]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ public static Iterable<Object[]> parameters() throws Exception {
protected Settings restClientSettings() {
// Give more time to repository-azure to complete the snapshot operations
return Settings.builder().put(super.restClientSettings())
.put(ESRestTestCase.CLIENT_RETRY_TIMEOUT, "60s")
.put(ESRestTestCase.CLIENT_SOCKET_TIMEOUT, "60s")
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ protected final Settings restClientSettings() {
// increase the timeout here to 90 seconds to handle long waits for a green
// cluster health. the waits for green need to be longer than a minute to
// account for delayed shards
.put(ESRestTestCase.CLIENT_RETRY_TIMEOUT, "90s")
.put(ESRestTestCase.CLIENT_SOCKET_TIMEOUT, "90s")
.build();
}
Expand Down
Loading

0 comments on commit 5059a0b

Please sign in to comment.