[ML] Fork trained model stats work to thread pool threads #99439
The steps that assemble the trained model stats response largely ran on the transport worker thread. Even though none of the steps look massively expensive, it's likely one of them is, because we've observed client lockups (Kibana) while trained model stats is running. This change moves the steps into the ML utility thread pool to avoid any risk of blocking the transport thread.
Pinging @elastic/ml-core (Team:ML)
LGTM
Maybe consider using `org.elasticsearch.action.support.SubscribableListener#addListener(org.elasticsearch.action.ActionListener<T>, java.util.concurrent.Executor, org.elasticsearch.common.util.concurrent.ThreadContext)` rather than explicitly forking on success? It seems a little neater, and this pattern also handles exceptions thrown by the forked task (and forking failures) better than the code as written here.
(Also, unrelated to this change, but do you really need the exception-mangling of `ListenableFuture`, or would `SubscribableListener` be a better choice?)
matchedDeploymentIds,
idsListener
);
executor.execute(() -> {
I'd rather we used an `ActionRunnable` here too for better exception-safety and also in case `executor` ever became something that might reject tasks on shutdown/overload:
- executor.execute(() -> {
+ executor.execute(ActionRunnable.wrap(idsListener, l -> {
(also avoiding capture of `idsListener` below)
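The exception-safety point can be illustrated with a minimal, self-contained sketch. It deliberately does not use the real Elasticsearch classes: `Listener`, `CheckedConsumer`, `fork` and `SafeForkSketch` are hypothetical stand-ins, and the real `ActionRunnable.wrap` may behave differently in detail. The idea shown is only that both an exception thrown by the forked body and a rejection by the executor end up notifying the listener, rather than being lost.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

public class SafeForkSketch {

    /** Hypothetical stand-in for a response/failure listener. */
    interface Listener<T> {
        void onResponse(T value);

        void onFailure(Exception e);
    }

    /** Hypothetical body type that is allowed to throw. */
    interface CheckedConsumer<T> {
        void accept(T t) throws Exception;
    }

    /** Runs body on executor; any exception or rejection is reported to the listener. */
    static <T> void fork(Executor executor, Listener<T> listener, CheckedConsumer<Listener<T>> body) {
        try {
            executor.execute(() -> {
                try {
                    body.accept(listener);
                } catch (Exception e) {
                    // Exception thrown by the forked task itself.
                    listener.onFailure(e);
                }
            });
        } catch (RejectedExecutionException e) {
            // The executor refused the task, e.g. because it is shut down.
            listener.onFailure(e);
        }
    }

    /** Forks body and returns a short description of how the listener was completed. */
    static String runAndDescribe(ExecutorService executor, CheckedConsumer<Listener<String>> body) throws Exception {
        CompletableFuture<String> outcome = new CompletableFuture<>();
        fork(executor, new Listener<String>() {
            @Override
            public void onResponse(String value) {
                outcome.complete("response: " + value);
            }

            @Override
            public void onFailure(Exception e) {
                outcome.complete("failure: " + e.getClass().getSimpleName());
            }
        }, body);
        return outcome.get(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        System.out.println(runAndDescribe(executor, l -> l.onResponse("ids")));
        System.out.println(runAndDescribe(executor, l -> { throw new IllegalStateException("boom"); }));
        executor.shutdown();
        // After shutdown the task is rejected, but the listener still hears about it.
        System.out.println(runAndDescribe(executor, l -> l.onResponse("never runs")));
    }
}
```

With a bare `executor.execute(() -> { ... })`, the second and third cases in `main` would leave the listener uncompleted, which is the situation the review comment is warning about.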
Also (just one more thing!) consider sprinkling a few
ListenableFuture<NodesStatsResponse> nodesStatsListener = new ListenableFuture<>();
nodesStatsListener.addListener(listener.delegateFailureAndWrap((delegate, nodesStatsResponse) -> {
}).<List<InferenceStats>>andThen(executor, null, (l, nodesStatsResponse) -> {
@DaveCTurner would you expect that this line would run the code of the lambda that starts on this line (the one containing the call to `pipelineIdsByResource`) using `executor`? Or is `executor` used to run the next step after this one?
Currently `pipelineIdsByResource` is getting called on the `transport_worker` thread:
[2024-03-19T19:12:09,189][FATAL][o.e.x.m.u.InferenceProcessorInfoExtractor] [yamlRestTest-0] should not be here
java.lang.AssertionError: Expected current thread [Thread[#124,elasticsearch[yamlRestTest-0][transport_worker][T#9],5,main]] to not be a transport thread. Reason: [non-trivial nested loops over cluster state structures]
at org.elasticsearch.transport.Transports.assertNotTransportThread(Transports.java:66) ~[elasticsearch-8.14.0-SNAPSHOT.jar:?]
at org.elasticsearch.xpack.ml.utils.InferenceProcessorInfoExtractor.pipelineIdsByResource(InferenceProcessorInfoExtractor.java:81) ~[?:?]
at org.elasticsearch.xpack.ml.action.TransportGetTrainedModelsStatsAction.lambda$doExecute$4(TransportGetTrainedModelsStatsAction.java:166) ~[?:?]
at org.elasticsearch.action.ActionListenerImplementations$ResponseWrappingActionListener.onResponse(ActionListenerImplementations.java:245) ~[elasticsearch-8.14.0-SNAPSHOT.jar:?]
at org.elasticsearch.action.support.SubscribableListener$SuccessResult.complete(SubscribableListener.java:366) ~[elasticsearch-8.14.0-SNAPSHOT.jar:?]
at org.elasticsearch.action.support.SubscribableListener.tryComplete(SubscribableListener.java:286) ~[elasticsearch-8.14.0-SNAPSHOT.jar:?]
at org.elasticsearch.action.support.SubscribableListener.addListener(SubscribableListener.java:189) ~[elasticsearch-8.14.0-SNAPSHOT.jar:?]
at org.elasticsearch.action.support.SubscribableListener.lambda$andThen$0(SubscribableListener.java:437) ~[elasticsearch-8.14.0-SNAPSHOT.jar:?]
at org.elasticsearch.action.ActionListener.run(ActionListener.java:356) ~[elasticsearch-8.14.0-SNAPSHOT.jar:?]
at org.elasticsearch.action.support.SubscribableListener.newForked(SubscribableListener.java:128) ~[elasticsearch-8.14.0-SNAPSHOT.jar:?]
at org.elasticsearch.action.support.SubscribableListener.andThen(SubscribableListener.java:437) ~[elasticsearch-8.14.0-SNAPSHOT.jar:?]
at org.elasticsearch.xpack.ml.action.TransportGetTrainedModelsStatsAction.doExecute(TransportGetTrainedModelsStatsAction.java:156) ~[?:?]
at org.elasticsearch.xpack.ml.action.TransportGetTrainedModelsStatsAction.doExecute(TransportGetTrainedModelsStatsAction.java:79) ~[?:?]
at org.elasticsearch.action.support.TransportAction$RequestFilterChain.proceed(TransportAction.java:96) ~[elasticsearch-8.14.0-SNAPSHOT.jar:?]
at org.elasticsearch.action.support.ActionFilter$Simple.apply(ActionFilter.java:53) ~[elasticsearch-8.14.0-SNAPSHOT.jar:?]
at org.elasticsearch.action.support.TransportAction$RequestFilterChain.proceed(TransportAction.java:93) ~[elasticsearch-8.14.0-SNAPSHOT.jar:?]
at org.elasticsearch.xpack.security.action.filter.SecurityActionFilter.lambda$applyInternal$3(SecurityActionFilter.java:165) ~[?:?]
at org.elasticsearch.action.ActionListenerImplementations$DelegatingFailureActionListener.onResponse(ActionListenerImplementations.java:217) ~[elasticsearch-8.14.0-SNAPSHOT.jar:?]
at org.elasticsearch.xpack.security.authz.AuthorizationService.lambda$authorizeAction$8(AuthorizationService.java:456) ~[?:?]
at org.elasticsearch.xpack.security.authz.AuthorizationService$AuthorizationResultListener.onResponse(AuthorizationService.java:1027) ~[?:?]
at org.elasticsearch.xpack.security.authz.AuthorizationService$AuthorizationResultListener.onResponse(AuthorizationService.java:993) ~[?:?]
at org.elasticsearch.action.support.ContextPreservingActionListener.onResponse(ContextPreservingActionListener.java:32) ~[elasticsearch-8.14.0-SNAPSHOT.jar:?]
at org.elasticsearch.xpack.security.authz.AuthorizationService.lambda$authorizeAction$9(AuthorizationService.java:470) ~[?:?]
at org.elasticsearch.action.ActionListenerImplementations$ResponseWrappingActionListener.onResponse(ActionListenerImplementations.java:245) ~[elasticsearch-8.14.0-SNAPSHOT.jar:?]
at org.elasticsearch.xpack.security.authz.RBACEngine.authorizeClusterAction(RBACEngine.java:188) ~[?:?]
at org.elasticsearch.xpack.security.authz.AuthorizationService.authorizeAction(AuthorizationService.java:460) ~[?:?]
at org.elasticsearch.xpack.security.authz.AuthorizationService.maybeAuthorizeRunAs(AuthorizationService.java:436) ~[?:?]
at org.elasticsearch.xpack.security.authz.AuthorizationService.lambda$authorize$3(AuthorizationService.java:323) ~[?:?]
at org.elasticsearch.action.ActionListener$2.onResponse(ActionListener.java:171) ~[elasticsearch-8.14.0-SNAPSHOT.jar:?]
at org.elasticsearch.action.support.ContextPreservingActionListener.onResponse(ContextPreservingActionListener.java:32) ~[elasticsearch-8.14.0-SNAPSHOT.jar:?]
at org.elasticsearch.xpack.security.authz.RBACEngine.lambda$resolveAuthorizationInfo$0(RBACEngine.java:153) ~[?:?]
at org.elasticsearch.action.ActionListenerImplementations$ResponseWrappingActionListener.onResponse(ActionListenerImplementations.java:245) ~[elasticsearch-8.14.0-SNAPSHOT.jar:?]
at org.elasticsearch.xpack.security.authz.store.CompositeRolesStore.lambda$getRoles$4(CompositeRolesStore.java:193) ~[?:?]
at org.elasticsearch.action.ActionListenerImplementations$ResponseWrappingActionListener.onResponse(ActionListenerImplementations.java:245) ~[elasticsearch-8.14.0-SNAPSHOT.jar:?]
at org.elasticsearch.xpack.security.authz.store.CompositeRolesStore.lambda$getRole$5(CompositeRolesStore.java:211) ~[?:?]
at org.elasticsearch.action.ActionListenerImplementations$ResponseWrappingActionListener.onResponse(ActionListenerImplementations.java:245) ~[elasticsearch-8.14.0-SNAPSHOT.jar:?]
at org.elasticsearch.xpack.core.security.authz.store.RoleReferenceIntersection.lambda$buildRole$0(RoleReferenceIntersection.java:49) ~[?:?]
at org.elasticsearch.action.ActionListenerImplementations$ResponseWrappingActionListener.onResponse(ActionListenerImplementations.java:245) ~[elasticsearch-8.14.0-SNAPSHOT.jar:?]
at org.elasticsearch.action.support.GroupedActionListener.onResponse(GroupedActionListener.java:56) ~[elasticsearch-8.14.0-SNAPSHOT.jar:?]
at org.elasticsearch.xpack.security.authz.store.CompositeRolesStore.buildRoleFromRoleReference(CompositeRolesStore.java:291) ~[?:?]
at org.elasticsearch.xpack.core.security.authz.store.RoleReferenceIntersection.lambda$buildRole$1(RoleReferenceIntersection.java:53) ~[?:?]
at java.lang.Iterable.forEach(Iterable.java:75) ~[?:?]
at org.elasticsearch.xpack.core.security.authz.store.RoleReferenceIntersection.buildRole(RoleReferenceIntersection.java:53) ~[?:?]
at org.elasticsearch.xpack.security.authz.store.CompositeRolesStore.getRole(CompositeRolesStore.java:209) ~[?:?]
at org.elasticsearch.xpack.security.authz.store.CompositeRolesStore.getRoles(CompositeRolesStore.java:186) ~[?:?]
at org.elasticsearch.xpack.security.authz.RBACEngine.resolveAuthorizationInfo(RBACEngine.java:149) ~[?:?]
at org.elasticsearch.xpack.security.authz.AuthorizationService.authorize(AuthorizationService.java:339) ~[?:?]
at org.elasticsearch.xpack.security.action.filter.SecurityActionFilter.lambda$applyInternal$4(SecurityActionFilter.java:161) ~[?:?]
at org.elasticsearch.action.ActionListenerImplementations$ResponseWrappingActionListener.onResponse(ActionListenerImplementations.java:245) ~[elasticsearch-8.14.0-SNAPSHOT.jar:?]
at org.elasticsearch.action.ActionListenerImplementations$MappedActionListener.onResponse(ActionListenerImplementations.java:95) ~[elasticsearch-8.14.0-SNAPSHOT.jar:?]
at org.elasticsearch.xpack.security.authc.AuthenticatorChain.authenticate(AuthenticatorChain.java:93) ~[?:?]
at org.elasticsearch.xpack.security.authc.AuthenticationService.authenticate(AuthenticationService.java:264) ~[?:?]
at org.elasticsearch.xpack.security.authc.AuthenticationService.authenticate(AuthenticationService.java:173) ~[?:?]
at org.elasticsearch.xpack.security.action.filter.SecurityActionFilter.applyInternal(SecurityActionFilter.java:157) ~[?:?]
at org.elasticsearch.xpack.security.action.filter.SecurityActionFilter.apply(SecurityActionFilter.java:114) ~[?:?]
at org.elasticsearch.action.support.TransportAction$RequestFilterChain.proceed(TransportAction.java:93) ~[elasticsearch-8.14.0-SNAPSHOT.jar:?]
at org.elasticsearch.action.support.TransportAction.execute(TransportAction.java:68) ~[elasticsearch-8.14.0-SNAPSHOT.jar:?]
at org.elasticsearch.tasks.TaskManager.registerAndExecute(TaskManager.java:196) ~[elasticsearch-8.14.0-SNAPSHOT.jar:?]
at org.elasticsearch.client.internal.node.NodeClient.executeLocally(NodeClient.java:105) ~[elasticsearch-8.14.0-SNAPSHOT.jar:?]
at org.elasticsearch.rest.action.RestCancellableNodeClient.doExecute(RestCancellableNodeClient.java:81) ~[elasticsearch-8.14.0-SNAPSHOT.jar:?]
at org.elasticsearch.client.internal.support.AbstractClient.execute(AbstractClient.java:356) ~[elasticsearch-8.14.0-SNAPSHOT.jar:?]
at org.elasticsearch.xpack.ml.rest.inference.RestGetTrainedModelsStatsAction.lambda$prepareRequest$0(RestGetTrainedModelsStatsAction.java:66) ~[?:?]
at org.elasticsearch.rest.BaseRestHandler.handleRequest(BaseRestHandler.java:106) ~[elasticsearch-8.14.0-SNAPSHOT.jar:?]
at org.elasticsearch.rest.RestController$1.onResponse(RestController.java:452) ~[elasticsearch-8.14.0-SNAPSHOT.jar:?]
at org.elasticsearch.rest.RestController$1.onResponse(RestController.java:446) ~[elasticsearch-8.14.0-SNAPSHOT.jar:?]
at org.elasticsearch.xpack.security.rest.SecurityRestFilter.doHandleRequest(SecurityRestFilter.java:89) ~[?:?]
at org.elasticsearch.xpack.security.rest.SecurityRestFilter.lambda$intercept$0(SecurityRestFilter.java:81) ~[?:?]
at org.elasticsearch.action.ActionListener$2.onResponse(ActionListener.java:171) ~[elasticsearch-8.14.0-SNAPSHOT.jar:?]
at org.elasticsearch.xpack.security.authc.support.SecondaryAuthenticator.lambda$authenticateAndAttachToContext$3(SecondaryAuthenticator.java:99) ~[?:?]
at org.elasticsearch.action.ActionListenerImplementations$ResponseWrappingActionListener.onResponse(ActionListenerImplementations.java:245) ~[elasticsearch-8.14.0-SNAPSHOT.jar:?]
at org.elasticsearch.xpack.security.authc.support.SecondaryAuthenticator.authenticate(SecondaryAuthenticator.java:109) ~[?:?]
at org.elasticsearch.xpack.security.authc.support.SecondaryAuthenticator.authenticateAndAttachToContext(SecondaryAuthenticator.java:90) ~[?:?]
at org.elasticsearch.xpack.security.rest.SecurityRestFilter.intercept(SecurityRestFilter.java:75) ~[?:?]
at org.elasticsearch.rest.RestController.dispatchRequest(RestController.java:446) ~[elasticsearch-8.14.0-SNAPSHOT.jar:?]
at org.elasticsearch.rest.RestController.tryAllHandlers(RestController.java:606) ~[elasticsearch-8.14.0-SNAPSHOT.jar:?]
at org.elasticsearch.rest.RestController.dispatchRequest(RestController.java:329) ~[elasticsearch-8.14.0-SNAPSHOT.jar:?]
at org.elasticsearch.http.AbstractHttpServerTransport.dispatchRequest(AbstractHttpServerTransport.java:465) ~[elasticsearch-8.14.0-SNAPSHOT.jar:?]
at org.elasticsearch.http.AbstractHttpServerTransport.handleIncomingRequest(AbstractHttpServerTransport.java:561) ~[elasticsearch-8.14.0-SNAPSHOT.jar:?]
at org.elasticsearch.http.AbstractHttpServerTransport.incomingRequest(AbstractHttpServerTransport.java:438) ~[elasticsearch-8.14.0-SNAPSHOT.jar:?]
at org.elasticsearch.http.netty4.Netty4HttpPipeliningHandler.handlePipelinedRequest(Netty4HttpPipeliningHandler.java:126) ~[?:?]
at org.elasticsearch.http.netty4.Netty4HttpPipeliningHandler.channelRead(Netty4HttpPipeliningHandler.java:116) ~[?:?]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[?:?]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[?:?]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[?:?]
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) ~[?:?]
at io.netty.handler.codec.MessageToMessageCodec.channelRead(MessageToMessageCodec.java:111) ~[?:?]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[?:?]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[?:?]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[?:?]
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) ~[?:?]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[?:?]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[?:?]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[?:?]
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) ~[?:?]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[?:?]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[?:?]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[?:?]
at org.elasticsearch.http.netty4.Netty4HttpHeaderValidator.forwardData(Netty4HttpHeaderValidator.java:194) ~[?:?]
at org.elasticsearch.http.netty4.Netty4HttpHeaderValidator.forwardFullRequest(Netty4HttpHeaderValidator.java:137) ~[?:?]
at org.elasticsearch.http.netty4.Netty4HttpHeaderValidator.lambda$requestStart$1(Netty4HttpHeaderValidator.java:120) ~[?:?]
at io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98) ~[?:?]
at io.netty.util.concurrent.PromiseTask.run(PromiseTask.java:106) ~[?:?]
at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173) ~[?:?]
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166) ~[?:?]
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470) ~[?:?]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:566) ~[?:?]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[?:?]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[?:?]
at java.lang.Thread.run(Thread.java:1583) ~[?:?]
We use `executor` for this step, unless the step executes immediately. If the previous step is complete when `andThen` is called, this step executes immediately on the calling thread.
You probably want to fork the whole thing onto `executor` first so that all the `andThen` calls run on `executor` too. You then only need to specify `executor` if the previous step might complete on some other thread.
See the Javadocs for `andThen` for more info (or perhaps the same info worded differently):
elasticsearch/server/src/main/java/org/elasticsearch/action/support/SubscribableListener.java
Lines 427 to 430 in cf05e28
* The threading of the {@code nextStep} callback is the same as for listeners added with {@link #addListener}: if this listener is
* already complete then {@code nextStep} is invoked on the thread calling {@link #andThen} and in its thread context, but if this
* listener is incomplete then {@code nextStep} is invoked using {@code executor}, in a thread context captured when {@link #andThen}
* was called.
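The threading rule quoted above can be modelled in a few lines of plain Java. This is a sketch under stated assumptions, not the real `SubscribableListener`: `Step`, `threadRunningNextStep` and the thread name `utility` are hypothetical, and the model ignores failures and thread contexts. It only shows which thread runs the next step depending on whether the previous step is already complete, and how forking the whole chain first changes that.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class AndThenThreadingSketch {

    /** Hypothetical, heavily simplified model of one step in a listener chain. */
    static final class Step<T> {
        private final List<Runnable> waiters = new ArrayList<>();
        private boolean done;
        private T result;

        void complete(T value) {
            List<Runnable> toRun;
            synchronized (this) {
                result = value;
                done = true;
                toRun = new ArrayList<>(waiters);
                waiters.clear();
            }
            toRun.forEach(Runnable::run);
        }

        void andThen(Executor executor, Consumer<T> nextStep) {
            synchronized (this) {
                if (done == false) {
                    // Incomplete: the next step is handed to the executor once the result arrives.
                    waiters.add(() -> executor.execute(() -> nextStep.accept(result)));
                    return;
                }
            }
            // Already complete: the next step runs right here, on the calling thread.
            nextStep.accept(result);
        }
    }

    /** Returns the name of the thread that ended up running the next step. */
    static String threadRunningNextStep(boolean completeFirst, boolean forkWholeChain) throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> new Thread(r, "utility"));
        try {
            Step<String> step = new Step<>();
            String[] seen = new String[1];
            CountDownLatch ran = new CountDownLatch(1);
            Runnable chain = () -> {
                if (completeFirst) {
                    step.complete("stats");
                }
                step.andThen(executor, value -> {
                    seen[0] = Thread.currentThread().getName();
                    ran.countDown();
                });
                if (completeFirst == false) {
                    step.complete("stats");
                }
            };
            if (forkWholeChain) {
                executor.execute(chain);
            } else {
                chain.run();
            }
            if (ran.await(5, TimeUnit.SECONDS) == false) {
                throw new IllegalStateException("next step never ran");
            }
            return seen[0];
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("already complete, not forked: " + threadRunningNextStep(true, false));
        System.out.println("incomplete, not forked:       " + threadRunningNextStep(false, false));
        System.out.println("already complete, forked:     " + threadRunningNextStep(true, true));
    }
}
```

In this model the first case reports the caller's own thread, which corresponds to the `transport_worker` thread in the stack trace above, while the other two report `utility`. Forking the whole chain first makes the outcome independent of whether the previous step happened to be complete.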
int numberOfAllocations = deploymentStats.getStats().results().stream().mapToInt(AssignmentStats::getNumberOfAllocations).sum();
modelSizeStats(
responseBuilder.getExpandedModelIdsWithAliases(),
SubscribableListener.<String>newForked(l -> {
I recommend adding a blank line (or possibly a comment) here; it makes everything else format so much more nicely:
- SubscribableListener.<String>newForked(l -> {
+ SubscribableListener
+     .<String>newForked(l -> {
I left a suggested implementation in droberts195#1.
Yes, I did look at those Javadocs, but for some reason didn't twig why the specified executor wasn't being used in my case. 🤦 After having it described again in different words I understand now.
Yes, you're right, that's the best thing to do. Thanks for making that change and all the other tweaks too.
Hopefully #106532 helps here
It's likely that it was the step that iterates all ingest pipelines to check for inference ingest processors that reference trained models. Probably the cluster where we saw the lockup had hundreds of ingest pipelines. (It's too long ago to be sure now though.) I've added an assertion to this method that it doesn't run on a transport thread.
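A check of this kind can be sketched as follows. This is an illustration only: `TransportThreadCheckSketch`, `ensureNotTransportThread` and the name-based detection are assumptions inferred from the thread name and message visible in the stack trace earlier in this conversation, not the actual implementation of `Transports.assertNotTransportThread`. The sketch throws `AssertionError` directly so that it behaves the same whether or not JVM assertions are enabled.

```java
public class TransportThreadCheckSketch {

    // Assumption: transport worker threads can be recognised by this marker in their name,
    // as in "elasticsearch[yamlRestTest-0][transport_worker][T#9]" from the log above.
    private static final String TRANSPORT_MARKER = "[transport_worker]";

    static boolean isTransportThread(Thread thread) {
        return thread.getName().contains(TRANSPORT_MARKER);
    }

    /** Fails loudly if called from a thread that looks like a transport worker. */
    static void ensureNotTransportThread(String reason) {
        Thread current = Thread.currentThread();
        if (isTransportThread(current)) {
            throw new AssertionError(
                "Expected current thread [" + current.getName() + "] to not be a transport thread. Reason: [" + reason + "]"
            );
        }
    }

    /** Runs the check on a freshly created thread with the given name and reports whether it failed. */
    static boolean checkFailsOnThreadNamed(String name) throws InterruptedException {
        boolean[] failed = new boolean[1];
        Thread thread = new Thread(() -> {
            try {
                ensureNotTransportThread("non-trivial nested loops over cluster state structures");
            } catch (AssertionError e) {
                failed[0] = true;
            }
        }, name);
        thread.start();
        thread.join();
        return failed[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(checkFailsOnThreadNamed("elasticsearch[node-0][transport_worker][T#1]"));
        System.out.println(checkFailsOnThreadNamed("elasticsearch[node-0][ml_utility][T#1]"));
    }
}
```

Placing such a check at the top of an expensive method turns an occasional production lockup into an immediate test failure, which is how the `transport_worker` regression above was caught.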