diff --git a/CHANGELOG.md b/CHANGELOG.md index 2b499503..0a3a100b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,14 @@ ## [Unreleased] +### Added + +- Added support for optionally using a custom SLF4J logger to trace HTTP lookup queries. + New configuration parameter: `gid.connector.http.source.lookup.request-callback` with default value + `slf4j-lookup-logger`. If this parameter is not provided then the default SLF4J logger + [Slf4JHttpLookupPostRequestCallback](https://github.com/getindata/flink-http-connector/blob/main/src/main/java/com/getindata/connectors/http/internal/table/lookup/Slf4JHttpLookupPostRequestCallback.java) + is used instead. + ## [0.13.0] - 2024-04-03 ### Added diff --git a/README.md b/README.md index bbaa0ff9..2fb28f5f 100644 --- a/README.md +++ b/README.md @@ -338,18 +338,43 @@ CREATE TABLE http ( ``` #### Custom request/response callback -Http Sink processes responses that it gets from the HTTP endpoint along their respective requests. One can customize the + +- Http Sink processes responses that it gets from the HTTP endpoint along their respective requests. One can customize the behaviour of the additional stage of processing done by Table API Sink by implementing [HttpPostRequestCallback](src/main/java/com/getindata/connectors/http/HttpPostRequestCallback.java) and [HttpPostRequestCallbackFactory](src/main/java/com/getindata/connectors/http/HttpPostRequestCallbackFactory.java) -interfaces. Custom implementations of `HttpSinkRequestCallbackFactory` can be registered along other factories in -`resources/META-INF.services/org.apache.flink.table.factories.Factory` file and then referenced by their identifiers in +interfaces. Custom implementations of `HttpPostRequestCallbackFactory` can be registered along other factories in +`resources/META-INF/services/org.apache.flink.table.factories.Factory` file and then referenced by their identifiers in the HttpSink DDL property field `gid.connector.http.sink.request-callback`. -A default implementation that logs those pairs as *INFO* level logs using Slf4j + For example, one can create a class `CustomHttpSinkPostRequestCallbackFactory` with a unique identifier, say `rest-sink-logger`, +that implements interface `HttpPostRequestCallbackFactory` to create a new instance of a custom callback +`CustomHttpSinkPostRequestCallback`. This factory can be registered along other factories by appending the fully-qualified name +of class `CustomHttpSinkPostRequestCallbackFactory` in `resources/META-INF/services/org.apache.flink.table.factories.Factory` file +and then reference identifier `rest-sink-logger` in the HttpSink DDL property field `gid.connector.http.sink.request-callback`. + + A default implementation that logs those pairs as *INFO* level logs using Slf4j ([Slf4jHttpPostRequestCallback](src/main/java/com/getindata/connectors/http/internal/table/sink/Slf4jHttpPostRequestCallback.java)) is provided. + +- Http Lookup Source processes responses that it gets from the HTTP endpoint along their respective requests. One can customize the +behaviour of the additional stage of processing done by Table Function API by implementing +[HttpPostRequestCallback](src/main/java/com/getindata/connectors/http/HttpPostRequestCallback.java) and +[HttpPostRequestCallbackFactory](src/main/java/com/getindata/connectors/http/HttpPostRequestCallbackFactory.java) +interfaces. + + For example, one can create a class `CustomHttpLookupPostRequestCallbackFactory` with a unique identifier, say `rest-lookup-logger`, +that implements interface `HttpPostRequestCallbackFactory` to create a new instance of a custom callback +`CustomHttpLookupPostRequestCallback`. This factory can be registered along other factories by appending the fully-qualified name +of class `CustomHttpLookupPostRequestCallbackFactory` in `resources/META-INF/services/org.apache.flink.table.factories.Factory` file +and then reference identifier `rest-lookup-logger` in the HTTP lookup DDL property field `gid.connector.http.source.lookup.request-callback`. + + A default implementation that logs those pairs as *INFO* level logs using Slf4j +([Slf4JHttpLookupPostRequestCallback](src/main/java/com/getindata/connectors/http/internal/table/lookup/Slf4JHttpLookupPostRequestCallback.java)) +is provided. + + ## HTTP status code handler Http Sink and Lookup Source connectors allow defining list of HTTP status codes that should be treated as errors. By default all 400s and 500s response codes will be interpreted as error code. @@ -409,6 +434,7 @@ is set to `'true'`, it will be used as header value as is, without any extra mod | gid.connector.http.source.lookup.request.thread-pool.size | optional | Sets the size of pool thread for HTTP lookup request processing. Increasing this value would mean that more concurrent requests can be processed in the same time. If not specified, the default value of 8 threads will be used. | | gid.connector.http.source.lookup.response.thread-pool.size | optional | Sets the size of pool thread for HTTP lookup response processing. Increasing this value would mean that more concurrent requests can be processed in the same time. If not specified, the default value of 4 threads will be used. | | gid.connector.http.source.lookup.use-raw-authorization-header | optional | If set to `'true'`, uses the raw value set for the `Authorization` header, without transformation for Basic Authentication (base64, addition of "Basic " prefix). If not specified, defaults to `'false'`. | +| gid.connector.http.source.lookup.request-callback | optional | Specify which `HttpLookupPostRequestCallback` implementation to use. By default, it is set to `slf4j-lookup-logger` corresponding to `Slf4jHttpLookupPostRequestCallback`. | ### HTTP Sink | Option | Required | Description/Value | diff --git a/src/main/java/com/getindata/connectors/http/HttpPostRequestCallbackFactory.java b/src/main/java/com/getindata/connectors/http/HttpPostRequestCallbackFactory.java index f21fe3da..c4b3e5a0 100644 --- a/src/main/java/com/getindata/connectors/http/HttpPostRequestCallbackFactory.java +++ b/src/main/java/com/getindata/connectors/http/HttpPostRequestCallbackFactory.java @@ -2,17 +2,27 @@ import org.apache.flink.table.factories.Factory; +import com.getindata.connectors.http.internal.table.lookup.HttpLookupTableSource; import com.getindata.connectors.http.internal.table.sink.HttpDynamicSink; /** * The {@link Factory} that dynamically creates and injects {@link HttpPostRequestCallback} to - * {@link HttpDynamicSink}. + * {@link HttpDynamicSink} and {@link HttpLookupTableSource}. * *

