diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/common/RestHighLevelClientFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/common/RestHighLevelClientFactory.java index f5ec707f749a9a..1da66f3192f807 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/common/RestHighLevelClientFactory.java +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/common/RestHighLevelClientFactory.java @@ -2,14 +2,28 @@ import com.linkedin.gms.factory.auth.AwsRequestSigningApacheInterceptor; import com.linkedin.gms.factory.spring.YamlPropertySourceFactory; +import java.io.IOException; import javax.annotation.Nonnull; +import javax.net.ssl.HostnameVerifier; import javax.net.ssl.SSLContext; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.http.HttpHost; import org.apache.http.HttpRequestInterceptor; +import org.apache.http.config.RegistryBuilder; +import org.apache.http.conn.ssl.DefaultHostnameVerifier; import org.apache.http.conn.ssl.NoopHostnameVerifier; +import org.apache.http.conn.util.PublicSuffixMatcherLoader; +import org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager; +import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor; import org.apache.http.impl.nio.reactor.IOReactorConfig; +import org.apache.http.nio.conn.NHttpClientConnectionManager; +import org.apache.http.nio.conn.NoopIOSessionStrategy; +import org.apache.http.nio.conn.SchemeIOSessionStrategy; +import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy; +import org.apache.http.nio.reactor.IOReactorException; +import org.apache.http.nio.reactor.IOReactorExceptionHandler; +import org.apache.http.ssl.SSLContexts; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestClientBuilder; import org.elasticsearch.client.RestHighLevelClient; @@ -70,106 +84,104 @@ public class RestHighLevelClientFactory { @Bean(name = "elasticSearchRestHighLevelClient") @Nonnull - protected RestHighLevelClient createInstance() { - RestClientBuilder restClientBuilder; - if (useSSL) { - restClientBuilder = loadRestHttpsClient(host, port, pathPrefix, threadCount, connectionRequestTimeout, sslContext, username, - password, opensearchUseAwsIamAuth, region); - } else { - restClientBuilder = loadRestHttpClient(host, port, pathPrefix, threadCount, connectionRequestTimeout, username, - password, opensearchUseAwsIamAuth, region); - } + public RestHighLevelClient createInstance(RestClientBuilder restClientBuilder) { return new RestHighLevelClient(restClientBuilder); } - @Nonnull - private static RestClientBuilder loadRestHttpClient(@Nonnull String host, int port, String pathPrefix, int threadCount, - int connectionRequestTimeout) { - RestClientBuilder builder = RestClient.builder(new HttpHost(host, port, "http")) - .setHttpClientConfigCallback(httpAsyncClientBuilder -> httpAsyncClientBuilder - .setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(threadCount).build())); + @Bean + public RestClientBuilder loadRestClient() { + final RestClientBuilder builder = createBuilder(useSSL ? "https" : "http"); - if (!StringUtils.isEmpty(pathPrefix)) { - builder.setPathPrefix(pathPrefix); - } - - builder.setRequestConfigCallback( - requestConfigBuilder -> requestConfigBuilder.setConnectionRequestTimeout(connectionRequestTimeout)); + builder.setHttpClientConfigCallback(httpAsyncClientBuilder -> { + if (useSSL) { + httpAsyncClientBuilder.setSSLContext(sslContext).setSSLHostnameVerifier(new NoopHostnameVerifier()); + } + try { + httpAsyncClientBuilder.setConnectionManager(createConnectionManager()); + } catch (IOReactorException e) { + throw new IllegalStateException("Unable to start ElasticSearch client. Please verify connection configuration."); + } + httpAsyncClientBuilder.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(threadCount).build()); - return builder; - } + setCredentials(httpAsyncClientBuilder); - @Nonnull - private static RestClientBuilder loadRestHttpClient(@Nonnull String host, int port, String pathPrefix, int threadCount, - int connectionRequestTimeout, String username, String password, boolean opensearchUseAwsIamAuth, String region) { - RestClientBuilder builder = loadRestHttpClient(host, port, pathPrefix, threadCount, connectionRequestTimeout); - - builder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() { - public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpAsyncClientBuilder) { - httpAsyncClientBuilder.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(threadCount).build()); - - if (username != null && password != null) { - final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); - credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password)); - httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider); - } - if (opensearchUseAwsIamAuth) { - HttpRequestInterceptor interceptor = getAwsRequestSigningInterceptor(region); - httpAsyncClientBuilder.addInterceptorLast(interceptor); - } - - return httpAsyncClientBuilder; - } + return httpAsyncClientBuilder; }); return builder; } @Nonnull - private static RestClientBuilder loadRestHttpsClient(@Nonnull String host, int port, String pathPrefix, int threadCount, - int connectionRequestTimeout, @Nonnull SSLContext sslContext, String username, String password, - boolean opensearchUseAwsIamAuth, String region) { - - final RestClientBuilder builder = RestClient.builder(new HttpHost(host, port, "https")); + private RestClientBuilder createBuilder(String scheme) { + final RestClientBuilder builder = RestClient.builder(new HttpHost(host, port, scheme)); if (!StringUtils.isEmpty(pathPrefix)) { builder.setPathPrefix(pathPrefix); } - builder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() { - public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpAsyncClientBuilder) { - httpAsyncClientBuilder.setSSLContext(sslContext).setSSLHostnameVerifier(new NoopHostnameVerifier()) - .setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(threadCount).build()); - - if (username != null && password != null) { - final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); - credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password)); - httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider); - } else if (opensearchUseAwsIamAuth) { - HttpRequestInterceptor interceptor = getAwsRequestSigningInterceptor(region); - httpAsyncClientBuilder.addInterceptorLast(interceptor); - } - - return httpAsyncClientBuilder; - } - }); - builder.setRequestConfigCallback( requestConfigBuilder -> requestConfigBuilder.setConnectionRequestTimeout(connectionRequestTimeout)); return builder; } - private static HttpRequestInterceptor getAwsRequestSigningInterceptor(String region) { + /** + * Needed to override ExceptionHandler behavior for cases where IO error would have put client in unrecoverable state + * We don't utilize system properties in the client builder, so setting defaults pulled from + * {@link HttpAsyncClientBuilder#build()}. + * @return + */ + private NHttpClientConnectionManager createConnectionManager() throws IOReactorException { + SSLContext sslContext = SSLContexts.createDefault(); + HostnameVerifier hostnameVerifier = new DefaultHostnameVerifier(PublicSuffixMatcherLoader.getDefault()); + SchemeIOSessionStrategy sslStrategy = + new SSLIOSessionStrategy(sslContext, null, null, hostnameVerifier); + + IOReactorConfig ioReactorConfig = IOReactorConfig.custom().setIoThreadCount(threadCount).build(); + DefaultConnectingIOReactor ioReactor = new DefaultConnectingIOReactor(ioReactorConfig); + IOReactorExceptionHandler ioReactorExceptionHandler = new IOReactorExceptionHandler() { + @Override + public boolean handle(IOException ex) { + log.error("IO Exception caught during ElasticSearch connection.", ex); + return true; + } + + @Override + public boolean handle(RuntimeException ex) { + log.error("Runtime Exception caught during ElasticSearch connection.", ex); + return true; + } + }; + ioReactor.setExceptionHandler(ioReactorExceptionHandler); + + return new PoolingNHttpClientConnectionManager(ioReactor, + RegistryBuilder.create() + .register("http", NoopIOSessionStrategy.INSTANCE) + .register("https", sslStrategy) + .build()); + } + + private void setCredentials(HttpAsyncClientBuilder httpAsyncClientBuilder) { + if (username != null && password != null) { + final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password)); + httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider); + } + if (opensearchUseAwsIamAuth) { + HttpRequestInterceptor interceptor = getAwsRequestSigningInterceptor(region); + httpAsyncClientBuilder.addInterceptorLast(interceptor); + } + } + + private HttpRequestInterceptor getAwsRequestSigningInterceptor(String region) { if (region == null) { - throw new NullPointerException("Region must not be null when opensearchUseAwsIamAuth is enabled"); + throw new IllegalArgumentException("Region must not be null when opensearchUseAwsIamAuth is enabled"); } Aws4Signer signer = Aws4Signer.create(); // Uses default AWS credentials - HttpRequestInterceptor interceptor = new AwsRequestSigningApacheInterceptor("es", signer, + return new AwsRequestSigningApacheInterceptor("es", signer, DefaultCredentialsProvider.create(), region); - return interceptor; } }