From 65d18a5221926f2beda62c1c427f8f64833109ae Mon Sep 17 00:00:00 2001 From: David Cummings Date: Wed, 9 Dec 2020 15:37:38 -0800 Subject: [PATCH] Add ability to interrupt commands Add the ability to mark an invocation of a command as interruptible. If another command is executed while this command is running, the command will be interrupted. Closes #12526. PiperOrigin-RevId: 346652616 --- src/main/cpp/blaze.cc | 8 +- src/main/cpp/startup_options.cc | 2 + src/main/cpp/startup_options.h | 4 + .../runtime/BlazeServerStartupOptions.java | 8 + .../build/lib/server/CommandManager.java | 44 ++- .../build/lib/server/GrpcServerImpl.java | 9 +- src/main/protobuf/command_server.proto | 4 + .../build/lib/server/CommandManagerTest.java | 8 +- .../build/lib/server/GrpcServerTest.java | 285 ++++++++++++++++++ 9 files changed, 359 insertions(+), 13 deletions(-) diff --git a/src/main/cpp/blaze.cc b/src/main/cpp/blaze.cc index f42a4f887aaf7a..e3f24cecd5a8a5 100644 --- a/src/main/cpp/blaze.cc +++ b/src/main/cpp/blaze.cc @@ -308,6 +308,7 @@ class BlazeServer final { const int connect_timeout_secs_; const bool batch_; const bool block_for_lock_; + const bool preemptible_; const blaze_util::Path output_base_; }; @@ -1050,8 +1051,9 @@ static bool IsVolatileArg(const string &arg) { // not used at server startup to be part of the startup command line. The // server command line difference logic can be simplified then. static const std::set volatile_startup_options = { - "--option_sources=", "--max_idle_secs=", "--connect_timeout_secs=", - "--local_startup_timeout_secs=", "--client_debug="}; + "--option_sources=", "--max_idle_secs=", + "--connect_timeout_secs=", "--local_startup_timeout_secs=", + "--client_debug=", "--preemptible="}; // Split arg based on the first "=" if one exists in arg. const string::size_type eq_pos = arg.find_first_of('='); @@ -1671,6 +1673,7 @@ BlazeServer::BlazeServer(const StartupOptions &startup_options) connect_timeout_secs_(startup_options.connect_timeout_secs), batch_(startup_options.batch), block_for_lock_(startup_options.block_for_lock), + preemptible_(startup_options.preemptible), output_base_(startup_options.output_base) { if (!startup_options.client_debug) { gpr_set_log_function(null_grpc_log_function); @@ -1945,6 +1948,7 @@ unsigned int BlazeServer::Communicate( command_server::RunRequest request; request.set_cookie(request_cookie_); request.set_block_for_lock(block_for_lock_); + request.set_preemptible(preemptible_); request.set_client_description("pid=" + blaze::GetProcessIdAsString()); for (const string &arg : arg_vector) { request.add_arg(arg); diff --git a/src/main/cpp/startup_options.cc b/src/main/cpp/startup_options.cc index 5ee43bac80a98a..6c4d40f611672a 100644 --- a/src/main/cpp/startup_options.cc +++ b/src/main/cpp/startup_options.cc @@ -85,6 +85,7 @@ StartupOptions::StartupOptions(const string &product_name, local_startup_timeout_secs(120), have_invocation_policy_(false), client_debug(false), + preemptible(false), java_logging_formatter( "com.google.devtools.build.lib.util.SingleLineFormatter"), expand_configs_in_place(true), @@ -130,6 +131,7 @@ StartupOptions::StartupOptions(const string &product_name, RegisterNullaryStartupFlag("batch_cpu_scheduling", &batch_cpu_scheduling); RegisterNullaryStartupFlag("block_for_lock", &block_for_lock); RegisterNullaryStartupFlag("client_debug", &client_debug); + RegisterNullaryStartupFlag("preemptible", &preemptible); RegisterNullaryStartupFlag("expand_configs_in_place", &expand_configs_in_place); RegisterNullaryStartupFlag("fatal_event_bus_exceptions", diff --git a/src/main/cpp/startup_options.h b/src/main/cpp/startup_options.h index 41aac66a687093..fcd6787a9e3e1a 100644 --- a/src/main/cpp/startup_options.h +++ b/src/main/cpp/startup_options.h @@ -245,6 +245,10 @@ class StartupOptions { // Whether to output addition debugging information in the client. bool client_debug; + // Whether the resulting command will be preempted if a subsequent command is + // run. + bool preemptible; + // Value of the java.util.logging.FileHandler.formatter Java property. std::string java_logging_formatter; diff --git a/src/main/java/com/google/devtools/build/lib/runtime/BlazeServerStartupOptions.java b/src/main/java/com/google/devtools/build/lib/runtime/BlazeServerStartupOptions.java index a9f976e6592052..6a242c695297bd 100644 --- a/src/main/java/com/google/devtools/build/lib/runtime/BlazeServerStartupOptions.java +++ b/src/main/java/com/google/devtools/build/lib/runtime/BlazeServerStartupOptions.java @@ -375,6 +375,14 @@ public String getTypeDescription() { + "cause the server to restart.") public boolean clientDebug; + @Option( + name = "preemptible", + defaultValue = "false", // NOTE: only for documentation, value is set and used by the client. + documentationCategory = OptionDocumentationCategory.BAZEL_CLIENT_OPTIONS, + effectTags = {OptionEffectTag.EAGERNESS_TO_EXIT}, + help = "If true, the command can be preempted if another command is started.") + public boolean preemptible; + @Option( name = "connect_timeout_secs", defaultValue = "30", // NOTE: only for documentation, value is set and used by the client. diff --git a/src/main/java/com/google/devtools/build/lib/server/CommandManager.java b/src/main/java/com/google/devtools/build/lib/server/CommandManager.java index 04f079c1d34256..ba91bd8472cf16 100644 --- a/src/main/java/com/google/devtools/build/lib/server/CommandManager.java +++ b/src/main/java/com/google/devtools/build/lib/server/CommandManager.java @@ -43,6 +43,24 @@ class CommandManager { idle(); } + void preemptEligibleCommands() { + synchronized (runningCommandsMap) { + ImmutableSet.Builder commandsToInterruptBuilder = new ImmutableSet.Builder<>(); + + for (RunningCommand command : runningCommandsMap.values()) { + if (command.isPreemptible()) { + command.thread.interrupt(); + commandsToInterruptBuilder.add(command.id); + } + } + + ImmutableSet commandsToInterrupt = commandsToInterruptBuilder.build(); + if (!commandsToInterrupt.isEmpty()) { + startSlowInterruptWatcher(commandsToInterrupt); + } + } + } + void interruptInflightCommands() { synchronized (runningCommandsMap) { for (RunningCommand command : runningCommandsMap.values()) { @@ -54,7 +72,7 @@ void interruptInflightCommands() { } void doCancel(CancelRequest request) { - try (RunningCommand cancelCommand = create()) { + try (RunningCommand cancelCommand = createCommand()) { synchronized (runningCommandsMap) { RunningCommand pendingCommand = runningCommandsMap.get(request.getCommandId()); if (pendingCommand != null) { @@ -88,8 +106,19 @@ void waitForChange(long timeout) throws InterruptedException { } } - RunningCommand create() { - RunningCommand command = new RunningCommand(); + RunningCommand createPreemptibleCommand() { + RunningCommand command = new RunningCommand(true); + registerCommand(command); + return command; + } + + RunningCommand createCommand() { + RunningCommand command = new RunningCommand(false); + registerCommand(command); + return command; + } + + private void registerCommand(RunningCommand command) { synchronized (runningCommandsMap) { if (runningCommandsMap.isEmpty()) { busy(); @@ -98,7 +127,6 @@ RunningCommand create() { runningCommandsMap.notify(); } logger.atInfo().log("Starting command %s on thread %s", command.id, command.thread.getName()); - return command; } private void idle() { @@ -148,10 +176,12 @@ private void startSlowInterruptWatcher(final ImmutableSet commandIds) { class RunningCommand implements AutoCloseable { private final Thread thread; private final String id; + private final boolean preemptible; - private RunningCommand() { + private RunningCommand(boolean preemptible) { thread = Thread.currentThread(); id = UUID.randomUUID().toString(); + this.preemptible = preemptible; } @Override @@ -170,5 +200,9 @@ public void close() { String getId() { return id; } + + boolean isPreemptible() { + return this.preemptible; + } } } diff --git a/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java b/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java index aef68c390b91df..3e368bed432c94 100644 --- a/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java +++ b/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java @@ -512,7 +512,12 @@ private void executeCommand(RunRequest request, BlockingStreamObserver obse @Override public void ping(PingRequest pingRequest, StreamObserver streamObserver) { - try (RunningCommand command = commandManager.create()) { + try (RunningCommand command = commandManager.createCommand()) { PingResponse.Builder response = PingResponse.newBuilder(); if (pingRequest.getCookie().equals(requestCookie)) { response.setCookie(responseCookie); diff --git a/src/main/protobuf/command_server.proto b/src/main/protobuf/command_server.proto index 0fd03bac31eacb..636052db0ddc99 100644 --- a/src/main/protobuf/command_server.proto +++ b/src/main/protobuf/command_server.proto @@ -58,6 +58,10 @@ message RunRequest { // came from. These options have already been parsed and already have had // their effect. This information should only be used for logging. repeated StartupOption startup_options = 6; + + // Whether the resulting command can be preempted if additional commands + // are received. + bool preemptible = 7; } // Contains the a startup option with its source file. Uses bytes to preserve diff --git a/src/test/java/com/google/devtools/build/lib/server/CommandManagerTest.java b/src/test/java/com/google/devtools/build/lib/server/CommandManagerTest.java index e9e952a6139222..6df9d462625a9b 100644 --- a/src/test/java/com/google/devtools/build/lib/server/CommandManagerTest.java +++ b/src/test/java/com/google/devtools/build/lib/server/CommandManagerTest.java @@ -36,10 +36,10 @@ public class CommandManagerTest { public void testBasicOperationsOnSingleThread() { CommandManager underTest = new CommandManager(/*doIdleServerTasks=*/ false); assertThat(underTest.isEmpty()).isTrue(); - try (RunningCommand firstCommand = underTest.create()) { + try (RunningCommand firstCommand = underTest.createCommand()) { assertThat(underTest.isEmpty()).isFalse(); assertThat(isValidUuid(firstCommand.getId())).isTrue(); - try (RunningCommand secondCommand = underTest.create()) { + try (RunningCommand secondCommand = underTest.createCommand()) { assertThat(underTest.isEmpty()).isFalse(); assertThat(isValidUuid(secondCommand.getId())).isTrue(); assertThat(firstCommand.getId()).isNotEqualTo(secondCommand.getId()); @@ -75,11 +75,11 @@ public void testNotifiesOnBusyAndIdle() throws Exception { // We want to ensure at each step that we are actively awaiting notification. waitForThreadWaiting(waiting, thread); - try (RunningCommand firstCommand = underTest.create()) { + try (RunningCommand firstCommand = underTest.createCommand()) { cyclicBarrier.await(); assertThat(notificationCounter.get()).isEqualTo(1); waitForThreadWaiting(waiting, thread); - try (RunningCommand secondCommand = underTest.create()) { + try (RunningCommand secondCommand = underTest.createCommand()) { cyclicBarrier.await(); assertThat(notificationCounter.get()).isEqualTo(2); waitForThreadWaiting(waiting, thread); diff --git a/src/test/java/com/google/devtools/build/lib/server/GrpcServerTest.java b/src/test/java/com/google/devtools/build/lib/server/GrpcServerTest.java index ce0c1b19dd9950..929fd3cf177543 100644 --- a/src/test/java/com/google/devtools/build/lib/server/GrpcServerTest.java +++ b/src/test/java/com/google/devtools/build/lib/server/GrpcServerTest.java @@ -34,6 +34,7 @@ import com.google.devtools.build.lib.server.GrpcServerImpl.BlockingStreamObserver; import com.google.devtools.build.lib.testutil.Suite; import com.google.devtools.build.lib.testutil.TestSpec; +import com.google.devtools.build.lib.testutil.TestUtils; import com.google.devtools.build.lib.util.Pair; import com.google.devtools.build.lib.util.io.OutErr; import com.google.devtools.build.lib.vfs.DigestHashFunction; @@ -112,6 +113,15 @@ private RunRequest createRequest(String... args) { .build(); } + private RunRequest createPreemptibleRequest(String... args) { + return RunRequest.newBuilder() + .setCookie(REQUEST_COOKIE) + .setClientDescription("client-description") + .setPreemptible(true) + .addAllArg(Arrays.stream(args).map(ByteString::copyFromUtf8).collect(Collectors.toList())) + .build(); + } + @Test public void testSendingSimpleMessage() throws Exception { AtomicReference> argsReceived = new AtomicReference<>(); @@ -460,6 +470,281 @@ public void onCompleted() { .isEqualTo(Code.INTERRUPTED); } + /** + * Ensure that if a command is marked as preemptible, running a second command interrupts the + * first command. + */ + @Test + public void testPreeempt() throws Exception { + String firstCommandArg = "Foo"; + String secondCommandArg = "Bar"; + + CommandDispatcher dispatcher = + new CommandDispatcher() { + @Override + public BlazeCommandResult exec( + InvocationPolicy invocationPolicy, + List args, + OutErr outErr, + LockingMode lockingMode, + String clientDescription, + long firstContactTimeMillis, + Optional>> startupOptionsTaggedWithBazelRc) + throws InterruptedException { + if (args.contains(firstCommandArg)) { + while (true) { + try { + Thread.sleep(TestUtils.WAIT_TIMEOUT_MILLISECONDS); + } catch (InterruptedException e) { + return BlazeCommandResult.failureDetail( + FailureDetail.newBuilder() + .setInterrupted(Interrupted.newBuilder().setCode(Code.INTERRUPTED)) + .build()); + } + } + } else { + return BlazeCommandResult.success(); + } + } + }; + createServer(dispatcher); + + CountDownLatch gotFoo = new CountDownLatch(1); + AtomicReference lastFooResponse = new AtomicReference<>(); + AtomicReference lastBarResponse = new AtomicReference<>(); + + CommandServerStub stub = CommandServerGrpc.newStub(channel); + stub.run( + createPreemptibleRequest(firstCommandArg), + new StreamObserver() { + @Override + public void onNext(RunResponse value) { + gotFoo.countDown(); + lastFooResponse.set(value); + } + + @Override + public void onError(Throwable t) {} + + @Override + public void onCompleted() {} + }); + + // Wait for the first command to startup + gotFoo.await(); + + CountDownLatch gotBar = new CountDownLatch(1); + stub.run( + createRequest(secondCommandArg), + new StreamObserver() { + @Override + public void onNext(RunResponse value) { + gotBar.countDown(); + lastBarResponse.set(value); + } + + @Override + public void onError(Throwable t) {} + + @Override + public void onCompleted() {} + }); + + gotBar.await(); + server.shutdown(); + server.awaitTermination(); + + assertThat(lastBarResponse.get().getFinished()).isTrue(); + assertThat(lastBarResponse.get().getExitCode()).isEqualTo(0); + assertThat(lastFooResponse.get().getFinished()).isTrue(); + assertThat(lastFooResponse.get().getExitCode()).isEqualTo(8); + assertThat(lastFooResponse.get().hasFailureDetail()).isTrue(); + assertThat(lastFooResponse.get().getFailureDetail().hasInterrupted()).isTrue(); + assertThat(lastFooResponse.get().getFailureDetail().getInterrupted().getCode()) + .isEqualTo(Code.INTERRUPTED); + } + + /** + * Ensure that if a command is marked as preemptible, running a second preemptible command + * interupts the first command. + */ + @Test + public void testMultiPreeempt() throws Exception { + String firstCommandArg = "Foo"; + String secondCommandArg = "Bar"; + + CommandDispatcher dispatcher = + new CommandDispatcher() { + @Override + public BlazeCommandResult exec( + InvocationPolicy invocationPolicy, + List args, + OutErr outErr, + LockingMode lockingMode, + String clientDescription, + long firstContactTimeMillis, + Optional>> startupOptionsTaggedWithBazelRc) + throws InterruptedException { + if (args.contains(firstCommandArg)) { + while (true) { + try { + Thread.sleep(TestUtils.WAIT_TIMEOUT_MILLISECONDS); + } catch (InterruptedException e) { + return BlazeCommandResult.failureDetail( + FailureDetail.newBuilder() + .setInterrupted(Interrupted.newBuilder().setCode(Code.INTERRUPTED)) + .build()); + } + } + } else { + return BlazeCommandResult.success(); + } + } + }; + createServer(dispatcher); + + CountDownLatch gotFoo = new CountDownLatch(1); + AtomicReference lastFooResponse = new AtomicReference<>(); + AtomicReference lastBarResponse = new AtomicReference<>(); + + CommandServerStub stub = CommandServerGrpc.newStub(channel); + stub.run( + createPreemptibleRequest(firstCommandArg), + new StreamObserver() { + @Override + public void onNext(RunResponse value) { + gotFoo.countDown(); + lastFooResponse.set(value); + } + + @Override + public void onError(Throwable t) {} + + @Override + public void onCompleted() {} + }); + + // Wait for the first command to startup + gotFoo.await(); + + CountDownLatch gotBar = new CountDownLatch(1); + stub.run( + createPreemptibleRequest(secondCommandArg), + new StreamObserver() { + @Override + public void onNext(RunResponse value) { + gotBar.countDown(); + lastBarResponse.set(value); + } + + @Override + public void onError(Throwable t) {} + + @Override + public void onCompleted() {} + }); + + gotBar.await(); + server.shutdown(); + server.awaitTermination(); + + assertThat(lastBarResponse.get().getFinished()).isTrue(); + assertThat(lastBarResponse.get().getExitCode()).isEqualTo(0); + assertThat(lastFooResponse.get().getFinished()).isTrue(); + assertThat(lastFooResponse.get().getExitCode()).isEqualTo(8); + assertThat(lastFooResponse.get().hasFailureDetail()).isTrue(); + assertThat(lastFooResponse.get().getFailureDetail().hasInterrupted()).isTrue(); + assertThat(lastFooResponse.get().getFailureDetail().getInterrupted().getCode()) + .isEqualTo(Code.INTERRUPTED); + } + + /** + * Ensure that when a command is not marked as preemptible, running a second command does not + * interrupt the first command. + */ + @Test + public void testNoPreeempt() throws Exception { + String firstCommandArg = "Foo"; + String secondCommandArg = "Bar"; + + CountDownLatch fooBlocked = new CountDownLatch(1); + CountDownLatch fooProceed = new CountDownLatch(1); + CountDownLatch barBlocked = new CountDownLatch(1); + CountDownLatch barProceed = new CountDownLatch(1); + + CommandDispatcher dispatcher = + new CommandDispatcher() { + @Override + public BlazeCommandResult exec( + InvocationPolicy invocationPolicy, + List args, + OutErr outErr, + LockingMode lockingMode, + String clientDescription, + long firstContactTimeMillis, + Optional>> startupOptionsTaggedWithBazelRc) + throws InterruptedException { + if (args.contains(firstCommandArg)) { + fooBlocked.countDown(); + fooProceed.await(); + } else { + barBlocked.countDown(); + barProceed.await(); + } + return BlazeCommandResult.success(); + } + }; + createServer(dispatcher); + + AtomicReference lastFooResponse = new AtomicReference<>(); + AtomicReference lastBarResponse = new AtomicReference<>(); + + CommandServerStub stub = CommandServerGrpc.newStub(channel); + stub.run( + createRequest(firstCommandArg), + new StreamObserver() { + @Override + public void onNext(RunResponse value) { + lastFooResponse.set(value); + } + + @Override + public void onError(Throwable t) {} + + @Override + public void onCompleted() {} + }); + fooBlocked.await(); + + stub.run( + createRequest(secondCommandArg), + new StreamObserver() { + @Override + public void onNext(RunResponse value) { + lastBarResponse.set(value); + } + + @Override + public void onError(Throwable t) {} + + @Override + public void onCompleted() {} + }); + barBlocked.await(); + + // At this point both commands should be blocked on proceed latch, carry on... + fooProceed.countDown(); + barProceed.countDown(); + + server.shutdown(); + server.awaitTermination(); + + assertThat(lastFooResponse.get().getFinished()).isTrue(); + assertThat(lastFooResponse.get().getExitCode()).isEqualTo(0); + assertThat(lastBarResponse.get().getFinished()).isTrue(); + assertThat(lastBarResponse.get().getExitCode()).isEqualTo(0); + } + @Test public void testFlowControl() throws Exception { // This test attempts to verify that FlowControl successfully blocks after some number of onNext