Custom implementations of {@link HttpPostRequestCallbackFactory} can be registered along * other factories in - *

resources/META-INF.services/org.apache.flink.table.factories.Factory
- * file and then referenced by their identifiers in the HttpSink DDL property field - * gid.connector.http.sink.request-callback. + *
resources/META-INF/services/org.apache.flink.table.factories.Factory
+ * file and then referenced by their identifiers in: + *
  • + * The HttpSink DDL property field gid.connector.http.sink.request-callback + * for HTTP sink. + *
  • + *
  • + * The Http lookup DDL property field gid.connector.http.source.lookup.request-callback + * for HTTP lookup. + *
  • + * + *
    * *

    The following example shows the minimum Table API example to create a {@link HttpDynamicSink} * that uses a custom callback created by a factory that returns my-callback as its @@ -30,8 +40,24 @@ * ) * } * + *

    The following example shows the minimum Table API example to create a + * {@link HttpLookupTableSource} that uses a custom callback created by a factory that + * returns my-callback as its identifier. + * + *

    {@code
    + * CREATE TABLE httplookup (
    + *   id bigint
    + * ) with (
    + *   'connector' = 'rest-lookup',
    + *   'url' = 'http://example.com/myendpoint',
    + *   'format' = 'json',
    + *   'gid.connector.http.source.lookup.request-callback' = 'my-callback'
    + * )
    + * }
    + * * @param type of the HTTP request wrapper */ + public interface HttpPostRequestCallbackFactory extends Factory { /** * @return {@link HttpPostRequestCallback} custom request callback instance diff --git a/src/main/java/com/getindata/connectors/http/internal/config/HttpConnectorConfigConstants.java b/src/main/java/com/getindata/connectors/http/internal/config/HttpConnectorConfigConstants.java index c7c10e20..5415a159 100644 --- a/src/main/java/com/getindata/connectors/http/internal/config/HttpConnectorConfigConstants.java +++ b/src/main/java/com/getindata/connectors/http/internal/config/HttpConnectorConfigConstants.java @@ -48,6 +48,9 @@ public final class HttpConnectorConfigConstants { GID_CONNECTOR_HTTP + "source.lookup.error.code"; // ----------------------------------------------------- + public static final String SOURCE_LOOKUP_REQUEST_CALLBACK_IDENTIFIER = + GID_CONNECTOR_HTTP + "source.lookup.request-callback"; + public static final String SINK_REQUEST_CALLBACK_IDENTIFIER = GID_CONNECTOR_HTTP + "sink.request-callback"; diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupConfig.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupConfig.java index 9acf4aec..cba4b26e 100644 --- a/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupConfig.java +++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupConfig.java @@ -9,6 +9,8 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ReadableConfig; +import com.getindata.connectors.http.HttpPostRequestCallback; + @Builder @Data @RequiredArgsConstructor @@ -26,4 +28,6 @@ public class HttpLookupConfig implements Serializable { @Builder.Default private final ReadableConfig readableConfig = new Configuration(); + + private final HttpPostRequestCallback httpPostRequestCallback; } diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupConnectorOptions.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupConnectorOptions.java index 2b00ec67..9a7b4320 100644 --- a/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupConnectorOptions.java +++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupConnectorOptions.java @@ -5,6 +5,7 @@ import static com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants.LOOKUP_SOURCE_HEADER_USE_RAW; import static com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants.SOURCE_LOOKUP_QUERY_CREATOR_IDENTIFIER; +import static com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants.SOURCE_LOOKUP_REQUEST_CALLBACK_IDENTIFIER; public class HttpLookupConnectorOptions { @@ -47,4 +48,9 @@ public class HttpLookupConnectorOptions { .booleanType() .defaultValue(false) .withDescription("Whether to use the raw value of Authorization header"); + + public static final ConfigOption REQUEST_CALLBACK_IDENTIFIER = + ConfigOptions.key(SOURCE_LOOKUP_REQUEST_CALLBACK_IDENTIFIER) + .stringType() + .defaultValue(Slf4jHttpLookupPostRequestCallbackFactory.IDENTIFIER); } diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceFactory.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceFactory.java index 7204b9f5..6822e91b 100644 --- a/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceFactory.java +++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceFactory.java @@ -23,6 +23,7 @@ import static org.apache.flink.table.api.DataTypes.FIELD; import static org.apache.flink.table.types.utils.DataTypeUtils.removeTimeAttribute; +import com.getindata.connectors.http.HttpPostRequestCallbackFactory; import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants; import com.getindata.connectors.http.internal.utils.ConfigUtils; import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.*; @@ -88,7 +89,7 @@ public Set> requiredOptions() { @Override public Set> optionalOptions() { - return Set.of(URL_ARGS, ASYNC_POLLING, LOOKUP_METHOD); + return Set.of(URL_ARGS, ASYNC_POLLING, LOOKUP_METHOD, REQUEST_CALLBACK_IDENTIFIER); } private HttpLookupConfig getHttpLookupOptions(Context context, ReadableConfig readableConfig) { @@ -96,12 +97,21 @@ private HttpLookupConfig getHttpLookupOptions(Context context, ReadableConfig re Properties httpConnectorProperties = ConfigUtils.getHttpConnectorProperties(context.getCatalogTable().getOptions()); + final HttpPostRequestCallbackFactory + postRequestCallbackFactory = + FactoryUtil.discoverFactory( + context.getClassLoader(), + HttpPostRequestCallbackFactory.class, + readableConfig.get(REQUEST_CALLBACK_IDENTIFIER) + ); + return HttpLookupConfig.builder() .lookupMethod(readableConfig.get(LOOKUP_METHOD)) .url(readableConfig.get(URL)) .useAsync(readableConfig.get(ASYNC_POLLING)) .properties(httpConnectorProperties) .readableConfig(readableConfig) + .httpPostRequestCallback(postRequestCallbackFactory.createHttpPostRequestCallback()) .build(); } diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java index d1e13324..995cb03d 100644 --- a/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java +++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java @@ -47,8 +47,7 @@ public JavaNetHttpPollingClient( this.responseBodyDecoder = responseBodyDecoder; this.requestFactory = requestFactory; - // TODO inject same way as it is done for Sink - this.httpPostRequestCallback = new Slf4JHttpLookupPostRequestCallback(); + this.httpPostRequestCallback = options.getHttpPostRequestCallback(); // TODO Inject this via constructor when implementing a response processor. // Processor will be injected and it will wrap statusChecker implementation. @@ -92,22 +91,21 @@ private Optional processHttpResponse( this.httpPostRequestCallback.call(response, request, "endpoint", Collections.emptyMap()); if (response == null) { - log.warn("Null Http response for request " + request.getHttpRequest().uri().toString()); return Optional.empty(); } String responseBody = response.body(); int statusCode = response.statusCode(); - log.debug("Received {} status code for RestTableSource Request", statusCode); + log.debug("Received status code [%s] for RestTableSource request " + + "with Server response body [%s] ", statusCode, responseBody); + if (notErrorCodeAndNotEmptyBody(responseBody, statusCode)) { - log.trace("Server response body" + responseBody); return Optional.ofNullable(responseBodyDecoder.deserialize(responseBody.getBytes())); } else { log.warn( String.format("Returned Http status code was invalid or returned body was empty. " - + "Status Code [%s], " - + "response body [%s]", statusCode, responseBody) + + "Status Code [%s]", statusCode) ); return Optional.empty(); diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/Slf4JHttpLookupPostRequestCallback.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/Slf4JHttpLookupPostRequestCallback.java index 792529fa..a1ae1eae 100644 --- a/src/main/java/com/getindata/connectors/http/internal/table/lookup/Slf4JHttpLookupPostRequestCallback.java +++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/Slf4JHttpLookupPostRequestCallback.java @@ -43,6 +43,8 @@ public void call( } if (response == null) { + log.warn("Null Http response for request " + httpRequest.uri().toString()); + log.info( "Got response for a request.\n Request:\n URL: {}\n " + "Method: {}\n Headers: {}\n Params/Body: {}\nResponse: null", diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/Slf4jHttpLookupPostRequestCallbackFactory.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/Slf4jHttpLookupPostRequestCallbackFactory.java new file mode 100644 index 00000000..406a71b7 --- /dev/null +++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/Slf4jHttpLookupPostRequestCallbackFactory.java @@ -0,0 +1,38 @@ +package com.getindata.connectors.http.internal.table.lookup; + +import java.util.HashSet; +import java.util.Set; + +import org.apache.flink.configuration.ConfigOption; + +import com.getindata.connectors.http.HttpPostRequestCallback; +import com.getindata.connectors.http.HttpPostRequestCallbackFactory; + +/** + * Factory for creating {@link Slf4JHttpLookupPostRequestCallback}. + */ +public class Slf4jHttpLookupPostRequestCallbackFactory + implements HttpPostRequestCallbackFactory { + + public static final String IDENTIFIER = "slf4j-lookup-logger"; + + @Override + public HttpPostRequestCallback createHttpPostRequestCallback() { + return new Slf4JHttpLookupPostRequestCallback(); + } + + @Override + public String factoryIdentifier() { + return IDENTIFIER; + } + + @Override + public Set> requiredOptions() { + return new HashSet<>(); + } + + @Override + public Set> optionalOptions() { + return new HashSet<>(); + } +} diff --git a/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory index 8c61d96b..47fb2c71 100644 --- a/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory +++ b/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -2,5 +2,6 @@ com.getindata.connectors.http.internal.table.lookup.HttpLookupTableSourceFactory com.getindata.connectors.http.internal.table.lookup.querycreators.ElasticSearchLiteQueryCreatorFactory com.getindata.connectors.http.internal.table.lookup.querycreators.GenericGetQueryCreatorFactory com.getindata.connectors.http.internal.table.lookup.querycreators.GenericJsonQueryCreatorFactory +com.getindata.connectors.http.internal.table.lookup.Slf4jHttpLookupPostRequestCallbackFactory com.getindata.connectors.http.internal.table.sink.HttpDynamicTableSinkFactory com.getindata.connectors.http.internal.table.sink.Slf4jHttpPostRequestCallbackFactory \ No newline at end of file diff --git a/src/test/java/com/getindata/connectors/http/HttpPostRequestCallbackFactoryTest.java b/src/test/java/com/getindata/connectors/http/HttpPostRequestCallbackFactoryTest.java index 4a3719a6..7d503f99 100644 --- a/src/test/java/com/getindata/connectors/http/HttpPostRequestCallbackFactoryTest.java +++ b/src/test/java/com/getindata/connectors/http/HttpPostRequestCallbackFactoryTest.java @@ -9,17 +9,21 @@ import com.github.tomakehurst.wiremock.WireMockServer; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; import static com.github.tomakehurst.wiremock.client.WireMock.*; import static org.junit.jupiter.api.Assertions.assertEquals; import com.getindata.connectors.http.internal.sink.httpclient.HttpRequest; +import com.getindata.connectors.http.internal.table.lookup.HttpLookupSourceRequestEntry; import com.getindata.connectors.http.internal.table.sink.HttpDynamicTableSinkFactory; +import static com.getindata.connectors.http.TestLookupPostRequestCallbackFactory.TEST_LOOKUP_POST_REQUEST_CALLBACK_IDENT; import static com.getindata.connectors.http.TestPostRequestCallbackFactory.TEST_POST_REQUEST_CALLBACK_IDENT; public class HttpPostRequestCallbackFactoryTest { @@ -30,6 +34,10 @@ public class HttpPostRequestCallbackFactoryTest { protected StreamTableEnvironment tEnv; private static final ArrayList requestEntries = new ArrayList<>(); + + private static final ArrayList + lookupRequestEntries = new ArrayList<>(); + private static final ArrayList> responses = new ArrayList<>(); @BeforeEach @@ -87,6 +95,54 @@ public void httpPostRequestCallbackFactoryTest(String mode, String expectedReque Assertions.assertThat(actualRequest).isEqualToIgnoringNewLines(expectedRequest); } + @Test + public void httpLookupPostRequestCallbackFactoryTest() + throws ExecutionException, InterruptedException { + wireMockServer.stubFor(any(urlPathEqualTo("/myendpoint")).willReturn( + aResponse().withStatus(200).withBody("{\"customerId\": 1}") + )); + + final String createTable1 = + "CREATE TABLE Orders (\n" + + " proc_time AS PROCTIME(),\n" + + " orderId INT\n" + + ") WITH (\n" + + " 'connector' = 'datagen',\n" + + " 'fields.orderId.kind' = 'sequence',\n" + + " 'fields.orderId.start' = '1',\n" + + " 'fields.orderId.end' = '1'\n" + + ");"; + tEnv.executeSql(createTable1); + + final String createTable2 = + String.format( + "CREATE TABLE Customers (\n" + + " `customerId` INT\n" + + ") with (\n" + + " 'connector' = '%s',\n" + + " 'url' = '%s',\n" + + " 'format' = 'json',\n" + + " 'gid.connector.http.source.lookup.request-callback' = '%s'\n" + + ")", + "rest-lookup", + "http://localhost:" + SERVER_PORT + "/myendpoint", + TEST_LOOKUP_POST_REQUEST_CALLBACK_IDENT + ); + tEnv.executeSql(createTable2); + + final String joinTable = + "SELECT o.`orderId`, c.`customerId`\n" + + " FROM Orders AS o\n" + + " JOIN Customers FOR SYSTEM_TIME AS OF o.`proc_time` AS c\n" + + " ON o.`orderId` = c.`customerId`;"; + + final TableResult resultTable = tEnv.sqlQuery(joinTable).execute(); + resultTable.await(); + + assertEquals(1, lookupRequestEntries.size()); + assertEquals(1, responses.size()); + } + public static class TestPostRequestCallback implements HttpPostRequestCallback { @Override public void call( @@ -99,4 +155,18 @@ public void call( responses.add(response); } } + + public static class TestLookupPostRequestCallback + implements HttpPostRequestCallback { + @Override + public void call( + HttpResponse response, + HttpLookupSourceRequestEntry requestEntry, + String endpointUrl, + Map headerMap + ) { + lookupRequestEntries.add(requestEntry); + responses.add(response); + } + } } diff --git a/src/test/java/com/getindata/connectors/http/TestLookupPostRequestCallbackFactory.java b/src/test/java/com/getindata/connectors/http/TestLookupPostRequestCallbackFactory.java new file mode 100644 index 00000000..3b7df1c4 --- /dev/null +++ b/src/test/java/com/getindata/connectors/http/TestLookupPostRequestCallbackFactory.java @@ -0,0 +1,29 @@ +package com.getindata.connectors.http; + +import java.util.HashSet; +import java.util.Set; + +import org.apache.flink.configuration.ConfigOption; + +import com.getindata.connectors.http.internal.table.lookup.HttpLookupSourceRequestEntry; + +public class TestLookupPostRequestCallbackFactory + implements HttpPostRequestCallbackFactory { + + public static final String TEST_LOOKUP_POST_REQUEST_CALLBACK_IDENT = + "test-lookup-request-callback"; + + @Override + public HttpPostRequestCallback createHttpPostRequestCallback() { + return new HttpPostRequestCallbackFactoryTest.TestLookupPostRequestCallback(); + } + + @Override + public String factoryIdentifier() { return TEST_LOOKUP_POST_REQUEST_CALLBACK_IDENT; } + + @Override + public Set> requiredOptions() { return new HashSet<>(); } + + @Override + public Set> optionalOptions() { return new HashSet<>(); } +} diff --git a/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientConnectionTest.java b/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientConnectionTest.java index 277f7d84..df7b069b 100644 --- a/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientConnectionTest.java +++ b/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientConnectionTest.java @@ -364,6 +364,7 @@ private JavaNetHttpPollingClient setUpPollingClient( HttpLookupConfig lookupConfig = HttpLookupConfig.builder() .url(url) .properties(properties) + .httpPostRequestCallback(new Slf4JHttpLookupPostRequestCallback()) .build(); DataType physicalDataType = DataTypes.ROW( diff --git a/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientHttpsConnectionTest.java b/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientHttpsConnectionTest.java index 9545f9d1..683f9261 100644 --- a/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientHttpsConnectionTest.java +++ b/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientHttpsConnectionTest.java @@ -259,6 +259,7 @@ private JavaNetHttpPollingClient setUpPollingClient(Properties properties) { HttpLookupConfig lookupConfig = HttpLookupConfig.builder() .url("https://localhost:" + HTTPS_SERVER_PORT + ENDPOINT) .properties(properties) + .httpPostRequestCallback(new Slf4JHttpLookupPostRequestCallback()) .build(); DataType physicalDataType = DataTypes.ROW( diff --git a/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory b/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory index 6f86c05e..df4f5338 100644 --- a/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory +++ b/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -1,2 +1,3 @@ com.getindata.connectors.http.TestPostRequestCallbackFactory +com.getindata.connectors.http.TestLookupPostRequestCallbackFactory com.getindata.connectors.http.internal.table.lookup.querycreators.CustomFormatFactory \ No newline at end of file