From 32afe083b8dead81b4b700529f90ffe5a0428e8e Mon Sep 17 00:00:00 2001 From: raz0r Date: Mon, 22 Apr 2024 17:39:25 +0400 Subject: [PATCH 1/3] add max retries --- .../config/HttpConnectorConfigConstants.java | 6 ++ .../lookup/JavaNetHttpPollingClient.java | 66 ++++++++++++++++--- .../lookup/JavaNetHttpPollingClientTest.java | 2 +- 3 files changed, 65 insertions(+), 9 deletions(-) diff --git a/src/main/java/com/getindata/connectors/http/internal/config/HttpConnectorConfigConstants.java b/src/main/java/com/getindata/connectors/http/internal/config/HttpConnectorConfigConstants.java index c7c10e20..b4ea7a59 100644 --- a/src/main/java/com/getindata/connectors/http/internal/config/HttpConnectorConfigConstants.java +++ b/src/main/java/com/getindata/connectors/http/internal/config/HttpConnectorConfigConstants.java @@ -92,6 +92,12 @@ public final class HttpConnectorConfigConstants { public static final String SINK_HTTP_WRITER_THREAD_POOL_SIZE = GID_CONNECTOR_HTTP + "sink.writer.thread-pool.size"; + public static final String LOOKUP_HTTP_MAX_RETRIES = + GID_CONNECTOR_HTTP + "source.lookup.request.max-retries"; + + public static final String LOOKUP_HTTP_RETRY_TIMEOUT_MS = + GID_CONNECTOR_HTTP + "source.lookup.request.retry-timeout-ms"; + // ----------------------------------------------------- diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java index d1e13324..b5fb0afd 100644 --- a/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java +++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java @@ -27,6 +27,10 @@ @Slf4j public class JavaNetHttpPollingClient implements PollingClient { + public static final String DEFAULT_REQUEST_MAX_RETRIES = "3"; + + public static final String DEFAULT_REQUEST_RETRY_TIMEOUT_MS = "1000"; + private final HttpClient httpClient; private final HttpStatusCodeChecker statusCodeChecker; @@ -37,6 +41,10 @@ public class JavaNetHttpPollingClient implements PollingClient { private final HttpPostRequestCallback httpPostRequestCallback; + protected final int httpRequestMaxRetries; + + protected final int httpRequestRetryTimeoutMs; + public JavaNetHttpPollingClient( HttpClient httpClient, DeserializationSchema responseBodyDecoder, @@ -62,6 +70,20 @@ public JavaNetHttpPollingClient( .build(); this.statusCodeChecker = new ComposeHttpStatusCodeChecker(checkerConfig); + + this.httpRequestMaxRetries = Integer.parseInt( + options.getProperties().getProperty( + HttpConnectorConfigConstants.LOOKUP_HTTP_MAX_RETRIES, + DEFAULT_REQUEST_MAX_RETRIES + ) + ); + + this.httpRequestRetryTimeoutMs = Integer.parseInt( + options.getProperties().getProperty( + HttpConnectorConfigConstants.LOOKUP_HTTP_RETRY_TIMEOUT_MS, + DEFAULT_REQUEST_RETRY_TIMEOUT_MS + ) + ); } @Override @@ -74,15 +96,43 @@ public Optional pull(RowData lookupRow) { } } - // TODO Add Retry Policy And configure TimeOut from properties - private Optional queryAndProcess(RowData lookupData) throws Exception { - + private Optional queryAndProcess(RowData lookupData) { HttpLookupSourceRequestEntry request = requestFactory.buildLookupRequest(lookupData); - HttpResponse response = httpClient.send( - request.getHttpRequest(), - BodyHandlers.ofString() - ); - return processHttpResponse(response, request); + HttpResponse response = null; + + int retryCount = 0; + + while (retryCount < this.httpRequestMaxRetries) { + try { + response = httpClient.send( + request.getHttpRequest(), + BodyHandlers.ofString() + ); + break; + } catch (IOException e) { + log.error("IOException during HTTP request. Retrying...", e); + retryCount++; + if (retryCount == this.httpRequestMaxRetries) { + log.error("Maximum retries reached. Aborting..."); + return Optional.empty(); + } + try { + Thread.sleep(this.httpRequestRetryTimeoutMs); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.error("HTTP request interrupted. Aborting...", e); + return Optional.empty(); + } + } + try { + return processHttpResponse(response, request); + } catch (IOException e) { + log.error("IOException during HTTP response processing.", e); + return Optional.empty(); + } } private Optional processHttpResponse( diff --git a/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientTest.java b/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientTest.java index 6d6feac1..12bbf12c 100644 --- a/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientTest.java +++ b/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientTest.java @@ -1,5 +1,6 @@ package com.getindata.connectors.http.internal.table.lookup; +//import java.io.IOException; import java.net.URI; import java.net.http.HttpClient; import java.net.http.HttpRequest; @@ -22,7 +23,6 @@ import org.mockito.junit.jupiter.MockitoExtension; import static org.assertj.core.api.Assertions.assertThat; - import com.getindata.connectors.http.internal.HeaderPreprocessor; import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants; import com.getindata.connectors.http.internal.table.lookup.querycreators.GenericGetQueryCreator; From d4affb7a381c7e9970ef55e957cb7363db5de28a Mon Sep 17 00:00:00 2001 From: raz0r Date: Mon, 22 Apr 2024 19:10:19 +0400 Subject: [PATCH 2/3] add test --- ...avaNetHttpPollingClientConnectionTest.java | 42 +++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientConnectionTest.java b/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientConnectionTest.java index 277f7d84..955ca4fe 100644 --- a/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientConnectionTest.java +++ b/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientConnectionTest.java @@ -7,7 +7,9 @@ import com.github.tomakehurst.wiremock.WireMockServer; import com.github.tomakehurst.wiremock.client.MappingBuilder; +import com.github.tomakehurst.wiremock.http.Fault; import com.github.tomakehurst.wiremock.matching.RequestPatternBuilder; +import com.github.tomakehurst.wiremock.stubbing.Scenario; import com.github.tomakehurst.wiremock.stubbing.StubMapping; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.serialization.SerializationSchema; @@ -94,6 +96,7 @@ public void setUp() { int[][] lookupKey = {{}}; this.dynamicTableSourceContext = new LookupRuntimeProviderContext(lookupKey); + wireMockServer.resetAll(); this.lookupRowData = GenericRowData.of( StringData.fromString("1"), StringData.fromString("2") @@ -290,6 +293,45 @@ public void shouldConnectWithBasicAuth(String authorizationHeaderValue, assertThat(nestedDetailsRow.getString(0).toString()).isEqualTo("$1,729.34"); } + @Test + void shouldRetryOnIOExceptionAndSucceedOnSecondAttempt() { + // GIVEN + this.stubMapping = setUpServerStubForIOExceptionOnFirstAttempt(); + Properties properties = new Properties(); + properties.setProperty( + HttpConnectorConfigConstants.LOOKUP_HTTP_MAX_RETRIES, + "3" + ); + JavaNetHttpPollingClient pollingClient = setUpPollingClient( + getBaseUrl(), properties, setUpGetRequestFactory(properties)); + + // WHEN + Optional poll = pollingClient.pull(lookupRowData); + + // THEN + wireMockServer.verify(2, RequestPatternBuilder.forCustomMatcher(stubMapping.getRequest())); + + assertThat(poll.isPresent()).isTrue(); + } + + private StubMapping setUpServerStubForIOExceptionOnFirstAttempt() { + wireMockServer.stubFor( + get(urlEqualTo(ENDPOINT + "?id=1&uuid=2")) + .inScenario("Retry Scenario") + .whenScenarioStateIs(Scenario.STARTED) // Initial state + .willReturn(aResponse() + .withFault(Fault.CONNECTION_RESET_BY_PEER)) // Fail the first request + .willSetStateTo("Second Attempt")); // Set the next state + + return wireMockServer.stubFor( + get(urlEqualTo(ENDPOINT + "?id=1&uuid=2")) + .inScenario("Retry Scenario") + .whenScenarioStateIs("Second Attempt") // When the state is "Second Attempt" + .willReturn(aResponse() + .withStatus(200) + .withBody(readTestFile(SAMPLES_FOLDER + "HttpResult.json")))); + } + private String getBaseUrl() { return wireMockServer.baseUrl() + ENDPOINT; } From ac73de8b473562037c4978b4ef705e5b86f5f4c0 Mon Sep 17 00:00:00 2001 From: raz0r Date: Tue, 23 Apr 2024 11:33:48 +0400 Subject: [PATCH 3/3] update docs --- CHANGELOG.md | 4 ++++ README.md | 2 ++ .../internal/table/lookup/JavaNetHttpPollingClientTest.java | 2 +- 3 files changed, 7 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2b499503..3134702a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,10 @@ ## [Unreleased] +### Fixed + +- Lookup queries are now retried in case of IOException up to `gid.connector.http.source.lookup.max-retries` with a delay of `gid.connector.http.source.lookup.request.retry-timeout-ms` between retries. The default values are 3 retries and 1 second delay. + ## [0.13.0] - 2024-04-03 ### Added diff --git a/README.md b/README.md index bbaa0ff9..8d328272 100644 --- a/README.md +++ b/README.md @@ -406,6 +406,8 @@ is set to `'true'`, it will be used as header value as is, without any extra mod | gid.connector.http.security.key.client | optional | Path to trusted private key that should be used by connector's HTTP client for mTLS communication. | | gid.connector.http.security.cert.server.allowSelfSigned | optional | Accept untrusted certificates for TLS communication. | | gid.connector.http.source.lookup.request.timeout | optional | Sets HTTP request timeout in seconds. If not specified, the default value of 30 seconds will be used. | +| gid.connector.http.source.lookup.max-retries | optional | Sets the maximum number of retries for HTTP lookup request. If not specified, the default value of 3 retries will be used. | +| gid.connector.http.source.lookup.retry-timeout-ms | optional | Sets the delay between retries in milliseconds. If not specified, the default value of 1000 milliseconds will be used. | | gid.connector.http.source.lookup.request.thread-pool.size | optional | Sets the size of pool thread for HTTP lookup request processing. Increasing this value would mean that more concurrent requests can be processed in the same time. If not specified, the default value of 8 threads will be used. | | gid.connector.http.source.lookup.response.thread-pool.size | optional | Sets the size of pool thread for HTTP lookup response processing. Increasing this value would mean that more concurrent requests can be processed in the same time. If not specified, the default value of 4 threads will be used. | | gid.connector.http.source.lookup.use-raw-authorization-header | optional | If set to `'true'`, uses the raw value set for the `Authorization` header, without transformation for Basic Authentication (base64, addition of "Basic " prefix). If not specified, defaults to `'false'`. | diff --git a/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientTest.java b/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientTest.java index 12bbf12c..6d6feac1 100644 --- a/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientTest.java +++ b/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientTest.java @@ -1,6 +1,5 @@ package com.getindata.connectors.http.internal.table.lookup; -//import java.io.IOException; import java.net.URI; import java.net.http.HttpClient; import java.net.http.HttpRequest; @@ -23,6 +22,7 @@ import org.mockito.junit.jupiter.MockitoExtension; import static org.assertj.core.api.Assertions.assertThat; + import com.getindata.connectors.http.internal.HeaderPreprocessor; import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants; import com.getindata.connectors.http.internal.table.lookup.querycreators.GenericGetQueryCreator;