diff --git a/docs/reference/settings/ml-settings.asciidoc b/docs/reference/settings/ml-settings.asciidoc index 09fb8adad8523..91afcbe5b3466 100644 --- a/docs/reference/settings/ml-settings.asciidoc +++ b/docs/reference/settings/ml-settings.asciidoc @@ -109,3 +109,11 @@ cluster and the job is assigned to run on that node. IMPORTANT: This setting assumes some external process is capable of adding ML nodes to the cluster. This setting is only useful when used in conjunction with such an external process. + +`xpack.ml.process_connect_timeout` (<<cluster-update-settings,Dynamic>>):: +The connection timeout for {ml} processes that run separately from the {es} JVM. +Some {ml} processing is done by processes that run separately from the {es} +JVM. When such processes are started they must connect to the {es} JVM. If +such a process does not connect within the time period specified by this +setting then the process is assumed to have failed. Defaults to `10s`. The +minimum value for this setting is `5s`. diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 031fef8c76a49..cf73ef95a2639 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -328,6 +328,10 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu public static final Setting MAX_OPEN_JOBS_PER_NODE = Setting.intSetting("xpack.ml.max_open_jobs", 20, 1, MAX_MAX_OPEN_JOBS_PER_NODE, Property.Dynamic, Property.NodeScope); + public static final Setting PROCESS_CONNECT_TIMEOUT = + Setting.timeSetting("xpack.ml.process_connect_timeout", TimeValue.timeValueSeconds(10), + TimeValue.timeValueSeconds(5), Setting.Property.Dynamic, Setting.Property.NodeScope); + // Undocumented setting for integration test purposes public static final Setting MIN_DISK_SPACE_OFF_HEAP = 
Setting.byteSizeSetting("xpack.ml.min_disk_space_off_heap", new ByteSizeValue(5, ByteSizeUnit.GB), Setting.Property.NodeScope); @@ -363,6 +367,7 @@ public static boolean isMlNode(DiscoveryNode node) { public List> getSettings() { return List.of( MachineLearningField.AUTODETECT_PROCESS, + PROCESS_CONNECT_TIMEOUT, ML_ENABLED, CONCURRENT_JOB_ALLOCATIONS, MachineLearningField.MAX_MODEL_MEMORY_LIMIT, @@ -477,8 +482,8 @@ public Collection createComponents(Client client, ClusterService cluster nativeController, client, clusterService); - normalizerProcessFactory = new NativeNormalizerProcessFactory(environment, nativeController); - analyticsProcessFactory = new NativeAnalyticsProcessFactory(environment, nativeController); + normalizerProcessFactory = new NativeNormalizerProcessFactory(environment, nativeController, clusterService); + analyticsProcessFactory = new NativeAnalyticsProcessFactory(environment, nativeController, clusterService); mlController = nativeController; } catch (IOException e) { // The low level cause of failure from the named pipe helper's perspective is almost never the real root cause, so diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java index 70cd3615413da..e5554e3dcd01a 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java @@ -7,10 +7,13 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.core.internal.io.IOUtils; import 
org.elasticsearch.env.Environment; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; +import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.process.NativeController; import org.elasticsearch.xpack.ml.process.ProcessPipes; import org.elasticsearch.xpack.ml.utils.NamedPipeHelper; @@ -28,14 +31,21 @@ public class NativeAnalyticsProcessFactory implements AnalyticsProcessFactory { private static final Logger LOGGER = LogManager.getLogger(NativeAnalyticsProcessFactory.class); private static final NamedPipeHelper NAMED_PIPE_HELPER = new NamedPipeHelper(); - public static final Duration PROCESS_STARTUP_TIMEOUT = Duration.ofSeconds(10); private final Environment env; private final NativeController nativeController; + private volatile Duration processConnectTimeout; - public NativeAnalyticsProcessFactory(Environment env, NativeController nativeController) { + public NativeAnalyticsProcessFactory(Environment env, NativeController nativeController, ClusterService clusterService) { this.env = Objects.requireNonNull(env); this.nativeController = Objects.requireNonNull(nativeController); + setProcessConnectTimeout(MachineLearning.PROCESS_CONNECT_TIMEOUT.get(env.settings())); + clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.PROCESS_CONNECT_TIMEOUT, + this::setProcessConnectTimeout); + } + + void setProcessConnectTimeout(TimeValue processConnectTimeout) { + this.processConnectTimeout = Duration.ofMillis(processConnectTimeout.getMillis()); } @Override @@ -74,7 +84,7 @@ private void createNativeProcess(String jobId, AnalyticsProcessConfig analyticsP filesToDelete); try { analyticsBuilder.build(); - processPipes.connectStreams(PROCESS_STARTUP_TIMEOUT); + processPipes.connectStreams(processConnectTimeout); } catch (IOException e) { String msg = "Failed to launch data frame analytics process for job " + jobId; LOGGER.error(msg); diff --git 
a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java index b4ee5da5b2b08..9987083159905 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java @@ -10,6 +10,7 @@ import org.elasticsearch.client.Client; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.env.Environment; @@ -37,13 +38,13 @@ public class NativeAutodetectProcessFactory implements AutodetectProcessFactory private static final Logger LOGGER = LogManager.getLogger(NativeAutodetectProcessFactory.class); private static final NamedPipeHelper NAMED_PIPE_HELPER = new NamedPipeHelper(); - public static final Duration PROCESS_STARTUP_TIMEOUT = Duration.ofSeconds(10); private final Client client; private final Environment env; private final Settings settings; private final NativeController nativeController; private final ClusterService clusterService; + private volatile Duration processConnectTimeout; public NativeAutodetectProcessFactory(Environment env, Settings settings, NativeController nativeController, Client client, ClusterService clusterService) { @@ -52,6 +53,13 @@ public NativeAutodetectProcessFactory(Environment env, Settings settings, Native this.nativeController = Objects.requireNonNull(nativeController); this.client = client; this.clusterService = clusterService; + setProcessConnectTimeout(MachineLearning.PROCESS_CONNECT_TIMEOUT.get(settings)); + 
clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.PROCESS_CONNECT_TIMEOUT, + this::setProcessConnectTimeout); + } + + void setProcessConnectTimeout(TimeValue processConnectTimeout) { + this.processConnectTimeout = Duration.ofMillis(processConnectTimeout.getMillis()); } @Override @@ -88,8 +96,8 @@ public AutodetectProcess createAutodetectProcess(Job job, } } - private void createNativeProcess(Job job, AutodetectParams autodetectParams, ProcessPipes processPipes, - List filesToDelete) { + void createNativeProcess(Job job, AutodetectParams autodetectParams, ProcessPipes processPipes, + List filesToDelete) { try { Settings updatedSettings = Settings.builder() @@ -109,7 +117,7 @@ private void createNativeProcess(Job job, AutodetectParams autodetectParams, Pro autodetectBuilder.quantiles(autodetectParams.quantiles()); } autodetectBuilder.build(); - processPipes.connectStreams(PROCESS_STARTUP_TIMEOUT); + processPipes.connectStreams(processConnectTimeout); } catch (IOException e) { String msg = "Failed to launch autodetect for job " + job.getId(); LOGGER.error(msg); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcessFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcessFactory.java index aa4d7bd6b78e5..0022a44f42fb9 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcessFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcessFactory.java @@ -7,10 +7,13 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.core.internal.io.IOUtils; import 
org.elasticsearch.env.Environment; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; +import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.process.NativeController; import org.elasticsearch.xpack.ml.process.ProcessPipes; import org.elasticsearch.xpack.ml.utils.NamedPipeHelper; @@ -25,14 +28,21 @@ public class NativeNormalizerProcessFactory implements NormalizerProcessFactory private static final Logger LOGGER = LogManager.getLogger(NativeNormalizerProcessFactory.class); private static final NamedPipeHelper NAMED_PIPE_HELPER = new NamedPipeHelper(); - private static final Duration PROCESS_STARTUP_TIMEOUT = Duration.ofSeconds(10); private final Environment env; private final NativeController nativeController; + private volatile Duration processConnectTimeout; - public NativeNormalizerProcessFactory(Environment env, NativeController nativeController) { + public NativeNormalizerProcessFactory(Environment env, NativeController nativeController, ClusterService clusterService) { this.env = Objects.requireNonNull(env); this.nativeController = Objects.requireNonNull(nativeController); + setProcessConnectTimeout(MachineLearning.PROCESS_CONNECT_TIMEOUT.get(env.settings())); + clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.PROCESS_CONNECT_TIMEOUT, + this::setProcessConnectTimeout); + } + + void setProcessConnectTimeout(TimeValue processConnectTimeout) { + this.processConnectTimeout = Duration.ofMillis(processConnectTimeout.getMillis()); } @Override @@ -64,7 +74,7 @@ private void createNativeProcess(String jobId, String quantilesState, ProcessPip List command = new NormalizerBuilder(env, jobId, quantilesState, bucketSpan).build(); processPipes.addArgs(command); nativeController.startProcess(command); - processPipes.connectStreams(PROCESS_STARTUP_TIMEOUT); + processPipes.connectStreams(processConnectTimeout); } catch (IOException e) { String msg = "Failed to launch normalizer for job " + jobId; 
LOGGER.error(msg); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactoryTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactoryTests.java new file mode 100644 index 0000000000000..f486823be7b40 --- /dev/null +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactoryTests.java @@ -0,0 +1,61 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.job.process.autodetect; + +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.env.Environment; +import org.elasticsearch.env.TestEnvironment; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.ml.MachineLearning; +import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams; +import org.elasticsearch.xpack.ml.process.NativeController; +import org.elasticsearch.xpack.ml.process.ProcessPipes; + +import java.io.IOException; +import java.time.Duration; +import java.util.Collections; +import java.util.Set; + +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class NativeAutodetectProcessFactoryTests extends ESTestCase { + + public void testSetProcessConnectTimeout() throws IOException { + + int timeoutSeconds = 
randomIntBetween(5, 100); + + Settings settings = Settings.builder() + .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()) + .build(); + Environment env = TestEnvironment.newEnvironment(settings); + NativeController nativeController = mock(NativeController.class); + Client client = mock(Client.class); + ClusterSettings clusterSettings = new ClusterSettings(settings, + Set.of(MachineLearning.PROCESS_CONNECT_TIMEOUT, AutodetectBuilder.MAX_ANOMALY_RECORDS_SETTING_DYNAMIC)); + ClusterService clusterService = mock(ClusterService.class); + when(clusterService.getClusterSettings()).thenReturn(clusterSettings); + Job job = mock(Job.class); + when(job.getId()).thenReturn("set_process_connect_test_job"); + AutodetectParams autodetectParams = mock(AutodetectParams.class); + ProcessPipes processPipes = mock(ProcessPipes.class); + + NativeAutodetectProcessFactory nativeAutodetectProcessFactory = + new NativeAutodetectProcessFactory(env, settings, nativeController, client, clusterService); + nativeAutodetectProcessFactory.setProcessConnectTimeout(TimeValue.timeValueSeconds(timeoutSeconds)); + nativeAutodetectProcessFactory.createNativeProcess(job, autodetectParams, processPipes, Collections.emptyList()); + + verify(processPipes, times(1)).connectStreams(eq(Duration.ofSeconds(timeoutSeconds))); + } +}