Skip to content

Commit a3769aa

Browse files
authored
[ML] JIndex: Job exists and get job should read cluster state first. (#36305)
1 parent a2543ec commit a3769aa

File tree

3 files changed

+163
-47
lines changed

3 files changed

+163
-47
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java

Lines changed: 60 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -148,21 +148,15 @@ public void groupExists(String groupId, ActionListener<Boolean> listener) {
148148
}
149149

150150
public void jobExists(String jobId, ActionListener<Boolean> listener) {
151-
jobConfigProvider.jobExists(jobId, false, ActionListener.wrap(
152-
jobFound -> {
153-
if (jobFound) {
154-
listener.onResponse(Boolean.TRUE);
155-
} else {
156-
// Look in the clusterstate for the job config
157-
if (MlMetadata.getMlMetadata(clusterService.state()).getJobs().containsKey(jobId)) {
158-
listener.onResponse(Boolean.TRUE);
159-
} else {
160-
listener.onFailure(ExceptionsHelper.missingJobException(jobId));
161-
}
162-
}
163-
},
164-
listener::onFailure
165-
));
151+
if (MlMetadata.getMlMetadata(clusterService.state()).getJobs().containsKey(jobId)) {
152+
listener.onResponse(Boolean.TRUE);
153+
} else {
154+
// check the index
155+
jobConfigProvider.jobExists(jobId, true, ActionListener.wrap(
156+
jobFound -> listener.onResponse(jobFound),
157+
listener::onFailure
158+
));
159+
}
166160
}
167161

168162
/**
@@ -173,33 +167,14 @@ public void jobExists(String jobId, ActionListener<Boolean> listener) {
173167
* a ResourceNotFoundException is returned
174168
*/
175169
public void getJob(String jobId, ActionListener<Job> jobListener) {
176-
jobConfigProvider.getJob(jobId, ActionListener.wrap(
177-
r -> jobListener.onResponse(r.build()), // TODO JIndex we shouldn't be building the job here
178-
e -> {
179-
if (e instanceof ResourceNotFoundException) {
180-
// Try to get the job from the cluster state
181-
getJobFromClusterState(jobId, jobListener);
182-
} else {
183-
jobListener.onFailure(e);
184-
}
185-
}
186-
));
187-
}
188-
189-
/**
190-
* Read a job from the cluster state.
191-
* The job is returned on the same thread even though a listener is used.
192-
*
193-
* @param jobId the jobId
194-
* @param jobListener the Job listener. If no job matches {@code jobId}
195-
* a ResourceNotFoundException is returned
196-
*/
197-
private void getJobFromClusterState(String jobId, ActionListener<Job> jobListener) {
198170
Job job = MlMetadata.getMlMetadata(clusterService.state()).getJobs().get(jobId);
199-
if (job == null) {
200-
jobListener.onFailure(ExceptionsHelper.missingJobException(jobId));
201-
} else {
171+
if (job != null) {
202172
jobListener.onResponse(job);
173+
} else {
174+
jobConfigProvider.getJob(jobId, ActionListener.wrap(
175+
r -> jobListener.onResponse(r.build()), // TODO JIndex we shouldn't be building the job here
176+
jobListener::onFailure
177+
));
203178
}
204179
}
205180

@@ -366,6 +341,22 @@ public void putJob(PutJobAction.Request request, AnalysisRegistry analysisRegist
366341
return;
367342
}
368343

344+
// Check the job id is not the same as a group Id
345+
if (currentMlMetadata.isGroupOrJob(job.getId())) {
346+
actionListener.onFailure(new
347+
ResourceAlreadyExistsException(Messages.getMessage(Messages.JOB_AND_GROUP_NAMES_MUST_BE_UNIQUE, job.getId())));
348+
return;
349+
}
350+
351+
// and that the new job's groups are not job Ids
352+
for (String group : job.getGroups()) {
353+
if (currentMlMetadata.getJobs().containsKey(group)) {
354+
actionListener.onFailure(new
355+
ResourceAlreadyExistsException(Messages.getMessage(Messages.JOB_AND_GROUP_NAMES_MUST_BE_UNIQUE, group)));
356+
return;
357+
}
358+
}
359+
369360
ActionListener<Boolean> putJobListener = new ActionListener<Boolean>() {
370361
@Override
371362
public void onResponse(Boolean indicesCreated) {
@@ -446,6 +437,35 @@ public void onFailure(Exception e) {
446437

447438
public void updateJob(UpdateJobAction.Request request, ActionListener<PutJobAction.Response> actionListener) {
448439
MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterService.state());
440+
441+
if (request.getJobUpdate().getGroups() != null && request.getJobUpdate().getGroups().isEmpty() == false) {
442+
443+
// check the new groups are not job Ids
444+
for (String group : request.getJobUpdate().getGroups()) {
445+
if (mlMetadata.getJobs().containsKey(group)) {
446+
actionListener.onFailure(new ResourceAlreadyExistsException(
447+
Messages.getMessage(Messages.JOB_AND_GROUP_NAMES_MUST_BE_UNIQUE, group)));
448+
}
449+
}
450+
451+
jobConfigProvider.jobIdMatches(request.getJobUpdate().getGroups(), ActionListener.wrap(
452+
matchingIds -> {
453+
if (matchingIds.isEmpty()) {
454+
updateJobPostInitialChecks(request, mlMetadata, actionListener);
455+
} else {
456+
actionListener.onFailure(new ResourceAlreadyExistsException(
457+
Messages.getMessage(Messages.JOB_AND_GROUP_NAMES_MUST_BE_UNIQUE, matchingIds.get(0))));
458+
}
459+
},
460+
actionListener::onFailure
461+
));
462+
} else {
463+
updateJobPostInitialChecks(request, mlMetadata, actionListener);
464+
}
465+
}
466+
467+
private void updateJobPostInitialChecks(UpdateJobAction.Request request, MlMetadata mlMetadata,
468+
ActionListener<PutJobAction.Response> actionListener) {
449469
if (ClusterStateJobUpdate.jobIsInMlMetadata(mlMetadata, request.getJobId())) {
450470
updateJobClusterState(request, actionListener);
451471
} else {

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java

Lines changed: 81 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
4545
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
4646
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
47+
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
4748
import org.elasticsearch.xpack.ml.MachineLearning;
4849
import org.elasticsearch.xpack.ml.job.categorization.CategorizationAnalyzerTests;
4950
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
@@ -497,7 +498,7 @@ public void testPutJob_AddsCreateTime() throws IOException {
497498
MockClientBuilder mockClientBuilder = new MockClientBuilder("jobmanager-test");
498499
JobManager jobManager = createJobManager(mockClientBuilder.build());
499500

500-
PutJobAction.Request putJobRequest = new PutJobAction.Request(createJob());
501+
PutJobAction.Request putJobRequest = new PutJobAction.Request(createJobFoo());
501502

502503
doAnswer(invocation -> {
503504
AckedClusterStateUpdateTask<Boolean> task = (AckedClusterStateUpdateTask<Boolean>) invocation.getArguments()[1];
@@ -544,7 +545,7 @@ public void testJobExists_GivenMissingJob() {
544545

545546
doAnswer(invocationOnMock -> {
546547
ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2];
547-
listener.onResponse(false);
548+
listener.onFailure(ExceptionsHelper.missingJobException("non-job"));
548549
return null;
549550
}).when(jobConfigProvider).jobExists(anyString(), anyBoolean(), any());
550551

@@ -579,18 +580,43 @@ public void testJobExists_GivenJobIsInClusterState() {
579580
Collections.singleton(MachineLearningField.MAX_MODEL_MEMORY_LIMIT));
580581
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
581582

583+
584+
JobManager jobManager = new JobManager(environment, environment.settings(), jobResultsProvider, clusterService,
585+
auditor, threadPool, mock(Client.class), updateJobProcessNotifier, jobConfigProvider);
586+
587+
AtomicBoolean jobExistsHolder = new AtomicBoolean();
588+
AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
589+
jobManager.jobExists("cs-job", ActionListener.wrap(
590+
jobExistsHolder::set,
591+
exceptionHolder::set
592+
));
593+
594+
assertTrue(jobExistsHolder.get());
595+
assertNull(exceptionHolder.get());
596+
verify(jobConfigProvider, never()).jobExists(anyString(), anyBoolean(), any());
597+
}
598+
599+
public void testJobExists_GivenJobIsInIndex() {
600+
ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build();
601+
when(clusterService.state()).thenReturn(clusterState);
602+
603+
ClusterSettings clusterSettings = new ClusterSettings(environment.settings(),
604+
Collections.singleton(MachineLearningField.MAX_MODEL_MEMORY_LIMIT));
605+
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
606+
607+
JobConfigProvider jobConfigProvider = mock(JobConfigProvider.class);
582608
doAnswer(invocationOnMock -> {
583609
ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2];
584-
listener.onResponse(false);
610+
listener.onResponse(true);
585611
return null;
586-
}).when(jobConfigProvider).jobExists(anyString(), anyBoolean(), any());
612+
}).when(jobConfigProvider).jobExists(eq("index-job"), anyBoolean(), any());
587613

588614
JobManager jobManager = new JobManager(environment, environment.settings(), jobResultsProvider, clusterService,
589615
auditor, threadPool, mock(Client.class), updateJobProcessNotifier, jobConfigProvider);
590616

591617
AtomicBoolean jobExistsHolder = new AtomicBoolean();
592618
AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
593-
jobManager.jobExists("cs-job", ActionListener.wrap(
619+
jobManager.jobExists("index-job", ActionListener.wrap(
594620
jobExistsHolder::set,
595621
exceptionHolder::set
596622
));
@@ -603,7 +629,7 @@ public void testPutJob_ThrowsIfJobExistsInClusterState() throws IOException {
603629
MockClientBuilder mockClientBuilder = new MockClientBuilder("jobmanager-test");
604630
JobManager jobManager = createJobManager(mockClientBuilder.build());
605631

606-
PutJobAction.Request putJobRequest = new PutJobAction.Request(createJob());
632+
PutJobAction.Request putJobRequest = new PutJobAction.Request(createJobFoo());
607633

608634
MlMetadata.Builder mlMetadata = new MlMetadata.Builder();
609635
mlMetadata.putJob(buildJobBuilder("foo").build(), false);
@@ -623,6 +649,54 @@ public void onFailure(Exception e) {
623649
});
624650
}
625651

652+
public void testPutJob_ThrowsIfIdIsTheSameAsAGroup() throws IOException {
653+
MockClientBuilder mockClientBuilder = new MockClientBuilder("jobmanager-test");
654+
JobManager jobManager = createJobManager(mockClientBuilder.build());
655+
656+
657+
MlMetadata.Builder mlMetadata = new MlMetadata.Builder();
658+
Job.Builder jobBuilder = buildJobBuilder("job-with-group-foo");
659+
jobBuilder.setGroups(Collections.singletonList("foo"));
660+
mlMetadata.putJob(jobBuilder.build(), false);
661+
ClusterState clusterState = ClusterState.builder(new ClusterName("name"))
662+
.metaData(MetaData.builder().putCustom(MlMetadata.TYPE, mlMetadata.build())).build();
663+
664+
// job id cannot be a group
665+
PutJobAction.Request putJobRequest = new PutJobAction.Request(createJobFoo());
666+
jobManager.putJob(putJobRequest, analysisRegistry, clusterState, new ActionListener<PutJobAction.Response>() {
667+
@Override
668+
public void onResponse(PutJobAction.Response response) {
669+
fail("should have got an error");
670+
}
671+
672+
@Override
673+
public void onFailure(Exception e) {
674+
assertTrue(e instanceof ResourceAlreadyExistsException);
675+
assertEquals("job and group names must be unique but job [foo] and group [foo] have the same name", e.getMessage());
676+
}
677+
});
678+
679+
// the job's groups cannot be job Ids
680+
jobBuilder = buildJobBuilder("job-with-clashing-group-name");
681+
jobBuilder.setCreateTime(null);
682+
jobBuilder.setGroups(Collections.singletonList("job-with-group-foo"));
683+
putJobRequest = new PutJobAction.Request(jobBuilder);
684+
685+
jobManager.putJob(putJobRequest, analysisRegistry, clusterState, new ActionListener<PutJobAction.Response>() {
686+
@Override
687+
public void onResponse(PutJobAction.Response response) {
688+
fail("should have got an error");
689+
}
690+
691+
@Override
692+
public void onFailure(Exception e) {
693+
assertTrue(e instanceof ResourceAlreadyExistsException);
694+
assertEquals("job and group names must be unique but job [job-with-group-foo] and " +
695+
"group [job-with-group-foo] have the same name", e.getMessage());
696+
}
697+
});
698+
}
699+
626700
public void testNotifyFilterChangedGivenNoop() {
627701
MlFilter filter = MlFilter.builder("my_filter").build();
628702
MockClientBuilder mockClientBuilder = new MockClientBuilder("jobmanager-test");
@@ -900,7 +974,7 @@ public void testRevertSnapshot_GivenJobInClusterState() {
900974
verify(jobConfigProvider, never()).updateJob(any(), any(), any(), any());
901975
}
902976

903-
private Job.Builder createJob() {
977+
private Job.Builder createJobFoo() {
904978
Detector.Builder d1 = new Detector.Builder("info_content", "domain");
905979
d1.setOverFieldName("client");
906980
AnalysisConfig.Builder ac = new AnalysisConfig.Builder(Collections.singletonList(d1.build()));

x-pack/plugin/src/test/resources/rest-api-spec/test/ml/jobs_crud.yml

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -397,6 +397,28 @@
397397
"description":"Can't update all description"
398398
}
399399
400+
- do:
401+
xpack.ml.put_job:
402+
job_id: job-crud-update-group-name-clash
403+
body: >
404+
{
405+
"analysis_config" : {
406+
"bucket_span": "1h",
407+
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
408+
},
409+
"data_description" : {
410+
}
411+
}
412+
413+
- do:
414+
catch: "/job and group names must be unique/"
415+
xpack.ml.update_job:
416+
job_id: jobs-crud-update-job
417+
body: >
418+
{
419+
"groups": ["job-crud-update-group-name-clash"]
420+
}
421+
400422
---
401423
"Test cannot decrease model_memory_limit below current usage":
402424
- skip:

0 commit comments

Comments
 (0)