Skip to content

Commit

Permalink
Remove support for maxRetryTimeout from low-level REST client (#38085)
Browse files Browse the repository at this point in the history
We have had various reports of problems caused by the maxRetryTimeout
setting in the low-level REST client. Such setting was initially added
in the attempts to not have requests go through retries if the request
already took longer than the provided timeout.

The implementation was problematic though as such timeout would also
expire in the first request attempt (see #31834), would leave the
request executing after expiration causing memory leaks (see #33342),
and would not take into account the http client internal queuing (see #25951).

Given all these issues, it seems that this custom timeout mechanism 
gives little benefits while causing a lot of harm. We should rather rely 
on connect and socket timeout exposed by the underlying http client 
and accept that a request can overall take longer than the configured 
timeout, which is the case even with a single retry anyways.

This commit removes the `maxRetryTimeout` setting and all of its usages.
  • Loading branch information
javanna authored Feb 6, 2019
1 parent 9492dce commit a7046e0
Show file tree
Hide file tree
Showing 23 changed files with 509 additions and 769 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,6 @@ public ResponseException(Response response) throws IOException {
this.response = response;
}

/**
* Wrap a {@linkplain ResponseException} with another one with the current
* stack trace. This is used during synchronous calls so that the caller
* ends up in the stack trace of the exception thrown.
*/
ResponseException(ResponseException e) throws IOException {
super(e.getMessage(), e);
this.response = e.getResponse();
}

static String buildMessage(Response response) throws IOException {
String message = String.format(Locale.ROOT,
"method [%s], host [%s], URI [%s], status line [%s]",
Expand Down
463 changes: 234 additions & 229 deletions client/rest/src/main/java/org/elasticsearch/client/RestClient.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,12 @@
public final class RestClientBuilder {
public static final int DEFAULT_CONNECT_TIMEOUT_MILLIS = 1000;
public static final int DEFAULT_SOCKET_TIMEOUT_MILLIS = 30000;
public static final int DEFAULT_MAX_RETRY_TIMEOUT_MILLIS = DEFAULT_SOCKET_TIMEOUT_MILLIS;
public static final int DEFAULT_MAX_CONN_PER_ROUTE = 10;
public static final int DEFAULT_MAX_CONN_TOTAL = 30;

private static final Header[] EMPTY_HEADERS = new Header[0];

private final List<Node> nodes;
private int maxRetryTimeout = DEFAULT_MAX_RETRY_TIMEOUT_MILLIS;
private Header[] defaultHeaders = EMPTY_HEADERS;
private RestClient.FailureListener failureListener;
private HttpClientConfigCallback httpClientConfigCallback;
Expand Down Expand Up @@ -102,20 +100,6 @@ public RestClientBuilder setFailureListener(RestClient.FailureListener failureLi
return this;
}

/**
* Sets the maximum timeout (in milliseconds) to honour in case of multiple retries of the same request.
* {@link #DEFAULT_MAX_RETRY_TIMEOUT_MILLIS} if not specified.
*
* @throws IllegalArgumentException if {@code maxRetryTimeoutMillis} is not greater than 0
*/
public RestClientBuilder setMaxRetryTimeoutMillis(int maxRetryTimeoutMillis) {
if (maxRetryTimeoutMillis <= 0) {
throw new IllegalArgumentException("maxRetryTimeoutMillis must be greater than 0");
}
this.maxRetryTimeout = maxRetryTimeoutMillis;
return this;
}

/**
* Sets the {@link HttpClientConfigCallback} to be used to customize http client configuration
*
Expand Down Expand Up @@ -208,7 +192,7 @@ public CloseableHttpAsyncClient run() {
return createHttpClient();
}
});
RestClient restClient = new RestClient(httpClient, maxRetryTimeout, defaultHeaders, nodes,
RestClient restClient = new RestClient(httpClient, defaultHeaders, nodes,
pathPrefix, failureListener, nodeSelector, strictDeprecationMode);
httpClient.start();
return restClient;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,6 @@ public void testBuild() throws IOException {
assertNotNull(restClient);
}

try {
RestClient.builder(new HttpHost("localhost", 9200))
.setMaxRetryTimeoutMillis(randomIntBetween(Integer.MIN_VALUE, 0));
fail("should have failed");
} catch(IllegalArgumentException e) {
assertEquals("maxRetryTimeoutMillis must be greater than 0", e.getMessage());
}

try {
RestClient.builder(new HttpHost("localhost", 9200)).setDefaultHeaders(null);
fail("should have failed");
Expand Down Expand Up @@ -156,12 +148,9 @@ public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder reques
builder.setDefaultHeaders(headers);
}
if (randomBoolean()) {
builder.setMaxRetryTimeoutMillis(randomIntBetween(1, Integer.MAX_VALUE));
}
if (randomBoolean()) {
String pathPrefix = (randomBoolean() ? "/" : "") + randomAsciiOfLengthBetween(2, 5);
String pathPrefix = (randomBoolean() ? "/" : "") + randomAsciiLettersOfLengthBetween(2, 5);
while (pathPrefix.length() < 20 && randomBoolean()) {
pathPrefix += "/" + randomAsciiOfLengthBetween(3, 6);
pathPrefix += "/" + randomAsciiLettersOfLengthBetween(3, 6);
}
builder.setPathPrefix(pathPrefix + (randomBoolean() ? "/" : ""));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ public void onFailure(Exception exception) {
* Test host selector against a real server <strong>and</strong>
* test what happens after calling
*/
public void testNodeSelector() throws IOException {
public void testNodeSelector() throws Exception {
try (RestClient restClient = buildRestClient(firstPositionNodeSelector())) {
Request request = new Request("GET", "/200");
int rounds = between(1, 10);
Expand All @@ -210,7 +210,7 @@ public void testNodeSelector() throws IOException {
*/
if (stoppedFirstHost) {
try {
restClient.performRequest(request);
RestClientSingleHostTests.performRequestSyncOrAsync(restClient, request);
fail("expected to fail to connect");
} catch (ConnectException e) {
// Windows isn't consistent here. Sometimes the message is even null!
Expand All @@ -219,7 +219,7 @@ public void testNodeSelector() throws IOException {
}
}
} else {
Response response = restClient.performRequest(request);
Response response = RestClientSingleHostTests.performRequestSyncOrAsync(restClient, request);
assertEquals(httpHosts[0], response.getHost());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,25 +22,10 @@
import com.carrotsearch.randomizedtesting.generators.RandomNumbers;
import org.apache.http.Header;
import org.apache.http.HttpHost;
import org.apache.http.HttpResponse;
import org.apache.http.ProtocolVersion;
import org.apache.http.StatusLine;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.conn.ConnectTimeoutException;
import org.apache.http.impl.auth.BasicScheme;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.message.BasicHttpResponse;
import org.apache.http.message.BasicStatusLine;
import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
import org.junit.After;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
Expand All @@ -49,7 +34,6 @@
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import static org.elasticsearch.client.RestClientTestUtil.randomErrorNoRetryStatusCode;
import static org.elasticsearch.client.RestClientTestUtil.randomErrorRetryStatusCode;
Expand All @@ -61,9 +45,6 @@
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

/**
* Tests for {@link RestClient} behaviour against multiple hosts: fail-over, blacklisting etc.
Expand All @@ -75,47 +56,16 @@ public class RestClientMultipleHostsTests extends RestClientTestCase {
private List<Node> nodes;
private HostsTrackingFailureListener failureListener;

@SuppressWarnings("unchecked")
public RestClient createRestClient(NodeSelector nodeSelector) {
CloseableHttpAsyncClient httpClient = mock(CloseableHttpAsyncClient.class);
when(httpClient.<HttpResponse>execute(any(HttpAsyncRequestProducer.class), any(HttpAsyncResponseConsumer.class),
any(HttpClientContext.class), any(FutureCallback.class))).thenAnswer(new Answer<Future<HttpResponse>>() {
@Override
public Future<HttpResponse> answer(InvocationOnMock invocationOnMock) throws Throwable {
HttpAsyncRequestProducer requestProducer = (HttpAsyncRequestProducer) invocationOnMock.getArguments()[0];
final HttpUriRequest request = (HttpUriRequest)requestProducer.generateRequest();
final HttpHost httpHost = requestProducer.getTarget();
HttpClientContext context = (HttpClientContext) invocationOnMock.getArguments()[2];
assertThat(context.getAuthCache().get(httpHost), instanceOf(BasicScheme.class));
final FutureCallback<HttpResponse> futureCallback = (FutureCallback<HttpResponse>) invocationOnMock.getArguments()[3];
//return the desired status code or exception depending on the path
exec.execute(new Runnable() {
@Override
public void run() {
if (request.getURI().getPath().equals("/soe")) {
futureCallback.failed(new SocketTimeoutException(httpHost.toString()));
} else if (request.getURI().getPath().equals("/coe")) {
futureCallback.failed(new ConnectTimeoutException(httpHost.toString()));
} else if (request.getURI().getPath().equals("/ioe")) {
futureCallback.failed(new IOException(httpHost.toString()));
} else {
int statusCode = Integer.parseInt(request.getURI().getPath().substring(1));
StatusLine statusLine = new BasicStatusLine(new ProtocolVersion("http", 1, 1), statusCode, "");
futureCallback.completed(new BasicHttpResponse(statusLine));
}
}
});
return null;
}
});
CloseableHttpAsyncClient httpClient = RestClientSingleHostTests.mockHttpClient(exec);
int numNodes = RandomNumbers.randomIntBetween(getRandom(), 2, 5);
nodes = new ArrayList<>(numNodes);
for (int i = 0; i < numNodes; i++) {
nodes.add(new Node(new HttpHost("localhost", 9200 + i)));
}
nodes = Collections.unmodifiableList(nodes);
failureListener = new HostsTrackingFailureListener();
return new RestClient(httpClient, 10000, new Header[0], nodes, null, failureListener, nodeSelector, false);
return new RestClient(httpClient, new Header[0], nodes, null, failureListener, nodeSelector, false);
}

/**
Expand All @@ -126,14 +76,15 @@ public void shutdownExec() {
exec.shutdown();
}

public void testRoundRobinOkStatusCodes() throws IOException {
public void testRoundRobinOkStatusCodes() throws Exception {
RestClient restClient = createRestClient(NodeSelector.ANY);
int numIters = RandomNumbers.randomIntBetween(getRandom(), 1, 5);
for (int i = 0; i < numIters; i++) {
Set<HttpHost> hostsSet = hostsSet();
for (int j = 0; j < nodes.size(); j++) {
int statusCode = randomOkStatusCode(getRandom());
Response response = restClient.performRequest(new Request(randomHttpMethod(getRandom()), "/" + statusCode));
Response response = RestClientSingleHostTests.performRequestSyncOrAsync(restClient,
new Request(randomHttpMethod(getRandom()), "/" + statusCode));
assertEquals(statusCode, response.getStatusLine().getStatusCode());
assertTrue("host not found: " + response.getHost(), hostsSet.remove(response.getHost()));
}
Expand All @@ -142,7 +93,7 @@ public void testRoundRobinOkStatusCodes() throws IOException {
failureListener.assertNotCalled();
}

public void testRoundRobinNoRetryErrors() throws IOException {
public void testRoundRobinNoRetryErrors() throws Exception {
RestClient restClient = createRestClient(NodeSelector.ANY);
int numIters = RandomNumbers.randomIntBetween(getRandom(), 1, 5);
for (int i = 0; i < numIters; i++) {
Expand All @@ -151,7 +102,8 @@ public void testRoundRobinNoRetryErrors() throws IOException {
String method = randomHttpMethod(getRandom());
int statusCode = randomErrorNoRetryStatusCode(getRandom());
try {
Response response = restClient.performRequest(new Request(method, "/" + statusCode));
Response response = RestClientSingleHostTests.performRequestSyncOrAsync(restClient,
new Request(method, "/" + statusCode));
if (method.equals("HEAD") && statusCode == 404) {
//no exception gets thrown although we got a 404
assertEquals(404, response.getStatusLine().getStatusCode());
Expand All @@ -175,18 +127,13 @@ public void testRoundRobinNoRetryErrors() throws IOException {
failureListener.assertNotCalled();
}

public void testRoundRobinRetryErrors() throws IOException {
public void testRoundRobinRetryErrors() throws Exception {
RestClient restClient = createRestClient(NodeSelector.ANY);
String retryEndpoint = randomErrorRetryEndpoint();
try {
restClient.performRequest(new Request(randomHttpMethod(getRandom()), retryEndpoint));
RestClientSingleHostTests.performRequestSyncOrAsync(restClient, new Request(randomHttpMethod(getRandom()), retryEndpoint));
fail("request should have failed");
} catch (ResponseException e) {
/*
* Unwrap the top level failure that was added so the stack trace contains
* the caller. It wraps the exception that contains the failed hosts.
*/
e = (ResponseException) e.getCause();
Set<HttpHost> hostsSet = hostsSet();
//first request causes all the hosts to be blacklisted, the returned exception holds one suppressed exception each
failureListener.assertCalled(nodes);
Expand All @@ -206,11 +153,6 @@ public void testRoundRobinRetryErrors() throws IOException {
} while(e != null);
assertEquals("every host should have been used but some weren't: " + hostsSet, 0, hostsSet.size());
} catch (IOException e) {
/*
* Unwrap the top level failure that was added so the stack trace contains
* the caller. It wraps the exception that contains the failed hosts.
*/
e = (IOException) e.getCause();
Set<HttpHost> hostsSet = hostsSet();
//first request causes all the hosts to be blacklisted, the returned exception holds one suppressed exception each
failureListener.assertCalled(nodes);
Expand All @@ -236,7 +178,8 @@ public void testRoundRobinRetryErrors() throws IOException {
for (int j = 0; j < nodes.size(); j++) {
retryEndpoint = randomErrorRetryEndpoint();
try {
restClient.performRequest(new Request(randomHttpMethod(getRandom()), retryEndpoint));
RestClientSingleHostTests.performRequestSyncOrAsync(restClient,
new Request(randomHttpMethod(getRandom()), retryEndpoint));
fail("request should have failed");
} catch (ResponseException e) {
Response response = e.getResponse();
Expand All @@ -247,11 +190,6 @@ public void testRoundRobinRetryErrors() throws IOException {
failureListener.assertCalled(response.getHost());
assertEquals(0, e.getSuppressed().length);
} catch (IOException e) {
/*
* Unwrap the top level failure that was added so the stack trace contains
* the caller. It wraps the exception that contains the failed hosts.
*/
e = (IOException) e.getCause();
HttpHost httpHost = HttpHost.create(e.getMessage());
assertTrue("host [" + httpHost + "] not found, most likely used multiple times", hostsSet.remove(httpHost));
//after the first request, all hosts are blacklisted, a single one gets resurrected each time
Expand All @@ -268,7 +206,8 @@ public void testRoundRobinRetryErrors() throws IOException {
int statusCode = randomErrorNoRetryStatusCode(getRandom());
Response response;
try {
response = restClient.performRequest(new Request(randomHttpMethod(getRandom()), "/" + statusCode));
response = RestClientSingleHostTests.performRequestSyncOrAsync(restClient,
new Request(randomHttpMethod(getRandom()), "/" + statusCode));
} catch (ResponseException e) {
response = e.getResponse();
}
Expand All @@ -285,19 +224,15 @@ public void testRoundRobinRetryErrors() throws IOException {
for (int y = 0; y < i + 1; y++) {
retryEndpoint = randomErrorRetryEndpoint();
try {
restClient.performRequest(new Request(randomHttpMethod(getRandom()), retryEndpoint));
RestClientSingleHostTests.performRequestSyncOrAsync(restClient,
new Request(randomHttpMethod(getRandom()), retryEndpoint));
fail("request should have failed");
} catch (ResponseException e) {
Response response = e.getResponse();
assertThat(response.getStatusLine().getStatusCode(), equalTo(Integer.parseInt(retryEndpoint.substring(1))));
assertThat(response.getHost(), equalTo(selectedHost));
failureListener.assertCalled(selectedHost);
} catch(IOException e) {
/*
* Unwrap the top level failure that was added so the stack trace contains
* the caller. It wraps the exception that contains the failed hosts.
*/
e = (IOException) e.getCause();
HttpHost httpHost = HttpHost.create(e.getMessage());
assertThat(httpHost, equalTo(selectedHost));
failureListener.assertCalled(selectedHost);
Expand All @@ -307,7 +242,7 @@ public void testRoundRobinRetryErrors() throws IOException {
}
}

public void testNodeSelector() throws IOException {
public void testNodeSelector() throws Exception {
NodeSelector firstPositionOnly = new NodeSelector() {
@Override
public void select(Iterable<Node> restClientNodes) {
Expand All @@ -330,12 +265,12 @@ public void select(Iterable<Node> restClientNodes) {
* NodeSelector overrides the round robin behavior.
*/
Request request = new Request("GET", "/200");
Response response = restClient.performRequest(request);
Response response = RestClientSingleHostTests.performRequestSyncOrAsync(restClient, request);
assertEquals(nodes.get(0).getHost(), response.getHost());
}
}

public void testSetNodes() throws IOException {
public void testSetNodes() throws Exception {
RestClient restClient = createRestClient(NodeSelector.SKIP_DEDICATED_MASTERS);
List<Node> newNodes = new ArrayList<>(nodes.size());
for (int i = 0; i < nodes.size(); i++) {
Expand All @@ -350,7 +285,7 @@ public void testSetNodes() throws IOException {
* NodeSelector overrides the round robin behavior.
*/
Request request = new Request("GET", "/200");
Response response = restClient.performRequest(request);
Response response = RestClientSingleHostTests.performRequestSyncOrAsync(restClient, request);
assertEquals(newNodes.get(0).getHost(), response.getHost());
}
}
Expand Down
Loading

0 comments on commit a7046e0

Please sign in to comment.