Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML] Report index unavailable instead of waiting for lazy node #38423

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct

private final XPackLicenseState licenseState;
private final PersistentTasksService persistentTasksService;
private final Client client;
private final JobConfigProvider jobConfigProvider;
private final MlMemoryTracker memoryTracker;
private final MlConfigMigrationEligibilityCheck migrationEligibilityCheck;
Expand All @@ -98,13 +97,12 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
public TransportOpenJobAction(Settings settings, TransportService transportService, ThreadPool threadPool,
XPackLicenseState licenseState, ClusterService clusterService,
PersistentTasksService persistentTasksService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, Client client,
IndexNameExpressionResolver indexNameExpressionResolver,
JobConfigProvider jobConfigProvider, MlMemoryTracker memoryTracker) {
super(OpenJobAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver,
OpenJobAction.Request::new);
this.licenseState = licenseState;
this.persistentTasksService = persistentTasksService;
this.client = client;
this.jobConfigProvider = jobConfigProvider;
this.memoryTracker = memoryTracker;
this.migrationEligibilityCheck = new MlConfigMigrationEligibilityCheck(settings, clusterService);
Expand Down Expand Up @@ -136,32 +134,15 @@ static PersistentTasksCustomMetaData.Assignment selectLeastLoadedMlNode(String j
int maxConcurrentJobAllocations,
int maxMachineMemoryPercent,
MlMemoryTracker memoryTracker,
boolean isMemoryTrackerRecentlyRefreshed,
Logger logger) {
String resultsWriteAlias = AnomalyDetectorsIndex.resultsWriteAlias(jobId);
List<String> unavailableIndices = verifyIndicesPrimaryShardsAreActive(resultsWriteAlias, clusterState);
if (unavailableIndices.size() != 0) {
String reason = "Not opening job [" + jobId + "], because not all primary shards are active for the following indices [" +
String.join(",", unavailableIndices) + "]";
logger.debug(reason);
return new PersistentTasksCustomMetaData.Assignment(null, reason);
}

// Try to allocate jobs according to memory usage, but if that's not possible (maybe due to a mixed version cluster or maybe
// because of some weird OS problem) then fall back to the old mechanism of only considering numbers of assigned jobs
boolean allocateByMemory = true;

if (memoryTracker.isRecentlyRefreshed() == false) {

boolean scheduledRefresh = memoryTracker.asyncRefresh();
if (scheduledRefresh) {
String reason = "Not opening job [" + jobId + "] because job memory requirements are stale - refresh requested";
logger.debug(reason);
return new PersistentTasksCustomMetaData.Assignment(null, reason);
} else {
allocateByMemory = false;
logger.warn("Falling back to allocating job [{}] by job counts because a memory requirement refresh could not be scheduled",
jobId);
}
boolean allocateByMemory = isMemoryTrackerRecentlyRefreshed;
if (isMemoryTrackerRecentlyRefreshed == false) {
logger.warn("Falling back to allocating job [{}] by job counts because a memory requirement refresh could not be scheduled",
jobId);
}

List<String> reasons = new LinkedList<>();
Expand Down Expand Up @@ -592,12 +573,33 @@ public PersistentTasksCustomMetaData.Assignment getAssignment(OpenJobAction.JobP
return AWAITING_UPGRADE;
}

PersistentTasksCustomMetaData.Assignment assignment = selectLeastLoadedMlNode(params.getJobId(),
String jobId = params.getJobId();
String resultsWriteAlias = AnomalyDetectorsIndex.resultsWriteAlias(jobId);
List<String> unavailableIndices = verifyIndicesPrimaryShardsAreActive(resultsWriteAlias, clusterState);
if (unavailableIndices.size() != 0) {
String reason = "Not opening job [" + jobId + "], because not all primary shards are active for the following indices [" +
String.join(",", unavailableIndices) + "]";
logger.debug(reason);
return new PersistentTasksCustomMetaData.Assignment(null, reason);
}

boolean isMemoryTrackerRecentlyRefreshed = memoryTracker.isRecentlyRefreshed();
if (isMemoryTrackerRecentlyRefreshed == false) {
boolean scheduledRefresh = memoryTracker.asyncRefresh();
if (scheduledRefresh) {
String reason = "Not opening job [" + jobId + "] because job memory requirements are stale - refresh requested";
logger.debug(reason);
return new PersistentTasksCustomMetaData.Assignment(null, reason);
}
}

PersistentTasksCustomMetaData.Assignment assignment = selectLeastLoadedMlNode(jobId,
params.getJob(),
clusterState,
maxConcurrentJobAllocations,
maxMachineMemoryPercent,
memoryTracker,
isMemoryTrackerRecentlyRefreshed,
logger);
if (assignment.getExecutorNode() == null) {
int numMlNodes = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
Expand Down Expand Up @@ -60,11 +61,9 @@

import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
Expand All @@ -79,11 +78,13 @@
public class TransportOpenJobActionTests extends ESTestCase {

private MlMemoryTracker memoryTracker;
private boolean isMemoryTrackerRecentlyRefreshed;

@Before
public void setup() {
memoryTracker = mock(MlMemoryTracker.class);
when(memoryTracker.isRecentlyRefreshed()).thenReturn(true);
isMemoryTrackerRecentlyRefreshed = true;
when(memoryTracker.isRecentlyRefreshed()).thenReturn(isMemoryTrackerRecentlyRefreshed);
}

public void testValidate_jobMissing() {
Expand Down Expand Up @@ -141,7 +142,7 @@ public void testSelectLeastLoadedMlNode_byCount() {
jobBuilder.setJobVersion(Version.CURRENT);

Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id4", jobBuilder.build(),
cs.build(), 2, 30, memoryTracker, logger);
cs.build(), 2, 30, memoryTracker, isMemoryTrackerRecentlyRefreshed, logger);
assertEquals("", result.getExplanation());
assertEquals("_node_id3", result.getExecutorNode());
}
Expand Down Expand Up @@ -178,7 +179,7 @@ public void testSelectLeastLoadedMlNode_maxCapacity() {
Job job = BaseMlIntegTestCase.createFareQuoteJob("job_id0", new ByteSizeValue(150, ByteSizeUnit.MB)).build(new Date());

Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id0", job, cs.build(), 2,
30, memoryTracker, logger);
30, memoryTracker, isMemoryTrackerRecentlyRefreshed, logger);
assertNull(result.getExecutorNode());
assertTrue(result.getExplanation().contains("because this node is full. Number of opened jobs [" + maxRunningJobsPerNode
+ "], xpack.ml.max_open_jobs [" + maxRunningJobsPerNode + "]"));
Expand All @@ -204,7 +205,8 @@ public void testSelectLeastLoadedMlNode_noMlNodes() {

Job job = BaseMlIntegTestCase.createFareQuoteJob("job_id2", new ByteSizeValue(2, ByteSizeUnit.MB)).build(new Date());

Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id2", job, cs.build(), 2, 30, memoryTracker, logger);
Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id2", job, cs.build(), 2, 30, memoryTracker,
isMemoryTrackerRecentlyRefreshed, logger);
assertTrue(result.getExplanation().contains("because this node isn't a ml node"));
assertNull(result.getExecutorNode());
}
Expand Down Expand Up @@ -239,7 +241,8 @@ public void testSelectLeastLoadedMlNode_maxConcurrentOpeningJobs() {
Job job = BaseMlIntegTestCase.createFareQuoteJob("job_id6", new ByteSizeValue(2, ByteSizeUnit.MB)).build(new Date());

ClusterState cs = csBuilder.build();
Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id6", job, cs, 2, 30, memoryTracker, logger);
Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id6", job, cs, 2, 30, memoryTracker,
isMemoryTrackerRecentlyRefreshed, logger);
assertEquals("_node_id3", result.getExecutorNode());

tasksBuilder = PersistentTasksCustomMetaData.builder(tasks);
Expand All @@ -249,7 +252,8 @@ public void testSelectLeastLoadedMlNode_maxConcurrentOpeningJobs() {
csBuilder = ClusterState.builder(cs);
csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks));
cs = csBuilder.build();
result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 2, 30, memoryTracker, logger);
result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 2, 30, memoryTracker, isMemoryTrackerRecentlyRefreshed,
logger);
assertNull("no node selected, because OPENING state", result.getExecutorNode());
assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state"));

