Skip to content

Commit

Permalink
Support task resource tracking in OpenSearch (#2639)
Browse files Browse the repository at this point in the history
* Add Task id in Thread Context

Signed-off-by: Tushar Kharbanda <tushar.kharbanda72@gmail.com>

* Add resource tracking update support for tasks

Signed-off-by: Tushar Kharbanda <tushar.kharbanda72@gmail.com>

* List tasks action support for task resource refresh

Signed-off-by: Tushar Kharbanda <tushar.kharbanda72@gmail.com>

* Handle task unregistration case on same thread

Signed-off-by: Tushar Kharbanda <tushar.kharbanda72@gmail.com>

* Add lazy initialisation for RunnableTaskExecutionListener

Signed-off-by: Tushar Kharbanda <tushar.kharbanda72@gmail.com>

* Segregate resource tracking logic to a separate service.

Signed-off-by: Tushar Kharbanda <tushar.kharbanda72@gmail.com>

* Check for running threads during task unregister

Signed-off-by: Tushar Kharbanda <tushar.kharbanda72@gmail.com>

* Moved thread context logic to resource tracking service

Signed-off-by: Tushar Kharbanda <tushar.kharbanda72@gmail.com>

* preserve task id in thread context even after stash

Signed-off-by: Tushar Kharbanda <tushar.kharbanda72@gmail.com>

* Add null check for resource tracking service

Signed-off-by: Tushar Kharbanda <tushar.kharbanda72@gmail.com>

* Tracking service tests and minor refactoring

Signed-off-by: Tushar Kharbanda <tushar.kharbanda72@gmail.com>

* Preserve task id fix with test

Signed-off-by: Tushar Kharbanda <tushar.kharbanda72@gmail.com>

* Minor test changes and Task tracking call update

Signed-off-by: Tushar Kharbanda <tushar.kharbanda72@gmail.com>

* Fix Auto Queue executor method's signature

Signed-off-by: Tushar Kharbanda <tushar.kharbanda72@gmail.com>

* Make task runnable task listener factory implement consumer

Signed-off-by: Tushar Kharbanda <tushar.kharbanda72@gmail.com>

* Use reflection for ThreadMXBean

Signed-off-by: Tushar Kharbanda <tushar.kharbanda72@gmail.com>

* Formatting

Signed-off-by: Tushar Kharbanda <tushar.kharbanda72@gmail.com>

* Replace RunnableTaskExecutionListenerFactory with AtomicReference

Signed-off-by: Tushar Kharbanda <tushar.kharbanda72@gmail.com>

* Revert "Use reflection for ThreadMXBean"

This reverts commit cbcf3c525bf516fb7164f0221491a7b25c1f96ec.

Signed-off-by: Tushar Kharbanda <tushar.kharbanda72@gmail.com>

* Suppress Warning related to ThreadMXBean

Signed-off-by: Tushar Kharbanda <tushar.kharbanda72@gmail.com>

* Add separate method for task resource tracking supported check

Signed-off-by: Tushar Kharbanda <tushar.kharbanda72@gmail.com>

* Enabled setting by default

Signed-off-by: Tushar Kharbanda <tushar.kharbanda72@gmail.com>

* Add debug logs for stale context id

Signed-off-by: Tushar Kharbanda <tushar.kharbanda72@gmail.com>

* Remove hardcoded task overhead in tests

Signed-off-by: Tushar Kharbanda <tushar.kharbanda72@gmail.com>

* Bump stale task id in thread context log level to warn

Signed-off-by: Tushar Kharbanda <tushar.kharbanda72@gmail.com>

* Improve assertions and logging

Signed-off-by: Tushar Kharbanda <tushar.kharbanda72@gmail.com>

Co-authored-by: Tushar Kharbanda <tkharban@amazon.com>
  • Loading branch information
tushar-kharbanda72 and Tushar Kharbanda authored Apr 21, 2022
1 parent e9ad90b commit 6517eec
Show file tree
Hide file tree
Showing 30 changed files with 1,421 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,9 @@ public void onTaskUnregistered(Task task) {}

@Override
public void waitForTaskCompletion(Task task) {}

@Override
public void taskExecutionStarted(Task task, Boolean closeableInvoked) {}
});
}
// Need to run the task in a separate thread because node client's .execute() is blocked by our task listener
Expand Down Expand Up @@ -651,6 +654,9 @@ public void waitForTaskCompletion(Task task) {
waitForWaitingToStart.countDown();
}

