docs/reference/settings/ml-settings.asciidoc (8 additions, 0 deletions)
@@ -109,3 +109,11 @@ cluster and the job is assigned to run on that node.
IMPORTANT: This setting assumes some external process is capable of adding ML nodes
to the cluster. This setting is only useful when used in conjunction with
such an external process.

`xpack.ml.process_connect_timeout` (<<cluster-update-settings,Dynamic>>)::
The connection timeout for {ml} processes that run separately from the {es} JVM.
Defaults to `10s`. The minimum value for this setting is `5s`. Some {ml}
processing is done by processes that run separately from the {es} JVM. When such
a process is started it must connect to the {es} JVM. If it does not connect
within the time period specified by this setting, the process is assumed to have
failed.
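
Since the setting is dynamic, it can be changed on a running cluster without a restart. Below is a minimal Java sketch, not part of this change, of raising the timeout through the standard cluster-update-settings request; the class name and the `20s` value are illustrative.

import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;

public final class ProcessConnectTimeoutUpdater {

    private ProcessConnectTimeoutUpdater() {}

    // Raises xpack.ml.process_connect_timeout cluster-wide; "20s" is an arbitrary
    // example value and must be at least the 5s minimum.
    static void raiseProcessConnectTimeout(Client client) {
        ClusterUpdateSettingsRequest request = new ClusterUpdateSettingsRequest();
        // Persistent settings survive a full cluster restart; use transientSettings(...)
        // if the override should not.
        request.persistentSettings(Settings.builder()
            .put("xpack.ml.process_connect_timeout", "20s")
            .build());
        client.admin().cluster().updateSettings(request).actionGet();
    }
}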
MachineLearning.java
@@ -328,6 +328,10 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
public static final Setting<Integer> MAX_OPEN_JOBS_PER_NODE =
Setting.intSetting("xpack.ml.max_open_jobs", 20, 1, MAX_MAX_OPEN_JOBS_PER_NODE, Property.Dynamic, Property.NodeScope);

public static final Setting<TimeValue> PROCESS_CONNECT_TIMEOUT =
Setting.timeSetting("xpack.ml.process_connect_timeout", TimeValue.timeValueSeconds(10),
TimeValue.timeValueSeconds(5), Setting.Property.Dynamic, Setting.Property.NodeScope);

// Undocumented setting for integration test purposes
public static final Setting<ByteSizeValue> MIN_DISK_SPACE_OFF_HEAP =
Setting.byteSizeSetting("xpack.ml.min_disk_space_off_heap", new ByteSizeValue(5, ByteSizeUnit.GB), Setting.Property.NodeScope);
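
For reference, a short sketch (not part of the PR) of what the timeSetting declaration above implies when the setting is read: with no configured value the 10s default applies, an explicit value is parsed as a TimeValue, and anything below the 5s floor is rejected at parse time.

import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.ml.MachineLearning;

public final class ProcessConnectTimeoutSemantics {

    private ProcessConnectTimeoutSemantics() {}

    static void demonstrate() {
        // No configured value: the 10s default applies.
        TimeValue byDefault = MachineLearning.PROCESS_CONNECT_TIMEOUT.get(Settings.EMPTY);
        assert byDefault.equals(TimeValue.timeValueSeconds(10));

        // An explicit value is parsed as a TimeValue.
        TimeValue explicit = MachineLearning.PROCESS_CONNECT_TIMEOUT.get(
            Settings.builder().put("xpack.ml.process_connect_timeout", "30s").build());
        assert explicit.equals(TimeValue.timeValueSeconds(30));

        // Values below the 5s minimum fail validation when the setting is read.
        try {
            MachineLearning.PROCESS_CONNECT_TIMEOUT.get(
                Settings.builder().put("xpack.ml.process_connect_timeout", "2s").build());
        } catch (IllegalArgumentException expected) {
            // rejected: must be >= 5s
        }
    }
}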
@@ -363,6 +367,7 @@ public static boolean isMlNode(DiscoveryNode node) {
public List<Setting<?>> getSettings() {
return List.of(
MachineLearningField.AUTODETECT_PROCESS,
PROCESS_CONNECT_TIMEOUT,
ML_ENABLED,
CONCURRENT_JOB_ALLOCATIONS,
MachineLearningField.MAX_MODEL_MEMORY_LIMIT,
@@ -477,8 +482,8 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
nativeController,
client,
clusterService);
normalizerProcessFactory = new NativeNormalizerProcessFactory(environment, nativeController);
analyticsProcessFactory = new NativeAnalyticsProcessFactory(environment, nativeController);
normalizerProcessFactory = new NativeNormalizerProcessFactory(environment, nativeController, clusterService);
analyticsProcessFactory = new NativeAnalyticsProcessFactory(environment, nativeController, clusterService);
mlController = nativeController;
} catch (IOException e) {
// The low level cause of failure from the named pipe helper's perspective is almost never the real root cause, so
NativeAnalyticsProcessFactory.java
@@ -7,10 +7,13 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.env.Environment;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.process.NativeController;
import org.elasticsearch.xpack.ml.process.ProcessPipes;
import org.elasticsearch.xpack.ml.utils.NamedPipeHelper;
@@ -28,14 +31,21 @@ public class NativeAnalyticsProcessFactory implements AnalyticsProcessFactory {
private static final Logger LOGGER = LogManager.getLogger(NativeAnalyticsProcessFactory.class);

private static final NamedPipeHelper NAMED_PIPE_HELPER = new NamedPipeHelper();
public static final Duration PROCESS_STARTUP_TIMEOUT = Duration.ofSeconds(10);

private final Environment env;
private final NativeController nativeController;
private volatile Duration processConnectTimeout;

public NativeAnalyticsProcessFactory(Environment env, NativeController nativeController) {
public NativeAnalyticsProcessFactory(Environment env, NativeController nativeController, ClusterService clusterService) {
this.env = Objects.requireNonNull(env);
this.nativeController = Objects.requireNonNull(nativeController);
setProcessConnectTimeout(MachineLearning.PROCESS_CONNECT_TIMEOUT.get(env.settings()));
clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.PROCESS_CONNECT_TIMEOUT,
this::setProcessConnectTimeout);
}

void setProcessConnectTimeout(TimeValue processConnectTimeout) {
this.processConnectTimeout = Duration.ofMillis(processConnectTimeout.getMillis());
}

@Override
@@ -74,7 +84,7 @@ private void createNativeProcess(String jobId, AnalyticsProcessConfig analyticsP
filesToDelete);
try {
analyticsBuilder.build();
processPipes.connectStreams(PROCESS_STARTUP_TIMEOUT);
processPipes.connectStreams(processConnectTimeout);
} catch (IOException e) {
String msg = "Failed to launch data frame analytics process for job " + jobId;
LOGGER.error(msg);
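
The constructor change above follows the usual pattern for consuming a dynamic node-scope setting, and the same wiring reappears in the autodetect and normalizer factories below. A condensed, self-contained sketch of that pattern (class and method names here are illustrative, not from the PR):

import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.ml.MachineLearning;

import java.time.Duration;

class DynamicTimeoutHolder {

    // volatile: updates arrive on the cluster applier thread while reads happen on the
    // threads that launch native processes, so a plain field write would not be
    // guaranteed to be visible to readers.
    private volatile Duration processConnectTimeout;

    DynamicTimeoutHolder(Settings nodeSettings, ClusterService clusterService) {
        // 1. Seed from the node's startup settings (falls back to the 10s default).
        setTimeout(MachineLearning.PROCESS_CONNECT_TIMEOUT.get(nodeSettings));
        // 2. Re-run the setter whenever the dynamic cluster setting changes.
        clusterService.getClusterSettings()
            .addSettingsUpdateConsumer(MachineLearning.PROCESS_CONNECT_TIMEOUT, this::setTimeout);
    }

    private void setTimeout(TimeValue timeout) {
        this.processConnectTimeout = Duration.ofMillis(timeout.getMillis());
    }

    Duration timeout() {
        return processConnectTimeout;
    }
}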
NativeAutodetectProcessFactory.java
@@ -10,6 +10,7 @@
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.env.Environment;
@@ -37,13 +38,13 @@ public class NativeAutodetectProcessFactory implements AutodetectProcessFactory

private static final Logger LOGGER = LogManager.getLogger(NativeAutodetectProcessFactory.class);
private static final NamedPipeHelper NAMED_PIPE_HELPER = new NamedPipeHelper();
public static final Duration PROCESS_STARTUP_TIMEOUT = Duration.ofSeconds(10);

private final Client client;
private final Environment env;
private final Settings settings;
private final NativeController nativeController;
private final ClusterService clusterService;
private volatile Duration processConnectTimeout;

public NativeAutodetectProcessFactory(Environment env, Settings settings, NativeController nativeController, Client client,
ClusterService clusterService) {
@@ -52,6 +53,13 @@ public NativeAutodetectProcessFactory(Environment env, Settings settings, Native
this.nativeController = Objects.requireNonNull(nativeController);
this.client = client;
this.clusterService = clusterService;
setProcessConnectTimeout(MachineLearning.PROCESS_CONNECT_TIMEOUT.get(settings));
clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.PROCESS_CONNECT_TIMEOUT,
this::setProcessConnectTimeout);
}

void setProcessConnectTimeout(TimeValue processConnectTimeout) {
this.processConnectTimeout = Duration.ofMillis(processConnectTimeout.getMillis());
}

@Override
@@ -88,8 +96,8 @@ public AutodetectProcess createAutodetectProcess(Job job,
}
}

private void createNativeProcess(Job job, AutodetectParams autodetectParams, ProcessPipes processPipes,
List<Path> filesToDelete) {
void createNativeProcess(Job job, AutodetectParams autodetectParams, ProcessPipes processPipes,
List<Path> filesToDelete) {
try {

Settings updatedSettings = Settings.builder()
@@ -109,7 +117,7 @@ private void createNativeProcess(Job job, AutodetectParams autodetectParams, Pro
autodetectBuilder.quantiles(autodetectParams.quantiles());
}
autodetectBuilder.build();
processPipes.connectStreams(PROCESS_STARTUP_TIMEOUT);
processPipes.connectStreams(processConnectTimeout);
} catch (IOException e) {
String msg = "Failed to launch autodetect for job " + job.getId();
LOGGER.error(msg);
NativeNormalizerProcessFactory.java
@@ -7,10 +7,13 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.env.Environment;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.process.NativeController;
import org.elasticsearch.xpack.ml.process.ProcessPipes;
import org.elasticsearch.xpack.ml.utils.NamedPipeHelper;
@@ -25,14 +28,21 @@ public class NativeNormalizerProcessFactory implements NormalizerProcessFactory

private static final Logger LOGGER = LogManager.getLogger(NativeNormalizerProcessFactory.class);
private static final NamedPipeHelper NAMED_PIPE_HELPER = new NamedPipeHelper();
private static final Duration PROCESS_STARTUP_TIMEOUT = Duration.ofSeconds(10);

private final Environment env;
private final NativeController nativeController;
private volatile Duration processConnectTimeout;

public NativeNormalizerProcessFactory(Environment env, NativeController nativeController) {
public NativeNormalizerProcessFactory(Environment env, NativeController nativeController, ClusterService clusterService) {
this.env = Objects.requireNonNull(env);
this.nativeController = Objects.requireNonNull(nativeController);
setProcessConnectTimeout(MachineLearning.PROCESS_CONNECT_TIMEOUT.get(env.settings()));
clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.PROCESS_CONNECT_TIMEOUT,
this::setProcessConnectTimeout);
}

void setProcessConnectTimeout(TimeValue processConnectTimeout) {
this.processConnectTimeout = Duration.ofMillis(processConnectTimeout.getMillis());
}

@Override
@@ -64,7 +74,7 @@ private void createNativeProcess(String jobId, String quantilesState, ProcessPip
List<String> command = new NormalizerBuilder(env, jobId, quantilesState, bucketSpan).build();
processPipes.addArgs(command);
nativeController.startProcess(command);
processPipes.connectStreams(PROCESS_STARTUP_TIMEOUT);
processPipes.connectStreams(processConnectTimeout);
} catch (IOException e) {
String msg = "Failed to launch normalizer for job " + jobId;
LOGGER.error(msg);
NativeAutodetectProcessFactoryTests.java (new file)
@@ -0,0 +1,61 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.job.process.autodetect;

import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.TestEnvironment;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams;
import org.elasticsearch.xpack.ml.process.NativeController;
import org.elasticsearch.xpack.ml.process.ProcessPipes;

import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.Set;

import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class NativeAutodetectProcessFactoryTests extends ESTestCase {

public void testSetProcessConnectTimeout() throws IOException {

int timeoutSeconds = randomIntBetween(5, 100);

Settings settings = Settings.builder()
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString())
.build();
Environment env = TestEnvironment.newEnvironment(settings);
NativeController nativeController = mock(NativeController.class);
Client client = mock(Client.class);
ClusterSettings clusterSettings = new ClusterSettings(settings,
Set.of(MachineLearning.PROCESS_CONNECT_TIMEOUT, AutodetectBuilder.MAX_ANOMALY_RECORDS_SETTING_DYNAMIC));
ClusterService clusterService = mock(ClusterService.class);
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
Job job = mock(Job.class);
when(job.getId()).thenReturn("set_process_connect_test_job");
AutodetectParams autodetectParams = mock(AutodetectParams.class);
ProcessPipes processPipes = mock(ProcessPipes.class);

NativeAutodetectProcessFactory nativeAutodetectProcessFactory =
new NativeAutodetectProcessFactory(env, settings, nativeController, client, clusterService);
nativeAutodetectProcessFactory.setProcessConnectTimeout(TimeValue.timeValueSeconds(timeoutSeconds));
nativeAutodetectProcessFactory.createNativeProcess(job, autodetectParams, processPipes, Collections.emptyList());

verify(processPipes, times(1)).connectStreams(eq(Duration.ofSeconds(timeoutSeconds)));
}
}
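
A possible follow-on check (not in the PR, reusing the mocks and variables from the test above) would push a new value through ClusterSettings itself, exercising the update consumer registered in the factory constructor rather than calling the package-private setter directly:

// Hypothetical extension, reusing clusterSettings, nativeAutodetectProcessFactory,
// job, autodetectParams and processPipes from the test above. The "120s" value lies
// outside the 5..100 second range used above, so this verification cannot collide
// with the earlier connectStreams invocation.
clusterSettings.applySettings(Settings.builder()
    .put(MachineLearning.PROCESS_CONNECT_TIMEOUT.getKey(), "120s")
    .build());
nativeAutodetectProcessFactory.createNativeProcess(job, autodetectParams, processPipes, Collections.emptyList());
verify(processPipes, times(1)).connectStreams(eq(Duration.ofSeconds(120)));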