From cc83bd97a8099314c548998ba8ff0b47376a9335 Mon Sep 17 00:00:00 2001 From: Anshuman Mishra Date: Tue, 30 May 2023 03:48:10 -0700 Subject: [PATCH 1/3] feat: Implement failure circuit breaker Copy of #18120: I accidentally closed #18120 during rebase and don't have permission to reopen. We have noticed that any problems with the remote cache have a detrimental effect on build times. On investigation we found that the interface for the circuit breaker was left unimplemented. To address this issue, I implemented a failure circuit breaker, which includes three new Bazel flags: 1) experimental_circuit_breaker_strategy, 2) experimental_remote_failure_threshold, and 3) experimental_remote_failure_window_interval. In this implementation, I have implemented the failure strategy for the circuit breaker and used the failure count to trip the circuit. Reasoning behind using failure count instead of failure rate: To measure the failure rate I would also need the success count. Both the failure and success counts would need to be AtomicIntegers, as both would be modified concurrently by multiple threads. Even though getAndIncrement is a very lightweight operation, at very high request rates it might contribute to latency. Reasoning behind using failure circuit breaker: A new instance of Retrier.CircuitBreaker is created for each build. Therefore, if the circuit breaker trips during a build, the remote cache will be disabled for that build. However, it will be enabled again for the next build as a new instance of Retrier.CircuitBreaker will be created. If needed in the future, we may also add a cool-down strategy, e.g. failure_and_cool_down_strategy. closes https://github.com/bazelbuild/bazel/issues/18136 Closes #18359. 
PiperOrigin-RevId: 536349954 Change-Id: I5e1c57d4ad0ce07ddc4808bf1f327bc5df6ce704 --- .../google/devtools/build/lib/remote/BUILD | 2 + .../build/lib/remote/GrpcCacheClient.java | 4 + .../build/lib/remote/GrpcRemoteExecutor.java | 4 + .../build/lib/remote/RemoteModule.java | 12 +- .../build/lib/remote/RemoteSpawnRunner.java | 5 +- .../devtools/build/lib/remote/Retrier.java | 34 +- .../build/lib/remote/circuitbreaker/BUILD | 23 ++ .../circuitbreaker/CircuitBreakerFactory.java | 45 +++ .../circuitbreaker/FailureCircuitBreaker.java | 83 +++++ .../remote/options/CommonRemoteOptions.java | 24 ++ .../lib/remote/options/RemoteOptions.java | 49 +++ .../google/devtools/build/lib/remote/BUILD | 3 + .../build/lib/remote/RemoteModuleTest.java | 315 +++++++++--------- .../build/lib/remote/RetrierTest.java | 9 +- .../build/lib/remote/circuitbreaker/BUILD | 31 ++ .../CircuitBreakerFactoryTest.java | 44 +++ .../FailureCircuitBreakerTest.java | 68 ++++ 17 files changed, 579 insertions(+), 176 deletions(-) create mode 100644 src/main/java/com/google/devtools/build/lib/remote/circuitbreaker/BUILD create mode 100644 src/main/java/com/google/devtools/build/lib/remote/circuitbreaker/CircuitBreakerFactory.java create mode 100644 src/main/java/com/google/devtools/build/lib/remote/circuitbreaker/FailureCircuitBreaker.java create mode 100644 src/test/java/com/google/devtools/build/lib/remote/circuitbreaker/BUILD create mode 100644 src/test/java/com/google/devtools/build/lib/remote/circuitbreaker/CircuitBreakerFactoryTest.java create mode 100644 src/test/java/com/google/devtools/build/lib/remote/circuitbreaker/FailureCircuitBreakerTest.java diff --git a/src/main/java/com/google/devtools/build/lib/remote/BUILD b/src/main/java/com/google/devtools/build/lib/remote/BUILD index 1bce8a80426e37..0f2e76ed83aecf 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/BUILD +++ b/src/main/java/com/google/devtools/build/lib/remote/BUILD @@ -7,6 +7,7 @@ package( filegroup( name = "srcs", srcs = 
glob(["*"]) + [ + "//src/main/java/com/google/devtools/build/lib/remote/circuitbreaker:srcs", "//src/main/java/com/google/devtools/build/lib/remote/common:srcs", "//src/main/java/com/google/devtools/build/lib/remote/downloader:srcs", "//src/main/java/com/google/devtools/build/lib/remote/disk:srcs", @@ -84,6 +85,7 @@ java_library( "//src/main/java/com/google/devtools/build/lib/exec/local", "//src/main/java/com/google/devtools/build/lib/packages/semantics", "//src/main/java/com/google/devtools/build/lib/profiler", + "//src/main/java/com/google/devtools/build/lib/remote/circuitbreaker", "//src/main/java/com/google/devtools/build/lib/remote/common", "//src/main/java/com/google/devtools/build/lib/remote/common:cache_not_found_exception", "//src/main/java/com/google/devtools/build/lib/remote/disk", diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java index dfd9c2a61a571c..ce0e917837f0c5 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java +++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java @@ -495,4 +495,8 @@ ListenableFuture uploadChunker( MoreExecutors.directExecutor()); return f; } + + Retrier getRetrier() { + return this.retrier; + } } diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java index df3872ebfaeb46..8cd463a84ab74d 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java +++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java @@ -243,4 +243,8 @@ public void close() { } channel.release(); } + + RemoteRetrier getRetrier() { + return this.retrier; + } } diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java index 
417ec5bf104f94..d0afeb2d27e00e 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java @@ -62,6 +62,7 @@ import com.google.devtools.build.lib.exec.SpawnStrategyRegistry; import com.google.devtools.build.lib.remote.RemoteServerCapabilities.ServerCapabilitiesRequirement; import com.google.devtools.build.lib.remote.ToplevelArtifactsDownloader.PathToMetadataConverter; +import com.google.devtools.build.lib.remote.circuitbreaker.CircuitBreakerFactory; import com.google.devtools.build.lib.remote.common.RemoteCacheClient; import com.google.devtools.build.lib.remote.common.RemoteExecutionClient; import com.google.devtools.build.lib.remote.downloader.GrpcRemoteDownloader; @@ -475,12 +476,11 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException { GoogleAuthUtils.newCallCredentialsProvider(credentials); CallCredentials callCredentials = callCredentialsProvider.getCallCredentials(); + Retrier.CircuitBreaker circuitBreaker = + CircuitBreakerFactory.createCircuitBreaker(remoteOptions); RemoteRetrier retrier = new RemoteRetrier( - remoteOptions, - RemoteRetrier.RETRIABLE_GRPC_ERRORS, - retryScheduler, - Retrier.ALLOW_ALL_CALLS); + remoteOptions, RemoteRetrier.RETRIABLE_GRPC_ERRORS, retryScheduler, circuitBreaker); // We only check required capabilities for a given endpoint. 
// @@ -598,7 +598,7 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException { remoteOptions, RemoteRetrier.RETRIABLE_GRPC_ERRORS, // Handle NOT_FOUND internally retryScheduler, - Retrier.ALLOW_ALL_CALLS); + circuitBreaker); remoteExecutor = new ExperimentalGrpcRemoteExecutor( remoteOptions, execChannel.retain(), callCredentialsProvider, execRetrier); @@ -608,7 +608,7 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException { remoteOptions, RemoteRetrier.RETRIABLE_GRPC_EXEC_ERRORS, retryScheduler, - Retrier.ALLOW_ALL_CALLS); + circuitBreaker); remoteExecutor = new GrpcRemoteExecutor(execChannel.retain(), callCredentialsProvider, execRetrier); } diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java index 1eba36e9f1d723..32f60bdb6f79f6 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java @@ -58,6 +58,7 @@ import com.google.devtools.build.lib.profiler.SilentCloseable; import com.google.devtools.build.lib.remote.RemoteExecutionService.RemoteActionResult; import com.google.devtools.build.lib.remote.RemoteExecutionService.ServerLogs; +import com.google.devtools.build.lib.remote.circuitbreaker.CircuitBreakerFactory; import com.google.devtools.build.lib.remote.common.BulkTransferException; import com.google.devtools.build.lib.remote.common.OperationObserver; import com.google.devtools.build.lib.remote.options.RemoteOptions; @@ -660,6 +661,8 @@ private void report(Event evt) { private static RemoteRetrier createExecuteRetrier( RemoteOptions options, ListeningScheduledExecutorService retryService) { return new ExecuteRetrier( - options.remoteMaxRetryAttempts, retryService, Retrier.ALLOW_ALL_CALLS); + options.remoteMaxRetryAttempts, + retryService, + CircuitBreakerFactory.createCircuitBreaker(options)); } } diff 
--git a/src/main/java/com/google/devtools/build/lib/remote/Retrier.java b/src/main/java/com/google/devtools/build/lib/remote/Retrier.java index 4711a06eb9e454..457880268764d5 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/Retrier.java +++ b/src/main/java/com/google/devtools/build/lib/remote/Retrier.java @@ -100,7 +100,7 @@ enum State { State state(); /** Called after an execution failed. */ - void recordFailure(); + void recordFailure(Exception e); /** Called after an execution succeeded. */ void recordSuccess(); @@ -130,7 +130,7 @@ public State state() { } @Override - public void recordFailure() {} + public void recordFailure(Exception e) {} @Override public void recordSuccess() {} @@ -245,7 +245,7 @@ public T execute(Callable call, Backoff backoff) throws Exception { circuitBreaker.recordSuccess(); return r; } catch (Exception e) { - circuitBreaker.recordFailure(); + circuitBreaker.recordFailure(e); Throwables.throwIfInstanceOf(e, InterruptedException.class); if (State.TRIAL_CALL.equals(circuitState)) { throw e; @@ -272,19 +272,35 @@ public ListenableFuture executeAsync(AsyncCallable call) { * backoff. 
*/ public ListenableFuture executeAsync(AsyncCallable call, Backoff backoff) { + final State circuitState = circuitBreaker.state(); + if (State.REJECT_CALLS.equals(circuitState)) { + return Futures.immediateFailedFuture(new CircuitBreakerException()); + } try { + ListenableFuture future = + Futures.transformAsync( + call.call(), + (f) -> { + circuitBreaker.recordSuccess(); + return Futures.immediateFuture(f); + }, + MoreExecutors.directExecutor()); return Futures.catchingAsync( - call.call(), + future, Exception.class, - t -> onExecuteAsyncFailure(t, call, backoff), + t -> onExecuteAsyncFailure(t, call, backoff, circuitState), MoreExecutors.directExecutor()); } catch (Exception e) { - return onExecuteAsyncFailure(e, call, backoff); + return onExecuteAsyncFailure(e, call, backoff, circuitState); } } private ListenableFuture onExecuteAsyncFailure( - Exception t, AsyncCallable call, Backoff backoff) { + Exception t, AsyncCallable call, Backoff backoff, State circuitState) { + circuitBreaker.recordFailure(t); + if (circuitState.equals(State.TRIAL_CALL)) { + return Futures.immediateFailedFuture(t); + } if (isRetriable(t)) { long waitMillis = backoff.nextDelayMillis(t); if (waitMillis >= 0) { @@ -310,4 +326,8 @@ public Backoff newBackoff() { public boolean isRetriable(Exception e) { return shouldRetry.test(e); } + + CircuitBreaker getCircuitBreaker() { + return this.circuitBreaker; + } } diff --git a/src/main/java/com/google/devtools/build/lib/remote/circuitbreaker/BUILD b/src/main/java/com/google/devtools/build/lib/remote/circuitbreaker/BUILD new file mode 100644 index 00000000000000..caa358ec8bbd44 --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/remote/circuitbreaker/BUILD @@ -0,0 +1,23 @@ +load("@rules_java//java:defs.bzl", "java_library") + +package( + default_applicable_licenses = ["//:license"], + default_visibility = ["//src:__subpackages__"], +) + +filegroup( + name = "srcs", + srcs = glob(["*"]), + visibility = ["//src:__subpackages__"], +) + 
+java_library( + name = "circuitbreaker", + srcs = glob(["*.java"]), + deps = [ + "//src/main/java/com/google/devtools/build/lib/remote:Retrier", + "//src/main/java/com/google/devtools/build/lib/remote/common:cache_not_found_exception", + "//src/main/java/com/google/devtools/build/lib/remote/options", + "//third_party:guava", + ], +) diff --git a/src/main/java/com/google/devtools/build/lib/remote/circuitbreaker/CircuitBreakerFactory.java b/src/main/java/com/google/devtools/build/lib/remote/circuitbreaker/CircuitBreakerFactory.java new file mode 100644 index 00000000000000..be8835b7c0b2d3 --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/remote/circuitbreaker/CircuitBreakerFactory.java @@ -0,0 +1,45 @@ +// Copyright 2023 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+package com.google.devtools.build.lib.remote.circuitbreaker; + +import com.google.common.collect.ImmutableSet; +import com.google.devtools.build.lib.remote.Retrier; +import com.google.devtools.build.lib.remote.common.CacheNotFoundException; +import com.google.devtools.build.lib.remote.options.RemoteOptions; + +/** Factory for {@link Retrier.CircuitBreaker} */ +public class CircuitBreakerFactory { + + public static final ImmutableSet> DEFAULT_IGNORED_ERRORS = + ImmutableSet.of(CacheNotFoundException.class); + + private CircuitBreakerFactory() {} + + /** + * Creates the instance of the {@link Retrier.CircuitBreaker} as per the strategy defined in + * {@link RemoteOptions}. In case of undefined strategy defaults to {@link + * Retrier.ALLOW_ALL_CALLS} implementation. + * + * @param remoteOptions The configuration for the CircuitBreaker implementation. + * @return an instance of CircuitBreaker. + */ + public static Retrier.CircuitBreaker createCircuitBreaker(final RemoteOptions remoteOptions) { + if (remoteOptions.circuitBreakerStrategy == RemoteOptions.CircuitBreakerStrategy.FAILURE) { + return new FailureCircuitBreaker( + remoteOptions.remoteFailureThreshold, + (int) remoteOptions.remoteFailureWindowInterval.toMillis()); + } + return Retrier.ALLOW_ALL_CALLS; + } +} diff --git a/src/main/java/com/google/devtools/build/lib/remote/circuitbreaker/FailureCircuitBreaker.java b/src/main/java/com/google/devtools/build/lib/remote/circuitbreaker/FailureCircuitBreaker.java new file mode 100644 index 00000000000000..b1b5739fd44c96 --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/remote/circuitbreaker/FailureCircuitBreaker.java @@ -0,0 +1,83 @@ +// Copyright 2023 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.remote.circuitbreaker; + +import com.google.common.collect.ImmutableSet; +import com.google.devtools.build.lib.remote.Retrier; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * The {@link FailureCircuitBreaker} implementation of the {@link Retrier.CircuitBreaker} prevents + * further calls to a remote cache once the number of failures within a given window exceeds a + * specified threshold for a build. In the context of Bazel, a new instance of {@link + * Retrier.CircuitBreaker} is created for each build. Therefore, if the circuit breaker trips during + * a build, the remote cache will be disabled for that build. However, it will be enabled again for + * the next build as a new instance of {@link Retrier.CircuitBreaker} will be created. + */ +public class FailureCircuitBreaker implements Retrier.CircuitBreaker { + + private State state; + private final AtomicInteger failures; + private final int failureThreshold; + private final int slidingWindowSize; + private final ScheduledExecutorService scheduledExecutor; + private final ImmutableSet> ignoredErrors; + + /** + * Creates a {@link FailureCircuitBreaker}. + * + * @param failureThreshold is used to set the number of failures required to trip the circuit + * breaker in given time window. + * @param slidingWindowSize the size of the sliding window in milliseconds to calculate the number + * of failures. 
+ */ + public FailureCircuitBreaker(int failureThreshold, int slidingWindowSize) { + this.failureThreshold = failureThreshold; + this.failures = new AtomicInteger(0); + this.slidingWindowSize = slidingWindowSize; + this.state = State.ACCEPT_CALLS; + this.scheduledExecutor = + slidingWindowSize > 0 ? Executors.newSingleThreadScheduledExecutor() : null; + this.ignoredErrors = CircuitBreakerFactory.DEFAULT_IGNORED_ERRORS; + } + + @Override + public State state() { + return this.state; + } + + @Override + public void recordFailure(Exception e) { + if (!ignoredErrors.contains(e.getClass())) { + int failureCount = failures.incrementAndGet(); + if (slidingWindowSize > 0) { + var unused = + scheduledExecutor.schedule( + failures::decrementAndGet, slidingWindowSize, TimeUnit.MILLISECONDS); + } + // Since the state can only be changed to the open state, synchronization is not required. + if (failureCount > this.failureThreshold) { + this.state = State.REJECT_CALLS; + } + } + } + + @Override + public void recordSuccess() { + // do nothing, implement if we need to set threshold on failure rate instead of count. + } +} diff --git a/src/main/java/com/google/devtools/build/lib/remote/options/CommonRemoteOptions.java b/src/main/java/com/google/devtools/build/lib/remote/options/CommonRemoteOptions.java index 27da47a8fb2da0..ea18ccd85841fc 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/options/CommonRemoteOptions.java +++ b/src/main/java/com/google/devtools/build/lib/remote/options/CommonRemoteOptions.java @@ -13,11 +13,16 @@ // limitations under the License. 
package com.google.devtools.build.lib.remote.options; +import com.google.devtools.common.options.Converter; +import com.google.devtools.common.options.Converters; import com.google.devtools.common.options.Option; import com.google.devtools.common.options.OptionDocumentationCategory; import com.google.devtools.common.options.OptionEffectTag; import com.google.devtools.common.options.OptionsBase; +import com.google.devtools.common.options.OptionsParsingException; +import java.time.Duration; import java.util.List; +import java.util.regex.Pattern; /** Options for remote execution and distributed caching that shared between Bazel and Blaze. */ public class CommonRemoteOptions extends OptionsBase { @@ -33,4 +38,23 @@ public class CommonRemoteOptions extends OptionsBase { + " the client to request certain artifacts that might be needed locally (e.g. IDE" + " support)") public List remoteDownloadRegex; + + /** Returns the specified duration. Assumes seconds if unitless. */ + public static class RemoteDurationConverter extends Converter.Contextless { + + private static final Pattern UNITLESS_REGEX = Pattern.compile("^[0-9]+$"); + + @Override + public Duration convert(String input) throws OptionsParsingException { + if (UNITLESS_REGEX.matcher(input).matches()) { + input += "s"; + } + return new Converters.DurationConverter().convert(input, /* conversionContext= */ null); + } + + @Override + public String getTypeDescription() { + return "An immutable length of time."; + } + } } diff --git a/src/main/java/com/google/devtools/build/lib/remote/options/RemoteOptions.java b/src/main/java/com/google/devtools/build/lib/remote/options/RemoteOptions.java index 4eeacdb58154ff..61dd3f8749c3e5 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/options/RemoteOptions.java +++ b/src/main/java/com/google/devtools/build/lib/remote/options/RemoteOptions.java @@ -682,6 +682,43 @@ public RemoteOutputsStrategyConverter() { + " blobs during the build.") public boolean 
useNewExitCodeForLostInputs; + @Option( + name = "experimental_circuit_breaker_strategy", + documentationCategory = OptionDocumentationCategory.REMOTE, + defaultValue = "null", + effectTags = {OptionEffectTag.EXECUTION}, + converter = CircuitBreakerStrategy.Converter.class, + help = + "Specifies the strategy for the circuit breaker to use. Available strategies are" + + " \"failure\". On invalid value for the option the behavior same as the option is" + + " not set.") + public CircuitBreakerStrategy circuitBreakerStrategy; + + @Option( + name = "experimental_remote_failure_threshold", + defaultValue = "100", + documentationCategory = OptionDocumentationCategory.REMOTE, + effectTags = {OptionEffectTag.EXECUTION}, + help = + "Sets the allowed number of failures in a specific time window after which it stops" + + " calling to the remote cache/executor. By default the value is 100. Setting this" + + " to 0 or negative means no limitation.") + public int remoteFailureThreshold; + + @Option( + name = "experimental_remote_failure_window_interval", + defaultValue = "60s", + documentationCategory = OptionDocumentationCategory.REMOTE, + effectTags = {OptionEffectTag.EXECUTION}, + converter = RemoteDurationConverter.class, + help = + "The interval in which the failure count of the remote requests are computed. On zero or" + + " negative value the failure duration is computed the whole duration of the" + + " execution.Following units can be used: Days (d), hours (h), minutes (m), seconds" + + " (s), and milliseconds (ms). If the unit is omitted, the value is interpreted as" + + " seconds.") + public Duration remoteFailureWindowInterval; + // The below options are not configurable by users, only tests. // This is part of the effort to reduce the overall number of flags. @@ -771,4 +808,16 @@ public boolean shouldPrintMessages(boolean success) { || this == ExecutionMessagePrintMode.ALL); } } + + /** An enum for specifying different strategy for circuit breaker. 
*/ + public enum CircuitBreakerStrategy { + FAILURE; + + /** Converts to {@link CircuitBreakerStrategy}. */ + public static class Converter extends EnumConverter { + public Converter() { + super(CircuitBreakerStrategy.class, "CircuitBreaker strategy"); + } + } + } } diff --git a/src/test/java/com/google/devtools/build/lib/remote/BUILD b/src/test/java/com/google/devtools/build/lib/remote/BUILD index 299e68e617671c..ce66229109266b 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/BUILD +++ b/src/test/java/com/google/devtools/build/lib/remote/BUILD @@ -9,6 +9,7 @@ filegroup( name = "srcs", testonly = 0, srcs = glob(["**"]) + [ + "//src/test/java/com/google/devtools/build/lib/remote/circuitbreaker:srcs", "//src/test/java/com/google/devtools/build/lib/remote/downloader:srcs", "//src/test/java/com/google/devtools/build/lib/remote/http:srcs", "//src/test/java/com/google/devtools/build/lib/remote/grpc:srcs", @@ -74,6 +75,8 @@ java_test( "//src/main/java/com/google/devtools/build/lib/pkgcache", "//src/main/java/com/google/devtools/build/lib/remote", "//src/main/java/com/google/devtools/build/lib/remote:abstract_action_input_prefetcher", + "//src/main/java/com/google/devtools/build/lib/remote:remote_output_checker", + "//src/main/java/com/google/devtools/build/lib/remote/circuitbreaker", "//src/main/java/com/google/devtools/build/lib/remote/common", "//src/main/java/com/google/devtools/build/lib/remote/common:cache_not_found_exception", "//src/main/java/com/google/devtools/build/lib/remote/disk", diff --git a/src/test/java/com/google/devtools/build/lib/remote/RemoteModuleTest.java b/src/test/java/com/google/devtools/build/lib/remote/RemoteModuleTest.java index a56673388d1779..f94953092b1bdc 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/RemoteModuleTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/RemoteModuleTest.java @@ -42,6 +42,7 @@ import com.google.devtools.build.lib.exec.BinTools; import 
com.google.devtools.build.lib.exec.ExecutionOptions; import com.google.devtools.build.lib.pkgcache.PackageOptions; +import com.google.devtools.build.lib.remote.circuitbreaker.FailureCircuitBreaker; import com.google.devtools.build.lib.remote.options.RemoteOptions; import com.google.devtools.build.lib.runtime.BlazeRuntime; import com.google.devtools.build.lib.runtime.BlazeServerStartupOptions; @@ -72,6 +73,7 @@ import java.net.URI; import java.time.Duration; import java.util.ArrayList; +import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -79,7 +81,8 @@ /** Tests for {@link RemoteModule}. */ @RunWith(JUnit4.class) public final class RemoteModuleTest { - + private static final String EXECUTION_SERVER_NAME = "execution-server"; + private static final String CACHE_SERVER_NAME = "cache-server"; private static final ServerCapabilities CACHE_ONLY_CAPS = ServerCapabilities.newBuilder() .setLowApiVersion(ApiVersion.current.toSemVer()) @@ -106,6 +109,32 @@ public final class RemoteModuleTest { CacheCapabilities.newBuilder().addDigestFunctions(Value.SHA256).build()) .build(); + private static final ServerCapabilities EXEC_ONLY_CAPS = + ServerCapabilities.newBuilder() + .setLowApiVersion(ApiVersion.current.toSemVer()) + .setHighApiVersion(ApiVersion.current.toSemVer()) + .setExecutionCapabilities( + ExecutionCapabilities.newBuilder() + .setExecEnabled(true) + .setDigestFunction(Value.SHA256) + .build()) + .build(); + + private static final ServerCapabilities NONE_CAPS = + ServerCapabilities.newBuilder() + .setLowApiVersion(ApiVersion.current.toSemVer()) + .setHighApiVersion(ApiVersion.current.toSemVer()) + .build(); + + private static final CapabilitiesImpl INACCESSIBLE_GRPC_REMOTE = + new CapabilitiesImpl(null) { + @Override + public void getCapabilities( + GetCapabilitiesRequest request, StreamObserver responseObserver) { + responseObserver.onError(new UnsupportedOperationException()); + } + }; + private static 
CommandEnvironment createTestCommandEnvironment( RemoteModule remoteModule, RemoteOptions remoteOptions) throws IOException, AbruptExitException { @@ -188,21 +217,27 @@ private static Server createFakeServer(String serverName, BindableService... ser .build(); } + private RemoteModule remoteModule; + private RemoteOptions remoteOptions; + + @Before + public void initialize() { + remoteModule = new RemoteModule(); + remoteModule.setChannelFactory( + (target, proxy, options, interceptors) -> + InProcessChannelBuilder.forName(target).directExecutor().build()); + remoteOptions = Options.getDefaults(RemoteOptions.class); + } + @Test public void testVerifyCapabilities_executionAndCacheForSingleEndpoint() throws Exception { CapabilitiesImpl executionServerCapabilitiesImpl = new CapabilitiesImpl(EXEC_AND_CACHE_CAPS); - String executionServerName = "execution-server"; - Server executionServer = createFakeServer(executionServerName, executionServerCapabilitiesImpl); + Server executionServer = + createFakeServer(EXECUTION_SERVER_NAME, executionServerCapabilitiesImpl); executionServer.start(); try { - RemoteModule remoteModule = new RemoteModule(); - remoteModule.setChannelFactory( - (target, proxy, options, interceptors) -> - InProcessChannelBuilder.forName(target).directExecutor().build()); - - RemoteOptions remoteOptions = Options.getDefaults(RemoteOptions.class); - remoteOptions.remoteExecutor = executionServerName; + remoteOptions.remoteExecutor = EXECUTION_SERVER_NAME; CommandEnvironment env = createTestCommandEnvironment(remoteModule, remoteOptions); @@ -210,6 +245,7 @@ public void testVerifyCapabilities_executionAndCacheForSingleEndpoint() throws E assertThat(Thread.interrupted()).isFalse(); assertThat(executionServerCapabilitiesImpl.getRequestCount()).isEqualTo(1); + assertCircuitBreakerInstance(); } finally { executionServer.shutdownNow(); executionServer.awaitTermination(); @@ -219,18 +255,11 @@ public void testVerifyCapabilities_executionAndCacheForSingleEndpoint() 
throws E @Test public void testVerifyCapabilities_cacheOnlyEndpoint() throws Exception { CapabilitiesImpl cacheServerCapabilitiesImpl = new CapabilitiesImpl(CACHE_ONLY_CAPS); - String cacheServerName = "cache-server"; - Server cacheServer = createFakeServer(cacheServerName, cacheServerCapabilitiesImpl); + Server cacheServer = createFakeServer(CACHE_SERVER_NAME, cacheServerCapabilitiesImpl); cacheServer.start(); try { - RemoteModule remoteModule = new RemoteModule(); - remoteModule.setChannelFactory( - (target, proxy, options, interceptors) -> - InProcessChannelBuilder.forName(target).directExecutor().build()); - - RemoteOptions remoteOptions = Options.getDefaults(RemoteOptions.class); - remoteOptions.remoteCache = cacheServerName; + remoteOptions.remoteCache = CACHE_SERVER_NAME; CommandEnvironment env = createTestCommandEnvironment(remoteModule, remoteOptions); @@ -238,6 +267,7 @@ public void testVerifyCapabilities_cacheOnlyEndpoint() throws Exception { assertThat(Thread.interrupted()).isFalse(); assertThat(cacheServerCapabilitiesImpl.getRequestCount()).isEqualTo(1); + assertCircuitBreakerInstance(); } finally { cacheServer.shutdownNow(); cacheServer.awaitTermination(); @@ -246,37 +276,18 @@ public void testVerifyCapabilities_cacheOnlyEndpoint() throws Exception { @Test public void testVerifyCapabilities_executionAndCacheForDifferentEndpoints() throws Exception { - ServerCapabilities caps = - ServerCapabilities.newBuilder() - .setLowApiVersion(ApiVersion.current.toSemVer()) - .setHighApiVersion(ApiVersion.current.toSemVer()) - .setExecutionCapabilities( - ExecutionCapabilities.newBuilder() - .setExecEnabled(true) - .setDigestFunction(Value.SHA256) - .build()) - .setCacheCapabilities( - CacheCapabilities.newBuilder().addDigestFunctions(Value.SHA256).build()) - .build(); - CapabilitiesImpl executionServerCapabilitiesImpl = new CapabilitiesImpl(caps); - String executionServerName = "execution-server"; - Server executionServer = createFakeServer(executionServerName, 
executionServerCapabilitiesImpl); + CapabilitiesImpl executionServerCapabilitiesImpl = new CapabilitiesImpl(EXEC_AND_CACHE_CAPS); + Server executionServer = + createFakeServer(EXECUTION_SERVER_NAME, executionServerCapabilitiesImpl); executionServer.start(); - CapabilitiesImpl cacheServerCapabilitiesImpl = new CapabilitiesImpl(caps); - String cacheServerName = "cache-server"; - Server cacheServer = createFakeServer(cacheServerName, cacheServerCapabilitiesImpl); + CapabilitiesImpl cacheServerCapabilitiesImpl = new CapabilitiesImpl(EXEC_AND_CACHE_CAPS); + Server cacheServer = createFakeServer(CACHE_SERVER_NAME, cacheServerCapabilitiesImpl); cacheServer.start(); try { - RemoteModule remoteModule = new RemoteModule(); - remoteModule.setChannelFactory( - (target, proxy, options, interceptors) -> - InProcessChannelBuilder.forName(target).directExecutor().build()); - - RemoteOptions remoteOptions = Options.getDefaults(RemoteOptions.class); - remoteOptions.remoteExecutor = executionServerName; - remoteOptions.remoteCache = cacheServerName; + remoteOptions.remoteExecutor = EXECUTION_SERVER_NAME; + remoteOptions.remoteCache = CACHE_SERVER_NAME; CommandEnvironment env = createTestCommandEnvironment(remoteModule, remoteOptions); @@ -285,6 +296,7 @@ public void testVerifyCapabilities_executionAndCacheForDifferentEndpoints() thro assertThat(Thread.interrupted()).isFalse(); assertThat(executionServerCapabilitiesImpl.getRequestCount()).isEqualTo(1); assertThat(cacheServerCapabilitiesImpl.getRequestCount()).isEqualTo(1); + assertCircuitBreakerInstance(); } finally { executionServer.shutdownNow(); cacheServer.shutdownNow(); @@ -296,42 +308,18 @@ public void testVerifyCapabilities_executionAndCacheForDifferentEndpoints() thro @Test public void testVerifyCapabilities_executionOnlyAndCacheOnlyEndpoints() throws Exception { - ServerCapabilities executionOnlyCaps = - ServerCapabilities.newBuilder() - .setLowApiVersion(ApiVersion.current.toSemVer()) - 
.setHighApiVersion(ApiVersion.current.toSemVer()) - .setExecutionCapabilities( - ExecutionCapabilities.newBuilder() - .setExecEnabled(true) - .setDigestFunction(Value.SHA256) - .build()) - .build(); - CapabilitiesImpl executionServerCapabilitiesImpl = new CapabilitiesImpl(executionOnlyCaps); - String executionServerName = "execution-server"; - Server executionServer = createFakeServer(executionServerName, executionServerCapabilitiesImpl); + CapabilitiesImpl executionServerCapabilitiesImpl = new CapabilitiesImpl(EXEC_ONLY_CAPS); + Server executionServer = + createFakeServer(EXECUTION_SERVER_NAME, executionServerCapabilitiesImpl); executionServer.start(); - ServerCapabilities cacheOnlyCaps = - ServerCapabilities.newBuilder() - .setLowApiVersion(ApiVersion.current.toSemVer()) - .setHighApiVersion(ApiVersion.current.toSemVer()) - .setCacheCapabilities( - CacheCapabilities.newBuilder().addDigestFunctions(Value.SHA256).build()) - .build(); - CapabilitiesImpl cacheServerCapabilitiesImpl = new CapabilitiesImpl(cacheOnlyCaps); - String cacheServerName = "cache-server"; - Server cacheServer = createFakeServer(cacheServerName, cacheServerCapabilitiesImpl); + CapabilitiesImpl cacheServerCapabilitiesImpl = new CapabilitiesImpl(CACHE_ONLY_CAPS); + Server cacheServer = createFakeServer(CACHE_SERVER_NAME, cacheServerCapabilitiesImpl); cacheServer.start(); try { - RemoteModule remoteModule = new RemoteModule(); - remoteModule.setChannelFactory( - (target, proxy, options, interceptors) -> - InProcessChannelBuilder.forName(target).directExecutor().build()); - - RemoteOptions remoteOptions = Options.getDefaults(RemoteOptions.class); - remoteOptions.remoteExecutor = executionServerName; - remoteOptions.remoteCache = cacheServerName; + remoteOptions.remoteExecutor = EXECUTION_SERVER_NAME; + remoteOptions.remoteCache = CACHE_SERVER_NAME; CommandEnvironment env = createTestCommandEnvironment(remoteModule, remoteOptions); @@ -340,6 +328,7 @@ public void 
testVerifyCapabilities_executionOnlyAndCacheOnlyEndpoints() throws E assertThat(Thread.interrupted()).isFalse(); assertThat(executionServerCapabilitiesImpl.getRequestCount()).isEqualTo(1); assertThat(cacheServerCapabilitiesImpl.getRequestCount()).isEqualTo(1); + assertCircuitBreakerInstance(); } finally { executionServer.shutdownNow(); cacheServer.shutdownNow(); @@ -352,24 +341,13 @@ public void testVerifyCapabilities_executionOnlyAndCacheOnlyEndpoints() throws E @Test public void testLocalFallback_shouldErrorForRemoteCacheWithoutRequiredCapabilities() throws Exception { - ServerCapabilities noneCaps = - ServerCapabilities.newBuilder() - .setLowApiVersion(ApiVersion.current.toSemVer()) - .setHighApiVersion(ApiVersion.current.toSemVer()) - .build(); - CapabilitiesImpl cacheServerCapabilitiesImpl = new CapabilitiesImpl(noneCaps); - String cacheServerName = "cache-server"; - Server cacheServer = createFakeServer(cacheServerName, cacheServerCapabilitiesImpl); + CapabilitiesImpl cacheServerCapabilitiesImpl = new CapabilitiesImpl(NONE_CAPS); + Server cacheServer = createFakeServer(CACHE_SERVER_NAME, cacheServerCapabilitiesImpl); cacheServer.start(); try { - RemoteModule remoteModule = new RemoteModule(); - RemoteOptions remoteOptions = Options.getDefaults(RemoteOptions.class); - remoteOptions.remoteCache = cacheServerName; + remoteOptions.remoteCache = CACHE_SERVER_NAME; remoteOptions.remoteLocalFallback = true; - remoteModule.setChannelFactory( - (target, proxy, options, interceptors) -> - InProcessChannelBuilder.forName(target).directExecutor().build()); CommandEnvironment env = createTestCommandEnvironment(remoteModule, remoteOptions); @@ -383,26 +361,12 @@ public void testLocalFallback_shouldErrorForRemoteCacheWithoutRequiredCapabiliti @Test public void testLocalFallback_shouldErrorInaccessibleGrpcRemoteCacheIfFlagNotSet() throws Exception { - String cacheServerName = "cache-server"; - CapabilitiesImplBase cacheServerCapabilitiesImpl = - new CapabilitiesImplBase() { 
- @Override - public void getCapabilities( - GetCapabilitiesRequest request, StreamObserver responseObserver) { - responseObserver.onError(new UnsupportedOperationException()); - } - }; - Server cacheServer = createFakeServer(cacheServerName, cacheServerCapabilitiesImpl); + Server cacheServer = createFakeServer(CACHE_SERVER_NAME, INACCESSIBLE_GRPC_REMOTE); cacheServer.start(); try { - RemoteModule remoteModule = new RemoteModule(); - RemoteOptions remoteOptions = Options.getDefaults(RemoteOptions.class); - remoteOptions.remoteCache = cacheServerName; + remoteOptions.remoteCache = CACHE_SERVER_NAME; remoteOptions.remoteLocalFallback = false; - remoteModule.setChannelFactory( - (target, proxy, options, interceptors) -> - InProcessChannelBuilder.forName(target).directExecutor().build()); CommandEnvironment env = createTestCommandEnvironment(remoteModule, remoteOptions); @@ -415,26 +379,12 @@ public void getCapabilities( @Test public void testLocalFallback_shouldIgnoreInaccessibleGrpcRemoteCache() throws Exception { - String cacheServerName = "cache-server"; - CapabilitiesImplBase cacheServerCapabilitiesImpl = - new CapabilitiesImplBase() { - @Override - public void getCapabilities( - GetCapabilitiesRequest request, StreamObserver responseObserver) { - responseObserver.onError(new UnsupportedOperationException()); - } - }; - Server cacheServer = createFakeServer(cacheServerName, cacheServerCapabilitiesImpl); + Server cacheServer = createFakeServer(CACHE_SERVER_NAME, INACCESSIBLE_GRPC_REMOTE); cacheServer.start(); try { - RemoteModule remoteModule = new RemoteModule(); - RemoteOptions remoteOptions = Options.getDefaults(RemoteOptions.class); - remoteOptions.remoteCache = cacheServerName; + remoteOptions.remoteCache = CACHE_SERVER_NAME; remoteOptions.remoteLocalFallback = true; - remoteModule.setChannelFactory( - (target, proxy, options, interceptors) -> - InProcessChannelBuilder.forName(target).directExecutor().build()); CommandEnvironment env = 
createTestCommandEnvironment(remoteModule, remoteOptions); @@ -445,6 +395,7 @@ public void getCapabilities( assertThat(actionContextProvider).isNotNull(); assertThat(actionContextProvider.getRemoteCache()).isNull(); assertThat(actionContextProvider.getRemoteExecutionClient()).isNull(); + assertCircuitBreakerInstance(); } finally { cacheServer.shutdownNow(); cacheServer.awaitTermination(); @@ -453,26 +404,12 @@ public void getCapabilities( @Test public void testLocalFallback_shouldIgnoreInaccessibleGrpcRemoteExecutor() throws Exception { - CapabilitiesImplBase executionServerCapabilitiesImpl = - new CapabilitiesImplBase() { - @Override - public void getCapabilities( - GetCapabilitiesRequest request, StreamObserver responseObserver) { - responseObserver.onError(new UnsupportedOperationException()); - } - }; - String executionServerName = "execution-server"; - Server executionServer = createFakeServer(executionServerName, executionServerCapabilitiesImpl); + Server executionServer = createFakeServer(EXECUTION_SERVER_NAME, INACCESSIBLE_GRPC_REMOTE); executionServer.start(); try { - RemoteModule remoteModule = new RemoteModule(); - RemoteOptions remoteOptions = Options.getDefaults(RemoteOptions.class); - remoteOptions.remoteExecutor = executionServerName; + remoteOptions.remoteExecutor = EXECUTION_SERVER_NAME; remoteOptions.remoteLocalFallback = true; - remoteModule.setChannelFactory( - (target, proxy, options, interceptors) -> - InProcessChannelBuilder.forName(target).directExecutor().build()); CommandEnvironment env = createTestCommandEnvironment(remoteModule, remoteOptions); @@ -483,6 +420,7 @@ public void getCapabilities( assertThat(actionContextProvider).isNotNull(); assertThat(actionContextProvider.getRemoteCache()).isNull(); assertThat(actionContextProvider.getRemoteExecutionClient()).isNull(); + assertCircuitBreakerInstance(); } finally { executionServer.shutdownNow(); executionServer.awaitTermination(); @@ -496,8 +434,6 @@ public void 
testNetrc_netrcWithoutRemoteCache() throws Exception { Scratch scratch = new Scratch(fileSystem); scratch.file(netrc, "machine foo.example.org login baruser password barpass"); AuthAndTLSOptions authAndTLSOptions = Options.getDefaults(AuthAndTLSOptions.class); - RemoteOptions remoteOptions = Options.getDefaults(RemoteOptions.class); - Cache>> credentialCache = Caffeine.newBuilder().build(); @@ -523,18 +459,11 @@ public void testNetrc_netrcWithoutRemoteCache() throws Exception { @Test public void testCacheCapabilities_propagatedToRemoteCache() throws Exception { CapabilitiesImpl cacheServerCapabilitiesImpl = new CapabilitiesImpl(CACHE_ONLY_CAPS); - String cacheServerName = "cache-server"; - Server cacheServer = createFakeServer(cacheServerName, cacheServerCapabilitiesImpl); + Server cacheServer = createFakeServer(CACHE_SERVER_NAME, cacheServerCapabilitiesImpl); cacheServer.start(); try { - RemoteModule remoteModule = new RemoteModule(); - remoteModule.setChannelFactory( - (target, proxy, options, interceptors) -> - InProcessChannelBuilder.forName(target).directExecutor().build()); - - RemoteOptions remoteOptions = Options.getDefaults(RemoteOptions.class); - remoteOptions.remoteCache = cacheServerName; + remoteOptions.remoteCache = CACHE_SERVER_NAME; CommandEnvironment env = createTestCommandEnvironment(remoteModule, remoteOptions); @@ -555,18 +484,12 @@ public void testCacheCapabilities_propagatedToRemoteCache() throws Exception { @Test public void testCacheCapabilities_propagatedToRemoteExecutionCache() throws Exception { CapabilitiesImpl executionServerCapabilitiesImpl = new CapabilitiesImpl(EXEC_AND_CACHE_CAPS); - String executionServerName = "execution-server"; - Server executionServer = createFakeServer(executionServerName, executionServerCapabilitiesImpl); + Server executionServer = + createFakeServer(EXECUTION_SERVER_NAME, executionServerCapabilitiesImpl); executionServer.start(); try { - RemoteModule remoteModule = new RemoteModule(); - 
remoteModule.setChannelFactory( - (target, proxy, options, interceptors) -> - InProcessChannelBuilder.forName(target).directExecutor().build()); - - RemoteOptions remoteOptions = Options.getDefaults(RemoteOptions.class); - remoteOptions.remoteExecutor = executionServerName; + remoteOptions.remoteExecutor = EXECUTION_SERVER_NAME; CommandEnvironment env = createTestCommandEnvironment(remoteModule, remoteOptions); @@ -583,4 +506,80 @@ public void testCacheCapabilities_propagatedToRemoteExecutionCache() throws Exce executionServer.awaitTermination(); } } + + @Test + public void testVerifyCapabilities_executionAndCacheForSingleEndpointWithCircuitBreaker() + throws Exception { + CapabilitiesImpl executionServerCapabilitiesImpl = new CapabilitiesImpl(EXEC_AND_CACHE_CAPS); + Server executionServer = + createFakeServer(EXECUTION_SERVER_NAME, executionServerCapabilitiesImpl); + executionServer.start(); + + try { + remoteOptions.remoteExecutor = EXECUTION_SERVER_NAME; + remoteOptions.circuitBreakerStrategy = RemoteOptions.CircuitBreakerStrategy.FAILURE; + + CommandEnvironment env = createTestCommandEnvironment(remoteModule, remoteOptions); + + remoteModule.beforeCommand(env); + + assertThat(Thread.interrupted()).isFalse(); + assertThat(executionServerCapabilitiesImpl.getRequestCount()).isEqualTo(1); + assertCircuitBreakerInstance(); + } finally { + executionServer.shutdownNow(); + executionServer.awaitTermination(); + } + } + + @Test + public void testVerifyCapabilities_cacheOnlyEndpointWithCircuitBreaker() throws Exception { + CapabilitiesImpl cacheServerCapabilitiesImpl = new CapabilitiesImpl(CACHE_ONLY_CAPS); + Server cacheServer = createFakeServer(CACHE_SERVER_NAME, cacheServerCapabilitiesImpl); + cacheServer.start(); + + try { + remoteOptions.remoteCache = CACHE_SERVER_NAME; + remoteOptions.circuitBreakerStrategy = RemoteOptions.CircuitBreakerStrategy.FAILURE; + + CommandEnvironment env = createTestCommandEnvironment(remoteModule, remoteOptions); + + 
remoteModule.beforeCommand(env); + + assertThat(Thread.interrupted()).isFalse(); + assertThat(cacheServerCapabilitiesImpl.getRequestCount()).isEqualTo(1); + assertCircuitBreakerInstance(); + } finally { + cacheServer.shutdownNow(); + cacheServer.awaitTermination(); + } + } + + private void assertCircuitBreakerInstance() { + RemoteActionContextProvider actionContextProvider = remoteModule.getActionContextProvider(); + assertThat(actionContextProvider).isNotNull(); + + Retrier.CircuitBreaker circuitBreaker; + if (actionContextProvider.getRemoteCache() != null) { + circuitBreaker = + ((GrpcCacheClient) actionContextProvider.getRemoteCache().cacheProtocol) + .getRetrier() + .getCircuitBreaker(); + } else if (actionContextProvider.getRemoteExecutionClient() != null) { + circuitBreaker = + ((GrpcRemoteExecutor) actionContextProvider.getRemoteExecutionClient()) + .getRetrier() + .getCircuitBreaker(); + } else { + // no remote cache or execution configured, circuitBreaker is null + return; + } + + if (remoteOptions.circuitBreakerStrategy == RemoteOptions.CircuitBreakerStrategy.FAILURE) { + assertThat(circuitBreaker).isInstanceOf(FailureCircuitBreaker.class); + } + if (remoteOptions.circuitBreakerStrategy == null) { + assertThat(circuitBreaker).isEqualTo(Retrier.ALLOW_ALL_CALLS); + } + } } diff --git a/src/test/java/com/google/devtools/build/lib/remote/RetrierTest.java b/src/test/java/com/google/devtools/build/lib/remote/RetrierTest.java index a739309c21b2c1..7c30e1bf6eddc3 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/RetrierTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/RetrierTest.java @@ -16,6 +16,7 @@ import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.assertThrows; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -93,7 +94,7 @@ public void retryShouldWork_failure() throws 
Exception { assertThat(e).hasMessageThat().isEqualTo("call failed"); assertThat(numCalls.get()).isEqualTo(3); - verify(alwaysOpen, times(3)).recordFailure(); + verify(alwaysOpen, times(3)).recordFailure(any(Exception.class)); verify(alwaysOpen, never()).recordSuccess(); } @@ -117,7 +118,7 @@ public void retryShouldWorkNoRetries_failure() throws Exception { assertThat(e).hasMessageThat().isEqualTo("call failed"); assertThat(numCalls.get()).isEqualTo(1); - verify(alwaysOpen, times(1)).recordFailure(); + verify(alwaysOpen, times(1)).recordFailure(e); verify(alwaysOpen, never()).recordSuccess(); } @@ -138,7 +139,7 @@ public void retryShouldWork_success() throws Exception { }); assertThat(val).isEqualTo(1); - verify(alwaysOpen, times(2)).recordFailure(); + verify(alwaysOpen, times(2)).recordFailure(any(Exception.class)); verify(alwaysOpen, times(1)).recordSuccess(); } @@ -350,7 +351,7 @@ public synchronized State state() { } @Override - public synchronized void recordFailure() { + public synchronized void recordFailure(Exception e) { consecutiveFailures++; if (consecutiveFailures >= maxConsecutiveFailures) { state = State.REJECT_CALLS; diff --git a/src/test/java/com/google/devtools/build/lib/remote/circuitbreaker/BUILD b/src/test/java/com/google/devtools/build/lib/remote/circuitbreaker/BUILD new file mode 100644 index 00000000000000..987a81bbcbadaa --- /dev/null +++ b/src/test/java/com/google/devtools/build/lib/remote/circuitbreaker/BUILD @@ -0,0 +1,31 @@ +load("@rules_java//java:defs.bzl", "java_test") + +package( + default_applicable_licenses = ["//:license"], + default_testonly = 1, + default_visibility = ["//src:__subpackages__"], +) + +filegroup( + name = "srcs", + testonly = 0, + srcs = glob(["**"]), + visibility = ["//src:__subpackages__"], +) + +java_test( + name = "circuitbreaker", + srcs = glob(["*.java"]), + test_class = "com.google.devtools.build.lib.AllTests", + deps = [ + "//src/main/java/com/google/devtools/build/lib/remote", + 
"//src/main/java/com/google/devtools/build/lib/remote/circuitbreaker", + "//src/main/java/com/google/devtools/build/lib/remote/common:cache_not_found_exception", + "//src/main/java/com/google/devtools/build/lib/remote/options", + "//src/main/java/com/google/devtools/common/options", + "//src/test/java/com/google/devtools/build/lib:test_runner", + "//third_party:junit4", + "//third_party:truth", + "@remoteapis//:build_bazel_remote_execution_v2_remote_execution_java_proto", + ], +) diff --git a/src/test/java/com/google/devtools/build/lib/remote/circuitbreaker/CircuitBreakerFactoryTest.java b/src/test/java/com/google/devtools/build/lib/remote/circuitbreaker/CircuitBreakerFactoryTest.java new file mode 100644 index 00000000000000..51baee1162f2eb --- /dev/null +++ b/src/test/java/com/google/devtools/build/lib/remote/circuitbreaker/CircuitBreakerFactoryTest.java @@ -0,0 +1,44 @@ +// Copyright 2023 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+package com.google.devtools.build.lib.remote.circuitbreaker; + +import static com.google.common.truth.Truth.assertThat; + +import com.google.devtools.build.lib.remote.Retrier; +import com.google.devtools.build.lib.remote.options.RemoteOptions; +import com.google.devtools.build.lib.remote.options.RemoteOptions.CircuitBreakerStrategy; +import com.google.devtools.common.options.Options; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link CircuitBreakerFactory}. */ +@RunWith(JUnit4.class) +public class CircuitBreakerFactoryTest { + @Test + public void testCreateCircuitBreaker_failureStrategy() { + RemoteOptions remoteOptions = Options.getDefaults(RemoteOptions.class); + remoteOptions.circuitBreakerStrategy = CircuitBreakerStrategy.FAILURE; + + assertThat(CircuitBreakerFactory.createCircuitBreaker(remoteOptions)) + .isInstanceOf(FailureCircuitBreaker.class); + } + + @Test + public void testCreateCircuitBreaker_nullStrategy() { + RemoteOptions remoteOptions = Options.getDefaults(RemoteOptions.class); + assertThat(CircuitBreakerFactory.createCircuitBreaker(remoteOptions)) + .isEqualTo(Retrier.ALLOW_ALL_CALLS); + } +} diff --git a/src/test/java/com/google/devtools/build/lib/remote/circuitbreaker/FailureCircuitBreakerTest.java b/src/test/java/com/google/devtools/build/lib/remote/circuitbreaker/FailureCircuitBreakerTest.java new file mode 100644 index 00000000000000..2d00a8e0e816ab --- /dev/null +++ b/src/test/java/com/google/devtools/build/lib/remote/circuitbreaker/FailureCircuitBreakerTest.java @@ -0,0 +1,68 @@ +// Copyright 2023 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.remote.circuitbreaker; + +import static com.google.common.truth.Truth.assertThat; + +import build.bazel.remote.execution.v2.Digest; +import com.google.devtools.build.lib.remote.Retrier.CircuitBreaker.State; +import com.google.devtools.build.lib.remote.common.CacheNotFoundException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class FailureCircuitBreakerTest { + + @Test + public void testRecordFailure() throws InterruptedException { + final int failureThreshold = 10; + final int windowInterval = 100; + FailureCircuitBreaker failureCircuitBreaker = + new FailureCircuitBreaker(failureThreshold, windowInterval); + + List listOfExceptionThrownOnFailure = new ArrayList<>(); + for (int index = 0; index < failureThreshold; index++) { + listOfExceptionThrownOnFailure.add(new Exception()); + } + for (int index = 0; index < failureThreshold * 9; index++) { + listOfExceptionThrownOnFailure.add(new CacheNotFoundException(Digest.newBuilder().build())); + } + + Collections.shuffle(listOfExceptionThrownOnFailure); + + // make calls equals to threshold number of not ignored failure calls in parallel. + listOfExceptionThrownOnFailure.stream() + .parallel() + .forEach(failureCircuitBreaker::recordFailure); + assertThat(failureCircuitBreaker.state()).isEqualTo(State.ACCEPT_CALLS); + + // Sleep for windowInterval + 1ms. 
+ Thread.sleep(windowInterval + 1 /*to compensate any delay*/); + + // make calls equals to threshold number of not ignored failure calls in parallel. + listOfExceptionThrownOnFailure.stream() + .parallel() + .forEach(failureCircuitBreaker::recordFailure); + assertThat(failureCircuitBreaker.state()).isEqualTo(State.ACCEPT_CALLS); + + // Sleep for less than windowInterval. + Thread.sleep(windowInterval - 5); + failureCircuitBreaker.recordFailure(new Exception()); + assertThat(failureCircuitBreaker.state()).isEqualTo(State.REJECT_CALLS); + } +} From 976faf0327694b1565f0d230f39fdfe039c2f45f Mon Sep 17 00:00:00 2001 From: Anshuman Mishra Date: Tue, 30 May 2023 16:08:39 -0700 Subject: [PATCH 2/3] remove target included in cherry-pick by mistake --- src/test/java/com/google/devtools/build/lib/remote/BUILD | 1 - 1 file changed, 1 deletion(-) diff --git a/src/test/java/com/google/devtools/build/lib/remote/BUILD b/src/test/java/com/google/devtools/build/lib/remote/BUILD index ce66229109266b..bb054fa51e0f18 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/BUILD +++ b/src/test/java/com/google/devtools/build/lib/remote/BUILD @@ -75,7 +75,6 @@ java_test( "//src/main/java/com/google/devtools/build/lib/pkgcache", "//src/main/java/com/google/devtools/build/lib/remote", "//src/main/java/com/google/devtools/build/lib/remote:abstract_action_input_prefetcher", - "//src/main/java/com/google/devtools/build/lib/remote:remote_output_checker", "//src/main/java/com/google/devtools/build/lib/remote/circuitbreaker", "//src/main/java/com/google/devtools/build/lib/remote/common", "//src/main/java/com/google/devtools/build/lib/remote/common:cache_not_found_exception", From d43cba76c713fa49d0fa057e531e4a39245027cb Mon Sep 17 00:00:00 2001 From: Anshuman Mishra Date: Mon, 22 May 2023 17:16:18 -0700 Subject: [PATCH 3/3] Use failure_rate instead of failure count for circuit breaker --- .../circuitbreaker/CircuitBreakerFactory.java | 3 +- .../circuitbreaker/FailureCircuitBreaker.java | 
50 +++++++++++++------ .../lib/remote/options/RemoteOptions.java | 15 +++--- .../FailureCircuitBreakerTest.java | 42 ++++++++++++---- 4 files changed, 76 insertions(+), 34 deletions(-) diff --git a/src/main/java/com/google/devtools/build/lib/remote/circuitbreaker/CircuitBreakerFactory.java b/src/main/java/com/google/devtools/build/lib/remote/circuitbreaker/CircuitBreakerFactory.java index be8835b7c0b2d3..6ab6b4258d2cbd 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/circuitbreaker/CircuitBreakerFactory.java +++ b/src/main/java/com/google/devtools/build/lib/remote/circuitbreaker/CircuitBreakerFactory.java @@ -23,6 +23,7 @@ public class CircuitBreakerFactory { public static final ImmutableSet> DEFAULT_IGNORED_ERRORS = ImmutableSet.of(CacheNotFoundException.class); + public static final int DEFAULT_MIN_CALL_COUNT_TO_COMPUTE_FAILURE_RATE = 100; private CircuitBreakerFactory() {} @@ -37,7 +38,7 @@ private CircuitBreakerFactory() {} public static Retrier.CircuitBreaker createCircuitBreaker(final RemoteOptions remoteOptions) { if (remoteOptions.circuitBreakerStrategy == RemoteOptions.CircuitBreakerStrategy.FAILURE) { return new FailureCircuitBreaker( - remoteOptions.remoteFailureThreshold, + remoteOptions.remoteFailureRateThreshold, (int) remoteOptions.remoteFailureWindowInterval.toMillis()); } return Retrier.ALLOW_ALL_CALLS; diff --git a/src/main/java/com/google/devtools/build/lib/remote/circuitbreaker/FailureCircuitBreaker.java b/src/main/java/com/google/devtools/build/lib/remote/circuitbreaker/FailureCircuitBreaker.java index b1b5739fd44c96..2baeba4ed07f3d 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/circuitbreaker/FailureCircuitBreaker.java +++ b/src/main/java/com/google/devtools/build/lib/remote/circuitbreaker/FailureCircuitBreaker.java @@ -21,34 +21,38 @@ import java.util.concurrent.atomic.AtomicInteger; /** - * The {@link FailureCircuitBreaker} implementation of the {@link Retrier.CircuitBreaker} prevents - * further calls to a 
remote cache once the number of failures within a given window exceeds a - * specified threshold for a build. In the context of Bazel, a new instance of {@link - * Retrier.CircuitBreaker} is created for each build. Therefore, if the circuit breaker trips during - * a build, the remote cache will be disabled for that build. However, it will be enabled again for - * the next build as a new instance of {@link Retrier.CircuitBreaker} will be created. + * The {@link FailureCircuitBreaker} implementation of the {@link Retrier.CircuitBreaker} prevents further calls to + * a remote cache once the failures rate within a given window exceeds a specified threshold for a build. + * In the context of Bazel, a new instance of {@link Retrier.CircuitBreaker} is created for each build. Therefore, if + * the circuit breaker trips during a build, the remote cache will be disabled for that build. However, it will be + * enabled again for the next build as a new instance of {@link Retrier.CircuitBreaker} will be created. */ public class FailureCircuitBreaker implements Retrier.CircuitBreaker { private State state; + private final AtomicInteger successes; private final AtomicInteger failures; - private final int failureThreshold; + private final AtomicInteger ignoredFailures; + private final int failureRateThreshold; private final int slidingWindowSize; + private final int minCallCountToComputeFailureRate; private final ScheduledExecutorService scheduledExecutor; private final ImmutableSet> ignoredErrors; /** * Creates a {@link FailureCircuitBreaker}. * - * @param failureThreshold is used to set the number of failures required to trip the circuit - * breaker in given time window. - * @param slidingWindowSize the size of the sliding window in milliseconds to calculate the number - * of failures. + * @param failureRateThreshold is used to set the min percentage of failure required to trip the circuit breaker in + * given time window. 
+ * @param slidingWindowSize the size of the sliding window in milliseconds to calculate the number of failures. */ - public FailureCircuitBreaker(int failureThreshold, int slidingWindowSize) { - this.failureThreshold = failureThreshold; + public FailureCircuitBreaker(int failureRateThreshold, int slidingWindowSize) { this.failures = new AtomicInteger(0); + this.successes = new AtomicInteger(0); + this.ignoredFailures = new AtomicInteger(0); + this.failureRateThreshold = failureRateThreshold; this.slidingWindowSize = slidingWindowSize; + this.minCallCountToComputeFailureRate = CircuitBreakerFactory.DEFAULT_MIN_CALL_COUNT_TO_COMPUTE_FAILURE_RATE; this.state = State.ACCEPT_CALLS; this.scheduledExecutor = slidingWindowSize > 0 ? Executors.newSingleThreadScheduledExecutor() : null; @@ -64,20 +68,36 @@ public State state() { public void recordFailure(Exception e) { if (!ignoredErrors.contains(e.getClass())) { int failureCount = failures.incrementAndGet(); + int totalCallCount = successes.get() + failureCount + ignoredFailures.get(); if (slidingWindowSize > 0) { var unused = scheduledExecutor.schedule( failures::decrementAndGet, slidingWindowSize, TimeUnit.MILLISECONDS); } + + if (totalCallCount < minCallCountToComputeFailureRate) { + // The remote call count is below the threshold required to calculate the failure rate. + return; + } + double failureRate = (failureCount * 100.0) / totalCallCount; + // Since the state can only be changed to the open state, synchronization is not required. - if (failureCount > this.failureThreshold) { + if (failureRate > this.failureRateThreshold) { this.state = State.REJECT_CALLS; } + } else { + ignoredFailures.incrementAndGet(); + if (slidingWindowSize > 0) { + scheduledExecutor.schedule(ignoredFailures::decrementAndGet, slidingWindowSize, TimeUnit.MILLISECONDS); + } } } @Override public void recordSuccess() { - // do nothing, implement if we need to set threshold on failure rate instead of count. 
+ successes.incrementAndGet(); + if (slidingWindowSize > 0) { + scheduledExecutor.schedule(successes::decrementAndGet, slidingWindowSize, TimeUnit.MILLISECONDS); + } } } diff --git a/src/main/java/com/google/devtools/build/lib/remote/options/RemoteOptions.java b/src/main/java/com/google/devtools/build/lib/remote/options/RemoteOptions.java index 61dd3f8749c3e5..793eccdfa51756 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/options/RemoteOptions.java +++ b/src/main/java/com/google/devtools/build/lib/remote/options/RemoteOptions.java @@ -695,15 +695,16 @@ public RemoteOutputsStrategyConverter() { public CircuitBreakerStrategy circuitBreakerStrategy; @Option( - name = "experimental_remote_failure_threshold", - defaultValue = "100", + name = "experimental_remote_failure_rate_threshold", + defaultValue = "10", documentationCategory = OptionDocumentationCategory.REMOTE, effectTags = {OptionEffectTag.EXECUTION}, + converter = Converters.PercentageConverter.class, help = - "Sets the allowed number of failures in a specific time window after which it stops" - + " calling to the remote cache/executor. By default the value is 100. Setting this" - + " to 0 or negative means no limitation.") - public int remoteFailureThreshold; + "Sets the allowed number of failure rate in percentage for a specific time window after which it stops " + + "calling to the remote cache/executor. By default the value is 10. Setting this to 0 means no " + + "limitation.") + public int remoteFailureRateThreshold; @Option( name = "experimental_remote_failure_window_interval", @@ -712,7 +713,7 @@ public RemoteOutputsStrategyConverter() { effectTags = {OptionEffectTag.EXECUTION}, converter = RemoteDurationConverter.class, help = - "The interval in which the failure count of the remote requests are computed. On zero or" + "The interval in which the failure rate of the remote requests are computed. 
On zero or" + " negative value the failure duration is computed the whole duration of the" + " execution.Following units can be used: Days (d), hours (h), minutes (m), seconds" + " (s), and milliseconds (ms). If the unit is omitted, the value is interpreted as" diff --git a/src/test/java/com/google/devtools/build/lib/remote/circuitbreaker/FailureCircuitBreakerTest.java b/src/test/java/com/google/devtools/build/lib/remote/circuitbreaker/FailureCircuitBreakerTest.java index 2d00a8e0e816ab..502c49ddff6225 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/circuitbreaker/FailureCircuitBreakerTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/circuitbreaker/FailureCircuitBreakerTest.java @@ -18,46 +18,46 @@ import build.bazel.remote.execution.v2.Digest; import com.google.devtools.build.lib.remote.Retrier.CircuitBreaker.State; import com.google.devtools.build.lib.remote.common.CacheNotFoundException; + import java.util.ArrayList; import java.util.Collections; import java.util.List; + import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import java.util.stream.IntStream; + @RunWith(JUnit4.class) public class FailureCircuitBreakerTest { @Test - public void testRecordFailure() throws InterruptedException { - final int failureThreshold = 10; + public void testRecordFailure_withIgnoredErrors() throws InterruptedException { + final int failureRateThreshold = 10; final int windowInterval = 100; FailureCircuitBreaker failureCircuitBreaker = - new FailureCircuitBreaker(failureThreshold, windowInterval); + new FailureCircuitBreaker(failureRateThreshold, windowInterval); List listOfExceptionThrownOnFailure = new ArrayList<>(); - for (int index = 0; index < failureThreshold; index++) { + for (int index = 0; index < failureRateThreshold; index++) { listOfExceptionThrownOnFailure.add(new Exception()); } - for (int index = 0; index < failureThreshold * 9; index++) { + for (int index = 0; index < failureRateThreshold * 9; 
index++) { listOfExceptionThrownOnFailure.add(new CacheNotFoundException(Digest.newBuilder().build())); } Collections.shuffle(listOfExceptionThrownOnFailure); // make calls equals to threshold number of not ignored failure calls in parallel. - listOfExceptionThrownOnFailure.stream() - .parallel() - .forEach(failureCircuitBreaker::recordFailure); + listOfExceptionThrownOnFailure.stream().parallel().forEach(failureCircuitBreaker::recordFailure); assertThat(failureCircuitBreaker.state()).isEqualTo(State.ACCEPT_CALLS); // Sleep for windowInterval + 1ms. Thread.sleep(windowInterval + 1 /*to compensate any delay*/); // make calls equals to threshold number of not ignored failure calls in parallel. - listOfExceptionThrownOnFailure.stream() - .parallel() - .forEach(failureCircuitBreaker::recordFailure); + listOfExceptionThrownOnFailure.stream().parallel().forEach(failureCircuitBreaker::recordFailure); assertThat(failureCircuitBreaker.state()).isEqualTo(State.ACCEPT_CALLS); // Sleep for less than windowInterval. @@ -65,4 +65,24 @@ public void testRecordFailure() throws InterruptedException { failureCircuitBreaker.recordFailure(new Exception()); assertThat(failureCircuitBreaker.state()).isEqualTo(State.REJECT_CALLS); } + + @Test + public void testRecordFailure_minCallCriteriaNotMet() throws InterruptedException { + final int failureRateThreshold = 10; + final int windowInterval = 100; + final int minCallToComputeFailure = CircuitBreakerFactory.DEFAULT_MIN_CALL_COUNT_TO_COMPUTE_FAILURE_RATE; + FailureCircuitBreaker failureCircuitBreaker = + new FailureCircuitBreaker(failureRateThreshold, windowInterval); + + // Make half failure calls and half success calls, keeping the total number of calls below minCallToComputeFailure.
+ IntStream.range(0, minCallToComputeFailure >> 1).parallel() + .forEach(i -> failureCircuitBreaker.recordFailure(new Exception())); + IntStream.range(0, minCallToComputeFailure >> 1).parallel().forEach(i -> failureCircuitBreaker.recordSuccess()); + assertThat(failureCircuitBreaker.state()).isEqualTo(State.ACCEPT_CALLS); + + // Sleep for less than windowInterval. + Thread.sleep(windowInterval - 20); + failureCircuitBreaker.recordFailure(new Exception()); + assertThat(failureCircuitBreaker.state()).isEqualTo(State.REJECT_CALLS); + } }