Skip to content

Commit

Permalink
Addressing code review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Andriy Redko <andriy.redko@aiven.io>
  • Loading branch information
reta committed Feb 13, 2023
1 parent 317dcac commit 455048c
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 155 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/push_pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,4 @@ jobs:
compile_and_test:
uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils
with:
flink_version: 1.16.0
flink_version: 1.16.1
Original file line number Diff line number Diff line change
Expand Up @@ -29,30 +29,19 @@
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;

import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.conn.ssl.TrustAllStrategy;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.ssl.SSLContexts;
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.bulk.BulkItemResponse;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestClient;
import org.opensearch.client.RestClientBuilder;
import org.opensearch.client.RestHighLevelClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.ConnectException;
import java.net.NoRouteToHostException;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -135,12 +124,7 @@ class OpensearchAsyncWriter<InputT> extends AsyncSinkWriter<InputT, DocSerdeRequ
.build(),
initialStates);

this.client =
new RestHighLevelClient(
configureRestClientBuilder(
RestClient.builder(hosts.toArray(new HttpHost[0])),
networkClientConfig));

this.client = OpensearchRestClientCreator.create(hosts, networkClientConfig);
final SinkWriterMetricGroup metricGroup = context.metricGroup();
checkNotNull(metricGroup);

Expand Down Expand Up @@ -244,64 +228,4 @@ private void handlePartiallyFailedBulkRequests(
numRecordsOutErrorsCounter.inc(failedRequestEntries.size());
requestResult.accept(failedRequestEntries);
}