@Override
public void taskExecutionStarted(Task task, Boolean closeableInvoked) {}

@Override
public void onTaskRegistered(Task task) {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskInfo;
import org.opensearch.tasks.TaskResourceTrackingService;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

Expand All @@ -60,8 +61,15 @@ public static long waitForCompletionTimeout(TimeValue timeout) {

private static final TimeValue DEFAULT_WAIT_FOR_COMPLETION_TIMEOUT = timeValueSeconds(30);

private final TaskResourceTrackingService taskResourceTrackingService;

@Inject
public TransportListTasksAction(ClusterService clusterService, TransportService transportService, ActionFilters actionFilters) {
public TransportListTasksAction(
ClusterService clusterService,
TransportService transportService,
ActionFilters actionFilters,
TaskResourceTrackingService taskResourceTrackingService
) {
super(
ListTasksAction.NAME,
clusterService,
Expand All @@ -72,6 +80,7 @@ public TransportListTasksAction(ClusterService clusterService, TransportService
TaskInfo::new,
ThreadPool.Names.MANAGEMENT
);
this.taskResourceTrackingService = taskResourceTrackingService;
}

@Override
Expand Down Expand Up @@ -101,6 +110,8 @@ protected void processTasks(ListTasksRequest request, Consumer<Task> operation)
}
taskManager.waitForTaskCompletion(task, timeoutNanos);
});
} else {
operation = operation.andThen(taskResourceTrackingService::refreshResourceStats);
}
super.processTasks(request, operation);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ public SearchShardTask(long id, String type, String action, String description,
super(id, type, action, description, parentTaskId, headers);
}

@Override
public boolean supportsResourceTracking() {
return true;
}

