[ML] Catch exceptions during pytorch_inference startup (elastic#103873) (elastic#103911)

A couple of test failures (elastic#103808 and elastic#103868) reveal that
an exception thrown while connecting to the pytorch_inference
process can be uncaught and hence cause the whole node to stop.

This change does not fix the underlying problem of failure to
connect to the process that those issues relate to, but it
converts the error from one that crashes a whole node to one
that just fails the affected model deployment.
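
The pattern the message describes can be sketched with JDK classes alone. This is a minimal illustration, not the Elasticsearch implementation: `GuardedRunnable` is a hypothetical stand-in for a runnable that routes exceptions to a failure callback, and the `Consumer<Exception>` plays the role of the deployment's failure listener.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

final class GuardedStartupSketch {

    /** Hypothetical helper: reports failures to onFailure instead of letting them escape run(). */
    abstract static class GuardedRunnable implements Runnable {
        protected abstract void doRun() throws Exception;

        protected abstract void onFailure(Exception e);

        @Override
        public final void run() {
            try {
                doRun();
            } catch (Exception e) {
                // The exception stops here and is handed to the callback.
                onFailure(e);
            }
        }
    }

    /** Runs a startup step that always fails and returns the exception the listener received. */
    static Exception runFailingStartup() throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        AtomicReference<Exception> reported = new AtomicReference<>();
        // Assumed stand-in for the listener that fails only the affected deployment.
        Consumer<Exception> failedDeploymentListener = reported::set;

        executor.execute(new GuardedRunnable() {
            @Override
            protected void doRun() {
                // Simulates a failure to connect to the native process.
                throw new IllegalStateException("failed to connect to process");
            }

            @Override
            protected void onFailure(Exception e) {
                failedDeploymentListener.accept(e);
            }
        });

        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
        return reported.get();
    }

    public static void main(String[] args) throws InterruptedException {
        Exception e = runFailingStartup();
        System.out.println("deployment failed with: " + e.getMessage());
    }
}
```

With a bare lambda in place of the guarded runnable, the same exception would propagate out of `run()` on the executor thread and never reach the listener.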
droberts195 committed Jan 4, 2024
1 parent 0eb7b8c commit 90c655d
Showing 3 changed files with 19 additions and 4 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/103873.yaml
@@ -0,0 +1,5 @@
+pr: 103873
+summary: Catch exceptions during `pytorch_inference` startup
+area: Machine Learning
+type: bug
+issues: []
@@ -17,6 +17,7 @@
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.core.TimeValue;
@@ -200,9 +201,18 @@ public void startDeployment(TrainedModelDeploymentTask task, ActionListener<Trai
// here, we are being called back on the searching thread, which MAY be a network thread
// `startAndLoad` creates named pipes, blocking the calling thread, better to execute that in our utility
// executor.
-executorServiceForDeployment.execute(
-    () -> processContext.startAndLoad(modelConfig.getLocation(), modelLoadedListener)
-);
+executorServiceForDeployment.execute(new AbstractRunnable() {
+
+    @Override
+    public void onFailure(Exception e) {
+        failedDeploymentListener.onFailure(e);
+    }
+
+    @Override
+    protected void doRun() {
+        processContext.startAndLoad(modelConfig.getLocation(), modelLoadedListener);
+    }
+});
}, failedDeploymentListener::onFailure)
);
} else {
@@ -89,7 +89,7 @@ public NativePyTorchProcess createProcess(
process.start(executorService);
} catch (IOException | EsRejectedExecutionException e) {
String msg = "Failed to connect to pytorch process for job " + task.getDeploymentId();
-logger.error(msg);
+logger.error(msg, e);
try {
IOUtils.close(process);
} catch (IOException ioe) {
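
The last hunk passes the caught exception to the logger along with the message. The sketch below illustrates why that matters, using `java.util.logging` as a swapped-in substitute for the logger in the diff. The logger names, the deployment id `demo`, and the capturing handler are all assumptions made for illustration.

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

final class LogWithCauseSketch {

    /** Logs one failure message, with or without its cause, and returns the captured record. */
    static LogRecord capture(boolean includeCause) {
        Logger logger = Logger.getLogger("sketch.logWithCause." + includeCause);
        logger.setUseParentHandlers(false); // keep the demo quiet on the console
        AtomicReference<LogRecord> seen = new AtomicReference<>();

        // Hypothetical in-memory handler that remembers the last record.
        Handler handler = new Handler() {
            @Override
            public void publish(LogRecord logRecord) {
                seen.set(logRecord);
            }

            @Override
            public void flush() {}

            @Override
            public void close() {}
        };
        logger.addHandler(handler);

        IOException e = new IOException("named pipe not ready");
        String msg = "Failed to connect to pytorch process for job demo";
        if (includeCause) {
            logger.log(Level.SEVERE, msg, e); // message plus throwable
        } else {
            logger.log(Level.SEVERE, msg); // message only: the cause is dropped
        }

        logger.removeHandler(handler);
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println("without cause: thrown=" + capture(false).getThrown());
        System.out.println("with cause:    thrown=" + capture(true).getThrown());
    }
}
```

Only the record logged with the throwable carries the exception, so only that form lets a reader of the log see why the connection failed.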