From bbec950015f87661b583f520ebba5d2b2211eefa Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Fri, 1 Feb 2019 14:00:40 -0600 Subject: [PATCH] ML: Fix error race condition on stop _all datafeeds and close _all jobs (#38113) (#38211) * ML: Ignore when task is not found for _all * Addressing PR comments * Update TransportStopDatafeedAction.java --- .../xpack/ml/action/TransportCloseJobAction.java | 14 ++++++++++++-- .../ml/action/TransportStopDatafeedAction.java | 14 ++++++++++++-- 2 files changed, 24 insertions(+), 4 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java index 63b8f90a114d0..4f9fb89509cb7 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.ml.action; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListenerResponseHandler; import org.elasticsearch.action.FailedNodeException; @@ -17,6 +18,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; @@ -291,7 +293,12 @@ protected void taskOperation(CloseJobAction.Request request, TransportOpenJobAct threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(new AbstractRunnable() { @Override public void onFailure(Exception e) { - listener.onFailure(e); + if (e instanceof ResourceNotFoundException && Strings.isAllOrWildcard(new String[]{request.getJobId()})) { + jobTask.closeJob("close job (api)"); + listener.onResponse(new CloseJobAction.Response(true)); + } else { + listener.onFailure(e); + } } @Override @@ -356,7 +363,10 @@ public void onResponse(PersistentTasksCustomMetaData.PersistentTask task) { @Override public void onFailure(Exception e) { final int slot = counter.incrementAndGet(); - failures.set(slot - 1, e); + if ((e instanceof ResourceNotFoundException && + Strings.isAllOrWildcard(new String[]{request.getJobId()})) == false) { + failures.set(slot - 1, e); + } if (slot == numberOfJobs) { sendResponseOrFailure(request.getJobId(), listener, failures); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java index 77910f21f67d1..5d9d900aaa8dd 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java @@ -17,6 +17,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; @@ -193,7 +194,10 @@ public void onResponse(PersistentTasksCustomMetaData.PersistentTask persisten @Override public void onFailure(Exception e) { final int slot = counter.incrementAndGet(); - failures.set(slot - 1, e); + if ((e instanceof ResourceNotFoundException && + Strings.isAllOrWildcard(new String[]{request.getDatafeedId()})) == false) { + failures.set(slot - 1, e); + } if (slot == startedDatafeeds.size()) { sendResponseOrFailure(request.getDatafeedId(), listener, failures); } @@ -221,7 +225,13 @@ protected void taskOperation(StopDatafeedAction.Request request, TransportStartD threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(new AbstractRunnable() { @Override public void onFailure(Exception e) { - listener.onFailure(e); + if ((e instanceof ResourceNotFoundException && + Strings.isAllOrWildcard(new String[]{request.getDatafeedId()}))) { + datafeedTask.stop("stop_datafeed (api)", request.getStopTimeout()); + listener.onResponse(new StopDatafeedAction.Response(true)); + } else { + listener.onFailure(e); + } } @Override