Skip to content

Commit 479502e

Browse files
author
David Roberts
committed
[ML] Fix race condition when creating multiple jobs (#40049)
If multiple jobs are created together and the anomaly results index does not exist then some of the jobs could fail to update the mappings of the results index. This lead them to fail to write their results correctly later. Although this scenario sounds rare, it is exactly what happens if the user creates their first jobs using the Nginx module in the ML UI. This change fixes the problem by updating the mappings of the results index if it is found to exist during a creation attempt. Fixes #38785
1 parent 24ad1e5 commit 479502e

File tree

3 files changed

+104
-28
lines changed

3 files changed

+104
-28
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProvider.java

Lines changed: 40 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
1717
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
1818
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
19+
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
20+
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
1921
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
2022
import org.elasticsearch.action.bulk.BulkAction;
2123
import org.elasticsearch.action.bulk.BulkRequestBuilder;
@@ -37,7 +39,6 @@
3739
import org.elasticsearch.cluster.ClusterState;
3840
import org.elasticsearch.cluster.block.ClusterBlock;
3941
import org.elasticsearch.cluster.block.ClusterBlockException;
40-
import org.elasticsearch.cluster.metadata.IndexMetaData;
4142
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
4243
import org.elasticsearch.cluster.metadata.MappingMetaData;
4344
import org.elasticsearch.common.Nullable;
@@ -300,33 +301,54 @@ public void createJobResultIndex(Job job, ClusterState state, final ActionListen
300301
// so we need to handle that possibility
301302
if (e instanceof ResourceAlreadyExistsException) {
302303
LOGGER.info("Index already exists");
303-
// Create the alias
304-
createAliasListener.onResponse(true);
304+
// Add the term field mappings and alias. The complication is that the state at the
305+
// beginning of the operation doesn't have any knowledge of the index, as it's only
306+
// just been created. So we need yet another operation to get the mappings for it.
307+
getLatestIndexMappings(indexName, ActionListener.wrap(
308+
response -> {
309+
// Expect one index. If this is not the case then it means the index
310+
// has been deleted almost immediately after being created, and this is
311+
// so unlikely that it's reasonable to fail the whole operation.
312+
Iterator<MappingMetaData> mappingsIt =
313+
response.getMappings().iterator().next().value.valuesIt();
314+
addTermsAndAliases(mappingsIt, indexName, termFields, createAliasListener);
315+
},
316+
finalListener::onFailure
317+
));
305318
} else {
306319
finalListener.onFailure(e);
307320
}
308321
}
309322
), client.admin().indices()::create);
310323
} else {
311-
long fieldCountLimit = MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.get(settings);
312-
if (violatedFieldCountLimit(indexName, termFields.size(), fieldCountLimit, state)) {
313-
String message = "Cannot create job in index '" + indexName + "' as the " +
314-
MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey() + " setting will be violated";
315-
finalListener.onFailure(new IllegalArgumentException(message));
316-
} else {
317-
updateIndexMappingWithTermFields(indexName, termFields,
318-
ActionListener.wrap(createAliasListener::onResponse, finalListener::onFailure));
319-
}
324+
Iterator<MappingMetaData> mappingsIt = state.metaData().index(indexName).getMappings().valuesIt();
325+
addTermsAndAliases(mappingsIt, indexName, termFields, createAliasListener);
326+
}
327+
}
328+
329+
private void getLatestIndexMappings(final String indexName, final ActionListener<GetMappingsResponse> listener) {
330+
331+
GetMappingsRequest getMappingsRequest = client.admin().indices().prepareGetMappings(indexName).request();
332+
executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, getMappingsRequest, listener,
333+
client.admin().indices()::getMappings);
334+
}
335+
336+
private void addTermsAndAliases(final Iterator<MappingMetaData> mappingsIt, final String indexName,
337+
final Collection<String> termFields, final ActionListener<Boolean> listener) {
338+
long fieldCountLimit = MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.get(settings);
339+
if (violatedFieldCountLimit(termFields.size(), fieldCountLimit, mappingsIt)) {
340+
String message = "Cannot create job in index '" + indexName + "' as the " +
341+
MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey() + " setting will be violated";
342+
listener.onFailure(new IllegalArgumentException(message));
343+
} else {
344+
updateIndexMappingWithTermFields(indexName, termFields, listener);
320345
}
321346
}
322347

