Skip to content

Commit 674fb17

Browse files
HTTP-150 Add Proxy to HttpClient for Lookup Source (#151)
1 parent d200f1d commit 674fb17

File tree

11 files changed

+367
-15
lines changed

11 files changed

+367
-15
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
## [Unreleased]
44

5+
- Added option to define a proxy for the lookup source (including authentication)
6+
57
- Added support for generic json and URL query creator
68

79
- Retries support for source table:

README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -555,6 +555,10 @@ be requested if the current time is later than the cached token expiry time minu
555555
| gid.connector.http.source.lookup.retry-strategy.exponential-delay.initial-backoff | optional | Exponential-delay initial delay. Default 1 second. |
556556
| gid.connector.http.source.lookup.retry-strategy.exponential-delay.max-backoff | optional | Exponential-delay maximum delay. Default 1 minute. Use with `lookup.max-retries` parameter. |
557557
| gid.connector.http.source.lookup.retry-strategy.exponential-delay.backoff-multiplier | optional | Exponential-delay multiplier. Default value 1.5 |
558+
| gid.connector.http.source.lookup.proxy.host | optional | Specify the hostname of the proxy. |
559+
| gid.connector.http.source.lookup.proxy.port | optional | Specify the port of the proxy. |
560+
| gid.connector.http.source.lookup.proxy.username | optional | Specify the username used for proxy authentication. |
561+
| gid.connector.http.source.lookup.proxy.password | optional | Specify the password used for proxy authentication. |
558562
| gid.connector.http.request.query-param-fields | optional | Used for the `GenericJsonAndUrlQueryCreator` query creator. The names of the fields that will be mapped to query parameters. The parameters are separated by semicolons, such as `param1;param2`. |
559563
| gid.connector.http.request.body-fields | optional | Used for the `GenericJsonAndUrlQueryCreator` query creator. The names of the fields that will be mapped to the body. The parameters are separated by semicolons, such as `param1;param2`. | |
560564
| gid.connector.http.request.url-map | optional | Used for the `GenericJsonAndUrlQueryCreator` query creator. The map of insert names to column names used as url segments. Parses a string as a map of strings. For example if there are table columns called `customerId` and `orderId`, then specifying value `customerId:cid1,orderID:oid` and a url of https://myendpoint/customers/{cid}/orders/{oid} will mean that the url used for the lookup query will dynamically pickup the values for `customerId`, `orderId` and use them in the url. The expected format of the map is: `key1:value1,key2:value2`. |

src/main/java/com/getindata/connectors/http/internal/config/HttpConnectorConfigConstants.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,18 @@ public final class HttpConnectorConfigConstants {
8888
public static final String SOURCE_CONNECTION_TIMEOUT =
8989
SOURCE_LOOKUP_PREFIX + "connection.timeout";
9090

91+
public static final String SOURCE_PROXY_HOST =
92+
SOURCE_LOOKUP_PREFIX + "proxy.host";
93+
94+
public static final String SOURCE_PROXY_PORT =
95+
SOURCE_LOOKUP_PREFIX + "proxy.port";
96+
97+
public static final String SOURCE_PROXY_USERNAME =
98+
SOURCE_LOOKUP_PREFIX + "proxy.username";
99+
100+
public static final String SOURCE_PROXY_PASSWORD =
101+
SOURCE_LOOKUP_PREFIX + "proxy.password";
102+
91103
public static final String SINK_HTTP_TIMEOUT_SECONDS =
92104
GID_CONNECTOR_HTTP + "sink.request.timeout";
93105

src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupConnectorOptions.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,30 @@ public class HttpLookupConnectorOptions {
8282
.noDefaultValue()
8383
.withDescription("Http client connection timeout.");
8484

85+
public static final ConfigOption<String> SOURCE_LOOKUP_PROXY_HOST =
86+
ConfigOptions.key(SOURCE_PROXY_HOST)
87+
.stringType()
88+
.noDefaultValue()
89+
.withDescription("Http client proxy host.");
90+
91+
public static final ConfigOption<Integer> SOURCE_LOOKUP_PROXY_PORT =
92+
ConfigOptions.key(SOURCE_PROXY_PORT)
93+
.intType()
94+
.noDefaultValue()
95+
.withDescription("Http client proxy port.");
96+
97+
public static final ConfigOption<String> SOURCE_LOOKUP_PROXY_USERNAME =
98+
ConfigOptions.key(SOURCE_PROXY_USERNAME)
99+
.stringType()
100+
.noDefaultValue()
101+
.withDescription("Http client proxy username for authentication.");
102+
103+
public static final ConfigOption<String> SOURCE_LOOKUP_PROXY_PASSWORD =
104+
ConfigOptions.key(SOURCE_PROXY_PASSWORD)
105+
.stringType()
106+
.noDefaultValue()
107+
.withDescription("Http client proxy password for authentication.");
108+
85109
public static final ConfigOption<String> SOURCE_LOOKUP_RETRY_STRATEGY =
86110
ConfigOptions.key(SOURCE_RETRY_STRATEGY_TYPE)
87111
.stringType()

src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceFactory.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,10 @@ public Set<ConfigOption<?>> optionalOptions() {
134134
SOURCE_LOOKUP_HTTP_RETRY_CODES,
135135
SOURCE_LOOKUP_HTTP_IGNORED_RESPONSE_CODES,
136136

137+
SOURCE_LOOKUP_PROXY_HOST,
138+
SOURCE_LOOKUP_PROXY_PORT,
139+
SOURCE_LOOKUP_PROXY_USERNAME,
140+
SOURCE_LOOKUP_PROXY_PASSWORD,
137141
SOURCE_LOOKUP_CONNECTION_TIMEOUT // TODO: add request timeout from properties
138142
);
139143
}

src/main/java/com/getindata/connectors/http/internal/utils/JavaNetHttpClientFactory.java

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
package com.getindata.connectors.http.internal.utils;
22

3+
import java.net.InetSocketAddress;
4+
import java.net.ProxySelector;
35
import java.net.http.HttpClient;
46
import java.net.http.HttpClient.Redirect;
57
import java.security.NoSuchAlgorithmException;
68
import java.util.ArrayList;
79
import java.util.List;
10+
import java.util.Optional;
811
import java.util.Properties;
912
import java.util.concurrent.Executor;
1013
import javax.net.ssl.SSLContext;
@@ -14,6 +17,7 @@
1417
import lombok.AccessLevel;
1518
import lombok.NoArgsConstructor;
1619
import lombok.extern.slf4j.Slf4j;
20+
import org.apache.flink.configuration.ReadableConfig;
1721
import org.apache.flink.util.StringUtils;
1822

1923
import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants;
@@ -41,9 +45,26 @@ public static HttpClient createClient(HttpLookupConfig options) {
4145
.followRedirects(Redirect.NORMAL)
4246
.sslContext(sslContext);
4347

44-
options.getReadableConfig()
45-
.getOptional(HttpLookupConnectorOptions.SOURCE_LOOKUP_CONNECTION_TIMEOUT)
46-
.ifPresent(clientBuilder::connectTimeout);
48+
ReadableConfig readableConfig = options.getReadableConfig();
49+
50+
readableConfig
51+
.getOptional(HttpLookupConnectorOptions.SOURCE_LOOKUP_CONNECTION_TIMEOUT)
52+
.ifPresent(clientBuilder::connectTimeout);
53+
54+
Optional<String> proxyHost = readableConfig.getOptional(HttpLookupConnectorOptions.SOURCE_LOOKUP_PROXY_HOST);
55+
Optional<Integer> proxyPort = readableConfig.getOptional(HttpLookupConnectorOptions.SOURCE_LOOKUP_PROXY_PORT);
56+
57+
if(proxyHost.isPresent() && proxyPort.isPresent()){
58+
59+
Optional<String> proxyUsername = readableConfig
60+
.getOptional(HttpLookupConnectorOptions.SOURCE_LOOKUP_PROXY_USERNAME);
61+
Optional<String> proxyPassword = readableConfig
62+
.getOptional(HttpLookupConnectorOptions.SOURCE_LOOKUP_PROXY_PASSWORD);
63+
64+
ProxyConfig proxyConfig = new ProxyConfig(proxyHost.get(), proxyPort.get(), proxyUsername, proxyPassword);
65+
clientBuilder.proxy(ProxySelector.of(new InetSocketAddress(proxyConfig.getHost(), proxyConfig.getPort())));
66+
proxyConfig.getAuthenticator().ifPresent(clientBuilder::authenticator);
67+
}
4768

4869
return clientBuilder.build();
4970
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package com.getindata.connectors.http.internal.utils;
2+
3+
import java.net.Authenticator;
4+
import java.net.PasswordAuthentication;
5+
import java.util.Optional;
6+
7+
import lombok.Getter;
8+
9+
@Getter
10+
public class ProxyConfig {
11+
12+
private final String host;
13+
14+
private final int port;
15+
16+
private final Optional<Authenticator> authenticator;
17+
18+
public ProxyConfig(String host, int port, Optional<String> proxyUsername, Optional<String> proxyPassword) {
19+
this.host = host;
20+
this.port = port;
21+
22+
if(proxyUsername.isPresent() && proxyPassword.isPresent()){
23+
this.authenticator = Optional.of(new Authenticator() {
24+
@Override
25+
protected PasswordAuthentication getPasswordAuthentication() {
26+
if (getRequestorType().equals(RequestorType.PROXY) && getRequestingHost().equalsIgnoreCase(host)) {
27+
return new PasswordAuthentication(proxyUsername.get(),
28+
proxyPassword.get().toCharArray());
29+
} else {
30+
return null;
31+
}
32+
}
33+
});
34+
} else {
35+
this.authenticator = Optional.empty();
36+
}
37+
38+
}
39+
40+
}

src/test/java/com/getindata/connectors/http/internal/status/HttpCodesParserTest.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,15 @@ class HttpCodesParserTest {
2222

2323
@ParameterizedTest
2424
@ValueSource(strings = {
25-
"6XX",
26-
"1XXX",
27-
"600",
28-
"99",
29-
"1XX,11",
30-
"abc",
31-
"!1XX",
32-
"1 2 3",
33-
"1X X"
25+
"6XX",
26+
"1XXX",
27+
"600",
28+
"99",
29+
"1XX,11",
30+
"abc",
31+
"!1XX",
32+
"1 2 3",
33+
"1X X"
3434
})
3535
void failWhenCodeExpressionIsInvalid(String codeExpression) {
3636
assertThrows(ConfigurationException.class,

src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientConnectionTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -310,9 +310,9 @@ void shouldProcessWithMissingArguments() throws ConfigurationException {
310310

311311
@ParameterizedTest
312312
@CsvSource({
313-
"user:password, false",
314-
"Basic dXNlcjpwYXNzd29yZA==, false",
315-
"Basic dXNlcjpwYXNzd29yZA==, true"
313+
"user:password, false",
314+
"Basic dXNlcjpwYXNzd29yZA==, false",
315+
"Basic dXNlcjpwYXNzd29yZA==, true"
316316
})
317317
public void shouldConnectWithBasicAuth(String authorizationHeaderValue,
318318
boolean useRawAuthHeader) throws ConfigurationException {

0 commit comments

Comments
 (0)