Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(elasticsearch): build in resilience against IO exceptions on httpclient #6680

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,28 @@

import com.linkedin.gms.factory.auth.AwsRequestSigningApacheInterceptor;
import com.linkedin.gms.factory.spring.YamlPropertySourceFactory;
import java.io.IOException;
import javax.annotation.Nonnull;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLContext;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHost;
import org.apache.http.HttpRequestInterceptor;
import org.apache.http.config.RegistryBuilder;
import org.apache.http.conn.ssl.DefaultHostnameVerifier;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.conn.util.PublicSuffixMatcherLoader;
import org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager;
import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
import org.apache.http.nio.conn.NHttpClientConnectionManager;
import org.apache.http.nio.conn.NoopIOSessionStrategy;
import org.apache.http.nio.conn.SchemeIOSessionStrategy;
import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
import org.apache.http.nio.reactor.IOReactorException;
import org.apache.http.nio.reactor.IOReactorExceptionHandler;
import org.apache.http.ssl.SSLContexts;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
Expand Down Expand Up @@ -70,106 +84,104 @@ public class RestHighLevelClientFactory {

@Bean(name = "elasticSearchRestHighLevelClient")
@Nonnull
protected RestHighLevelClient createInstance() {
RestClientBuilder restClientBuilder;
if (useSSL) {
restClientBuilder = loadRestHttpsClient(host, port, pathPrefix, threadCount, connectionRequestTimeout, sslContext, username,
password, opensearchUseAwsIamAuth, region);
} else {
restClientBuilder = loadRestHttpClient(host, port, pathPrefix, threadCount, connectionRequestTimeout, username,
password, opensearchUseAwsIamAuth, region);
}
public RestHighLevelClient createInstance(RestClientBuilder restClientBuilder) {

return new RestHighLevelClient(restClientBuilder);
}

@Nonnull
private static RestClientBuilder loadRestHttpClient(@Nonnull String host, int port, String pathPrefix, int threadCount,
int connectionRequestTimeout) {
RestClientBuilder builder = RestClient.builder(new HttpHost(host, port, "http"))
.setHttpClientConfigCallback(httpAsyncClientBuilder -> httpAsyncClientBuilder
.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(threadCount).build()));
@Bean
public RestClientBuilder loadRestClient() {
final RestClientBuilder builder = createBuilder(useSSL ? "https" : "http");

if (!StringUtils.isEmpty(pathPrefix)) {
builder.setPathPrefix(pathPrefix);
}

builder.setRequestConfigCallback(
requestConfigBuilder -> requestConfigBuilder.setConnectionRequestTimeout(connectionRequestTimeout));
builder.setHttpClientConfigCallback(httpAsyncClientBuilder -> {
if (useSSL) {
httpAsyncClientBuilder.setSSLContext(sslContext).setSSLHostnameVerifier(new NoopHostnameVerifier());
}
try {
httpAsyncClientBuilder.setConnectionManager(createConnectionManager());
} catch (IOReactorException e) {
throw new IllegalStateException("Unable to start ElasticSearch client. Please verify connection configuration.");
}
httpAsyncClientBuilder.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(threadCount).build());

return builder;
}
setCredentials(httpAsyncClientBuilder);

@Nonnull
private static RestClientBuilder loadRestHttpClient(@Nonnull String host, int port, String pathPrefix, int threadCount,
int connectionRequestTimeout, String username, String password, boolean opensearchUseAwsIamAuth, String region) {
RestClientBuilder builder = loadRestHttpClient(host, port, pathPrefix, threadCount, connectionRequestTimeout);

builder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpAsyncClientBuilder) {
httpAsyncClientBuilder.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(threadCount).build());

if (username != null && password != null) {
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
}
if (opensearchUseAwsIamAuth) {
HttpRequestInterceptor interceptor = getAwsRequestSigningInterceptor(region);
httpAsyncClientBuilder.addInterceptorLast(interceptor);
}

return httpAsyncClientBuilder;
}
return httpAsyncClientBuilder;
});