Expand All @@ -260,7 +264,8 @@ public void testSelectLeastLoadedMlNode_maxConcurrentOpeningJobs() {
csBuilder = ClusterState.builder(cs);
csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks));
cs = csBuilder.build();
result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 2, 30, memoryTracker, logger);
result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 2, 30, memoryTracker, isMemoryTrackerRecentlyRefreshed,
logger);
assertNull("no node selected, because stale task", result.getExecutorNode());
assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state"));

Expand All @@ -271,7 +276,8 @@ public void testSelectLeastLoadedMlNode_maxConcurrentOpeningJobs() {
csBuilder = ClusterState.builder(cs);
csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks));
cs = csBuilder.build();
result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 2, 30, memoryTracker, logger);
result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 2, 30, memoryTracker, isMemoryTrackerRecentlyRefreshed,
logger);
assertNull("no node selected, because null state", result.getExecutorNode());
assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state"));
}
Expand Down Expand Up @@ -310,7 +316,8 @@ public void testSelectLeastLoadedMlNode_concurrentOpeningJobsAndStaleFailedJob()
Job job = BaseMlIntegTestCase.createFareQuoteJob("job_id7", new ByteSizeValue(2, ByteSizeUnit.MB)).build(new Date());

// Allocation won't be possible if the stale failed job is treated as opening
Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 2, 30, memoryTracker, logger);
Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 2, 30, memoryTracker,
isMemoryTrackerRecentlyRefreshed, logger);
assertEquals("_node_id1", result.getExecutorNode());