@Override
public boolean shouldCancelChildrenOnCancellation() {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ public final String getDescription() {
return descriptionSupplier.get();
}

@Override
public boolean supportsResourceTracking() {
return true;
}

/**
* Attach a {@link SearchProgressListener} to this task.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.opensearch.action.ActionResponse;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.lease.Releasables;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskCancelledException;
import org.opensearch.tasks.TaskId;
Expand Down Expand Up @@ -88,31 +89,39 @@ public final Task execute(Request request, ActionListener<Response> listener) {
*/
final Releasable unregisterChildNode = registerChildNode(request.getParentTask());
final Task task;

try {
task = taskManager.register("transport", actionName, request);
} catch (TaskCancelledException e) {
unregisterChildNode.close();
throw e;
}
execute(task, request, new ActionListener<Response>() {
@Override
public void onResponse(Response response) {
try {
Releasables.close(unregisterChildNode, () -> taskManager.unregister(task));
} finally {
listener.onResponse(response);

ThreadContext.StoredContext storedContext = taskManager.taskExecutionStarted(task);
try {
execute(task, request, new ActionListener<Response>() {
@Override
public void onResponse(Response response) {
try {
Releasables.close(unregisterChildNode, () -> taskManager.unregister(task));
} finally {
listener.onResponse(response);
}
}
}

@Override
public void onFailure(Exception e) {
try {
Releasables.close(unregisterChildNode, () -> taskManager.unregister(task));
} finally {
listener.onFailure(e);
@Override
public void onFailure(Exception e) {
try {
Releasables.close(unregisterChildNode, () -> taskManager.unregister(task));
} finally {
listener.onFailure(e);
}
}
}
});
});
} finally {
storedContext.close();
}

return task;
}

Expand All @@ -129,25 +138,30 @@ public final Task execute(Request request, TaskListener<Response> listener) {
unregisterChildNode.close();
throw e;
}
execute(task, request, new ActionListener<Response>() {
@Override
public void onResponse(Response response) {
try {
Releasables.close(unregisterChildNode, () -> taskManager.unregister(task));
} finally {
listener.onResponse(task, response);
ThreadContext.StoredContext storedContext = taskManager.taskExecutionStarted(task);
try {
execute(task, request, new ActionListener<Response>() {
@Override
public void onResponse(Response response) {
try {
Releasables.close(unregisterChildNode, () -> taskManager.unregister(task));
} finally {
listener.onResponse(task, response);
}
}
}

@Override
public void onFailure(Exception e) {
try {
Releasables.close(unregisterChildNode, () -> taskManager.unregister(task));
} finally {
listener.onFailure(task, e);
@Override
public void onFailure(Exception e) {
try {
Releasables.close(unregisterChildNode, () -> taskManager.unregister(task));
} finally {
listener.onFailure(task, e);
}
}
}
});
});
} finally {
storedContext.close();
}
return task;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
import org.opensearch.script.ScriptMetadata;
import org.opensearch.snapshots.SnapshotsInfoService;
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskResourceTrackingService;
import org.opensearch.tasks.TaskResultsService;

import java.util.ArrayList;
Expand Down Expand Up @@ -394,6 +395,7 @@ protected void configure() {
bind(NodeMappingRefreshAction.class).asEagerSingleton();
bind(MappingUpdatedAction.class).asEagerSingleton();
bind(TaskResultsService.class).asEagerSingleton();
bind(TaskResourceTrackingService.class).asEagerSingleton();
bind(AllocationDeciders.class).toInstance(allocationDeciders);
bind(ShardsAllocator.class).toInstance(shardsAllocator);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.opensearch.index.ShardIndexingPressureMemoryManager;
import org.opensearch.index.ShardIndexingPressureSettings;
import org.opensearch.index.ShardIndexingPressureStore;
import org.opensearch.tasks.TaskResourceTrackingService;
import org.opensearch.watcher.ResourceWatcherService;
import org.opensearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction;
import org.opensearch.action.admin.indices.close.TransportCloseIndexAction;
Expand Down Expand Up @@ -568,7 +569,8 @@ public void apply(Settings value, Settings current, Settings previous) {
ShardIndexingPressureMemoryManager.THROUGHPUT_DEGRADATION_LIMITS,
ShardIndexingPressureMemoryManager.SUCCESSFUL_REQUEST_ELAPSED_TIMEOUT,
ShardIndexingPressureMemoryManager.MAX_OUTSTANDING_REQUESTS,
IndexingPressure.MAX_INDEXING_BYTES
IndexingPressure.MAX_INDEXING_BYTES,
TaskResourceTrackingService.TASK_RESOURCE_TRACKING_ENABLED
)
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.node.Node;
import org.opensearch.threadpool.RunnableTaskExecutionListener;
import org.opensearch.threadpool.TaskAwareRunnable;

import java.util.List;
import java.util.Optional;
Expand All @@ -55,6 +57,7 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

public class OpenSearchExecutors {
Expand Down Expand Up @@ -172,14 +175,39 @@ public static OpenSearchThreadPoolExecutor newFixed(
);
}

public static OpenSearchThreadPoolExecutor newAutoQueueFixed(
String name,
int size,
int initialQueueCapacity,
int minQueueSize,
int maxQueueSize,
int frameSize,
TimeValue targetedResponseTime,
ThreadFactory threadFactory,
ThreadContext contextHolder
) {
return newAutoQueueFixed(
name,
size,
initialQueueCapacity,
minQueueSize,
maxQueueSize,
frameSize,
targetedResponseTime,
threadFactory,
contextHolder,
null
);
}

/**
* Return a new executor that will automatically adjust the queue size based on queue throughput.
*
* @param size number of fixed threads to use for executing tasks
* @param size number of fixed threads to use for executing tasks
* @param initialQueueCapacity initial size of the executor queue
* @param minQueueSize minimum queue size that the queue can be adjusted to
* @param maxQueueSize maximum queue size that the queue can be adjusted to
* @param frameSize number of tasks during which stats are collected before adjusting queue size
* @param minQueueSize minimum queue size that the queue can be adjusted to
* @param maxQueueSize maximum queue size that the queue can be adjusted to
* @param frameSize number of tasks during which stats are collected before adjusting queue size
*/
public static OpenSearchThreadPoolExecutor newAutoQueueFixed(
String name,
Expand All @@ -190,7 +218,8 @@ public static OpenSearchThreadPoolExecutor newAutoQueueFixed(
int frameSize,
TimeValue targetedResponseTime,
ThreadFactory threadFactory,
ThreadContext contextHolder
ThreadContext contextHolder,
AtomicReference<RunnableTaskExecutionListener> runnableTaskListener
) {
if (initialQueueCapacity <= 0) {
throw new IllegalArgumentException(
Expand All @@ -201,6 +230,17 @@ public static OpenSearchThreadPoolExecutor newAutoQueueFixed(
ConcurrentCollections.<Runnable>newBlockingQueue(),
initialQueueCapacity
);

Function<Runnable, WrappedRunnable> runnableWrapper;
if (runnableTaskListener != null) {
runnableWrapper = (runnable) -> {
TaskAwareRunnable taskAwareRunnable = new TaskAwareRunnable(contextHolder, runnable, runnableTaskListener);
return new TimedRunnable(taskAwareRunnable);
};
} else {
runnableWrapper = TimedRunnable::new;
}

return new QueueResizingOpenSearchThreadPoolExecutor(
name,
size,
Expand All @@ -210,7 +250,7 @@ public static OpenSearchThreadPoolExecutor newAutoQueueFixed(
queue,
minQueueSize,
maxQueueSize,
TimedRunnable::new,
runnableWrapper,
frameSize,
targetedResponseTime,
threadFactory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@

import static org.opensearch.http.HttpTransportSettings.SETTING_HTTP_MAX_WARNING_HEADER_COUNT;
import static org.opensearch.http.HttpTransportSettings.SETTING_HTTP_MAX_WARNING_HEADER_SIZE;
import static org.opensearch.tasks.TaskResourceTrackingService.TASK_ID;

/**
* A ThreadContext is a map of string headers and a transient map of keyed objects that are associated with
Expand Down Expand Up @@ -134,16 +135,23 @@ public StoredContext stashContext() {
* This is needed so the DeprecationLogger in another thread can see the value of X-Opaque-ID provided by a user.
* Otherwise when context is stash, it should be empty.
*/

ThreadContextStruct threadContextStruct = DEFAULT_CONTEXT;

if (context.requestHeaders.containsKey(Task.X_OPAQUE_ID)) {
ThreadContextStruct threadContextStruct = DEFAULT_CONTEXT.putHeaders(
threadContextStruct = threadContextStruct.putHeaders(
MapBuilder.<String, String>newMapBuilder()
.put(Task.X_OPAQUE_ID, context.requestHeaders.get(Task.X_OPAQUE_ID))
.immutableMap()
);
threadLocal.set(threadContextStruct);
} else {
threadLocal.set(DEFAULT_CONTEXT);
}

if (context.transientHeaders.containsKey(TASK_ID)) {
threadContextStruct = threadContextStruct.putTransient(TASK_ID, context.transientHeaders.get(TASK_ID));
}

threadLocal.set(threadContextStruct);

return () -> {
// If the node and thus the threadLocal get closed while this task
// is still executing, we don't want this runnable to fail with an
Expand Down
Loading

0 comments on commit 6517eec

Please sign in to comment.