Skip to content

Commit

Permalink
Merge pull request #1721 from ClickHouse/fix_stale_connection_issue
Browse files Browse the repository at this point in the history
Add retry on NoHttpResponseException
  • Loading branch information
chernser authored Jul 9, 2024
2 parents 66f8bce + 845e76d commit 03fcfb1
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import com.clickhouse.client.AbstractSocketClient;
import com.clickhouse.client.ClickHouseClient;
import com.clickhouse.client.ClickHouseConfig;
import com.clickhouse.client.ClickHouseException;
import com.clickhouse.client.ClickHouseNode;
import com.clickhouse.client.ClickHouseRequest;
import com.clickhouse.client.ClickHouseSocketFactory;
Expand Down Expand Up @@ -31,9 +30,11 @@
import org.apache.hc.client5.http.socket.PlainConnectionSocketFactory;
import org.apache.hc.client5.http.ssl.DefaultHostnameVerifier;
import org.apache.hc.client5.http.ssl.SSLConnectionSocketFactory;
import org.apache.hc.core5.http.ConnectionClosedException;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.NoHttpResponseException;
import org.apache.hc.core5.http.config.Registry;
import org.apache.hc.core5.http.config.RegistryBuilder;
import org.apache.hc.core5.http.io.SocketConfig;
Expand All @@ -45,7 +46,6 @@

import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLException;

import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
Expand All @@ -58,7 +58,6 @@
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.StandardSocketOptions;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -251,11 +250,35 @@ protected ClickHouseHttpResponse post(ClickHouseConfig config, String sql, Click
ClickHouseHttpEntity postBody = new ClickHouseHttpEntity(config, contentType, contentEncoding, boundary,
sql, data, tables);
post.setEntity(postBody);
CloseableHttpResponse response;
try {
response = client.execute(post);
} catch (IOException e) {
throw new ConnectException(ClickHouseUtils.format("HTTP request failed: %s", e.getMessage()));
CloseableHttpResponse response = null;

int retryAttempts = config.getBoolOption(ClickHouseHttpOption.AHC_RETRY_ON_FAILURE) ? 2 : 1;
for (int attempt = 0; attempt < retryAttempts; attempt++) {
boolean isLastAttempt = attempt == retryAttempts - 1;
log.debug("HTTP request attempt " + attempt);
try {
response = client.execute(post);

if (!isLastAttempt && (response.getCode() == HttpURLConnection.HTTP_UNAVAILABLE)) {
log.debug("HTTP request failed with status code 503, retrying...");
continue;
}

break;
} catch (NoHttpResponseException | ConnectionClosedException e) {
if (isLastAttempt) {
throw new ConnectException(e.getMessage());
} else {
continue;
}
} catch (IOException e) {
log.error("HTTP request failed", e);
throw new ConnectException(e.getMessage());
}
}
if (response == null) {
// Should not happen but needed for compiler
throw new ConnectException("HTTP request failed");
}

checkResponse(config, response);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.clickhouse.data.ClickHouseChecker;

import java.io.Serializable;
import java.net.UnknownHostException;

/**
* Http client options.
Expand Down Expand Up @@ -73,6 +74,20 @@ public enum ClickHouseHttpOption implements ClickHouseOption {
*/
AHC_VALIDATE_AFTER_INACTIVITY("ahc_validate_after_inactivity", 5000L,
"The time in milliseconds after which the connection is validated after inactivity."),

/**
* Whether to retry on failure with AsyncHttpClient. Failure includes some 'critical' IO exceptions:
* <ul>
* <li>{@code org.apache.hc.core5.http.ConnectionClosedException}</li>
* <li>{@code org.apache.hc.core5.http.NoHttpResponseException}</li>
* </ul>
*
* And next status codes:
* <ul>
* <li>{@code 503 Service Unavailable}</li>
* </ul>
*/
AHC_RETRY_ON_FAILURE("ahc_retry_on_failure", false, "Whether to retry on failure with AsyncHttpClient.")
;

private final String key;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,16 @@
import java.util.concurrent.atomic.AtomicBoolean;

import com.github.tomakehurst.wiremock.WireMockServer;
import com.github.tomakehurst.wiremock.admin.model.ScenarioState;
import com.github.tomakehurst.wiremock.client.WireMock;
import com.github.tomakehurst.wiremock.core.WireMockConfiguration;
import com.github.tomakehurst.wiremock.http.Fault;
import com.github.tomakehurst.wiremock.stubbing.Scenario;
import com.github.tomakehurst.wiremock.stubbing.StubMapping;
import com.github.tomakehurst.wiremock.stubbing.Scenario;
import org.apache.hc.client5.http.socket.PlainConnectionSocketFactory;
import org.apache.hc.core5.http.NoHttpResponseException;
import org.apache.hc.core5.http.HttpStatus;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -144,12 +148,70 @@ public void testFailureWhileRequest() {
httpClient.executeAndWait(request);
} catch (ClickHouseException e) {
Assert.assertEquals(e.getErrorCode(), ClickHouseException.ERROR_NETWORK);
return;
}

Assert.fail("Should throw exception");
} finally {
faultyServer.stop();
}
}

@Test(groups = {"unit"}, dataProvider = "retryOnFailureProvider")
public void testRetryOnFailure(StubMapping failureStub) {
faultyServer = new WireMockServer(9090);
faultyServer.start();
try {
faultyServer.addStubMapping(failureStub);
faultyServer.addStubMapping(WireMock.post(WireMock.anyUrl())
.withRequestBody(WireMock.equalTo("SELECT 1"))
.inScenario("Retry")
.whenScenarioStateIs("Failed")
.willReturn(WireMock.aResponse()
.withHeader("X-ClickHouse-Summary",
"{ \"read_bytes\": \"10\", \"read_rows\": \"1\"}"))
.build());

ClickHouseHttpClient httpClient = new ClickHouseHttpClient();
Map<ClickHouseOption, Serializable> options = new HashMap<>();
options.put(ClickHouseHttpOption.AHC_RETRY_ON_FAILURE, true);
ClickHouseConfig config = new ClickHouseConfig(options);
httpClient.init(config);
ClickHouseRequest request = httpClient.read("http://localhost:9090/").query("SELECT 1");

ClickHouseResponse response = null;
try {
response = httpClient.executeAndWait(request);
} catch (ClickHouseException e) {
Assert.fail("Should not throw exception", e);
}
Assert.assertEquals(response.getSummary().getReadBytes(), 10);
Assert.assertEquals(response.getSummary().getReadRows(), 1);
} finally {
faultyServer.stop();
}
}

@DataProvider(name = "retryOnFailureProvider")
private static StubMapping[] retryOnFailureProvider() {
return new StubMapping[] {
WireMock.post(WireMock.anyUrl())
.withRequestBody(WireMock.equalTo("SELECT 1"))
.inScenario("Retry")
.whenScenarioStateIs(Scenario.STARTED)
.willReturn(WireMock.aResponse().withFault(Fault.EMPTY_RESPONSE))
.willSetStateTo("Failed")
.build()
,WireMock.post(WireMock.anyUrl())
.withRequestBody(WireMock.equalTo("SELECT 1"))
.inScenario("Retry")
.whenScenarioStateIs(Scenario.STARTED)
.willReturn(WireMock.aResponse().withStatus(HttpStatus.SC_SERVICE_UNAVAILABLE))
.willSetStateTo("Failed")
.build()
};
}

@Test(groups = {"unit"}, dataProvider = "validationTimeoutProvider")
public void testNoHttpResponseExceptionWithValidation(long validationTimeout) {

Expand Down

0 comments on commit 03fcfb1

Please sign in to comment.