diff --git a/src/main/java/com/google/devtools/build/lib/worker/SingleplexWorker.java b/src/main/java/com/google/devtools/build/lib/worker/SingleplexWorker.java index bc325107d00b8c..a8684d1c229931 100644 --- a/src/main/java/com/google/devtools/build/lib/worker/SingleplexWorker.java +++ b/src/main/java/com/google/devtools/build/lib/worker/SingleplexWorker.java @@ -163,4 +163,9 @@ String getRecordingStreamMessage() { recordingInputStream.readRemaining(); return recordingInputStream.getRecordedDataAsString(); } + + @Override + public String toString() { + return workerKey.getMnemonic() + " worker #" + workerId; + } } diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerFactory.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerFactory.java index 182c0c4b0263b8..337b18b84446ec 100644 --- a/src/main/java/com/google/devtools/build/lib/worker/WorkerFactory.java +++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerFactory.java @@ -53,7 +53,7 @@ public void setOptions(WorkerOptions workerOptions) { } @Override - public Worker create(WorkerKey key) throws Exception { + public Worker create(WorkerKey key) { int workerId = pidCounter.getAndIncrement(); String workTypeName = WorkerKey.makeWorkerTypeName(key.getProxied()); Path logFile = @@ -66,9 +66,7 @@ public Worker create(WorkerKey key) throws Exception { worker = new SandboxedWorker(key, workerId, workDir, logFile); } else if (key.getProxied()) { WorkerMultiplexer workerMultiplexer = WorkerMultiplexerManager.getInstance(key, logFile); - worker = - new WorkerProxy( - key, workerId, key.getExecRoot(), workerMultiplexer.getLogFile(), workerMultiplexer); + worker = new WorkerProxy(key, workerId, workerMultiplexer.getLogFile(), workerMultiplexer); } else { worker = new SingleplexWorker(key, workerId, key.getExecRoot(), logFile); } @@ -112,13 +110,12 @@ public PooledObject wrap(Worker worker) { @Override public void destroyObject(WorkerKey key, PooledObject p) throws Exception { if (workerOptions.workerVerbose) { + int workerId = p.getObject().getWorkerId(); reporter.handle( Event.info( String.format( "Destroying %s %s (id %d)", - key.getMnemonic(), - WorkerKey.makeWorkerTypeName(key.getProxied()), - p.getObject().getWorkerId()))); + key.getMnemonic(), WorkerKey.makeWorkerTypeName(key.getProxied()), workerId))); } p.getObject().destroy(); } @@ -161,10 +158,10 @@ public boolean validateObject(WorkerKey key, PooledObject p) { } return false; } - boolean hashMatches = - key.getWorkerFilesCombinedHash().equals(worker.getWorkerFilesCombinedHash()); + boolean filesChanged = + !key.getWorkerFilesCombinedHash().equals(worker.getWorkerFilesCombinedHash()); - if (workerOptions.workerVerbose && reporter != null && !hashMatches) { + if (workerOptions.workerVerbose && reporter != null && filesChanged) { StringBuilder msg = new StringBuilder(); msg.append( String.format( @@ -191,6 +188,6 @@ public boolean validateObject(WorkerKey key, PooledObject p) { reporter.handle(Event.warn(msg.toString())); } - return hashMatches; + return !filesChanged; } } diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerModule.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerModule.java index 6ef71aca2bc3b3..1867372ab8f972 100644 --- a/src/main/java/com/google/devtools/build/lib/worker/WorkerModule.java +++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerModule.java @@ -67,6 +67,7 @@ public Iterable> getCommandOptions(Command command) public void beforeCommand(CommandEnvironment env) { this.env = env; env.getEventBus().register(this); + WorkerMultiplexerManager.beforeCommand(env); } @Subscribe @@ -236,6 +237,6 @@ public void afterCommand() { if (this.workerFactory != null) { this.workerFactory.setReporter(null); } - WorkerMultiplexerManager.afterCommandCleanup(); + WorkerMultiplexerManager.afterCommand(); } } diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexer.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexer.java index 858dbbe0af189a..1a669477b56bc5 100644 --- a/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexer.java +++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexer.java @@ -34,6 +34,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Semaphore; +import javax.annotation.Nullable; /** * An intermediate worker that sends requests and receives responses from the worker processes. @@ -48,29 +49,23 @@ public class WorkerMultiplexer extends Thread { * A map of {@code WorkResponse}s received from the worker process. They are stored in this map * keyed by the request id until the corresponding {@code WorkerProxy} picks them up. */ - private final ConcurrentMap workerProcessResponse; + private final ConcurrentMap workerProcessResponse = + new ConcurrentHashMap<>(); /** * A map of semaphores corresponding to {@code WorkRequest}s. After sending the {@code * WorkRequest}, {@code WorkerProxy} will wait on a semaphore to be released. {@code * WorkerMultiplexer} is responsible for releasing the corresponding semaphore in order to signal * {@code WorkerProxy} that the {@code WorkerResponse} has been received. */ - private final ConcurrentMap responseChecker; - /** The worker process that this WorkerMultiplexer should be talking to. */ - private Subprocess process; + private final ConcurrentMap responseChecker = new ConcurrentHashMap<>(); /** - * Set to true if one of the worker processes returns an unparseable response, or for other - * reasons we can't properly handle the remaining responses. We then discard all the responses - * from other work requests and abort. + * The worker process that this WorkerMultiplexer should be talking to. This should only be set + * once, when creating a new process. If the process dies or its stdio streams get corrupted, the + * {@code WorkerMultiplexer} gets discarded as well and a new one gets created as needed. */ - private boolean isWorkerStreamCorrupted; + private Subprocess process; /** InputStream from the worker process. */ private RecordingInputStream recordingStream; - /** - * True if we have received EOF on the stream from the worker process. We then stop processing, - * and all workers still waiting for responses will fail. - */ - private boolean isWorkerStreamClosed; /** True if this multiplexer was explicitly destroyed. */ private boolean wasDestroyed; /** @@ -89,25 +84,20 @@ public class WorkerMultiplexer extends Thread { * The active Reporter object, non-null if {@code --worker_verbose} is set. This must be cleared * at the end of a command execution. */ - public EventHandler reporter; + private EventHandler reporter; WorkerMultiplexer(Path logFile, WorkerKey workerKey) { this.logFile = logFile; this.workerKey = workerKey; - responseChecker = new ConcurrentHashMap<>(); - workerProcessResponse = new ConcurrentHashMap<>(); - isWorkerStreamCorrupted = false; - isWorkerStreamClosed = false; - wasDestroyed = false; } /** Sets or clears the reporter for outputting verbose info. */ - void setReporter(EventHandler reporter) { + synchronized void setReporter(@Nullable EventHandler reporter) { this.reporter = reporter; } /** Reports a string to the user if reporting is enabled. */ - private void report(String s) { + private synchronized void report(String s) { EventHandler r = this.reporter; // Protect against race condition with setReporter(). if (r != null && s != null) { r.handle(Event.info(s)); @@ -119,8 +109,10 @@ private void report(String s) { * exist. Also makes sure this {@code WorkerMultiplexer} runs as a separate thread. */ public synchronized void createProcess(Path workDir) throws IOException { - // The process may have died in the meanwhile (e.g. between builds). - if (this.process == null || !this.process.isAlive()) { + if (this.process == null) { + if (this.wasDestroyed) { + throw new IOException("Multiplexer destroyed before created process"); + } ImmutableList args = workerKey.getArgs(); File executable = new File(args.get(0)); if (!executable.isAbsolute() && executable.getParent() != null) { @@ -128,8 +120,6 @@ public synchronized void createProcess(Path workDir) throws IOException { newArgs.set(0, new File(workDir.getPathFile(), newArgs.get(0)).getAbsolutePath()); args = ImmutableList.copyOf(newArgs); } - isWorkerStreamCorrupted = false; - isWorkerStreamClosed = false; SubprocessBuilder processBuilder = subprocessFactory != null ? new SubprocessBuilder(subprocessFactory) @@ -139,6 +129,8 @@ public synchronized void createProcess(Path workDir) throws IOException { processBuilder.setStderr(logFile.getPathFile()); processBuilder.setEnv(workerKey.getEnv()); this.process = processBuilder.start(); + } else if (!this.process.isAlive()) { + throw new IOException("Process is dead"); } if (!this.isAlive()) { this.start(); @@ -155,24 +147,24 @@ public Path getLogFile() { /** * Signals this object to destroy itself, including the worker process. The object might not be - * fully destroyed at the end of this call, but will terminate soon. + * fully destroyed at the end of this call, but will terminate soon. This is considered a + * deliberate destruction. */ public synchronized void destroyMultiplexer() { if (this.process != null) { - destroyProcess(this.process); - this.process = null; + destroyProcess(); } wasDestroyed = true; } /** Destroys the worker subprocess. This might block forever if the subprocess refuses to die. */ - private void destroyProcess(Subprocess process) { + private synchronized void destroyProcess() { boolean wasInterrupted = false; try { - process.destroy(); + this.process.destroy(); while (true) { try { - process.waitFor(); + this.process.waitFor(); return; } catch (InterruptedException ie) { wasInterrupted = true; @@ -183,7 +175,6 @@ private void destroyProcess(Subprocess process) { if (wasInterrupted) { Thread.currentThread().interrupt(); // preserve interrupted status } - isWorkerStreamClosed = true; } } @@ -200,10 +191,6 @@ public synchronized void putRequest(WorkRequest request) throws IOException { // We can't know how much of the request was sent, so we have to assume the worker's input // now contains garbage. // TODO(b/151767359): Avoid causing garbage! Maybe by sending in a separate thread? - isWorkerStreamCorrupted = true; - if (e instanceof InterruptedIOException) { - Thread.currentThread().interrupt(); - } responseChecker.remove(request.getRequestId()); throw e; } @@ -228,10 +215,8 @@ public WorkResponse getResponse(Integer requestId) throws InterruptedException { // Wait for the multiplexer to get our response and release this semaphore. The semaphore will // throw {@code InterruptedException} when the multiplexer is terminated. waitForResponse.acquire(); - report("Acquired response semaphore for " + requestId); WorkResponse workResponse = workerProcessResponse.get(requestId); - report("Response for " + requestId + " is " + workResponse); return workResponse; } finally { responseChecker.remove(requestId); @@ -247,25 +232,25 @@ public WorkResponse getResponse(Integer requestId) throws InterruptedException { * execution cancellation. */ private void waitResponse() throws InterruptedException, IOException { - Subprocess p = this.process; - if (p == null || !p.isAlive()) { - // Avoid busy-wait for a new process. + recordingStream = new RecordingInputStream(this.process.getInputStream()); + recordingStream.startRecording(4096); + // TODO(larsrc): Turn this into a loop that also sends requests. + // Allow interrupts while waiting for responses, without conflating it with I/O errors. + while (recordingStream.available() == 0) { + if (!this.process.isAlive()) { + throw new IOException( + String.format("Multiplexer process for %s is dead", workerKey.getMnemonic())); + } Thread.sleep(1); - return; } - recordingStream = new RecordingInputStream(p.getInputStream()); - recordingStream.startRecording(4096); WorkResponse parsedResponse = WorkResponse.parseDelimitedFrom(recordingStream); // A null parsedResponse can only happen if the input stream is closed, in which case we // drop everything. if (parsedResponse == null) { - isWorkerStreamClosed = true; - report( + throw new IOException( String.format( - "Multiplexer process for %s has closed its output, aborting multiplexer", - workerKey.getMnemonic())); - return; + "Multiplexer process for %s died while reading response", workerKey.getMnemonic())); } int requestId = parsedResponse.getRequestId(); @@ -287,13 +272,15 @@ private void waitResponse() throws InterruptedException, IOException { /** The multiplexer thread that listens to the WorkResponse from worker process. */ @Override public void run() { - while (!isWorkerStreamClosed && !isWorkerStreamCorrupted) { + while (this.process.isAlive()) { try { waitResponse(); } catch (IOException e) { // We got this exception while reading from the worker's stdout. We can't trust the // output any more at that point. - isWorkerStreamCorrupted = true; + if (this.process.isAlive()) { + destroyProcess(); + } if (e instanceof InterruptedIOException) { report( String.format( @@ -315,17 +302,12 @@ public void run() { // will let fall on the floor, but we still want to leave the process running for the next // build. // TODO(b/151767359): Cancel all outstanding requests when cancellation is implemented. - releaseAllSemaphores(); + for (Semaphore semaphore : responseChecker.values()) { + semaphore.release(); + } } } - // If we get here, the worker process is either dead or corrupted. We could attempt to restart - // it, but the outstanding requests will have failed already. Until we have a way to signal - // transient failures, we have to just reject all further requests and make sure the process - // is really dead synchronized (this) { - if (process != null && process.isAlive()) { - destroyMultiplexer(); - } releaseAllSemaphores(); } } @@ -350,14 +332,14 @@ String getRecordingStreamMessage() { /** Returns true if this process has died for other reasons than a call to {@code #destroy()}. */ boolean diedUnexpectedly() { - Subprocess p = this.process; // Protects against this.process getting null. - return p != null && !p.isAlive() && !wasDestroyed; + return this.process != null && !this.process.isAlive() && !wasDestroyed; } /** Returns the exit value of multiplexer's process, if it has exited. */ Optional getExitValue() { - Subprocess p = this.process; // Protects against this.process getting null. - return p != null && !p.isAlive() ? Optional.of(p.exitValue()) : Optional.empty(); + return this.process != null && !this.process.isAlive() + ? Optional.of(this.process.exitValue()) + : Optional.empty(); } /** For testing only, to verify that maps are cleared after responses are reaped. */ diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexerManager.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexerManager.java index 35565f965ea190..aefabf0a4306ee 100644 --- a/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexerManager.java +++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexerManager.java @@ -16,13 +16,15 @@ import com.google.common.annotations.VisibleForTesting; import com.google.devtools.build.lib.actions.UserExecException; +import com.google.devtools.build.lib.events.EventHandler; +import com.google.devtools.build.lib.runtime.CommandEnvironment; import com.google.devtools.build.lib.server.FailureDetails; import com.google.devtools.build.lib.server.FailureDetails.FailureDetail; import com.google.devtools.build.lib.server.FailureDetails.Worker.Code; import com.google.devtools.build.lib.vfs.Path; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.Semaphore; +import javax.annotation.Nullable; /** * A manager to instantiate and destroy multiplexers. There should only be one {@code @@ -36,9 +38,6 @@ public class WorkerMultiplexerManager { */ private static final Map multiplexerInstance = new HashMap<>(); - /** A semaphore to protect {@code multiplexerInstance} and {@code multiplexerRefCount} objects. */ - private static final Semaphore semMultiplexer = new Semaphore(1); - private WorkerMultiplexerManager() {} /** @@ -46,68 +45,65 @@ private WorkerMultiplexerManager() {} * objects with the same {@code WorkerKey} talk to the same {@code WorkerMultiplexer}. Also, * record how many {@code WorkerProxy} objects are talking to this {@code WorkerMultiplexer}. */ - public static WorkerMultiplexer getInstance(WorkerKey key, Path logFile) - throws InterruptedException { - semMultiplexer.acquire(); - multiplexerInstance.putIfAbsent(key, new InstanceInfo(logFile, key)); - multiplexerInstance.get(key).increaseRefCount(); - WorkerMultiplexer workerMultiplexer = multiplexerInstance.get(key).getWorkerMultiplexer(); - semMultiplexer.release(); - return workerMultiplexer; + public static synchronized WorkerMultiplexer getInstance(WorkerKey key, Path logFile) { + InstanceInfo instanceInfo = + multiplexerInstance.computeIfAbsent(key, k -> new InstanceInfo(logFile, k)); + instanceInfo.increaseRefCount(); + return instanceInfo.getWorkerMultiplexer(); + } + + static void beforeCommand(CommandEnvironment env) { + setReporter(env.getReporter()); + } + + static void afterCommand() { + setReporter(null); } /** - * Removes the {@code WorkerMultiplexer} instance and reference count since it is no longer in - * use. + * Sets the reporter for all existing multiplexer instances. This allows reporting problems + * encountered while fetching an instance, e.g. during WorkerProxy validation. */ - public static void removeInstance(WorkerKey key) throws InterruptedException, UserExecException { - semMultiplexer.acquire(); - try { - multiplexerInstance.get(key).decreaseRefCount(); - if (multiplexerInstance.get(key).getRefCount() == 0) { - multiplexerInstance.get(key).getWorkerMultiplexer().interrupt(); - multiplexerInstance.get(key).getWorkerMultiplexer().destroyMultiplexer(); - multiplexerInstance.remove(key); - } - } catch (Exception e) { - String message = "NullPointerException while accessing non-existent multiplexer instance."; - throw createUserExecException(e, message, Code.MULTIPLEXER_INSTANCE_REMOVAL_FAILURE); - } finally { - semMultiplexer.release(); + private static synchronized void setReporter(@Nullable EventHandler reporter) { + for (InstanceInfo m : multiplexerInstance.values()) { + m.workerMultiplexer.setReporter(reporter); } } - /** Is called when a build is done, to do per-build cleanup. */ - static void afterCommandCleanup() { - try { - semMultiplexer.acquire(); - for (InstanceInfo i : multiplexerInstance.values()) { - i.getWorkerMultiplexer().setReporter(null); - } - semMultiplexer.release(); - } catch (InterruptedException e) { - // Interrupted during cleanup, not much we can do. + /** Removes a {@code WorkerProxy} instance and reference count since it is no longer in use. */ + public static synchronized void removeInstance(WorkerKey key) throws UserExecException { + InstanceInfo instanceInfo = multiplexerInstance.get(key); + if (instanceInfo == null) { + throw createUserExecException( + "Attempting to remove non-existent multiplexer instance.", + Code.MULTIPLEXER_INSTANCE_REMOVAL_FAILURE); + } + instanceInfo.decreaseRefCount(); + if (instanceInfo.getRefCount() == 0) { + instanceInfo.getWorkerMultiplexer().interrupt(); + instanceInfo.getWorkerMultiplexer().destroyMultiplexer(); + multiplexerInstance.remove(key); } } @VisibleForTesting static WorkerMultiplexer getMultiplexer(WorkerKey key) throws UserExecException { - try { - return multiplexerInstance.get(key).getWorkerMultiplexer(); - } catch (NullPointerException e) { - String message = "NullPointerException while accessing non-existent multiplexer instance."; - throw createUserExecException(e, message, Code.MULTIPLEXER_DOES_NOT_EXIST); + InstanceInfo instanceInfo = multiplexerInstance.get(key); + if (instanceInfo == null) { + throw createUserExecException( + "Accessing non-existent multiplexer instance.", Code.MULTIPLEXER_DOES_NOT_EXIST); } + return instanceInfo.getWorkerMultiplexer(); } @VisibleForTesting static Integer getRefCount(WorkerKey key) throws UserExecException { - try { - return multiplexerInstance.get(key).getRefCount(); - } catch (NullPointerException e) { - String message = "NullPointerException while accessing non-existent multiplexer instance."; - throw createUserExecException(e, message, Code.MULTIPLEXER_DOES_NOT_EXIST); + InstanceInfo instanceInfo = multiplexerInstance.get(key); + if (instanceInfo == null) { + throw createUserExecException( + "Accessing non-existent multiplexer instance.", Code.MULTIPLEXER_DOES_NOT_EXIST); } + return instanceInfo.getRefCount(); } @VisibleForTesting @@ -115,18 +111,17 @@ static Integer getInstanceCount() { return multiplexerInstance.keySet().size(); } - private static UserExecException createUserExecException( - Exception e, String message, Code detailedCode) { + private static UserExecException createUserExecException(String message, Code detailedCode) { return new UserExecException( FailureDetail.newBuilder() - .setMessage(ErrorMessage.builder().message(message).exception(e).build().toString()) + .setMessage(message) .setWorker(FailureDetails.Worker.newBuilder().setCode(detailedCode)) .build()); } /** Contains the WorkerMultiplexer instance and reference count. */ static class InstanceInfo { - private WorkerMultiplexer workerMultiplexer; + private final WorkerMultiplexer workerMultiplexer; private Integer refCount; public InstanceInfo(Path logFile, WorkerKey workerKey) { diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerProxy.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerProxy.java index 6b3258c57147cf..f1940369647e89 100644 --- a/src/main/java/com/google/devtools/build/lib/worker/WorkerProxy.java +++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerProxy.java @@ -39,7 +39,6 @@ final class WorkerProxy extends Worker { WorkerProxy( WorkerKey workerKey, int workerId, - Path workDir, Path logFile, WorkerMultiplexer workerMultiplexer) { super(workerKey, workerId, logFile); @@ -57,6 +56,7 @@ final class WorkerProxy extends Worker { @Override void setReporter(EventHandler reporter) { + // We might have created this multiplexer after setting the reporter for existing multiplexers workerMultiplexer.setReporter(reporter); } @@ -67,17 +67,17 @@ public void prepareExecution( workerMultiplexer.createProcess(workDir); } - /** Send the WorkRequest to multiplexer. */ @Override synchronized void destroy() { try { WorkerMultiplexerManager.removeInstance(workerKey); - } catch (InterruptedException e) { - logger.atWarning().withCause(e).log( - "InterruptedException was caught while destroying multiplexer. " - + "It could because the multiplexer was interrupted."); } catch (UserExecException e) { logger.atWarning().withCause(e).log("Exception"); + } finally { + if (this.shutdownHook != null) { + Runtime.getRuntime().removeShutdownHook(this.shutdownHook); + this.shutdownHook = null; + } } } @@ -110,4 +110,9 @@ public Optional getExitValue() { String getRecordingStreamMessage() { return workerMultiplexer.getRecordingStreamMessage(); } + + @Override + public String toString() { + return workerKey.getMnemonic() + " proxy worker #" + workerId; + } } diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerSpawnRunner.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerSpawnRunner.java index 7be36ea8a655d3..26f6019fa53136 100644 --- a/src/main/java/com/google/devtools/build/lib/worker/WorkerSpawnRunner.java +++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerSpawnRunner.java @@ -57,6 +57,7 @@ import com.google.devtools.build.lib.worker.WorkerProtocol.WorkResponse; import com.google.protobuf.ByteString; import java.io.IOException; +import java.io.InterruptedIOException; import java.nio.file.Files; import java.nio.file.Paths; import java.time.Duration; @@ -408,6 +409,7 @@ WorkResponse execInWorker( try { inputFiles.materializeVirtualInputs(execRoot); } catch (IOException e) { + restoreInterrupt(e); String message = "IOException while materializing virtual inputs:"; throw createUserExecException(e, message, Code.VIRTUAL_INPUT_MATERIALIZATION_FAILURE); } @@ -415,6 +417,7 @@ WorkResponse execInWorker( try { context.prefetchInputs(); } catch (IOException e) { + restoreInterrupt(e); String message = "IOException while prefetching for worker:"; throw createUserExecException(e, message, Code.PREFETCH_FAILURE); } @@ -426,6 +429,7 @@ WorkResponse execInWorker( worker.setReporter(workerOptions.workerVerbose ? reporter : null); request = createWorkRequest(spawn, context, flagFiles, inputFileCache, key); } catch (IOException e) { + restoreInterrupt(e); String message = "IOException while borrowing a worker from the pool:"; throw createUserExecException(e, message, Code.BORROW_FAILURE); } @@ -442,6 +446,7 @@ WorkResponse execInWorker( worker.prepareExecution(inputFiles, outputs, key.getWorkerFilesWithHashes().keySet()); spawnMetrics.setSetupTime(setupInputsTime.plus(prepareExecutionStopwatch.elapsed())); } catch (IOException e) { + restoreInterrupt(e); String message = ErrorMessage.builder() .message("IOException while preparing the execution environment of a worker:") @@ -456,6 +461,7 @@ WorkResponse execInWorker( try { worker.putRequest(request); } catch (IOException e) { + restoreInterrupt(e); String message = ErrorMessage.builder() .message( @@ -471,6 +477,7 @@ WorkResponse execInWorker( try { response = worker.getResponse(request.getRequestId()); } catch (IOException e) { + restoreInterrupt(e); // If protobuf or json reader couldn't parse the response, try to print whatever the // failing worker wrote to stdout - it's probably a stack trace or some kind of error // message that will help the user figure out why the compiler is failing. @@ -494,6 +501,7 @@ WorkResponse execInWorker( worker.finishExecution(execRoot); spawnMetrics.setProcessOutputsTime(processOutputsStopwatch.elapsed()); } catch (IOException e) { + restoreInterrupt(e); String message = ErrorMessage.builder() .message("IOException while finishing worker execution:") @@ -509,8 +517,10 @@ WorkResponse execInWorker( workers.invalidateObject(key, worker); } catch (IOException e1) { // The original exception is more important / helpful, so we'll just ignore this one. + restoreInterrupt(e1); + } finally { + worker = null; } - worker = null; } throw e; @@ -523,6 +533,12 @@ WorkResponse execInWorker( return response; } + private static void restoreInterrupt(IOException e) { + if (e instanceof InterruptedIOException) { + Thread.currentThread().interrupt(); + } + } + private static UserExecException createUserExecException( IOException e, String message, Code detailedCode) { return createUserExecException( diff --git a/src/test/java/com/google/devtools/build/lib/worker/WorkerMultiplexerTest.java b/src/test/java/com/google/devtools/build/lib/worker/WorkerMultiplexerTest.java index 9075e2a458c106..45921c36133419 100644 --- a/src/test/java/com/google/devtools/build/lib/worker/WorkerMultiplexerTest.java +++ b/src/test/java/com/google/devtools/build/lib/worker/WorkerMultiplexerTest.java @@ -62,7 +62,7 @@ public void testGetResponse_noOutstandingRequests() throws IOException, Interrup multiplexer.setProcessFactory(params -> new FakeSubprocess(serverInputStream)); WorkRequest request1 = WorkRequest.newBuilder().setRequestId(1).build(); - WorkerProxy worker = new WorkerProxy(workerKey, 2, logPath, logPath, multiplexer); + WorkerProxy worker = new WorkerProxy(workerKey, 2, logPath, multiplexer); worker.prepareExecution(null, null, null); worker.putRequest(request1); WorkResponse response1 = WorkResponse.newBuilder().setRequestId(1).build(); @@ -85,12 +85,12 @@ public void testGetResponse_basicConcurrency() OutputStream workerOutputStream = new PipedOutputStream(serverInputStream); multiplexer.setProcessFactory(params -> new FakeSubprocess(serverInputStream)); - WorkerProxy worker1 = new WorkerProxy(workerKey, 1, logPath, logPath, multiplexer); + WorkerProxy worker1 = new WorkerProxy(workerKey, 1, logPath, multiplexer); worker1.prepareExecution(null, null, null); WorkRequest request1 = WorkRequest.newBuilder().setRequestId(3).build(); worker1.putRequest(request1); - WorkerProxy worker2 = new WorkerProxy(workerKey, 2, logPath, logPath, multiplexer); + WorkerProxy worker2 = new WorkerProxy(workerKey, 2, logPath, multiplexer); worker2.prepareExecution(null, null, null); WorkRequest request2 = WorkRequest.newBuilder().setRequestId(42).build(); worker2.putRequest(request2); @@ -121,12 +121,12 @@ public void testGetResponse_slowMultiplexer() OutputStream workerOutputStream = new PipedOutputStream(serverInputStrean); multiplexer.setProcessFactory(params -> new FakeSubprocess(serverInputStrean)); - WorkerProxy worker1 = new WorkerProxy(workerKey, 1, logPath, logPath, multiplexer); + WorkerProxy worker1 = new WorkerProxy(workerKey, 1, logPath, multiplexer); worker1.prepareExecution(null, null, null); WorkRequest request1 = WorkRequest.newBuilder().setRequestId(3).build(); worker1.putRequest(request1); - WorkerProxy worker2 = new WorkerProxy(workerKey, 2, logPath, logPath, multiplexer); + WorkerProxy worker2 = new WorkerProxy(workerKey, 2, logPath, multiplexer); worker2.prepareExecution(null, null, null); WorkRequest request2 = WorkRequest.newBuilder().setRequestId(42).build(); worker2.putRequest(request2); @@ -178,12 +178,12 @@ public void testGetResponse_slowProxy() OutputStream workerOutputStream = new PipedOutputStream(serverInputStream); multiplexer.setProcessFactory(params -> new FakeSubprocess(serverInputStream)); - WorkerProxy worker1 = new WorkerProxy(workerKey, 1, logPath, logPath, multiplexer); + WorkerProxy worker1 = new WorkerProxy(workerKey, 1, logPath, multiplexer); worker1.prepareExecution(null, null, null); WorkRequest request1 = WorkRequest.newBuilder().setRequestId(3).build(); worker1.putRequest(request1); - WorkerProxy worker2 = new WorkerProxy(workerKey, 2, logPath, logPath, multiplexer); + WorkerProxy worker2 = new WorkerProxy(workerKey, 2, logPath, multiplexer); worker2.prepareExecution(null, null, null); WorkRequest request2 = WorkRequest.newBuilder().setRequestId(42).build(); worker2.putRequest(request2);