Skip to content

Commit

Permalink
create per request listener to refresh task resource usage
Browse files Browse the repository at this point in the history
Signed-off-by: Chenyang Ji <cyji@amazon.com>
  • Loading branch information
ansjcy authored and deshsidd committed Jul 18, 2024
1 parent 5ff5485 commit c16727e
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ public SearchRequestOperationsListener.CompositeListener buildCompositeListener(
final SearchRequestOperationsListener... perRequestListeners
) {
final List<SearchRequestOperationsListener> searchListenersList = Stream.concat(
searchRequestListenersList.stream(),
Arrays.stream(perRequestListeners)
Arrays.stream(perRequestListeners),
searchRequestListenersList.stream()
)
.filter((searchRequestOperationsListener -> searchRequestOperationsListener.isEnabled(searchRequest)))
.collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
import org.opensearch.search.profile.ProfileShardResult;
import org.opensearch.search.profile.SearchProfileShardResults;
import org.opensearch.tasks.CancellableTask;
import org.opensearch.tasks.SearchTaskResourceOperationsListener;
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskResourceTrackingService;
import org.opensearch.telemetry.metrics.MetricsRegistry;
Expand Down Expand Up @@ -433,7 +434,8 @@ private void executeRequest(
requestOperationsListeners = searchRequestOperationsCompositeListenerFactory.buildCompositeListener(
originalSearchRequest,
logger,
TraceableSearchRequestOperationsListener.create(tracer, requestSpan)
TraceableSearchRequestOperationsListener.create(tracer, requestSpan),
new SearchTaskResourceOperationsListener(taskResourceTrackingService)
);
SearchRequestContext searchRequestContext = new SearchRequestContext(
requestOperationsListeners,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.index.IndexingPressureService;
import org.opensearch.node.Node;
import org.opensearch.tasks.TaskResourceTrackingService;
import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry;
import org.opensearch.threadpool.ThreadPool;

Expand Down Expand Up @@ -93,7 +92,6 @@ public class ClusterService extends AbstractLifecycleComponent {
private RerouteService rerouteService;

private IndexingPressureService indexingPressureService;
private TaskResourceTrackingService taskResourceTrackingService;

public ClusterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
this(settings, clusterSettings, threadPool, new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE));
Expand Down Expand Up @@ -267,24 +265,6 @@ public IndexingPressureService getIndexingPressureService() {
return indexingPressureService;
}

/**
* Getter for {@link TaskResourceTrackingService}, This method exposes task level resource usage for other components to use.
*
* @return TaskResourceTrackingService
*/
public TaskResourceTrackingService getTaskResourceTrackingService() {
return taskResourceTrackingService;
}

/**
* Setter for {@link TaskResourceTrackingService}
*
* @param taskResourceTrackingService taskResourceTrackingService
*/
public void setTaskResourceTrackingService(TaskResourceTrackingService taskResourceTrackingService) {
this.taskResourceTrackingService = taskResourceTrackingService;
}

public ClusterApplierService getClusterApplierService() {
return clusterApplierService;
}
Expand Down
1 change: 0 additions & 1 deletion server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -1122,7 +1122,6 @@ protected Node(
clusterService.getClusterSettings(),
threadPool
);
clusterService.setTaskResourceTrackingService(taskResourceTrackingService);

final SearchBackpressureSettings searchBackpressureSettings = new SearchBackpressureSettings(
settings,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.tasks;

import org.opensearch.action.search.SearchPhaseContext;
import org.opensearch.action.search.SearchRequestContext;
import org.opensearch.action.search.SearchRequestOperationsListener;

/**
* SearchTaskResourceOperationsListener subscriber for operations on search tasks resource usages
*
* @opensearch.internal
*/
public final class SearchTaskResourceOperationsListener extends SearchRequestOperationsListener {
private final TaskResourceTrackingService taskResourceTrackingService;

public SearchTaskResourceOperationsListener(TaskResourceTrackingService taskResourceTrackingService) {
this.taskResourceTrackingService = taskResourceTrackingService;
}

@Override
protected void onPhaseStart(SearchPhaseContext context) {}

@Override
protected void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {}

@Override
protected void onPhaseFailure(SearchPhaseContext context, Throwable cause) {}

@Override
public void onRequestStart(SearchRequestContext searchRequestContext) {}

@Override
public void onRequestEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {
taskResourceTrackingService.refreshResourceStats(context.getTask());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import org.opensearch.ExceptionsHelper;
import org.opensearch.action.search.SearchShardTask;
import org.opensearch.common.SuppressForbidden;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
Expand Down Expand Up @@ -52,7 +51,6 @@
/**
* Service that helps track resource usage of tasks running on a node.
*/
@PublicApi(since = "2.16.0")
@SuppressForbidden(reason = "ThreadMXBean#getThreadAllocatedBytes")
public class TaskResourceTrackingService implements RunnableTaskExecutionListener {

Expand Down Expand Up @@ -359,7 +357,6 @@ public TaskResourceInfo getTaskResourceUsageFromThreadContext() {
/**
* Listener that gets invoked when a task execution completes.
*/
@PublicApi(since = "2.16.0")
public interface TaskCompletionListener {
void onTaskCompleted(Task task);
}
Expand Down

0 comments on commit c16727e

Please sign in to comment.