
Cancel query if given detector already has one #54

Merged
@@ -221,7 +221,7 @@ public Collection<Object> createComponents(
Settings settings = environment.settings();
Member:

Is there a unit/integration test for the cancel mechanism? If not, I strongly suggest we add one.

You can add an ESIntegTestCase where we

  1. define a SearchOperationListener such that index operations are delayed to simulate long running queries;
  2. create a fake plugin to use the listener defined in 1);
  3. add the AD plugin and the fake plugin together;
  4. ... automate what you did in manual testing ...
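
A rough sketch of what that test scaffolding could look like (illustrative only — DelayedSearchPlugin and the sleep duration are hypothetical names, not code from this PR):

```java
import org.elasticsearch.index.IndexModule;
import org.elasticsearch.index.shard.SearchOperationListener;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.search.internal.SearchContext;

// Test-only plugin that delays the query phase so a search stays running
// long enough for the cancel mechanism to kick in.
public class DelayedSearchPlugin extends Plugin {
    @Override
    public void onIndexModule(IndexModule indexModule) {
        indexModule.addSearchOperationListener(new SearchOperationListener() {
            @Override
            public void onPreQueryPhase(SearchContext searchContext) {
                try {
                    Thread.sleep(15_000); // simulate a long running query
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
    }
}
```

The ESIntegTestCase would then register this plugin together with the AD plugin via nodePlugins() and assert that a second detector run cancels the stalled search.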

Contributor Author:

This is also one of my concerns. I noticed we don't have any unit tests for clientUtil and tried to add one, but it's too complicated. For manual testing, I used a similar listener that delays the search to simulate a long-running query. Not sure if that can be done in an integration test; I will sync up with you offline.

Clock clock = Clock.systemUTC();
Throttler throttler = new Throttler(clock);
-ClientUtil clientUtil = new ClientUtil(settings, client, throttler);
+ClientUtil clientUtil = new ClientUtil(settings, client, throttler, threadPool);
IndexUtils indexUtils = new IndexUtils(client, clientUtil, clusterService);
anomalyDetectionIndices = new AnomalyDetectionIndices(client, clusterService, threadPool, settings, clientUtil);
this.clusterService = clusterService;
@@ -87,7 +87,6 @@ public void run() {
}
)
);

deleteUtil.deleteDetectorResult(client);
}

@@ -26,4 +26,9 @@ public class CommonName {
// Format name
// ======================================
public static final String EPOCH_MILLIS_FORMAT = "epoch_millis";

// ======================================
// Anomaly Detector name for X-Opaque-Id header
// ======================================
public static final String ANOMALY_DETECTOR = "[Anomaly Detector]";
}
@@ -249,11 +249,6 @@ protected void doExecute(Task task, ActionRequest actionRequest, ActionListener<
return;
}
AnomalyDetector anomalyDetector = detector.get();
-if (stateManager.hasRunningQuery(anomalyDetector)) {
-LOG.error("There is one query running for detectorId: {}", anomalyDetector.getDetectorId());
-listener.onFailure(new EndRunException(adID, "There is one query running on AnomalyDetector", true));
-return;
-}

String thresholdModelID = modelManager.getThresholdModelId(adID);
Optional<DiscoveryNode> thresholdNode = hashRing.getOwningNode(thresholdModelID);
@@ -17,41 +17,58 @@

import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.REQUEST_TIMEOUT;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Function;

import com.amazon.opendistroforelasticsearch.ad.common.exception.EndRunException;
import com.amazon.opendistroforelasticsearch.ad.common.exception.InternalFailure;
import com.amazon.opendistroforelasticsearch.ad.constant.CommonName;
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector;

import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.LatchedActionListener;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksAction;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;

import com.amazon.opendistroforelasticsearch.ad.constant.CommonErrorMessages;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.threadpool.ThreadPool;

public class ClientUtil {
private volatile TimeValue requestTimeout;
private Client client;
private final Throttler throttler;
private ThreadPool threadPool;

@Inject
-public ClientUtil(Settings setting, Client client, Throttler throttler) {
+public ClientUtil(Settings setting, Client client, Throttler throttler, ThreadPool threadPool) {
this.requestTimeout = REQUEST_TIMEOUT.get(setting);
this.client = client;
this.throttler = throttler;
this.threadPool = threadPool;
}

/**
@@ -159,17 +176,19 @@ public <Request extends ActionRequest, Response extends ActionResponse> Response
}

/**
-* Send a nonblocking request with a timeout and return response. The request will first be put into
-* the negative cache. Once the request complete, it will be removed from the negative cache.
-*
+* Send a nonblocking request with a timeout and return response.
+* If there is already a query running on the given detector, it will try to
+* cancel that query. Otherwise it will add this query to the negative cache
+* and then attach the AnomalyDetection-specific header to the request.
+* Once the request completes, it will be removed from the negative cache.
+* @param <Request> ActionRequest
+* @param <Response> ActionResponse
* @param request request like index/search/get
* @param LOG log
* @param consumer functional interface to operate as a client request like client::get
-* @param <Request> ActionRequest
-* @param <Response> ActionResponse
* @param detector Anomaly Detector
* @return the response
-* @throws EndRunException when there is already a query running
+* @throws InternalFailure when there is already a query running
* @throws ElasticsearchTimeoutException when we cannot get response within time.
* @throws IllegalStateException when the waiting thread is interrupted
*/
@@ -179,28 +198,32 @@ public <Request extends ActionRequest, Response extends ActionResponse> Optional
BiConsumer<Request, ActionListener<Response>> consumer,
AnomalyDetector detector
) {

try {
-// if key already exist, reject the request and throws exception
-if (!throttler.insertFilteredQuery(detector.getDetectorId(), request)) {
-LOG.error("There is one query running for detectorId: {}", detector.getDetectorId());
-throw new EndRunException(detector.getDetectorId(), "There is one query running on AnomalyDetector", true);
+String detectorId = detector.getDetectorId();
+if (!throttler.insertFilteredQuery(detectorId, request)) {
+LOG.info("There is one query running for detectorId: {}. Trying to cancel the long running query", detectorId);
+cancelRunningQuery(client, detectorId, LOG);
Member:

Return after cancelling, since we don't know when the cancel would actually happen? We might keep piling up new queries while the previous ones are not yet cancelled.

Also, we need to send InternalFailure, not EndRunException. EndRunException is used for scenarios where we might need to terminate the AD job soon.

Contributor Author:

Agreed. Returning after cancelling will be safer. Will update.

+throw new InternalFailure(detector.getDetectorId(), "There is already a query running on AnomalyDetector");
}
AtomicReference<Response> respReference = new AtomicReference<>();
final CountDownLatch latch = new CountDownLatch(1);

-try {
+try (ThreadContext.StoredContext context = threadPool.getThreadContext().stashContext()) {
+assert context != null;
+threadPool.getThreadContext().putHeader(Task.X_OPAQUE_ID, CommonName.ANOMALY_DETECTOR + ":" + detectorId);
Contributor:

Just asking, will the X_OPAQUE_ID header be passed to child tasks?

Contributor Author:

Yes, it will. I can see both parent and child tasks have the same header in the manual test.
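
For reference, the manual check can be reproduced with the same task APIs this PR already imports — a minimal sketch (not part of the PR) that lists search tasks and logs each task's X-Opaque-Id, which shows up identically on the parent search task and its [phase/query] children:

```java
ListTasksRequest listRequest = new ListTasksRequest().setDetailed(true).setActions("*search*");
client.execute(ListTasksAction.INSTANCE, listRequest, ActionListener.wrap(response -> {
    for (TaskInfo task : response.getTasks()) {
        String opaqueId = task.getHeaders().get(Task.X_OPAQUE_ID);
        if (opaqueId != null) {
            // parent and child search tasks report the same X-Opaque-Id
            LOG.info("task: {}, parent: {}, X-Opaque-Id: {}", task.getTaskId(), task.getParentTaskId(), opaqueId);
        }
    }
}, exception -> LOG.error("Failed to list tasks", exception)));
```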

consumer.accept(request, new LatchedActionListener<Response>(ActionListener.wrap(response -> {
Contributor:

It's possible that cancelRunningQuery is still in progress, or has failed, when we start a new request. If cancelRunningQuery is not time-consuming, it's better to start a new request once we get the response of cancelRunningQuery. If it's a heavy action, we may need to monitor the cancellation status and retry if it failed, so we can terminate unnecessary AD queries to protect cluster performance. It's OK to add some TODOs/comments here and refactor later if you think the change would be big.

Contributor Author:

I think this is similar to Kaituo's comment. As a safety measure, I will not start a new request if we need to cancel the running one, just in case the cancel somehow fails and we keep adding new requests. We can revisit it later.

// clear negative cache
-throttler.clearFilteredQuery(detector.getDetectorId());
+throttler.clearFilteredQuery(detectorId);
respReference.set(response);
}, exception -> {
// clear negative cache
-throttler.clearFilteredQuery(detector.getDetectorId());
+throttler.clearFilteredQuery(detectorId);
LOG.error("Cannot get response for request {}, error: {}", request, exception);
}), latch));
} catch (Exception e) {
LOG.error("Failed to process the request for detectorId: {}.", detector.getDetectorId());
throttler.clearFilteredQuery(detector.getDetectorId());
LOG.error("Failed to process the request for detectorId: {}.", detectorId);
throttler.clearFilteredQuery(detectorId);
throw e;
}

@@ -222,4 +245,94 @@ public <Request extends ActionRequest, Response extends ActionResponse> Optional
public boolean hasRunningQuery(AnomalyDetector detector) {
return throttler.getFilteredQuery(detector.getDetectorId()).isPresent();
}

/**
* Cancel long running query for given detectorId
* @param client Elasticsearch client
* @param detectorId Anomaly Detector Id
* @param LOG Logger
*/
private void cancelRunningQuery(Client client, String detectorId, Logger LOG) {
ListTasksRequest listTasksRequest = new ListTasksRequest();
Member:

You can add some parameters to speed up task search:

  1. group_by=parents: so for each group you only need to check the header once
  2. actions=*search: since our queries are searches. We don't care about write or update.

Contributor Author:

Thanks for the advice. group_by=parents is a little weird: the REST API supports it (https://www.elastic.co/guide/en/elasticsearch/reference/current/tasks.html#_task_grouping), but as far as I can see it's not supported in the Java API.

For actions=*search, I added it as "*search" since I can see some child queries have actions like "indices:data/read/search[phase/query]"

Member (@kaituo, Mar 13, 2020):

You meant we need to use "*search*", right? Yes, please do that. Your current code uses "*search".

listTasksRequest.setActions("*search*");
client
.execute(
ListTasksAction.INSTANCE,
listTasksRequest,
ActionListener.wrap(response -> { onListTaskResponse(response, detectorId, LOG); }, exception -> {
LOG.error("List Tasks failed.", exception);
throw new InternalFailure(detectorId, "Failed to list current tasks", exception);
})
);
}

/**
* Helper function to handle ListTasksResponse
* @param listTasksResponse ListTasksResponse
* @param detectorId Anomaly Detector Id
* @param LOG Logger
*/
private void onListTaskResponse(ListTasksResponse listTasksResponse, String detectorId, Logger LOG) {
List<TaskInfo> tasks = listTasksResponse.getTasks();
TaskId matchedParentTaskId = null;
TaskId matchedSingleTaskId = null;
for (TaskInfo task : tasks) {
if (!task.getHeaders().isEmpty()
&& task.getHeaders().get(Task.X_OPAQUE_ID).equals(CommonName.ANOMALY_DETECTOR + ":" + detectorId)) {
if (!task.getParentTaskId().equals(TaskId.EMPTY_TASK_ID)) {
// we found the parent task, don't need to check more
matchedParentTaskId = task.getParentTaskId();
break;
} else {
// we found one task, keep checking other tasks
matchedSingleTaskId = task.getTaskId();
}
}
}
// case 1: given detectorId is not in current task list
if (matchedParentTaskId == null && matchedSingleTaskId == null) {
// log and then clear negative cache
LOG.info("Couldn't find task for detectorId: {}. Clean this entry from Throttler", detectorId);
throttler.clearFilteredQuery(detectorId);
return;
}
// case 2: we can find the task for given detectorId
CancelTasksRequest cancelTaskRequest = new CancelTasksRequest();
if (matchedParentTaskId != null) {
cancelTaskRequest.setParentTaskId(matchedParentTaskId);
LOG.info("Start to cancel task for parentTaskId: {}", matchedParentTaskId.toString());
} else {
cancelTaskRequest.setTaskId(matchedSingleTaskId);
LOG.info("Start to cancel task for taskId: {}", matchedSingleTaskId.toString());
}

client
.execute(
CancelTasksAction.INSTANCE,
cancelTaskRequest,
ActionListener.wrap(response -> { onCancelTaskResponse(response, detectorId, LOG); }, exception -> {
LOG.error("Failed to cancel task for detectorId: " + detectorId, exception);
throw new InternalFailure(detectorId, "Failed to cancel current tasks", exception);
})
);
}

/**
* Helper function to handle CancelTasksResponse
* @param cancelTasksResponse CancelTasksResponse
* @param detectorId Anomaly Detector Id
* @param LOG Logger
*/
private void onCancelTaskResponse(CancelTasksResponse cancelTasksResponse, String detectorId, Logger LOG) {
// todo: adding retry mechanism
List<ElasticsearchException> nodeFailures = cancelTasksResponse.getNodeFailures();
List<TaskOperationFailure> taskFailures = cancelTasksResponse.getTaskFailures();
if (nodeFailures.isEmpty() && taskFailures.isEmpty()) {
LOG.info("Cancelling query for detectorId: {} succeeds. Clear entry from Throttler", detectorId);
Contributor (@ylwu-amzn, Mar 10, 2020):

Better to add some retry for these failed tasks. Otherwise, we will wait for the next detector run to cancel again.

Contributor Author:

Will add a todo comment for now.

throttler.clearFilteredQuery(detectorId);
return;
}
LOG.error("Failed to cancel task for detectorId: " + detectorId);
throw new InternalFailure(detectorId, "Failed to cancel current tasks due to node or task failures");
Contributor:

log failures?

Contributor Author:

added

}
}
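
One possible shape for the retry TODO above — an illustrative sketch only (cancelWithRetry and the attempt counter are hypothetical, not part of this PR), reusing the same client call and response checks the merged code already makes:

```java
// Hypothetical retry helper: re-issue the cancel a bounded number of times
// before giving up, instead of waiting for the next detector run.
private void cancelWithRetry(CancelTasksRequest request, String detectorId, Logger LOG, int attemptsLeft) {
    client.execute(CancelTasksAction.INSTANCE, request, ActionListener.wrap(response -> {
        if (response.getNodeFailures().isEmpty() && response.getTaskFailures().isEmpty()) {
            LOG.info("Cancelling query for detectorId: {} succeeded. Clear entry from Throttler", detectorId);
            throttler.clearFilteredQuery(detectorId);
        } else if (attemptsLeft > 1) {
            LOG.warn("Cancel had failures for detectorId: {}; retrying, {} attempts left", detectorId, attemptsLeft - 1);
            cancelWithRetry(request, detectorId, LOG, attemptsLeft - 1);
        } else {
            LOG.error("Giving up cancelling tasks for detectorId: {}", detectorId);
        }
    }, exception -> LOG.error("Cancel request failed for detectorId: " + detectorId, exception)));
}
```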
@@ -15,10 +15,10 @@

package com.amazon.opendistroforelasticsearch.ad;

+import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector;
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetectorExecutionInput;
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetectorJob;
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyResult;
-import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector;
import com.amazon.opendistroforelasticsearch.ad.model.Feature;
import com.amazon.opendistroforelasticsearch.ad.model.FeatureData;
import com.amazon.opendistroforelasticsearch.ad.model.IntervalTimeConfiguration;
@@ -48,6 +48,7 @@
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentBuilder;
@@ -78,6 +79,8 @@
import static org.elasticsearch.test.ESTestCase.randomDouble;
import static org.elasticsearch.test.ESTestCase.randomInt;
import static org.elasticsearch.test.ESTestCase.randomLong;
import static org.powermock.api.mockito.PowerMockito.mock;
import static org.powermock.api.mockito.PowerMockito.when;

public class TestHelpers {

@@ -290,4 +293,18 @@ public static ClusterService createClusterService(ThreadPool threadPool, Cluster
);
return ClusterServiceUtils.createClusterService(threadPool, discoveryNode, clusterSettings);
}

public static ThreadContext createThreadContext() {
Settings build = Settings.builder().put("request.headers.default", "1").build();
ThreadContext context = new ThreadContext(build);
context.putHeader("foo", "bar");
context.putTransient("x", 1);
return context;
}

public static ThreadPool createThreadPool() {
ThreadPool pool = mock(ThreadPool.class);
when(pool.getThreadContext()).thenReturn(createThreadContext());
return pool;
}
}
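
A quick illustration (plain assertions, not from the PR) of what these helpers give a unit test: the mocked pool hands ClientUtil a real ThreadContext, so the stashContext()/putHeader() sequence in asyncRequest has something to write to and restore:

```java
ThreadPool pool = TestHelpers.createThreadPool();
ThreadContext threadContext = pool.getThreadContext();
try (ThreadContext.StoredContext stored = threadContext.stashContext()) {
    threadContext.putHeader(Task.X_OPAQUE_ID, "[Anomaly Detector]:detector-1");
    // header is visible while the stashed scope is open
    assert "[Anomaly Detector]:detector-1".equals(threadContext.getHeader(Task.X_OPAQUE_ID));
} // closing the StoredContext restores the previous thread context
assert threadContext.getHeader(Task.X_OPAQUE_ID) == null;
```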
@@ -83,6 +83,7 @@ private void templateDailyCron(DailyCronTestExecutionMode mode) {
return null;
}).when(clientUtil).execute(eq(DeleteByQueryAction.INSTANCE), any(), any());

// those tests are covered by each util class
doNothing().when(deleteUtil).deleteDetectorResult(eq(client));

cron.run();
@@ -37,6 +37,7 @@
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.Before;

import java.io.IOException;
@@ -52,6 +53,7 @@ public class AnomalyDetectionIndicesTests extends ESIntegTestCase {
private Settings settings;
private ClusterService clusterService;
private Client client;
private ThreadPool context;

@Before
public void setup() {
@@ -71,10 +73,11 @@ public void setup() {
clusterSettings.add(AnomalyDetectorSettings.REQUEST_TIMEOUT);
clusterSetting = new ClusterSettings(settings, clusterSettings);
clusterService = TestHelpers.createClusterService(client().threadPool(), clusterSetting);
context = TestHelpers.createThreadPool();
client = mock(Client.class);
Clock clock = Clock.systemUTC();
Throttler throttler = new Throttler(clock);
-requestUtil = new ClientUtil(settings, client, throttler);
+requestUtil = new ClientUtil(settings, client, throttler, context);
indices = new AnomalyDetectionIndices(client(), clusterService, client().threadPool(), settings, requestUtil);
}

@@ -56,6 +56,7 @@
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.After;
import org.junit.Before;

@@ -68,6 +69,7 @@ public class ADStateManagerTests extends ESTestCase {
private Clock clock;
private Duration duration;
private Throttler throttler;
private ThreadPool context;

@Override
protected NamedXContentRegistry xContentRegistry() {
@@ -91,12 +93,13 @@ public void setUp() throws Exception {
clock = mock(Clock.class);
duration = Duration.ofHours(1);
throttler = new Throttler(clock);

stateManager = new ADStateManager(
client,
xContentRegistry(),
modelManager,
settings,
-new ClientUtil(settings, client, throttler),
+new ClientUtil(settings, client, throttler, context),
clock,
duration
);
@@ -29,6 +29,7 @@
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.junit.Before;
import org.junit.Test;
@@ -59,7 +60,8 @@ public void setUp() throws Exception {
Client client = client();
Clock clock = mock(Clock.class);
Throttler throttler = new Throttler(clock);
-IndexUtils indexUtils = new IndexUtils(client, new ClientUtil(Settings.EMPTY, client, throttler), clusterService());
+ThreadPool threadPool = mock(ThreadPool.class);
+IndexUtils indexUtils = new IndexUtils(client, new ClientUtil(Settings.EMPTY, client, throttler, threadPool), clusterService());
ModelManager modelManager = mock(ModelManager.class);

clusterStatName1 = "clusterStat1";