From f2411eced6c85e7f21f160ed20cb6c33b4cb494a Mon Sep 17 00:00:00 2001 From: Przemyslaw Witek Date: Thu, 11 Jul 2019 12:44:29 +0200 Subject: [PATCH 1/5] Update .ml-config mappings before indexing job, datafeed or df analytics config --- .../persistence/ElasticsearchMappings.java | 12 ++-- .../ElasticsearchMappingsTests.java | 65 +++++++++++++++++++ .../TransportPutDataFrameAnalyticsAction.java | 40 ++++++++++-- .../ml/action/TransportPutDatafeedAction.java | 22 ++++++- .../xpack/ml/job/JobManager.java | 17 ++++- 5 files changed, 142 insertions(+), 14 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappings.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappings.java index 77073a23491e6..894741791124b 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappings.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappings.java @@ -18,7 +18,7 @@ import org.elasticsearch.cluster.metadata.AliasOrIndex; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; -import org.elasticsearch.common.CheckedBiFunction; +import org.elasticsearch.common.CheckedFunction; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.index.Index; @@ -140,9 +140,13 @@ private ElasticsearchMappings() { } public static XContentBuilder configMapping() throws IOException { + return configMapping(SINGLE_MAPPING_NAME); + } + + public static XContentBuilder configMapping(String mappingType) throws IOException { XContentBuilder builder = jsonBuilder(); builder.startObject(); - builder.startObject(SINGLE_MAPPING_NAME); + builder.startObject(mappingType); addMetaInformation(builder); addDefaultMapping(builder); builder.startObject(PROPERTIES); @@ -1146,7 +1150,7 @@ static String[] mappingRequiresUpdate(ClusterState state, String[] concreteIndic } public static void addDocMappingIfMissing(String alias, - CheckedBiFunction, XContentBuilder, IOException> mappingSupplier, + CheckedFunction mappingSupplier, Client client, ClusterState state, ActionListener listener) { AliasOrIndex aliasOrIndex = state.metaData().getAliasAndIndexLookup().get(alias); if (aliasOrIndex == null) { @@ -1170,7 +1174,7 @@ public static void addDocMappingIfMissing(String alias, IndexMetaData indexMetaData = state.metaData().index(indicesThatRequireAnUpdate[0]); String mappingType = indexMetaData.mapping().type(); - try (XContentBuilder mapping = mappingSupplier.apply(mappingType, Collections.emptyList())) { + try (XContentBuilder mapping = mappingSupplier.apply(mappingType)) { PutMappingRequest putMappingRequest = new PutMappingRequest(indicesThatRequireAnUpdate); putMappingRequest.type(mappingType); putMappingRequest.source(mapping); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappingsTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappingsTests.java index ee8d921485996..13ce6f2ab610d 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappingsTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappingsTests.java @@ -10,6 +10,11 @@ import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.JsonToken; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.mapping.put.PutMappingAction; +import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -17,11 +22,13 @@ import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.index.get.GetResult; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.VersionUtils; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats; import org.elasticsearch.xpack.core.ml.job.config.Job; @@ -35,6 +42,7 @@ import org.elasticsearch.xpack.core.ml.job.results.CategoryDefinition; import org.elasticsearch.xpack.core.ml.job.results.ReservedFieldNames; import org.elasticsearch.xpack.core.ml.job.results.Result; +import org.mockito.ArgumentCaptor; import java.io.BufferedInputStream; import java.io.ByteArrayInputStream; @@ -48,7 +56,16 @@ import java.util.Map; import java.util.Set; +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME; +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; public class ElasticsearchMappingsTests extends ESTestCase { @@ -207,6 +224,54 @@ public void testMappingRequiresUpdateNewerMappingVersionMinor() throws IOExcepti ElasticsearchMappings.mappingRequiresUpdate(cs, indices, VersionUtils.getPreviousMinorVersion())); } + public void testAddDocMappingIfMissing() throws IOException { + ThreadPool threadPool = mock(ThreadPool.class); + when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); + Client client = mock(Client.class); + when(client.threadPool()).thenReturn(threadPool); + doAnswer( + invocationOnMock -> { + ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2]; + listener.onResponse(new AcknowledgedResponse(true)); + return null; + }) + .when(client).execute(eq(PutMappingAction.INSTANCE), any(), any(ActionListener.class)); + + ClusterState clusterState = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("index-name", "0.0")); + ElasticsearchMappings.addDocMappingIfMissing( + "index-name", + ElasticsearchMappingsTests::fakeMapping, + client, + clusterState, + ActionListener.wrap( + ok -> assertTrue(ok), + e -> fail(e.toString()) + ) + ); + + ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(PutMappingRequest.class); + verify(client).threadPool(); + verify(client).execute(eq(PutMappingAction.INSTANCE), requestCaptor.capture(), any(ActionListener.class)); + verifyNoMoreInteractions(client); + + PutMappingRequest request = requestCaptor.getValue(); + assertThat(request.type(), equalTo("_doc")); + assertThat(request.indices(), equalTo(new String[] { "index-name" })); + assertThat(request.source(), equalTo("{\"_doc\":{\"properties\":{\"some-field\":{\"type\":\"long\"}}}}")); + } + + private static XContentBuilder fakeMapping(String mappingType) throws IOException { + return jsonBuilder() + .startObject() + .startObject(mappingType) + .startObject(ElasticsearchMappings.PROPERTIES) + .startObject("some-field") + .field(ElasticsearchMappings.TYPE, ElasticsearchMappings.LONG) + .endObject() + .endObject() + .endObject() + .endObject(); + } private ClusterState getClusterStateWithMappingsWithMetaData(Map namesAndVersions) throws IOException { MetaData.Builder metaDataBuilder = MetaData.builder(); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDataFrameAnalyticsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDataFrameAnalyticsAction.java index d8f5dbb469f5f..3c6dd8f8345ec 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDataFrameAnalyticsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDataFrameAnalyticsAction.java @@ -7,9 +7,11 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; @@ -29,6 +31,8 @@ import org.elasticsearch.xpack.core.ml.action.PutDataFrameAnalyticsAction; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; import org.elasticsearch.xpack.core.ml.job.messages.Messages; +import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; +import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.core.ml.utils.MlStrings; import org.elasticsearch.xpack.core.security.SecurityContext; @@ -43,6 +47,7 @@ import java.io.IOException; import java.time.Instant; +import java.util.Map; import java.util.Objects; import java.util.function.Supplier; @@ -58,6 +63,7 @@ public class TransportPutDataFrameAnalyticsAction private final IndexNameExpressionResolver indexNameExpressionResolver; private volatile ByteSizeValue maxModelMemoryLimit; + private volatile ClusterState clusterState; @Inject public TransportPutDataFrameAnalyticsAction(Settings settings, TransportService transportService, ActionFilters actionFilters, @@ -78,6 +84,8 @@ public TransportPutDataFrameAnalyticsAction(Settings settings, TransportService maxModelMemoryLimit = MachineLearningField.MAX_MODEL_MEMORY_LIMIT.get(settings); clusterService.getClusterSettings() .addSettingsUpdateConsumer(MachineLearningField.MAX_MODEL_MEMORY_LIMIT, this::setMaxModelMemoryLimit); + clusterState = clusterService.state(); + clusterService.addListener(event -> clusterState = event.state()); } private void setMaxModelMemoryLimit(ByteSizeValue maxModelMemoryLimit) { @@ -97,6 +105,7 @@ protected void doExecute(Task task, PutDataFrameAnalyticsAction.Request request, .setCreateTime(Instant.now()) .setVersion(Version.CURRENT) .build(); + if (licenseState.isAuthAllowed()) { final String username = securityContext.getUser().principal(); RoleDescriptor.IndicesPrivileges sourceIndexPrivileges = RoleDescriptor.IndicesPrivileges.builder() @@ -120,9 +129,12 @@ protected void doExecute(Task task, PutDataFrameAnalyticsAction.Request request, client.execute(HasPrivilegesAction.INSTANCE, privRequest, privResponseListener); } else { - configProvider.put(memoryCappedConfig, threadPool.getThreadContext().getHeaders(), ActionListener.wrap( - indexResponse -> listener.onResponse(new PutDataFrameAnalyticsAction.Response(memoryCappedConfig)), - listener::onFailure + updateDocMappingAndPutConfig( + memoryCappedConfig, + threadPool.getThreadContext().getHeaders(), + ActionListener.wrap( + indexResponse -> listener.onResponse(new PutDataFrameAnalyticsAction.Response(memoryCappedConfig)), + listener::onFailure )); } } @@ -131,9 +143,12 @@ private void handlePrivsResponse(String username, DataFrameAnalyticsConfig memor HasPrivilegesResponse response, ActionListener listener) throws IOException { if (response.isCompleteMatch()) { - configProvider.put(memoryCappedConfig, threadPool.getThreadContext().getHeaders(), ActionListener.wrap( - indexResponse -> listener.onResponse(new PutDataFrameAnalyticsAction.Response(memoryCappedConfig)), - listener::onFailure + updateDocMappingAndPutConfig( + memoryCappedConfig, + threadPool.getThreadContext().getHeaders(), + ActionListener.wrap( + indexResponse -> listener.onResponse(new PutDataFrameAnalyticsAction.Response(memoryCappedConfig)), + listener::onFailure )); } else { XContentBuilder builder = JsonXContent.contentBuilder(); @@ -150,6 +165,19 @@ private void handlePrivsResponse(String username, DataFrameAnalyticsConfig memor } } + private void updateDocMappingAndPutConfig(DataFrameAnalyticsConfig config, + Map headers, + ActionListener listener) { + ElasticsearchMappings.addDocMappingIfMissing( + AnomalyDetectorsIndex.configIndexName(), + ElasticsearchMappings::configMapping, + client, + clusterState, + ActionListener.wrap( + unused -> configProvider.put(config, headers, listener), + listener::onFailure)); + } + private void validateConfig(DataFrameAnalyticsConfig config) { if (MlStrings.isValidId(config.getId()) == false) { throw ExceptionsHelper.badRequestException(Messages.getMessage(Messages.INVALID_ID, DataFrameAnalyticsConfig.ID, diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDatafeedAction.java index 5b8e91cc65280..5d6ea7d7daaea 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDatafeedAction.java @@ -36,6 +36,8 @@ import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.action.PutDatafeedAction; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; +import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; +import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.core.rollup.action.GetRollupIndexCapsAction; import org.elasticsearch.xpack.core.rollup.action.RollupSearchAction; @@ -65,6 +67,8 @@ public class TransportPutDatafeedAction extends TransportMasterNodeAction clusterState = event.state()); } @Override @@ -181,13 +187,25 @@ private void putDatafeed(PutDatafeedAction.Request request, Map } DatafeedConfig.validateAggregations(request.getDatafeed().getParsedAggregations(xContentRegistry)); - CheckedConsumer validationOk = ok -> { - datafeedConfigProvider.putDatafeedConfig(request.getDatafeed(), headers, ActionListener.wrap( + CheckedConsumer mappingsUpdated = ok -> { + datafeedConfigProvider.putDatafeedConfig( + request.getDatafeed(), + headers, + ActionListener.wrap( indexResponse -> listener.onResponse(new PutDatafeedAction.Response(request.getDatafeed())), listener::onFailure )); }; + CheckedConsumer validationOk = ok -> { + ElasticsearchMappings.addDocMappingIfMissing( + AnomalyDetectorsIndex.configIndexName(), + ElasticsearchMappings::configMapping, + client, + clusterState, + ActionListener.wrap(mappingsUpdated, listener::onFailure)); + }; + CheckedConsumer jobOk = ok -> jobConfigProvider.validateDatafeedJob(request.getDatafeed(), ActionListener.wrap(validationOk, listener::onFailure)); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java index 7c2f15591b94f..a5e79ffbb32d8 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java @@ -45,6 +45,8 @@ import org.elasticsearch.xpack.core.ml.job.config.JobUpdate; import org.elasticsearch.xpack.core.ml.job.config.MlFilter; import org.elasticsearch.xpack.core.ml.job.messages.Messages; +import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; +import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; @@ -97,6 +99,7 @@ public class JobManager { private final MlConfigMigrationEligibilityCheck migrationEligibilityCheck; private volatile ByteSizeValue maxModelMemoryLimit; + private volatile ClusterState clusterState; /** * Create a JobManager @@ -118,6 +121,8 @@ public JobManager(Environment environment, Settings settings, JobResultsProvider maxModelMemoryLimit = MachineLearningField.MAX_MODEL_MEMORY_LIMIT.get(settings); clusterService.getClusterSettings() .addSettingsUpdateConsumer(MachineLearningField.MAX_MODEL_MEMORY_LIMIT, this::setMaxModelMemoryLimit); + clusterState = clusterService.state(); + clusterService.addListener(event -> clusterState = event.state()); } private void setMaxModelMemoryLimit(ByteSizeValue maxModelMemoryLimit) { @@ -256,7 +261,7 @@ public void putJob(PutJobAction.Request request, AnalysisRegistry analysisRegist ActionListener putJobListener = new ActionListener() { @Override - public void onResponse(Boolean indicesCreated) { + public void onResponse(Boolean mappingsUpdated) { jobConfigProvider.putJob(job, ActionListener.wrap( response -> { @@ -283,10 +288,18 @@ public void onFailure(Exception e) { } }; + ActionListener addDocMappingsListener = ActionListener.wrap( + indicesCreated -> { + ElasticsearchMappings.addDocMappingIfMissing( + AnomalyDetectorsIndex.configIndexName(), ElasticsearchMappings::configMapping, client, clusterState, putJobListener); + }, + actionListener::onFailure + ); + ActionListener> checkForLeftOverDocs = ActionListener.wrap( matchedIds -> { if (matchedIds.isEmpty()) { - jobResultsProvider.createJobResultIndex(job, state, putJobListener); + jobResultsProvider.createJobResultIndex(job, state, addDocMappingsListener); } else { // A job has the same Id as one of the group names // error with the first in the list From 91075ecbae1b09d17a576dfc39203b80722a7da6 Mon Sep 17 00:00:00 2001 From: Przemyslaw Witek Date: Fri, 12 Jul 2019 09:38:51 +0200 Subject: [PATCH 2/5] Do not fetch cluster state prematurely. Log warning if it was impossible to obtain cluster state before updating mappings --- .../TransportPutDataFrameAnalyticsAction.java | 14 +++++++++++--- .../ml/action/TransportPutDatafeedAction.java | 10 +++++++++- .../org/elasticsearch/xpack/ml/job/JobManager.java | 6 +++++- 3 files changed, 25 insertions(+), 5 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDataFrameAnalyticsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDataFrameAnalyticsAction.java index 3c6dd8f8345ec..8910763f71c19 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDataFrameAnalyticsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDataFrameAnalyticsAction.java @@ -5,6 +5,8 @@ */ package org.elasticsearch.xpack.ml.action; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.index.IndexResponse; @@ -54,6 +56,8 @@ public class TransportPutDataFrameAnalyticsAction extends HandledTransportAction { + private static final Logger logger = LogManager.getLogger(TransportPutDataFrameAnalyticsAction.class); + private final XPackLicenseState licenseState; private final DataFrameAnalyticsConfigProvider configProvider; private final ThreadPool threadPool; @@ -84,7 +88,6 @@ public TransportPutDataFrameAnalyticsAction(Settings settings, TransportService maxModelMemoryLimit = MachineLearningField.MAX_MODEL_MEMORY_LIMIT.get(settings); clusterService.getClusterSettings() .addSettingsUpdateConsumer(MachineLearningField.MAX_MODEL_MEMORY_LIMIT, this::setMaxModelMemoryLimit); - clusterState = clusterService.state(); clusterService.addListener(event -> clusterState = event.state()); } @@ -166,8 +169,13 @@ private void handlePrivsResponse(String username, DataFrameAnalyticsConfig memor } private void updateDocMappingAndPutConfig(DataFrameAnalyticsConfig config, - Map headers, - ActionListener listener) { + Map headers, + ActionListener listener) { + if (clusterState == null) { + logger.warn("Cannot update doc mapping because clusterState == null"); + configProvider.put(config, headers, listener); + return; + } ElasticsearchMappings.addDocMappingIfMissing( AnomalyDetectorsIndex.configIndexName(), ElasticsearchMappings::configMapping, diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDatafeedAction.java index 5d6ea7d7daaea..53aaa39b35d3b 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDatafeedAction.java @@ -5,6 +5,8 @@ */ package org.elasticsearch.xpack.ml.action; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.search.SearchAction; @@ -60,6 +62,8 @@ public class TransportPutDatafeedAction extends TransportMasterNodeAction { + private static final Logger logger = LogManager.getLogger(TransportPutDatafeedAction.class); + private final XPackLicenseState licenseState; private final Client client; private final SecurityContext securityContext; @@ -84,7 +88,6 @@ public TransportPutDatafeedAction(Settings settings, TransportService transportS this.datafeedConfigProvider = new DatafeedConfigProvider(client, xContentRegistry); this.jobConfigProvider = new JobConfigProvider(client, xContentRegistry); this.xContentRegistry = xContentRegistry; - clusterState = clusterService.state(); clusterService.addListener(event -> clusterState = event.state()); } @@ -198,6 +201,11 @@ private void putDatafeed(PutDatafeedAction.Request request, Map }; CheckedConsumer validationOk = ok -> { + if (clusterState == null) { + logger.warn("Cannot update doc mapping because clusterState == null"); + mappingsUpdated.accept(false); + return; + } ElasticsearchMappings.addDocMappingIfMissing( AnomalyDetectorsIndex.configIndexName(), ElasticsearchMappings::configMapping, diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java index a5e79ffbb32d8..5b2ebb212f8e8 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java @@ -121,7 +121,6 @@ public JobManager(Environment environment, Settings settings, JobResultsProvider maxModelMemoryLimit = MachineLearningField.MAX_MODEL_MEMORY_LIMIT.get(settings); clusterService.getClusterSettings() .addSettingsUpdateConsumer(MachineLearningField.MAX_MODEL_MEMORY_LIMIT, this::setMaxModelMemoryLimit); - clusterState = clusterService.state(); clusterService.addListener(event -> clusterState = event.state()); } @@ -290,6 +289,11 @@ public void onFailure(Exception e) { ActionListener addDocMappingsListener = ActionListener.wrap( indicesCreated -> { + if (clusterState == null) { + logger.warn("Cannot update doc mapping because clusterState == null"); + putJobListener.onResponse(false); + return; + } ElasticsearchMappings.addDocMappingIfMissing( AnomalyDetectorsIndex.configIndexName(), ElasticsearchMappings::configMapping, client, clusterState, putJobListener); }, From d627a25c6e4760e0e2a57803960dd291abba9f15 Mon Sep 17 00:00:00 2001 From: Przemyslaw Witek Date: Fri, 12 Jul 2019 11:29:16 +0200 Subject: [PATCH 3/5] Reuse exception message's parsing logic from putJobListener in addDocMappingsListener This fixes testCreateJob_WithClashingFieldMappingsFail tests --- .../main/java/org/elasticsearch/xpack/ml/job/JobManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java index 5b2ebb212f8e8..7b1690eb588c8 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java @@ -297,7 +297,7 @@ public void onFailure(Exception e) { ElasticsearchMappings.addDocMappingIfMissing( AnomalyDetectorsIndex.configIndexName(), ElasticsearchMappings::configMapping, client, clusterState, putJobListener); }, - actionListener::onFailure + putJobListener::onFailure ); ActionListener> checkForLeftOverDocs = ActionListener.wrap( From 11177770d37531e8a859eade2013ca0ab873a6bc Mon Sep 17 00:00:00 2001 From: Przemyslaw Witek Date: Fri, 12 Jul 2019 11:40:07 +0200 Subject: [PATCH 4/5] Use cluster state directly when it is provided by masterOperation --- .../ml/action/TransportPutDatafeedAction.java | 17 +++++++++-------- .../elasticsearch/xpack/ml/job/JobManager.java | 6 ++---- 2 files changed, 11 insertions(+), 12 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDatafeedAction.java index 53aaa39b35d3b..f6d400649beb9 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDatafeedAction.java @@ -71,8 +71,6 @@ public class TransportPutDatafeedAction extends TransportMasterNodeAction clusterState = event.state()); } @Override @@ -120,7 +117,7 @@ protected void masterOperation(Task task, PutDatafeedAction.Request request, Clu .indices(indices); ActionListener privResponseListener = ActionListener.wrap( - r -> handlePrivsResponse(username, request, r, listener), + r -> handlePrivsResponse(username, request, r, state, listener), listener::onFailure); ActionListener getRollupIndexCapsActionHandler = ActionListener.wrap( @@ -154,15 +151,17 @@ protected void masterOperation(Task task, PutDatafeedAction.Request request, Clu } } else { - putDatafeed(request, threadPool.getThreadContext().getHeaders(), listener); + putDatafeed(request, threadPool.getThreadContext().getHeaders(), state, listener); } } - private void handlePrivsResponse(String username, PutDatafeedAction.Request request, + private void handlePrivsResponse(String username, + PutDatafeedAction.Request request, HasPrivilegesResponse response, + ClusterState clusterState, ActionListener listener) throws IOException { if (response.isCompleteMatch()) { - putDatafeed(request, threadPool.getThreadContext().getHeaders(), listener); + putDatafeed(request, threadPool.getThreadContext().getHeaders(), clusterState, listener); } else { XContentBuilder builder = JsonXContent.contentBuilder(); builder.startObject(); @@ -178,7 +177,9 @@ private void handlePrivsResponse(String username, PutDatafeedAction.Request requ } } - private void putDatafeed(PutDatafeedAction.Request request, Map headers, + private void putDatafeed(PutDatafeedAction.Request request, + Map headers, + ClusterState clusterState, ActionListener listener) { String datafeedId = request.getDatafeed().getId(); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java index 7b1690eb588c8..683fbb7c65c17 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java @@ -99,7 +99,6 @@ public class JobManager { private final MlConfigMigrationEligibilityCheck migrationEligibilityCheck; private volatile ByteSizeValue maxModelMemoryLimit; - private volatile ClusterState clusterState; /** * Create a JobManager @@ -121,7 +120,6 @@ public JobManager(Environment environment, Settings settings, JobResultsProvider maxModelMemoryLimit = MachineLearningField.MAX_MODEL_MEMORY_LIMIT.get(settings); clusterService.getClusterSettings() .addSettingsUpdateConsumer(MachineLearningField.MAX_MODEL_MEMORY_LIMIT, this::setMaxModelMemoryLimit); - clusterService.addListener(event -> clusterState = event.state()); } private void setMaxModelMemoryLimit(ByteSizeValue maxModelMemoryLimit) { @@ -289,13 +287,13 @@ public void onFailure(Exception e) { ActionListener addDocMappingsListener = ActionListener.wrap( indicesCreated -> { - if (clusterState == null) { + if (state == null) { logger.warn("Cannot update doc mapping because clusterState == null"); putJobListener.onResponse(false); return; } ElasticsearchMappings.addDocMappingIfMissing( - AnomalyDetectorsIndex.configIndexName(), ElasticsearchMappings::configMapping, client, clusterState, putJobListener); + AnomalyDetectorsIndex.configIndexName(), ElasticsearchMappings::configMapping, client, state, putJobListener); }, putJobListener::onFailure ); From f19c6aea329f2b75f48148eb86c9e9c479f0b4d7 Mon Sep 17 00:00:00 2001 From: Przemyslaw Witek Date: Fri, 12 Jul 2019 12:28:49 +0200 Subject: [PATCH 5/5] Fetch cluster state right before updating mappings --- .../xpack/ml/action/TransportPutDataFrameAnalyticsAction.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDataFrameAnalyticsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDataFrameAnalyticsAction.java index 8910763f71c19..0eda67644ca6c 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDataFrameAnalyticsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDataFrameAnalyticsAction.java @@ -67,7 +67,6 @@ public class TransportPutDataFrameAnalyticsAction private final IndexNameExpressionResolver indexNameExpressionResolver; private volatile ByteSizeValue maxModelMemoryLimit; - private volatile ClusterState clusterState; @Inject public TransportPutDataFrameAnalyticsAction(Settings settings, TransportService transportService, ActionFilters actionFilters, @@ -88,7 +87,6 @@ public TransportPutDataFrameAnalyticsAction(Settings settings, TransportService maxModelMemoryLimit = MachineLearningField.MAX_MODEL_MEMORY_LIMIT.get(settings); clusterService.getClusterSettings() .addSettingsUpdateConsumer(MachineLearningField.MAX_MODEL_MEMORY_LIMIT, this::setMaxModelMemoryLimit); - clusterService.addListener(event -> clusterState = event.state()); } private void setMaxModelMemoryLimit(ByteSizeValue maxModelMemoryLimit) { @@ -171,6 +169,7 @@ private void handlePrivsResponse(String username, DataFrameAnalyticsConfig memor private void updateDocMappingAndPutConfig(DataFrameAnalyticsConfig config, Map headers, ActionListener listener) { + ClusterState clusterState = clusterService.state(); if (clusterState == null) { logger.warn("Cannot update doc mapping because clusterState == null"); configProvider.put(config, headers, listener);