Skip to content

Commit

Permalink
feat: Implement failure circuit breaker
Browse files Browse the repository at this point in the history
Copy of #18120: I accidentally closed #18120 during rebase and doesn't have permission to reopen.

### Issue
We have noticed that any problems with the remote cache have a detrimental effect on build times. On investigation we found that the interface for the circuit breaker was left unimplemented.

### Solution
To address this issue, implemented a failure circuit breaker, which includes three new Bazel flags: 1) experimental_circuitbreaker_strategy, 2) experimental_remote_failure_threshold, and 3) experimental_emote_failure_window.

In this implementation, I have implemented failure strategy for circuit breaker and used failure count to trip the circuit.

Reasoning behind using failure count instead of failure rate : To measure failure rate I also need the success count. While both the failure and success count need to be an AtomicInteger as both will be modified concurrently by multiple threads. Even though getAndIncrement is very light weight operation, at very high request it might contribute to latency.

Reasoning behind using failure circuit breaker : A new instance of Retrier.CircuitBreaker is created for each build. Therefore, if the circuit breaker trips during a build, the remote cache will be disabled for that build. However, it will be enabled again
for the next build as a new instance of Retrier.CircuitBreaker will be created. If needed in the future we may add cool down strategy also. e.g. failure_and_cool_down_startegy.

closes #18136

Closes #18359.

PiperOrigin-RevId: 536349954
Change-Id: I5e1c57d4ad0ce07ddc4808bf1f327bc5df6ce704
  • Loading branch information
