Skip to content

Commit 8d01b11

Browse files
author
David Roberts
committed
[ML] Fix race condition when creating multiple jobs (#40049)
If multiple jobs are created together and the anomaly results index does not exist then some of the jobs could fail to update the mappings of the results index. This lead them to fail to write their results correctly later. Although this scenario sounds rare, it is exactly what happens if the user creates their first jobs using the Nginx module in the ML UI. This change fixes the problem by updating the mappings of the results index if it is found to exist during a creation attempt. Fixes #38785
1 parent 78a9754 commit 8d01b11

File tree

3 files changed

+103
-22
lines changed

3 files changed

+103
-22
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProvider.java

Lines changed: 40 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
1717
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
1818
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
19+
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
20+
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
1921
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
2022
import org.elasticsearch.action.bulk.BulkAction;
2123
import org.elasticsearch.action.bulk.BulkRequestBuilder;
@@ -37,12 +39,12 @@
3739
import org.elasticsearch.cluster.ClusterState;
3840
import org.elasticsearch.cluster.block.ClusterBlock;
3941
import org.elasticsearch.cluster.block.ClusterBlockException;
40-
import org.elasticsearch.cluster.metadata.IndexMetaData;
4142
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
4243
import org.elasticsearch.cluster.metadata.MappingMetaData;
4344
import org.elasticsearch.common.Nullable;
4445
import org.elasticsearch.common.Strings;
4546
import org.elasticsearch.common.bytes.BytesReference;
47+
import org.elasticsearch.common.collect.ImmutableOpenMap;
4648
import org.elasticsearch.common.settings.Settings;
4749
import org.elasticsearch.common.util.concurrent.ThreadContext;
4850
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
@@ -301,30 +303,53 @@ public void createJobResultIndex(Job job, ClusterState state, final ActionListen
301303
// so we need to handle that possibility
302304
if (e instanceof ResourceAlreadyExistsException) {
303305
LOGGER.info("Index already exists");
304-
// Create the alias
305-
createAliasListener.onResponse(true);
306+
// Add the term field mappings and alias. The complication is that the state at the
307+
// beginning of the operation doesn't have any knowledge of the index, as it's only
308+
// just been created. So we need yet another operation to get the mappings for it.
309+
getLatestIndexMappings(indexName, ActionListener.wrap(
310+
response -> {
311+
// Expect one index and one type. If this is not the case then it means the
312+
// index has been deleted almost immediately after being created, and this is
313+
// so unlikely that it's reasonable to fail the whole operation.
314+
ImmutableOpenMap<String, MappingMetaData> indexMappings =
315+
response.getMappings().iterator().next().value;
316+
MappingMetaData typeMappings = indexMappings.iterator().next().value;
317+
addTermsAndAliases(typeMappings, indexName, termFields, createAliasListener);
318+
},
319+
finalListener::onFailure
320+
));
306321
} else {
307322
finalListener.onFailure(e);
308323
}
309324
}
310325
), client.admin().indices()::create);
311326
} else {
312-
long fieldCountLimit = MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.get(settings);
313-
IndexMetaData indexMetaData = state.metaData().index(indexName);
327+
MappingMetaData mapping = state.metaData().index(indexName).mapping();
328+
addTermsAndAliases(mapping, indexName, termFields, createAliasListener);
329+
}
330+
}
314331

315-
if (violatedFieldCountLimit(termFields.size(), fieldCountLimit, indexMetaData)) {
316-
String message = "Cannot create job in index '" + indexName + "' as the " +
317-
MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey() + " setting will be violated";
318-
finalListener.onFailure(new IllegalArgumentException(message));
319-
} else {
320-
updateIndexMappingWithTermFields(indexName, indexMetaData.mapping().type(), termFields,
321-
ActionListener.wrap(createAliasListener::onResponse, finalListener::onFailure));
322-
}
332+
private void getLatestIndexMappings(final String indexName, final ActionListener<GetMappingsResponse> listener) {
333+
334+
GetMappingsRequest getMappingsRequest = client.admin().indices().prepareGetMappings(indexName).request();
335+
executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, getMappingsRequest, listener,
336+
client.admin().indices()::getMappings);
337+
}
338+
339+
private void addTermsAndAliases(final MappingMetaData mapping, final String indexName, final Collection<String> termFields,
340+
final ActionListener<Boolean> listener) {
341+
long fieldCountLimit = MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.get(settings);
342+
343+
if (violatedFieldCountLimit(termFields.size(), fieldCountLimit, mapping)) {
344+
String message = "Cannot create job in index '" + indexName + "' as the " +
345+
MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey() + " setting will be violated";
346+
listener.onFailure(new IllegalArgumentException(message));
347+
} else {
348+
updateIndexMappingWithTermFields(indexName, mapping.type(), termFields, listener);
323349
}
324350
}
325351

