From cfafe3c6d8e8447b372aef315ff6a7c63318d33f Mon Sep 17 00:00:00 2001 From: David Kyle Date: Tue, 18 Sep 2018 12:48:55 +0100 Subject: [PATCH] [ML] Change Datafeed actions to read config from the config index (#33273) --- .../xpack/core/ml/MlMetadata.java | 3 +- .../core/ml/datafeed/DatafeedUpdate.java | 2 +- .../xpack/core/ml/job/messages/Messages.java | 1 + .../xpack/core/ml/utils/ExceptionsHelper.java | 4 + .../action/TransportDeleteDatafeedAction.java | 59 ++++--- .../action/TransportGetDatafeedsAction.java | 61 ++++++- .../ml/action/TransportPutDatafeedAction.java | 93 +++++++---- .../action/TransportUpdateDatafeedAction.java | 95 +++++++---- .../persistence/DatafeedConfigProvider.java | 151 ++++++++++++++---- .../ml/job/persistence/JobConfigProvider.java | 21 +++ .../integration/DatafeedConfigProviderIT.java | 139 +++++++++++++--- .../ml/integration/JobConfigProviderIT.java | 38 +++++ .../xpack/ml/job/JobManagerTests.java | 59 +++---- .../ml/job/persistence/MockClientBuilder.java | 39 +++++ 14 files changed, 578 insertions(+), 187 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java index 9e5bceb0abee2..78adf66819e94 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java @@ -5,7 +5,6 @@ */ package org.elasticsearch.xpack.core.ml; -import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.Version; import org.elasticsearch.cluster.AbstractDiffable; @@ -295,7 +294,7 @@ public Builder deleteJob(String jobId, PersistentTasksCustomMetaData tasks) { public Builder putDatafeed(DatafeedConfig datafeedConfig, Map headers) { if (datafeeds.containsKey(datafeedConfig.getId())) { - throw new ResourceAlreadyExistsException("A datafeed with id [" + datafeedConfig.getId() + "] already exists"); + throw ExceptionsHelper.datafeedAlreadyExists(datafeedConfig.getId()); } String jobId = datafeedConfig.getJobId(); checkJobIsAvailableForDatafeed(jobId); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdate.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdate.java index f3748cefc51bc..b2dc0c0f165c5 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdate.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdate.java @@ -204,7 +204,7 @@ private void addOptionalField(XContentBuilder builder, ParseField field, Object } } - String getJobId() { + public String getJobId() { return jobId; } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java index 3c571c9d60509..d56b77d7a3bfa 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java @@ -41,6 +41,7 @@ public final class Messages { public static final String DATAFEED_MISSING_MAX_AGGREGATION_FOR_TIME_FIELD = "Missing max aggregation for time_field [{0}]"; public static final String DATAFEED_FREQUENCY_MUST_BE_MULTIPLE_OF_AGGREGATIONS_INTERVAL = "Datafeed frequency [{0}] must 
be a multiple of the aggregation interval [{1}]"; + public static final String DATAFEED_ID_ALREADY_TAKEN = "A datafeed with id [{0}] already exists"; public static final String FILTER_NOT_FOUND = "No filter with id [{0}] exists"; public static final String FILTER_CONTAINS_TOO_MANY_ITEMS = "Filter [{0}] contains too many items; up to [{1}] items are allowed"; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/ExceptionsHelper.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/ExceptionsHelper.java index d5b83d25ce315..83bbe79a7b470 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/ExceptionsHelper.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/ExceptionsHelper.java @@ -30,6 +30,10 @@ public static ResourceNotFoundException missingDatafeedException(String datafeed return new ResourceNotFoundException(Messages.getMessage(Messages.DATAFEED_NOT_FOUND, datafeedId)); } + public static ResourceAlreadyExistsException datafeedAlreadyExists(String datafeedId) { + return new ResourceAlreadyExistsException(Messages.getMessage(Messages.DATAFEED_ID_ALREADY_TAKEN, datafeedId)); + } + public static ElasticsearchException serverError(String msg) { return new ElasticsearchException(msg); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteDatafeedAction.java index 37210ce3c6ca2..f441e42acc517 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteDatafeedAction.java @@ -11,42 +11,48 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.client.Client; -import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.persistent.PersistentTasksService; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import org.elasticsearch.xpack.core.XPackPlugin; -import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.DeleteDatafeedAction; import org.elasticsearch.xpack.core.ml.action.IsolateDatafeedAction; +import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; +import org.elasticsearch.xpack.core.ml.job.messages.Messages; +import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; +import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; public class TransportDeleteDatafeedAction extends 
TransportMasterNodeAction { - private Client client; - private PersistentTasksService persistentTasksService; + private final Client client; + private final DatafeedConfigProvider datafeedConfigProvider; + private final ClusterService clusterService; + private final PersistentTasksService persistentTasksService; @Inject public TransportDeleteDatafeedAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, - Client client, PersistentTasksService persistentTasksService) { + Client client, PersistentTasksService persistentTasksService, + NamedXContentRegistry xContentRegistry) { super(settings, DeleteDatafeedAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, DeleteDatafeedAction.Request::new); this.client = client; + this.datafeedConfigProvider = new DatafeedConfigProvider(client, settings, xContentRegistry); this.persistentTasksService = persistentTasksService; + this.clusterService = clusterService; } @Override @@ -65,14 +71,14 @@ protected void masterOperation(DeleteDatafeedAction.Request request, ClusterStat if (request.isForce()) { forceDeleteDatafeed(request, state, listener); } else { - deleteDatafeedFromMetadata(request, listener); + deleteDatafeedConfig(request, listener); } } private void forceDeleteDatafeed(DeleteDatafeedAction.Request request, ClusterState state, ActionListener listener) { ActionListener finalListener = ActionListener.wrap( - response -> deleteDatafeedFromMetadata(request, listener), + response -> deleteDatafeedConfig(request, listener), listener::onFailure ); @@ -111,28 +117,19 @@ public void onFailure(Exception e) { } } - private void deleteDatafeedFromMetadata(DeleteDatafeedAction.Request request, ActionListener listener) { - clusterService.submitStateUpdateTask("delete-datafeed-" + request.getDatafeedId(), - new AckedClusterStateUpdateTask(request, listener) { - - @Override - protected AcknowledgedResponse newResponse(boolean acknowledged) { - return new AcknowledgedResponse(acknowledged); - } - - @Override - public ClusterState execute(ClusterState currentState) { - XPackPlugin.checkReadyForXPackCustomMetadata(currentState); - MlMetadata currentMetadata = MlMetadata.getMlMetadata(currentState); - PersistentTasksCustomMetaData persistentTasks = - currentState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); - MlMetadata newMetadata = new MlMetadata.Builder(currentMetadata) - .removeDatafeed(request.getDatafeedId(), persistentTasks).build(); - return ClusterState.builder(currentState).metaData( - MetaData.builder(currentState.getMetaData()).putCustom(MlMetadata.TYPE, newMetadata).build()) - .build(); - } - }); + private void deleteDatafeedConfig(DeleteDatafeedAction.Request request, ActionListener listener) { + // Check datafeed is stopped + PersistentTasksCustomMetaData tasks = clusterService.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE); + if (MlTasks.getDatafeedTask(request.getDatafeedId(), tasks) != null) { + listener.onFailure(ExceptionsHelper.conflictStatusException( + Messages.getMessage(Messages.DATAFEED_CANNOT_DELETE_IN_CURRENT_STATE, request.getDatafeedId(), DatafeedState.STARTED))); + return; + } + + datafeedConfigProvider.deleteDatafeedConfig(request.getDatafeedId(), ActionListener.wrap( + deleteResponse -> listener.onResponse(new AcknowledgedResponse(true)), + listener::onFailure + )); } @Override diff --git 
a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedsAction.java index 91c098e4b2ad3..5d91df7a22a18 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedsAction.java @@ -8,6 +8,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.TransportMasterNodeReadAction; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; @@ -15,27 +16,38 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.ml.action.GetDatafeedsAction; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.action.util.QueryPage; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; +import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Set; public class TransportGetDatafeedsAction extends TransportMasterNodeReadAction { + private final DatafeedConfigProvider datafeedConfigProvider; + @Inject public TransportGetDatafeedsAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, ActionFilters actionFilters, - IndexNameExpressionResolver indexNameExpressionResolver) { + IndexNameExpressionResolver indexNameExpressionResolver, + Client client, NamedXContentRegistry xContentRegistry) { super(settings, GetDatafeedsAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, GetDatafeedsAction.Request::new); + + datafeedConfigProvider = new DatafeedConfigProvider(client, settings, xContentRegistry); } @Override @@ -50,18 +62,51 @@ protected GetDatafeedsAction.Response newResponse() { @Override protected void masterOperation(GetDatafeedsAction.Request request, ClusterState state, - ActionListener listener) throws Exception { + ActionListener listener) { logger.debug("Get datafeed '{}'", request.getDatafeedId()); - MlMetadata mlMetadata = MlMetadata.getMlMetadata(state); - Set expandedDatafeedIds = mlMetadata.expandDatafeedIds(request.getDatafeedId(), request.allowNoDatafeeds()); - List datafeedConfigs = new ArrayList<>(); + Map clusterStateConfigs = + expandClusterStateDatafeeds(request.getDatafeedId(), request.allowNoDatafeeds(), state); + + datafeedConfigProvider.expandDatafeedConfigs(request.getDatafeedId(), request.allowNoDatafeeds(), ActionListener.wrap( + datafeedBuilders -> { + // Check for duplicate datafeeds + for (DatafeedConfig.Builder datafeed : datafeedBuilders) { + if (clusterStateConfigs.containsKey(datafeed.getId())) { + listener.onFailure(new IllegalStateException("Datafeed [" + datafeed.getId() + "] configuration " + + "exists in both clusterstate 
and index")); + return; + } + } + + // Merge cluster state and index configs + List datafeeds = new ArrayList<>(datafeedBuilders.size() + clusterStateConfigs.values().size()); + for (DatafeedConfig.Builder builder: datafeedBuilders) { + datafeeds.add(builder.build()); + } + + datafeeds.addAll(clusterStateConfigs.values()); + Collections.sort(datafeeds, Comparator.comparing(DatafeedConfig::getId)); + listener.onResponse(new GetDatafeedsAction.Response(new QueryPage<>(datafeeds, datafeeds.size(), + DatafeedConfig.RESULTS_FIELD))); + }, + listener::onFailure + )); + } + + Map expandClusterStateDatafeeds(String datafeedExpression, boolean allowNoDatafeeds, + ClusterState clusterState) { + + Map configById = new HashMap<>(); + + MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState); + Set expandedDatafeedIds = mlMetadata.expandDatafeedIds(datafeedExpression, allowNoDatafeeds); + for (String expandedDatafeedId : expandedDatafeedIds) { - datafeedConfigs.add(mlMetadata.getDatafeed(expandedDatafeedId)); + configById.put(expandedDatafeedId, mlMetadata.getDatafeed(expandedDatafeedId)); } - listener.onResponse(new GetDatafeedsAction.Response(new QueryPage<>(datafeedConfigs, datafeedConfigs.size(), - DatafeedConfig.RESULTS_FIELD))); + return configById; } @Override diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDatafeedAction.java index 60b8235ec84b7..dbde3d61d42b0 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDatafeedAction.java @@ -5,21 +5,23 @@ */ package org.elasticsearch.xpack.ml.action; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.client.Client; -import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.CheckedConsumer; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.license.LicenseUtils; @@ -28,16 +30,18 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.XPackField; -import org.elasticsearch.xpack.core.XPackPlugin; import org.elasticsearch.xpack.core.XPackSettings; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.action.PutDatafeedAction; +import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.core.security.SecurityContext; import 
org.elasticsearch.xpack.core.security.action.user.HasPrivilegesAction; import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesRequest; import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesResponse; import org.elasticsearch.xpack.core.security.authz.RoleDescriptor; import org.elasticsearch.xpack.core.security.support.Exceptions; +import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; +import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; import java.io.IOException; import java.util.Map; @@ -46,20 +50,26 @@ public class TransportPutDatafeedAction extends TransportMasterNodeAction headers, ActionListener listener) { - clusterService.submitStateUpdateTask( - "put-datafeed-" + request.getDatafeed().getId(), - new AckedClusterStateUpdateTask(request, listener) { + String datafeedId = request.getDatafeed().getId(); + String jobId = request.getDatafeed().getJobId(); + ElasticsearchException validationError = checkConfigsAreNotDefinedInClusterState(datafeedId, jobId); + if (validationError != null) { + listener.onFailure(validationError); + return; + } - @Override - protected PutDatafeedAction.Response newResponse(boolean acknowledged) { - if (acknowledged) { - logger.info("Created datafeed [{}]", request.getDatafeed().getId()); - } - return new PutDatafeedAction.Response(request.getDatafeed()); - } + CheckedConsumer validationOk = ok -> { + datafeedConfigProvider.putDatafeedConfig(request.getDatafeed(), headers, ActionListener.wrap( + indexResponse -> listener.onResponse(new PutDatafeedAction.Response(request.getDatafeed())), + listener::onFailure + )); + }; - @Override - public ClusterState execute(ClusterState currentState) { - return putDatafeed(request, headers, currentState); - } - }); + CheckedConsumer jobOk = ok -> + jobConfigProvider.validateDatafeedJob(request.getDatafeed(), ActionListener.wrap(validationOk, listener::onFailure)); + + checkJobDoesNotHaveADatafeed(jobId, ActionListener.wrap(jobOk, listener::onFailure)); + } + + /** + * Returns an exception if a datafeed with the same Id is defined in the + * cluster state or the job is in the cluster state and already has a datafeed + */ + @Nullable + private ElasticsearchException checkConfigsAreNotDefinedInClusterState(String datafeedId, String jobId) { + ClusterState clusterState = clusterService.state(); + MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState); + + if (mlMetadata.getDatafeed(datafeedId) != null) { + return ExceptionsHelper.datafeedAlreadyExists(datafeedId); + } + + if (mlMetadata.getDatafeedByJobId(jobId).isPresent()) { + return ExceptionsHelper.conflictStatusException("Cannot create datafeed [" + datafeedId + "] as a " + + "job [" + jobId + "] defined in the cluster state references a datafeed with the same Id"); + } + + return null; } - private ClusterState putDatafeed(PutDatafeedAction.Request request, Map headers, ClusterState clusterState) { - XPackPlugin.checkReadyForXPackCustomMetadata(clusterState); - MlMetadata currentMetadata = MlMetadata.getMlMetadata(clusterState); - MlMetadata newMetadata = new MlMetadata.Builder(currentMetadata) - .putDatafeed(request.getDatafeed(), headers).build(); - return ClusterState.builder(clusterState).metaData( - MetaData.builder(clusterState.getMetaData()).putCustom(MlMetadata.TYPE, newMetadata).build()) - .build(); + private void checkJobDoesNotHaveADatafeed(String jobId, ActionListener listener) { + datafeedConfigProvider.findDatafeedForJobId(jobId, ActionListener.wrap( + datafeedIds -> { + if 
(datafeedIds.isEmpty()) { + listener.onResponse(Boolean.TRUE); + } else { + listener.onFailure(ExceptionsHelper.conflictStatusException("A datafeed [" + datafeedIds.iterator().next() + + "] already exists for job [" + jobId + "]")); + } + }, + listener::onFailure + )); } @Override diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateDatafeedAction.java index 8cf917c4405ea..6b17721b20d68 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateDatafeedAction.java @@ -8,34 +8,45 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.TransportMasterNodeAction; -import org.elasticsearch.cluster.AckedClusterStateUpdateTask; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.CheckedConsumer; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import org.elasticsearch.xpack.core.ml.MlMetadata; +import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.PutDatafeedAction; import org.elasticsearch.xpack.core.ml.action.UpdateDatafeedAction; -import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; -import org.elasticsearch.xpack.core.ml.datafeed.DatafeedUpdate; +import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; +import org.elasticsearch.xpack.core.ml.job.messages.Messages; +import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; +import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; +import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; import java.util.Map; public class TransportUpdateDatafeedAction extends TransportMasterNodeAction { + private final DatafeedConfigProvider datafeedConfigProvider; + private final JobConfigProvider jobConfigProvider; + @Inject public TransportUpdateDatafeedAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, ActionFilters actionFilters, - IndexNameExpressionResolver indexNameExpressionResolver) { + IndexNameExpressionResolver indexNameExpressionResolver, + Client client, NamedXContentRegistry xContentRegistry) { super(settings, UpdateDatafeedAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, UpdateDatafeedAction.Request::new); + + datafeedConfigProvider = new DatafeedConfigProvider(client, settings, xContentRegistry); + jobConfigProvider = new JobConfigProvider(client, settings); } @Override @@ -50,34 +61,60 @@ protected PutDatafeedAction.Response newResponse() { @Override protected void masterOperation(UpdateDatafeedAction.Request request, 
ClusterState state,
- ActionListener listener) {
+ ActionListener listener) throws Exception {
final Map headers = threadPool.getThreadContext().getHeaders();
- clusterService.submitStateUpdateTask("update-datafeed-" + request.getUpdate().getId(),
- new AckedClusterStateUpdateTask(request, listener) {
- private volatile DatafeedConfig updatedDatafeed;
+ // Check datafeed is stopped
+ PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
+ if (MlTasks.getDatafeedTask(request.getUpdate().getId(), tasks) != null) {
+ listener.onFailure(ExceptionsHelper.conflictStatusException(
+ Messages.getMessage(Messages.DATAFEED_CANNOT_UPDATE_IN_CURRENT_STATE,
+ request.getUpdate().getId(), DatafeedState.STARTED)));
+ return;
+ }
- @Override
- protected PutDatafeedAction.Response newResponse(boolean acknowledged) {
- if (acknowledged) {
- logger.info("Updated datafeed [{}]", request.getUpdate().getId());
- }
- return new PutDatafeedAction.Response(updatedDatafeed);
- }
+ String datafeedId = request.getUpdate().getId();
+
+ CheckedConsumer updateConsumer = ok -> {
+ datafeedConfigProvider.updateDatefeedConfig(request.getUpdate().getId(), request.getUpdate(), headers,
+ jobConfigProvider::validateDatafeedJob,
+ ActionListener.wrap(
+ updatedConfig -> listener.onResponse(new PutDatafeedAction.Response(updatedConfig)),
+ listener::onFailure
+ ));
+ };
+
+ if (request.getUpdate().getJobId() != null) {
+ checkJobDoesNotHaveADifferentDatafeed(request.getUpdate().getJobId(), datafeedId,
+ ActionListener.wrap(updateConsumer, listener::onFailure));
+ } else {
+ updateConsumer.accept(Boolean.TRUE);
+ }
+ }
- @Override
- public ClusterState execute(ClusterState currentState) {
- DatafeedUpdate update = request.getUpdate();
- MlMetadata currentMetadata = MlMetadata.getMlMetadata(currentState);
- PersistentTasksCustomMetaData persistentTasks =
- currentState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
- MlMetadata newMetadata = new MlMetadata.Builder(currentMetadata)
- .updateDatafeed(update, persistentTasks, headers).build();
- updatedDatafeed = newMetadata.getDatafeed(update.getId());
- return ClusterState.builder(currentState).metaData(
- MetaData.builder(currentState.getMetaData()).putCustom(MlMetadata.TYPE, newMetadata).build()).build();
+ /*
+ * This is a check against changing the datafeed's jobId to a job that
+ * already has a datafeed.
+ * The job the updated datafeed refers to should either have no datafeed or,
+ * if it does have one, that datafeed must be the one being updated
+ */
+ private void checkJobDoesNotHaveADifferentDatafeed(String jobId, String datafeedId, ActionListener listener) {
+ datafeedConfigProvider.findDatafeedForJobId(jobId, ActionListener.wrap(
+ datafeedIds -> {
+ if (datafeedIds.isEmpty()) {
+ // Ok the job does not have a datafeed
+ listener.onResponse(Boolean.TRUE);
+ } else if (datafeedIds.size() == 1 && datafeedIds.contains(datafeedId)) {
+ // Ok the job has the datafeed being updated
+ listener.onResponse(Boolean.TRUE);
+ } else {
+ listener.onFailure(ExceptionsHelper.conflictStatusException("A datafeed [" + datafeedIds.iterator().next() +
+ "] already exists for job [" + jobId + "]"));
}
- });
+ },
+ listener::onFailure
+ ));
}
@Override
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java
index 9702f1096ecf4..f0402ed869224 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java
@@ -34,6 +34,7 @@
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.TermQueryBuilder;
@@ -41,8 +42,10 @@
import org.elasticsearch.index.query.WildcardQueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedUpdate;
+import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
@@ -57,6 +60,8 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
@@ -86,17 +91,40 @@ public DatafeedConfigProvider(Client client, Settings settings, NamedXContentReg
* @param config The datafeed configuration
* @param listener Index response listener
*/
- public void putDatafeedConfig(DatafeedConfig config, ActionListener listener) {
+ public void putDatafeedConfig(DatafeedConfig config, Map headers, ActionListener listener) {
+
+ if (headers.isEmpty() == false) {
+ // Filter any values in headers that aren't security fields
+ DatafeedConfig.Builder builder = new DatafeedConfig.Builder(config);
+ Map securityHeaders = headers.entrySet().stream()
+ .filter(e -> ClientHelper.SECURITY_HEADER_FILTERS.contains(e.getKey()))
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+ builder.setHeaders(securityHeaders);
+ config = builder.build();
+ }
+
+ final String datafeedId = config.getId();
+ 
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
XContentBuilder source = config.toXContent(builder, new ToXContent.MapParams(TO_XCONTENT_PARAMS));
IndexRequest indexRequest = client.prepareIndex(AnomalyDetectorsIndex.configIndexName(),
- ElasticsearchMappings.DOC_TYPE, DatafeedConfig.documentId(config.getId()))
+ ElasticsearchMappings.DOC_TYPE, DatafeedConfig.documentId(datafeedId))
.setSource(source)
.setOpType(DocWriteRequest.OpType.CREATE)
.request();
- executeAsyncWithOrigin(client, ML_ORIGIN, IndexAction.INSTANCE, indexRequest, listener);
+ executeAsyncWithOrigin(client, ML_ORIGIN, IndexAction.INSTANCE, indexRequest, ActionListener.wrap(
+ listener::onResponse,
+ e -> {
+ if (e instanceof VersionConflictEngineException) {
+ // the datafeed already exists
+ listener.onFailure(ExceptionsHelper.datafeedAlreadyExists(datafeedId));
+ } else {
+ listener.onFailure(e);
+ }
+ }
+ ));
} catch (IOException e) {
listener.onFailure(new ElasticsearchParseException("Failed to serialise datafeed config with id [" + config.getId() + "]", e));
@@ -131,6 +159,43 @@ public void onFailure(Exception e) {
});
}
+ /**
+ * Find any datafeeds that are used by job {@code jobId}, i.e. the
+ * datafeed that references job {@code jobId}.
+ *
+ * In theory there should never be more than one datafeed referencing a
+ * particular job.
+ *
+ * @param jobId The job to find
+ * @param listener Datafeed Id listener
+ */
+ public void findDatafeedForJobId(String jobId, ActionListener> listener) {
+ SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildDatafeedJobIdQuery(jobId));
+ sourceBuilder.fetchSource(false);
+ sourceBuilder.docValueField(DatafeedConfig.ID.getPreferredName());
+
+ SearchRequest searchRequest = client.prepareSearch(AnomalyDetectorsIndex.configIndexName())
+ .setIndicesOptions(IndicesOptions.lenientExpandOpen())
+ .setSource(sourceBuilder).request();
+
+ executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, searchRequest,
+ ActionListener.wrap(
+ response -> {
+ Set datafeedIds = new HashSet<>();
+ SearchHit[] hits = response.getHits().getHits();
+ // There should be 0 or 1 datafeeds referencing the same job
+ assert hits.length <= 1;
+
+ for (SearchHit hit : hits) {
+ datafeedIds.add(hit.field(DatafeedConfig.ID.getPreferredName()).getValue());
+ }
+
+ listener.onResponse(datafeedIds);
+ },
+ listener::onFailure)
+ , client::search);
+ }
+
/**
* Delete the datafeed config document
*
@@ -161,12 +226,19 @@ public void onFailure(Exception e) {
* Get the datafeed config and apply the {@code update}
* then index the modified config, setting the version in the request.
*
+ * The {@code validator} consumer can be used to perform extra validation
+ * but it must call the passed ActionListener. For example, a no-op validator
+ * would be {@code (updatedConfig, listener) -> listener.onResponse(Boolean.TRUE)}
+ *
* @param datafeedId The Id of the datafeed to update
* @param update The update
* @param headers Datafeed headers applied with the update
+ * @param validator BiConsumer that accepts the updated config and can perform
+ * extra validations. {@code validator} must call the passed listener
* @param updatedConfigListener Updated datafeed config listener
*/
public void updateDatefeedConfig(String datafeedId, DatafeedUpdate update, Map headers,
+ BiConsumer> validator,
ActionListener updatedConfigListener) {
GetRequest getRequest = new GetRequest(AnomalyDetectorsIndex.configIndexName(),
ElasticsearchMappings.DOC_TYPE, DatafeedConfig.documentId(datafeedId));
@@ -197,26 +269,19 @@ public void onResponse(GetResponse getResponse) {
return;
}
- try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
- XContentBuilder updatedSource = updatedConfig.toXContent(builder, ToXContent.EMPTY_PARAMS);
- IndexRequest indexRequest = client.prepareIndex(AnomalyDetectorsIndex.configIndexName(),
- ElasticsearchMappings.DOC_TYPE, DatafeedConfig.documentId(updatedConfig.getId()))
- .setSource(updatedSource)
- .setVersion(version)
- .request();
-
- executeAsyncWithOrigin(client, ML_ORIGIN, IndexAction.INSTANCE, indexRequest, ActionListener.wrap(
- indexResponse -> {
- assert indexResponse.getResult() == DocWriteResponse.Result.UPDATED;
- updatedConfigListener.onResponse(updatedConfig);
- },
- updatedConfigListener::onFailure
- ));
+ ActionListener validatedListener = ActionListener.wrap(
+ ok -> {
+ indexUpdatedConfig(updatedConfig, version, ActionListener.wrap(
+ indexResponse -> {
+ assert indexResponse.getResult() == DocWriteResponse.Result.UPDATED;
+ updatedConfigListener.onResponse(updatedConfig);
+ },
+ updatedConfigListener::onFailure));
+ },
+ updatedConfigListener::onFailure
+ );
- } catch (IOException e) {
- updatedConfigListener.onFailure(
- new ElasticsearchParseException("Failed to serialise datafeed config with id [" + datafeedId + "]", e));
- }
+ validator.accept(updatedConfig, validatedListener);
}
@Override
@@ -226,6 +291,23 @@ public void onFailure(Exception e) {
});
}
+ private void indexUpdatedConfig(DatafeedConfig updatedConfig, long version, ActionListener listener) {
+ try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
+ XContentBuilder updatedSource = updatedConfig.toXContent(builder, new ToXContent.MapParams(TO_XCONTENT_PARAMS));
+ IndexRequest indexRequest = client.prepareIndex(AnomalyDetectorsIndex.configIndexName(),
+ ElasticsearchMappings.DOC_TYPE, DatafeedConfig.documentId(updatedConfig.getId()))
+ .setSource(updatedSource)
+ .setVersion(version)
+ .request();
+
+ executeAsyncWithOrigin(client, ML_ORIGIN, IndexAction.INSTANCE, indexRequest, listener);
+
+ } catch (IOException e) {
+ listener.onFailure(
+ new ElasticsearchParseException("Failed to serialise datafeed config with id [" + updatedConfig.getId() + "]", e));
+ }
+ }
+
/**
* Expands an expression into the set of matching names. {@code expression}
* may be a wildcard, a datafeed ID or a list of those. 
@@ -252,10 +334,10 @@ public void onFailure(Exception e) { */ public void expandDatafeedIds(String expression, boolean allowNoDatafeeds, ActionListener> listener) { String [] tokens = ExpandedIdsMatcher.tokenizeExpression(expression); - SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildQuery(tokens)); + SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildDatafeedIdQuery(tokens)); sourceBuilder.sort(DatafeedConfig.ID.getPreferredName()); - String [] includes = new String[] {DatafeedConfig.ID.getPreferredName()}; - sourceBuilder.fetchSource(includes, null); + sourceBuilder.fetchSource(false); + sourceBuilder.docValueField(DatafeedConfig.ID.getPreferredName()); SearchRequest searchRequest = client.prepareSearch(AnomalyDetectorsIndex.configIndexName()) .setIndicesOptions(IndicesOptions.lenientExpandOpen()) @@ -269,7 +351,7 @@ public void expandDatafeedIds(String expression, boolean allowNoDatafeeds, Actio Set datafeedIds = new HashSet<>(); SearchHit[] hits = response.getHits().getHits(); for (SearchHit hit : hits) { - datafeedIds.add((String)hit.getSourceAsMap().get(DatafeedConfig.ID.getPreferredName())); + datafeedIds.add(hit.field(DatafeedConfig.ID.getPreferredName()).getValue()); } requiredMatches.filterMatchedIds(datafeedIds); @@ -301,7 +383,7 @@ public void expandDatafeedIds(String expression, boolean allowNoDatafeeds, Actio // NORELEASE datafeed configs should be paged or have a mechanism to return all jobs if there are many of them public void expandDatafeedConfigs(String expression, boolean allowNoDatafeeds, ActionListener> listener) { String [] tokens = ExpandedIdsMatcher.tokenizeExpression(expression); - SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildQuery(tokens)); + SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildDatafeedIdQuery(tokens)); sourceBuilder.sort(DatafeedConfig.ID.getPreferredName()); SearchRequest searchRequest = client.prepareSearch(AnomalyDetectorsIndex.configIndexName()) @@ -342,15 +424,15 @@ public void expandDatafeedConfigs(String expression, boolean allowNoDatafeeds, A } - private QueryBuilder buildQuery(String [] tokens) { - QueryBuilder jobQuery = new TermQueryBuilder(DatafeedConfig.CONFIG_TYPE.getPreferredName(), DatafeedConfig.TYPE); + private QueryBuilder buildDatafeedIdQuery(String [] tokens) { + QueryBuilder datafeedQuery = new TermQueryBuilder(DatafeedConfig.CONFIG_TYPE.getPreferredName(), DatafeedConfig.TYPE); if (Strings.isAllOrWildcard(tokens)) { // match all - return jobQuery; + return datafeedQuery; } BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder(); - boolQueryBuilder.filter(jobQuery); + boolQueryBuilder.filter(datafeedQuery); BoolQueryBuilder shouldQueries = new BoolQueryBuilder(); List terms = new ArrayList<>(); @@ -373,6 +455,13 @@ private QueryBuilder buildQuery(String [] tokens) { return boolQueryBuilder; } + private QueryBuilder buildDatafeedJobIdQuery(String jobId) { + BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder(); + boolQueryBuilder.filter(new TermQueryBuilder(DatafeedConfig.CONFIG_TYPE.getPreferredName(), DatafeedConfig.TYPE)); + boolQueryBuilder.filter(new TermQueryBuilder(Job.ID.getPreferredName(), jobId)); + return boolQueryBuilder; + } + private void parseLenientlyFromSource(BytesReference source, ActionListener datafeedConfigListener) { try (InputStream stream = source.streamInput(); XContentParser parser = XContentFactory.xContent(XContentType.JSON) diff --git 
a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java
index ae13d0371a3a5..ef593417a19e3 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java
@@ -46,6 +46,8 @@
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
+import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
+import org.elasticsearch.xpack.core.ml.datafeed.DatafeedJobValidator;
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.core.ml.job.config.Detector;
import org.elasticsearch.xpack.core.ml.job.config.Job;
@@ -545,6 +547,25 @@ public void findJobsWithCustomRules(ActionListener> listener) {
, client::search);
}
+ /**
+ * Get the job referenced by the datafeed and validate the datafeed config against it
+ * @param config Datafeed config
+ * @param listener Validation listener
+ */
+ public void validateDatafeedJob(DatafeedConfig config, ActionListener listener) {
+ getJob(config.getJobId(), ActionListener.wrap(
+ jobBuilder -> {
+ try {
+ DatafeedJobValidator.validate(config, jobBuilder.build());
+ listener.onResponse(Boolean.TRUE);
+ } catch (Exception e) {
+ listener.onFailure(e);
+ }
+ },
+ listener::onFailure
+ ));
+ }
+
private void parseJobLenientlyFromSource(BytesReference source, ActionListener jobListener) {
try (InputStream stream = source.streamInput();
XContentParser parser = XContentFactory.xContent(XContentType.JSON)
diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedConfigProviderIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedConfigProviderIT.java
index 8eeeb2908cf88..3f70baf4771c3 100644
--- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedConfigProviderIT.java
+++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedConfigProviderIT.java
@@ -5,12 +5,13 @@
*/
package org.elasticsearch.xpack.ml.integration;
+import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.ResourceNotFoundException;
+import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
@@ -29,15 +30,18 @@
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
import java.util.stream.Collectors;
+import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
public class DatafeedConfigProviderIT extends MlSingleNodeTestCase {
-
private 
DatafeedConfigProvider datafeedConfigProvider; @Before @@ -53,8 +57,8 @@ public void testCrud() throws InterruptedException { AtomicReference exceptionHolder = new AtomicReference<>(); // Create datafeed config - DatafeedConfig config = createDatafeedConfig(datafeedId, "j1"); - blockingCall(actionListener -> datafeedConfigProvider.putDatafeedConfig(config, actionListener), + DatafeedConfig.Builder config = createDatafeedConfig(datafeedId, "j1"); + blockingCall(actionListener -> datafeedConfigProvider.putDatafeedConfig(config.build(), createSecurityHeader(), actionListener), indexResponseHolder, exceptionHolder); assertNull(exceptionHolder.get()); assertEquals(RestStatus.CREATED, indexResponseHolder.get().status()); @@ -64,7 +68,11 @@ public void testCrud() throws InterruptedException { blockingCall(actionListener -> datafeedConfigProvider.getDatafeedConfig(datafeedId, actionListener), configBuilderHolder, exceptionHolder); assertNull(exceptionHolder.get()); - assertEquals(config, configBuilderHolder.get().build()); + + // Headers are set by the putDatafeedConfig method so they + // must be added to the original config before equality testing + config.setHeaders(createSecurityHeader()); + assertEquals(config.build(), configBuilderHolder.get().build()); // Update DatafeedUpdate.Builder update = new DatafeedUpdate.Builder(datafeedId); @@ -77,12 +85,21 @@ public void testCrud() throws InterruptedException { AtomicReference configHolder = new AtomicReference<>(); blockingCall(actionListener -> - datafeedConfigProvider.updateDatefeedConfig(datafeedId, update.build(), updateHeaders, actionListener), + datafeedConfigProvider.updateDatefeedConfig(datafeedId, update.build(), updateHeaders, + (updatedConfig, listener) -> listener.onResponse(Boolean.TRUE), actionListener), configHolder, exceptionHolder); assertNull(exceptionHolder.get()); assertThat(configHolder.get().getIndices(), equalTo(updateIndices)); assertThat(configHolder.get().getHeaders().get(securityHeader), equalTo("CHANGED")); + // Read the updated config + configBuilderHolder.set(null); + blockingCall(actionListener -> datafeedConfigProvider.getDatafeedConfig(datafeedId, actionListener), + configBuilderHolder, exceptionHolder); + assertNull(exceptionHolder.get()); + assertThat(configBuilderHolder.get().build().getIndices(), equalTo(updateIndices)); + assertThat(configBuilderHolder.get().build().getHeaders().get(securityHeader), equalTo("CHANGED")); + // Delete AtomicReference deleteResponseHolder = new AtomicReference<>(); blockingCall(actionListener -> datafeedConfigProvider.deleteDatafeedConfig(datafeedId, actionListener), @@ -98,18 +115,19 @@ public void testMultipleCreateAndDeletes() throws InterruptedException { AtomicReference exceptionHolder = new AtomicReference<>(); // Create datafeed config - DatafeedConfig config = createDatafeedConfig(datafeedId, "j1"); - blockingCall(actionListener -> datafeedConfigProvider.putDatafeedConfig(config, actionListener), + DatafeedConfig.Builder config = createDatafeedConfig(datafeedId, "j1"); + blockingCall(actionListener -> datafeedConfigProvider.putDatafeedConfig(config.build(), Collections.emptyMap(), actionListener), indexResponseHolder, exceptionHolder); assertNull(exceptionHolder.get()); assertEquals(RestStatus.CREATED, indexResponseHolder.get().status()); // cannot create another with the same id indexResponseHolder.set(null); - blockingCall(actionListener -> datafeedConfigProvider.putDatafeedConfig(config, actionListener), + blockingCall(actionListener -> 
datafeedConfigProvider.putDatafeedConfig(config.build(), Collections.emptyMap(), actionListener),
indexResponseHolder, exceptionHolder);
assertNull(indexResponseHolder.get());
- assertThat(exceptionHolder.get(), instanceOf(VersionConflictEngineException.class));
+ assertThat(exceptionHolder.get(), instanceOf(ResourceAlreadyExistsException.class));
+ assertEquals("A datafeed with id [df2] already exists", exceptionHolder.get().getMessage());
// delete
exceptionHolder.set(null);
@@ -127,11 +145,11 @@ public void testMultipleCreateAndDeletes() throws InterruptedException {
assertThat(exceptionHolder.get(), instanceOf(ResourceNotFoundException.class));
}
- public void testUpdateWithAValidationError() throws Exception {
+ public void testUpdateWhenApplyingTheUpdateThrows() throws Exception {
final String datafeedId = "df-bad-update";
- DatafeedConfig config = createDatafeedConfig(datafeedId, "j2");
- putDatafeedConfig(config);
+ DatafeedConfig.Builder config = createDatafeedConfig(datafeedId, "j2");
+ putDatafeedConfig(config, Collections.emptyMap());
DatafeedUpdate.Builder update = new DatafeedUpdate.Builder(datafeedId);
update.setId("wrong-datafeed-id");
AtomicReference exceptionHolder = new AtomicReference<>();
AtomicReference configHolder = new AtomicReference<>();
blockingCall(actionListener ->
- datafeedConfigProvider.updateDatefeedConfig(datafeedId, update.build(), Collections.emptyMap(), actionListener),
+ datafeedConfigProvider.updateDatefeedConfig(datafeedId, update.build(), Collections.emptyMap(),
+ (updatedConfig, listener) -> listener.onResponse(Boolean.TRUE), actionListener),
configHolder, exceptionHolder);
assertNull(configHolder.get());
assertNotNull(exceptionHolder.get());
@@ -147,6 +166,33 @@
assertThat(exceptionHolder.get().getMessage(), containsString("Cannot apply update to datafeedConfig with different id"));
}
+ public void testUpdateWithValidatorFunctionThatErrors() throws Exception {
+ final String datafeedId = "df-validated-update";
+
+ DatafeedConfig.Builder config = createDatafeedConfig(datafeedId, "hob-job");
+ putDatafeedConfig(config, Collections.emptyMap());
+
+ DatafeedUpdate.Builder update = new DatafeedUpdate.Builder(datafeedId);
+ List updateIndices = Collections.singletonList("a-different-index");
+ update.setIndices(updateIndices);
+
+ BiConsumer> validateErrorFunction = (updatedConfig, listener) -> {
+ new Thread(() -> listener.onFailure(new IllegalArgumentException("this is a bad update")), getTestName()).start();
+ };
+
+ AtomicReference configHolder = new AtomicReference<>();
+ AtomicReference exceptionHolder = new AtomicReference<>();
+ blockingCall(actionListener ->
+ datafeedConfigProvider.updateDatefeedConfig(datafeedId, update.build(), Collections.emptyMap(),
+ validateErrorFunction, actionListener),
+ configHolder, exceptionHolder);
+
+ assertNull(configHolder.get());
+ assertThat(exceptionHolder.get(), instanceOf(IllegalArgumentException.class));
+ assertThat(exceptionHolder.get().getMessage(), containsString("this is a bad update"));
+ }
+
public void testAllowNoDatafeeds() throws InterruptedException {
AtomicReference> datafeedIdsHolder = new AtomicReference<>();
AtomicReference exceptionHolder = new AtomicReference<>();
@@ -182,11 +228,11 @@
}
public void testExpandDatafeeds() throws Exception {
- DatafeedConfig foo1 = 
putDatafeedConfig(createDatafeedConfig("foo-1", "j1")); - DatafeedConfig foo2 = putDatafeedConfig(createDatafeedConfig("foo-2", "j2")); - DatafeedConfig bar1 = putDatafeedConfig(createDatafeedConfig("bar-1", "j3")); - DatafeedConfig bar2 = putDatafeedConfig(createDatafeedConfig("bar-2", "j4")); - putDatafeedConfig(createDatafeedConfig("not-used", "j5")); + DatafeedConfig foo1 = putDatafeedConfig(createDatafeedConfig("foo-1", "j1"), Collections.emptyMap()); + DatafeedConfig foo2 = putDatafeedConfig(createDatafeedConfig("foo-2", "j2"), Collections.emptyMap()); + DatafeedConfig bar1 = putDatafeedConfig(createDatafeedConfig("bar-1", "j3"), Collections.emptyMap()); + DatafeedConfig bar2 = putDatafeedConfig(createDatafeedConfig("bar-2", "j4"), Collections.emptyMap()); + putDatafeedConfig(createDatafeedConfig("not-used", "j5"), Collections.emptyMap()); client().admin().indices().prepareRefresh(AnomalyDetectorsIndex.configIndexName()).get(); @@ -234,20 +280,65 @@ public void testExpandDatafeeds() throws Exception { assertThat(expandedDatafeeds, containsInAnyOrder(bar1, foo1, foo2)); } - private DatafeedConfig createDatafeedConfig(String id, String jobId) { + public void testFindDatafeedForJobId() throws Exception { + putDatafeedConfig(createDatafeedConfig("foo-1", "j1"), Collections.emptyMap()); + putDatafeedConfig(createDatafeedConfig("foo-2", "j2"), Collections.emptyMap()); + putDatafeedConfig(createDatafeedConfig("bar-1", "j3"), Collections.emptyMap()); + + client().admin().indices().prepareRefresh(AnomalyDetectorsIndex.configIndexName()).get(); + + AtomicReference> datafeedIdsHolder = new AtomicReference<>(); + AtomicReference exceptionHolder = new AtomicReference<>(); + + blockingCall(actionListener -> datafeedConfigProvider.findDatafeedForJobId("new-job", actionListener), + datafeedIdsHolder, exceptionHolder); + assertThat(datafeedIdsHolder.get(), empty()); + + blockingCall(actionListener -> datafeedConfigProvider.findDatafeedForJobId("j2", actionListener), + datafeedIdsHolder, exceptionHolder); + assertThat(datafeedIdsHolder.get(), contains("foo-2")); + + blockingCall(actionListener -> datafeedConfigProvider.findDatafeedForJobId("j3", actionListener), + datafeedIdsHolder, exceptionHolder); + assertThat(datafeedIdsHolder.get(), contains("bar-1")); + } + + public void testHeadersAreOverwritten() throws Exception { + String dfId = "df-with-headers"; + DatafeedConfig.Builder configWithUnrelatedHeaders = createDatafeedConfig(dfId, "j1"); + Map headers = new HashMap<>(); + headers.put("UNRELATED-FIELD", "WILL-BE-FILTERED"); + configWithUnrelatedHeaders.setHeaders(headers); + + putDatafeedConfig(configWithUnrelatedHeaders, createSecurityHeader()); + + AtomicReference exceptionHolder = new AtomicReference<>(); + AtomicReference configBuilderHolder = new AtomicReference<>(); + blockingCall(actionListener -> datafeedConfigProvider.getDatafeedConfig(dfId, actionListener), + configBuilderHolder, exceptionHolder); + assertNull(exceptionHolder.get()); + assertThat(configBuilderHolder.get().build().getHeaders().entrySet(), hasSize(1)); + assertEquals(configBuilderHolder.get().build().getHeaders(), createSecurityHeader()); + } + + private DatafeedConfig.Builder createDatafeedConfig(String id, String jobId) { DatafeedConfig.Builder builder = new DatafeedConfig.Builder(id, jobId); builder.setIndices(Collections.singletonList("beats*")); + return builder; + } + private Map createSecurityHeader() { Map headers = new HashMap<>(); // Only security headers are updated, grab the first one String securityHeader = 
ClientHelper.SECURITY_HEADER_FILTERS.iterator().next(); headers.put(securityHeader, "SECURITY_"); - builder.setHeaders(headers); - return builder.build(); + return headers; } - private DatafeedConfig putDatafeedConfig(DatafeedConfig config) throws Exception { - this.blockingCall(actionListener -> datafeedConfigProvider.putDatafeedConfig(config, actionListener)); + private DatafeedConfig putDatafeedConfig(DatafeedConfig.Builder builder, Map headers) throws Exception { + builder.setHeaders(headers); + DatafeedConfig config = builder.build(); + this.blockingCall(actionListener -> datafeedConfigProvider.putDatafeedConfig(config, headers, actionListener)); return config; } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java index 712206e37556a..3456290745a05 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java @@ -13,6 +13,11 @@ import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.search.aggregations.AggregationBuilders; +import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregationBuilder; +import org.elasticsearch.search.aggregations.metrics.max.MaxAggregationBuilder; +import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.core.ml.job.config.DataDescription; import org.elasticsearch.xpack.core.ml.job.config.DetectionRule; @@ -22,6 +27,7 @@ import org.elasticsearch.xpack.core.ml.job.config.Operator; import org.elasticsearch.xpack.core.ml.job.config.RuleCondition; import org.elasticsearch.xpack.core.ml.job.config.RuleScope; +import org.elasticsearch.xpack.core.ml.job.messages.Messages; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.ml.MlSingleNodeTestCase; import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; @@ -419,6 +425,38 @@ public void testFindJobsWithCustomRules() throws Exception { assertThat(foundJobIds, containsInAnyOrder(jobWithRules1.getId(), jobWithRules2.getId())); } + public void testValidateDatafeedJob() throws Exception { + String jobId = "validate-df-job"; + putJob(createJob(jobId, Collections.emptyList())); + + AtomicReference responseHolder = new AtomicReference<>(); + AtomicReference exceptionHolder = new AtomicReference<>(); + + DatafeedConfig.Builder builder = new DatafeedConfig.Builder("df1", jobId); + builder.setIndices(Collections.singletonList("data-index")); + DatafeedConfig config = builder.build(); + + blockingCall(listener -> jobConfigProvider.validateDatafeedJob(config, listener), responseHolder, exceptionHolder); + assertTrue(responseHolder.get()); + assertNull(exceptionHolder.get()); + + builder = new DatafeedConfig.Builder("df1", jobId); + builder.setIndices(Collections.singletonList("data-index")); + + // This config is not valid because it uses aggs but the job's + // summary count field is not set + MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time"); + HistogramAggregationBuilder histogram = + 
                AggregationBuilders.histogram("time").interval(1800.0).field("time").subAggregation(maxTime);
+        builder.setAggregations(new AggregatorFactories.Builder().addAggregator(histogram));
+        DatafeedConfig badConfig = builder.build();
+
+        blockingCall(listener -> jobConfigProvider.validateDatafeedJob(badConfig, listener), responseHolder, exceptionHolder);
+        assertNotNull(exceptionHolder.get());
+        assertThat(exceptionHolder.get(), instanceOf(ElasticsearchStatusException.class));
+        assertEquals(Messages.DATAFEED_AGGREGATIONS_REQUIRES_JOB_WITH_SUMMARY_COUNT_FIELD, exceptionHolder.get().getMessage());
+    }
+
     private static Job.Builder createJob(String jobId, List<String> groups) {
         Detector.Builder d1 = new Detector.Builder("info_content", "domain");
         d1.setOverFieldName("client");
diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java
index 0a87060a18dfc..73caeebf19b2f 100644
--- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java
+++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java
@@ -16,6 +16,7 @@
 import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.document.DocumentField;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.xcontent.ToXContent;
@@ -57,6 +58,7 @@
 import java.util.Collections;
 import java.util.Date;
 import java.util.List;
+import java.util.Map;
 import java.util.TreeSet;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicReference;
@@ -441,20 +443,10 @@ public void testNotifyFilterChangedGivenOnlyRemovedItems() throws IOException {
     }
 
     public void testUpdateProcessOnCalendarChanged() throws IOException {
-        List<BytesReference> docsAsBytes = new ArrayList<>();
-        Job.Builder job1 = buildJobBuilder("job-1");
-        docsAsBytes.add(toBytesReference(job1.build()));
-        Job.Builder job2 = buildJobBuilder("job-2");
-//        docsAsBytes.add(toBytesReference(job2.build()));
-        Job.Builder job3 = buildJobBuilder("job-3");
-        docsAsBytes.add(toBytesReference(job3.build()));
-        Job.Builder job4 = buildJobBuilder("job-4");
-        docsAsBytes.add(toBytesReference(job4.build()));
-
         PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
-        addJobTask(job1.getId(), "node_id", JobState.OPENED, tasksBuilder);
-        addJobTask(job2.getId(), "node_id", JobState.OPENED, tasksBuilder);
-        addJobTask(job3.getId(), "node_id", JobState.OPENED, tasksBuilder);
+        addJobTask("job-1", "node_id", JobState.OPENED, tasksBuilder);
+        addJobTask("job-2", "node_id", JobState.OPENED, tasksBuilder);
+        addJobTask("job-3", "node_id", JobState.OPENED, tasksBuilder);
 
         ClusterState clusterState = ClusterState.builder(new ClusterName("_name"))
                 .metaData(MetaData.builder()
@@ -463,7 +455,10 @@ public void testUpdateProcessOnCalendarChanged() throws IOException {
         when(clusterService.state()).thenReturn(clusterState);
 
         MockClientBuilder mockClientBuilder = new MockClientBuilder("cluster-test");
-        mockClientBuilder.prepareSearch(AnomalyDetectorsIndex.configIndexName(), docsAsBytes);
+        // For the JobConfigProvider expand groups search.
+        // The search will not return any results
+        mockClientBuilder.prepareSearchFields(AnomalyDetectorsIndex.configIndexName(), Collections.emptyList());
+
         JobManager jobManager = createJobManager(mockClientBuilder.build());
 
         jobManager.updateProcessOnCalendarChanged(Arrays.asList("job-1", "job-3", "job-4"),
@@ -477,28 +472,17 @@ public void testUpdateProcessOnCalendarChanged() throws IOException {
 
         List<UpdateParams> capturedUpdateParams = updateParamsCaptor.getAllValues();
         assertThat(capturedUpdateParams.size(), equalTo(2));
-        assertThat(capturedUpdateParams.get(0).getJobId(), equalTo(job1.getId()));
+        assertThat(capturedUpdateParams.get(0).getJobId(), equalTo("job-1"));
         assertThat(capturedUpdateParams.get(0).isUpdateScheduledEvents(), is(true));
-        assertThat(capturedUpdateParams.get(1).getJobId(), equalTo(job3.getId()));
+        assertThat(capturedUpdateParams.get(1).getJobId(), equalTo("job-3"));
         assertThat(capturedUpdateParams.get(1).isUpdateScheduledEvents(), is(true));
     }
 
     public void testUpdateProcessOnCalendarChanged_GivenGroups() throws IOException {
-        Job.Builder job1 = buildJobBuilder("job-1");
-        job1.setGroups(Collections.singletonList("group-1"));
-        Job.Builder job2 = buildJobBuilder("job-2");
-        job2.setGroups(Collections.singletonList("group-1"));
-        Job.Builder job3 = buildJobBuilder("job-3");
-
-        List<BytesReference> docsAsBytes = new ArrayList<>();
-        docsAsBytes.add(toBytesReference(job1.build()));
-        docsAsBytes.add(toBytesReference(job2.build()));
-//        docsAsBytes.add(toBytesReference(job3.build()));
-
         PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
-        addJobTask(job1.getId(), "node_id", JobState.OPENED, tasksBuilder);
-        addJobTask(job2.getId(), "node_id", JobState.OPENED, tasksBuilder);
-        addJobTask(job3.getId(), "node_id", JobState.OPENED, tasksBuilder);
+        addJobTask("job-1", "node_id", JobState.OPENED, tasksBuilder);
+        addJobTask("job-2", "node_id", JobState.OPENED, tasksBuilder);
+        addJobTask("job-3", "node_id", JobState.OPENED, tasksBuilder);
 
         ClusterState clusterState = ClusterState.builder(new ClusterName("_name"))
                 .metaData(MetaData.builder()
@@ -507,7 +491,16 @@ public void testUpdateProcessOnCalendarChanged_GivenGroups() throws IOException
         when(clusterService.state()).thenReturn(clusterState);
 
         MockClientBuilder mockClientBuilder = new MockClientBuilder("cluster-test");
-        mockClientBuilder.prepareSearch(AnomalyDetectorsIndex.configIndexName(), docsAsBytes);
+        // For the JobConfigProvider expand groups search.
+        // group-1 will expand to job-1 and job-2
+        List<Map<String, DocumentField>> fieldHits = new ArrayList<>();
+        fieldHits.add(Collections.singletonMap(Job.ID.getPreferredName(),
+                new DocumentField(Job.ID.getPreferredName(), Collections.singletonList("job-1"))));
+        fieldHits.add(Collections.singletonMap(Job.ID.getPreferredName(),
+                new DocumentField(Job.ID.getPreferredName(), Collections.singletonList("job-2"))));
+
+        mockClientBuilder.prepareSearchFields(AnomalyDetectorsIndex.configIndexName(), fieldHits);
 
         JobManager jobManager = createJobManager(mockClientBuilder.build());
 
         jobManager.updateProcessOnCalendarChanged(Collections.singletonList("group-1"),
@@ -521,9 +514,9 @@ public void testUpdateProcessOnCalendarChanged_GivenGroups() throws IOException
 
         List<UpdateParams> capturedUpdateParams = updateParamsCaptor.getAllValues();
         assertThat(capturedUpdateParams.size(), equalTo(2));
-        assertThat(capturedUpdateParams.get(0).getJobId(), equalTo(job1.getId()));
+        assertThat(capturedUpdateParams.get(0).getJobId(), equalTo("job-1"));
         assertThat(capturedUpdateParams.get(0).isUpdateScheduledEvents(), is(true));
-        assertThat(capturedUpdateParams.get(1).getJobId(), equalTo(job2.getId()));
+        assertThat(capturedUpdateParams.get(1).getJobId(), equalTo("job-2"));
         assertThat(capturedUpdateParams.get(1).isUpdateScheduledEvents(), is(true));
     }
 
diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/MockClientBuilder.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/MockClientBuilder.java
index a5f3d5ff5179c..726b815728f52 100644
--- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/MockClientBuilder.java
+++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/MockClientBuilder.java
@@ -41,6 +41,7 @@
 import org.elasticsearch.client.ClusterAdminClient;
 import org.elasticsearch.client.IndicesAdminClient;
 import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.document.DocumentField;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
@@ -57,6 +58,7 @@
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutionException;
 
 import static org.junit.Assert.assertArrayEquals;
@@ -308,6 +310,43 @@ public Void answer(InvocationOnMock invocationOnMock) {
         return this;
     }
 
+    /*
+     * Mock a search that returns search hits with fields.
+     * The number of hits is the size of the fields list.
+     */
+    @SuppressWarnings("unchecked")
+    public MockClientBuilder prepareSearchFields(String indexName, List<Map<String, DocumentField>> fields) {
+        SearchRequestBuilder builder = mock(SearchRequestBuilder.class);
+        when(builder.setIndicesOptions(any())).thenReturn(builder);
+        when(builder.setQuery(any())).thenReturn(builder);
+        when(builder.setSource(any())).thenReturn(builder);
+        SearchRequest request = new SearchRequest(indexName);
+        when(builder.request()).thenReturn(request);
+
+        when(client.prepareSearch(eq(indexName))).thenReturn(builder);
+
+        SearchHit[] hits = new SearchHit[fields.size()];
+        for (int i = 0; i < fields.size(); i++) {
+            hits[i] = new SearchHit(i, null, null, fields.get(i));
+        }
+
+        SearchResponse response = mock(SearchResponse.class);
+        SearchHits searchHits = new SearchHits(hits, hits.length, 1.0f);
+        when(response.getHits()).thenReturn(searchHits);
+
+        doAnswer(new Answer<Void>() {
+            @Override
+            public Void answer(InvocationOnMock invocationOnMock) {
+                ActionListener<SearchResponse> listener = (ActionListener<SearchResponse>) invocationOnMock.getArguments()[1];
+                listener.onResponse(response);
+                return null;
+            }
+        }).when(client).search(eq(request), any());
+
+        return this;
+    }
+
     public MockClientBuilder prepareSearchAnySize(String index, String type, SearchResponse response, ArgumentCaptor<QueryBuilder> filter) {
         SearchRequestBuilder builder = mock(SearchRequestBuilder.class);
         when(builder.setTypes(eq(type))).thenReturn(builder);
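For reference, a minimal sketch of how a test can drive the new prepareSearchFields helper, assuming the same imports and scaffolding as JobManagerTests above; the group name and job IDs are illustrative, not part of the patch:

    // Fake the fields-only search that JobConfigProvider runs against the
    // config index, so that expanding "group-1" yields job-1 and job-2.
    List<Map<String, DocumentField>> fieldHits = new ArrayList<>();
    fieldHits.add(Collections.singletonMap(Job.ID.getPreferredName(),
            new DocumentField(Job.ID.getPreferredName(), Collections.singletonList("job-1"))));
    fieldHits.add(Collections.singletonMap(Job.ID.getPreferredName(),
            new DocumentField(Job.ID.getPreferredName(), Collections.singletonList("job-2"))));

    MockClientBuilder clientBuilder = new MockClientBuilder("cluster-test");
    clientBuilder.prepareSearchFields(AnomalyDetectorsIndex.configIndexName(), fieldHits);
    Client client = clientBuilder.build();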