[7.5][ML] Avoid NPE when node load is calculated on job assignment (#… #49215

Merged
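In short, this backport makes the data frame analytics state lookup null-safe, so that node load calculation during job assignment no longer dereferences a task state that has not been persisted yet. Below is a minimal sketch of the failure mode and of the new call pattern; the wrapper class and method name are illustrative, the imports follow the diff (MlTasks is assumed to live in org.elasticsearch.xpack.core.ml), and the real changes are in the diff that follows.

import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;

// Illustrative sketch only - not code from this change.
class NodeLoadNpeSketch {

    static DataFrameAnalyticsState resolveState(PersistentTasksCustomMetaData.PersistentTask<?> assignedTask) {
        // Before: a persistent analytics task can exist before any task state has been set,
        // so assignedTask.getState() is null and the old unguarded cast threw an NPE:
        //     ((DataFrameAnalyticsTaskState) assignedTask.getState()).getState()

        // After: the new null-safe helper maps a missing task to STOPPED and a missing
        // task state to STARTING, so load calculation on job assignment cannot NPE.
        return MlTasks.getDataFrameAnalyticsState(assignedTask);
    }
}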
@@ -143,14 +143,31 @@ public static DatafeedState getDatafeedState(String datafeedId, @Nullable Persis

     public static DataFrameAnalyticsState getDataFrameAnalyticsState(String analyticsId, @Nullable PersistentTasksCustomMetaData tasks) {
         PersistentTasksCustomMetaData.PersistentTask<?> task = getDataFrameAnalyticsTask(analyticsId, tasks);
-        if (task != null) {
-            DataFrameAnalyticsTaskState taskState = (DataFrameAnalyticsTaskState) task.getState();
-            if (taskState == null) {
+        return getDataFrameAnalyticsState(task);
+    }
+
+    public static DataFrameAnalyticsState getDataFrameAnalyticsState(@Nullable PersistentTasksCustomMetaData.PersistentTask<?> task) {
+        if (task == null) {
+            return DataFrameAnalyticsState.STOPPED;
+        }
+        DataFrameAnalyticsTaskState taskState = (DataFrameAnalyticsTaskState) task.getState();
+        if (taskState == null) {
             return DataFrameAnalyticsState.STARTING;
         }
+
+        DataFrameAnalyticsState state = taskState.getState();
+        if (taskState.isStatusStale(task)) {
+            if (state == DataFrameAnalyticsState.STOPPING) {
+                // previous executor node failed while the job was stopping - it won't
+                // be restarted on another node, so consider it STOPPED for reassignment purposes
+                return DataFrameAnalyticsState.STOPPED;
+            }
+            if (state != DataFrameAnalyticsState.FAILED) {
+                // we are relocating at the moment
+                return DataFrameAnalyticsState.STARTING;
+            }
-            return taskState.getState();
         }
-        return DataFrameAnalyticsState.STOPPED;
+        return state;
     }

     /**
@@ -13,18 +13,23 @@
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsTaskState;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;

import java.net.InetAddress;
import java.util.Collections;

import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;

public class MlTasksTests extends ESTestCase {

public void testGetJobState() {
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
// A missing task is a closed job
@@ -168,4 +173,133 @@ public void testDataFrameAnalyticsTaskIds() {
assertThat(taskId, equalTo("data_frame_analytics-foo"));
assertThat(MlTasks.dataFrameAnalyticsIdFromTaskId(taskId), equalTo("foo"));
}

public void testGetDataFrameAnalyticsState_GivenNullTask() {
DataFrameAnalyticsState state = MlTasks.getDataFrameAnalyticsState(null);
assertThat(state, equalTo(DataFrameAnalyticsState.STOPPED));
}

public void testGetDataFrameAnalyticsState_GivenTaskWithNullState() {
String jobId = "foo";
PersistentTasksCustomMetaData.PersistentTask<?> task = createDataFrameAnalyticsTask(jobId, "test_node", null, false);

DataFrameAnalyticsState state = MlTasks.getDataFrameAnalyticsState(task);

assertThat(state, equalTo(DataFrameAnalyticsState.STARTING));
}

public void testGetDataFrameAnalyticsState_GivenTaskWithStartedState() {
String jobId = "foo";
PersistentTasksCustomMetaData.PersistentTask<?> task = createDataFrameAnalyticsTask(jobId, "test_node",
DataFrameAnalyticsState.STARTED, false);

DataFrameAnalyticsState state = MlTasks.getDataFrameAnalyticsState(task);

assertThat(state, equalTo(DataFrameAnalyticsState.STARTED));
}

public void testGetDataFrameAnalyticsState_GivenStaleTaskWithStartedState() {
String jobId = "foo";
PersistentTasksCustomMetaData.PersistentTask<?> task = createDataFrameAnalyticsTask(jobId, "test_node",
DataFrameAnalyticsState.STARTED, true);

DataFrameAnalyticsState state = MlTasks.getDataFrameAnalyticsState(task);

assertThat(state, equalTo(DataFrameAnalyticsState.STARTING));
}

public void testGetDataFrameAnalyticsState_GivenTaskWithReindexingState() {
String jobId = "foo";
PersistentTasksCustomMetaData.PersistentTask<?> task = createDataFrameAnalyticsTask(jobId, "test_node",
DataFrameAnalyticsState.REINDEXING, false);

DataFrameAnalyticsState state = MlTasks.getDataFrameAnalyticsState(task);

assertThat(state, equalTo(DataFrameAnalyticsState.REINDEXING));
}

public void testGetDataFrameAnalyticsState_GivenStaleTaskWithReindexingState() {
String jobId = "foo";
PersistentTasksCustomMetaData.PersistentTask<?> task = createDataFrameAnalyticsTask(jobId, "test_node",
DataFrameAnalyticsState.REINDEXING, true);

DataFrameAnalyticsState state = MlTasks.getDataFrameAnalyticsState(task);

assertThat(state, equalTo(DataFrameAnalyticsState.STARTING));
}

public void testGetDataFrameAnalyticsState_GivenTaskWithAnalyzingState() {
String jobId = "foo";
PersistentTasksCustomMetaData.PersistentTask<?> task = createDataFrameAnalyticsTask(jobId, "test_node",
DataFrameAnalyticsState.ANALYZING, false);

DataFrameAnalyticsState state = MlTasks.getDataFrameAnalyticsState(task);

assertThat(state, equalTo(DataFrameAnalyticsState.ANALYZING));
}

public void testGetDataFrameAnalyticsState_GivenStaleTaskWithAnalyzingState() {
String jobId = "foo";
PersistentTasksCustomMetaData.PersistentTask<?> task = createDataFrameAnalyticsTask(jobId, "test_node",
DataFrameAnalyticsState.ANALYZING, true);

DataFrameAnalyticsState state = MlTasks.getDataFrameAnalyticsState(task);

assertThat(state, equalTo(DataFrameAnalyticsState.STARTING));
}

public void testGetDataFrameAnalyticsState_GivenTaskWithStoppingState() {
String jobId = "foo";
PersistentTasksCustomMetaData.PersistentTask<?> task = createDataFrameAnalyticsTask(jobId, "test_node",
DataFrameAnalyticsState.STOPPING, false);

DataFrameAnalyticsState state = MlTasks.getDataFrameAnalyticsState(task);

assertThat(state, equalTo(DataFrameAnalyticsState.STOPPING));
}

public void testGetDataFrameAnalyticsState_GivenStaleTaskWithStoppingState() {
String jobId = "foo";
PersistentTasksCustomMetaData.PersistentTask<?> task = createDataFrameAnalyticsTask(jobId, "test_node",
DataFrameAnalyticsState.STOPPING, true);

DataFrameAnalyticsState state = MlTasks.getDataFrameAnalyticsState(task);

assertThat(state, equalTo(DataFrameAnalyticsState.STOPPED));
}

public void testGetDataFrameAnalyticsState_GivenTaskWithFailedState() {
String jobId = "foo";
PersistentTasksCustomMetaData.PersistentTask<?> task = createDataFrameAnalyticsTask(jobId, "test_node",
DataFrameAnalyticsState.FAILED, false);

DataFrameAnalyticsState state = MlTasks.getDataFrameAnalyticsState(task);

assertThat(state, equalTo(DataFrameAnalyticsState.FAILED));
}

public void testGetDataFrameAnalyticsState_GivenStaleTaskWithFailedState() {
String jobId = "foo";
PersistentTasksCustomMetaData.PersistentTask<?> task = createDataFrameAnalyticsTask(jobId, "test_node",
DataFrameAnalyticsState.FAILED, true);

DataFrameAnalyticsState state = MlTasks.getDataFrameAnalyticsState(task);

assertThat(state, equalTo(DataFrameAnalyticsState.FAILED));
}

private static PersistentTasksCustomMetaData.PersistentTask<?> createDataFrameAnalyticsTask(String jobId, String nodeId,
DataFrameAnalyticsState state,
boolean isStale) {
PersistentTasksCustomMetaData.Builder builder = PersistentTasksCustomMetaData.builder();
builder.addTask(MlTasks.dataFrameAnalyticsTaskId(jobId), MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME,
new StartDataFrameAnalyticsAction.TaskParams(jobId, Version.CURRENT, Collections.emptyList(), false),
new PersistentTasksCustomMetaData.Assignment(nodeId, "test assignment"));
if (state != null) {
builder.updateTaskState(MlTasks.dataFrameAnalyticsTaskId(jobId),
new DataFrameAnalyticsTaskState(state, builder.getLastAllocationId() - (isStale ? 1 : 0), null));
}
PersistentTasksCustomMetaData tasks = builder.build();
return tasks.getTask(MlTasks.dataFrameAnalyticsTaskId(jobId));
}
}
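A note on the createDataFrameAnalyticsTask helper above: a "stale" task is simulated by recording the task state under an allocation id one behind the task's own (builder.getLastAllocationId() - 1). This relies on DataFrameAnalyticsTaskState.isStatusStale flagging an allocation id mismatch; a sketch of that presumed check (the method below is hypothetical, not part of this diff):

// Presumed semantics of isStatusStale, inferred from how the test helper builds a stale task:
// the recorded state is stale when its allocation id no longer matches the task's current
// allocation id (for example after the task has been reassigned to another node).
static boolean isStatusStaleSketch(long stateAllocationId, PersistentTasksCustomMetaData.PersistentTask<?> task) {
    return stateAllocationId != task.getAllocationId();
}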
@@ -16,7 +16,6 @@
 import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
 import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction;
 import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
-import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsTaskState;
 import org.elasticsearch.xpack.core.ml.job.config.JobState;
 import org.elasticsearch.xpack.ml.MachineLearning;
 import org.elasticsearch.xpack.ml.process.MlMemoryTracker;
@@ -269,7 +268,7 @@ private CurrentLoad calculateCurrentLoadForNode(DiscoveryNode node, PersistentTa
         Collection<PersistentTasksCustomMetaData.PersistentTask<?>> assignedAnalyticsTasks = persistentTasks.findTasks(
                 MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, task -> node.getId().equals(task.getExecutorNode()));
         for (PersistentTasksCustomMetaData.PersistentTask<?> assignedTask : assignedAnalyticsTasks) {
-            DataFrameAnalyticsState dataFrameAnalyticsState = ((DataFrameAnalyticsTaskState) assignedTask.getState()).getState();
+            DataFrameAnalyticsState dataFrameAnalyticsState = MlTasks.getDataFrameAnalyticsState(assignedTask);

             // Don't count stopped and failed df-analytics tasks as they don't consume native memory
             if (dataFrameAnalyticsState.isAnyOf(DataFrameAnalyticsState.STOPPED, DataFrameAnalyticsState.FAILED) == false) {
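One consequence of routing through the helper here: a task whose state has not been set yet now resolves to STARTING instead of throwing, and STARTING is neither STOPPED nor FAILED, so such a task still counts towards the node's load. The new test testSelectLeastLoadedMlNodeForDataFrameAnalyticsJob_givenTaskHasNullState below exercises exactly that path; a condensed sketch of the expectation (the helper name is illustrative, the calls are those used in the diff):

// Illustrative: with the null-safe helper, a task with a null state resolves to STARTING,
// which is neither STOPPED nor FAILED, so it keeps contributing to the node's load.
static boolean countsTowardsNodeLoad(PersistentTasksCustomMetaData.PersistentTask<?> assignedTask) {
    DataFrameAnalyticsState resolved = MlTasks.getDataFrameAnalyticsState(assignedTask); // STARTING when state is null
    return resolved.isAnyOf(DataFrameAnalyticsState.STOPPED, DataFrameAnalyticsState.FAILED) == false;
}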
@@ -200,6 +200,27 @@ public void testSelectLeastLoadedMlNodeForAnomalyDetectorJob_maxCapacityMemoryLi
+ currentlyRunningJobMemory + "], estimated memory required for this job [" + JOB_MEMORY_REQUIREMENT.getBytes() + "]"));
}

public void testSelectLeastLoadedMlNodeForDataFrameAnalyticsJob_givenTaskHasNullState() {
int numNodes = randomIntBetween(1, 10);
int maxRunningJobsPerNode = 10;
int maxMachineMemoryPercent = 30;

Map<String, String> nodeAttr = new HashMap<>();
nodeAttr.put(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, Integer.toString(maxRunningJobsPerNode));
nodeAttr.put(MachineLearning.MACHINE_MEMORY_NODE_ATTR, "-1");

ClusterState.Builder cs = fillNodesWithRunningJobs(nodeAttr, numNodes, 1, JobState.OPENED, null);

String dataFrameAnalyticsId = "data_frame_analytics_id_new";

JobNodeSelector jobNodeSelector = new JobNodeSelector(cs.build(), dataFrameAnalyticsId,
MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, memoryTracker, 0,
node -> TransportStartDataFrameAnalyticsAction.TaskExecutor.nodeFilter(node, dataFrameAnalyticsId));
PersistentTasksCustomMetaData.Assignment result =
jobNodeSelector.selectNode(maxRunningJobsPerNode, 2, maxMachineMemoryPercent, isMemoryTrackerRecentlyRefreshed);
assertNotNull(result.getExecutorNode());
}

public void testSelectLeastLoadedMlNodeForAnomalyDetectorJob_firstJobTooBigMemoryLimiting() {
int numNodes = randomIntBetween(1, 10);
int maxRunningJobsPerNode = randomIntBetween(1, 100);
@@ -579,6 +600,12 @@ public void testConsiderLazyAssignmentWithLazyNodes() {

private ClusterState.Builder fillNodesWithRunningJobs(Map<String, String> nodeAttr, int numNodes, int numRunningJobsPerNode) {

return fillNodesWithRunningJobs(nodeAttr, numNodes, numRunningJobsPerNode, JobState.OPENED, DataFrameAnalyticsState.STARTED);
}

private ClusterState.Builder fillNodesWithRunningJobs(Map<String, String> nodeAttr, int numNodes, int numRunningJobsPerNode,
JobState anomalyDetectionJobState, DataFrameAnalyticsState dfAnalyticsJobState) {

DiscoveryNodes.Builder nodes = DiscoveryNodes.builder();
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
String[] jobIds = new String[numNodes * numRunningJobsPerNode];
@@ -591,10 +618,10 @@ private ClusterState.Builder fillNodesWithRunningJobs(Map<String, String> nodeAt
                 // Both anomaly detector jobs and data frame analytics jobs should count towards the limit
                 if (randomBoolean()) {
                     jobIds[id] = "job_id" + id;
-                    TransportOpenJobActionTests.addJobTask(jobIds[id], nodeId, JobState.OPENED, tasksBuilder);
+                    TransportOpenJobActionTests.addJobTask(jobIds[id], nodeId, anomalyDetectionJobState, tasksBuilder);
                 } else {
                     jobIds[id] = "data_frame_analytics_id" + id;
-                    addDataFrameAnalyticsJobTask(jobIds[id], nodeId, DataFrameAnalyticsState.STARTED, tasksBuilder);
+                    addDataFrameAnalyticsJobTask(jobIds[id], nodeId, dfAnalyticsJobState, tasksBuilder);
                 }
             }
         }