From 1b83d72ddee5dcd923c2fc29b4418a2fc159ac03 Mon Sep 17 00:00:00 2001 From: Benjamin Trent <4357155+benwtrent@users.noreply.github.com> Date: Thu, 11 Jul 2019 10:53:05 -0500 Subject: [PATCH 1/3] [ML][Data Frame] responding with appropriate status code when failing _stop --- .../integration/DataFrameRestTestCase.java | 3 +- .../DataFrameTaskFailedStateIT.java | 43 ++++++------------- ...TransportStopDataFrameTransformAction.java | 30 ++++++++++++- 3 files changed, 45 insertions(+), 31 deletions(-) diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java index c8d7bf28842e3..b3bf161dc5a1c 100644 --- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java @@ -161,7 +161,8 @@ protected void createContinuousPivotReviewsTransform(String transformId, String String config = "{ \"dest\": {\"index\":\"" + dataFrameIndex + "\"}," + " \"source\": {\"index\":\"" + REVIEWS_INDEX_NAME + "\"}," - + " \"sync\": {\"time\":{\"field\": \"timestamp\", \"delay\": \"15m\"}}," + //Set frequency high for testing + + " \"sync\": {\"time\":{\"field\": \"timestamp\", \"delay\": \"15m\", \"frequency\": \"1s\"}}," + " \"pivot\": {" + " \"group_by\": {" + " \"reviewer\": {" diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTaskFailedStateIT.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTaskFailedStateIT.java index 96aeeda8755f4..14919f7100406 100644 --- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTaskFailedStateIT.java +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTaskFailedStateIT.java @@ -11,9 +11,8 @@ import org.elasticsearch.rest.RestStatus; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState; -import java.io.IOException; -import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.nullValue; @@ -30,29 +29,34 @@ public void testFailureStateInteraction() throws Exception { createReviewsIndex(); String transformId = "failure_pivot_1"; String dataFrameIndex = "failure_pivot_reviews"; - createPivotReviewsTransform(transformId, dataFrameIndex, null); - deleteIndex(REVIEWS_INDEX_NAME); // trigger start failure due to index missing + createContinuousPivotReviewsTransform(transformId, dataFrameIndex, null); startDataframeTransform(transformId, false); + // wait for our initial indexing to complete + waitForDataFrameCheckpoint(transformId); + + // Deleting the only concrete index should make the checkpoint gathering fail + deleteIndex(REVIEWS_INDEX_NAME); // trigger start failure due to index missing + awaitState(transformId, DataFrameTransformTaskState.FAILED); Map fullState = getDataFrameState(transformId); // Verify we have failed for the expected reason assertThat(XContentMapValues.extractValue("state.reason", fullState), equalTo("task encountered irrecoverable failure: no such index [reviews]")); - assertThat(XContentMapValues.extractValue("state.indexer_state", fullState), equalTo("started")); // Verify that we cannot stop or start the transform when the task is in a failed state ResponseException ex = expectThrows(ResponseException.class, () -> stopDataFrameTransform(transformId, false)); assertThat(ex.getResponse().getStatusLine().getStatusCode(), equalTo(RestStatus.CONFLICT.getStatus())); assertThat(XContentMapValues.extractValue("error.reason", entityAsMap(ex.getResponse())), - equalTo("Unable to stop data frame transform [failure_pivot_1] as it is in a failed state with reason: [" + - "task encountered irrecoverable failure: no such index [reviews]]. Use force stop to stop the data frame transform.")); + equalTo("Unable to stop data frame transform [failure_pivot_1] as it is in a failed state with reason [" + + "task encountered irrecoverable failure: no such index [reviews]. " + + "Use force stop to stop the data frame transform.")); ex = expectThrows(ResponseException.class, () -> startDataframeTransform(transformId, false)); assertThat(ex.getResponse().getStatusLine().getStatusCode(), equalTo(RestStatus.CONFLICT.getStatus())); assertThat(XContentMapValues.extractValue("error.reason", entityAsMap(ex.getResponse())), equalTo("Unable to start data frame transform [failure_pivot_1] as it is in a failed state with failure: [" + - "task encountered irrecoverable failure: no such index [reviews]]. " + + "task encountered irrecoverable failure: no such index [reviews]. " + "Use force start to restart data frame transform once error is resolved.")); // Correct the failure by creating the reviews index again @@ -60,23 +64,12 @@ public void testFailureStateInteraction() throws Exception { // Force start the data frame to indicate failure correction startDataframeTransform(transformId, true); // Wait for data to be indexed appropriately and refresh for search - waitForDataFrameCheckpoint(transformId); - refreshIndex(dataFrameIndex); + awaitState(transformId, DataFrameTransformTaskState.STARTED); // Verify that we have started and that our reason is cleared fullState = getDataFrameState(transformId); assertThat(XContentMapValues.extractValue("state.reason", fullState), is(nullValue())); assertThat(XContentMapValues.extractValue("state.task_state", fullState), equalTo("started")); - assertThat(XContentMapValues.extractValue("state.indexer_state", fullState), equalTo("started")); - assertThat(XContentMapValues.extractValue("stats.search_failures", fullState), equalTo(1)); - - // get and check some users to verify we restarted - assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_0", 3.776978417); - assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_5", 3.72); - assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_11", 3.846153846); - assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_20", 3.769230769); - assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_26", 3.918918918); - stopDataFrameTransform(transformId, true); deleteDataFrameTransform(transformId); @@ -86,14 +79,6 @@ private void awaitState(String transformId, DataFrameTransformTaskState state) t assertBusy(() -> { String currentState = getDataFrameTaskState(transformId); assertThat(state.value(), equalTo(currentState)); - }); - } - - private void assertOnePivotValue(String query, double expected) throws IOException { - Map searchResult = getAsMap(query); - - assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult)); - double actual = (Double) ((List) XContentMapValues.extractValue("hits.hits._source.avg_rating", searchResult)).get(0); - assertEquals(expected, actual, 0.000001); + }, 60, TimeUnit.SECONDS); } } diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java index 35a9d19658345..003a9d14794df 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java @@ -21,6 +21,7 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.discovery.MasterNotDiscoveredException; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.persistent.PersistentTasksService; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.Task; @@ -28,11 +29,13 @@ import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.action.util.PageParams; import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction; +import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState; import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex; import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager; import org.elasticsearch.xpack.dataframe.transforms.DataFrameTransformTask; +import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; import java.util.List; @@ -88,8 +91,31 @@ protected void doExecute(Task task, StopDataFrameTransformAction.Request request new PageParams(0, 10_000), request.isAllowNoMatch(), ActionListener.wrap(hitsAndIds -> { + if (request.isForce() == false) { + PersistentTasksCustomMetaData tasks = state.metaData().custom(PersistentTasksCustomMetaData.TYPE); + List failedTasks = new ArrayList<>(); + List failedReasons = new ArrayList<>(); + for (String transformId : hitsAndIds.v2()) { + PersistentTasksCustomMetaData.PersistentTask dfTask = tasks.getTask(transformId); + if (dfTask.getState() instanceof DataFrameTransformState + && ((DataFrameTransformState) dfTask.getState()).getTaskState() == DataFrameTransformTaskState.FAILED) { + failedTasks.add(transformId); + failedReasons.add(((DataFrameTransformState) dfTask.getState()).getReason()); + } + } + if (failedTasks.isEmpty() == false) { + String msg = failedTasks.size() == 1 ? + "Unable to stop data frame transform [" + request.getId() + + "] as it is in a failed state with reason [" + failedReasons.get(0) + + "]. Use force stop to stop the data frame transform." : + "Unable to stop data frame transforms. The following transforms are in a failed state " + + failedTasks + " with reasons " + failedReasons + ". Use force stop to stop the data frame transforms."; + listener.onFailure(new ElasticsearchStatusException(msg, RestStatus.CONFLICT)); + return; + } + } request.setExpandedIds(new HashSet<>(hitsAndIds.v2())); - request.setNodes(DataFrameNodes.dataFrameTaskNodes(hitsAndIds.v2(), clusterService.state())); + request.setNodes(DataFrameNodes.dataFrameTaskNodes(hitsAndIds.v2(), state)); super.doExecute(task, request, finalListener); }, listener::onFailure @@ -108,6 +134,8 @@ protected void taskOperation(StopDataFrameTransformAction.Request request, DataF } if (ids.contains(transformTask.getTransformId())) { + // This should not occur as we validate that none of the tasks are in a failed state earlier + // Keep this check in here for insurance. if (transformTask.getState().getTaskState() == DataFrameTransformTaskState.FAILED && request.isForce() == false) { listener.onFailure( new ElasticsearchStatusException("Unable to stop data frame transform [" + request.getId() From b960fd39ad2513d935a01e7e70f0ea9ddbde761f Mon Sep 17 00:00:00 2001 From: Benjamin Trent <4357155+benwtrent@users.noreply.github.com> Date: Thu, 11 Jul 2019 11:51:33 -0500 Subject: [PATCH 2/3] adding null checks for persistent task data --- .../action/TransportStopDataFrameTransformAction.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java index 003a9d14794df..f9487e4df0d7a 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java @@ -91,13 +91,14 @@ protected void doExecute(Task task, StopDataFrameTransformAction.Request request new PageParams(0, 10_000), request.isAllowNoMatch(), ActionListener.wrap(hitsAndIds -> { - if (request.isForce() == false) { - PersistentTasksCustomMetaData tasks = state.metaData().custom(PersistentTasksCustomMetaData.TYPE); + PersistentTasksCustomMetaData tasks = state.metaData().custom(PersistentTasksCustomMetaData.TYPE); + if (request.isForce() == false && tasks != null) { List failedTasks = new ArrayList<>(); List failedReasons = new ArrayList<>(); for (String transformId : hitsAndIds.v2()) { PersistentTasksCustomMetaData.PersistentTask dfTask = tasks.getTask(transformId); - if (dfTask.getState() instanceof DataFrameTransformState + if (dfTask != null + && dfTask.getState() instanceof DataFrameTransformState && ((DataFrameTransformState) dfTask.getState()).getTaskState() == DataFrameTransformTaskState.FAILED) { failedTasks.add(transformId); failedReasons.add(((DataFrameTransformState) dfTask.getState()).getReason()); From 143e43e66a74368e8c48bd1a25cd2e4a35d91393 Mon Sep 17 00:00:00 2001 From: Benjamin Trent <4357155+benwtrent@users.noreply.github.com> Date: Thu, 11 Jul 2019 15:03:38 -0500 Subject: [PATCH 3/3] addressing PR comments --- .../core/dataframe/DataFrameMessages.java | 3 + .../DataFrameTaskFailedStateIT.java | 43 ++++++--- ...TransportStopDataFrameTransformAction.java | 61 ++++++------ ...portStopDataFrameTransformActionTests.java | 94 +++++++++++++++++++ 4 files changed, 160 insertions(+), 41 deletions(-) create mode 100644 x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformActionTests.java diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameMessages.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameMessages.java index 5d6cf54c44d14..358ce863ad362 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameMessages.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameMessages.java @@ -32,6 +32,9 @@ public class DataFrameMessages { public static final String DATA_FRAME_FAILED_TO_PERSIST_STATS = "Failed to persist data frame statistics for transform [{0}]"; public static final String DATA_FRAME_UNKNOWN_TRANSFORM_STATS = "Statistics for transform [{0}] could not be found"; + public static final String DATA_FRAME_CANNOT_STOP_FAILED_TRANSFORM = + "Unable to stop data frame transform [{0}] as it is in a failed state with reason [{1}]." + + " Use force stop to stop the data frame transform."; public static final String FAILED_TO_CREATE_DESTINATION_INDEX = "Could not create destination index [{0}] for transform [{1}]"; public static final String FAILED_TO_LOAD_TRANSFORM_CONFIGURATION = "Failed to load data frame transform configuration for transform [{0}]"; diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTaskFailedStateIT.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTaskFailedStateIT.java index 14919f7100406..96aeeda8755f4 100644 --- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTaskFailedStateIT.java +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTaskFailedStateIT.java @@ -11,8 +11,9 @@ import org.elasticsearch.rest.RestStatus; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState; +import java.io.IOException; +import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.nullValue; @@ -29,34 +30,29 @@ public void testFailureStateInteraction() throws Exception { createReviewsIndex(); String transformId = "failure_pivot_1"; String dataFrameIndex = "failure_pivot_reviews"; - createContinuousPivotReviewsTransform(transformId, dataFrameIndex, null); - startDataframeTransform(transformId, false); - // wait for our initial indexing to complete - waitForDataFrameCheckpoint(transformId); - - // Deleting the only concrete index should make the checkpoint gathering fail + createPivotReviewsTransform(transformId, dataFrameIndex, null); deleteIndex(REVIEWS_INDEX_NAME); // trigger start failure due to index missing - + startDataframeTransform(transformId, false); awaitState(transformId, DataFrameTransformTaskState.FAILED); Map fullState = getDataFrameState(transformId); // Verify we have failed for the expected reason assertThat(XContentMapValues.extractValue("state.reason", fullState), equalTo("task encountered irrecoverable failure: no such index [reviews]")); + assertThat(XContentMapValues.extractValue("state.indexer_state", fullState), equalTo("started")); // Verify that we cannot stop or start the transform when the task is in a failed state ResponseException ex = expectThrows(ResponseException.class, () -> stopDataFrameTransform(transformId, false)); assertThat(ex.getResponse().getStatusLine().getStatusCode(), equalTo(RestStatus.CONFLICT.getStatus())); assertThat(XContentMapValues.extractValue("error.reason", entityAsMap(ex.getResponse())), - equalTo("Unable to stop data frame transform [failure_pivot_1] as it is in a failed state with reason [" + - "task encountered irrecoverable failure: no such index [reviews]. " + - "Use force stop to stop the data frame transform.")); + equalTo("Unable to stop data frame transform [failure_pivot_1] as it is in a failed state with reason: [" + + "task encountered irrecoverable failure: no such index [reviews]]. Use force stop to stop the data frame transform.")); ex = expectThrows(ResponseException.class, () -> startDataframeTransform(transformId, false)); assertThat(ex.getResponse().getStatusLine().getStatusCode(), equalTo(RestStatus.CONFLICT.getStatus())); assertThat(XContentMapValues.extractValue("error.reason", entityAsMap(ex.getResponse())), equalTo("Unable to start data frame transform [failure_pivot_1] as it is in a failed state with failure: [" + - "task encountered irrecoverable failure: no such index [reviews]. " + + "task encountered irrecoverable failure: no such index [reviews]]. " + "Use force start to restart data frame transform once error is resolved.")); // Correct the failure by creating the reviews index again @@ -64,12 +60,23 @@ public void testFailureStateInteraction() throws Exception { // Force start the data frame to indicate failure correction startDataframeTransform(transformId, true); // Wait for data to be indexed appropriately and refresh for search - awaitState(transformId, DataFrameTransformTaskState.STARTED); + waitForDataFrameCheckpoint(transformId); + refreshIndex(dataFrameIndex); // Verify that we have started and that our reason is cleared fullState = getDataFrameState(transformId); assertThat(XContentMapValues.extractValue("state.reason", fullState), is(nullValue())); assertThat(XContentMapValues.extractValue("state.task_state", fullState), equalTo("started")); + assertThat(XContentMapValues.extractValue("state.indexer_state", fullState), equalTo("started")); + assertThat(XContentMapValues.extractValue("stats.search_failures", fullState), equalTo(1)); + + // get and check some users to verify we restarted + assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_0", 3.776978417); + assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_5", 3.72); + assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_11", 3.846153846); + assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_20", 3.769230769); + assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_26", 3.918918918); + stopDataFrameTransform(transformId, true); deleteDataFrameTransform(transformId); @@ -79,6 +86,14 @@ private void awaitState(String transformId, DataFrameTransformTaskState state) t assertBusy(() -> { String currentState = getDataFrameTaskState(transformId); assertThat(state.value(), equalTo(currentState)); - }, 60, TimeUnit.SECONDS); + }); + } + + private void assertOnePivotValue(String query, double expected) throws IOException { + Map searchResult = getAsMap(query); + + assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult)); + double actual = (Double) ((List) XContentMapValues.extractValue("hits.hits._source.avg_rating", searchResult)).get(0); + assertEquals(expected, actual, 0.000001); } } diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java index f9487e4df0d7a..c5edac860580f 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java @@ -28,6 +28,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.action.util.PageParams; +import org.elasticsearch.xpack.core.dataframe.DataFrameMessages; import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState; @@ -41,6 +42,8 @@ import java.util.List; import java.util.Set; +import static org.elasticsearch.xpack.core.dataframe.DataFrameMessages.DATA_FRAME_CANNOT_STOP_FAILED_TRANSFORM; + public class TransportStopDataFrameTransformAction extends TransportTasksAction { @@ -66,6 +69,32 @@ public TransportStopDataFrameTransformAction(TransportService transportService, this.client = client; } + static void validateTaskState(ClusterState state, List transformIds, boolean isForce) { + PersistentTasksCustomMetaData tasks = state.metaData().custom(PersistentTasksCustomMetaData.TYPE); + if (isForce == false && tasks != null) { + List failedTasks = new ArrayList<>(); + List failedReasons = new ArrayList<>(); + for (String transformId : transformIds) { + PersistentTasksCustomMetaData.PersistentTask dfTask = tasks.getTask(transformId); + if (dfTask != null + && dfTask.getState() instanceof DataFrameTransformState + && ((DataFrameTransformState) dfTask.getState()).getTaskState() == DataFrameTransformTaskState.FAILED) { + failedTasks.add(transformId); + failedReasons.add(((DataFrameTransformState) dfTask.getState()).getReason()); + } + } + if (failedTasks.isEmpty() == false) { + String msg = failedTasks.size() == 1 ? + DataFrameMessages.getMessage(DATA_FRAME_CANNOT_STOP_FAILED_TRANSFORM, + failedTasks.get(0), + failedReasons.get(0)) : + "Unable to stop data frame transforms. The following transforms are in a failed state " + + failedTasks + " with reasons " + failedReasons + ". Use force stop to stop the data frame transforms."; + throw new ElasticsearchStatusException(msg, RestStatus.CONFLICT); + } + } + } + @Override protected void doExecute(Task task, StopDataFrameTransformAction.Request request, ActionListener listener) { @@ -91,30 +120,7 @@ protected void doExecute(Task task, StopDataFrameTransformAction.Request request new PageParams(0, 10_000), request.isAllowNoMatch(), ActionListener.wrap(hitsAndIds -> { - PersistentTasksCustomMetaData tasks = state.metaData().custom(PersistentTasksCustomMetaData.TYPE); - if (request.isForce() == false && tasks != null) { - List failedTasks = new ArrayList<>(); - List failedReasons = new ArrayList<>(); - for (String transformId : hitsAndIds.v2()) { - PersistentTasksCustomMetaData.PersistentTask dfTask = tasks.getTask(transformId); - if (dfTask != null - && dfTask.getState() instanceof DataFrameTransformState - && ((DataFrameTransformState) dfTask.getState()).getTaskState() == DataFrameTransformTaskState.FAILED) { - failedTasks.add(transformId); - failedReasons.add(((DataFrameTransformState) dfTask.getState()).getReason()); - } - } - if (failedTasks.isEmpty() == false) { - String msg = failedTasks.size() == 1 ? - "Unable to stop data frame transform [" + request.getId() - + "] as it is in a failed state with reason [" + failedReasons.get(0) - + "]. Use force stop to stop the data frame transform." : - "Unable to stop data frame transforms. The following transforms are in a failed state " + - failedTasks + " with reasons " + failedReasons + ". Use force stop to stop the data frame transforms."; - listener.onFailure(new ElasticsearchStatusException(msg, RestStatus.CONFLICT)); - return; - } - } + validateTaskState(state, hitsAndIds.v2(), request.isForce()); request.setExpandedIds(new HashSet<>(hitsAndIds.v2())); request.setNodes(DataFrameNodes.dataFrameTaskNodes(hitsAndIds.v2(), state)); super.doExecute(task, request, finalListener); @@ -139,9 +145,10 @@ protected void taskOperation(StopDataFrameTransformAction.Request request, DataF // Keep this check in here for insurance. if (transformTask.getState().getTaskState() == DataFrameTransformTaskState.FAILED && request.isForce() == false) { listener.onFailure( - new ElasticsearchStatusException("Unable to stop data frame transform [" + request.getId() - + "] as it is in a failed state with reason: [" + transformTask.getState().getReason() + - "]. Use force stop to stop the data frame transform.", + new ElasticsearchStatusException( + DataFrameMessages.getMessage(DATA_FRAME_CANNOT_STOP_FAILED_TRANSFORM, + request.getId(), + transformTask.getState().getReason()), RestStatus.CONFLICT)); return; } diff --git a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformActionTests.java b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformActionTests.java new file mode 100644 index 0000000000000..61fad63c83253 --- /dev/null +++ b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformActionTests.java @@ -0,0 +1,94 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.dataframe.action; + +import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.core.dataframe.DataFrameMessages; +import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform; +import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState; +import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState; +import org.elasticsearch.xpack.core.indexing.IndexerState; + +import java.util.Arrays; +import java.util.Collections; + +import static org.elasticsearch.rest.RestStatus.CONFLICT; +import static org.hamcrest.Matchers.equalTo; + +public class TransportStopDataFrameTransformActionTests extends ESTestCase { + + private MetaData.Builder buildMetadata(PersistentTasksCustomMetaData ptasks) { + return MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, ptasks); + } + + public void testTaskStateValidationWithNoTasks() { + MetaData.Builder metaData = MetaData.builder(); + ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name")).metaData(metaData); + TransportStopDataFrameTransformAction.validateTaskState(csBuilder.build(), Collections.singletonList("non-failed-task"), false); + + PersistentTasksCustomMetaData.Builder pTasksBuilder = PersistentTasksCustomMetaData.builder(); + csBuilder = ClusterState.builder(new ClusterName("_name")).metaData(buildMetadata(pTasksBuilder.build())); + TransportStopDataFrameTransformAction.validateTaskState(csBuilder.build(), Collections.singletonList("non-failed-task"), false); + } + + public void testTaskStateValidationWithDataFrameTasks() { + // Test with the task state being null + PersistentTasksCustomMetaData.Builder pTasksBuilder = PersistentTasksCustomMetaData.builder() + .addTask("non-failed-task", + DataFrameTransform.NAME, + new DataFrameTransform("data-frame-task-1", Version.CURRENT, null), + new PersistentTasksCustomMetaData.Assignment("current-data-node-with-1-tasks", "")); + ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name")).metaData(buildMetadata(pTasksBuilder.build())); + + TransportStopDataFrameTransformAction.validateTaskState(csBuilder.build(), Collections.singletonList("non-failed-task"), false); + + // test again with a non failed task but this time it has internal state + pTasksBuilder.updateTaskState("non-failed-task", new DataFrameTransformState(DataFrameTransformTaskState.STOPPED, + IndexerState.STOPPED, + null, + 0L, + null, + null)); + csBuilder = ClusterState.builder(new ClusterName("_name")).metaData(buildMetadata(pTasksBuilder.build())); + + TransportStopDataFrameTransformAction.validateTaskState(csBuilder.build(), Collections.singletonList("non-failed-task"), false); + + pTasksBuilder.addTask("failed-task", + DataFrameTransform.NAME, + new DataFrameTransform("data-frame-task-1", Version.CURRENT, null), + new PersistentTasksCustomMetaData.Assignment("current-data-node-with-1-tasks", "")) + .updateTaskState("failed-task", new DataFrameTransformState(DataFrameTransformTaskState.FAILED, + IndexerState.STOPPED, + null, + 0L, + "task has failed", + null)); + csBuilder = ClusterState.builder(new ClusterName("_name")).metaData(buildMetadata(pTasksBuilder.build())); + + TransportStopDataFrameTransformAction.validateTaskState(csBuilder.build(), Arrays.asList("non-failed-task", "failed-task"), true); + + TransportStopDataFrameTransformAction.validateTaskState(csBuilder.build(), Collections.singletonList("non-failed-task"), false); + + ClusterState.Builder csBuilderFinal = ClusterState.builder(new ClusterName("_name")).metaData(buildMetadata(pTasksBuilder.build())); + ElasticsearchStatusException ex = expectThrows(ElasticsearchStatusException.class, + () -> TransportStopDataFrameTransformAction.validateTaskState(csBuilderFinal.build(), + Collections.singletonList("failed-task"), + false)); + + assertThat(ex.status(), equalTo(CONFLICT)); + assertThat(ex.getMessage(), + equalTo(DataFrameMessages.getMessage(DataFrameMessages.DATA_FRAME_CANNOT_STOP_FAILED_TRANSFORM, + "failed-task", + "task has failed"))); + } + +}