forked from opensearch-project/OpenSearch
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Part 1: Support for cancel_after_timeinterval parameter in search and…
… msearch request (opensearch-project#986) (opensearch-project#1085) * Part 1: Support for cancel_after_timeinterval parameter in search and msearch request This commit introduces a new request level parameter to configure the timeout interval after which a search request will be cancelled. For an msearch request the parameter is supported both at the parent request and at the child search requests. If it is provided at the parent level and a child search request doesn't have it, then the parent level value is set on that child request. The parent level msearch value is not used to cancel the parent request itself, as it may be tricky to come up with a correct value when child search requests can have different runtimes. TEST: Added test for ser/de with new parameter Signed-off-by: Sorabh Hamirwasia <sohami.apache@gmail.com> * Part 2: Support for cancel_after_timeinterval parameter in search and msearch request This commit adds the handling of the new request level parameter and schedules the cancellation task. It also adds a cluster setting to set a global cancellation timeout for search requests, which will be used in the absence of a request level timeout. 
TEST: Added new tests in SearchCancellationIT Signed-off-by: Sorabh Hamirwasia <sohami.apache@gmail.com> * Address Review feedback for Part 1 Signed-off-by: Sorabh Hamirwasia <sohami.apache@gmail.com> * Address review feedback for Part 2 Signed-off-by: Sorabh Hamirwasia <sohami.apache@gmail.com> * Update CancellableTask to remove the cancelOnTimeout boolean flag Signed-off-by: Sorabh Hamirwasia <sohami.apache@gmail.com> * Replace search.cancellation.timeout cluster setting with search.enforce_server.timeout.cancellation to control if cluster level cancel_after_time_interval should take precedence over request level cancel_after_time_interval value Signed-off-by: Sorabh Hamirwasia <sohami.apache@gmail.com> * Removing the search.enforce_server.timeout.cancellation cluster setting and just keeping search.cancel_after_time_interval setting with request level parameter taking the precedence. Signed-off-by: Sorabh Hamirwasia <sohami.apache@gmail.com> Co-authored-by: Sorabh Hamirwasia <hsorabh@amazon.com> Co-authored-by: Sorabh Hamirwasia <hsorabh@amazon.com>
- Loading branch information
Showing
17 changed files
with
590 additions
and
17 deletions.
There are no files selected for viewing
285 changes: 282 additions & 3 deletions
285
server/src/internalClusterTest/java/org/opensearch/search/SearchCancellationIT.java
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
135 changes: 135 additions & 0 deletions
135
server/src/main/java/org/opensearch/action/support/TimeoutTaskCancellationUtility.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,135 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.action.support; | ||
|
||
import org.apache.logging.log4j.LogManager; | ||
import org.apache.logging.log4j.Logger; | ||
import org.apache.logging.log4j.message.ParameterizedMessage; | ||
import org.opensearch.action.ActionListener; | ||
import org.opensearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; | ||
import org.opensearch.client.OriginSettingClient; | ||
import org.opensearch.client.node.NodeClient; | ||
import org.opensearch.common.settings.ClusterSettings; | ||
import org.opensearch.common.unit.TimeValue; | ||
import org.opensearch.search.SearchService; | ||
import org.opensearch.tasks.CancellableTask; | ||
import org.opensearch.tasks.TaskId; | ||
import org.opensearch.threadpool.Scheduler; | ||
import org.opensearch.threadpool.ThreadPool; | ||
|
||
import java.util.concurrent.TimeUnit; | ||
import java.util.concurrent.atomic.AtomicBoolean; | ||
|
||
import static org.opensearch.action.admin.cluster.node.tasks.get.GetTaskAction.TASKS_ORIGIN; | ||
import static org.opensearch.action.search.TransportSearchAction.SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING; | ||
|
||
public class TimeoutTaskCancellationUtility { | ||
|
||
private static final Logger logger = LogManager.getLogger(TimeoutTaskCancellationUtility.class); | ||
|
||
/** | ||
* Wraps a listener with a timeout listener {@link TimeoutRunnableListener} to schedule the task cancellation for provided tasks on | ||
* generic thread pool | ||
* @param client - {@link NodeClient} | ||
* @param taskToCancel - task to schedule cancellation for | ||
* @param clusterSettings - {@link ClusterSettings} | ||
* @param listener - original listener associated with the task | ||
* @return wrapped listener | ||
*/ | ||
public static <Response> ActionListener<Response> wrapWithCancellationListener(NodeClient client, CancellableTask taskToCancel, | ||
ClusterSettings clusterSettings, ActionListener<Response> listener) { | ||
final TimeValue globalTimeout = clusterSettings.get(SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING); | ||
final TimeValue timeoutInterval = (taskToCancel.getCancellationTimeout() == null) ? globalTimeout | ||
: taskToCancel.getCancellationTimeout(); | ||
// Note: -1 (or no timeout) will help to turn off cancellation. The combinations will be request level set at -1 or request level | ||
// set to null and cluster level set to -1. | ||
ActionListener<Response> listenerToReturn = listener; | ||
if (timeoutInterval.equals(SearchService.NO_TIMEOUT)) { | ||
return listenerToReturn; | ||
} | ||
|
||
try { | ||
final TimeoutRunnableListener<Response> wrappedListener = new TimeoutRunnableListener<>(timeoutInterval, listener, () -> { | ||
final CancelTasksRequest cancelTasksRequest = new CancelTasksRequest(); | ||
cancelTasksRequest.setTaskId(new TaskId(client.getLocalNodeId(), taskToCancel.getId())); | ||
cancelTasksRequest.setReason("Cancellation timeout of " + timeoutInterval + " is expired"); | ||
// force the origin to execute the cancellation as a system user | ||
new OriginSettingClient(client, TASKS_ORIGIN).admin().cluster() | ||
.cancelTasks(cancelTasksRequest, ActionListener.wrap(r -> logger.debug( | ||
"Scheduled cancel task with timeout: {} for original task: {} is successfully completed", timeoutInterval, | ||
cancelTasksRequest.getTaskId()), | ||
e -> logger.error(new ParameterizedMessage("Scheduled cancel task with timeout: {} for original task: {} is failed", | ||
timeoutInterval, cancelTasksRequest.getTaskId()), e)) | ||
); | ||
}); | ||
wrappedListener.cancellable = client.threadPool().schedule(wrappedListener, timeoutInterval, ThreadPool.Names.GENERIC); | ||
listenerToReturn = wrappedListener; | ||
} catch (Exception ex) { | ||
// if there is any exception in scheduling the cancellation task then continue without it | ||
logger.warn("Failed to schedule the cancellation task for original task: {}, will continue without it", taskToCancel.getId()); | ||
} | ||
return listenerToReturn; | ||
} | ||
|
||
/** | ||
* Timeout listener which executes the provided runnable after timeout is expired and if a response/failure is not yet received. | ||
* If either a response/failure is received before timeout then the scheduled task is cancelled and response/failure is sent back to | ||
* the original listener. | ||
*/ | ||
private static class TimeoutRunnableListener<Response> implements ActionListener<Response>, Runnable { | ||
|
||
private static final Logger logger = LogManager.getLogger(TimeoutRunnableListener.class); | ||
|
||
// Runnable to execute after timeout | ||
private final TimeValue timeout; | ||
private final ActionListener<Response> originalListener; | ||
private final Runnable timeoutRunnable; | ||
private final AtomicBoolean executeRunnable = new AtomicBoolean(true); | ||
private volatile Scheduler.ScheduledCancellable cancellable; | ||
private final long creationTime; | ||
|
||
TimeoutRunnableListener(TimeValue timeout, ActionListener<Response> listener, Runnable runAfterTimeout) { | ||
this.timeout = timeout; | ||
this.originalListener = listener; | ||
this.timeoutRunnable = runAfterTimeout; | ||
this.creationTime = System.nanoTime(); | ||
} | ||
|
||
@Override public void onResponse(Response response) { | ||
checkAndCancel(); | ||
originalListener.onResponse(response); | ||
} | ||
|
||
@Override public void onFailure(Exception e) { | ||
checkAndCancel(); | ||
originalListener.onFailure(e); | ||
} | ||
|
||
@Override public void run() { | ||
try { | ||
if (executeRunnable.compareAndSet(true, false)) { | ||
timeoutRunnable.run(); | ||
} // else do nothing since either response/failure is already sent to client | ||
} catch (Exception ex) { | ||
// ignore the exception | ||
logger.error(new ParameterizedMessage("Ignoring the failure to run the provided runnable after timeout of {} with " + | ||
"exception", timeout), ex); | ||
} | ||
} | ||
|
||
private void checkAndCancel() { | ||
if (executeRunnable.compareAndSet(true, false)) { | ||
logger.debug("Aborting the scheduled cancel task after {}", | ||
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - creationTime)); | ||
// timer has not yet expired so cancel it | ||
cancellable.cancel(); | ||
} | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.