Skip to content

Commit

Permalink
Changing SearchBP version to 2_6_0 (#6216)
Browse files Browse the repository at this point in the history
Signed-off-by: PritLadani <pritkladani@gmail.com>
  • Loading branch information
PritLadani authored Feb 7, 2023
1 parent a5dc22a commit 1757843
Show file tree
Hide file tree
Showing 6 changed files with 13 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ SearchBackpressureSettings getSettings() {
return settings;
}

SearchBackpressureState getSearchBackpressureStats(Class<? extends SearchBackpressureTask> taskType) {
SearchBackpressureState getSearchBackpressureState(Class<? extends SearchBackpressureTask> taskType) {
return searchBackpressureStates.get(taskType);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import org.opensearch.common.unit.TimeValue;

/**
* Settings related to search backpressure mode and internal
* Settings related to search backpressure mode and interval
*
* @opensearch.internal
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,10 +179,6 @@ public SearchTaskSettings(Settings settings, ClusterSettings clusterSettings) {
clusterSettings.addSettingsUpdateConsumer(SETTING_CANCELLATION_BURST, this::setCancellationBurst);
}

/**
* Callback listeners.
*/

public double getTotalHeapPercentThreshold() {
return totalHeapPercentThreshold;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public SearchBackpressureStats(
public SearchBackpressureStats(StreamInput in) throws IOException {
searchShardTaskStats = new SearchShardTaskStats(in);
mode = SearchBackpressureMode.fromName(in.readString());
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
if (in.getVersion().onOrAfter(Version.V_2_6_0)) {
searchTaskStats = in.readOptionalWriteable(SearchTaskStats::new);
} else {
searchTaskStats = null;
Expand All @@ -64,7 +64,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
public void writeTo(StreamOutput out) throws IOException {
searchShardTaskStats.writeTo(out);
out.writeString(mode.getName());
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
if (out.getVersion().onOrAfter(Version.V_2_6_0)) {
out.writeOptionalWriteable(searchTaskStats);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ public void testTrackerStateUpdateOnSearchTaskCompletion() {
// service.onTaskCompleted(new SearchTask(1, "test", "test", () -> "Test", TaskId.EMPTY_TASK_ID, new HashMap<>()));
service.onTaskCompleted(createMockTaskWithResourceStats(SearchTask.class, 100, 200));
}
assertEquals(100, service.getSearchBackpressureStats(SearchTask.class).getCompletionCount());
assertEquals(100, service.getSearchBackpressureState(SearchTask.class).getCompletionCount());
verify(mockTaskResourceUsageTracker, times(100)).update(any());
}

Expand Down Expand Up @@ -182,7 +182,7 @@ public void testTrackerStateUpdateOnSearchShardTaskCompletion() {
for (int i = 0; i < 100; i++) {
service.onTaskCompleted(createMockTaskWithResourceStats(SearchShardTask.class, 100, 200));
}
assertEquals(100, service.getSearchBackpressureStats(SearchShardTask.class).getCompletionCount());
assertEquals(100, service.getSearchBackpressureState(SearchShardTask.class).getCompletionCount());
verify(mockTaskResourceUsageTracker, times(100)).update(any());
}

Expand Down Expand Up @@ -234,19 +234,19 @@ public void testSearchTaskInFlightCancellation() {
// There are 25 SearchTasks eligible for cancellation but only 5 will be cancelled (burst limit).
service.doRun();
verify(mockTaskManager, times(5)).cancelTaskAndDescendants(any(), anyString(), anyBoolean(), any());
assertEquals(1, service.getSearchBackpressureStats(SearchTask.class).getLimitReachedCount());
assertEquals(1, service.getSearchBackpressureState(SearchTask.class).getLimitReachedCount());

// If the clock or completed task count haven't made sufficient progress, we'll continue to be rate-limited.
service.doRun();
verify(mockTaskManager, times(5)).cancelTaskAndDescendants(any(), anyString(), anyBoolean(), any());
assertEquals(2, service.getSearchBackpressureStats(SearchTask.class).getLimitReachedCount());
assertEquals(2, service.getSearchBackpressureState(SearchTask.class).getLimitReachedCount());

// Fast-forward the clock by ten second to replenish some tokens.
// This will add 50 tokens (time delta * rate) to 'rateLimitPerTime' but it will cancel only 5 tasks (burst limit).
mockTime.addAndGet(TimeUnit.SECONDS.toNanos(10));
service.doRun();
verify(mockTaskManager, times(10)).cancelTaskAndDescendants(any(), anyString(), anyBoolean(), any());
assertEquals(3, service.getSearchBackpressureStats(SearchTask.class).getLimitReachedCount());
assertEquals(3, service.getSearchBackpressureState(SearchTask.class).getLimitReachedCount());

// Verify search backpressure stats.
SearchBackpressureStats expectedStats = new SearchBackpressureStats(
Expand Down Expand Up @@ -306,12 +306,12 @@ public void testSearchShardTaskInFlightCancellation() {
// There are 15 SearchShardTasks eligible for cancellation but only 10 will be cancelled (burst limit).
service.doRun();
verify(mockTaskManager, times(10)).cancelTaskAndDescendants(any(), anyString(), anyBoolean(), any());
assertEquals(1, service.getSearchBackpressureStats(SearchShardTask.class).getLimitReachedCount());
assertEquals(1, service.getSearchBackpressureState(SearchShardTask.class).getLimitReachedCount());

// If the clock or completed task count haven't made sufficient progress, we'll continue to be rate-limited.
service.doRun();
verify(mockTaskManager, times(10)).cancelTaskAndDescendants(any(), anyString(), anyBoolean(), any());
assertEquals(2, service.getSearchBackpressureStats(SearchShardTask.class).getLimitReachedCount());
assertEquals(2, service.getSearchBackpressureState(SearchShardTask.class).getLimitReachedCount());

// Simulate task completion to replenish some tokens.
// This will add 2 tokens (task count delta * cancellationRatio) to 'rateLimitPerTaskCompletion'.
Expand All @@ -320,7 +320,7 @@ public void testSearchShardTaskInFlightCancellation() {
}
service.doRun();
verify(mockTaskManager, times(12)).cancelTaskAndDescendants(any(), anyString(), anyBoolean(), any());
assertEquals(3, service.getSearchBackpressureStats(SearchShardTask.class).getLimitReachedCount());
assertEquals(3, service.getSearchBackpressureState(SearchShardTask.class).getLimitReachedCount());

// Verify search backpressure stats.
SearchBackpressureStats expectedStats = new SearchBackpressureStats(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public Optional<TaskCancellation.Reason> checkAndMaybeGetCancellationReason(Task
}

@Override
public Stats stats(List<? extends Task> searchShardTasks) {
public Stats stats(List<? extends Task> activeTasks) {
return null;
}
};
Expand Down

0 comments on commit 1757843

Please sign in to comment.