Skip to content

Commit

Permalink
print reason why parent task was cancelled (opensearch-project#14604)
Browse files Browse the repository at this point in the history
Signed-off-by: kkewwei <kkewwei@163.com>
  • Loading branch information
kkewwei authored Jul 10, 2024
1 parent 605543b commit dfb8449
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix race condition while parsing derived fields from search definition ([14445](https://github.com/opensearch-project/OpenSearch/pull/14445))
- Add allowlist setting for ingest-common and search-pipeline-common processors ([#14439](https://github.com/opensearch-project/OpenSearch/issues/14439))
- Create SystemIndexRegistry with helper method matchesSystemIndex ([#14415](https://github.com/opensearch-project/OpenSearch/pull/14415))
- Print reason why parent task was cancelled ([#14604](https://github.com/opensearch-project/OpenSearch/issues/14604))

### Dependencies
- Bump `org.gradle.test-retry` from 1.5.8 to 1.5.9 ([#13442](https://github.com/opensearch-project/OpenSearch/pull/13442))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ public void testFailedToStartChildTaskAfterCancelled() throws Exception {
mainAction.startSubTask(taskId, subRequest, future);
TransportException te = expectThrows(TransportException.class, future::actionGet);
assertThat(te.getCause(), instanceOf(TaskCancelledException.class));
assertThat(te.getCause().getMessage(), equalTo("The parent task was cancelled, shouldn't start any child tasks"));
assertThat(te.getCause().getMessage(), equalTo("The parent task was cancelled, shouldn't start any child tasks, by user request"));
allowEntireRequest(rootRequest);
waitForRootTask(rootTaskFuture);
ensureAllBansRemoved();
Expand Down Expand Up @@ -386,7 +386,7 @@ static void waitForRootTask(ActionFuture<TestResponse> rootTask) {
assertThat(
cause.getMessage(),
anyOf(
equalTo("The parent task was cancelled, shouldn't start any child tasks"),
equalTo("The parent task was cancelled, shouldn't start any child tasks, by user request"),
containsString("Task cancelled before it started:"),
equalTo("Task was cancelled while executing")
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ void cancelTaskAndDescendants(CancellableTask task, String reason, boolean waitF
Collection<DiscoveryNode> childrenNodes = taskManager.startBanOnChildrenNodes(task.getId(), () -> {
logger.trace("child tasks of parent [{}] are completed", taskId);
groupedListener.onResponse(null);
});
}, reason);
taskManager.cancel(task, reason, () -> {
logger.trace("task [{}] is cancelled", taskId);
groupedListener.onResponse(null);
Expand Down
16 changes: 12 additions & 4 deletions server/src/main/java/org/opensearch/tasks/TaskManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -510,17 +510,22 @@ public Set<TaskId> getBannedTaskIds() {
return Collections.unmodifiableSet(banedParents.keySet());
}

public Collection<DiscoveryNode> startBanOnChildrenNodes(long taskId, Runnable onChildTasksCompleted) {
return startBanOnChildrenNodes(taskId, onChildTasksCompleted, "unknown");
}

/**
* Start rejecting new child requests as the parent task was cancelled.
*
* @param taskId the parent task id
* @param onChildTasksCompleted called when all child tasks are completed or failed
* @param reason the ban reason
* @return the set of current nodes that have outstanding child tasks
*/
public Collection<DiscoveryNode> startBanOnChildrenNodes(long taskId, Runnable onChildTasksCompleted) {
public Collection<DiscoveryNode> startBanOnChildrenNodes(long taskId, Runnable onChildTasksCompleted, String reason) {
final CancellableTaskHolder holder = cancellableTasks.get(taskId);
if (holder != null) {
return holder.startBan(onChildTasksCompleted);
return holder.startBan(onChildTasksCompleted, reason);
} else {
onChildTasksCompleted.run();
return Collections.emptySet();
Expand Down Expand Up @@ -585,6 +590,7 @@ private static class CancellableTaskHolder {
private List<Runnable> cancellationListeners = null;
private Map<DiscoveryNode, Integer> childTasksPerNode = null;
private boolean banChildren = false;
private String banReason;
private List<Runnable> childTaskCompletedListeners = null;

CancellableTaskHolder(CancellableTask task) {
Expand Down Expand Up @@ -662,7 +668,7 @@ public CancellableTask getTask() {

synchronized void registerChildNode(DiscoveryNode node) {
if (banChildren) {
throw new TaskCancelledException("The parent task was cancelled, shouldn't start any child tasks");
throw new TaskCancelledException("The parent task was cancelled, shouldn't start any child tasks, " + banReason);
}
if (childTasksPerNode == null) {
childTasksPerNode = new HashMap<>();
Expand All @@ -686,11 +692,13 @@ void unregisterChildNode(DiscoveryNode node) {
notifyListeners(listeners);
}

Set<DiscoveryNode> startBan(Runnable onChildTasksCompleted) {
Set<DiscoveryNode> startBan(Runnable onChildTasksCompleted, String reason) {
final Set<DiscoveryNode> pendingChildNodes;
final Runnable toRun;
synchronized (this) {
banChildren = true;
assert reason != null;
banReason = reason;
if (childTasksPerNode == null) {
pendingChildNodes = Collections.emptySet();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,7 @@ public void testRegisterAndExecuteChildTaskWhileParentTaskIsBeingCanceled() thro
);
assertThat(cancelledException.getMessage(), startsWith("Task cancelled before it started:"));
CountDownLatch latch = new CountDownLatch(1);
taskManager.startBanOnChildrenNodes(parentTaskId.getId(), latch::countDown);
taskManager.startBanOnChildrenNodes(parentTaskId.getId(), latch::countDown, cancelledException.getMessage());
assertTrue("onChildTasksCompleted() is not invoked", latch.await(1, TimeUnit.SECONDS));
}

Expand Down

0 comments on commit dfb8449

Please sign in to comment.