Search Task Resource Tracking PoC #1643
Can one of the admins verify this patch?
✅ Gradle Wrapper Validation success a439a4d
@@ -283,6 +287,10 @@ public void innerOnResponse(Result result) {
    } finally {
        executeNext(pendingExecutions, thread);
    }
    ThreadMXBean threadMXBean = (ThreadMXBean) ManagementFactory.getThreadMXBean();
This lookup is expensive. Move it to a static final variable.
@@ -283,6 +287,10 @@ public void innerOnResponse(Result result) {
    } finally {
        executeNext(pendingExecutions, thread);
    }
    ThreadMXBean threadMXBean = (ThreadMXBean) ManagementFactory.getThreadMXBean();
    long bytes = threadMXBean.getThreadAllocatedBytes(Thread.currentThread().getId());
Also, it is worth checking whether thread allocation tracking is supported and enabled, to avoid unnecessary work: ThreadMXBean::isThreadAllocatedMemorySupported() and ThreadMXBean::isThreadAllocatedMemoryEnabled().
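A minimal sketch of the two suggestions above, combined: the bean is looked up once into a static final field, and every read is guarded by the supported/enabled checks. The class name AllocationProbe and the -1 sentinel are assumptions made for illustration; they are not part of the PR.

```java
import java.lang.management.ManagementFactory;

// Hypothetical helper, not part of the PR: caches the bean once and guards every read.
final class AllocationProbe {
    // Looked up once; null when the JVM does not expose the com.sun.management extension.
    private static final com.sun.management.ThreadMXBean THREAD_MX_BEAN = lookup();

    private AllocationProbe() {}

    private static com.sun.management.ThreadMXBean lookup() {
        java.lang.management.ThreadMXBean bean = ManagementFactory.getThreadMXBean();
        return (bean instanceof com.sun.management.ThreadMXBean)
            ? (com.sun.management.ThreadMXBean) bean
            : null;
    }

    /** True only if allocation tracking is both supported and currently enabled. */
    static boolean isAvailable() {
        return THREAD_MX_BEAN != null
            && THREAD_MX_BEAN.isThreadAllocatedMemorySupported()
            && THREAD_MX_BEAN.isThreadAllocatedMemoryEnabled();
    }

    /** Bytes allocated so far by the current thread, or -1 (assumed sentinel) if unavailable. */
    static long currentThreadAllocatedBytes() {
        if (!isAvailable()) {
            return -1L;
        }
        return THREAD_MX_BEAN.getThreadAllocatedBytes(Thread.currentThread().getId());
    }
}
```

Callers would then never touch ManagementFactory directly, which also lines up with the later comment about fencing ThreadMXBean behind a single class.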
 */
private TaskResourceTracker() {
    taskMap = new ConcurrentHashMap<>();
    threadMXBean = (ThreadMXBean) ManagementFactory.getThreadMXBean();
Make this static final.
... and we could remove direct usage of the ThreadMXBean from all other places; just fence it behind TaskResourceTracker.
// just register read operations
if (action.startsWith("indices:data/read")) {
    if (threadContext.getTransient("TASK_ID") == null) {
        threadContext.putTransient("TASK_ID", String.valueOf(task.getId()));

        List<String> indices = new ArrayList<>();
        if (request instanceof IndicesRequest) {
            indices = Arrays.asList(((IndicesRequest) request).indices());
        }
        // TODO Add shard id handling
        TaskResourceTracker.getInstance().registerTaskForTracking(task.getId(), indices, null, action);
    }
}
Why can't this be modelled as a TaskListener?
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

public class TaskResourceTracker implements ResourceWatcher {
Please add Javadocs for all new classes.
public void registerTaskForTracking(long taskId, List<String> indices, ShardId shardId, String actionName) {
    taskMap.put(new TaskInfoKey(taskId, indices, shardId, actionName), new ArrayList<>());
}

public void registerWorkerForTask(long taskId, long workerId, long cpuCurrent, long bytesCurrent, String threadpoolName) {
    // TODO remove this after identifying cases where it can be true
    if (taskMap.get(new TaskInfoKey(taskId)) == null) {
        return;
    }

    TaskWorkerResourceUtilInfo taskWorkerResourceUtilInfo =
        new TaskWorkerResourceUtilInfo(workerId, cpuCurrent, cpuCurrent, bytesCurrent, bytesCurrent,
            true, threadpoolName);

    taskMap.get(new TaskInfoKey(taskId)).add(taskWorkerResourceUtilInfo);
}
Same as above: maybe model this as a TaskListener?
public TaskInfoKey(long taskId, List<String> indices, ShardId shardId, String action) {
    this.taskId = taskId;
    this.indices = indices;
    this.shardId = shardId;
    this.action = action;
}
So far a Task is not bound to a ShardId; this should be more generic.
ThreadMXBean threadMXBean = (ThreadMXBean) ManagementFactory.getThreadMXBean();
long bytesStart = threadMXBean.getThreadAllocatedBytes(Thread.currentThread().getId());
ThreadMXBean usage is spread all over the place. Let's simplify.
// long bytesEnd = threadMXBean.getThreadAllocatedBytes(Thread.currentThread().getId());
if (response instanceof SearchPhaseResult) {
    // TaskResourceTracker.getInstance().registerResponseOverhead(((SearchPhaseResult) response).getShardSearchRequest().getParentTask().getId(), bytesEnd - bytesStart);
    TaskResourceTracker.getInstance().registerResponseOverhead1(response, bytesStart);
}
Why place it in InboundHandler?
import org.opensearch.ExceptionsHelper;
import org.opensearch.tasks.TaskResourceTracker;

public class ResourceRunnable extends AbstractRunnable implements WrappedRunnable {
Let's add Javadocs explaining how a thread's running cost is computed and associated with the corresponding task.
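For reference, one way the per-thread cost could be computed is as the difference between two getThreadAllocatedBytes readings taken on the executing thread, before and after the wrapped work. The sketch below is only an illustration of that idea: MeasuredRunnable and the LongConsumer sink are hypothetical names, and this is not the PR's ResourceRunnable.

```java
import java.lang.management.ManagementFactory;
import java.util.function.LongConsumer;

// Hypothetical wrapper, not the PR's ResourceRunnable: reports the bytes allocated
// by the executing thread while the delegate ran.
final class MeasuredRunnable implements Runnable {
    // Null when the JVM does not expose the com.sun.management extension.
    private static final com.sun.management.ThreadMXBean BEAN =
        (ManagementFactory.getThreadMXBean() instanceof com.sun.management.ThreadMXBean)
            ? (com.sun.management.ThreadMXBean) ManagementFactory.getThreadMXBean()
            : null;

    private final Runnable delegate;
    private final LongConsumer sink; // receives the allocation delta in bytes

    MeasuredRunnable(Runnable delegate, LongConsumer sink) {
        this.delegate = delegate;
        this.sink = sink;
    }

    @Override
    public void run() {
        long threadId = Thread.currentThread().getId();
        long start = read(threadId);
        try {
            delegate.run();
        } finally {
            long end = read(threadId);
            // Report only when both readings are valid.
            if (start >= 0 && end >= 0) {
                sink.accept(end - start);
            }
        }
    }

    private static long read(long threadId) {
        if (BEAN == null || !BEAN.isThreadAllocatedMemorySupported() || !BEAN.isThreadAllocatedMemoryEnabled()) {
            return -1L;
        }
        return BEAN.getThreadAllocatedBytes(threadId);
    }
}
```

In a real implementation the sink would attribute the delta to the task that owns the work; how the task id reaches the worker thread is a separate design question that this sketch leaves open.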
@@ -431,6 +432,7 @@ protected Node(
    resourcesToClose.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS));
    final ResourceWatcherService resourceWatcherService = new ResourceWatcherService(settings, threadPool);
    resourcesToClose.add(resourceWatcherService);
    resourceWatcherService.add(TaskResourceTracker.getInstance(), ResourceWatcherService.Frequency.HIGH);
We don't need to watch every 5s; that seems wasteful. Instead, tracking should be triggered by overall utilization crossing a threshold, by individual task-level resource utilization, or on demand.
TaskResourceTracker.getInstance().transfer(task.getId(), result, bytes);
It would be too hard to maintain the code base with this construct. Let's simplify.
if ("tw".equals(taskWorkerResourceUtilInfo.getThreadPoolName())) {
    response[0] += taskWorkerResourceUtilInfo.getOverheardBytes();
} else {
    search[0] += taskWorkerResourceUtilInfo.getHeapNow() - taskWorkerResourceUtilInfo.getHeapStart();
}
The transport worker is a very critical thread. I'd advise against any custom logic for it.
public void transfer(long taskId, Object ob, long bytes) {
    TaskInfoKey key = new TaskInfoKey(taskId);
    if (!overhead.containsKey(ob) || taskMap.get(key) == null) return;

    long bytesStart = overhead.get(ob);

    TaskWorkerResourceUtilInfo t = new TaskWorkerResourceUtilInfo(1L, 0L, 0L, bytesStart, bytes, false, "tw");
    taskMap.get(key).add(t);
    overhead.remove(ob);
}
Please elaborate on this logic.
taskMap.forEach((taskInfoKey, taskWorkerResourceUtilInfos) -> taskWorkerResourceUtilInfos.forEach(taskWorkerResourceUtilInfo -> {
    if (taskWorkerResourceUtilInfo.isActive()) {
        taskWorkerResourceUtilInfo.setHeapNow(threadMXBean.getThreadAllocatedBytes(taskWorkerResourceUtilInfo.getWorkerId()));
    }
    if ("tw".equals(taskWorkerResourceUtilInfo.getThreadPoolName())) {
        response[0] += taskWorkerResourceUtilInfo.getOverheardBytes();
    } else {
        search[0] += taskWorkerResourceUtilInfo.getHeapNow() - taskWorkerResourceUtilInfo.getHeapStart();
Use the optimized batch API, which reads all thread IDs in one call, instead of querying each worker individually:
https://docs.oracle.com/javase/7/docs/jre/api/management/extension/com/sun/management/ThreadMXBean.html#getThreadAllocatedBytes(long[])
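As a rough illustration of the batch call, the sketch below passes all worker thread IDs to getThreadAllocatedBytes(long[]) at once. BatchAllocationReader is a hypothetical name, and filling the result with -1 when tracking is unavailable is an assumption that mirrors the value the API itself returns for threads that are no longer alive.

```java
import java.lang.management.ManagementFactory;
import java.util.Arrays;

// Hypothetical helper, not part of the PR: reads allocation counters for many threads in one call.
final class BatchAllocationReader {
    private BatchAllocationReader() {}

    /**
     * Returns one counter per requested thread id, in the same order.
     * An entry is -1 if that thread is not alive, or if tracking is unavailable on this JVM.
     */
    static long[] allocatedBytes(long[] threadIds) {
        java.lang.management.ThreadMXBean bean = ManagementFactory.getThreadMXBean();
        if (bean instanceof com.sun.management.ThreadMXBean) {
            com.sun.management.ThreadMXBean sunBean = (com.sun.management.ThreadMXBean) bean;
            if (sunBean.isThreadAllocatedMemorySupported() && sunBean.isThreadAllocatedMemoryEnabled()) {
                // Single call covering every id, instead of one call per worker.
                return sunBean.getThreadAllocatedBytes(threadIds);
            }
        }
        long[] unavailable = new long[threadIds.length];
        Arrays.fill(unavailable, -1L);
        return unavailable;
    }
}
```

The periodic refresh could collect the IDs of all active workers first and then update their heap readings from the returned array.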
I suggest the static initializer below to reduce the overhead of looking up ThreadMXBean:
+ static {
+ threadMXBean = ManagementFactory.getThreadMXBean();
+ Method getBytes;
+ try {
+ getBytes = threadMXBean.getClass()
+ .getMethod("getThreadAllocatedBytes", long[].class);
+ getBytes.setAccessible(true);
+ } catch (NoSuchMethodException e) {
+ getBytes = null;
+ }
+ getThreadAllocatedBytes = getBytes;
+ }
@@ -150,6 +153,20 @@ public Task register(String type, String action, TaskAwareRequest request) {
        logger.trace("register {} [{}] [{}] [{}]", task.getId(), type, action, task.getDescription());
    }

    // just register read operations
    if (action.startsWith("indices:data/read")) {
I would suggest enriching the action with something like isResourceTrackingEnabled() and using it as the indicator of whether resources need to be tracked. Also, the tracking key (indices in this case, but it is action-specific) has to be provided by the action as well, e.g. via a getResourceTrackingKey method.
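To make the suggestion concrete, here is a hedged sketch of what such a contract might look like. Only the method names isResourceTrackingEnabled and getResourceTrackingKey come from the comment above; the TrackableAction interface, its generic request parameter, and DemoSearchAction are hypothetical and do not correspond to existing OpenSearch types.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical contract: each action decides whether it is tracked and supplies its own key.
interface TrackableAction<R> {
    String name();

    // Tracking is opt-in, so existing actions are unaffected by default.
    default boolean isResourceTrackingEnabled() {
        return false;
    }

    // The key is action-specific; for a search-like action it could be the target indices.
    default List<String> getResourceTrackingKey(R request) {
        return Collections.emptyList();
    }
}

// Illustrative action whose "request" is simply the array of index names.
final class DemoSearchAction implements TrackableAction<String[]> {
    @Override
    public String name() {
        return "indices:data/read/search";
    }

    @Override
    public boolean isResourceTrackingEnabled() {
        return true;
    }

    @Override
    public List<String> getResourceTrackingKey(String[] request) {
        return Arrays.asList(request);
    }
}
```

With a contract like this, the registration code would ask the action instead of matching on the "indices:data/read" name prefix.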
public void registerWorkerForTask(long taskId, long workerId, long cpuCurrent, long bytesCurrent, String threadpoolName) {
    // TODO remove this after identifying cases where it can be true
    if (taskMap.get(new TaskInfoKey(taskId)) == null) {
        return;
    }

    TaskWorkerResourceUtilInfo taskWorkerResourceUtilInfo =
        new TaskWorkerResourceUtilInfo(workerId, cpuCurrent, cpuCurrent, bytesCurrent, bytesCurrent,
            true, threadpoolName);

    taskMap.get(new TaskInfoKey(taskId)).add(taskWorkerResourceUtilInfo);
}
There could be concurrency bugs with these checks: the entry can be removed between the two gets.
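One possible way to close that window, sketched under simplifying assumptions, is to do the lookup and the update in a single computeIfPresent call on the ConcurrentHashMap. WorkerRegistry, the Long task key, and the String worker entries are hypothetical stand-ins for the PR's TaskInfoKey and TaskWorkerResourceUtilInfo types.

```java
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical registry, not the PR's TaskResourceTracker.
final class WorkerRegistry {
    private final ConcurrentHashMap<Long, List<String>> taskMap = new ConcurrentHashMap<>();

    void registerTask(long taskId) {
        // A thread-safe list, since several workers may be added to the same task concurrently.
        taskMap.putIfAbsent(taskId, new CopyOnWriteArrayList<>());
    }

    void unregisterTask(long taskId) {
        taskMap.remove(taskId);
    }

    /** Adds the worker only if the task is still registered; returns whether it was added. */
    boolean registerWorker(long taskId, String worker) {
        // The presence check and the add happen atomically with respect to removal of the key.
        return taskMap.computeIfPresent(taskId, (id, workers) -> {
            workers.add(worker);
            return workers;
        }) != null;
    }

    int workerCount(long taskId) {
        List<String> workers = taskMap.get(taskId);
        return workers == null ? 0 : workers.size();
    }
}
```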
TaskInfoKey key = new TaskInfoKey(taskId);
if (!overhead.containsKey(ob) || taskMap.get(key) == null) return;

long bytesStart = overhead.get(ob);
Possible NPE here if the key gets removed between the check and the usage, because the null result is unboxed into a long. Use Long instead.
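A small sketch of the fix, assuming the overhead map is a ConcurrentHashMap: read the value once into a boxed Long and test it for null, rather than pairing containsKey with get. Using remove for the read also makes the lookup and cleanup a single atomic step. OverheadLedger and its method names are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical ledger, not part of the PR.
final class OverheadLedger {
    private final ConcurrentHashMap<Object, Long> overhead = new ConcurrentHashMap<>();

    void record(Object response, long bytesStart) {
        overhead.put(response, bytesStart);
    }

    /** Returns the bytes attributed to the response, or -1 (assumed sentinel) if it is not tracked. */
    long settle(Object response, long bytesNow) {
        // Single atomic read-and-remove; the boxed result is null if another thread got there first.
        Long bytesStart = overhead.remove(response);
        if (bytesStart == null) {
            return -1L;
        }
        return bytesNow - bytesStart;
    }
}
```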
@@ -74,6 +88,8 @@ public Stats(StreamInput in) throws IOException {
    rejected = in.readLong();
    largest = in.readInt();
    completed = in.readLong();
    bytes = in.readLong();
Please wrap in version checks:
if (in.getVersion().onOrAfter(Version.V_2_0_0)) {
bytes = in.readLong();
ro = in.readLong();
}
@@ -85,6 +101,8 @@ public void writeTo(StreamOutput out) throws IOException {
    out.writeLong(rejected);
    out.writeInt(largest);
    out.writeLong(completed);
    out.writeLong(bytes);
if (out.getVersion().onOrAfter(Version.V_2_0_0)) {
    out.writeLong(bytes);
    out.writeLong(ro);
}
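The reason for gating both sides is that a node on an older version neither writes nor expects the new fields, so reader and writer must agree on the layout for the peer's version. The sketch below illustrates this with plain java.io streams and a hypothetical integer wire version; it deliberately does not use the OpenSearch StreamInput, StreamOutput, or Version classes, and VersionedStats is an invented name.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for a Stats object with version-gated fields.
final class VersionedStats {
    static final int V_1 = 1; // assumed older wire version, without the new fields
    static final int V_2 = 2; // assumed wire version that introduces bytes and ro

    long completed;
    long bytes;
    long ro;

    byte[] toBytes(int peerVersion) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.writeLong(completed);
            if (peerVersion >= V_2) { // write new fields only to peers that can read them
                out.writeLong(bytes);
                out.writeLong(ro);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    static VersionedStats fromBytes(byte[] data, int peerVersion) {
        VersionedStats stats = new VersionedStats();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            stats.completed = in.readLong();
            if (peerVersion >= V_2) { // must mirror the writer's condition exactly
                stats.bytes = in.readLong();
                stats.ro = in.readLong();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return stats;
    }
}
```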
Thanks Tushar, this is good stuff. I like the thought you have put into this, especially with this being your first contribution.
Thanks @reta and @Bukhtawar for taking the time to review this PoC code. I have started thinking more about the final design and have divided it into 4 to 5 meaningful chunks. I will start raising PRs for production-ready code next week, making sure these comments are addressed as well.
@tushar-kharbanda72 Want to finish this?
@dblock Implemented this feature. Closing this PoC PR. Feature implementation PRs:
Description
This PoC captures the system resource overhead of search requests on either a data node or a coordinator node. It captures all heap allocations made by threads running on the search threadpool. It also captures the heap overhead of responses received on the coordinator while it waits for other data nodes to respond (this serialization of the response is generally done on transport_worker and is not accounted for by search threadpool tracking).
This PoC only aims to validate correctness and performance impact. After these validations I'll focus on the concrete design aspects of the changes.
Issues Resolved
#1179
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.