tasksBuilder = PersistentTasksCustomMetaData.builder(tasks);
Expand All @@ -320,7 +327,8 @@ public void testSelectLeastLoadedMlNode_concurrentOpeningJobsAndStaleFailedJob()
csBuilder = ClusterState.builder(cs);
csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks));
cs = csBuilder.build();
result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id8", job, cs, 2, 30, memoryTracker, logger);
result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id8", job, cs, 2, 30, memoryTracker, isMemoryTrackerRecentlyRefreshed,
logger);
assertNull("no node selected, because OPENING state", result.getExecutorNode());
assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state"));
}
Expand Down Expand Up @@ -353,7 +361,7 @@ public void testSelectLeastLoadedMlNode_noCompatibleJobTypeNodes() {
metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks);
cs.metaData(metaData);
Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("incompatible_type_job", job, cs.build(), 2, 30,
memoryTracker, logger);
memoryTracker, isMemoryTrackerRecentlyRefreshed, logger);
assertThat(result.getExplanation(), containsString("because this node does not support jobs of type [incompatible_type]"));
assertNull(result.getExecutorNode());
}
Expand Down Expand Up @@ -384,7 +392,7 @@ public void testSelectLeastLoadedMlNode_noNodesMatchingModelSnapshotMinVersion()
metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks);
cs.metaData(metaData);
Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_with_incompatible_model_snapshot", job, cs.build(),
2, 30, memoryTracker, logger);
2, 30, memoryTracker, isMemoryTrackerRecentlyRefreshed, logger);
assertThat(result.getExplanation(), containsString(
"because the job's model snapshot requires a node of version [6.3.0] or higher"));
assertNull(result.getExecutorNode());
Expand Down Expand Up @@ -413,7 +421,7 @@ public void testSelectLeastLoadedMlNode_jobWithRulesButNoNodeMeetsRequiredVersio

