Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix #7972 SearchBackpressureIT flaky tests #8063

Merged
merged 18 commits into from
Jun 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ public void testSearchShardTaskCancellationWithHighElapsedTime() throws Interrup
public void testSearchTaskCancellationWithHighCpu() throws InterruptedException {
Settings request = Settings.builder()
.put(SearchBackpressureSettings.SETTING_MODE.getKey(), "enforced")
.put(SearchTaskSettings.SETTING_CPU_TIME_MILLIS_THRESHOLD.getKey(), 50)
.put(SearchTaskSettings.SETTING_CPU_TIME_MILLIS_THRESHOLD.getKey(), 1000)
.build();
assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(request).get());

Expand Down Expand Up @@ -182,7 +182,7 @@ public void testSearchTaskCancellationWithHighCpu() throws InterruptedException
public void testSearchShardTaskCancellationWithHighCpu() throws InterruptedException {
Settings request = Settings.builder()
.put(SearchBackpressureSettings.SETTING_MODE.getKey(), "enforced")
.put(SearchShardTaskSettings.SETTING_CPU_TIME_MILLIS_THRESHOLD.getKey(), 50)
.put(SearchShardTaskSettings.SETTING_CPU_TIME_MILLIS_THRESHOLD.getKey(), 1000)
.build();
assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(request).get());

Expand Down
85 changes: 48 additions & 37 deletions server/src/main/java/org/opensearch/tasks/CancellableTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@
package org.opensearch.tasks;

import org.opensearch.common.Nullable;
import org.opensearch.common.SetOnce;
stephen-crawford marked this conversation as resolved.
Show resolved Hide resolved
import org.opensearch.common.unit.TimeValue;

import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.opensearch.search.SearchService.NO_TIMEOUT;

Expand All @@ -47,17 +47,26 @@
*/
public abstract class CancellableTask extends Task {

private volatile String reason;
private final AtomicBoolean cancelled = new AtomicBoolean(false);
private static class CancelledInfo {
String reason;
/**
* The time this task was cancelled as a wall clock time since epoch ({@link System#currentTimeMillis()} style).
*/
Long cancellationStartTime;
/**
* The time this task was cancelled as a relative time ({@link System#nanoTime()} style).
*/
Long cancellationStartTimeNanos;

public CancelledInfo(String reason) {
this.reason = reason;
this.cancellationStartTime = System.currentTimeMillis();
this.cancellationStartTimeNanos = System.nanoTime();
}
}

private final SetOnce<CancelledInfo> cancelledInfo = new SetOnce<>();
private final TimeValue cancelAfterTimeInterval;
/**
* The time this task was cancelled as a wall clock time since epoch ({@link System#currentTimeMillis()} style).
*/
stephen-crawford marked this conversation as resolved.
Show resolved Hide resolved
private Long cancellationStartTime = null;
/**
* The time this task was cancelled as a relative time ({@link System#nanoTime()} style).
*/
private Long cancellationStartTimeNanos = null;

public CancellableTask(long id, String type, String action, String description, TaskId parentTaskId, Map<String, String> headers) {
this(id, type, action, description, parentTaskId, headers, NO_TIMEOUT);
Expand All @@ -81,14 +90,29 @@ public CancellableTask(
*/
public void cancel(String reason) {
assert reason != null;
if (cancelled.compareAndSet(false, true)) {
this.cancellationStartTime = System.currentTimeMillis();
this.cancellationStartTimeNanos = System.nanoTime();
this.reason = reason;
if (cancelledInfo.trySet(new CancelledInfo(reason))) {
onCancelled();
}
}

public boolean isCancelled() {
return cancelledInfo.get() != null;
}

/**
* Returns true if this task can potentially have children that need to be cancelled when it parent is cancelled.
*/
public abstract boolean shouldCancelChildrenOnCancellation();

public TimeValue getCancellationTimeout() {
return cancelAfterTimeInterval;
}

/**
* Called after the task is cancelled so that it can take any actions that it has to take.
*/
protected void onCancelled() {}

/**
* Returns true if this task should be automatically cancelled if the coordinating node that
* requested this task left the cluster.
Expand All @@ -97,37 +121,24 @@ public boolean cancelOnParentLeaving() {
return true;
}

@Nullable
stephen-crawford marked this conversation as resolved.
Show resolved Hide resolved
public Long getCancellationStartTime() {
stephen-crawford marked this conversation as resolved.
Show resolved Hide resolved
return cancellationStartTime;
CancelledInfo info = cancelledInfo.get();
return (info != null) ? info.cancellationStartTime : null;
}

@Nullable
stephen-crawford marked this conversation as resolved.
Show resolved Hide resolved
public Long getCancellationStartTimeNanos() {
stephen-crawford marked this conversation as resolved.
Show resolved Hide resolved
return cancellationStartTimeNanos;
}

/**
* Returns true if this task can potentially have children that need to be cancelled when it parent is cancelled.
*/
public abstract boolean shouldCancelChildrenOnCancellation();

public boolean isCancelled() {
return cancelled.get();
}

public TimeValue getCancellationTimeout() {
return cancelAfterTimeInterval;
CancelledInfo info = cancelledInfo.get();
return (info != null) ? info.cancellationStartTimeNanos : null;
}

/**
* The reason the task was cancelled or null if it hasn't been cancelled.
*/
@Nullable
stephen-crawford marked this conversation as resolved.
Show resolved Hide resolved
public final String getReasonCancelled() {
return reason;
public String getReasonCancelled() {
stephen-crawford marked this conversation as resolved.
Show resolved Hide resolved
CancelledInfo info = cancelledInfo.get();
return (info != null) ? info.reason : null;
}

/**
* Called after the task is cancelled so that it can take any actions that it has to take.
*/
protected void onCancelled() {}
}