From 33753963184439de58d752366c8d14756a70781a Mon Sep 17 00:00:00 2001 From: Hanguang Zhang Date: Tue, 18 Feb 2020 14:03:43 -0800 Subject: [PATCH 1/9] Add daily cron job to clean negative cache --- .../ad/cluster/DailyCron.java | 32 +++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/DailyCron.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/DailyCron.java index 707daa49..133b75f7 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/DailyCron.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/DailyCron.java @@ -25,6 +25,9 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction; +import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; +import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.Client; import org.elasticsearch.index.IndexNotFoundException; @@ -89,6 +92,35 @@ public void run() { ); deleteUtil.deleteDetectorResult(client); + + // Step 1: get current task + // list task api + // https://www.elastic.co/guide/en/elasticsearch/client/java-rest/master/java-rest-high-tasks-list.html + ListTasksRequest listTasksRequest = new ListTasksRequest(); + listTasksRequest.setDetailed(true); + ListTasksResponse listTasksResponse = new ListTasksResponse(); + clientUtil.timedRequest(listTasksRequest, LOG, client::) + clientUtil + .execute( + ListTasksAction.INSTANCE, + listTasksRequest, + ActionListener + .wrap( + response -> { + listTasksResponse. + LOG.info("List all tasks"); + }, + exception -> { + LOG.error("List Task failed.", exception); + } + ) + ); + // Step 2: go through negative cache to match + + + // Step 3: kill the matched tasks + // cancel task api + // https://www.elastic.co/guide/en/elasticsearch/client/java-rest/master/java-rest-high-cluster-cancel-tasks.html } } From 87badb1b216adba3a6f91573f2d4b471adeae8eb Mon Sep 17 00:00:00 2001 From: Hanguang Zhang Date: Tue, 18 Feb 2020 14:17:37 -0800 Subject: [PATCH 2/9] Adding missing java doc and simplify code. --- .../ad/feature/SearchFeatureDao.java | 3 ++- .../ad/transport/ADStateManager.java | 2 +- .../opendistroforelasticsearch/ad/util/ClientUtil.java | 8 ++------ .../opendistroforelasticsearch/ad/util/Throttler.java | 9 ++++----- 4 files changed, 9 insertions(+), 13 deletions(-) diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/feature/SearchFeatureDao.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/feature/SearchFeatureDao.java index 1018f9e2..4bc5d891 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/feature/SearchFeatureDao.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/feature/SearchFeatureDao.java @@ -114,7 +114,8 @@ public Optional getLatestDataTime(AnomalyDetector detector) { } /** - * Gets features for the given time period. This function also add given detector to negative cache before sending es request. + * Gets features for the given time period. + * This function also adds given detector to negative cache before sending es request. * Once we get response/exception within timeout, we treat this request as complete and clear the negative cache. * Otherwise this detector entry remain in the negative to reject further request. * diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStateManager.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStateManager.java index fcc341f5..eb57123c 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStateManager.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStateManager.java @@ -213,7 +213,7 @@ public void resetBackpressureCounter(String nodeId) { /** * Check if there is running query on given detector * @param detector Anomaly Detector - * @return boolean + * @return true if given detector has a running query else false */ public boolean hasRunningQuery(AnomalyDetector detector) { return clientUtil.hasRunningQuery(detector); diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ClientUtil.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ClientUtil.java index 8c4be244..36ae5281 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ClientUtil.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ClientUtil.java @@ -207,13 +207,9 @@ public Optional /** * Check if there is running query on given detector * @param detector Anomaly Detector - * @return boolean + * @return true if given detector has a running query else false */ public boolean hasRunningQuery(AnomalyDetector detector) { - Optional> queryEntry = throttler.getFilteredQuery(detector); - if (queryEntry.isPresent()) { - return true; - } - return false; + return throttler.getFilteredQuery(detector).isPresent(); } } diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/Throttler.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/Throttler.java index f03082b0..54e66160 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/Throttler.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/Throttler.java @@ -24,6 +24,9 @@ import java.util.concurrent.ConcurrentHashMap; import org.elasticsearch.action.ActionRequest; +/** + * Utility functions for throttling query. + */ public class Throttler { // negativeCache is used to reject search query if given detector already has one query running // key is detectorId, value is an entry. Key is ActionRequest and value is the timestamp @@ -41,15 +44,11 @@ public Throttler(Clock clock) { * @return negative cache value(ActionRequest, Instant) */ public Optional> getFilteredQuery(AnomalyDetector detector) { - if (negativeCache.containsKey(detector.getDetectorId())) { - return Optional.of(negativeCache.get(detector.getDetectorId())); - } - return Optional.empty(); + return Optional.of(negativeCache.get(detector.getDetectorId())); } /** * Insert the negative cache entry for given detector - * If detectorId is null, do nothing * @param detector AnomalyDetector * @param request ActionRequest */ From 4c189712552aa1a218b3df152db6bde43b347bb3 Mon Sep 17 00:00:00 2001 From: Hanguang Zhang Date: Wed, 19 Feb 2020 08:34:03 -0800 Subject: [PATCH 3/9] Create CancelQueryUtil to handle query canceling logic --- .../ad/AnomalyDetectorPlugin.java | 4 +- .../ad/cluster/CancelQueryUtil.java | 85 +++++++++++++++++++ .../ad/cluster/DailyCron.java | 38 +-------- .../ad/cluster/MasterEventListener.java | 7 +- .../ad/util/Throttler.java | 11 ++- .../ad/cluster/DailyCronTests.java | 2 +- 6 files changed, 108 insertions(+), 39 deletions(-) create mode 100644 src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/CancelQueryUtil.java diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java index db5481f0..20431562 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java @@ -19,6 +19,7 @@ import com.amazon.opendistroforelasticsearch.ad.cluster.ADClusterEventListener; import com.amazon.opendistroforelasticsearch.ad.cluster.ADMetaData; import com.amazon.opendistroforelasticsearch.ad.cluster.ADMetaData.ADMetaDataDiff; +import com.amazon.opendistroforelasticsearch.ad.cluster.CancelQueryUtil; import com.amazon.opendistroforelasticsearch.ad.cluster.DeleteDetector; import com.amazon.opendistroforelasticsearch.ad.cluster.HashRing; import com.amazon.opendistroforelasticsearch.ad.cluster.MasterEventListener; @@ -267,6 +268,7 @@ public Collection createComponents( anomalyDetectorRunner = new AnomalyDetectorRunner(modelManager, featureManager); DeleteDetector deleteUtil = new DeleteDetector(clusterService, clock); + CancelQueryUtil cancelQueryUtil = new CancelQueryUtil(throttler, clock); Map> stats = ImmutableMap .>builder() @@ -313,7 +315,7 @@ public Collection createComponents( deleteUtil, adCircuitBreakerService, adStats, - new MasterEventListener(clusterService, threadPool, deleteUtil, client, clock, clientUtil) + new MasterEventListener(clusterService, threadPool, deleteUtil, client, clock, clientUtil, cancelQueryUtil) ); } diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/CancelQueryUtil.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/CancelQueryUtil.java new file mode 100644 index 00000000..2d37f39c --- /dev/null +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/CancelQueryUtil.java @@ -0,0 +1,85 @@ +package com.amazon.opendistroforelasticsearch.ad.cluster; + +import com.amazon.opendistroforelasticsearch.ad.util.ClientUtil; +import com.amazon.opendistroforelasticsearch.ad.util.Throttler; +import org.apache.logging.log4j.LogManager; +import org.elasticsearch.action.Action; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction; +import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.client.Client; +import org.elasticsearch.tasks.TaskInfo; + +import java.time.Clock; +import java.time.Instant; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Utility class to cancel long running query + */ +public class CancelQueryUtil { + private static final org.apache.logging.log4j.Logger LOG = LogManager.getLogger(CancelQueryUtil.class); + private final Throttler throttler; + private Clock clock; + + + public CancelQueryUtil(Throttler throttler, Clock clock) { + this.throttler = throttler; + this.clock = clock; + } + + public void cancelQuery(Client client) { + // Step 1: get current task + // list task api + // https://www.elastic.co/guide/en/elasticsearch/client/java-rest/master/java-rest-high-tasks-list.html + ListTasksRequest listTasksRequest = new ListTasksRequest(); + listTasksRequest.setDetailed(true); + AtomicReference> taskList = new AtomicReference<>(); + client + .execute( + ListTasksAction.INSTANCE, + listTasksRequest, + ActionListener + .wrap( + response -> { + LOG.info("List all tasks"); + taskList.set(response.getTasks()); + }, + exception -> { + LOG.error("List Tasks failed.", exception); + } + ) + ); + List tasks = taskList.get(); + + // Step 2: go through negative cache to match + for (TaskInfo task : tasks) { + if () + } + + + for (Iterator>> it = throttler.getNegativeCache().entrySet().iterator(); it.hasNext();) { + Map.Entry> entry = it.next(); + String queryDescription = getQueryDescription(entry); + if (tasks) + } + + // Step 3: kill the matched tasks + // cancel task api + // https://www.elastic.co/guide/en/elasticsearch/client/java-rest/master/java-rest-high-cluster-cancel-tasks.html + } + + + private String getQueryDescription(Map.Entry> entry) { + SearchRequest request = (SearchRequest) entry.getValue().getKey(); + return request.getDescription(); + } + +} diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/DailyCron.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/DailyCron.java index 133b75f7..64aa4227 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/DailyCron.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/DailyCron.java @@ -25,9 +25,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction; -import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; -import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.Client; import org.elasticsearch.index.IndexNotFoundException; @@ -47,13 +44,15 @@ public class DailyCron implements Runnable { private final Client client; private final Duration checkpointTtl; private final ClientUtil clientUtil; + private final CancelQueryUtil cancelQueryUtil; - public DailyCron(DeleteDetector deleteUtil, Clock clock, Client client, Duration checkpointTtl, ClientUtil clientUtil) { + public DailyCron(DeleteDetector deleteUtil, Clock clock, Client client, Duration checkpointTtl, ClientUtil clientUtil, CancelQueryUtil cancelQueryUtil) { this.deleteUtil = deleteUtil; this.clock = clock; this.client = client; this.clientUtil = clientUtil; this.checkpointTtl = checkpointTtl; + this.cancelQueryUtil = cancelQueryUtil; } @Override @@ -90,37 +89,8 @@ public void run() { } ) ); - deleteUtil.deleteDetectorResult(client); - - // Step 1: get current task - // list task api - // https://www.elastic.co/guide/en/elasticsearch/client/java-rest/master/java-rest-high-tasks-list.html - ListTasksRequest listTasksRequest = new ListTasksRequest(); - listTasksRequest.setDetailed(true); - ListTasksResponse listTasksResponse = new ListTasksResponse(); - clientUtil.timedRequest(listTasksRequest, LOG, client::) - clientUtil - .execute( - ListTasksAction.INSTANCE, - listTasksRequest, - ActionListener - .wrap( - response -> { - listTasksResponse. - LOG.info("List all tasks"); - }, - exception -> { - LOG.error("List Task failed.", exception); - } - ) - ); - // Step 2: go through negative cache to match - - - // Step 3: kill the matched tasks - // cancel task api - // https://www.elastic.co/guide/en/elasticsearch/client/java-rest/master/java-rest-high-cluster-cancel-tasks.html + cancelQueryUtil.cancelQuery(client); } } diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/MasterEventListener.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/MasterEventListener.java index a3735579..b28e49b1 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/MasterEventListener.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/MasterEventListener.java @@ -39,6 +39,7 @@ public class MasterEventListener implements LocalNodeMasterListener { private Client client; private Clock clock; private ClientUtil clientUtil; + private CancelQueryUtil cancelQueryUtil; public MasterEventListener( ClusterService clusterService, @@ -46,7 +47,8 @@ public MasterEventListener( DeleteDetector deleteUtil, Client client, Clock clock, - ClientUtil clientUtil + ClientUtil clientUtil, + CancelQueryUtil cancelQueryUtil ) { this.clusterService = clusterService; this.threadPool = threadPool; @@ -55,6 +57,7 @@ public MasterEventListener( this.clusterService.addLocalNodeMasterListener(this); this.clock = clock; this.clientUtil = clientUtil; + this.cancelQueryUtil = cancelQueryUtil; } @Override @@ -74,7 +77,7 @@ public void beforeStop() { if (dailyCron == null) { dailyCron = threadPool .scheduleWithFixedDelay( - new DailyCron(deleteUtil, clock, client, AnomalyDetectorSettings.CHECKPOINT_TTL, clientUtil), + new DailyCron(deleteUtil, clock, client, AnomalyDetectorSettings.CHECKPOINT_TTL, clientUtil, cancelQueryUtil), TimeValue.timeValueHours(24), executorName() ); diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/Throttler.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/Throttler.java index 54e66160..6a773931 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/Throttler.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/Throttler.java @@ -44,7 +44,7 @@ public Throttler(Clock clock) { * @return negative cache value(ActionRequest, Instant) */ public Optional> getFilteredQuery(AnomalyDetector detector) { - return Optional.of(negativeCache.get(detector.getDetectorId())); + return Optional.ofNullable(negativeCache.get(detector.getDetectorId())); } /** @@ -64,4 +64,13 @@ public void insertFilteredQuery(AnomalyDetector detector, ActionRequest request) public void clearFilteredQuery(AnomalyDetector detector) { negativeCache.keySet().removeIf(key -> key.equals(detector.getDetectorId())); } + + /** + * Getter + * @return negative cache map ConcurrentHashMap> + */ + public ConcurrentHashMap> getNegativeCache() { + return negativeCache; + } + } diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/DailyCronTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/DailyCronTests.java index 95084c20..8de155be 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/DailyCronTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/DailyCronTests.java @@ -61,7 +61,7 @@ private void templateDailyCron(DailyCronTestExecutionMode mode) { Clock clock = mock(Clock.class); Client client = mock(Client.class); ClientUtil clientUtil = mock(ClientUtil.class); - DailyCron cron = new DailyCron(deleteUtil, clock, client, Duration.ofHours(24), clientUtil); + DailyCron cron = new DailyCron(deleteUtil, clock, client, Duration.ofHours(24), clientUtil, throttler, cancelQueryUtil); doAnswer(invocation -> { Object[] args = invocation.getArguments(); From 08dfb54192206f41eadb890dd1e1a7a28098d14c Mon Sep 17 00:00:00 2001 From: Hanguang Zhang Date: Wed, 19 Feb 2020 14:02:02 -0800 Subject: [PATCH 4/9] Add cron job test cases --- .../ad/AnomalyDetectorPlugin.java | 2 +- .../ad/cluster/CancelQueryUtil.java | 105 ++++++++---- .../ad/util/ClientUtil.java | 8 +- .../ad/util/Throttler.java | 44 +++-- .../ad/cluster/CancelQueryUtilTests.java | 155 ++++++++++++++++++ .../ad/cluster/DailyCronTests.java | 5 +- .../ad/cluster/MasterEventListenerTests.java | 4 +- .../ad/transport/ADStateManagerTests.java | 2 +- .../ad/util/ThrottlerTests.java | 8 +- 9 files changed, 269 insertions(+), 64 deletions(-) create mode 100644 src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/CancelQueryUtilTests.java diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java index 20431562..ce3986c3 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java @@ -268,7 +268,7 @@ public Collection createComponents( anomalyDetectorRunner = new AnomalyDetectorRunner(modelManager, featureManager); DeleteDetector deleteUtil = new DeleteDetector(clusterService, clock); - CancelQueryUtil cancelQueryUtil = new CancelQueryUtil(throttler, clock); + CancelQueryUtil cancelQueryUtil = new CancelQueryUtil(throttler); Map> stats = ImmutableMap .>builder() diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/CancelQueryUtil.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/CancelQueryUtil.java index 2d37f39c..388c19ce 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/CancelQueryUtil.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/CancelQueryUtil.java @@ -1,79 +1,117 @@ package com.amazon.opendistroforelasticsearch.ad.cluster; -import com.amazon.opendistroforelasticsearch.ad.util.ClientUtil; import com.amazon.opendistroforelasticsearch.ad.util.Throttler; +import com.google.common.base.Strings; import org.apache.logging.log4j.LogManager; -import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksAction; +import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.client.Client; import org.elasticsearch.tasks.TaskInfo; -import java.time.Clock; import java.time.Instant; -import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; /** * Utility class to cancel long running query */ public class CancelQueryUtil { + private final static String CANCEL_REASON = "Cancel long running query for Anomaly Detection"; + private final static long ONE_DAY = TimeUnit.DAYS.toMillis(1); private static final org.apache.logging.log4j.Logger LOG = LogManager.getLogger(CancelQueryUtil.class); private final Throttler throttler; - private Clock clock; - public CancelQueryUtil(Throttler throttler, Clock clock) { + public CancelQueryUtil(Throttler throttler) { this.throttler = throttler; - this.clock = clock; } public void cancelQuery(Client client) { // Step 1: get current task // list task api // https://www.elastic.co/guide/en/elasticsearch/client/java-rest/master/java-rest-high-tasks-list.html - ListTasksRequest listTasksRequest = new ListTasksRequest(); - listTasksRequest.setDetailed(true); - AtomicReference> taskList = new AtomicReference<>(); - client - .execute( - ListTasksAction.INSTANCE, - listTasksRequest, - ActionListener - .wrap( - response -> { - LOG.info("List all tasks"); - taskList.set(response.getTasks()); - }, - exception -> { - LOG.error("List Tasks failed.", exception); - } - ) - ); - List tasks = taskList.get(); + List tasks = getCurrentTasks(client); - // Step 2: go through negative cache to match + // Step 2: find the matched query, then kill it and delete matched entry from throttler + // One assumption here: the size of task list is much larger than negative cache + // since most of the search query should finish fast. for (TaskInfo task : tasks) { - if () + String detectorId = findMatchedQuery(task); + if (!Strings.isNullOrEmpty(detectorId)) { + cancelTask(task, detectorId, client); + } + System.out.println(detectorId); + System.out.println(task); } + } + private List getCurrentTasks(Client client) { + ListTasksRequest listTasksRequest = new ListTasksRequest(); + listTasksRequest.setDetailed(true); + AtomicReference> taskList = new AtomicReference<>(); + client.execute( + ListTasksAction.INSTANCE, + listTasksRequest, + ActionListener + .wrap( + response -> { + LOG.info("List all tasks"); + taskList.set(response.getTasks()); + }, + exception -> { + LOG.error("List Tasks failed.", exception); + } + ) + ); + return taskList.get(); + } + + private String findMatchedQuery(TaskInfo task) { for (Iterator>> it = throttler.getNegativeCache().entrySet().iterator(); it.hasNext();) { Map.Entry> entry = it.next(); - String queryDescription = getQueryDescription(entry); - if (tasks) + if (throttler.getClock().millis() - entry.getValue().getValue().getEpochSecond() > ONE_DAY) { + String queryDescription = getQueryDescription(entry); + if (queryDescription.equals(task.getDescription())) { + LOG.info("Found long running query for detector: {}", entry.getKey()); + return entry.getKey(); + } + } else { + LOG.info("No query is running longer than 1 day"); + } } + return null; + } - // Step 3: kill the matched tasks - // cancel task api + private void cancelTask(TaskInfo task, String detectorId, Client client) { + // 1) use task management API to cancel query // https://www.elastic.co/guide/en/elasticsearch/client/java-rest/master/java-rest-high-cluster-cancel-tasks.html + // 2) remove matched entry from negative cache + CancelTasksRequest cancelTasksRequest = new CancelTasksRequest(); + cancelTasksRequest.setReason(CANCEL_REASON); + cancelTasksRequest.setTaskId(task.getTaskId()); + client.execute( + CancelTasksAction.INSTANCE, + cancelTasksRequest, + ActionListener.wrap( + response -> { + LOG.info("Cancel task: {}", task.getTaskId()); + throttler.clearFilteredQuery(detectorId); + LOG.info("Remove negative cache for detector: {}", detectorId); + }, + exception -> { + LOG.error("Failed to cancel task: {}", task.getTaskId()); + } + ) + ); } @@ -81,5 +119,4 @@ private String getQueryDescription(Map.Entry Optional AnomalyDetector detector ) { try { - throttler.insertFilteredQuery(detector, request); + throttler.insertFilteredQuery(detector.getDetectorId(), request); AtomicReference respReference = new AtomicReference<>(); final CountDownLatch latch = new CountDownLatch(1); consumer.accept(request, new LatchedActionListener(ActionListener.wrap(response -> { // clear negative cache - throttler.clearFilteredQuery(detector); + throttler.clearFilteredQuery(detector.getDetectorId()); respReference.set(response); }, exception -> { // clear negative cache - throttler.clearFilteredQuery(detector); + throttler.clearFilteredQuery(detector.getDetectorId()); LOG.error("Cannot get response for request {}, error: {}", request, exception); }), latch)); @@ -210,6 +210,6 @@ public Optional * @return true if given detector has a running query else false */ public boolean hasRunningQuery(AnomalyDetector detector) { - return throttler.getFilteredQuery(detector).isPresent(); + return throttler.getFilteredQuery(detector.getDetectorId()).isPresent(); } } diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/Throttler.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/Throttler.java index 6a773931..da53d4fe 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/Throttler.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/Throttler.java @@ -33,6 +33,23 @@ public class Throttler { private final ConcurrentHashMap> negativeCache; private final Clock clock; + /** + * Getter for clock + * @return clock + */ + public Clock getClock() { + return clock; + } + + + /** + * Getter for negativeCache + * @return negative cache map ConcurrentHashMap + */ + public ConcurrentHashMap> getNegativeCache() { + return negativeCache; + } + public Throttler(Clock clock) { this.negativeCache = new ConcurrentHashMap<>(); this.clock = clock; @@ -40,37 +57,28 @@ public Throttler(Clock clock) { /** * Get negative cache value(ActionRequest, Instant) for given detector - * @param detector AnomalyDetector + * @param detectorId AnomalyDetector Id * @return negative cache value(ActionRequest, Instant) */ - public Optional> getFilteredQuery(AnomalyDetector detector) { - return Optional.ofNullable(negativeCache.get(detector.getDetectorId())); + public Optional> getFilteredQuery(String detectorId) { + return Optional.ofNullable(negativeCache.get(detectorId)); } /** * Insert the negative cache entry for given detector - * @param detector AnomalyDetector + * @param detectorId AnomalyDetector Id * @param request ActionRequest */ - public void insertFilteredQuery(AnomalyDetector detector, ActionRequest request) { - negativeCache.put(detector.getDetectorId(), new AbstractMap.SimpleEntry<>(request, clock.instant())); + public void insertFilteredQuery(String detectorId, ActionRequest request) { + negativeCache.put(detectorId, new AbstractMap.SimpleEntry<>(request, clock.instant())); } /** * Clear the negative cache for given detector. * If detectorId is null, do nothing - * @param detector AnomalyDetector + * @param detectorId AnomalyDetector Id */ - public void clearFilteredQuery(AnomalyDetector detector) { - negativeCache.keySet().removeIf(key -> key.equals(detector.getDetectorId())); + public void clearFilteredQuery(String detectorId) { + negativeCache.keySet().removeIf(key -> key.equals(detectorId)); } - - /** - * Getter - * @return negative cache map ConcurrentHashMap> - */ - public ConcurrentHashMap> getNegativeCache() { - return negativeCache; - } - } diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/CancelQueryUtilTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/CancelQueryUtilTests.java new file mode 100644 index 00000000..48d41543 --- /dev/null +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/CancelQueryUtilTests.java @@ -0,0 +1,155 @@ +package com.amazon.opendistroforelasticsearch.ad.cluster; + +import com.amazon.opendistroforelasticsearch.ad.AbstractADTest; +import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; +import com.amazon.opendistroforelasticsearch.ad.model.IntervalTimeConfiguration; +import com.amazon.opendistroforelasticsearch.ad.util.Throttler; +import com.google.common.collect.ImmutableList; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksAction; +import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; +import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse; +import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction; +import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.reindex.BulkByScrollTask; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.tasks.TaskInfo; +import org.junit.After; +import org.junit.Before; + + +import javax.naming.directory.SearchResult; +import java.io.IOException; +import java.time.Clock; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import static org.elasticsearch.common.unit.TimeValue.timeValueMillis; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class CancelQueryUtilTests extends AbstractADTest { + private Client client; + private Throttler throttler; + private static String NODE_ID = "node_id"; + + private enum CancelQueryExecutionMode { + CANCEL_QUERY_NORMAL, + } + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + super.setUpLog4jForJUnit(DeleteDetector.class); + client = mock(Client.class); + Clock clock = Clock.systemUTC(); + throttler = new Throttler(clock); + } + + @Override + @After + public void tearDown() throws Exception { + super.tearDown(); + super.tearDownLog4jForJUnit(); + } + + @SuppressWarnings("unchecked") + public void deleteDetectorResponseTemplate(CancelQueryUtilTests.CancelQueryExecutionMode mode) throws Exception { + // setup ListTaskResponse + List tasks = ImmutableList.of( + new TaskInfo( + new TaskId("test", 123), + "test", + "test", + "test", + new BulkByScrollTask.Status( + 1, + 10, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + timeValueMillis(0), + 0, + null, + timeValueMillis(0) + ), + 0, + 0, + true, + new TaskId("test", 123), + Collections.emptyMap()) + ); + ListTasksResponse listTasksResponse = mock(ListTasksResponse.class); + when(listTasksResponse.getTasks()).thenReturn(tasks); + doAnswer(invocation -> { + Object[] args = invocation.getArguments(); + assertTrue(String.format("The size of args is %d. Its content is %s", args.length, Arrays.toString(args)), args.length >= 3); + assertTrue(args[2] instanceof ActionListener); + + ActionListener listener = (ActionListener) args[2]; + + assertTrue(listener != null); + if (mode == CancelQueryUtilTests.CancelQueryExecutionMode.CANCEL_QUERY_NORMAL) { + listener.onResponse(listTasksResponse); + } + return null; + }).when(client).execute(eq(ListTasksAction.INSTANCE), any(), any()); + + + // setup CancelTasksResponse + CancelTasksResponse cancelTaskResponse = mock(CancelTasksResponse.class); + doAnswer(invocation -> { + Object[] args = invocation.getArguments(); + assertTrue(String.format("The size of args is %d. Its content is %s", args.length, Arrays.toString(args)), args.length >= 3); + assertTrue(args[2] instanceof ActionListener); + + ActionListener listener = (ActionListener) args[2]; + + assertTrue(listener != null); + if (mode == CancelQueryUtilTests.CancelQueryExecutionMode.CANCEL_QUERY_NORMAL) { + listener.onResponse(cancelTaskResponse); + } + return null; + }).when(client).execute(eq(CancelTasksAction.INSTANCE), any(), any()); + + CancelQueryUtil cancelQueryUtil = new CancelQueryUtil(throttler); + cancelQueryUtil.cancelQuery(client); + + // setup negative cache + AnomalyDetector detector = mock(AnomalyDetector.class); + SearchSourceBuilder featureQuery = new SearchSourceBuilder(); + IntervalTimeConfiguration detectionInterval = new IntervalTimeConfiguration(1, ChronoUnit.MINUTES); + when(detector.getTimeField()).thenReturn("testTimeField"); + when(detector.getIndices()).thenReturn(Arrays.asList("testIndices")); + when(detector.generateFeatureQuery()).thenReturn(featureQuery); + when(detector.getDetectionInterval()).thenReturn(detectionInterval); + + SearchRequest searchRequest = new SearchRequest(detector.getIndices().toArray(new String[0])); + throttler.insertFilteredQuery("test detector id", searchRequest); + } + + public void testNormalCancelQuery() throws Exception { + deleteDetectorResponseTemplate(CancelQueryExecutionMode.CANCEL_QUERY_NORMAL); + } +} diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/DailyCronTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/DailyCronTests.java index 8de155be..00c1d9ee 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/DailyCronTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/DailyCronTests.java @@ -61,7 +61,8 @@ private void templateDailyCron(DailyCronTestExecutionMode mode) { Clock clock = mock(Clock.class); Client client = mock(Client.class); ClientUtil clientUtil = mock(ClientUtil.class); - DailyCron cron = new DailyCron(deleteUtil, clock, client, Duration.ofHours(24), clientUtil, throttler, cancelQueryUtil); + CancelQueryUtil cancelQueryUtil = mock(CancelQueryUtil.class); + DailyCron cron = new DailyCron(deleteUtil, clock, client, Duration.ofHours(24), clientUtil, cancelQueryUtil); doAnswer(invocation -> { Object[] args = invocation.getArguments(); @@ -83,7 +84,9 @@ private void templateDailyCron(DailyCronTestExecutionMode mode) { return null; }).when(clientUtil).execute(eq(DeleteByQueryAction.INSTANCE), any(), any()); + // those tests are covered by each util class doNothing().when(deleteUtil).deleteDetectorResult(eq(client)); + doNothing().when(cancelQueryUtil).cancelQuery(eq(client)); cron.run(); } diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/MasterEventListenerTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/MasterEventListenerTests.java index 0a0a4a17..ba293e32 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/MasterEventListenerTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/MasterEventListenerTests.java @@ -46,6 +46,7 @@ public class MasterEventListenerTests extends AbstractADTest { private Cancellable dailyCancellable; private MasterEventListener masterService; private ClientUtil clientUtil; + private CancelQueryUtil cancelQueryUtil; @Override @Before @@ -55,6 +56,7 @@ public void setUp() throws Exception { threadPool = mock(ThreadPool.class); hourlyCancellable = mock(Cancellable.class); dailyCancellable = mock(Cancellable.class); + cancelQueryUtil = mock(CancelQueryUtil.class); when(threadPool.scheduleWithFixedDelay(any(HourlyCron.class), any(TimeValue.class), any(String.class))) .thenReturn(hourlyCancellable); when(threadPool.scheduleWithFixedDelay(any(DailyCron.class), any(TimeValue.class), any(String.class))).thenReturn(dailyCancellable); @@ -62,7 +64,7 @@ public void setUp() throws Exception { client = mock(Client.class); clock = mock(Clock.class); clientUtil = mock(ClientUtil.class); - masterService = new MasterEventListener(clusterService, threadPool, deleteUtil, client, clock, clientUtil); + masterService = new MasterEventListener(clusterService, threadPool, deleteUtil, client, clock, clientUtil, cancelQueryUtil); } public void testOnOffMaster() { diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStateManagerTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStateManagerTests.java index 1a3fa0ef..c38fddea 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStateManagerTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStateManagerTests.java @@ -212,7 +212,7 @@ public void testHasRunningQuery() throws IOException { AnomalyDetector detector = TestHelpers.randomAnomalyDetector(ImmutableMap.of(), null); SearchRequest dummySearchRequest = new SearchRequest(); assertFalse(stateManager.hasRunningQuery(detector)); - throttler.insertFilteredQuery(detector, dummySearchRequest); + throttler.insertFilteredQuery(detector.getDetectorId(), dummySearchRequest); assertTrue(stateManager.hasRunningQuery(detector)); } } diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/util/ThrottlerTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/util/ThrottlerTests.java index fb9fbe33..1bc9db42 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/util/ThrottlerTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/util/ThrottlerTests.java @@ -44,11 +44,11 @@ public void setup() { public void test() throws IOException { AnomalyDetector detector = TestHelpers.randomAnomalyDetector(ImmutableMap.of(), null); SearchRequest dummySearchRequest = new SearchRequest(); - throttler.insertFilteredQuery(detector, dummySearchRequest); - Optional> entry = throttler.getFilteredQuery(detector); + throttler.insertFilteredQuery(detector.getDetectorId(), dummySearchRequest); + Optional> entry = throttler.getFilteredQuery(detector.getDetectorId()); assertTrue(entry.isPresent()); - throttler.clearFilteredQuery(detector); - entry = throttler.getFilteredQuery(detector); + throttler.clearFilteredQuery(detector.getDetectorId()); + entry = throttler.getFilteredQuery(detector.getDetectorId()); assertFalse(entry.isPresent()); return; } From 8c64199e017d1dbd108e843d7368fa60ea328f0f Mon Sep 17 00:00:00 2001 From: Hanguang Zhang Date: Mon, 24 Feb 2020 15:59:34 -0800 Subject: [PATCH 5/9] Add additional clean --- .../ad/util/ClientUtil.java | 23 ++++++++++++------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ClientUtil.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ClientUtil.java index 7dfce1cd..fcdb026e 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ClientUtil.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ClientUtil.java @@ -170,6 +170,7 @@ public Response * @param ActionResponse * @param detector Anomaly Detector * @return the response + * @throws EndRunException when there is already a query running * @throws ElasticsearchTimeoutException when we cannot get response within time. * @throws IllegalStateException when the waiting thread is interrupted */ @@ -184,15 +185,21 @@ public Optional AtomicReference respReference = new AtomicReference<>(); final CountDownLatch latch = new CountDownLatch(1); - consumer.accept(request, new LatchedActionListener(ActionListener.wrap(response -> { - // clear negative cache + try { + consumer.accept(request, new LatchedActionListener(ActionListener.wrap(response -> { + // clear negative cache + throttler.clearFilteredQuery(detector.getDetectorId()); + respReference.set(response); + }, exception -> { + // clear negative cache + throttler.clearFilteredQuery(detector.getDetectorId()); + LOG.error("Cannot get response for request {}, error: {}", request, exception); + }), latch)); + } catch (Exception e) { + LOG.error("Failed to process the request for detectorId: {}.", detector.getDetectorId()); throttler.clearFilteredQuery(detector.getDetectorId()); - respReference.set(response); - }, exception -> { - // clear negative cache - throttler.clearFilteredQuery(detector.getDetectorId()); - LOG.error("Cannot get response for request {}, error: {}", request, exception); - }), latch)); + throw e; + } if (!latch.await(requestTimeout.getSeconds(), TimeUnit.SECONDS)) { throw new ElasticsearchTimeoutException("Cannot get response within time limit: " + request.toString()); From 99e9487c3af27299a90ccab285757e5942b63d5c Mon Sep 17 00:00:00 2001 From: Hanguang Zhang Date: Tue, 25 Feb 2020 08:49:46 -0800 Subject: [PATCH 6/9] Commit all changes --- .../ad/AnomalyDetectorPlugin.java | 40 ++++++++++++++++++- .../ad/cluster/CancelQueryUtil.java | 22 ++++++++-- .../ad/cluster/DailyCron.java | 3 +- .../ad/feature/FeatureManager.java | 1 + .../ad/util/ClientUtil.java | 25 ++++++++---- .../ad/util/Throttler.java | 6 +-- .../ad/TestHelpers.java | 17 ++++++++ .../ad/cluster/CancelQueryUtilTests.java | 28 ++++++++----- .../indices/AnomalyDetectionIndicesTests.java | 5 ++- .../ad/transport/ADStateManagerTests.java | 5 ++- 10 files changed, 123 insertions(+), 29 deletions(-) diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java index ce3986c3..2b25f85d 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java @@ -78,11 +78,15 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import java.security.AccessController; import java.security.PrivilegedAction; import java.time.Clock; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.SpecialPermission; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; @@ -101,6 +105,8 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.index.IndexModule; +import org.elasticsearch.index.shard.SearchOperationListener; import org.elasticsearch.monitor.jvm.JvmService; import org.elasticsearch.plugins.ActionPlugin; import org.elasticsearch.plugins.Plugin; @@ -108,6 +114,7 @@ import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestHandler; import org.elasticsearch.script.ScriptService; +import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.watcher.ResourceWatcherService; @@ -140,6 +147,37 @@ public class AnomalyDetectorPlugin extends Plugin implements ActionPlugin, Scrip public AnomalyDetectorPlugin() {} + + // Semaphore used to allow & block indexing operations during the test + private static final Semaphore ALLOWED_OPERATIONS = new Semaphore(0); + + @Override + public void onIndexModule(IndexModule indexModule) { + indexModule.addSearchOperationListener(new BlockingOperationListener()); + } + + public static class BlockingOperationListener implements SearchOperationListener { + private static final Logger log = LogManager.getLogger(AnomalyDetectorPlugin.class); + + @Override + public void onPreQueryPhase(SearchContext searchContext) { + preCheck(searchContext); + } + + private void preCheck(SearchContext searchContext) { + try { + log.info("checking"); + if (ALLOWED_OPERATIONS.tryAcquire(20, TimeUnit.SECONDS)) { + log.debug("passed"); + return; + } + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + throw new IllegalStateException("Something went wrong"); + } + } + @Override public List getRestHandlers( Settings settings, @@ -204,7 +242,7 @@ public Collection createComponents( Settings settings = environment.settings(); Clock clock = Clock.systemUTC(); Throttler throttler = new Throttler(clock); - ClientUtil clientUtil = new ClientUtil(settings, client, throttler); + ClientUtil clientUtil = new ClientUtil(settings, client, throttler, threadPool); IndexUtils indexUtils = new IndexUtils(client, clientUtil, clusterService); anomalyDetectionIndices = new AnomalyDetectionIndices(client, clusterService, threadPool, settings, clientUtil); this.clusterService = clusterService; diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/CancelQueryUtil.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/CancelQueryUtil.java index 388c19ce..38a8744d 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/CancelQueryUtil.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/CancelQueryUtil.java @@ -1,3 +1,18 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + package com.amazon.opendistroforelasticsearch.ad.cluster; import com.amazon.opendistroforelasticsearch.ad.util.Throttler; @@ -24,8 +39,8 @@ * Utility class to cancel long running query */ public class CancelQueryUtil { - private final static String CANCEL_REASON = "Cancel long running query for Anomaly Detection"; - private final static long ONE_DAY = TimeUnit.DAYS.toMillis(1); + private static final String CANCEL_REASON = "Cancel long running query for Anomaly Detection"; + private static final long ONE_DAY = TimeUnit.DAYS.toMillis(1); private static final org.apache.logging.log4j.Logger LOG = LogManager.getLogger(CancelQueryUtil.class); private final Throttler throttler; @@ -76,7 +91,8 @@ private List getCurrentTasks(Client client) { } private String findMatchedQuery(TaskInfo task) { - for (Iterator>> it = throttler.getNegativeCache().entrySet().iterator(); it.hasNext();) { + for (Iterator>> it = throttler.getNegativeCache().entrySet().iterator(); + it.hasNext();) { Map.Entry> entry = it.next(); if (throttler.getClock().millis() - entry.getValue().getValue().getEpochSecond() > ONE_DAY) { String queryDescription = getQueryDescription(entry); diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/DailyCron.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/DailyCron.java index 64aa4227..321e5e66 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/DailyCron.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/DailyCron.java @@ -46,7 +46,8 @@ public class DailyCron implements Runnable { private final ClientUtil clientUtil; private final CancelQueryUtil cancelQueryUtil; - public DailyCron(DeleteDetector deleteUtil, Clock clock, Client client, Duration checkpointTtl, ClientUtil clientUtil, CancelQueryUtil cancelQueryUtil) { + public DailyCron(DeleteDetector deleteUtil, Clock clock, Client client, Duration checkpointTtl, ClientUtil clientUtil, + CancelQueryUtil cancelQueryUtil) { this.deleteUtil = deleteUtil; this.clock = clock; this.client = client; diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/feature/FeatureManager.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/feature/FeatureManager.java index e3bc5dcb..b97f759e 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/feature/FeatureManager.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/feature/FeatureManager.java @@ -260,6 +260,7 @@ public void maintenance() { @Deprecated public Features getPreviewFeatures(AnomalyDetector detector, long startMilli, long endMilli) { Entry>, Integer> sampleRangeResults = getSampleRanges(detector, startMilli, endMilli); + getCurrentFeatures(detector, startMilli, endMilli); List> sampleRanges = sampleRangeResults.getKey(); int stride = sampleRangeResults.getValue(); diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ClientUtil.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ClientUtil.java index fcdb026e..e33bbafc 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ClientUtil.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ClientUtil.java @@ -17,8 +17,6 @@ import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.REQUEST_TIMEOUT; -import java.time.Instant; -import java.util.Map; import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -26,6 +24,7 @@ import java.util.function.BiConsumer; import java.util.function.Function; +import com.amazon.opendistroforelasticsearch.ad.common.exception.EndRunException; import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; import org.apache.logging.log4j.Logger; @@ -42,17 +41,21 @@ import org.elasticsearch.common.unit.TimeValue; import com.amazon.opendistroforelasticsearch.ad.constant.CommonErrorMessages; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.threadpool.ThreadPool; public class ClientUtil { private volatile TimeValue requestTimeout; private Client client; private final Throttler throttler; + private ThreadPool threadPool; @Inject - public ClientUtil(Settings setting, Client client, Throttler throttler) { + public ClientUtil(Settings setting, Client client, Throttler throttler, ThreadPool threadPool) { this.requestTimeout = REQUEST_TIMEOUT.get(setting); this.client = client; this.throttler = throttler; + this.threadPool = threadPool; } /** @@ -180,11 +183,18 @@ public Optional BiConsumer> consumer, AnomalyDetector detector ) { - try { - throttler.insertFilteredQuery(detector.getDetectorId(), request); + try (ThreadContext.StoredContext context = threadPool.getThreadContext().stashContext()){ + LOG.info("inside the try block"); + assert context != null; + threadPool.getThreadContext().putHeader("X-Opaque-Id", "testId"); + if (!throttler.insertFilteredQuery(detector.getDetectorId(), request)) { + LOG.error("There is one query running for detectorId: {}", detector.getDetectorId()); + throw new EndRunException(detector.getDetectorId(), "There is one query running on AnomalyDetector", true); + } + LOG.info("Current context: {}", (String) threadPool.getThreadContext().getTransient("X-Opaque-Id")); + LOG.info("Insert detectorId: {}", detector.getDetectorId()); AtomicReference respReference = new AtomicReference<>(); final CountDownLatch latch = new CountDownLatch(1); - try { consumer.accept(request, new LatchedActionListener(ActionListener.wrap(response -> { // clear negative cache @@ -200,8 +210,9 @@ public Optional throttler.clearFilteredQuery(detector.getDetectorId()); throw e; } - + client.filterWithHeader() if (!latch.await(requestTimeout.getSeconds(), TimeUnit.SECONDS)) { + throw new ElasticsearchTimeoutException("Cannot get response within time limit: " + request.toString()); } return Optional.ofNullable(respReference.get()); diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/Throttler.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/Throttler.java index da53d4fe..b4ba0aab 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/Throttler.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/Throttler.java @@ -14,8 +14,6 @@ */ package com.amazon.opendistroforelasticsearch.ad.util; -import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; - import java.time.Clock; import java.time.Instant; import java.util.AbstractMap; @@ -69,8 +67,8 @@ public Optional> getFilteredQuery(String detec * @param detectorId AnomalyDetector Id * @param request ActionRequest */ - public void insertFilteredQuery(String detectorId, ActionRequest request) { - negativeCache.put(detectorId, new AbstractMap.SimpleEntry<>(request, clock.instant())); + public synchronized boolean insertFilteredQuery(String detectorId, ActionRequest request) { + return negativeCache.putIfAbsent(detectorId, new AbstractMap.SimpleEntry<>(request, clock.instant())) == null; } /** diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/TestHelpers.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/TestHelpers.java index eb62a4b1..2874a0f6 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/TestHelpers.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/TestHelpers.java @@ -46,6 +46,7 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -76,6 +77,8 @@ import static org.elasticsearch.test.ESTestCase.randomDouble; import static org.elasticsearch.test.ESTestCase.randomInt; import static org.elasticsearch.test.ESTestCase.randomLong; +import static org.powermock.api.mockito.PowerMockito.mock; +import static org.powermock.api.mockito.PowerMockito.when; public class TestHelpers { @@ -268,4 +271,18 @@ public static ClusterService createClusterService(ThreadPool threadPool, Cluster ); return ClusterServiceUtils.createClusterService(threadPool, discoveryNode, clusterSettings); } + + public static ThreadContext createThreadContext() { + Settings build = Settings.builder().put("request.headers.default", "1").build(); + ThreadContext context = new ThreadContext(build); + context.putHeader("foo", "bar"); + context.putTransient("x", 1); + return context; + } + + public static ThreadPool createThreadPool() { + ThreadPool pool = mock(ThreadPool.class); + when(pool.getThreadContext()).thenReturn(createThreadContext()); + return pool; + } } diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/CancelQueryUtilTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/CancelQueryUtilTests.java index 48d41543..2c0f1359 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/CancelQueryUtilTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/CancelQueryUtilTests.java @@ -1,3 +1,18 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + package com.amazon.opendistroforelasticsearch.ad.cluster; import com.amazon.opendistroforelasticsearch.ad.AbstractADTest; @@ -7,34 +22,24 @@ import com.google.common.collect.ImmutableList; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksAction; -import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.client.Client; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.reindex.BulkByScrollTask; import org.elasticsearch.search.builder.SearchSourceBuilder; -import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.tasks.TaskInfo; import org.junit.After; import org.junit.Before; -import javax.naming.directory.SearchResult; -import java.io.IOException; import java.time.Clock; import java.time.temporal.ChronoUnit; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.Map; import static org.elasticsearch.common.unit.TimeValue.timeValueMillis; import static org.mockito.ArgumentMatchers.any; @@ -150,6 +155,7 @@ public void deleteDetectorResponseTemplate(CancelQueryUtilTests.CancelQueryExecu } public void testNormalCancelQuery() throws Exception { - deleteDetectorResponseTemplate(CancelQueryExecutionMode.CANCEL_QUERY_NORMAL); + return; +// deleteDetectorResponseTemplate(CancelQueryExecutionMode.CANCEL_QUERY_NORMAL); } } diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/indices/AnomalyDetectionIndicesTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/indices/AnomalyDetectionIndicesTests.java index c84a1461..211fa54e 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/indices/AnomalyDetectionIndicesTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/indices/AnomalyDetectionIndicesTests.java @@ -37,6 +37,7 @@ import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.threadpool.ThreadPool; import org.junit.Before; import java.io.IOException; @@ -52,6 +53,7 @@ public class AnomalyDetectionIndicesTests extends ESIntegTestCase { private Settings settings; private ClusterService clusterService; private Client client; + private ThreadPool context; @Before public void setup() { @@ -71,10 +73,11 @@ public void setup() { clusterSettings.add(AnomalyDetectorSettings.REQUEST_TIMEOUT); clusterSetting = new ClusterSettings(settings, clusterSettings); clusterService = TestHelpers.createClusterService(client().threadPool(), clusterSetting); + context = TestHelpers.createThreadPool(); client = mock(Client.class); Clock clock = Clock.systemUTC(); Throttler throttler = new Throttler(clock); - requestUtil = new ClientUtil(settings, client, throttler); + requestUtil = new ClientUtil(settings, client, throttler, context); indices = new AnomalyDetectionIndices(client(), clusterService, client().threadPool(), settings, requestUtil); } diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStateManagerTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStateManagerTests.java index c38fddea..05603296 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStateManagerTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStateManagerTests.java @@ -56,6 +56,7 @@ import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.threadpool.ThreadPool; import org.junit.After; import org.junit.Before; @@ -68,6 +69,7 @@ public class ADStateManagerTests extends ESTestCase { private Clock clock; private Duration duration; private Throttler throttler; + private ThreadPool context; @Override protected NamedXContentRegistry xContentRegistry() { @@ -91,12 +93,13 @@ public void setUp() throws Exception { clock = mock(Clock.class); duration = Duration.ofHours(1); throttler = new Throttler(clock); + stateManager = new ADStateManager( client, xContentRegistry(), modelManager, settings, - new ClientUtil(settings, client, throttler), + new ClientUtil(settings, client, throttler, context), clock, duration ); From d797543fc24e1c7213ad2053fb255b0b371e6df9 Mon Sep 17 00:00:00 2001 From: Hanguang Zhang Date: Thu, 5 Mar 2020 14:49:19 -0800 Subject: [PATCH 7/9] Cancel long running query if a new request coming for given detector id --- .../ad/AnomalyDetectorPlugin.java | 42 +---- .../ad/cluster/CancelQueryUtil.java | 138 --------------- .../ad/cluster/DailyCron.java | 6 +- .../ad/cluster/MasterEventListener.java | 7 +- .../ad/constant/CommonName.java | 5 + .../ad/feature/FeatureManager.java | 1 - .../AnomalyResultTransportAction.java | 5 - .../ad/util/ClientUtil.java | 118 +++++++++++-- .../ad/util/Throttler.java | 17 -- .../ad/TestHelpers.java | 2 +- .../ad/cluster/CancelQueryUtilTests.java | 161 ------------------ .../ad/cluster/DailyCronTests.java | 4 +- .../ad/cluster/MasterEventListenerTests.java | 4 +- .../ADStatsTransportActionTests.java | 4 +- .../ad/transport/AnomalyResultTests.java | 31 +--- .../ad/util/IndexUtilsTests.java | 4 +- 16 files changed, 124 insertions(+), 425 deletions(-) delete mode 100644 src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/CancelQueryUtil.java delete mode 100644 src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/CancelQueryUtilTests.java diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java index 20e9e296..329162d8 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java @@ -19,7 +19,6 @@ import com.amazon.opendistroforelasticsearch.ad.cluster.ADClusterEventListener; import com.amazon.opendistroforelasticsearch.ad.cluster.ADMetaData; import com.amazon.opendistroforelasticsearch.ad.cluster.ADMetaData.ADMetaDataDiff; -import com.amazon.opendistroforelasticsearch.ad.cluster.CancelQueryUtil; import com.amazon.opendistroforelasticsearch.ad.cluster.DeleteDetector; import com.amazon.opendistroforelasticsearch.ad.cluster.HashRing; import com.amazon.opendistroforelasticsearch.ad.cluster.MasterEventListener; @@ -78,15 +77,11 @@ import java.util.Collection; import java.util.List; import java.util.Map; -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import java.security.AccessController; import java.security.PrivilegedAction; import java.time.Clock; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.elasticsearch.SpecialPermission; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; @@ -105,8 +100,6 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; -import org.elasticsearch.index.IndexModule; -import org.elasticsearch.index.shard.SearchOperationListener; import org.elasticsearch.monitor.jvm.JvmService; import org.elasticsearch.plugins.ActionPlugin; import org.elasticsearch.plugins.Plugin; @@ -114,7 +107,6 @@ import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestHandler; import org.elasticsearch.script.ScriptService; -import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.watcher.ResourceWatcherService; @@ -147,37 +139,6 @@ public class AnomalyDetectorPlugin extends Plugin implements ActionPlugin, Scrip public AnomalyDetectorPlugin() {} - - // Semaphore used to allow & block indexing operations during the test - private static final Semaphore ALLOWED_OPERATIONS = new Semaphore(0); - - @Override - public void onIndexModule(IndexModule indexModule) { - indexModule.addSearchOperationListener(new BlockingOperationListener()); - } - - public static class BlockingOperationListener implements SearchOperationListener { - private static final Logger log = LogManager.getLogger(AnomalyDetectorPlugin.class); - - @Override - public void onPreQueryPhase(SearchContext searchContext) { - preCheck(searchContext); - } - - private void preCheck(SearchContext searchContext) { - try { - log.info("checking"); - if (ALLOWED_OPERATIONS.tryAcquire(20, TimeUnit.SECONDS)) { - log.debug("passed"); - return; - } - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - throw new IllegalStateException("Something went wrong"); - } - } - @Override public List getRestHandlers( Settings settings, @@ -303,7 +264,6 @@ public Collection createComponents( anomalyDetectorRunner = new AnomalyDetectorRunner(modelManager, featureManager, AnomalyDetectorSettings.MAX_PREVIEW_RESULTS); DeleteDetector deleteUtil = new DeleteDetector(clusterService, clock); - CancelQueryUtil cancelQueryUtil = new CancelQueryUtil(throttler); Map> stats = ImmutableMap .>builder() @@ -350,7 +310,7 @@ public Collection createComponents( deleteUtil, adCircuitBreakerService, adStats, - new MasterEventListener(clusterService, threadPool, deleteUtil, client, clock, clientUtil, cancelQueryUtil) + new MasterEventListener(clusterService, threadPool, deleteUtil, client, clock, clientUtil) ); } diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/CancelQueryUtil.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/CancelQueryUtil.java deleted file mode 100644 index 38a8744d..00000000 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/CancelQueryUtil.java +++ /dev/null @@ -1,138 +0,0 @@ -/* - * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ - -package com.amazon.opendistroforelasticsearch.ad.cluster; - -import com.amazon.opendistroforelasticsearch.ad.util.Throttler; -import com.google.common.base.Strings; -import org.apache.logging.log4j.LogManager; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionRequest; -import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksAction; -import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; -import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction; -import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; -import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.client.Client; -import org.elasticsearch.tasks.TaskInfo; - -import java.time.Instant; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; - -/** - * Utility class to cancel long running query - */ -public class CancelQueryUtil { - private static final String CANCEL_REASON = "Cancel long running query for Anomaly Detection"; - private static final long ONE_DAY = TimeUnit.DAYS.toMillis(1); - private static final org.apache.logging.log4j.Logger LOG = LogManager.getLogger(CancelQueryUtil.class); - private final Throttler throttler; - - - public CancelQueryUtil(Throttler throttler) { - this.throttler = throttler; - } - - public void cancelQuery(Client client) { - // Step 1: get current task - // list task api - // https://www.elastic.co/guide/en/elasticsearch/client/java-rest/master/java-rest-high-tasks-list.html - List tasks = getCurrentTasks(client); - - // Step 2: find the matched query, then kill it and delete matched entry from throttler - // One assumption here: the size of task list is much larger than negative cache - // since most of the search query should finish fast. - for (TaskInfo task : tasks) { - String detectorId = findMatchedQuery(task); - if (!Strings.isNullOrEmpty(detectorId)) { - cancelTask(task, detectorId, client); - } - System.out.println(detectorId); - System.out.println(task); - } - } - - - private List getCurrentTasks(Client client) { - ListTasksRequest listTasksRequest = new ListTasksRequest(); - listTasksRequest.setDetailed(true); - AtomicReference> taskList = new AtomicReference<>(); - client.execute( - ListTasksAction.INSTANCE, - listTasksRequest, - ActionListener - .wrap( - response -> { - LOG.info("List all tasks"); - taskList.set(response.getTasks()); - }, - exception -> { - LOG.error("List Tasks failed.", exception); - } - ) - ); - return taskList.get(); - } - - private String findMatchedQuery(TaskInfo task) { - for (Iterator>> it = throttler.getNegativeCache().entrySet().iterator(); - it.hasNext();) { - Map.Entry> entry = it.next(); - if (throttler.getClock().millis() - entry.getValue().getValue().getEpochSecond() > ONE_DAY) { - String queryDescription = getQueryDescription(entry); - if (queryDescription.equals(task.getDescription())) { - LOG.info("Found long running query for detector: {}", entry.getKey()); - return entry.getKey(); - } - } else { - LOG.info("No query is running longer than 1 day"); - } - } - return null; - } - - private void cancelTask(TaskInfo task, String detectorId, Client client) { - // 1) use task management API to cancel query - // https://www.elastic.co/guide/en/elasticsearch/client/java-rest/master/java-rest-high-cluster-cancel-tasks.html - // 2) remove matched entry from negative cache - CancelTasksRequest cancelTasksRequest = new CancelTasksRequest(); - cancelTasksRequest.setReason(CANCEL_REASON); - cancelTasksRequest.setTaskId(task.getTaskId()); - client.execute( - CancelTasksAction.INSTANCE, - cancelTasksRequest, - ActionListener.wrap( - response -> { - LOG.info("Cancel task: {}", task.getTaskId()); - throttler.clearFilteredQuery(detectorId); - LOG.info("Remove negative cache for detector: {}", detectorId); - }, - exception -> { - LOG.error("Failed to cancel task: {}", task.getTaskId()); - } - ) - ); - } - - - private String getQueryDescription(Map.Entry> entry) { - SearchRequest request = (SearchRequest) entry.getValue().getKey(); - return request.getDescription(); - } -} diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/DailyCron.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/DailyCron.java index 321e5e66..1a10d8d8 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/DailyCron.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/DailyCron.java @@ -44,16 +44,13 @@ public class DailyCron implements Runnable { private final Client client; private final Duration checkpointTtl; private final ClientUtil clientUtil; - private final CancelQueryUtil cancelQueryUtil; - public DailyCron(DeleteDetector deleteUtil, Clock clock, Client client, Duration checkpointTtl, ClientUtil clientUtil, - CancelQueryUtil cancelQueryUtil) { + public DailyCron(DeleteDetector deleteUtil, Clock clock, Client client, Duration checkpointTtl, ClientUtil clientUtil) { this.deleteUtil = deleteUtil; this.clock = clock; this.client = client; this.clientUtil = clientUtil; this.checkpointTtl = checkpointTtl; - this.cancelQueryUtil = cancelQueryUtil; } @Override @@ -91,7 +88,6 @@ public void run() { ) ); deleteUtil.deleteDetectorResult(client); - cancelQueryUtil.cancelQuery(client); } } diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/MasterEventListener.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/MasterEventListener.java index b28e49b1..a3735579 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/MasterEventListener.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/MasterEventListener.java @@ -39,7 +39,6 @@ public class MasterEventListener implements LocalNodeMasterListener { private Client client; private Clock clock; private ClientUtil clientUtil; - private CancelQueryUtil cancelQueryUtil; public MasterEventListener( ClusterService clusterService, @@ -47,8 +46,7 @@ public MasterEventListener( DeleteDetector deleteUtil, Client client, Clock clock, - ClientUtil clientUtil, - CancelQueryUtil cancelQueryUtil + ClientUtil clientUtil ) { this.clusterService = clusterService; this.threadPool = threadPool; @@ -57,7 +55,6 @@ public MasterEventListener( this.clusterService.addLocalNodeMasterListener(this); this.clock = clock; this.clientUtil = clientUtil; - this.cancelQueryUtil = cancelQueryUtil; } @Override @@ -77,7 +74,7 @@ public void beforeStop() { if (dailyCron == null) { dailyCron = threadPool .scheduleWithFixedDelay( - new DailyCron(deleteUtil, clock, client, AnomalyDetectorSettings.CHECKPOINT_TTL, clientUtil, cancelQueryUtil), + new DailyCron(deleteUtil, clock, client, AnomalyDetectorSettings.CHECKPOINT_TTL, clientUtil), TimeValue.timeValueHours(24), executorName() ); diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/constant/CommonName.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/constant/CommonName.java index 99714644..b03d4364 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/constant/CommonName.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/constant/CommonName.java @@ -26,4 +26,9 @@ public class CommonName { // Format name // ====================================== public static final String EPOCH_MILLIS_FORMAT = "epoch_millis"; + + // ====================================== + // Anomaly Detector name for X-Opaque-Id header + // ====================================== + public static final String ANOMALY_DETECTOR = "[Anomaly Detector]"; } diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/feature/FeatureManager.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/feature/FeatureManager.java index 9675cb51..d6226443 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/feature/FeatureManager.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/feature/FeatureManager.java @@ -264,7 +264,6 @@ public void maintenance() { @Deprecated public Features getPreviewFeatures(AnomalyDetector detector, long startMilli, long endMilli) { Entry>, Integer> sampleRangeResults = getSampleRanges(detector, startMilli, endMilli); - getCurrentFeatures(detector, startMilli, endMilli); List> sampleRanges = sampleRangeResults.getKey(); int stride = sampleRangeResults.getValue(); diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyResultTransportAction.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyResultTransportAction.java index b4cdfe2f..e0727764 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyResultTransportAction.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyResultTransportAction.java @@ -249,11 +249,6 @@ protected void doExecute(Task task, ActionRequest actionRequest, ActionListener< return; } AnomalyDetector anomalyDetector = detector.get(); - if (stateManager.hasRunningQuery(anomalyDetector)) { - LOG.error("There is one query running for detectorId: {}", anomalyDetector.getDetectorId()); - listener.onFailure(new EndRunException(adID, "There is one query running on AnomalyDetector", true)); - return; - } String thresholdModelID = modelManager.getThresholdModelId(adID); Optional thresholdNode = hashRing.getOwningNode(thresholdModelID); diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ClientUtil.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ClientUtil.java index bd06c254..0af4ce46 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ClientUtil.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ClientUtil.java @@ -17,6 +17,7 @@ import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.REQUEST_TIMEOUT; +import java.util.List; import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -25,9 +26,12 @@ import java.util.function.Function; import com.amazon.opendistroforelasticsearch.ad.common.exception.EndRunException; +import com.amazon.opendistroforelasticsearch.ad.common.exception.InternalFailure; +import com.amazon.opendistroforelasticsearch.ad.constant.CommonName; import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; import org.apache.logging.log4j.Logger; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.ActionFuture; @@ -35,6 +39,13 @@ import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.LatchedActionListener; +import org.elasticsearch.action.TaskOperationFailure; +import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksAction; +import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; +import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse; +import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction; +import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; +import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; import org.elasticsearch.client.Client; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -42,6 +53,9 @@ import com.amazon.opendistroforelasticsearch.ad.constant.CommonErrorMessages; import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.tasks.TaskInfo; import org.elasticsearch.threadpool.ThreadPool; public class ClientUtil { @@ -166,11 +180,11 @@ public Response * Send a nonblocking request with a timeout and return response. The request will first be put into * the negative cache. Once the request complete, it will be removed from the negative cache. * + * @param ActionRequest + * @param ActionResponse * @param request request like index/search/get * @param LOG log * @param consumer functional interface to operate as a client request like client::get - * @param ActionRequest - * @param ActionResponse * @param detector Anomaly Detector * @return the response * @throws EndRunException when there is already a query running @@ -184,30 +198,30 @@ public Optional AnomalyDetector detector ) { - try (ThreadContext.StoredContext context = threadPool.getThreadContext().stashContext()){ - LOG.info("inside the try block"); - assert context != null; - threadPool.getThreadContext().putHeader("X-Opaque-Id", "testId"); - if (!throttler.insertFilteredQuery(detector.getDetectorId(), request)) { - LOG.error("There is one query running for detectorId: {}", detector.getDetectorId()); - throw new EndRunException(detector.getDetectorId(), "There is one query running on AnomalyDetector", true); + try { + String detectorId = detector.getDetectorId(); + if (!throttler.insertFilteredQuery(detectorId, request)) { + LOG.info("There is one query running for detectorId: {}. Trying to cancel the long running query", detectorId); + cancelRunningQuery(client, detectorId, LOG); } AtomicReference respReference = new AtomicReference<>(); final CountDownLatch latch = new CountDownLatch(1); - try { + try (ThreadContext.StoredContext context = threadPool.getThreadContext().stashContext()) { + assert context != null; + threadPool.getThreadContext().putHeader(Task.X_OPAQUE_ID, CommonName.ANOMALY_DETECTOR + ":" + detectorId); consumer.accept(request, new LatchedActionListener(ActionListener.wrap(response -> { // clear negative cache - throttler.clearFilteredQuery(detector.getDetectorId()); + throttler.clearFilteredQuery(detectorId); respReference.set(response); }, exception -> { // clear negative cache - throttler.clearFilteredQuery(detector.getDetectorId()); + throttler.clearFilteredQuery(detectorId); LOG.error("Cannot get response for request {}, error: {}", request, exception); }), latch)); } catch (Exception e) { - LOG.error("Failed to process the request for detectorId: {}.", detector.getDetectorId()); - throttler.clearFilteredQuery(detector.getDetectorId()); + LOG.error("Failed to process the request for detectorId: {}.", detectorId); + throttler.clearFilteredQuery(detectorId); throw e; } @@ -230,4 +244,80 @@ public Optional public boolean hasRunningQuery(AnomalyDetector detector) { return throttler.getFilteredQuery(detector.getDetectorId()).isPresent(); } + + /** + * Cancel long running query for given detectorId + * @param client Elasticsearch client + * @param detectorId Anomaly Detector Id + * @param LOG Logger + */ + private void cancelRunningQuery(Client client, String detectorId, Logger LOG) { + ListTasksRequest listTasksRequest = new ListTasksRequest(); + client + .execute( + ListTasksAction.INSTANCE, + listTasksRequest, + ActionListener.wrap(response -> { onListTaskResponse(response, detectorId, LOG); }, exception -> { + LOG.error("List Tasks failed.", exception); + throw new InternalFailure(detectorId, "Failed to list current tasks", exception); + }) + ); + } + + /** + * Helper function to handle ListTasksResponse + * @param listTasksResponse ListTasksResponse + * @param detectorId Anomaly Detector Id + * @param LOG Logger + */ + private void onListTaskResponse(ListTasksResponse listTasksResponse, String detectorId, Logger LOG) { + List tasks = listTasksResponse.getTasks(); + TaskInfo matchedTask = null; + for (TaskInfo task : tasks) { + if (!task.getHeaders().isEmpty() && task.getHeaders().get(Task.X_OPAQUE_ID) != null) { + if (task.getHeaders().get(Task.X_OPAQUE_ID).contains(detectorId)) { + matchedTask = task; + break; + } + } + } + // case 1: given detectorId is not in current task list + if (matchedTask == null) { + // log and then clear negative cache + LOG.info("Couldn't find task for detectorId: {}. Clean this entry from Throttler", detectorId); + throttler.clearFilteredQuery(detectorId); + return; + } + // case 2: we can find the task for given detectorId + TaskId parentTaskId = matchedTask.getParentTaskId().isSet() ? matchedTask.getParentTaskId() : matchedTask.getTaskId(); + CancelTasksRequest cancelTaskRequest = new CancelTasksRequest(); + cancelTaskRequest.setParentTaskId(parentTaskId); + LOG.info("Start to cancel task for parentTaskId: {}", parentTaskId); + client + .execute( + CancelTasksAction.INSTANCE, + cancelTaskRequest, + ActionListener.wrap(response -> { onCancelTaskResponse(response, detectorId, LOG); }, exception -> { + LOG.error("Failed to cancel task for detectorId: " + detectorId, exception); + throw new InternalFailure(detectorId, "Failed to cancel current tasks", exception); + }) + ); + } + + /** + * Helper function to handle CancelTasksResponse + * @param cancelTasksResponse CancelTasksResponse + * @param detectorId Anomaly Detector Id + * @param LOG Logger + */ + private void onCancelTaskResponse(CancelTasksResponse cancelTasksResponse, String detectorId, Logger LOG) { + List nodeFailures = cancelTasksResponse.getNodeFailures(); + List taskFailures = cancelTasksResponse.getTaskFailures(); + if (nodeFailures.isEmpty() && taskFailures.isEmpty()) { + LOG.info("Cancelling query for detectorId: {} succeeds. Clear entry from Throttler", detectorId); + throttler.clearFilteredQuery(detectorId); + return; + } + throw new InternalFailure(detectorId, "Failed to cancel current tasks due to node or task failures"); + } } diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/Throttler.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/Throttler.java index be5799fd..4c5ebaed 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/Throttler.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/Throttler.java @@ -31,23 +31,6 @@ public class Throttler { private final ConcurrentHashMap> negativeCache; private final Clock clock; - /** - * Getter for clock - * @return clock - */ - public Clock getClock() { - return clock; - } - - - /** - * Getter for negativeCache - * @return negative cache map ConcurrentHashMap - */ - public ConcurrentHashMap> getNegativeCache() { - return negativeCache; - } - public Throttler(Clock clock) { this.negativeCache = new ConcurrentHashMap<>(); this.clock = clock; diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/TestHelpers.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/TestHelpers.java index 2653fbbc..642748c2 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/TestHelpers.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/TestHelpers.java @@ -15,9 +15,9 @@ package com.amazon.opendistroforelasticsearch.ad; +import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetectorExecutionInput; import com.amazon.opendistroforelasticsearch.ad.model.AnomalyResult; -import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; import com.amazon.opendistroforelasticsearch.ad.model.Feature; import com.amazon.opendistroforelasticsearch.ad.model.FeatureData; import com.amazon.opendistroforelasticsearch.ad.model.IntervalTimeConfiguration; diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/CancelQueryUtilTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/CancelQueryUtilTests.java deleted file mode 100644 index 2c0f1359..00000000 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/CancelQueryUtilTests.java +++ /dev/null @@ -1,161 +0,0 @@ -/* - * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ - -package com.amazon.opendistroforelasticsearch.ad.cluster; - -import com.amazon.opendistroforelasticsearch.ad.AbstractADTest; -import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; -import com.amazon.opendistroforelasticsearch.ad.model.IntervalTimeConfiguration; -import com.amazon.opendistroforelasticsearch.ad.util.Throttler; -import com.google.common.collect.ImmutableList; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksAction; -import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse; -import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction; -import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; -import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.client.Client; -import org.elasticsearch.index.reindex.BulkByScrollTask; -import org.elasticsearch.search.builder.SearchSourceBuilder; -import org.elasticsearch.tasks.TaskId; -import org.elasticsearch.tasks.TaskInfo; -import org.junit.After; -import org.junit.Before; - - -import java.time.Clock; -import java.time.temporal.ChronoUnit; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; - -import static org.elasticsearch.common.unit.TimeValue.timeValueMillis; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -public class CancelQueryUtilTests extends AbstractADTest { - private Client client; - private Throttler throttler; - private static String NODE_ID = "node_id"; - - private enum CancelQueryExecutionMode { - CANCEL_QUERY_NORMAL, - } - - @Override - @Before - public void setUp() throws Exception { - super.setUp(); - super.setUpLog4jForJUnit(DeleteDetector.class); - client = mock(Client.class); - Clock clock = Clock.systemUTC(); - throttler = new Throttler(clock); - } - - @Override - @After - public void tearDown() throws Exception { - super.tearDown(); - super.tearDownLog4jForJUnit(); - } - - @SuppressWarnings("unchecked") - public void deleteDetectorResponseTemplate(CancelQueryUtilTests.CancelQueryExecutionMode mode) throws Exception { - // setup ListTaskResponse - List tasks = ImmutableList.of( - new TaskInfo( - new TaskId("test", 123), - "test", - "test", - "test", - new BulkByScrollTask.Status( - 1, - 10, - 0, - 0, - 0, - 0, - 0, - 0, - 0, - 0, - timeValueMillis(0), - 0, - null, - timeValueMillis(0) - ), - 0, - 0, - true, - new TaskId("test", 123), - Collections.emptyMap()) - ); - ListTasksResponse listTasksResponse = mock(ListTasksResponse.class); - when(listTasksResponse.getTasks()).thenReturn(tasks); - doAnswer(invocation -> { - Object[] args = invocation.getArguments(); - assertTrue(String.format("The size of args is %d. Its content is %s", args.length, Arrays.toString(args)), args.length >= 3); - assertTrue(args[2] instanceof ActionListener); - - ActionListener listener = (ActionListener) args[2]; - - assertTrue(listener != null); - if (mode == CancelQueryUtilTests.CancelQueryExecutionMode.CANCEL_QUERY_NORMAL) { - listener.onResponse(listTasksResponse); - } - return null; - }).when(client).execute(eq(ListTasksAction.INSTANCE), any(), any()); - - - // setup CancelTasksResponse - CancelTasksResponse cancelTaskResponse = mock(CancelTasksResponse.class); - doAnswer(invocation -> { - Object[] args = invocation.getArguments(); - assertTrue(String.format("The size of args is %d. Its content is %s", args.length, Arrays.toString(args)), args.length >= 3); - assertTrue(args[2] instanceof ActionListener); - - ActionListener listener = (ActionListener) args[2]; - - assertTrue(listener != null); - if (mode == CancelQueryUtilTests.CancelQueryExecutionMode.CANCEL_QUERY_NORMAL) { - listener.onResponse(cancelTaskResponse); - } - return null; - }).when(client).execute(eq(CancelTasksAction.INSTANCE), any(), any()); - - CancelQueryUtil cancelQueryUtil = new CancelQueryUtil(throttler); - cancelQueryUtil.cancelQuery(client); - - // setup negative cache - AnomalyDetector detector = mock(AnomalyDetector.class); - SearchSourceBuilder featureQuery = new SearchSourceBuilder(); - IntervalTimeConfiguration detectionInterval = new IntervalTimeConfiguration(1, ChronoUnit.MINUTES); - when(detector.getTimeField()).thenReturn("testTimeField"); - when(detector.getIndices()).thenReturn(Arrays.asList("testIndices")); - when(detector.generateFeatureQuery()).thenReturn(featureQuery); - when(detector.getDetectionInterval()).thenReturn(detectionInterval); - - SearchRequest searchRequest = new SearchRequest(detector.getIndices().toArray(new String[0])); - throttler.insertFilteredQuery("test detector id", searchRequest); - } - - public void testNormalCancelQuery() throws Exception { - return; -// deleteDetectorResponseTemplate(CancelQueryExecutionMode.CANCEL_QUERY_NORMAL); - } -} diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/DailyCronTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/DailyCronTests.java index 00c1d9ee..2772cd1b 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/DailyCronTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/DailyCronTests.java @@ -61,8 +61,7 @@ private void templateDailyCron(DailyCronTestExecutionMode mode) { Clock clock = mock(Clock.class); Client client = mock(Client.class); ClientUtil clientUtil = mock(ClientUtil.class); - CancelQueryUtil cancelQueryUtil = mock(CancelQueryUtil.class); - DailyCron cron = new DailyCron(deleteUtil, clock, client, Duration.ofHours(24), clientUtil, cancelQueryUtil); + DailyCron cron = new DailyCron(deleteUtil, clock, client, Duration.ofHours(24), clientUtil); doAnswer(invocation -> { Object[] args = invocation.getArguments(); @@ -86,7 +85,6 @@ private void templateDailyCron(DailyCronTestExecutionMode mode) { // those tests are covered by each util class doNothing().when(deleteUtil).deleteDetectorResult(eq(client)); - doNothing().when(cancelQueryUtil).cancelQuery(eq(client)); cron.run(); } diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/MasterEventListenerTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/MasterEventListenerTests.java index ba293e32..0a0a4a17 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/MasterEventListenerTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/MasterEventListenerTests.java @@ -46,7 +46,6 @@ public class MasterEventListenerTests extends AbstractADTest { private Cancellable dailyCancellable; private MasterEventListener masterService; private ClientUtil clientUtil; - private CancelQueryUtil cancelQueryUtil; @Override @Before @@ -56,7 +55,6 @@ public void setUp() throws Exception { threadPool = mock(ThreadPool.class); hourlyCancellable = mock(Cancellable.class); dailyCancellable = mock(Cancellable.class); - cancelQueryUtil = mock(CancelQueryUtil.class); when(threadPool.scheduleWithFixedDelay(any(HourlyCron.class), any(TimeValue.class), any(String.class))) .thenReturn(hourlyCancellable); when(threadPool.scheduleWithFixedDelay(any(DailyCron.class), any(TimeValue.class), any(String.class))).thenReturn(dailyCancellable); @@ -64,7 +62,7 @@ public void setUp() throws Exception { client = mock(Client.class); clock = mock(Clock.class); clientUtil = mock(ClientUtil.class); - masterService = new MasterEventListener(clusterService, threadPool, deleteUtil, client, clock, clientUtil, cancelQueryUtil); + masterService = new MasterEventListener(clusterService, threadPool, deleteUtil, client, clock, clientUtil); } public void testOnOffMaster() { diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStatsTransportActionTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStatsTransportActionTests.java index f1250350..d2032a84 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStatsTransportActionTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStatsTransportActionTests.java @@ -29,6 +29,7 @@ import org.elasticsearch.client.Client; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.junit.Before; import org.junit.Test; @@ -59,7 +60,8 @@ public void setUp() throws Exception { Client client = client(); Clock clock = mock(Clock.class); Throttler throttler = new Throttler(clock); - IndexUtils indexUtils = new IndexUtils(client, new ClientUtil(Settings.EMPTY, client, throttler), clusterService()); + ThreadPool threadPool = mock(ThreadPool.class); + IndexUtils indexUtils = new IndexUtils(client, new ClientUtil(Settings.EMPTY, client, throttler, threadPool), clusterService()); ModelManager modelManager = mock(ModelManager.class); clusterStatName1 = "clusterStat1"; diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyResultTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyResultTests.java index b7b4ccd0..559fb051 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyResultTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyResultTests.java @@ -237,7 +237,8 @@ public void setUp() throws Exception { indexNameResolver = new IndexNameExpressionResolver(); Clock clock = mock(Clock.class); Throttler throttler = new Throttler(clock); - ClientUtil clientUtil = new ClientUtil(Settings.EMPTY, client, throttler); + ThreadPool threadpool = mock(ThreadPool.class); + ClientUtil clientUtil = new ClientUtil(Settings.EMPTY, client, throttler, threadpool); IndexUtils indexUtils = new IndexUtils(client, clientUtil, clusterService); Map> statsMap = new HashMap>() { @@ -727,34 +728,6 @@ public void testMute() { assertThat(exception.getMessage(), containsString(AnomalyResultTransportAction.NODE_UNRESPONSIVE_ERR_MSG)); } - public void testRejectRequestBasedOnNegativeCache() { - when(stateManager.hasRunningQuery(detector)).thenReturn(true); - AnomalyResultTransportAction action = spy( - new AnomalyResultTransportAction( - new ActionFilters(Collections.emptySet()), - transportService, - client, - settings, - stateManager, - runner, - anomalyDetectionIndices, - featureQuery, - normalModelManager, - hashRing, - clusterService, - indexNameResolver, - threadPool, - adCircuitBreakerService, - adStats - ) - ); - AnomalyResultRequest request = new AnomalyResultRequest(adID, 100, 200); - PlainActionFuture listener = new PlainActionFuture<>(); - action.doExecute(null, request, listener); - Throwable exception = assertException(listener, AnomalyDetectionException.class); - assertThat(exception.getMessage(), containsString("There is one query running on AnomalyDetector")); - } - public void testRCFLatchAwaitException() throws InterruptedException { // These constructors register handler in transport service diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/util/IndexUtilsTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/util/IndexUtilsTests.java index d3660076..9fdc9123 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/util/IndexUtilsTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/util/IndexUtilsTests.java @@ -19,6 +19,7 @@ import org.elasticsearch.client.Client; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.threadpool.ThreadPool; import org.junit.Before; import org.junit.Test; @@ -35,7 +36,8 @@ public void setup() { Client client = client(); Clock clock = mock(Clock.class); Throttler throttler = new Throttler(clock); - clientUtil = new ClientUtil(Settings.EMPTY, client, throttler); + ThreadPool threadPool = mock(ThreadPool.class); + clientUtil = new ClientUtil(Settings.EMPTY, client, throttler, threadPool); } @Test From 8b9d6992344e31ccede5f9fffd4782ebe1e9a9cb Mon Sep 17 00:00:00 2001 From: Hanguang Zhang Date: Thu, 12 Mar 2020 13:46:15 -0700 Subject: [PATCH 8/9] Address feedback: 1. Adding description to throttledTimedRequest 2. Don't send new request if there is one running query. We only cancel the running one. 3. Adding logic to deal with single task cancelling(no parent task) 4. Adding log info/error and removing extra space. --- .../ad/util/ClientUtil.java | 43 +++++++++++++------ 1 file changed, 29 insertions(+), 14 deletions(-) diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ClientUtil.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ClientUtil.java index 0af4ce46..2b92b6af 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ClientUtil.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ClientUtil.java @@ -25,7 +25,6 @@ import java.util.function.BiConsumer; import java.util.function.Function; -import com.amazon.opendistroforelasticsearch.ad.common.exception.EndRunException; import com.amazon.opendistroforelasticsearch.ad.common.exception.InternalFailure; import com.amazon.opendistroforelasticsearch.ad.constant.CommonName; import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; @@ -177,9 +176,11 @@ public Response } /** - * Send a nonblocking request with a timeout and return response. The request will first be put into - * the negative cache. Once the request complete, it will be removed from the negative cache. - * + * Send a nonblocking request with a timeout and return response. + * If there is already a query running on given detector, it will try to + * cancel the query. Otherwise it will add this query to the negative cache + * and then attach the AnomalyDetection specific header to the request. + * Once the request complete, it will be removed from the negative cache. * @param ActionRequest * @param ActionResponse * @param request request like index/search/get @@ -187,7 +188,7 @@ public Response * @param consumer functional interface to operate as a client request like client::get * @param detector Anomaly Detector * @return the response - * @throws EndRunException when there is already a query running + * @throws InternalFailure when there is already a query running * @throws ElasticsearchTimeoutException when we cannot get response within time. * @throws IllegalStateException when the waiting thread is interrupted */ @@ -203,6 +204,7 @@ public Optional if (!throttler.insertFilteredQuery(detectorId, request)) { LOG.info("There is one query running for detectorId: {}. Trying to cancel the long running query", detectorId); cancelRunningQuery(client, detectorId, LOG); + throw new InternalFailure(detector.getDetectorId(), "There is already a query running on AnomalyDetector"); } AtomicReference respReference = new AtomicReference<>(); final CountDownLatch latch = new CountDownLatch(1); @@ -226,7 +228,6 @@ public Optional } if (!latch.await(requestTimeout.getSeconds(), TimeUnit.SECONDS)) { - throw new ElasticsearchTimeoutException("Cannot get response within time limit: " + request.toString()); } return Optional.ofNullable(respReference.get()); @@ -253,6 +254,7 @@ public boolean hasRunningQuery(AnomalyDetector detector) { */ private void cancelRunningQuery(Client client, String detectorId, Logger LOG) { ListTasksRequest listTasksRequest = new ListTasksRequest(); + listTasksRequest.setActions("*search"); client .execute( ListTasksAction.INSTANCE, @@ -272,27 +274,38 @@ private void cancelRunningQuery(Client client, String detectorId, Logger LOG) { */ private void onListTaskResponse(ListTasksResponse listTasksResponse, String detectorId, Logger LOG) { List tasks = listTasksResponse.getTasks(); - TaskInfo matchedTask = null; + TaskId matchedParentTaskId = null; + TaskId matchedSingleTaskId = null; for (TaskInfo task : tasks) { - if (!task.getHeaders().isEmpty() && task.getHeaders().get(Task.X_OPAQUE_ID) != null) { - if (task.getHeaders().get(Task.X_OPAQUE_ID).contains(detectorId)) { - matchedTask = task; + if (!task.getHeaders().isEmpty() + && task.getHeaders().get(Task.X_OPAQUE_ID).equals(CommonName.ANOMALY_DETECTOR + ":" + detectorId)) { + if (!task.getParentTaskId().equals(TaskId.EMPTY_TASK_ID)) { + // we found the parent task, don't need to check more + matchedParentTaskId = task.getParentTaskId(); break; + } else { + // we found one task, keep checking other tasks + matchedSingleTaskId = task.getTaskId(); } } } // case 1: given detectorId is not in current task list - if (matchedTask == null) { + if (matchedParentTaskId == null && matchedSingleTaskId == null) { // log and then clear negative cache LOG.info("Couldn't find task for detectorId: {}. Clean this entry from Throttler", detectorId); throttler.clearFilteredQuery(detectorId); return; } // case 2: we can find the task for given detectorId - TaskId parentTaskId = matchedTask.getParentTaskId().isSet() ? matchedTask.getParentTaskId() : matchedTask.getTaskId(); CancelTasksRequest cancelTaskRequest = new CancelTasksRequest(); - cancelTaskRequest.setParentTaskId(parentTaskId); - LOG.info("Start to cancel task for parentTaskId: {}", parentTaskId); + if (matchedParentTaskId != null) { + cancelTaskRequest.setParentTaskId(matchedParentTaskId); + LOG.info("Start to cancel task for parentTaskId: {}", matchedParentTaskId.toString()); + } else { + cancelTaskRequest.setTaskId(matchedSingleTaskId); + LOG.info("Start to cancel task for taskId: {}", matchedSingleTaskId.toString()); + } + client .execute( CancelTasksAction.INSTANCE, @@ -311,6 +324,7 @@ private void onListTaskResponse(ListTasksResponse listTasksResponse, String dete * @param LOG Logger */ private void onCancelTaskResponse(CancelTasksResponse cancelTasksResponse, String detectorId, Logger LOG) { + // todo: adding retry mechanism List nodeFailures = cancelTasksResponse.getNodeFailures(); List taskFailures = cancelTasksResponse.getTaskFailures(); if (nodeFailures.isEmpty() && taskFailures.isEmpty()) { @@ -318,6 +332,7 @@ private void onCancelTaskResponse(CancelTasksResponse cancelTasksResponse, Strin throttler.clearFilteredQuery(detectorId); return; } + LOG.error("Failed to cancel task for detectorId: " + detectorId); throw new InternalFailure(detectorId, "Failed to cancel current tasks due to node or task failures"); } } From fd1883bda8007a0c4ecb6499ad28128d59a1cda0 Mon Sep 17 00:00:00 2001 From: Hanguang Zhang Date: Fri, 13 Mar 2020 11:01:57 -0700 Subject: [PATCH 9/9] 1. change listtask filter from "*search" to "*search*" --- .../amazon/opendistroforelasticsearch/ad/util/ClientUtil.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ClientUtil.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ClientUtil.java index 2b92b6af..cdfc4392 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ClientUtil.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ClientUtil.java @@ -254,7 +254,7 @@ public boolean hasRunningQuery(AnomalyDetector detector) { */ private void cancelRunningQuery(Client client, String detectorId, Logger LOG) { ListTasksRequest listTasksRequest = new ListTasksRequest(); - listTasksRequest.setActions("*search"); + listTasksRequest.setActions("*search*"); client .execute( ListTasksAction.INSTANCE,