private static RestClientBuilder configureRestClientBuilder(
RestClientBuilder builder, NetworkClientConfig networkClientConfig) {
if (networkClientConfig.getConnectionPathPrefix() != null) {
builder.setPathPrefix(networkClientConfig.getConnectionPathPrefix());
}

builder.setHttpClientConfigCallback(
httpClientBuilder -> {
if (networkClientConfig.getPassword() != null
&& networkClientConfig.getUsername() != null) {
final CredentialsProvider credentialsProvider =
new BasicCredentialsProvider();
credentialsProvider.setCredentials(
AuthScope.ANY,
new UsernamePasswordCredentials(
networkClientConfig.getUsername(),
networkClientConfig.getPassword()));

httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
}

if (networkClientConfig.isAllowInsecure().orElse(false)) {
try {
httpClientBuilder.setSSLContext(
SSLContexts.custom()
.loadTrustMaterial(new TrustAllStrategy())
.build());
} catch (final NoSuchAlgorithmException
| KeyStoreException
| KeyManagementException ex) {
throw new IllegalStateException(
"Unable to create custom SSL context", ex);
}
}

return httpClientBuilder;
});
if (networkClientConfig.getConnectionRequestTimeout() != null
|| networkClientConfig.getConnectionTimeout() != null
|| networkClientConfig.getSocketTimeout() != null) {
builder.setRequestConfigCallback(
requestConfigBuilder -> {
if (networkClientConfig.getConnectionRequestTimeout() != null) {
requestConfigBuilder.setConnectionRequestTimeout(
networkClientConfig.getConnectionRequestTimeout());
}
if (networkClientConfig.getConnectionTimeout() != null) {
requestConfigBuilder.setConnectTimeout(
networkClientConfig.getConnectionTimeout());
}
if (networkClientConfig.getSocketTimeout() != null) {
requestConfigBuilder.setSocketTimeout(
networkClientConfig.getSocketTimeout());
}
return requestConfigBuilder;
});
}
return builder;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.connector.opensearch.sink;

import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.conn.ssl.TrustAllStrategy;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.ssl.SSLContexts;
import org.opensearch.client.RestClient;
import org.opensearch.client.RestClientBuilder;
import org.opensearch.client.RestHighLevelClient;

import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.util.List;

/** The utility class to encapsulate {@link RestHighLevelClient} creation. */
class OpensearchRestClientCreator {
/** Utility class. */
private OpensearchRestClientCreator() {}

/**
* Creates new instance of {@link RestHighLevelClient}.
*
* @param hosts list of hosts to connect
* @param networkClientConfig client network configuration
* @return new instance of {@link RestHighLevelClient}
*/
static RestHighLevelClient create(
final List<HttpHost> hosts, final NetworkClientConfig networkClientConfig) {
return new RestHighLevelClient(
configureRestClientBuilder(
RestClient.builder(hosts.toArray(new HttpHost[0])), networkClientConfig));
}

private static RestClientBuilder configureRestClientBuilder(
RestClientBuilder builder, NetworkClientConfig networkClientConfig) {
if (networkClientConfig.getConnectionPathPrefix() != null) {
builder.setPathPrefix(networkClientConfig.getConnectionPathPrefix());
}

builder.setHttpClientConfigCallback(
httpClientBuilder -> {
if (networkClientConfig.getPassword() != null
&& networkClientConfig.getUsername() != null) {
final CredentialsProvider credentialsProvider =
new BasicCredentialsProvider();
credentialsProvider.setCredentials(
AuthScope.ANY,
new UsernamePasswordCredentials(
networkClientConfig.getUsername(),
networkClientConfig.getPassword()));

httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
}

if (networkClientConfig.isAllowInsecure().orElse(false)) {
try {
httpClientBuilder.setSSLContext(
SSLContexts.custom()
.loadTrustMaterial(new TrustAllStrategy())
.build());
} catch (final NoSuchAlgorithmException
| KeyStoreException
| KeyManagementException ex) {
throw new IllegalStateException(
"Unable to create custom SSL context", ex);
}
}

return httpClientBuilder;
});
if (networkClientConfig.getConnectionRequestTimeout() != null
|| networkClientConfig.getConnectionTimeout() != null
|| networkClientConfig.getSocketTimeout() != null) {
builder.setRequestConfigCallback(
requestConfigBuilder -> {
if (networkClientConfig.getConnectionRequestTimeout() != null) {
requestConfigBuilder.setConnectionRequestTimeout(
networkClientConfig.getConnectionRequestTimeout());
}
if (networkClientConfig.getConnectionTimeout() != null) {
requestConfigBuilder.setConnectTimeout(
networkClientConfig.getConnectionTimeout());
}
if (networkClientConfig.getSocketTimeout() != null) {
requestConfigBuilder.setSocketTimeout(
networkClientConfig.getSocketTimeout());
}
return requestConfigBuilder;
});
}
return builder;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,6 @@
import org.apache.flink.util.function.ThrowingRunnable;

import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.conn.ssl.TrustAllStrategy;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.ssl.SSLContexts;
import org.opensearch.action.ActionListener;
import org.opensearch.action.DocWriteRequest;
import org.opensearch.action.bulk.BackoffPolicy;
Expand All @@ -44,8 +38,6 @@
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.update.UpdateRequest;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestClient;
import org.opensearch.client.RestClientBuilder;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.common.unit.ByteSizeUnit;
import org.opensearch.common.unit.ByteSizeValue;
Expand All @@ -55,9 +47,6 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.util.List;

import static org.apache.flink.util.ExceptionUtils.firstOrSuppressed;
Expand Down Expand Up @@ -107,11 +96,7 @@ class OpensearchWriter<IN> implements SinkWriter<IN> {
this.emitter = checkNotNull(emitter);
this.flushOnCheckpoint = flushOnCheckpoint;
this.mailboxExecutor = checkNotNull(mailboxExecutor);
this.client =
new RestHighLevelClient(
configureRestClientBuilder(
RestClient.builder(hosts.toArray(new HttpHost[0])),
networkClientConfig));
this.client = OpensearchRestClientCreator.create(hosts, networkClientConfig);
this.bulkProcessor = createBulkProcessor(bulkProcessorConfig);
this.requestIndexer = new DefaultRequestIndexer(metricGroup.getNumRecordsSendCounter());
checkNotNull(metricGroup);
Expand Down Expand Up @@ -161,66 +146,6 @@ public void close() throws Exception {
client.close();
}

private static RestClientBuilder configureRestClientBuilder(
RestClientBuilder builder, NetworkClientConfig networkClientConfig) {
if (networkClientConfig.getConnectionPathPrefix() != null) {
builder.setPathPrefix(networkClientConfig.getConnectionPathPrefix());
}

builder.setHttpClientConfigCallback(
httpClientBuilder -> {
if (networkClientConfig.getPassword() != null
&& networkClientConfig.getUsername() != null) {
final CredentialsProvider credentialsProvider =
new BasicCredentialsProvider();
credentialsProvider.setCredentials(
AuthScope.ANY,
new UsernamePasswordCredentials(
networkClientConfig.getUsername(),
networkClientConfig.getPassword()));

httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
}

if (networkClientConfig.isAllowInsecure().orElse(false)) {
try {
httpClientBuilder.setSSLContext(
SSLContexts.custom()
.loadTrustMaterial(new TrustAllStrategy())
.build());
} catch (final NoSuchAlgorithmException
| KeyStoreException
| KeyManagementException ex) {
throw new IllegalStateException(
"Unable to create custom SSL context", ex);
}
}

return httpClientBuilder;
});
if (networkClientConfig.getConnectionRequestTimeout() != null
|| networkClientConfig.getConnectionTimeout() != null
|| networkClientConfig.getSocketTimeout() != null) {
builder.setRequestConfigCallback(
requestConfigBuilder -> {
if (networkClientConfig.getConnectionRequestTimeout() != null) {
requestConfigBuilder.setConnectionRequestTimeout(
networkClientConfig.getConnectionRequestTimeout());
}
if (networkClientConfig.getConnectionTimeout() != null) {
requestConfigBuilder.setConnectTimeout(
networkClientConfig.getConnectionTimeout());
}
if (networkClientConfig.getSocketTimeout() != null) {
requestConfigBuilder.setSocketTimeout(
networkClientConfig.getSocketTimeout());
}
return requestConfigBuilder;
});
}
return builder;
}

private BulkProcessor createBulkProcessor(BulkProcessorConfig bulkProcessorConfig) {

final BulkProcessor.Builder builder =
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ under the License.
</modules>

<properties>
<flink.version>1.16.0</flink.version>
<flink.version>1.16.1</flink.version>
<flink.shaded.version>15.0</flink.shaded.version>

<jackson-bom.version>2.13.4.20221013</jackson-bom.version>
Expand Down

0 comments on commit 455048c

Please sign in to comment.