326-
public static boolean violatedFieldCountLimit(long additionalFieldCount, long fieldCountLimit, IndexMetaData indexMetaData) {
327-
MappingMetaData mapping = indexMetaData.mapping();
352+
public static boolean violatedFieldCountLimit(long additionalFieldCount, long fieldCountLimit, MappingMetaData mapping) {
328353
long numFields = countFields(mapping.sourceAsMap());
329354
return numFields + additionalFieldCount > fieldCountLimit;
330355
}

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobResultsProviderIT.java

Lines changed: 60 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,19 @@
55
*/
66
package org.elasticsearch.xpack.ml.integration;
77

8+
import org.elasticsearch.action.ActionFuture;
89
import org.elasticsearch.action.ActionListener;
10+
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsAction;
11+
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
12+
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
913
import org.elasticsearch.action.bulk.BulkRequestBuilder;
1014
import org.elasticsearch.action.bulk.BulkResponse;
1115
import org.elasticsearch.action.index.IndexRequest;
1216
import org.elasticsearch.action.support.WriteRequest;
17+
import org.elasticsearch.cluster.metadata.MappingMetaData;
1318
import org.elasticsearch.cluster.routing.UnassignedInfo;
1419
import org.elasticsearch.common.Strings;
20+
import org.elasticsearch.common.collect.ImmutableOpenMap;
1521
import org.elasticsearch.common.settings.Settings;
1622
import org.elasticsearch.common.unit.TimeValue;
1723
import org.elasticsearch.common.xcontent.ToXContent;
@@ -32,6 +38,7 @@
3238
import org.elasticsearch.xpack.core.ml.job.config.RuleAction;
3339
import org.elasticsearch.xpack.core.ml.job.config.RuleScope;
3440
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
41+
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields;
3542
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
3643
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCountsTests;
3744
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
@@ -55,6 +62,7 @@
5562
import java.util.Date;
5663
import java.util.HashSet;
5764
import java.util.List;
65+
import java.util.Map;
5866
import java.util.Set;
5967
import java.util.concurrent.CountDownLatch;
6068
import java.util.concurrent.atomic.AtomicReference;
@@ -79,6 +87,54 @@ public void createComponents() throws Exception {
7987
waitForMlTemplates();
8088
}
8189

90+
public void testMultipleSimultaneousJobCreations() {
91+
92+
int numJobs = randomIntBetween(4, 7);
93+
94+
// Each job should result in one extra field being added to the results index mappings: field1, field2, field3, etc.
95+
// Due to all being created simultaneously this test may reveal race conditions in the code that updates the mappings.
96+
List<PutJobAction.Request> requests = new ArrayList<>(numJobs);
97+
for (int i = 1; i <= numJobs; ++i) {
98+
Job.Builder builder = new Job.Builder("job" + i);
99+
AnalysisConfig.Builder ac = createAnalysisConfig("field" + i, Collections.emptyList());
100+
DataDescription.Builder dc = new DataDescription.Builder();
101+
builder.setAnalysisConfig(ac);
102+
builder.setDataDescription(dc);
103+
104+
requests.add(new PutJobAction.Request(builder));
105+
}
106+
107+
// Start the requests as close together as possible, without waiting for each to complete before starting the next one.
108+
List<ActionFuture<PutJobAction.Response>> futures = new ArrayList<>(numJobs);
109+
for (PutJobAction.Request request : requests) {
110+
futures.add(client().execute(PutJobAction.INSTANCE, request));
111+
}
112+
113+
// Only after all requests are in-flight, wait for all the requests to complete.
114+
for (ActionFuture<PutJobAction.Response> future : futures) {
115+
future.actionGet();
116+
}
117+
118+
// Assert that the mappings contain all the additional fields: field1, field2, field3, etc.
119+
String sharedResultsIndex = AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + AnomalyDetectorsIndexFields.RESULTS_INDEX_DEFAULT;
120+
GetMappingsRequest request = new GetMappingsRequest().indices(sharedResultsIndex);
121+
GetMappingsResponse response = client().execute(GetMappingsAction.INSTANCE, request).actionGet();
122+
ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> indexMappings = response.getMappings();
123+
assertNotNull(indexMappings);
124+
ImmutableOpenMap<String, MappingMetaData> typeMappings = indexMappings.get(sharedResultsIndex);
125+
assertNotNull("expected " + sharedResultsIndex + " in " + indexMappings, typeMappings);
126+
assertEquals("expected 1 type in " + typeMappings, 1, typeMappings.size());
127+
Map<String, Object> mappings = typeMappings.iterator().next().value.getSourceAsMap();
128+
assertNotNull(mappings);
129+
@SuppressWarnings("unchecked")
130+
Map<String, Object> properties = (Map<String, Object>) mappings.get("properties");
131+
assertNotNull("expected 'properties' field in " + mappings, properties);
132+
for (int i = 1; i <= numJobs; ++i) {
133+
String fieldName = "field" + i;
134+
assertNotNull("expected '" + fieldName + "' field in " + properties, properties.get(fieldName));
135+
}
136+
}
137+
82138
public void testGetCalandarByJobId() throws Exception {
83139
List<Calendar> calendars = new ArrayList<>();
84140
calendars.add(new Calendar("empty calendar", Collections.emptyList(), null));
@@ -473,7 +529,7 @@ private Job.Builder createJob(String jobId, List<String> filterIds) {
473529
private Job.Builder createJob(String jobId, List<String> filterIds, List<String> jobGroups) {
474530
Job.Builder builder = new Job.Builder(jobId);
475531
builder.setGroups(jobGroups);
476-
AnalysisConfig.Builder ac = createAnalysisConfig(filterIds);
532+
AnalysisConfig.Builder ac = createAnalysisConfig("by_field", filterIds);
477533
DataDescription.Builder dc = new DataDescription.Builder();
478534
builder.setAnalysisConfig(ac);
479535
builder.setDataDescription(dc);
@@ -483,14 +539,14 @@ private Job.Builder createJob(String jobId, List<String> filterIds, List<String>
483539
return builder;
484540
}
485541

486-
private AnalysisConfig.Builder createAnalysisConfig(List<String> filterIds) {
542+
private AnalysisConfig.Builder createAnalysisConfig(String byFieldName, List<String> filterIds) {
487543
Detector.Builder detector = new Detector.Builder("mean", "field");
488-
detector.setByFieldName("by_field");
544+
detector.setByFieldName(byFieldName);
489545
List<DetectionRule> rules = new ArrayList<>();
490546

491547
for (String filterId : filterIds) {
492548
RuleScope.Builder ruleScope = RuleScope.builder();
493-
ruleScope.include("by_field", filterId);
549+
ruleScope.include(byFieldName, filterId);
494550

495551
rules.add(new DetectionRule.Builder(ruleScope).setActions(RuleAction.SKIP_RESULT).build());
496552
}

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProviderTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -783,10 +783,10 @@ public void testViolatedFieldCountLimit() throws Exception {
783783
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0))
784784
.putMapping(new MappingMetaData("type1", Collections.singletonMap("properties", mapping)))
785785
.build();
786-
boolean result = JobResultsProvider.violatedFieldCountLimit(0, 10, indexMetaData1);
786+
boolean result = JobResultsProvider.violatedFieldCountLimit(0, 10, indexMetaData1.mapping());
787787
assertFalse(result);
788788

789-
result = JobResultsProvider.violatedFieldCountLimit(1, 10, indexMetaData1);
789+
result = JobResultsProvider.violatedFieldCountLimit(1, 10, indexMetaData1.mapping());
790790
assertTrue(result);
791791

792792
for (; i < 20; i++) {
@@ -801,7 +801,7 @@ public void testViolatedFieldCountLimit() throws Exception {
801801
.putMapping(new MappingMetaData("type1", Collections.singletonMap("properties", mapping)))
802802
.build();
803803

804-
result = JobResultsProvider.violatedFieldCountLimit(0, 19, indexMetaData2);
804+
result = JobResultsProvider.violatedFieldCountLimit(0, 19, indexMetaData2.mapping());
805805
assertTrue(result);
806806
}
807807

0 commit comments

Comments
 (0)