Job job = jobWithRules("job_with_rules");
Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_with_rules", job, cs.build(), 2, 30, memoryTracker,
logger);
isMemoryTrackerRecentlyRefreshed, logger);
assertThat(result.getExplanation(), containsString(
"because jobs using custom_rules require a node of version [6.4.0] or higher"));
assertNull(result.getExecutorNode());
Expand Down Expand Up @@ -442,7 +450,7 @@ public void testSelectLeastLoadedMlNode_jobWithRulesAndNodeMeetsRequiredVersion(

Job job = jobWithRules("job_with_rules");
Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_with_rules", job, cs.build(), 2, 30, memoryTracker,
logger);
isMemoryTrackerRecentlyRefreshed, logger);
assertNotNull(result.getExecutorNode());
}

Expand Down Expand Up @@ -529,10 +537,10 @@ public void testJobTaskMatcherMatch() {

public void testGetAssignment_GivenJobThatRequiresMigration() {
ClusterService clusterService = mock(ClusterService.class);
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, new HashSet<>(
Arrays.asList(MachineLearning.CONCURRENT_JOB_ALLOCATIONS, MachineLearning.MAX_MACHINE_MEMORY_PERCENT,
MachineLearning.MAX_LAZY_ML_NODES)
));
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY,
Sets.newHashSet(MachineLearning.CONCURRENT_JOB_ALLOCATIONS, MachineLearning.MAX_MACHINE_MEMORY_PERCENT,
MachineLearning.MAX_LAZY_ML_NODES)
);
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);

TransportOpenJobAction.OpenJobPersistentTasksExecutor executor = new TransportOpenJobAction.OpenJobPersistentTasksExecutor(
Expand All @@ -542,6 +550,34 @@ public void testGetAssignment_GivenJobThatRequiresMigration() {
assertEquals(TransportOpenJobAction.AWAITING_MIGRATION, executor.getAssignment(params, mock(ClusterState.class)));
}

// An index being unavailable should take precedence over waiting for a lazy node
public void testGetAssignment_GivenUnavailableIndicesWithLazyNode() {
Settings settings = Settings.builder().put(MachineLearning.MAX_LAZY_ML_NODES.getKey(), 1).build();
ClusterService clusterService = mock(ClusterService.class);
ClusterSettings clusterSettings = new ClusterSettings(settings,
Sets.newHashSet(MachineLearning.CONCURRENT_JOB_ALLOCATIONS, MachineLearning.MAX_MACHINE_MEMORY_PERCENT,
MachineLearning.MAX_LAZY_ML_NODES)
);
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);

ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name"));
MetaData.Builder metaData = MetaData.builder();
RoutingTable.Builder routingTable = RoutingTable.builder();
addIndices(metaData, routingTable);
routingTable.remove(".ml-state");
csBuilder.metaData(metaData);
csBuilder.routingTable(routingTable.build());

TransportOpenJobAction.OpenJobPersistentTasksExecutor executor = new TransportOpenJobAction.OpenJobPersistentTasksExecutor(
settings, clusterService, mock(AutodetectProcessManager.class), mock(MlMemoryTracker.class), mock(Client.class));

OpenJobAction.JobParams params = new OpenJobAction.JobParams("unavailable_index_with_lazy_node");
params.setJob(mock(Job.class));
assertEquals("Not opening job [unavailable_index_with_lazy_node], " +
"because not all primary shards are active for the following indices [.ml-state]",
executor.getAssignment(params, csBuilder.build()).getExplanation());
}

public static void addJobTask(String jobId, String nodeId, JobState jobState, PersistentTasksCustomMetaData.Builder builder) {
addJobTask(jobId, nodeId, jobState, builder, false);
}
Expand Down