Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Watcher: Configure HttpClient parallel sent requests #30130

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions docs/CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,9 @@ option. ({pull}30140[#29658])
Added new "Request" object flavored request methods. Prefer these instead of the
multi-argument versions. ({pull}29623[#29623])

Watcher HTTP client used in watches now allows more parallel connections to the
same endpoint and evicts long running connections. ({pull}30130[#30130])

The cluster state listener to decide if watcher should be
stopped/started/paused now runs far less code in an executor but is more
synchronous and predictable. Also the trigger engine thread is only started on
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.http.HttpServerTransport;
Expand All @@ -38,6 +39,7 @@
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.license.LicenseService;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.persistent.PersistentTasksExecutor;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.AnalysisPlugin;
import org.elasticsearch.plugins.ClusterPlugin;
Expand All @@ -57,9 +59,9 @@
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportInterceptor;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.persistent.PersistentTasksExecutor;
import org.elasticsearch.xpack.core.ssl.SSLService;

import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -391,6 +393,11 @@ public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(ClusterServic
.collect(toList());
}

@Override
public void close() throws IOException {
IOUtils.close(plugins);
}

private <T> List<T> filterPlugins(Class<T> type) {
return plugins.stream().filter(x -> type.isAssignableFrom(x.getClass())).map(p -> ((T)p))
.collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.IndexModule;
Expand Down Expand Up @@ -216,6 +217,7 @@ public class Watcher extends Plugin implements ActionPlugin, ScriptPlugin {

private static final Logger logger = Loggers.getLogger(Watcher.class);
private WatcherIndexingListener listener;
private HttpClient httpClient;

protected final Settings settings;
protected final boolean transportClient;
Expand Down Expand Up @@ -266,7 +268,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
// TODO: add more auth types, or remove this indirection
HttpAuthRegistry httpAuthRegistry = new HttpAuthRegistry(httpAuthFactories);
HttpRequestTemplate.Parser httpTemplateParser = new HttpRequestTemplate.Parser(httpAuthRegistry);
final HttpClient httpClient = new HttpClient(settings, httpAuthRegistry, getSslService());
httpClient = new HttpClient(settings, httpAuthRegistry, getSslService());

// notification
EmailService emailService = new EmailService(settings, cryptoService, clusterService.getClusterSettings());
Expand Down Expand Up @@ -608,4 +610,9 @@ public List<BootstrapCheck> getBootstrapChecks() {
public List<ScriptContext> getContexts() {
return Arrays.asList(Watcher.SCRIPT_SEARCH_CONTEXT, Watcher.SCRIPT_EXECUTABLE_CONTEXT, Watcher.SCRIPT_TEMPLATE_CONTEXT);
}

@Override
public void close() throws IOException {
IOUtils.closeWhileHandlingException(httpClient);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@

import javax.net.ssl.HostnameVerifier;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
Expand All @@ -56,9 +57,13 @@
import java.util.List;
import java.util.Map;

public class HttpClient extends AbstractComponent {
public class HttpClient extends AbstractComponent implements Closeable {

private static final String SETTINGS_SSL_PREFIX = "xpack.http.ssl.";
// picking a reasonable high value here to allow for setups with lots of watch executions or many http inputs/actions
// this is also used as the value per route, if you are connecting to the same endpoint a lot, which is likely, when
// you are querying a remote Elasticsearch cluster
private static final int MAX_CONNECTIONS = 500;

private final HttpAuthRegistry httpAuthRegistry;
private final CloseableHttpClient client;
Expand All @@ -84,6 +89,10 @@ public HttpClient(Settings settings, HttpAuthRegistry httpAuthRegistry, SSLServi
SSLConnectionSocketFactory factory = new SSLConnectionSocketFactory(sslService.sslSocketFactory(sslSettings), verifier);
clientBuilder.setSSLSocketFactory(factory);

clientBuilder.evictExpiredConnections();
clientBuilder.setMaxConnPerRoute(MAX_CONNECTIONS);
clientBuilder.setMaxConnTotal(MAX_CONNECTIONS);

client = clientBuilder.build();
}

Expand Down Expand Up @@ -251,6 +260,11 @@ private URI createURI(HttpRequest request) {
}
}

@Override
public void close() throws IOException {
client.close();
}

/**
* Helper class to have all HTTP methods except HEAD allow for an body, including GET
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import org.junit.Before;

import javax.mail.internet.AddressException;

import java.io.IOException;
import java.util.Map;

Expand Down Expand Up @@ -219,10 +218,9 @@ private WebhookActionFactory webhookFactory(HttpClient client) {

public void testThatSelectingProxyWorks() throws Exception {
Environment environment = TestEnvironment.newEnvironment(Settings.builder().put("path.home", createTempDir()).build());
HttpClient httpClient = new HttpClient(Settings.EMPTY, authRegistry,
new SSLService(environment.settings(), environment));

try (MockWebServer proxyServer = new MockWebServer()) {
try (HttpClient httpClient = new HttpClient(Settings.EMPTY, authRegistry,
new SSLService(environment.settings(), environment)); MockWebServer proxyServer = new MockWebServer()) {
proxyServer.start();
proxyServer.enqueue(new MockResponse().setResponseCode(200).setBody("fullProxiedContent"));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public void init() throws Exception {
@After
public void shutdown() throws Exception {
webServer.close();
httpClient.close();
}

public void testBasics() throws Exception {
Expand Down Expand Up @@ -184,17 +185,18 @@ public void testHttps() throws Exception {
.setSecureSettings(secureSettings)
.build();
}
httpClient = new HttpClient(settings, authRegistry, new SSLService(settings, environment));
secureSettings = new MockSecureSettings();
// We can't use the client created above for the server since it is only a truststore
secureSettings.setString("xpack.ssl.keystore.secure_password", "testnode");
Settings settings2 = Settings.builder()
.put("xpack.ssl.keystore.path", getDataPath("/org/elasticsearch/xpack/security/keystore/testnode.jks"))
.setSecureSettings(secureSettings)
.build();
try (HttpClient client = new HttpClient(settings, authRegistry, new SSLService(settings, environment))) {
secureSettings = new MockSecureSettings();
// We can't use the client created above for the server since it is only a truststore
secureSettings.setString("xpack.ssl.keystore.secure_password", "testnode");
Settings settings2 = Settings.builder()
.put("xpack.ssl.keystore.path", getDataPath("/org/elasticsearch/xpack/security/keystore/testnode.jks"))
.setSecureSettings(secureSettings)
.build();

TestsSSLService sslService = new TestsSSLService(settings2, environment);
testSslMockWebserver(sslService.sslContext(), false);
TestsSSLService sslService = new TestsSSLService(settings2, environment);
testSslMockWebserver(client, sslService.sslContext(), false);
}
}

public void testHttpsDisableHostnameVerification() throws Exception {
Expand All @@ -217,18 +219,19 @@ public void testHttpsDisableHostnameVerification() throws Exception {
.setSecureSettings(secureSettings)
.build();
}
httpClient = new HttpClient(settings, authRegistry, new SSLService(settings, environment));
MockSecureSettings secureSettings = new MockSecureSettings();
// We can't use the client created above for the server since it only defines a truststore
secureSettings.setString("xpack.ssl.keystore.secure_password", "testnode-no-subjaltname");
Settings settings2 = Settings.builder()
.put("xpack.ssl.keystore.path",
getDataPath("/org/elasticsearch/xpack/security/transport/ssl/certs/simple/testnode-no-subjaltname.jks"))
.setSecureSettings(secureSettings)
.build();
try (HttpClient client = new HttpClient(settings, authRegistry, new SSLService(settings, environment))) {
MockSecureSettings secureSettings = new MockSecureSettings();
// We can't use the client created above for the server since it only defines a truststore
secureSettings.setString("xpack.ssl.keystore.secure_password", "testnode-no-subjaltname");
Settings settings2 = Settings.builder()
.put("xpack.ssl.keystore.path",
getDataPath("/org/elasticsearch/xpack/security/transport/ssl/certs/simple/testnode-no-subjaltname.jks"))
.setSecureSettings(secureSettings)
.build();

TestsSSLService sslService = new TestsSSLService(settings2, environment);
testSslMockWebserver(sslService.sslContext(), false);
TestsSSLService sslService = new TestsSSLService(settings2, environment);
testSslMockWebserver(client, sslService.sslContext(), false);
}
}

public void testHttpsClientAuth() throws Exception {
Expand All @@ -241,19 +244,20 @@ public void testHttpsClientAuth() throws Exception {
.build();

TestsSSLService sslService = new TestsSSLService(settings, environment);
httpClient = new HttpClient(settings, authRegistry, sslService);
testSslMockWebserver(sslService.sslContext(), true);
try (HttpClient client = new HttpClient(settings, authRegistry, sslService)) {
testSslMockWebserver(client, sslService.sslContext(), true);
}
}

private void testSslMockWebserver(SSLContext sslContext, boolean needClientAuth) throws IOException {
private void testSslMockWebserver(HttpClient client, SSLContext sslContext, boolean needClientAuth) throws IOException {
try (MockWebServer mockWebServer = new MockWebServer(sslContext, needClientAuth)) {
mockWebServer.enqueue(new MockResponse().setResponseCode(200).setBody("body"));
mockWebServer.start();

HttpRequest.Builder request = HttpRequest.builder("localhost", mockWebServer.getPort())
.scheme(Scheme.HTTPS)
.path("/test");
HttpResponse response = httpClient.execute(request.build());
HttpResponse response = client.execute(request.build());
assertThat(response.status(), equalTo(200));
assertThat(response.body().utf8ToString(), equalTo("body"));

Expand Down Expand Up @@ -288,14 +292,14 @@ public void testHttpResponseWithAnyStatusCodeCanReturnBody() throws Exception {

@Network
public void testHttpsWithoutTruststore() throws Exception {
HttpClient httpClient = new HttpClient(Settings.EMPTY, authRegistry, new SSLService(Settings.EMPTY, environment));

// Known server with a valid cert from a commercial CA
HttpRequest.Builder request = HttpRequest.builder("www.elastic.co", 443).scheme(Scheme.HTTPS);
HttpResponse response = httpClient.execute(request.build());
assertThat(response.status(), equalTo(200));
assertThat(response.hasContent(), is(true));
assertThat(response.body(), notNullValue());
try (HttpClient client = new HttpClient(Settings.EMPTY, authRegistry, new SSLService(Settings.EMPTY, environment))) {
// Known server with a valid cert from a commercial CA
HttpRequest.Builder request = HttpRequest.builder("www.elastic.co", 443).scheme(Scheme.HTTPS);
HttpResponse response = client.execute(request.build());
assertThat(response.status(), equalTo(200));
assertThat(response.hasContent(), is(true));
assertThat(response.body(), notNullValue());
}
}

public void testThatProxyCanBeConfigured() throws Exception {
Expand All @@ -307,15 +311,16 @@ public void testThatProxyCanBeConfigured() throws Exception {
.put(HttpSettings.PROXY_HOST.getKey(), "localhost")
.put(HttpSettings.PROXY_PORT.getKey(), proxyServer.getPort())
.build();
HttpClient httpClient = new HttpClient(settings, authRegistry, new SSLService(settings, environment));

HttpRequest.Builder requestBuilder = HttpRequest.builder("localhost", webServer.getPort())
.method(HttpMethod.GET)
.path("/");

HttpResponse response = httpClient.execute(requestBuilder.build());
assertThat(response.status(), equalTo(200));
assertThat(response.body().utf8ToString(), equalTo("fullProxiedContent"));
try (HttpClient client = new HttpClient(settings, authRegistry, new SSLService(settings, environment))) {
HttpResponse response = client.execute(requestBuilder.build());
assertThat(response.status(), equalTo(200));
assertThat(response.body().utf8ToString(), equalTo("fullProxiedContent"));
}

// ensure we hit the proxyServer and not the webserver
assertThat(webServer.requests(), hasSize(0));
Expand Down Expand Up @@ -386,16 +391,16 @@ public void testProxyCanHaveDifferentSchemeThanRequest() throws Exception {
.setSecureSettings(secureSettings)
.build();

HttpClient httpClient = new HttpClient(settings, authRegistry, new SSLService(settings, environment));

HttpRequest.Builder requestBuilder = HttpRequest.builder("localhost", webServer.getPort())
.method(HttpMethod.GET)
.scheme(Scheme.HTTP)
.path("/");

HttpResponse response = httpClient.execute(requestBuilder.build());
assertThat(response.status(), equalTo(200));
assertThat(response.body().utf8ToString(), equalTo("fullProxiedContent"));
try (HttpClient client = new HttpClient(settings, authRegistry, new SSLService(settings, environment))) {
HttpResponse response = client.execute(requestBuilder.build());
assertThat(response.status(), equalTo(200));
assertThat(response.body().utf8ToString(), equalTo("fullProxiedContent"));
}

// ensure we hit the proxyServer and not the webserver
assertThat(webServer.requests(), hasSize(0));
Expand All @@ -413,16 +418,17 @@ public void testThatProxyCanBeOverriddenByRequest() throws Exception {
.put(HttpSettings.PROXY_PORT.getKey(), proxyServer.getPort() + 1)
.put(HttpSettings.PROXY_HOST.getKey(), "https")
.build();
HttpClient httpClient = new HttpClient(settings, authRegistry, new SSLService(settings, environment));

HttpRequest.Builder requestBuilder = HttpRequest.builder("localhost", webServer.getPort())
.method(HttpMethod.GET)
.proxy(new HttpProxy("localhost", proxyServer.getPort(), Scheme.HTTP))
.path("/");

HttpResponse response = httpClient.execute(requestBuilder.build());
assertThat(response.status(), equalTo(200));
assertThat(response.body().utf8ToString(), equalTo("fullProxiedContent"));
try (HttpClient client = new HttpClient(settings, authRegistry, new SSLService(settings, environment))) {
HttpResponse response = client.execute(requestBuilder.build());
assertThat(response.status(), equalTo(200));
assertThat(response.body().utf8ToString(), equalTo("fullProxiedContent"));
}

// ensure we hit the proxyServer and not the webserver
assertThat(webServer.requests(), hasSize(0));
Expand Down Expand Up @@ -535,12 +541,13 @@ public void testMaxHttpResponseSize() throws Exception {
Settings settings = Settings.builder()
.put(HttpSettings.MAX_HTTP_RESPONSE_SIZE.getKey(), new ByteSizeValue(randomBytesLength - 1, ByteSizeUnit.BYTES))
.build();
HttpClient httpClient = new HttpClient(settings, authRegistry, new SSLService(environment.settings(), environment));

HttpRequest.Builder requestBuilder = HttpRequest.builder("localhost", webServer.getPort()).method(HttpMethod.GET).path("/");

IOException e = expectThrows(IOException.class, () -> httpClient.execute(requestBuilder.build()));
assertThat(e.getMessage(), startsWith("Maximum limit of"));
try (HttpClient client = new HttpClient(settings, authRegistry, new SSLService(environment.settings(), environment))) {
IOException e = expectThrows(IOException.class, () -> client.execute(requestBuilder.build()));
assertThat(e.getMessage(), startsWith("Maximum limit of"));
}
}

public void testThatGetRedirectIsFollowed() throws Exception {
Expand Down
Loading