-
Notifications
You must be signed in to change notification settings - Fork 24.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Cancel search task on connection close #43332
Cancel search task on connection close #43332
Conversation
Pinging @elastic/es-search |
run elasticsearch-ci/bwc |
13494a6
to
1f45077
Compare
@javanna I looked at it and was a bit surprised how invasive the change was. I spent some time with this and got something that might be simpler to do. I wanted to shared it to hear what you think. It's a relatively simple patch so I am posting this here: diff --git a/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java b/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java
index 95695bec4f0..a98966079ec 100644
--- a/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java
+++ b/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java
@@ -19,13 +19,20 @@
package org.elasticsearch.rest.action.search;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
+import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
+import org.elasticsearch.common.xcontent.StatusToXContentObject;
import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.http.HttpChannel;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestController;
@@ -40,12 +47,16 @@ import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.search.suggest.SuggestBuilder;
import org.elasticsearch.search.suggest.term.TermSuggestionBuilder.SuggestMode;
+import org.elasticsearch.tasks.Task;
+import org.elasticsearch.tasks.TaskId;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
+import java.util.Map;
import java.util.Set;
+import java.util.function.Consumer;
import java.util.function.IntConsumer;
import static org.elasticsearch.common.unit.TimeValue.parseTimeValue;
@@ -80,6 +91,57 @@ public class RestSearchAction extends BaseRestHandler {
return "search_action";
}
+ private static class CloseListener implements ActionListener<Void> {
+ private final NodeClient client;
+ private final Consumer<HttpChannel> onClose;
+ private TaskId taskId;
+ private HttpChannel channel;
+ private boolean taskFinished = false;
+
+ private CloseListener(NodeClient client, Consumer<HttpChannel> onClose) {
+ this.client = client;
+ this.onClose = onClose;
+ }
+
+ @Override
+ public synchronized void onResponse(Void aVoid) {
+ if (taskId != null) {
+ CancelTasksRequest cancelTasksRequest = new CancelTasksRequest();
+ cancelTasksRequest.setTaskId(taskId);
+ taskFinished();
+ onClose.accept(channel);
+ client.admin().cluster().cancelTasks(cancelTasksRequest, ActionListener.wrap(r -> {}, e -> {}));
+ }
+ }
+
+ @Override
+ public void onFailure(Exception e) {
+ //
+ }
+
+ synchronized void taskFinished() {
+ taskId = null;
+ taskFinished = true;
+ }
+
+ synchronized void maybeRegister(HttpChannel httpChannel, TaskId taskId) {
+ if (taskFinished) {
+ return; // we are already done.. do nothing
+ }
+ this.taskId = taskId;
+ if (channel == null) {
+ channel = httpChannel;
+ httpChannel.addCloseListener(this);
+ }
+ }
+
+ synchronized void reset() {
+ taskFinished = false;
+ }
+ }
+
+ private final Map<HttpChannel, CloseListener> listenerMap = ConcurrentCollections.newConcurrentMap();
+
@Override
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
SearchRequest searchRequest = new SearchRequest();
@@ -99,7 +161,27 @@ public class RestSearchAction extends BaseRestHandler {
request.withContentOrSourceParamParserOrNull(parser ->
parseSearchRequest(searchRequest, request, parser, setSize));
- return channel -> client.search(searchRequest, new RestStatusToXContentListener<>(channel));
+ return channel -> {
+ HttpChannel httpChannel = request.getHttpChannel();
+ CloseListener closeListener = listenerMap.computeIfAbsent(httpChannel, chan -> new CloseListener(client, listenerMap::remove));
+ closeListener.reset();
+ RestStatusToXContentListener<StatusToXContentObject> innerChannel = new RestStatusToXContentListener<>(channel);
+ Task task = client.executeLocally(SearchAction.INSTANCE, searchRequest, new ActionListener<>() {
+ @Override
+ public void onResponse(SearchResponse searchResponse) {
+ closeListener.taskFinished();
+ innerChannel.onResponse(searchResponse);
+ }
+
+ @Override
+ public void onFailure(Exception e) {
+ closeListener.taskFinished();
+ innerChannel.onFailure(e);
+ }
+ });
+ TaskId taskId = new TaskId(client.getLocalNodeId(), task.getId());
+ closeListener.maybeRegister(httpChannel, taskId);
+ };
} lemme know what you think. |
I would like to point out that there can be multiple concurrent requests from a single channel. This is allowed by http pipelining. So this line here https://github.com/elastic/elasticsearch/pull/43332/files#diff-0b96f2316058011679a4c71f8069d6daR63 is not guaranteed. So we will probably need something more like this: |
Is it possible to detect that the http channel is used in a pipeline ? This is not widely used so we could simply ignore these channels and execute the search normally ?
+1, I like the fact that your change is done entirely on the |
HTTP Pipelining is allowed by the HTTP/1.1 spec. It is always possible even if very few clients implement it. There is no requirement that Elasticsearch supports concurrent search requests however. You are welcome to respond to a second search request on a channel with something like a |
I think we can make the pipelining work here, we can simply use a set of TaksIds and cancel all of them. I don't think that's a big issue. thanks for pointing it out @tbrooks8 |
@s1monw I like what you are proposing, I have missed that I could get my hands on the task id directly in the rest action by calling client#executeLocally instead of the usual client#search. That simplifies things quite a bit. I will update the PR. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I left some comments regarding the synchronization of tasks that use the same channel but otherwise the logic looks good to me.
this.client = client; | ||
} | ||
|
||
void registerTask(HttpChannel httpChannel, TaskId taskId) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We synchronize on the task when calling this function but different tasks can reference the same channel concurrently (http pipelining) so we need to synchronize this function too ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
even if we use AtomicReference for the channel and CopyOnWriteArraySet for the taskids?
this.taskIds.add(taskId); | ||
} | ||
|
||
private void unregisterTask(TaskId taskId) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here ?
} | ||
|
||
@Override | ||
public void onResponse(Void aVoid) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd also synchronize this call in order to ensure that it cannot execute concurrently.
|
||
@Override | ||
public void onFailure(Exception e) { | ||
//nothing to do here |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't know what situation can yields this but we should also remove the channel from the map here to be on the safe side (no leak).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh well, I just realized that I am never removing the channel from the map, it's an oversight as I haven't written a test for when a connection gets closed yet, will fix.
ActionType<Response> actionType, ActionListener<Response> listener) { | ||
//0: initial state, 1: either linked or already unlinked without being linked first, 2: first linked and then unlinked | ||
//link can only be done if it's the first thing that happens. unlink will only happen if link was done first. | ||
AtomicInteger link = new AtomicInteger(0); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I only see two states used (0 and >0) so maybe an AtomicBoolean is enough ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
been there, tried that, multiple times, because I had the same thought. The problem is that we only want link when unlink has not been performed yet, and unlink only when link has already been performed (optional). This is the key statement: if (link.getAndIncrement() > 0)
which cannot be achieved with a compareAndSet
using AtomicBoolean
, as we only do the unlink if link has already happened, but in any case we change the state. We could probably achieve a similar behaviour with atomic boolean if we don't use compare and set, which would be ok as we are in a synchronized block, but I am not sure that we would be able to still skip unlink when link is not yet performed.
if (e instanceof Exception) { | ||
listener.onFailure((Exception)e); | ||
} else { | ||
//TODO should we rather throw in case of throwable instead of notifying the listener? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is called from an ActionListener
that handles Exception
only so getting something else than an Exception
should not be possible. I'd just rethrow if this is a type that we don't expect.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
man, good point. We should probably fix TaskListener.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
see #44946
heya @elastic/es-clients search tasks cancellation on connection close is in, and was backported to 7.x. Testing with your clients would be appreciated, also specific tests for http pipelining would be great. |
…lastic#45379) The low-level REST client exposes a `performRequestAsync` method that allows to send async requests, but today it does not expose the ability to cancel such requests. That is something that the underlying apache async http client supports, and it makes sense for us to expose. This commit adds a return value to the `performRequestAsync` method, which is backwards compatible. A `Cancellable` object gets returned, which exposes a `cancel` public method. When calling `cancel`, the on-going request associated with the returned `Cancellable` instance will be cancelled by calling its `abort` method. This works throughout multiple retries, though some special care was needed for the case where `cancel` is called between different attempts (when one attempt has failed and the consecutive one has not been sent yet). Note that cancelling a request on the client side does not automatically translate to cancelling the server side execution of it. That needs to be specifically implemented, which is on the work for the search API (see elastic#43332). Relates to elastic#44802
…45379) The low-level REST client exposes a `performRequestAsync` method that allows to send async requests, but today it does not expose the ability to cancel such requests. That is something that the underlying apache async http client supports, and it makes sense for us to expose. This commit adds a return value to the `performRequestAsync` method, which is backwards compatible. A `Cancellable` object gets returned, which exposes a `cancel` public method. When calling `cancel`, the on-going request associated with the returned `Cancellable` instance will be cancelled by calling its `abort` method. This works throughout multiple retries, though some special care was needed for the case where `cancel` is called between different attempts (when one attempt has failed and the consecutive one has not been sent yet). Note that cancelling a request on the client side does not automatically translate to cancelling the server side execution of it. That needs to be specifically implemented, which is on the work for the search API (see #43332). Relates to #44802
Since #43332 and #56327 we cancel rest requests when the rest channel closes and transport requests when the transport channel closes. This commit cancels proxy requests and its descendant requests when the proxy channel closes. This change is also required to support cross-clusters task cancellation. Relates #43332 Relates #56327
Since elastic#43332 and elastic#56327 we cancel rest requests when the rest channel closes and transport requests when the transport channel closes. This commit cancels proxy requests and its descendant requests when the proxy channel closes. This change is also required to support cross-clusters task cancellation. Relates elastic#43332 Relates elastic#56327
Since #43332 and #56327 we cancel rest requests when the rest channel closes and transport requests when the transport channel closes. This commit cancels proxy requests and its descendant requests when the proxy channel closes. This change is also required to support cross-clusters task cancellation. Relates #43332 Relates #56327
This PR introduces a mechanism to cancel a search task when its corresponding connection gets closed. That would relief users from having to manually deal with tasks and cancel them if needed. Especially the process of finding the task_id requires calling get tasks which needs to call every node in the cluster.
The implementation is based on associating each http channel with its currently running search task, and cancelling the task when the previously registered close listener gets called.
Left to do, for following PRs:
Closes #43105