return builder;
}

@Nonnull
private static RestClientBuilder loadRestHttpsClient(@Nonnull String host, int port, String pathPrefix, int threadCount,
int connectionRequestTimeout, @Nonnull SSLContext sslContext, String username, String password,
boolean opensearchUseAwsIamAuth, String region) {

final RestClientBuilder builder = RestClient.builder(new HttpHost(host, port, "https"));
private RestClientBuilder createBuilder(String scheme) {
final RestClientBuilder builder = RestClient.builder(new HttpHost(host, port, scheme));

if (!StringUtils.isEmpty(pathPrefix)) {
builder.setPathPrefix(pathPrefix);
}

builder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpAsyncClientBuilder) {
httpAsyncClientBuilder.setSSLContext(sslContext).setSSLHostnameVerifier(new NoopHostnameVerifier())
.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(threadCount).build());

if (username != null && password != null) {
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
} else if (opensearchUseAwsIamAuth) {
HttpRequestInterceptor interceptor = getAwsRequestSigningInterceptor(region);
httpAsyncClientBuilder.addInterceptorLast(interceptor);
}

return httpAsyncClientBuilder;
}
});

builder.setRequestConfigCallback(
requestConfigBuilder -> requestConfigBuilder.setConnectionRequestTimeout(connectionRequestTimeout));

return builder;
}

private static HttpRequestInterceptor getAwsRequestSigningInterceptor(String region) {
/**
* Needed to override ExceptionHandler behavior for cases where IO error would have put client in unrecoverable state
* We don't utilize system properties in the client builder, so setting defaults pulled from
* {@link HttpAsyncClientBuilder#build()}.
* @return
*/
private NHttpClientConnectionManager createConnectionManager() throws IOReactorException {
SSLContext sslContext = SSLContexts.createDefault();
HostnameVerifier hostnameVerifier = new DefaultHostnameVerifier(PublicSuffixMatcherLoader.getDefault());
SchemeIOSessionStrategy sslStrategy =
new SSLIOSessionStrategy(sslContext, null, null, hostnameVerifier);

IOReactorConfig ioReactorConfig = IOReactorConfig.custom().setIoThreadCount(threadCount).build();
DefaultConnectingIOReactor ioReactor = new DefaultConnectingIOReactor(ioReactorConfig);
IOReactorExceptionHandler ioReactorExceptionHandler = new IOReactorExceptionHandler() {
@Override
public boolean handle(IOException ex) {
log.error("IO Exception caught during ElasticSearch connection.", ex);
return true;
}

@Override
public boolean handle(RuntimeException ex) {
log.error("Runtime Exception caught during ElasticSearch connection.", ex);
return true;
}
};
ioReactor.setExceptionHandler(ioReactorExceptionHandler);

return new PoolingNHttpClientConnectionManager(ioReactor,
RegistryBuilder.<SchemeIOSessionStrategy>create()
.register("http", NoopIOSessionStrategy.INSTANCE)
.register("https", sslStrategy)
.build());
}

private void setCredentials(HttpAsyncClientBuilder httpAsyncClientBuilder) {
if (username != null && password != null) {
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
}
if (opensearchUseAwsIamAuth) {
HttpRequestInterceptor interceptor = getAwsRequestSigningInterceptor(region);
httpAsyncClientBuilder.addInterceptorLast(interceptor);
}
}

private HttpRequestInterceptor getAwsRequestSigningInterceptor(String region) {

if (region == null) {
throw new NullPointerException("Region must not be null when opensearchUseAwsIamAuth is enabled");
throw new IllegalArgumentException("Region must not be null when opensearchUseAwsIamAuth is enabled");
}
Aws4Signer signer = Aws4Signer.create();
// Uses default AWS credentials
HttpRequestInterceptor interceptor = new AwsRequestSigningApacheInterceptor("es", signer,
return new AwsRequestSigningApacheInterceptor("es", signer,
DefaultCredentialsProvider.create(), region);
return interceptor;
}
}