From 767df42db0e1b1167fd2c63437851826d4cecbc5 Mon Sep 17 00:00:00 2001 From: Alexander Reelsen Date: Wed, 25 Apr 2018 15:29:58 +0200 Subject: [PATCH 1/6] Watcher: Configure HttpClient parallel sent requests The HTTPClient used in watcher is based on the apache http client. The current client is using a lot of defaults - which are not always optimal. Two of those defaults are the maximum number of total connections and the maximum number of connections to a single route. If one of those limits is reached, the HTTPClient waits for a connection to be finished thus acting in a blocking fashion. In order to prevent this when many requests are being executed, we increase the limit of total connections as well as the connections per route (a route is basically an endpoint, which also contains proxy information, not containing an URL, just hosts). On top of that an additional option has been made configurable to evict long running connections, which can potentially be reused after some time. As this requires an additional background thread, this required some changes to ensure that the httpclient is closed properly. Also the timeout for this can be configured. --- .../settings/notification-settings.asciidoc | 15 +++ .../core/LocalStateCompositeXPackPlugin.java | 10 +- .../elasticsearch/xpack/watcher/Watcher.java | 9 +- .../xpack/watcher/common/http/HttpClient.java | 19 ++- .../watcher/common/http/HttpSettings.java | 27 ++++ .../actions/webhook/WebhookActionTests.java | 6 +- .../watcher/common/http/HttpClientTests.java | 121 ++++++++++-------- .../common/http/HttpReadTimeoutTests.java | 67 +++++----- 8 files changed, 185 insertions(+), 89 deletions(-) diff --git a/x-pack/docs/en/settings/notification-settings.asciidoc b/x-pack/docs/en/settings/notification-settings.asciidoc index 7a3d832ed3451..8d38cfb88a212 100644 --- a/x-pack/docs/en/settings/notification-settings.asciidoc +++ b/x-pack/docs/en/settings/notification-settings.asciidoc @@ -63,6 +63,21 @@ request is aborted. Specifies the maximum size a HTTP response is allowed to have, defaults to `10mb`, the maximum configurable value is `50mb`. +`xpack.http.apache.evict_idle_connections`:: +A setting to configure if the internal HTTP client used in watcher should +evict connections with a background thread. Defaults to `true`. + +`xpack.http.apache.evict_idle_connections_timeout`:: +If connections should be evicted, this specifies the possible timeout. +Defaults to `2m`. + +`xpack.http.apache.max_conn_total`:: +The number of total open connections in parallel. Defaults to `100`. + +`xpack.http.apache.max_conn_total_per_route`:: +The number of open connections per the same route in parallel. Defaults +to the `xpack.http.apache.max_conn_total` setting. + [[ssl-notification-settings]] :ssl-prefix: xpack.http :component: {watcher} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java index 0becdffb7ea7d..bfcd1cc08891d 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java @@ -38,6 +38,7 @@ import org.elasticsearch.ingest.Processor; import org.elasticsearch.license.LicenseService; import org.elasticsearch.license.XPackLicenseState; +import org.elasticsearch.persistent.PersistentTasksExecutor; import org.elasticsearch.plugins.ActionPlugin; import org.elasticsearch.plugins.AnalysisPlugin; import org.elasticsearch.plugins.ClusterPlugin; @@ -57,9 +58,9 @@ import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportInterceptor; import org.elasticsearch.watcher.ResourceWatcherService; -import org.elasticsearch.persistent.PersistentTasksExecutor; import org.elasticsearch.xpack.core.ssl.SSLService; +import java.io.IOException; import java.nio.file.Path; import java.util.ArrayList; import java.util.Collection; @@ -391,6 +392,13 @@ public List> getPersistentTasksExecutor(ClusterServic .collect(toList()); } + @Override + public void close() throws IOException { + for (Plugin plugin : plugins) { + plugin.close(); + } + } + private List filterPlugins(Class type) { return plugins.stream().filter(x -> type.isAssignableFrom(x.getClass())).map(p -> ((T)p)) .collect(Collectors.toList()); diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java index 57fcff7671518..8ce64ad7f56e0 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java @@ -30,6 +30,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.IndexModule; @@ -216,6 +217,7 @@ public class Watcher extends Plugin implements ActionPlugin, ScriptPlugin { private static final Logger logger = Loggers.getLogger(Watcher.class); private WatcherIndexingListener listener; + private HttpClient httpClient; protected final Settings settings; protected final boolean transportClient; @@ -266,7 +268,7 @@ public Collection createComponents(Client client, ClusterService cluster // TODO: add more auth types, or remove this indirection HttpAuthRegistry httpAuthRegistry = new HttpAuthRegistry(httpAuthFactories); HttpRequestTemplate.Parser httpTemplateParser = new HttpRequestTemplate.Parser(httpAuthRegistry); - final HttpClient httpClient = new HttpClient(settings, httpAuthRegistry, getSslService()); + httpClient = new HttpClient(settings, httpAuthRegistry, getSslService()); // notification EmailService emailService = new EmailService(settings, cryptoService, clusterService.getClusterSettings()); @@ -608,4 +610,9 @@ public List getBootstrapChecks() { public List getContexts() { return Arrays.asList(Watcher.SCRIPT_SEARCH_CONTEXT, Watcher.SCRIPT_EXECUTABLE_CONTEXT, Watcher.SCRIPT_TEMPLATE_CONTEXT); } + + @Override + public void close() throws IOException { + IOUtils.closeWhileHandlingException(httpClient); + } } diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/common/http/HttpClient.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/common/http/HttpClient.java index 729696ffa3518..3abefc0ac37ed 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/common/http/HttpClient.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/common/http/HttpClient.java @@ -46,6 +46,7 @@ import javax.net.ssl.HostnameVerifier; import java.io.ByteArrayOutputStream; +import java.io.Closeable; import java.io.IOException; import java.io.InputStream; import java.net.URI; @@ -55,8 +56,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; -public class HttpClient extends AbstractComponent { +public class HttpClient extends AbstractComponent implements Closeable { private static final String SETTINGS_SSL_PREFIX = "xpack.http.ssl."; @@ -84,6 +86,16 @@ public HttpClient(Settings settings, HttpAuthRegistry httpAuthRegistry, SSLServi SSLConnectionSocketFactory factory = new SSLConnectionSocketFactory(sslService.sslSocketFactory(sslSettings), verifier); clientBuilder.setSSLSocketFactory(factory); + if (HttpSettings.APACHE_HTTP_CLIENT_EVICT_IDLE_CONNECTIONS.get(settings)) { + clientBuilder.evictExpiredConnections(); + TimeValue timeout = HttpSettings.APACHE_HTTP_CLIENT_EVICT_IDLE_CONNECTIONS_TIMEOUT.get(settings); + clientBuilder.evictIdleConnections(timeout.millis(), TimeUnit.MILLISECONDS); + } + int maxConnectionsPerRoute = HttpSettings.APACHE_HTTP_CLIENT_MAX_CONN_PER_ROUTE.get(settings); + clientBuilder.setMaxConnPerRoute(maxConnectionsPerRoute ); + int maxConnectionsTotal = HttpSettings.APACHE_HTTP_CLIENT_MAX_CONN_TOTAL.get(settings); + clientBuilder.setMaxConnTotal(maxConnectionsTotal); + client = clientBuilder.build(); } @@ -251,6 +263,11 @@ private URI createURI(HttpRequest request) { } } + @Override + public void close() throws IOException { + client.close(); + } + /** * Helper class to have all HTTP methods except HEAD allow for an body, including GET */ diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/common/http/HttpSettings.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/common/http/HttpSettings.java index f4f97df1d4fd8..4a84ec85173cd 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/common/http/HttpSettings.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/common/http/HttpSettings.java @@ -28,6 +28,29 @@ public class HttpSettings { static final Setting CONNECTION_TIMEOUT = Setting.timeSetting("xpack.http.default_connection_timeout", DEFAULT_CONNECTION_TIMEOUT, Property.NodeScope); + + // these are very apache http client specific settings, which only apply to how the apache http client is working + // keep them in their own namespace, so that we could for example switch to the new java http client to get rid + // of another dependency in the future + // more information https://hc.apache.org/httpcomponents-client-ga/tutorial/html/connmgmt.html + + // should idle connections be evicted? This will start an additional thread doing this + static final Setting APACHE_HTTP_CLIENT_EVICT_IDLE_CONNECTIONS = + Setting.boolSetting("xpack.http.apache.evict_idle_connections", false, Property.NodeScope); + // what is the timeout for evicting idle connections + // this prevents form many connections being open due to the pooled client + // this value resembles the default set in org.apache.http.impl.client.HttpClientBuilder.build() + static final Setting APACHE_HTTP_CLIENT_EVICT_IDLE_CONNECTIONS_TIMEOUT = + Setting.timeSetting("xpack.http.apache.evict_idle_connections_timeout", TimeValue.timeValueSeconds(10), Property.NodeScope); + // how many total connections should the http client be able to keep open at once + static final Setting APACHE_HTTP_CLIENT_MAX_CONN_TOTAL = + Setting.intSetting("xpack.http.apache.max_conn_total", 100, 1, Property.NodeScope); + // how many total connections per route should the http client be able to keep open at once + // this for example defines how often a user is able to poll the same _search endpoint of a remote cluster, which is + // also the reason why this is set to the same value than the total connections + static final Setting APACHE_HTTP_CLIENT_MAX_CONN_PER_ROUTE = + Setting.intSetting("xpack.http.apache.max_conn_total_per_route", APACHE_HTTP_CLIENT_MAX_CONN_TOTAL, 1, Property.NodeScope); + private static final String PROXY_HOST_KEY = "xpack.http.proxy.host"; private static final String PROXY_PORT_KEY = "xpack.http.proxy.port"; private static final String PROXY_SCHEME_KEY = "xpack.http.proxy.scheme"; @@ -54,6 +77,10 @@ public static List> getSettings() { settings.add(PROXY_PORT); settings.add(PROXY_SCHEME); settings.add(MAX_HTTP_RESPONSE_SIZE); + settings.add(APACHE_HTTP_CLIENT_EVICT_IDLE_CONNECTIONS); + settings.add(APACHE_HTTP_CLIENT_EVICT_IDLE_CONNECTIONS_TIMEOUT); + settings.add(APACHE_HTTP_CLIENT_MAX_CONN_TOTAL); + settings.add(APACHE_HTTP_CLIENT_MAX_CONN_PER_ROUTE); return settings; } diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/actions/webhook/WebhookActionTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/actions/webhook/WebhookActionTests.java index f57f65f1d6204..09ca57c1708f7 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/actions/webhook/WebhookActionTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/actions/webhook/WebhookActionTests.java @@ -44,7 +44,6 @@ import org.junit.Before; import javax.mail.internet.AddressException; - import java.io.IOException; import java.util.Map; @@ -219,10 +218,9 @@ private WebhookActionFactory webhookFactory(HttpClient client) { public void testThatSelectingProxyWorks() throws Exception { Environment environment = TestEnvironment.newEnvironment(Settings.builder().put("path.home", createTempDir()).build()); - HttpClient httpClient = new HttpClient(Settings.EMPTY, authRegistry, - new SSLService(environment.settings(), environment)); - try (MockWebServer proxyServer = new MockWebServer()) { + try (HttpClient httpClient = new HttpClient(Settings.EMPTY, authRegistry, + new SSLService(environment.settings(), environment)); MockWebServer proxyServer = new MockWebServer()) { proxyServer.start(); proxyServer.enqueue(new MockResponse().setResponseCode(200).setBody("fullProxiedContent")); diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/common/http/HttpClientTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/common/http/HttpClientTests.java index 2a02c5300bded..c63df9ef3301a 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/common/http/HttpClientTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/common/http/HttpClientTests.java @@ -71,12 +71,27 @@ public class HttpClientTests extends ESTestCase { public void init() throws Exception { authRegistry = new HttpAuthRegistry(singletonMap(BasicAuth.TYPE, new BasicAuthFactory(null))); webServer.start(); - httpClient = new HttpClient(Settings.EMPTY, authRegistry, new SSLService(environment.settings(), environment)); + Settings.Builder settingsBuilder = Settings.builder(); + if (randomBoolean()) { + settingsBuilder.put(HttpSettings.APACHE_HTTP_CLIENT_EVICT_IDLE_CONNECTIONS.getKey(), randomBoolean()); + } + if (randomBoolean()) { + settingsBuilder.put(HttpSettings.APACHE_HTTP_CLIENT_MAX_CONN_TOTAL.getKey(), randomIntBetween(1, 100)); + } + if (randomBoolean()) { + settingsBuilder.put(HttpSettings.APACHE_HTTP_CLIENT_MAX_CONN_PER_ROUTE.getKey(), randomIntBetween(1, 100)); + } + if (randomBoolean()) { + settingsBuilder.put(HttpSettings.APACHE_HTTP_CLIENT_EVICT_IDLE_CONNECTIONS_TIMEOUT.getKey(), + TimeValue.timeValueSeconds(randomIntBetween(5, 10))); + } + httpClient = new HttpClient(settingsBuilder.build(), authRegistry, new SSLService(environment.settings(), environment)); } @After public void shutdown() throws Exception { webServer.close(); + httpClient.close(); } public void testBasics() throws Exception { @@ -184,17 +199,18 @@ public void testHttps() throws Exception { .setSecureSettings(secureSettings) .build(); } - httpClient = new HttpClient(settings, authRegistry, new SSLService(settings, environment)); - secureSettings = new MockSecureSettings(); - // We can't use the client created above for the server since it is only a truststore - secureSettings.setString("xpack.ssl.keystore.secure_password", "testnode"); - Settings settings2 = Settings.builder() - .put("xpack.ssl.keystore.path", getDataPath("/org/elasticsearch/xpack/security/keystore/testnode.jks")) - .setSecureSettings(secureSettings) - .build(); + try (HttpClient client = new HttpClient(settings, authRegistry, new SSLService(settings, environment))) { + secureSettings = new MockSecureSettings(); + // We can't use the client created above for the server since it is only a truststore + secureSettings.setString("xpack.ssl.keystore.secure_password", "testnode"); + Settings settings2 = Settings.builder() + .put("xpack.ssl.keystore.path", getDataPath("/org/elasticsearch/xpack/security/keystore/testnode.jks")) + .setSecureSettings(secureSettings) + .build(); - TestsSSLService sslService = new TestsSSLService(settings2, environment); - testSslMockWebserver(sslService.sslContext(), false); + TestsSSLService sslService = new TestsSSLService(settings2, environment); + testSslMockWebserver(client, sslService.sslContext(), false); + } } public void testHttpsDisableHostnameVerification() throws Exception { @@ -217,18 +233,19 @@ public void testHttpsDisableHostnameVerification() throws Exception { .setSecureSettings(secureSettings) .build(); } - httpClient = new HttpClient(settings, authRegistry, new SSLService(settings, environment)); - MockSecureSettings secureSettings = new MockSecureSettings(); - // We can't use the client created above for the server since it only defines a truststore - secureSettings.setString("xpack.ssl.keystore.secure_password", "testnode-no-subjaltname"); - Settings settings2 = Settings.builder() - .put("xpack.ssl.keystore.path", - getDataPath("/org/elasticsearch/xpack/security/transport/ssl/certs/simple/testnode-no-subjaltname.jks")) - .setSecureSettings(secureSettings) - .build(); + try (HttpClient client = new HttpClient(settings, authRegistry, new SSLService(settings, environment))) { + MockSecureSettings secureSettings = new MockSecureSettings(); + // We can't use the client created above for the server since it only defines a truststore + secureSettings.setString("xpack.ssl.keystore.secure_password", "testnode-no-subjaltname"); + Settings settings2 = Settings.builder() + .put("xpack.ssl.keystore.path", + getDataPath("/org/elasticsearch/xpack/security/transport/ssl/certs/simple/testnode-no-subjaltname.jks")) + .setSecureSettings(secureSettings) + .build(); - TestsSSLService sslService = new TestsSSLService(settings2, environment); - testSslMockWebserver(sslService.sslContext(), false); + TestsSSLService sslService = new TestsSSLService(settings2, environment); + testSslMockWebserver(client, sslService.sslContext(), false); + } } public void testHttpsClientAuth() throws Exception { @@ -241,11 +258,12 @@ public void testHttpsClientAuth() throws Exception { .build(); TestsSSLService sslService = new TestsSSLService(settings, environment); - httpClient = new HttpClient(settings, authRegistry, sslService); - testSslMockWebserver(sslService.sslContext(), true); + try (HttpClient client = new HttpClient(settings, authRegistry, sslService)) { + testSslMockWebserver(client, sslService.sslContext(), true); + } } - private void testSslMockWebserver(SSLContext sslContext, boolean needClientAuth) throws IOException { + private void testSslMockWebserver(HttpClient client, SSLContext sslContext, boolean needClientAuth) throws IOException { try (MockWebServer mockWebServer = new MockWebServer(sslContext, needClientAuth)) { mockWebServer.enqueue(new MockResponse().setResponseCode(200).setBody("body")); mockWebServer.start(); @@ -253,7 +271,7 @@ private void testSslMockWebserver(SSLContext sslContext, boolean needClientAuth) HttpRequest.Builder request = HttpRequest.builder("localhost", mockWebServer.getPort()) .scheme(Scheme.HTTPS) .path("/test"); - HttpResponse response = httpClient.execute(request.build()); + HttpResponse response = client.execute(request.build()); assertThat(response.status(), equalTo(200)); assertThat(response.body().utf8ToString(), equalTo("body")); @@ -288,14 +306,14 @@ public void testHttpResponseWithAnyStatusCodeCanReturnBody() throws Exception { @Network public void testHttpsWithoutTruststore() throws Exception { - HttpClient httpClient = new HttpClient(Settings.EMPTY, authRegistry, new SSLService(Settings.EMPTY, environment)); - - // Known server with a valid cert from a commercial CA - HttpRequest.Builder request = HttpRequest.builder("www.elastic.co", 443).scheme(Scheme.HTTPS); - HttpResponse response = httpClient.execute(request.build()); - assertThat(response.status(), equalTo(200)); - assertThat(response.hasContent(), is(true)); - assertThat(response.body(), notNullValue()); + try (HttpClient client = new HttpClient(Settings.EMPTY, authRegistry, new SSLService(Settings.EMPTY, environment))) { + // Known server with a valid cert from a commercial CA + HttpRequest.Builder request = HttpRequest.builder("www.elastic.co", 443).scheme(Scheme.HTTPS); + HttpResponse response = client.execute(request.build()); + assertThat(response.status(), equalTo(200)); + assertThat(response.hasContent(), is(true)); + assertThat(response.body(), notNullValue()); + } } public void testThatProxyCanBeConfigured() throws Exception { @@ -307,15 +325,16 @@ public void testThatProxyCanBeConfigured() throws Exception { .put(HttpSettings.PROXY_HOST.getKey(), "localhost") .put(HttpSettings.PROXY_PORT.getKey(), proxyServer.getPort()) .build(); - HttpClient httpClient = new HttpClient(settings, authRegistry, new SSLService(settings, environment)); HttpRequest.Builder requestBuilder = HttpRequest.builder("localhost", webServer.getPort()) .method(HttpMethod.GET) .path("/"); - HttpResponse response = httpClient.execute(requestBuilder.build()); - assertThat(response.status(), equalTo(200)); - assertThat(response.body().utf8ToString(), equalTo("fullProxiedContent")); + try (HttpClient client = new HttpClient(settings, authRegistry, new SSLService(settings, environment))) { + HttpResponse response = client.execute(requestBuilder.build()); + assertThat(response.status(), equalTo(200)); + assertThat(response.body().utf8ToString(), equalTo("fullProxiedContent")); + } // ensure we hit the proxyServer and not the webserver assertThat(webServer.requests(), hasSize(0)); @@ -386,16 +405,16 @@ public void testProxyCanHaveDifferentSchemeThanRequest() throws Exception { .setSecureSettings(secureSettings) .build(); - HttpClient httpClient = new HttpClient(settings, authRegistry, new SSLService(settings, environment)); - HttpRequest.Builder requestBuilder = HttpRequest.builder("localhost", webServer.getPort()) .method(HttpMethod.GET) .scheme(Scheme.HTTP) .path("/"); - HttpResponse response = httpClient.execute(requestBuilder.build()); - assertThat(response.status(), equalTo(200)); - assertThat(response.body().utf8ToString(), equalTo("fullProxiedContent")); + try (HttpClient client = new HttpClient(settings, authRegistry, new SSLService(settings, environment))) { + HttpResponse response = client.execute(requestBuilder.build()); + assertThat(response.status(), equalTo(200)); + assertThat(response.body().utf8ToString(), equalTo("fullProxiedContent")); + } // ensure we hit the proxyServer and not the webserver assertThat(webServer.requests(), hasSize(0)); @@ -413,16 +432,17 @@ public void testThatProxyCanBeOverriddenByRequest() throws Exception { .put(HttpSettings.PROXY_PORT.getKey(), proxyServer.getPort() + 1) .put(HttpSettings.PROXY_HOST.getKey(), "https") .build(); - HttpClient httpClient = new HttpClient(settings, authRegistry, new SSLService(settings, environment)); HttpRequest.Builder requestBuilder = HttpRequest.builder("localhost", webServer.getPort()) .method(HttpMethod.GET) .proxy(new HttpProxy("localhost", proxyServer.getPort(), Scheme.HTTP)) .path("/"); - HttpResponse response = httpClient.execute(requestBuilder.build()); - assertThat(response.status(), equalTo(200)); - assertThat(response.body().utf8ToString(), equalTo("fullProxiedContent")); + try (HttpClient client = new HttpClient(settings, authRegistry, new SSLService(settings, environment))) { + HttpResponse response = client.execute(requestBuilder.build()); + assertThat(response.status(), equalTo(200)); + assertThat(response.body().utf8ToString(), equalTo("fullProxiedContent")); + } // ensure we hit the proxyServer and not the webserver assertThat(webServer.requests(), hasSize(0)); @@ -535,12 +555,13 @@ public void testMaxHttpResponseSize() throws Exception { Settings settings = Settings.builder() .put(HttpSettings.MAX_HTTP_RESPONSE_SIZE.getKey(), new ByteSizeValue(randomBytesLength - 1, ByteSizeUnit.BYTES)) .build(); - HttpClient httpClient = new HttpClient(settings, authRegistry, new SSLService(environment.settings(), environment)); HttpRequest.Builder requestBuilder = HttpRequest.builder("localhost", webServer.getPort()).method(HttpMethod.GET).path("/"); - IOException e = expectThrows(IOException.class, () -> httpClient.execute(requestBuilder.build())); - assertThat(e.getMessage(), startsWith("Maximum limit of")); + try (HttpClient client = new HttpClient(settings, authRegistry, new SSLService(environment.settings(), environment))) { + IOException e = expectThrows(IOException.class, () -> client.execute(requestBuilder.build())); + assertThat(e.getMessage(), startsWith("Maximum limit of")); + } } public void testThatGetRedirectIsFollowed() throws Exception { diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/common/http/HttpReadTimeoutTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/common/http/HttpReadTimeoutTests.java index 2d134681e8b18..fa5a53f4e1da0 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/common/http/HttpReadTimeoutTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/common/http/HttpReadTimeoutTests.java @@ -40,66 +40,69 @@ public void cleanup() throws Exception { public void testDefaultTimeout() throws Exception { Environment environment = TestEnvironment.newEnvironment(Settings.builder().put("path.home", createTempDir()).build()); - HttpClient httpClient = new HttpClient(Settings.EMPTY, mock(HttpAuthRegistry.class), - new SSLService(environment.settings(), environment)); - HttpRequest request = HttpRequest.builder("localhost", webServer.getPort()) .method(HttpMethod.POST) .path("/") .build(); - long start = System.nanoTime(); - expectThrows(SocketTimeoutException.class, () -> httpClient.execute(request)); - TimeValue timeout = TimeValue.timeValueNanos(System.nanoTime() - start); - logger.info("http connection timed out after {}", timeout); + try (HttpClient httpClient = new HttpClient(Settings.EMPTY, mock(HttpAuthRegistry.class), + new SSLService(environment.settings(), environment))) { + long start = System.nanoTime(); + + expectThrows(SocketTimeoutException.class, () -> httpClient.execute(request)); + TimeValue timeout = TimeValue.timeValueNanos(System.nanoTime() - start); + logger.info("http connection timed out after {}", timeout); - // it's supposed to be 10, but we'll give it an error margin of 2 seconds - assertThat(timeout.seconds(), greaterThan(8L)); - assertThat(timeout.seconds(), lessThan(12L)); + // it's supposed to be 10, but we'll give it an error margin of 2 seconds + assertThat(timeout.seconds(), greaterThan(8L)); + assertThat(timeout.seconds(), lessThan(12L)); + } } public void testDefaultTimeoutCustom() throws Exception { Environment environment = TestEnvironment.newEnvironment(Settings.builder().put("path.home", createTempDir()).build()); - HttpClient httpClient = new HttpClient(Settings.builder() - .put("xpack.http.default_read_timeout", "3s").build() - , mock(HttpAuthRegistry.class), new SSLService(environment.settings(), environment)); - HttpRequest request = HttpRequest.builder("localhost", webServer.getPort()) .method(HttpMethod.POST) .path("/") .build(); - long start = System.nanoTime(); - expectThrows(SocketTimeoutException.class, () -> httpClient.execute(request)); - TimeValue timeout = TimeValue.timeValueNanos(System.nanoTime() - start); - logger.info("http connection timed out after {}", timeout); + try (HttpClient httpClient = new HttpClient(Settings.builder() + .put("xpack.http.default_read_timeout", "3s").build() + , mock(HttpAuthRegistry.class), new SSLService(environment.settings(), environment))) { - // it's supposed to be 3, but we'll give it an error margin of 2 seconds - assertThat(timeout.seconds(), greaterThan(1L)); - assertThat(timeout.seconds(), lessThan(5L)); + long start = System.nanoTime(); + expectThrows(SocketTimeoutException.class, () -> httpClient.execute(request)); + TimeValue timeout = TimeValue.timeValueNanos(System.nanoTime() - start); + logger.info("http connection timed out after {}", timeout); + + // it's supposed to be 3, but we'll give it an error margin of 2 seconds + assertThat(timeout.seconds(), greaterThan(1L)); + assertThat(timeout.seconds(), lessThan(5L)); + } } public void testTimeoutCustomPerRequest() throws Exception { Environment environment = TestEnvironment.newEnvironment(Settings.builder().put("path.home", createTempDir()).build()); - HttpClient httpClient = new HttpClient(Settings.builder() - .put("xpack.http.default_read_timeout", "10s").build() - , mock(HttpAuthRegistry.class), new SSLService(environment.settings(), environment)); - HttpRequest request = HttpRequest.builder("localhost", webServer.getPort()) .readTimeout(TimeValue.timeValueSeconds(3)) .method(HttpMethod.POST) .path("/") .build(); - long start = System.nanoTime(); - expectThrows(SocketTimeoutException.class, () -> httpClient.execute(request)); - TimeValue timeout = TimeValue.timeValueNanos(System.nanoTime() - start); - logger.info("http connection timed out after {}", timeout); + try (HttpClient httpClient = new HttpClient(Settings.builder() + .put("xpack.http.default_read_timeout", "10s").build() + , mock(HttpAuthRegistry.class), new SSLService(environment.settings(), environment))) { + + long start = System.nanoTime(); + expectThrows(SocketTimeoutException.class, () -> httpClient.execute(request)); + TimeValue timeout = TimeValue.timeValueNanos(System.nanoTime() - start); + logger.info("http connection timed out after {}", timeout); - // it's supposed to be 3, but we'll give it an error margin of 2 seconds - assertThat(timeout.seconds(), greaterThan(1L)); - assertThat(timeout.seconds(), lessThan(5L)); + // it's supposed to be 3, but we'll give it an error margin of 2 seconds + assertThat(timeout.seconds(), greaterThan(1L)); + assertThat(timeout.seconds(), lessThan(5L)); + } } } From 0c593ce4d2b5d940f1bb1816485969f59ad0813b Mon Sep 17 00:00:00 2001 From: Alexander Reelsen Date: Fri, 27 Apr 2018 09:29:15 +0200 Subject: [PATCH 2/6] review comment: dont make this options configurable, set it to reasonable defaults --- .../xpack/watcher/common/http/HttpClient.java | 17 +++++------- .../watcher/common/http/HttpSettings.java | 27 ------------------- .../watcher/common/http/HttpClientTests.java | 16 +---------- 3 files changed, 8 insertions(+), 52 deletions(-) diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/common/http/HttpClient.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/common/http/HttpClient.java index 3abefc0ac37ed..80d12f5fbce4e 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/common/http/HttpClient.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/common/http/HttpClient.java @@ -56,11 +56,14 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; public class HttpClient extends AbstractComponent implements Closeable { private static final String SETTINGS_SSL_PREFIX = "xpack.http.ssl."; + // picking a reasonable high value here to allow for setups with lots of watch executions or many http inputs/actions + // this is also used as the value per route, if you are connecting to the same endpoint a lot, which is likely, when + // you are querying a remote Elasticsearch cluster + private static final int MAX_CONNECTIONS = 500; private final HttpAuthRegistry httpAuthRegistry; private final CloseableHttpClient client; @@ -86,15 +89,9 @@ public HttpClient(Settings settings, HttpAuthRegistry httpAuthRegistry, SSLServi SSLConnectionSocketFactory factory = new SSLConnectionSocketFactory(sslService.sslSocketFactory(sslSettings), verifier); clientBuilder.setSSLSocketFactory(factory); - if (HttpSettings.APACHE_HTTP_CLIENT_EVICT_IDLE_CONNECTIONS.get(settings)) { - clientBuilder.evictExpiredConnections(); - TimeValue timeout = HttpSettings.APACHE_HTTP_CLIENT_EVICT_IDLE_CONNECTIONS_TIMEOUT.get(settings); - clientBuilder.evictIdleConnections(timeout.millis(), TimeUnit.MILLISECONDS); - } - int maxConnectionsPerRoute = HttpSettings.APACHE_HTTP_CLIENT_MAX_CONN_PER_ROUTE.get(settings); - clientBuilder.setMaxConnPerRoute(maxConnectionsPerRoute ); - int maxConnectionsTotal = HttpSettings.APACHE_HTTP_CLIENT_MAX_CONN_TOTAL.get(settings); - clientBuilder.setMaxConnTotal(maxConnectionsTotal); + clientBuilder.evictExpiredConnections(); + clientBuilder.setMaxConnPerRoute(MAX_CONNECTIONS); + clientBuilder.setMaxConnTotal(MAX_CONNECTIONS); client = clientBuilder.build(); } diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/common/http/HttpSettings.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/common/http/HttpSettings.java index 4a84ec85173cd..f4f97df1d4fd8 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/common/http/HttpSettings.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/common/http/HttpSettings.java @@ -28,29 +28,6 @@ public class HttpSettings { static final Setting CONNECTION_TIMEOUT = Setting.timeSetting("xpack.http.default_connection_timeout", DEFAULT_CONNECTION_TIMEOUT, Property.NodeScope); - - // these are very apache http client specific settings, which only apply to how the apache http client is working - // keep them in their own namespace, so that we could for example switch to the new java http client to get rid - // of another dependency in the future - // more information https://hc.apache.org/httpcomponents-client-ga/tutorial/html/connmgmt.html - - // should idle connections be evicted? This will start an additional thread doing this - static final Setting APACHE_HTTP_CLIENT_EVICT_IDLE_CONNECTIONS = - Setting.boolSetting("xpack.http.apache.evict_idle_connections", false, Property.NodeScope); - // what is the timeout for evicting idle connections - // this prevents form many connections being open due to the pooled client - // this value resembles the default set in org.apache.http.impl.client.HttpClientBuilder.build() - static final Setting APACHE_HTTP_CLIENT_EVICT_IDLE_CONNECTIONS_TIMEOUT = - Setting.timeSetting("xpack.http.apache.evict_idle_connections_timeout", TimeValue.timeValueSeconds(10), Property.NodeScope); - // how many total connections should the http client be able to keep open at once - static final Setting APACHE_HTTP_CLIENT_MAX_CONN_TOTAL = - Setting.intSetting("xpack.http.apache.max_conn_total", 100, 1, Property.NodeScope); - // how many total connections per route should the http client be able to keep open at once - // this for example defines how often a user is able to poll the same _search endpoint of a remote cluster, which is - // also the reason why this is set to the same value than the total connections - static final Setting APACHE_HTTP_CLIENT_MAX_CONN_PER_ROUTE = - Setting.intSetting("xpack.http.apache.max_conn_total_per_route", APACHE_HTTP_CLIENT_MAX_CONN_TOTAL, 1, Property.NodeScope); - private static final String PROXY_HOST_KEY = "xpack.http.proxy.host"; private static final String PROXY_PORT_KEY = "xpack.http.proxy.port"; private static final String PROXY_SCHEME_KEY = "xpack.http.proxy.scheme"; @@ -77,10 +54,6 @@ public static List> getSettings() { settings.add(PROXY_PORT); settings.add(PROXY_SCHEME); settings.add(MAX_HTTP_RESPONSE_SIZE); - settings.add(APACHE_HTTP_CLIENT_EVICT_IDLE_CONNECTIONS); - settings.add(APACHE_HTTP_CLIENT_EVICT_IDLE_CONNECTIONS_TIMEOUT); - settings.add(APACHE_HTTP_CLIENT_MAX_CONN_TOTAL); - settings.add(APACHE_HTTP_CLIENT_MAX_CONN_PER_ROUTE); return settings; } diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/common/http/HttpClientTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/common/http/HttpClientTests.java index c63df9ef3301a..10618b36e8ae9 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/common/http/HttpClientTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/common/http/HttpClientTests.java @@ -71,21 +71,7 @@ public class HttpClientTests extends ESTestCase { public void init() throws Exception { authRegistry = new HttpAuthRegistry(singletonMap(BasicAuth.TYPE, new BasicAuthFactory(null))); webServer.start(); - Settings.Builder settingsBuilder = Settings.builder(); - if (randomBoolean()) { - settingsBuilder.put(HttpSettings.APACHE_HTTP_CLIENT_EVICT_IDLE_CONNECTIONS.getKey(), randomBoolean()); - } - if (randomBoolean()) { - settingsBuilder.put(HttpSettings.APACHE_HTTP_CLIENT_MAX_CONN_TOTAL.getKey(), randomIntBetween(1, 100)); - } - if (randomBoolean()) { - settingsBuilder.put(HttpSettings.APACHE_HTTP_CLIENT_MAX_CONN_PER_ROUTE.getKey(), randomIntBetween(1, 100)); - } - if (randomBoolean()) { - settingsBuilder.put(HttpSettings.APACHE_HTTP_CLIENT_EVICT_IDLE_CONNECTIONS_TIMEOUT.getKey(), - TimeValue.timeValueSeconds(randomIntBetween(5, 10))); - } - httpClient = new HttpClient(settingsBuilder.build(), authRegistry, new SSLService(environment.settings(), environment)); + httpClient = new HttpClient(Settings.EMPTY, authRegistry, new SSLService(environment.settings(), environment)); } @After From 37f4a8686f8cb99a395c1ee5c8528989a006ac06 Mon Sep 17 00:00:00 2001 From: Alexander Reelsen Date: Fri, 27 Apr 2018 09:30:25 +0200 Subject: [PATCH 3/6] remove docs --- .../en/settings/notification-settings.asciidoc | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/x-pack/docs/en/settings/notification-settings.asciidoc b/x-pack/docs/en/settings/notification-settings.asciidoc index 8d38cfb88a212..7a3d832ed3451 100644 --- a/x-pack/docs/en/settings/notification-settings.asciidoc +++ b/x-pack/docs/en/settings/notification-settings.asciidoc @@ -63,21 +63,6 @@ request is aborted. Specifies the maximum size a HTTP response is allowed to have, defaults to `10mb`, the maximum configurable value is `50mb`. -`xpack.http.apache.evict_idle_connections`:: -A setting to configure if the internal HTTP client used in watcher should -evict connections with a background thread. Defaults to `true`. - -`xpack.http.apache.evict_idle_connections_timeout`:: -If connections should be evicted, this specifies the possible timeout. -Defaults to `2m`. - -`xpack.http.apache.max_conn_total`:: -The number of total open connections in parallel. Defaults to `100`. - -`xpack.http.apache.max_conn_total_per_route`:: -The number of open connections per the same route in parallel. Defaults -to the `xpack.http.apache.max_conn_total` setting. - [[ssl-notification-settings]] :ssl-prefix: xpack.http :component: {watcher} From 94ee2f0c0ccfbc5c22034910d0d569e4fb1ece77 Mon Sep 17 00:00:00 2001 From: Alexander Reelsen Date: Wed, 2 May 2018 18:11:44 +0200 Subject: [PATCH 4/6] Add changelog entry --- docs/CHANGELOG.asciidoc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/CHANGELOG.asciidoc b/docs/CHANGELOG.asciidoc index 7fc5c48d73acd..35abec6d10624 100644 --- a/docs/CHANGELOG.asciidoc +++ b/docs/CHANGELOG.asciidoc @@ -90,6 +90,8 @@ option. ({pull}30140[#29658]) Added new "Request" object flavored request methods. Prefer these instead of the multi-argument versions. ({pull}29623[#29623]) +Watcher HTTP client used in watches now allows more parallel connections to the +same endpoint and evicts long running connections. ({pull}30130[#30130]) [float] === Bug Fixes From 69cbb956b909b0e1f167b1c7feb4c1abf4150433 Mon Sep 17 00:00:00 2001 From: Alexander Reelsen Date: Wed, 2 May 2018 23:20:38 +0200 Subject: [PATCH 5/6] use IOUtils.close --- .../xpack/core/LocalStateCompositeXPackPlugin.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java index bfcd1cc08891d..44fd61e1693ad 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java @@ -28,6 +28,7 @@ import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.http.HttpServerTransport; @@ -394,9 +395,7 @@ public List> getPersistentTasksExecutor(ClusterServic @Override public void close() throws IOException { - for (Plugin plugin : plugins) { - plugin.close(); - } + IOUtils.close(plugins); } private List filterPlugins(Class type) { From 0a9443a66ad515d962ae4694842b431899f7860e Mon Sep 17 00:00:00 2001 From: Alexander Reelsen Date: Thu, 3 May 2018 11:38:32 +0200 Subject: [PATCH 6/6] add empty line to release notes --- docs/CHANGELOG.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/CHANGELOG.asciidoc b/docs/CHANGELOG.asciidoc index 2f014f404ada0..fb5b426c2b353 100644 --- a/docs/CHANGELOG.asciidoc +++ b/docs/CHANGELOG.asciidoc @@ -98,6 +98,7 @@ multi-argument versions. ({pull}29623[#29623]) Watcher HTTP client used in watches now allows more parallel connections to the same endpoint and evicts long running connections. ({pull}30130[#30130]) + The cluster state listener to decide if watcher should be stopped/started/paused now runs far less code in an executor but is more synchronous and predictable. Also the trigger engine thread is only started on