From f4b5e0233341977aaa76593ca032d9ac4eba7444 Mon Sep 17 00:00:00 2001 From: larsrc Date: Tue, 13 Apr 2021 07:08:24 -0700 Subject: [PATCH] Let workers finish lost races without delaying dynamic execution. If a worker is blocked on reading a response, it doesn't listen for interrupts. This changes blocking on reading to blocking on a sleep-loop, and if interrupted, the worker gets to finish in a separate thread before returning to the pool. This lets the action finish immediately. RELNOTES: None. PiperOrigin-RevId: 368207735 --- .../build/lib/worker/SingleplexWorker.java | 24 +++--------- .../build/lib/worker/WorkerSpawnRunner.java | 39 +++++++++++++++++++ .../build/lib/worker/ExampleWorker.java | 20 ++++++++++ .../lib/worker/ExampleWorkerOptions.java | 8 ++++ .../devtools/build/lib/worker/WorkerTest.java | 5 --- 5 files changed, 73 insertions(+), 23 deletions(-) diff --git a/src/main/java/com/google/devtools/build/lib/worker/SingleplexWorker.java b/src/main/java/com/google/devtools/build/lib/worker/SingleplexWorker.java index dbe0c494db3f11..3edce2c3227b6f 100644 --- a/src/main/java/com/google/devtools/build/lib/worker/SingleplexWorker.java +++ b/src/main/java/com/google/devtools/build/lib/worker/SingleplexWorker.java @@ -120,24 +120,12 @@ void putRequest(WorkRequest request) throws IOException { @Override WorkResponse getResponse(int requestId) throws IOException, InterruptedException { recordingInputStream.startRecording(4096); - // Ironically, we don't allow interrupts during dynamic execution, since we can't cancel - // the worker short of destroying it. - if (!workerKey.isSpeculative()) { - while (recordingInputStream.available() == 0) { - try { - Thread.sleep(10); - } catch (InterruptedException e) { - // This should only happen when not in dynamic execution, so we can safely kill the - // worker. - destroy(); - throw e; - } - if (!process.isAlive()) { - throw new IOException( - String.format( - "Worker process for %s died while waiting for response", - workerKey.getMnemonic())); - } + while (recordingInputStream.available() == 0) { + Thread.sleep(10); + if (!process.isAlive()) { + throw new IOException( + String.format( + "Worker process for %s died while waiting for response", workerKey.getMnemonic())); } } return workerProtocol.getResponse(); diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerSpawnRunner.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerSpawnRunner.java index 86798ec46f86c8..d144e476f362dd 100644 --- a/src/main/java/com/google/devtools/build/lib/worker/WorkerSpawnRunner.java +++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerSpawnRunner.java @@ -457,6 +457,10 @@ WorkResponse execInWorker( try { response = worker.getResponse(request.getRequestId()); + } catch (InterruptedException e) { + finishWorkAsync(key, worker, request); + worker = null; + throw e; } catch (IOException e) { restoreInterrupt(e); // If protobuf or json reader couldn't parse the response, try to print whatever the @@ -514,6 +518,41 @@ WorkResponse execInWorker( return response; } + /** + * Starts a thread to collect the response from a worker when it's no longer of interest. + * + *

This can happen either when we lost the race in dynamic execution or the build got + * interrupted. This takes ownership of the worker for purposes of returning it to the worker + * pool. + */ + private void finishWorkAsync(WorkerKey key, Worker worker, WorkRequest request) { + Thread reaper = + new Thread( + () -> { + Worker w = worker; + try { + w.getResponse(request.getRequestId()); + } catch (IOException | InterruptedException e1) { + // If this happens, we either can't trust the output of the worker, or we got + // interrupted while handling being interrupted. In the latter case, let's stop + // trying and just destroy the worker. If it's a singleplex worker, there will + // be a dangling response that we don't want to keep trying to read, so we destroy + // the worker. + try { + workers.invalidateObject(key, w); + w = null; + } catch (IOException | InterruptedException e2) { + // The reaper thread can't do anything useful about this. + } + } finally { + if (w != null) { + workers.returnObject(key, w); + } + } + }); + reaper.start(); + } + private static void restoreInterrupt(IOException e) { if (e instanceof InterruptedIOException) { Thread.currentThread().interrupt(); diff --git a/src/test/java/com/google/devtools/build/lib/worker/ExampleWorker.java b/src/test/java/com/google/devtools/build/lib/worker/ExampleWorker.java index 9800f19ab3debc..c79237bab82fd6 100644 --- a/src/test/java/com/google/devtools/build/lib/worker/ExampleWorker.java +++ b/src/test/java/com/google/devtools/build/lib/worker/ExampleWorker.java @@ -42,9 +42,12 @@ import java.util.Map; import java.util.Random; import java.util.UUID; +import java.util.concurrent.Semaphore; import java.util.function.BiFunction; import java.util.regex.Matcher; import java.util.regex.Pattern; +import sun.misc.Signal; +import sun.misc.SignalHandler; /** An example implementation of a worker process that is used for integration tests. */ public final class ExampleWorker { @@ -144,6 +147,23 @@ private static int doWork(List args, PrintWriter err) { PrintStream originalStdOut = System.out; PrintStream originalStdErr = System.err; + if (workerOptions.waitForSignal) { + Semaphore signalSem = new Semaphore(0); + Signal.handle( + new Signal("HUP"), + new SignalHandler() { + @Override + public void handle(Signal sig) { + signalSem.release(); + } + }); + try { + signalSem.acquire(); + } catch (InterruptedException e) { + System.out.println("Interrupted while waiting for signal"); + e.printStackTrace(); + } + } try (PrintStream ps = new PrintStream(baos)) { System.setOut(ps); System.setErr(ps); diff --git a/src/test/java/com/google/devtools/build/lib/worker/ExampleWorkerOptions.java b/src/test/java/com/google/devtools/build/lib/worker/ExampleWorkerOptions.java index 40d6faa5811b4a..440717916a3fd4 100644 --- a/src/test/java/com/google/devtools/build/lib/worker/ExampleWorkerOptions.java +++ b/src/test/java/com/google/devtools/build/lib/worker/ExampleWorkerOptions.java @@ -135,6 +135,14 @@ public static class ExampleWorkOptions extends OptionsBase { ) public boolean hardPoison; + @Option( + name = "wait_for_signal", + documentationCategory = OptionDocumentationCategory.UNCATEGORIZED, + effectTags = {OptionEffectTag.NO_OP}, + defaultValue = "false", + help = "Don't send a response until receiving a SIGXXXX.") + public boolean waitForSignal; + /** Enum converter for --worker_protocol. */ public static class WorkerProtocolEnumConverter extends EnumConverter { diff --git a/src/test/java/com/google/devtools/build/lib/worker/WorkerTest.java b/src/test/java/com/google/devtools/build/lib/worker/WorkerTest.java index 8a55bef8a90b03..e67aa06429b678 100644 --- a/src/test/java/com/google/devtools/build/lib/worker/WorkerTest.java +++ b/src/test/java/com/google/devtools/build/lib/worker/WorkerTest.java @@ -174,11 +174,6 @@ private void verifyGetResponseFailure(String responseString, String expectedErro assertThat(ex).hasMessageThat().contains(expectedError); } - @Test - public void testGetResponse_json_emptyString_throws() throws IOException { - verifyGetResponseFailure("", "Could not parse json work request correctly"); - } - @Test public void testGetResponse_badJson_throws() throws IOException { verifyGetResponseFailure(