This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Cancel query if given detector already have one #54

Merged

Contributor

@zhanghg08 zhanghg08 commented Mar 5, 2020

Issue #55, if available:
Cancel running query if given detector already has one running.

Description of changes:

  1. ./gradlew build
  2. Started my own cluster and tested with a query delayed by 20 seconds to act as a long-running query. If a detector already has a query running, a second query request cancels the previous one and then starts itself.

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

@zhanghg08 zhanghg08 requested review from wnbts and ylwu-amzn March 5, 2020 23:42
@zhanghg08 zhanghg08 marked this pull request as ready for review March 5, 2020 23:45
TaskInfo matchedTask = null;
for (TaskInfo task : tasks) {
if (!task.getHeaders().isEmpty() && task.getHeaders().get(Task.X_OPAQUE_ID) != null) {
if (task.getHeaders().get(Task.X_OPAQUE_ID).contains(detectorId)) {
Member

Would
"if (task.getHeaders().get(Task.X_OPAQUE_ID).equals(CommonName.ANOMALY_DETECTOR + ":" + detectorId))"
improve performance, since you might need to do this comparison many times if there are a lot of tasks?

equals is O(n), while contains can be O(n*m), where m is the length of the string to match and n is the length of the string to search.

See the implementation of contains (depends on indexOf):

http://hg.openjdk.java.net/jdk8/jdk8/jdk/file/687fd7c7986d/src/share/classes/java/lang/String.java#l1740
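
A minimal sketch of the two comparisons, assuming the opaque ID has the form prefix + ":" + detectorId as in the suggested condition. The class, the method names, and the "anomaly_detector" prefix value are hypothetical placeholders, not the plugin's code. Besides the cost difference, the substring check can also match a different detector whose ID merely contains the one being searched for.

```java
// Hypothetical helper for illustration only; not part of the plugin.
final class OpaqueIdMatch {
    // Assumed placeholder for the value of CommonName.ANOMALY_DETECTOR.
    static final String PREFIX = "anomaly_detector";

    static String opaqueIdFor(String detectorId) {
        return PREFIX + ":" + detectorId;
    }

    // Exact comparison: a single pass over the two strings.
    static boolean matchesExactly(String header, String detectorId) {
        return header != null && header.equals(opaqueIdFor(detectorId));
    }

    // Substring comparison: also true when detectorId is only part of another detector's ID.
    static boolean matchesBySubstring(String header, String detectorId) {
        return header != null && header.contains(detectorId);
    }
}
```

For a header built from "detector-10", matchesBySubstring reports a match for "detector-1" while matchesExactly does not.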

Contributor Author

Thanks for the suggestion, will change it.

String detectorId = detector.getDetectorId();
if (!throttler.insertFilteredQuery(detectorId, request)) {
LOG.info("There is one query running for detectorId: {}. Trying to cancel the long running query", detectorId);
cancelRunningQuery(client, detectorId, LOG);
Member

Should we return after cancelling, since we don't know when the cancellation will actually take effect? Otherwise we might keep piling up new queries while the previous ones are still not cancelled.

Also, we need to send InternalFailure, not EndRunException. EndRunException is for scenarios where we might need to terminate the running AD job soon.
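
A rough sketch of the "cancel, then return" flow, under the assumption that the throttler behaves like a map keyed by detector ID. ThrottledSender, Outcome, and requestCancel are hypothetical stand-ins, not the plugin's Throttler or ClientUtil.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for the throttler plus send path; not the plugin's actual classes.
final class ThrottledSender {
    enum Outcome { SENT, CANCEL_REQUESTED }

    // detectorId -> query currently registered as running
    private final ConcurrentMap<String, String> running = new ConcurrentHashMap<>();
    private int cancelRequests = 0;

    Outcome send(String detectorId, String query) {
        // putIfAbsent returns the previous value, so non-null means a query is already registered.
        if (running.putIfAbsent(detectorId, query) != null) {
            requestCancel(detectorId);
            // Return without sending: the cancel is asynchronous and may not have taken effect yet.
            return Outcome.CANCEL_REQUESTED;
        }
        return Outcome.SENT;
    }

    // Called once the cancellation is confirmed, so that a later run may send again.
    void onCancelConfirmed(String detectorId) {
        running.remove(detectorId);
    }

    int cancelRequests() {
        return cancelRequests;
    }

    private void requestCancel(String detectorId) {
        cancelRequests++; // placeholder for the real cancel call
    }
}
```

With this shape, a second send for the same detector only requests a cancel; nothing new is queued until onCancelConfirmed clears the entry.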

Contributor Author

Agreed. Returning after cancelling is safer. Will update.

* @param LOG Logger
*/
private void cancelRunningQuery(Client client, String detectorId, Logger LOG) {
ListTasksRequest listTasksRequest = new ListTasksRequest();
Member

You can add some parameters to speed up task search:

  1. group_by=parents: so each group you only need to check header once
  2. actions=*search: since our queries are searches. We don't care about write or update.

Contributor Author

Thanks for the advice. group_by=parents is a little odd: the REST API supports it (https://www.elastic.co/guide/en/elasticsearch/reference/current/tasks.html#_task_grouping), but as far as I can see the Java API does not.

For actions=*search, I will add it as "actions=search", since I can see that some child queries have an action like "indices:data/read/search[phase/query]".

Member

@kaituo kaituo Mar 13, 2020

You meant we need to use "actions=*search*", right? Yes, please do that. Your current code uses "*search".
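
To illustrate why the trailing wildcard matters, here is a small sketch of '*' wildcard matching applied to the action names mentioned in this thread. The matcher is a hypothetical stand-in written for this example; it is not Elasticsearch's own pattern-matching code, only an approximation of how a simple wildcard filter behaves.

```java
// Hypothetical wildcard matcher for illustration; '*' matches any run of characters.
final class ActionFilter {
    static boolean matches(String pattern, String text) {
        int p = 0;          // position in pattern
        int t = 0;          // position in text
        int star = -1;      // index of the most recent '*' in pattern
        int mark = 0;       // text position to resume from after backtracking
        while (t < text.length()) {
            if (p < pattern.length() && pattern.charAt(p) == '*') {
                star = p++;
                mark = t;
            } else if (p < pattern.length() && pattern.charAt(p) == text.charAt(t)) {
                p++;
                t++;
            } else if (star >= 0) {
                // Let the last '*' absorb one more character and retry.
                p = star + 1;
                t = ++mark;
            } else {
                return false;
            }
        }
        // Any trailing '*' in the pattern can match the empty string.
        while (p < pattern.length() && pattern.charAt(p) == '*') {
            p++;
        }
        return p == pattern.length();
    }
}
```

Under this matcher, "*search" accepts "indices:data/read/search" but rejects the child action "indices:data/read/search[phase/query]", whereas "*search*" accepts both.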

@@ -199,7 +199,7 @@ private static Void initGson() {
Settings settings = environment.settings();
Member

Is there a unit/integration test for the cancel mechanism? If not, I strongly suggest we add one.

You can add an ESIntegTestCase where we

  1. define a SearchOperationListener such that index operations are delayed to simulate long-running queries;
  2. create a fake plugin that uses the listener defined in 1);
  3. add the AD plugin and the fake plugin together;
  4. ... automate what you did on manual testing ..

Contributor Author

This is also one of my concerns. I noticed we don't have any unit tests for ClientUtil and tried to add one, but it was too complicated. During manual testing I used a similar listener that delays the search to simulate a long-running query. I'm not sure whether that can be done in an integration test; I will sync up with you offline.

@@ -162,11 +180,11 @@ public ClientUtil(Settings setting, Client client, Throttler throttler) {
* Send a nonblocking request with a timeout and return response. The request will first be put into
* the negative cache. Once the request complete, it will be removed from the negative cache.
Contributor

Please add more description of the cancel request process.

Contributor Author

added.

throw e;
}

if (!latch.await(requestTimeout.getSeconds(), TimeUnit.SECONDS)) {

Contributor

Remove empty line

Contributor Author

done

return;
}
// case 2: we can find the task for given detectorId
TaskId parentTaskId = matchedTask.getParentTaskId().isSet() ? matchedTask.getParentTaskId() : matchedTask.getTaskId();
Contributor

@ylwu-amzn ylwu-amzn Mar 10, 2020

Is it possible that the parent task has a parent too? If so, should we find the root task and kill all of them?

Contributor Author

For our search queries, there is only a two-level parent-child relationship.
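
If deeper nesting ever did occur, the root lookup asked about above could be sketched as follows. This assumes a plain map from task ID to parent task ID; TaskTree and findRoot are hypothetical names, and the real TaskInfo and TaskId types are deliberately not used.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical helper; task IDs are plain strings here, not Elasticsearch TaskId objects.
final class TaskTree {
    // Follows parent links until a task without a recorded parent is reached.
    static String findRoot(Map<String, String> parentOf, String taskId) {
        Set<String> seen = new HashSet<>();
        String current = taskId;
        // seen.add returns false on a repeat visit, which guards against cyclic input.
        while (parentOf.get(current) != null && seen.add(current)) {
            current = parentOf.get(current);
        }
        return current;
    }
}
```

In the two-level case described in the reply, the loop runs at most once, so the result is the same as taking the parent task ID directly.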

List<ElasticsearchException> nodeFailures = cancelTasksResponse.getNodeFailures();
List<TaskOperationFailure> taskFailures = cancelTasksResponse.getTaskFailures();
if (nodeFailures.isEmpty() && taskFailures.isEmpty()) {
LOG.info("Cancelling query for detectorId: {} succeeds. Clear entry from Throttler", detectorId);
Contributor

@ylwu-amzn ylwu-amzn Mar 10, 2020

It would be better to add a retry for these failed tasks. Otherwise, we will have to wait for the next detector run to cancel again.

Contributor Author

Will add a todo comment for now.
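
For the TODO, one possible shape of a bounded retry is sketched below. It is only an illustration of the suggestion: CancelRetry and cancelWithRetry are hypothetical names, and the cancel call is abstracted as a BooleanSupplier that returns true on success.

```java
import java.util.function.BooleanSupplier;

// Hypothetical helper; the actual cancel call is passed in as a supplier.
final class CancelRetry {
    // Returns the number of the attempt that succeeded, or -1 if every attempt failed.
    static int cancelWithRetry(BooleanSupplier cancelAttempt, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (cancelAttempt.getAsBoolean()) {
                return attempt;
            }
        }
        return -1;
    }
}
```

A real implementation would probably add a delay between attempts and run asynchronously; both are left out here to keep the sketch short.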

try {
try (ThreadContext.StoredContext context = threadPool.getThreadContext().stashContext()) {
assert context != null;
threadPool.getThreadContext().putHeader(Task.X_OPAQUE_ID, CommonName.ANOMALY_DETECTOR + ":" + detectorId);
consumer.accept(request, new LatchedActionListener<Response>(ActionListener.wrap(response -> {
Contributor

It's possible that cancelRunningQuery is still in progress, or has failed, when a new request starts. If cancelRunningQuery is not time consuming, it would be better to start the new request once we get the response from cancelRunningQuery. If it's a heavy action, we may need to monitor the cancellation status and retry on failure, so that we can terminate unnecessary AD queries and protect cluster performance. It's OK to add some TODOs and comments here and refactor later if you think the change will be big.

Contributor Author

I think this is similar to Kaituo's comment. For safety, I will not start a new request if we need to cancel the running one, in case the cancel fails somehow and we keep adding new requests. We can revisit this later.

try {
try (ThreadContext.StoredContext context = threadPool.getThreadContext().stashContext()) {
assert context != null;
threadPool.getThreadContext().putHeader(Task.X_OPAQUE_ID, CommonName.ANOMALY_DETECTOR + ":" + detectorId);
Contributor

Just asking, will the X_OPAQUE_ID header be passed to child tasks?

Contributor Author

Yes, it will. I can see both parent and children tasks have the same header from the manual test.

// case 2: we can find the task for given detectorId
TaskId parentTaskId = matchedTask.getParentTaskId().isSet() ? matchedTask.getParentTaskId() : matchedTask.getTaskId();
CancelTasksRequest cancelTaskRequest = new CancelTasksRequest();
cancelTaskRequest.setParentTaskId(parentTaskId);
Contributor

From line 292, if matchedTask.getParentTaskId().isSet() is false, we use matchedTask.getTaskId() as parentTaskId. In that case, cancelTaskRequest.setParentTaskId(parentTaskId) will cancel tasks whose parent task ID is matchedTask.getTaskId(). Is it possible that matchedTask has no child tasks? If so, will cancelTaskRequest.setParentTaskId(parentTaskId) throw an exception, or will it cancel matchedTask?

Contributor Author

I get your point. To avoid this corner case, I will go through the entire task list (previously it terminated early once a match was found). If there is only one task (no parent), we need to call setTaskId; otherwise we call setParentTaskId.
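
The decision described above could look roughly like this. It is a sketch under assumptions: Task, Kind, Target, and choose are hypothetical types and names using plain strings, not the Elasticsearch TaskInfo or CancelTasksRequest classes, and the merged code may differ.

```java
import java.util.List;

// Hypothetical model of the "single task vs. parent with children" decision.
final class CancelTargetChooser {
    static final class Task {
        final String id;
        final String parentId; // null when the task has no parent
        final String opaqueId;

        Task(String id, String parentId, String opaqueId) {
            this.id = id;
            this.parentId = parentId;
            this.opaqueId = opaqueId;
        }
    }

    enum Kind { NONE, BY_TASK_ID, BY_PARENT_TASK_ID }

    static final class Target {
        final Kind kind;
        final String id;

        Target(Kind kind, String id) {
            this.kind = kind;
            this.id = id;
        }
    }

    // Scans the whole list instead of stopping at the first match.
    static Target choose(List<Task> tasks, String expectedOpaqueId) {
        String rootId = null;
        boolean hasChildren = false;
        for (Task task : tasks) {
            if (!expectedOpaqueId.equals(task.opaqueId)) {
                continue;
            }
            if (task.parentId != null) {
                hasChildren = true;
                rootId = task.parentId;
            } else if (rootId == null) {
                rootId = task.id;
            }
        }
        if (rootId == null) {
            return new Target(Kind.NONE, null);
        }
        // A lone task is cancelled by its own ID; otherwise cancel everything under the parent.
        return new Target(hasChildren ? Kind.BY_PARENT_TASK_ID : Kind.BY_TASK_ID, rootId);
    }
}
```

BY_TASK_ID corresponds to the setTaskId case and BY_PARENT_TASK_ID to the setParentTaskId case mentioned in the reply.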

throttler.clearFilteredQuery(detectorId);
return;
}
throw new InternalFailure(detectorId, "Failed to cancel current tasks due to node or task failures");
Contributor

log failures?

Contributor Author

added

1. Adding description to throttledTimedRequest
2. Don't send new request if there is one running query. We only cancel the running one.
3. Adding logic to deal with single task cancelling(no parent task)
4. Adding log info/error and removing extra space.

Contributor

@ylwu-amzn ylwu-amzn left a comment

LGTM. Thanks for the change!

@zhanghg08 zhanghg08 merged commit f40c532 into opendistro-for-elasticsearch:development Mar 13, 2020