amishra-u authored and copybara-github committed May 30, 2023
1 parent 468c056 commit 5575ff2
Show file tree
Hide file tree
Showing 16 changed files with 554 additions and 176 deletions.
2 changes: 2 additions & 0 deletions src/main/java/com/google/devtools/build/lib/remote/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package(
filegroup(
name = "srcs",
srcs = glob(["*"]) + [
"//src/main/java/com/google/devtools/build/lib/remote/circuitbreaker:srcs",
"//src/main/java/com/google/devtools/build/lib/remote/common:srcs",
"//src/main/java/com/google/devtools/build/lib/remote/disk:srcs",
"//src/main/java/com/google/devtools/build/lib/remote/downloader:srcs",
Expand Down Expand Up @@ -85,6 +86,7 @@ java_library(
"//src/main/java/com/google/devtools/build/lib/exec/local",
"//src/main/java/com/google/devtools/build/lib/packages/semantics",
"//src/main/java/com/google/devtools/build/lib/profiler",
"//src/main/java/com/google/devtools/build/lib/remote/circuitbreaker",
"//src/main/java/com/google/devtools/build/lib/remote/common",
"//src/main/java/com/google/devtools/build/lib/remote/common:bulk_transfer_exception",
"//src/main/java/com/google/devtools/build/lib/remote/common:cache_not_found_exception",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -500,4 +500,8 @@ ListenableFuture<Void> uploadChunker(
MoreExecutors.directExecutor());
return f;
}

Retrier getRetrier() {
return this.retrier;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -252,4 +252,8 @@ public void close() {
}
channel.release();
}

RemoteRetrier getRetrier() {
return this.retrier;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import com.google.devtools.build.lib.exec.ModuleActionContextRegistry;
import com.google.devtools.build.lib.exec.SpawnStrategyRegistry;
import com.google.devtools.build.lib.remote.RemoteServerCapabilities.ServerCapabilitiesRequirement;
import com.google.devtools.build.lib.remote.circuitbreaker.CircuitBreakerFactory;
import com.google.devtools.build.lib.remote.common.RemoteCacheClient;
import com.google.devtools.build.lib.remote.common.RemoteExecutionClient;
import com.google.devtools.build.lib.remote.downloader.GrpcRemoteDownloader;
Expand Down Expand Up @@ -510,12 +511,11 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
GoogleAuthUtils.newCallCredentialsProvider(credentials);
CallCredentials callCredentials = callCredentialsProvider.getCallCredentials();

Retrier.CircuitBreaker circuitBreaker =
CircuitBreakerFactory.createCircuitBreaker(remoteOptions);
RemoteRetrier retrier =
new RemoteRetrier(
remoteOptions,
RemoteRetrier.RETRIABLE_GRPC_ERRORS,
retryScheduler,
Retrier.ALLOW_ALL_CALLS);
remoteOptions, RemoteRetrier.RETRIABLE_GRPC_ERRORS, retryScheduler, circuitBreaker);

// We only check required capabilities for a given endpoint.
//
Expand Down Expand Up @@ -636,7 +636,7 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
remoteOptions,
RemoteRetrier.RETRIABLE_GRPC_ERRORS, // Handle NOT_FOUND internally
retryScheduler,
Retrier.ALLOW_ALL_CALLS);
circuitBreaker);
remoteExecutor =
new ExperimentalGrpcRemoteExecutor(
executionCapabilities,
Expand All @@ -650,7 +650,7 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
remoteOptions,
RemoteRetrier.RETRIABLE_GRPC_EXEC_ERRORS,
retryScheduler,
Retrier.ALLOW_ALL_CALLS);
circuitBreaker);
remoteExecutor =
new GrpcRemoteExecutor(
executionCapabilities, execChannel.retain(), callCredentialsProvider, execRetrier);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import com.google.devtools.build.lib.profiler.SilentCloseable;
import com.google.devtools.build.lib.remote.RemoteExecutionService.RemoteActionResult;
import com.google.devtools.build.lib.remote.RemoteExecutionService.ServerLogs;
import com.google.devtools.build.lib.remote.circuitbreaker.CircuitBreakerFactory;
import com.google.devtools.build.lib.remote.common.BulkTransferException;
import com.google.devtools.build.lib.remote.common.OperationObserver;
import com.google.devtools.build.lib.remote.options.RemoteOptions;
Expand Down Expand Up @@ -655,6 +656,8 @@ private void report(Event evt) {
private static RemoteRetrier createExecuteRetrier(
RemoteOptions options, ListeningScheduledExecutorService retryService) {
return new ExecuteRetrier(
options.remoteMaxRetryAttempts, retryService, Retrier.ALLOW_ALL_CALLS);
options.remoteMaxRetryAttempts,
retryService,
CircuitBreakerFactory.createCircuitBreaker(options));
}
}
34 changes: 27 additions & 7 deletions src/main/java/com/google/devtools/build/lib/remote/Retrier.java
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ enum State {
State state();

/** Called after an execution failed. */
void recordFailure();
void recordFailure(Exception e);

/** Called after an execution succeeded. */
void recordSuccess();
Expand Down Expand Up @@ -130,7 +130,7 @@ public State state() {
}

@Override
public void recordFailure() {}
public void recordFailure(Exception e) {}

@Override
public void recordSuccess() {}
Expand Down Expand Up @@ -245,7 +245,7 @@ public <T> T execute(Callable<T> call, Backoff backoff) throws Exception {
circuitBreaker.recordSuccess();
return r;
} catch (Exception e) {
circuitBreaker.recordFailure();
circuitBreaker.recordFailure(e);
Throwables.throwIfInstanceOf(e, InterruptedException.class);
if (State.TRIAL_CALL.equals(circuitState)) {
throw e;
Expand All @@ -272,19 +272,35 @@ public <T> ListenableFuture<T> executeAsync(AsyncCallable<T> call) {
* backoff.
*/
public <T> ListenableFuture<T> executeAsync(AsyncCallable<T> call, Backoff backoff) {
final State circuitState = circuitBreaker.state();
if (State.REJECT_CALLS.equals(circuitState)) {
return Futures.immediateFailedFuture(new CircuitBreakerException());
}
try {
ListenableFuture<T> future =
Futures.transformAsync(
call.call(),
(f) -> {
circuitBreaker.recordSuccess();
return Futures.immediateFuture(f);
},
MoreExecutors.directExecutor());
return Futures.catchingAsync(
call.call(),
future,
Exception.class,
t -> onExecuteAsyncFailure(t, call, backoff),
t -> onExecuteAsyncFailure(t, call, backoff, circuitState),
MoreExecutors.directExecutor());
} catch (Exception e) {
return onExecuteAsyncFailure(e, call, backoff);
return onExecuteAsyncFailure(e, call, backoff, circuitState);
}
}

private <T> ListenableFuture<T> onExecuteAsyncFailure(
Exception t, AsyncCallable<T> call, Backoff backoff) {
Exception t, AsyncCallable<T> call, Backoff backoff, State circuitState) {
circuitBreaker.recordFailure(t);
if (circuitState.equals(State.TRIAL_CALL)) {
return Futures.immediateFailedFuture(t);
}
if (isRetriable(t)) {
long waitMillis = backoff.nextDelayMillis(t);
if (waitMillis >= 0) {
Expand All @@ -310,4 +326,8 @@ public Backoff newBackoff() {
public boolean isRetriable(Exception e) {
return shouldRetry.test(e);
}

CircuitBreaker getCircuitBreaker() {
return this.circuitBreaker;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
load("@rules_java//java:defs.bzl", "java_library")

package(
default_applicable_licenses = ["//:license"],
default_visibility = ["//src:__subpackages__"],
)

filegroup(
name = "srcs",
srcs = glob(["*"]),
visibility = ["//src:__subpackages__"],
)

java_library(
name = "circuitbreaker",
srcs = glob(["*.java"]),
deps = [
"//src/main/java/com/google/devtools/build/lib/remote:Retrier",
"//src/main/java/com/google/devtools/build/lib/remote/common:cache_not_found_exception",
"//src/main/java/com/google/devtools/build/lib/remote/options",
"//third_party:guava",
],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright 2023 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.remote.circuitbreaker;

import com.google.common.collect.ImmutableSet;
import com.google.devtools.build.lib.remote.Retrier;
import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
import com.google.devtools.build.lib.remote.options.RemoteOptions;

/** Factory for {@link Retrier.CircuitBreaker} */
public class CircuitBreakerFactory {

public static final ImmutableSet<Class<? extends Exception>> DEFAULT_IGNORED_ERRORS =
ImmutableSet.of(CacheNotFoundException.class);

private CircuitBreakerFactory() {}

/**
* Creates the instance of the {@link Retrier.CircuitBreaker} as per the strategy defined in
* {@link RemoteOptions}. In case of undefined strategy defaults to {@link
* Retrier.ALLOW_ALL_CALLS} implementation.
*
* @param remoteOptions The configuration for the CircuitBreaker implementation.
* @return an instance of CircuitBreaker.
*/
public static Retrier.CircuitBreaker createCircuitBreaker(final RemoteOptions remoteOptions) {
if (remoteOptions.circuitBreakerStrategy == RemoteOptions.CircuitBreakerStrategy.FAILURE) {
return new FailureCircuitBreaker(
remoteOptions.remoteFailureThreshold,
(int) remoteOptions.remoteFailureWindowInterval.toMillis());
}
return Retrier.ALLOW_ALL_CALLS;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright 2023 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.remote.circuitbreaker;

import com.google.common.collect.ImmutableSet;
import com.google.devtools.build.lib.remote.Retrier;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
* The {@link FailureCircuitBreaker} implementation of the {@link Retrier.CircuitBreaker} prevents
* further calls to a remote cache once the number of failures within a given window exceeds a
* specified threshold for a build. In the context of Bazel, a new instance of {@link
* Retrier.CircuitBreaker} is created for each build. Therefore, if the circuit breaker trips during
* a build, the remote cache will be disabled for that build. However, it will be enabled again for
* the next build as a new instance of {@link Retrier.CircuitBreaker} will be created.
*/
public class FailureCircuitBreaker implements Retrier.CircuitBreaker {

private State state;
private final AtomicInteger failures;
private final int failureThreshold;
private final int slidingWindowSize;
private final ScheduledExecutorService scheduledExecutor;
private final ImmutableSet<Class<? extends Exception>> ignoredErrors;

/**
* Creates a {@link FailureCircuitBreaker}.
*
* @param failureThreshold is used to set the number of failures required to trip the circuit
* breaker in given time window.
* @param slidingWindowSize the size of the sliding window in milliseconds to calculate the number
* of failures.
*/
public FailureCircuitBreaker(int failureThreshold, int slidingWindowSize) {
this.failureThreshold = failureThreshold;
this.failures = new AtomicInteger(0);
this.slidingWindowSize = slidingWindowSize;
this.state = State.ACCEPT_CALLS;
this.scheduledExecutor =
slidingWindowSize > 0 ? Executors.newSingleThreadScheduledExecutor() : null;
this.ignoredErrors = CircuitBreakerFactory.DEFAULT_IGNORED_ERRORS;
}

@Override
public State state() {
return this.state;
}

@Override
public void recordFailure(Exception e) {
if (!ignoredErrors.contains(e.getClass())) {
int failureCount = failures.incrementAndGet();
if (slidingWindowSize > 0) {
var unused =
scheduledExecutor.schedule(
failures::decrementAndGet, slidingWindowSize, TimeUnit.MILLISECONDS);
}
// Since the state can only be changed to the open state, synchronization is not required.
if (failureCount > this.failureThreshold) {
this.state = State.REJECT_CALLS;
}
}
}

@Override
public void recordSuccess() {
// do nothing, implement if we need to set threshold on failure rate instead of count.
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,43 @@ public RemoteOutputsStrategyConverter() {
+ "cache misses and retries.")
public boolean remoteDiscardMerkleTrees;

@Option(
name = "experimental_circuit_breaker_strategy",
documentationCategory = OptionDocumentationCategory.REMOTE,
defaultValue = "null",
effectTags = {OptionEffectTag.EXECUTION},
converter = CircuitBreakerStrategy.Converter.class,
help =
"Specifies the strategy for the circuit breaker to use. Available strategies are"
+ " \"failure\". On invalid value for the option the behavior same as the option is"
+ " not set.")
public CircuitBreakerStrategy circuitBreakerStrategy;

@Option(
name = "experimental_remote_failure_threshold",
defaultValue = "100",
documentationCategory = OptionDocumentationCategory.REMOTE,
effectTags = {OptionEffectTag.EXECUTION},
help =
"Sets the allowed number of failures in a specific time window after which it stops"
+ " calling to the remote cache/executor. By default the value is 100. Setting this"
+ " to 0 or negative means no limitation.")
public int remoteFailureThreshold;

@Option(
name = "experimental_remote_failure_window_interval",
defaultValue = "60s",
documentationCategory = OptionDocumentationCategory.REMOTE,
effectTags = {OptionEffectTag.EXECUTION},
converter = RemoteDurationConverter.class,
help =
"The interval in which the failure count of the remote requests are computed. On zero or"
+ " negative value the failure duration is computed the whole duration of the"
+ " execution.Following units can be used: Days (d), hours (h), minutes (m), seconds"
+ " (s), and milliseconds (ms). If the unit is omitted, the value is interpreted as"
+ " seconds.")
public Duration remoteFailureWindowInterval;

// The below options are not configurable by users, only tests.
// This is part of the effort to reduce the overall number of flags.

Expand Down Expand Up @@ -749,4 +786,16 @@ public boolean shouldPrintMessages(boolean success) {
|| this == ExecutionMessagePrintMode.ALL);
}
}

/** An enum for specifying different strategy for circuit breaker. */
public enum CircuitBreakerStrategy {
FAILURE;

/** Converts to {@link CircuitBreakerStrategy}. */
public static class Converter extends EnumConverter<CircuitBreakerStrategy> {
public Converter() {
super(CircuitBreakerStrategy.class, "CircuitBreaker strategy");
}
}
}
}
Loading

0 comments on commit 5575ff2

Please sign in to comment.