323-
public static boolean violatedFieldCountLimit(
324-
String indexName, long additionalFieldCount, long fieldCountLimit, ClusterState clusterState) {
348+
public static boolean violatedFieldCountLimit(long additionalFieldCount, long fieldCountLimit, Iterator<MappingMetaData> mappingsIt) {
325349
long numFields = 0;
326-
IndexMetaData indexMetaData = clusterState.metaData().index(indexName);
327-
Iterator<MappingMetaData> mappings = indexMetaData.getMappings().valuesIt();
328-
while (mappings.hasNext()) {
329-
MappingMetaData mapping = mappings.next();
350+
while (mappingsIt.hasNext()) {
351+
MappingMetaData mapping = mappingsIt.next();
330352
numFields += countFields(mapping.sourceAsMap());
331353
}
332354
return numFields + additionalFieldCount > fieldCountLimit;

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobResultsProviderIT.java

Lines changed: 60 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,19 @@
55
*/
66
package org.elasticsearch.xpack.ml.integration;
77

8+
import org.elasticsearch.action.ActionFuture;
89
import org.elasticsearch.action.ActionListener;
10+
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsAction;
11+
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
12+
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
913
import org.elasticsearch.action.bulk.BulkRequestBuilder;
1014
import org.elasticsearch.action.bulk.BulkResponse;
1115
import org.elasticsearch.action.index.IndexRequest;
1216
import org.elasticsearch.action.support.WriteRequest;
17+
import org.elasticsearch.cluster.metadata.MappingMetaData;
1318
import org.elasticsearch.cluster.routing.UnassignedInfo;
1419
import org.elasticsearch.common.Strings;
20+
import org.elasticsearch.common.collect.ImmutableOpenMap;
1521
import org.elasticsearch.common.settings.Settings;
1622
import org.elasticsearch.common.unit.TimeValue;
1723
import org.elasticsearch.common.xcontent.ToXContent;
@@ -32,6 +38,7 @@
3238
import org.elasticsearch.xpack.core.ml.job.config.RuleAction;
3339
import org.elasticsearch.xpack.core.ml.job.config.RuleScope;
3440
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
41+
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields;
3542
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
3643
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCountsTests;
3744
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
@@ -55,6 +62,7 @@
5562
import java.util.Date;
5663
import java.util.HashSet;
5764
import java.util.List;
65+
import java.util.Map;
5866
import java.util.Set;
5967
import java.util.concurrent.CountDownLatch;
6068
import java.util.concurrent.atomic.AtomicReference;
@@ -79,6 +87,54 @@ public void createComponents() throws Exception {
7987
waitForMlTemplates();
8088
}
8189

90+
public void testMultipleSimultaneousJobCreations() {
91+
92+
int numJobs = randomIntBetween(4, 7);
93+
94+
// Each job should result in one extra field being added to the results index mappings: field1, field2, field3, etc.
95+
// Due to all being created simultaneously this test may reveal race conditions in the code that updates the mappings.
96+
List<PutJobAction.Request> requests = new ArrayList<>(numJobs);
97+
for (int i = 1; i <= numJobs; ++i) {
98+
Job.Builder builder = new Job.Builder("job" + i);
99+
AnalysisConfig.Builder ac = createAnalysisConfig("field" + i, Collections.emptyList());
100+
DataDescription.Builder dc = new DataDescription.Builder();
101+
builder.setAnalysisConfig(ac);
102+
builder.setDataDescription(dc);
103+
104+
requests.add(new PutJobAction.Request(builder));
105+
}
106+
107+
// Start the requests as close together as possible, without waiting for each to complete before starting the next one.
108+
List<ActionFuture<PutJobAction.Response>> futures = new ArrayList<>(numJobs);
109+
for (PutJobAction.Request request : requests) {
110+
futures.add(client().execute(PutJobAction.INSTANCE, request));
111+
}
112+
113+
// Only after all requests are in-flight, wait for all the requests to complete.
114+
for (ActionFuture<PutJobAction.Response> future : futures) {
115+
future.actionGet();
116+
}
117+
118+
// Assert that the mappings contain all the additional fields: field1, field2, field3, etc.
119+
String sharedResultsIndex = AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + AnomalyDetectorsIndexFields.RESULTS_INDEX_DEFAULT;
120+
GetMappingsRequest request = new GetMappingsRequest().indices(sharedResultsIndex);
121+
GetMappingsResponse response = client().execute(GetMappingsAction.INSTANCE, request).actionGet();
122+
ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> indexMappings = response.getMappings();
123+
assertNotNull(indexMappings);
124+
ImmutableOpenMap<String, MappingMetaData> typeMappings = indexMappings.get(sharedResultsIndex);
125+
assertNotNull("expected " + sharedResultsIndex + " in " + indexMappings, typeMappings);
126+
assertEquals("expected 1 type in " + typeMappings, 1, typeMappings.size());
127+
Map<String, Object> mappings = typeMappings.iterator().next().value.getSourceAsMap();
128+
assertNotNull(mappings);
129+
@SuppressWarnings("unchecked")
130+
Map<String, Object> properties = (Map<String, Object>) mappings.get("properties");
131+
assertNotNull("expected 'properties' field in " + mappings, properties);
132+
for (int i = 1; i <= numJobs; ++i) {
133+
String fieldName = "field" + i;
134+
assertNotNull("expected '" + fieldName + "' field in " + properties, properties.get(fieldName));
135+
}
136+
}
137+
82138
public void testGetCalandarByJobId() throws Exception {
83139
List<Calendar> calendars = new ArrayList<>();
84140
calendars.add(new Calendar("empty calendar", Collections.emptyList(), null));
@@ -468,7 +524,7 @@ private Job.Builder createJob(String jobId, List<String> filterIds) {
468524
private Job.Builder createJob(String jobId, List<String> filterIds, List<String> jobGroups) {
469525
Job.Builder builder = new Job.Builder(jobId);
470526
builder.setGroups(jobGroups);
471-
AnalysisConfig.Builder ac = createAnalysisConfig(filterIds);
527+
AnalysisConfig.Builder ac = createAnalysisConfig("by_field", filterIds);
472528
DataDescription.Builder dc = new DataDescription.Builder();
473529
builder.setAnalysisConfig(ac);
474530
builder.setDataDescription(dc);
@@ -478,14 +534,14 @@ private Job.Builder createJob(String jobId, List<String> filterIds, List<String>
478534
return builder;
479535
}
480536

481-
private AnalysisConfig.Builder createAnalysisConfig(List<String> filterIds) {
537+
private AnalysisConfig.Builder createAnalysisConfig(String byFieldName, List<String> filterIds) {
482538
Detector.Builder detector = new Detector.Builder("mean", "field");
483-
detector.setByFieldName("by_field");
539+
detector.setByFieldName(byFieldName);
484540
List<DetectionRule> rules = new ArrayList<>();
485541

486542
for (String filterId : filterIds) {
487543
RuleScope.Builder ruleScope = RuleScope.builder();
488-
ruleScope.include("by_field", filterId);
544+
ruleScope.include(byFieldName, filterId);
489545

490546
rules.add(new DetectionRule.Builder(ruleScope).setActions(RuleAction.SKIP_RESULT).build());
491547
}

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProviderTests.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -783,12 +783,10 @@ public void testViolatedFieldCountLimit() throws Exception {
783783
MetaData metaData = MetaData.builder()
784784
.put(indexMetaData1)
785785
.build();
786-
boolean result = JobResultsProvider.violatedFieldCountLimit("index1", 0, 10,
787-
ClusterState.builder(new ClusterName("_name")).metaData(metaData).build());
786+
boolean result = JobResultsProvider.violatedFieldCountLimit(0, 10, metaData.index("index1").getMappings().valuesIt());
788787
assertFalse(result);
789788

790-
result = JobResultsProvider.violatedFieldCountLimit("index1", 1, 10,
791-
ClusterState.builder(new ClusterName("_name")).metaData(metaData).build());
789+
result = JobResultsProvider.violatedFieldCountLimit(1, 10, metaData.index("index1").getMappings().valuesIt());
792790
assertTrue(result);
793791

794792
IndexMetaData.Builder indexMetaData2 = new IndexMetaData.Builder("index1")
@@ -801,8 +799,8 @@ public void testViolatedFieldCountLimit() throws Exception {
801799
metaData = MetaData.builder()
802800
.put(indexMetaData2)
803801
.build();
804-
result = JobResultsProvider.violatedFieldCountLimit("index1", 0, 19,
805-
ClusterState.builder(new ClusterName("_name")).metaData(metaData).build());
802+
803+
result = JobResultsProvider.violatedFieldCountLimit(0, 19, metaData.index("index1").getMappings().valuesIt());
806804
assertTrue(result);
807805
}
808806

0 commit comments

Comments
 (0)