diff --git a/src/main/java/com/google/devtools/build/lib/worker/SingleplexWorker.java b/src/main/java/com/google/devtools/build/lib/worker/SingleplexWorker.java index dbe0c494db3f11..3edce2c3227b6f 100644 --- a/src/main/java/com/google/devtools/build/lib/worker/SingleplexWorker.java +++ b/src/main/java/com/google/devtools/build/lib/worker/SingleplexWorker.java @@ -120,24 +120,12 @@ void putRequest(WorkRequest request) throws IOException { @Override WorkResponse getResponse(int requestId) throws IOException, InterruptedException { recordingInputStream.startRecording(4096); - // Ironically, we don't allow interrupts during dynamic execution, since we can't cancel - // the worker short of destroying it. - if (!workerKey.isSpeculative()) { - while (recordingInputStream.available() == 0) { - try { - Thread.sleep(10); - } catch (InterruptedException e) { - // This should only happen when not in dynamic execution, so we can safely kill the - // worker. - destroy(); - throw e; - } - if (!process.isAlive()) { - throw new IOException( - String.format( - "Worker process for %s died while waiting for response", - workerKey.getMnemonic())); - } + while (recordingInputStream.available() == 0) { + Thread.sleep(10); + if (!process.isAlive()) { + throw new IOException( + String.format( + "Worker process for %s died while waiting for response", workerKey.getMnemonic())); } } return workerProtocol.getResponse(); diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerSpawnRunner.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerSpawnRunner.java index 86798ec46f86c8..d144e476f362dd 100644 --- a/src/main/java/com/google/devtools/build/lib/worker/WorkerSpawnRunner.java +++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerSpawnRunner.java @@ -457,6 +457,10 @@ WorkResponse execInWorker( try { response = worker.getResponse(request.getRequestId()); + } catch (InterruptedException e) { + finishWorkAsync(key, worker, request); + worker = null; + throw e; 
} catch (IOException e) { restoreInterrupt(e); // If protobuf or json reader couldn't parse the response, try to print whatever the @@ -514,6 +518,41 @@ WorkResponse execInWorker( return response; } + /** + * Starts a thread to collect the response from a worker when it's no longer of interest. + * + *
<p>This can happen either when we lost the race in dynamic execution or the build got
+ * interrupted. This takes ownership of the worker for purposes of returning it to the worker
+ * pool.
+ */
+  private void finishWorkAsync(WorkerKey key, Worker worker, WorkRequest request) {
+    Thread reaper =
+        new Thread(
+            () -> {
+              Worker w = worker;
+              try {
+                w.getResponse(request.getRequestId());
+              } catch (IOException | InterruptedException e1) {
+                // Either the worker's output can't be trusted, or this reaper was interrupted
+                // while waiting. Stop reading and destroy the worker; a singleplex worker would
+                // otherwise leave a dangling response. Drop our reference first so the finally
+                // block never returns a worker to the pool after an invalidation attempt.
+                Worker broken = w;
+                w = null;
+                try {
+                  workers.invalidateObject(key, broken);
+                } catch (IOException | InterruptedException e2) {
+                  // The reaper thread can't do anything useful about this.
+                }
+              } finally {
+                if (w != null) {
+                  workers.returnObject(key, w);
+                }
+              }
+            });
+    reaper.start();
+  }
+
private static void restoreInterrupt(IOException e) {
if (e instanceof InterruptedIOException) {
Thread.currentThread().interrupt();
diff --git a/src/test/java/com/google/devtools/build/lib/worker/ExampleWorker.java b/src/test/java/com/google/devtools/build/lib/worker/ExampleWorker.java
index 9800f19ab3debc..c79237bab82fd6 100644
--- a/src/test/java/com/google/devtools/build/lib/worker/ExampleWorker.java
+++ b/src/test/java/com/google/devtools/build/lib/worker/ExampleWorker.java
@@ -42,9 +42,12 @@
import java.util.Map;
import java.util.Random;
import java.util.UUID;
+import java.util.concurrent.Semaphore;
import java.util.function.BiFunction;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import sun.misc.Signal;
+import sun.misc.SignalHandler;
/** An example implementation of a worker process that is used for integration tests. */
public final class ExampleWorker {
@@ -144,6 +147,23 @@ private static int doWork(List