Skip to content

Commit 02fa0d4

Browse files
[6.x][ML] Ensure immutability of MlMetadata (elastic#31994)
The test failure in elastic#31916 revealed that updating rules on a job was modifying the detectors list in-place. That meant the old cluster state and the updated cluster state had no difference and thus the change was not propagated to non-master nodes. This commit fixes that and also reviews all of ML metadata in order to ensure immutability. Closes elastic#31916
1 parent efe4bde commit 02fa0d4

File tree

9 files changed

+237
-68
lines changed

9 files changed

+237
-68
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -156,14 +156,14 @@ private DatafeedConfig(String id, String jobId, TimeValue queryDelay, TimeValue
156156
this.jobId = jobId;
157157
this.queryDelay = queryDelay;
158158
this.frequency = frequency;
159-
this.indices = indices;
160-
this.types = types;
159+
this.indices = indices == null ? null : Collections.unmodifiableList(indices);
160+
this.types = types == null ? null : Collections.unmodifiableList(types);
161161
this.query = query;
162162
this.aggregations = aggregations;
163-
this.scriptFields = scriptFields;
163+
this.scriptFields = scriptFields == null ? null : Collections.unmodifiableList(scriptFields);
164164
this.scrollSize = scrollSize;
165165
this.chunkingConfig = chunkingConfig;
166-
this.headers = Objects.requireNonNull(headers);
166+
this.headers = Collections.unmodifiableMap(headers);
167167
}
168168

169169
public DatafeedConfig(StreamInput in) throws IOException {
@@ -172,19 +172,19 @@ public DatafeedConfig(StreamInput in) throws IOException {
172172
this.queryDelay = in.readOptionalTimeValue();
173173
this.frequency = in.readOptionalTimeValue();
174174
if (in.readBoolean()) {
175-
this.indices = in.readList(StreamInput::readString);
175+
this.indices = Collections.unmodifiableList(in.readList(StreamInput::readString));
176176
} else {
177177
this.indices = null;
178178
}
179179
if (in.readBoolean()) {
180-
this.types = in.readList(StreamInput::readString);
180+
this.types = Collections.unmodifiableList(in.readList(StreamInput::readString));
181181
} else {
182182
this.types = null;
183183
}
184184
this.query = in.readNamedWriteable(QueryBuilder.class);
185185
this.aggregations = in.readOptionalWriteable(AggregatorFactories.Builder::new);
186186
if (in.readBoolean()) {
187-
this.scriptFields = in.readList(SearchSourceBuilder.ScriptField::new);
187+
this.scriptFields = Collections.unmodifiableList(in.readList(SearchSourceBuilder.ScriptField::new));
188188
} else {
189189
this.scriptFields = null;
190190
}
@@ -195,7 +195,7 @@ public DatafeedConfig(StreamInput in) throws IOException {
195195
}
196196
this.chunkingConfig = in.readOptionalWriteable(ChunkingConfig::new);
197197
if (in.getVersion().onOrAfter(Version.V_6_2_0)) {
198-
this.headers = in.readMap(StreamInput::readString, StreamInput::readString);
198+
this.headers = Collections.unmodifiableMap(in.readMap(StreamInput::readString, StreamInput::readString));
199199
} else {
200200
this.headers = Collections.emptyMap();
201201
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdate.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -352,6 +352,18 @@ public String toString() {
352352
return Strings.toString(this);
353353
}
354354

355+
boolean isNoop(DatafeedConfig datafeed) {
356+
return (frequency == null || Objects.equals(frequency, datafeed.getFrequency()))
357+
&& (queryDelay == null || Objects.equals(queryDelay, datafeed.getQueryDelay()))
358+
&& (indices == null || Objects.equals(indices, datafeed.getIndices()))
359+
&& (types == null || Objects.equals(types, datafeed.getTypes()))
360+
&& (query == null || Objects.equals(query, datafeed.getQuery()))
361+
&& (scrollSize == null || Objects.equals(scrollSize, datafeed.getQueryDelay()))
362+
&& (aggregations == null || Objects.equals(aggregations, datafeed.getAggregations()))
363+
&& (scriptFields == null || Objects.equals(scriptFields, datafeed.getScriptFields()))
364+
&& (chunkingConfig == null || Objects.equals(chunkingConfig, datafeed.getChunkingConfig()));
365+
}
366+
355367
public static class Builder {
356368

357369
private String id;

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/AnalysisConfig.java

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -144,29 +144,29 @@ private AnalysisConfig(TimeValue bucketSpan, String categorizationFieldName, Lis
144144
this.latency = latency;
145145
this.categorizationFieldName = categorizationFieldName;
146146
this.categorizationAnalyzerConfig = categorizationAnalyzerConfig;
147-
this.categorizationFilters = categorizationFilters;
147+
this.categorizationFilters = categorizationFilters == null ? null : Collections.unmodifiableList(categorizationFilters);
148148
this.summaryCountFieldName = summaryCountFieldName;
149-
this.influencers = influencers;
149+
this.influencers = Collections.unmodifiableList(influencers);
150150
this.overlappingBuckets = overlappingBuckets;
151151
this.resultFinalizationWindow = resultFinalizationWindow;
152152
this.multivariateByFields = multivariateByFields;
153-
this.multipleBucketSpans = multipleBucketSpans;
153+
this.multipleBucketSpans = multipleBucketSpans == null ? null : Collections.unmodifiableList(multipleBucketSpans);
154154
this.usePerPartitionNormalization = usePerPartitionNormalization;
155155
}
156156

157157
public AnalysisConfig(StreamInput in) throws IOException {
158158
bucketSpan = in.readTimeValue();
159159
categorizationFieldName = in.readOptionalString();
160-
categorizationFilters = in.readBoolean() ? in.readList(StreamInput::readString) : null;
160+
categorizationFilters = in.readBoolean() ? Collections.unmodifiableList(in.readList(StreamInput::readString)) : null;
161161
if (in.getVersion().onOrAfter(Version.V_6_2_0)) {
162162
categorizationAnalyzerConfig = in.readOptionalWriteable(CategorizationAnalyzerConfig::new);
163163
} else {
164164
categorizationAnalyzerConfig = null;
165165
}
166166
latency = in.readOptionalTimeValue();
167167
summaryCountFieldName = in.readOptionalString();
168-
detectors = in.readList(Detector::new);
169-
influencers = in.readList(StreamInput::readString);
168+
detectors = Collections.unmodifiableList(in.readList(Detector::new));
169+
influencers = Collections.unmodifiableList(in.readList(StreamInput::readString));
170170
overlappingBuckets = in.readOptionalBoolean();
171171
resultFinalizationWindow = in.readOptionalLong();
172172
multivariateByFields = in.readOptionalBoolean();
@@ -176,7 +176,7 @@ public AnalysisConfig(StreamInput in) throws IOException {
176176
for (int i = 0; i < arraySize; i++) {
177177
spans.add(in.readTimeValue());
178178
}
179-
multipleBucketSpans = spans;
179+
multipleBucketSpans = Collections.unmodifiableList(spans);
180180
} else {
181181
multipleBucketSpans = null;
182182
}
@@ -487,18 +487,20 @@ public Builder(List<Detector> detectors) {
487487
}
488488

489489
public Builder(AnalysisConfig analysisConfig) {
490-
this.detectors = analysisConfig.detectors;
490+
this.detectors = new ArrayList<>(analysisConfig.detectors);
491491
this.bucketSpan = analysisConfig.bucketSpan;
492492
this.latency = analysisConfig.latency;
493493
this.categorizationFieldName = analysisConfig.categorizationFieldName;
494-
this.categorizationFilters = analysisConfig.categorizationFilters;
494+
this.categorizationFilters = analysisConfig.categorizationFilters == null ? null
495+
: new ArrayList<>(analysisConfig.categorizationFilters);
495496
this.categorizationAnalyzerConfig = analysisConfig.categorizationAnalyzerConfig;
496497
this.summaryCountFieldName = analysisConfig.summaryCountFieldName;
497-
this.influencers = analysisConfig.influencers;
498+
this.influencers = new ArrayList<>(analysisConfig.influencers);
498499
this.overlappingBuckets = analysisConfig.overlappingBuckets;
499500
this.resultFinalizationWindow = analysisConfig.resultFinalizationWindow;
500501
this.multivariateByFields = analysisConfig.multivariateByFields;
501-
this.multipleBucketSpans = analysisConfig.multipleBucketSpans;
502+
this.multipleBucketSpans = analysisConfig.multipleBucketSpans == null ? null
503+
: new ArrayList<>(analysisConfig.multipleBucketSpans);
502504
this.usePerPartitionNormalization = analysisConfig.usePerPartitionNormalization;
503505
}
504506

@@ -518,6 +520,10 @@ public void setDetectors(List<Detector> detectors) {
518520
this.detectors = sequentialIndexDetectors;
519521
}
520522

523+
public void setDetector(int detectorIndex, Detector detector) {
524+
detectors.set(detectorIndex, detector);
525+
}
526+
521527
public void setBucketSpan(TimeValue bucketSpan) {
522528
this.bucketSpan = bucketSpan;
523529
}
@@ -543,7 +549,7 @@ public void setSummaryCountFieldName(String summaryCountFieldName) {
543549
}
544550

545551
public void setInfluencers(List<String> influencers) {
546-
this.influencers = influencers;
552+
this.influencers = ExceptionsHelper.requireNonNull(influencers, INFLUENCERS.getPreferredName());
547553
}
548554

549555
public void setOverlappingBuckets(Boolean overlappingBuckets) {

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Detector.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,7 @@ public Detector(StreamInput in) throws IOException {
253253
useNull = in.readBoolean();
254254
excludeFrequent = in.readBoolean() ? ExcludeFrequent.readFromStream(in) : null;
255255
if (in.getVersion().onOrAfter(DetectionRule.VERSION_INTRODUCED)) {
256-
rules = in.readList(DetectionRule::new);
256+
rules = Collections.unmodifiableList(in.readList(DetectionRule::new));
257257
} else {
258258
in.readList(DetectionRule::readOldFormat);
259259
rules = Collections.emptyList();
@@ -513,7 +513,7 @@ public Builder(Detector detector) {
513513
partitionFieldName = detector.partitionFieldName;
514514
useNull = detector.useNull;
515515
excludeFrequent = detector.excludeFrequent;
516-
rules = new ArrayList<>(detector.getRules());
516+
rules = new ArrayList<>(detector.rules);
517517
detectorIndex = detector.detectorIndex;
518518
}
519519

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ private Job(String jobId, String jobType, Version jobVersion, List<String> group
190190
this.jobId = jobId;
191191
this.jobType = jobType;
192192
this.jobVersion = jobVersion;
193-
this.groups = groups;
193+
this.groups = Collections.unmodifiableList(groups);
194194
this.description = description;
195195
this.createTime = createTime;
196196
this.finishedTime = finishedTime;
@@ -204,7 +204,7 @@ private Job(String jobId, String jobType, Version jobVersion, List<String> group
204204
this.backgroundPersistInterval = backgroundPersistInterval;
205205
this.modelSnapshotRetentionDays = modelSnapshotRetentionDays;
206206
this.resultsRetentionDays = resultsRetentionDays;
207-
this.customSettings = customSettings;
207+
this.customSettings = customSettings == null ? null : Collections.unmodifiableMap(customSettings);
208208
this.modelSnapshotId = modelSnapshotId;
209209
this.resultsIndexName = resultsIndexName;
210210
this.deleted = deleted;
@@ -219,7 +219,7 @@ public Job(StreamInput in) throws IOException {
219219
jobVersion = null;
220220
}
221221
if (in.getVersion().onOrAfter(Version.V_6_1_0)) {
222-
groups = in.readList(StreamInput::readString);
222+
groups = Collections.unmodifiableList(in.readList(StreamInput::readString));
223223
} else {
224224
groups = Collections.emptyList();
225225
}
@@ -240,7 +240,8 @@ public Job(StreamInput in) throws IOException {
240240
backgroundPersistInterval = in.readOptionalTimeValue();
241241
modelSnapshotRetentionDays = in.readOptionalLong();
242242
resultsRetentionDays = in.readOptionalLong();
243-
customSettings = in.readMap();
243+
Map<String, Object> readCustomSettings = in.readMap();
244+
customSettings = readCustomSettings == null ? null : Collections.unmodifiableMap(readCustomSettings);
244245
modelSnapshotId = in.readOptionalString();
245246
resultsIndexName = in.readString();
246247
deleted = in.readBoolean();
@@ -603,7 +604,8 @@ public boolean equals(Object other) {
603604
&& Objects.equals(this.lastDataTime, that.lastDataTime)
604605
&& Objects.equals(this.establishedModelMemory, that.establishedModelMemory)
605606
&& Objects.equals(this.analysisConfig, that.analysisConfig)
606-
&& Objects.equals(this.analysisLimits, that.analysisLimits) && Objects.equals(this.dataDescription, that.dataDescription)
607+
&& Objects.equals(this.analysisLimits, that.analysisLimits)
608+
&& Objects.equals(this.dataDescription, that.dataDescription)
607609
&& Objects.equals(this.modelPlotConfig, that.modelPlotConfig)
608610
&& Objects.equals(this.renormalizationWindowDays, that.renormalizationWindowDays)
609611
&& Objects.equals(this.backgroundPersistInterval, that.backgroundPersistInterval)
@@ -1002,6 +1004,7 @@ public boolean equals(Object o) {
10021004
return Objects.equals(this.id, that.id)
10031005
&& Objects.equals(this.jobType, that.jobType)
10041006
&& Objects.equals(this.jobVersion, that.jobVersion)
1007+
&& Objects.equals(this.groups, that.groups)
10051008
&& Objects.equals(this.description, that.description)
10061009
&& Objects.equals(this.analysisConfig, that.analysisConfig)
10071010
&& Objects.equals(this.analysisLimits, that.analysisLimits)
@@ -1023,7 +1026,7 @@ public boolean equals(Object o) {
10231026

10241027
@Override
10251028
public int hashCode() {
1026-
return Objects.hash(id, jobType, jobVersion, description, analysisConfig, analysisLimits, dataDescription, createTime,
1029+
return Objects.hash(id, jobType, jobVersion, groups, description, analysisConfig, analysisLimits, dataDescription, createTime,
10271030
finishedTime, lastDataTime, establishedModelMemory, modelPlotConfig, renormalizationWindowDays,
10281031
backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, customSettings, modelSnapshotId,
10291032
resultsIndexName, deleted);

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdate.java

Lines changed: 48 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -346,33 +346,33 @@ public Set<String> getUpdateFields() {
346346
*/
347347
public Job mergeWithJob(Job source, ByteSizeValue maxModelMemoryLimit) {
348348
Job.Builder builder = new Job.Builder(source);
349+
AnalysisConfig currentAnalysisConfig = source.getAnalysisConfig();
350+
AnalysisConfig.Builder newAnalysisConfig = new AnalysisConfig.Builder(currentAnalysisConfig);
351+
349352
if (groups != null) {
350353
builder.setGroups(groups);
351354
}
352355
if (description != null) {
353356
builder.setDescription(description);
354357
}
355358
if (detectorUpdates != null && detectorUpdates.isEmpty() == false) {
356-
AnalysisConfig ac = source.getAnalysisConfig();
357-
int numDetectors = ac.getDetectors().size();
359+
int numDetectors = currentAnalysisConfig.getDetectors().size();
358360
for (DetectorUpdate dd : detectorUpdates) {
359361
if (dd.getDetectorIndex() >= numDetectors) {
360362
throw ExceptionsHelper.badRequestException("Supplied detector_index [{}] is >= the number of detectors [{}]",
361363
dd.getDetectorIndex(), numDetectors);
362364
}
363365

364-
Detector.Builder detectorbuilder = new Detector.Builder(ac.getDetectors().get(dd.getDetectorIndex()));
366+
Detector.Builder detectorBuilder = new Detector.Builder(currentAnalysisConfig.getDetectors().get(dd.getDetectorIndex()));
365367
if (dd.getDescription() != null) {
366-
detectorbuilder.setDetectorDescription(dd.getDescription());
368+
detectorBuilder.setDetectorDescription(dd.getDescription());
367369
}
368370
if (dd.getRules() != null) {
369-
detectorbuilder.setRules(dd.getRules());
371+
detectorBuilder.setRules(dd.getRules());
370372
}
371-
ac.getDetectors().set(dd.getDetectorIndex(), detectorbuilder.build());
372-
}
373373

374-
AnalysisConfig.Builder acBuilder = new AnalysisConfig.Builder(ac);
375-
builder.setAnalysisConfig(acBuilder);
374+
newAnalysisConfig.setDetector(dd.getDetectorIndex(), detectorBuilder.build());
375+
}
376376
}
377377
if (modelPlotConfig != null) {
378378
builder.setModelPlotConfig(modelPlotConfig);
@@ -395,9 +395,7 @@ public Job mergeWithJob(Job source, ByteSizeValue maxModelMemoryLimit) {
395395
builder.setResultsRetentionDays(resultsRetentionDays);
396396
}
397397
if (categorizationFilters != null) {
398-
AnalysisConfig.Builder analysisConfigBuilder = new AnalysisConfig.Builder(source.getAnalysisConfig());
399-
analysisConfigBuilder.setCategorizationFilters(categorizationFilters);
400-
builder.setAnalysisConfig(analysisConfigBuilder);
398+
newAnalysisConfig.setCategorizationFilters(categorizationFilters);
401399
}
402400
if (customSettings != null) {
403401
builder.setCustomSettings(customSettings);
@@ -416,9 +414,47 @@ public Job mergeWithJob(Job source, ByteSizeValue maxModelMemoryLimit) {
416414
if (jobVersion != null) {
417415
builder.setJobVersion(jobVersion);
418416
}
417+
418+
builder.setAnalysisConfig(newAnalysisConfig);
419419
return builder.build();
420420
}
421421

422+
boolean isNoop(Job job) {
423+
return (groups == null || Objects.equals(groups, job.getGroups()))
424+
&& (description == null || Objects.equals(description, job.getDescription()))
425+
&& (modelPlotConfig == null || Objects.equals(modelPlotConfig, job.getModelPlotConfig()))
426+
&& (analysisLimits == null || Objects.equals(analysisLimits, job.getAnalysisLimits()))
427+
&& updatesDetectors(job) == false
428+
&& (renormalizationWindowDays == null || Objects.equals(renormalizationWindowDays, job.getRenormalizationWindowDays()))
429+
&& (backgroundPersistInterval == null || Objects.equals(backgroundPersistInterval, job.getBackgroundPersistInterval()))
430+
&& (modelSnapshotRetentionDays == null || Objects.equals(modelSnapshotRetentionDays, job.getModelSnapshotRetentionDays()))
431+
&& (resultsRetentionDays == null || Objects.equals(resultsRetentionDays, job.getResultsRetentionDays()))
432+
&& (categorizationFilters == null
433+
|| Objects.equals(categorizationFilters, job.getAnalysisConfig().getCategorizationFilters()))
434+
&& (customSettings == null || Objects.equals(customSettings, job.getCustomSettings()))
435+
&& (modelSnapshotId == null || Objects.equals(modelSnapshotId, job.getModelSnapshotId()))
436+
&& (establishedModelMemory == null || Objects.equals(establishedModelMemory, job.getEstablishedModelMemory()))
437+
&& (jobVersion == null || Objects.equals(jobVersion, job.getJobVersion()));
438+
}
439+
440+
boolean updatesDetectors(Job job) {
441+
AnalysisConfig analysisConfig = job.getAnalysisConfig();
442+
if (detectorUpdates == null) {
443+
return false;
444+
}
445+
for (DetectorUpdate detectorUpdate : detectorUpdates) {
446+
if (detectorUpdate.description == null && detectorUpdate.rules == null) {
447+
continue;
448+
}
449+
Detector detector = analysisConfig.getDetectors().get(detectorUpdate.detectorIndex);
450+
if (Objects.equals(detectorUpdate.description, detector.getDetectorDescription()) == false
451+
|| Objects.equals(detectorUpdate.rules, detector.getRules()) == false) {
452+
return true;
453+
}
454+
}
455+
return false;
456+
}
457+
422458
@Override
423459
public boolean equals(Object other) {
424460
if (this == other) {

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/RuleScope.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ public RuleScope() {
6060
}
6161

6262
public RuleScope(Map<String, FilterRef> scope) {
63-
this.scope = Objects.requireNonNull(scope);
63+
this.scope = Collections.unmodifiableMap(scope);
6464
}
6565

6666
public RuleScope(StreamInput in) throws IOException {

0 commit comments

Comments
 (0)