Skip to content

Commit a9b9441

Browse files
author
David Roberts
committed
[ML] Introduce a setting for the process connect timeout (#43234)
This change introduces a new setting, xpack.ml.process_connect_timeout, which allows the timeout for an external ML process to connect to the ES JVM to be increased. The timeout may need to be increased if many processes are being started simultaneously on the same machine. This is unlikely in clusters with many ML nodes, as we balance the processes across the ML nodes, but can happen in clusters with a single ML node and a high value for xpack.ml.node_concurrent_job_allocations.
1 parent 69a1d49 commit a9b9441

File tree

5 files changed

+100
-8
lines changed

5 files changed

+100
-8
lines changed

docs/reference/settings/ml-settings.asciidoc

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,3 +109,11 @@ cluster and the job is assigned to run on that node.
109109
IMPORTANT: This setting assumes some external process is capable of adding ML nodes
110110
to the cluster. This setting is only useful when used in conjunction with
111111
such an external process.
112+
113+
`xpack.ml.process_connect_timeout` (<<cluster-update-settings,Dynamic>>)::
114+
The connection timeout for {ml} processes that run separately from the {es} JVM.
115+
Defaults to `10s`. Some {ml} processing is done by processes that run separately
116+
from the {es} JVM. When such processes are started they must connect to the {es}
117+
JVM. If such a process does not connect within the time period specified by this
118+
setting, then the process is assumed to have failed. The minimum
119+
value for this setting is `5s`.

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,10 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
296296
public static final Setting<Integer> MAX_OPEN_JOBS_PER_NODE =
297297
Setting.intSetting("xpack.ml.max_open_jobs", 20, 1, MAX_MAX_OPEN_JOBS_PER_NODE, Property.Dynamic, Property.NodeScope);
298298

299+
public static final Setting<TimeValue> PROCESS_CONNECT_TIMEOUT =
300+
Setting.timeSetting("xpack.ml.process_connect_timeout", TimeValue.timeValueSeconds(10),
301+
TimeValue.timeValueSeconds(5), Setting.Property.Dynamic, Setting.Property.NodeScope);
302+
299303
// Undocumented setting for integration test purposes
300304
public static final Setting<ByteSizeValue> MIN_DISK_SPACE_OFF_HEAP =
301305
Setting.byteSizeSetting("xpack.ml.min_disk_space_off_heap", new ByteSizeValue(5, ByteSizeUnit.GB), Setting.Property.NodeScope);
@@ -332,6 +336,7 @@ public static boolean isMlNode(DiscoveryNode node) {
332336
public List<Setting<?>> getSettings() {
333337
return Collections.unmodifiableList(
334338
Arrays.asList(MachineLearningField.AUTODETECT_PROCESS,
339+
PROCESS_CONNECT_TIMEOUT,
335340
ML_ENABLED,
336341
CONCURRENT_JOB_ALLOCATIONS,
337342
MachineLearningField.MAX_MODEL_MEMORY_LIMIT,
@@ -448,7 +453,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
448453
nativeController,
449454
client,
450455
clusterService);
451-
normalizerProcessFactory = new NativeNormalizerProcessFactory(environment, nativeController);
456+
normalizerProcessFactory = new NativeNormalizerProcessFactory(environment, nativeController, clusterService);
452457
} catch (IOException e) {
453458
// The low level cause of failure from the named pipe helper's perspective is almost never the real root cause, so
454459
// only log this at the lowest level of detail. It's almost always "file not found" on a named pipe we expect to be

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import org.elasticsearch.client.Client;
1111
import org.elasticsearch.cluster.service.ClusterService;
1212
import org.elasticsearch.common.settings.Settings;
13+
import org.elasticsearch.common.unit.TimeValue;
1314
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
1415
import org.elasticsearch.core.internal.io.IOUtils;
1516
import org.elasticsearch.env.Environment;
@@ -36,13 +37,13 @@ public class NativeAutodetectProcessFactory implements AutodetectProcessFactory
3637

3738
private static final Logger LOGGER = LogManager.getLogger(NativeAutodetectProcessFactory.class);
3839
private static final NamedPipeHelper NAMED_PIPE_HELPER = new NamedPipeHelper();
39-
public static final Duration PROCESS_STARTUP_TIMEOUT = Duration.ofSeconds(10);
4040

4141
private final Client client;
4242
private final Environment env;
4343
private final Settings settings;
4444
private final NativeController nativeController;
4545
private final ClusterService clusterService;
46+
private volatile Duration processConnectTimeout;
4647

4748
public NativeAutodetectProcessFactory(Environment env, Settings settings, NativeController nativeController, Client client,
4849
ClusterService clusterService) {
@@ -51,6 +52,13 @@ public NativeAutodetectProcessFactory(Environment env, Settings settings, Native
5152
this.nativeController = Objects.requireNonNull(nativeController);
5253
this.client = client;
5354
this.clusterService = clusterService;
55+
setProcessConnectTimeout(MachineLearning.PROCESS_CONNECT_TIMEOUT.get(settings));
56+
clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.PROCESS_CONNECT_TIMEOUT,
57+
this::setProcessConnectTimeout);
58+
}
59+
60+
void setProcessConnectTimeout(TimeValue processConnectTimeout) {
61+
this.processConnectTimeout = Duration.ofMillis(processConnectTimeout.getMillis());
5462
}
5563

5664
@Override
@@ -87,8 +95,8 @@ public AutodetectProcess createAutodetectProcess(Job job,
8795
}
8896
}
8997

90-
private void createNativeProcess(Job job, AutodetectParams autodetectParams, ProcessPipes processPipes,
91-
List<Path> filesToDelete) {
98+
void createNativeProcess(Job job, AutodetectParams autodetectParams, ProcessPipes processPipes,
99+
List<Path> filesToDelete) {
92100
try {
93101

94102
Settings updatedSettings = Settings.builder()
@@ -108,7 +116,7 @@ private void createNativeProcess(Job job, AutodetectParams autodetectParams, Pro
108116
autodetectBuilder.quantiles(autodetectParams.quantiles());
109117
}
110118
autodetectBuilder.build();
111-
processPipes.connectStreams(PROCESS_STARTUP_TIMEOUT);
119+
processPipes.connectStreams(processConnectTimeout);
112120
} catch (IOException e) {
113121
String msg = "Failed to launch autodetect for job " + job.getId();
114122
LOGGER.error(msg);

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcessFactory.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,13 @@
77

88
import org.apache.logging.log4j.LogManager;
99
import org.apache.logging.log4j.Logger;
10+
import org.elasticsearch.cluster.service.ClusterService;
11+
import org.elasticsearch.common.unit.TimeValue;
1012
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
1113
import org.elasticsearch.core.internal.io.IOUtils;
1214
import org.elasticsearch.env.Environment;
1315
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
16+
import org.elasticsearch.xpack.ml.MachineLearning;
1417
import org.elasticsearch.xpack.ml.process.NativeController;
1518
import org.elasticsearch.xpack.ml.process.ProcessPipes;
1619
import org.elasticsearch.xpack.ml.utils.NamedPipeHelper;
@@ -25,14 +28,21 @@ public class NativeNormalizerProcessFactory implements NormalizerProcessFactory
2528

2629
private static final Logger LOGGER = LogManager.getLogger(NativeNormalizerProcessFactory.class);
2730
private static final NamedPipeHelper NAMED_PIPE_HELPER = new NamedPipeHelper();
28-
private static final Duration PROCESS_STARTUP_TIMEOUT = Duration.ofSeconds(10);
2931

3032
private final Environment env;
3133
private final NativeController nativeController;
34+
private volatile Duration processConnectTimeout;
3235

33-
public NativeNormalizerProcessFactory(Environment env, NativeController nativeController) {
36+
public NativeNormalizerProcessFactory(Environment env, NativeController nativeController, ClusterService clusterService) {
3437
this.env = Objects.requireNonNull(env);
3538
this.nativeController = Objects.requireNonNull(nativeController);
39+
setProcessConnectTimeout(MachineLearning.PROCESS_CONNECT_TIMEOUT.get(env.settings()));
40+
clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.PROCESS_CONNECT_TIMEOUT,
41+
this::setProcessConnectTimeout);
42+
}
43+
44+
void setProcessConnectTimeout(TimeValue processConnectTimeout) {
45+
this.processConnectTimeout = Duration.ofMillis(processConnectTimeout.getMillis());
3646
}
3747

3848
@Override
@@ -64,7 +74,7 @@ private void createNativeProcess(String jobId, String quantilesState, ProcessPip
6474
List<String> command = new NormalizerBuilder(env, jobId, quantilesState, bucketSpan).build();
6575
processPipes.addArgs(command);
6676
nativeController.startProcess(command);
67-
processPipes.connectStreams(PROCESS_STARTUP_TIMEOUT);
77+
processPipes.connectStreams(processConnectTimeout);
6878
} catch (IOException e) {
6979
String msg = "Failed to launch normalizer for job " + jobId;
7080
LOGGER.error(msg);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.xpack.ml.job.process.autodetect;
7+
8+
import org.elasticsearch.client.Client;
9+
import org.elasticsearch.cluster.service.ClusterService;
10+
import org.elasticsearch.common.settings.ClusterSettings;
11+
import org.elasticsearch.common.settings.Settings;
12+
import org.elasticsearch.common.unit.TimeValue;
13+
import org.elasticsearch.common.util.set.Sets;
14+
import org.elasticsearch.env.Environment;
15+
import org.elasticsearch.env.TestEnvironment;
16+
import org.elasticsearch.test.ESTestCase;
17+
import org.elasticsearch.xpack.core.ml.job.config.Job;
18+
import org.elasticsearch.xpack.ml.MachineLearning;
19+
import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams;
20+
import org.elasticsearch.xpack.ml.process.NativeController;
21+
import org.elasticsearch.xpack.ml.process.ProcessPipes;
22+
23+
import java.io.IOException;
24+
import java.time.Duration;
25+
import java.util.Collections;
26+
27+
import static org.mockito.Matchers.eq;
28+
import static org.mockito.Mockito.mock;
29+
import static org.mockito.Mockito.times;
30+
import static org.mockito.Mockito.verify;
31+
import static org.mockito.Mockito.when;
32+
33+
public class NativeAutodetectProcessFactoryTests extends ESTestCase {
34+
35+
public void testSetProcessConnectTimeout() throws IOException {
36+
37+
int timeoutSeconds = randomIntBetween(5, 100);
38+
39+
Settings settings = Settings.builder()
40+
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString())
41+
.build();
42+
Environment env = TestEnvironment.newEnvironment(settings);
43+
NativeController nativeController = mock(NativeController.class);
44+
Client client = mock(Client.class);
45+
ClusterSettings clusterSettings = new ClusterSettings(settings,
46+
Sets.newHashSet(MachineLearning.PROCESS_CONNECT_TIMEOUT, AutodetectBuilder.MAX_ANOMALY_RECORDS_SETTING_DYNAMIC));
47+
ClusterService clusterService = mock(ClusterService.class);
48+
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
49+
Job job = mock(Job.class);
50+
when(job.getId()).thenReturn("set_process_connect_test_job");
51+
AutodetectParams autodetectParams = mock(AutodetectParams.class);
52+
ProcessPipes processPipes = mock(ProcessPipes.class);
53+
54+
NativeAutodetectProcessFactory nativeAutodetectProcessFactory =
55+
new NativeAutodetectProcessFactory(env, settings, nativeController, client, clusterService);
56+
nativeAutodetectProcessFactory.setProcessConnectTimeout(TimeValue.timeValueSeconds(timeoutSeconds));
57+
nativeAutodetectProcessFactory.createNativeProcess(job, autodetectParams, processPipes, Collections.emptyList());
58+
59+
verify(processPipes, times(1)).connectStreams(eq(Duration.ofSeconds(timeoutSeconds)));
60+
}
61+
}

0 commit comments

Comments
 (0)