Skip to content

Commit

Permalink
Remote: Fix crashes by InterruptedException when dynamic execution is…
Browse files Browse the repository at this point in the history
… enabled.

Fixes #14433.

The root cause is, inside `RemoteExecutionCache`, the result of `FindMissingDigests` is shared with other threads without considering error handling. For example, if there are two or more threads uploading the same input and one thread got interrupted when waiting for the result of `FindMissingDigests` call, the call is cancelled and others threads still waiting for the upload will receive upload error due to the cancellation which is wrong.

This PR fixes this by effectively applying reference count to the result of `FindMissingDigests` call so that if one thread got interrupted, as long as there are other threads depending on the result, the call won't be cancelled and the upload can continue.

Closes #15001.

PiperOrigin-RevId: 436180205
  • Loading branch information
coeuvre authored and copybara-github committed Mar 21, 2022
1 parent ccbcefd commit 702df84
Show file tree
Hide file tree
Showing 4 changed files with 270 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,31 +13,39 @@
// limitations under the License.
package com.google.devtools.build.lib.remote;

import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture;
import static com.google.devtools.build.lib.remote.util.Utils.waitForBulkTransfer;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static com.google.devtools.build.lib.remote.util.RxFutures.toCompletable;
import static com.google.devtools.build.lib.remote.util.RxFutures.toSingle;
import static com.google.devtools.build.lib.remote.util.RxUtils.mergeBulkTransfer;
import static com.google.devtools.build.lib.remote.util.RxUtils.toTransferResult;
import static java.lang.String.format;

import build.bazel.remote.execution.v2.Digest;
import build.bazel.remote.execution.v2.Directory;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
import com.google.devtools.build.lib.remote.common.RemoteCacheClient;
import com.google.devtools.build.lib.remote.merkletree.MerkleTree;
import com.google.devtools.build.lib.remote.merkletree.MerkleTree.PathOrBytes;
import com.google.devtools.build.lib.remote.options.RemoteOptions;
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.remote.util.RxFutures;
import com.google.devtools.build.lib.remote.util.RxUtils.TransferResult;
import com.google.protobuf.Message;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.subjects.AsyncSubject;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.concurrent.GuardedBy;

/** A {@link RemoteCache} with additional functionality needed for remote execution. */
public class RemoteExecutionCache extends RemoteCache {
Expand Down Expand Up @@ -73,62 +81,58 @@ public void ensureInputsPresent(
.addAll(additionalInputs.keySet())
.build();

// Collect digests that are not being or already uploaded
ConcurrentHashMap<Digest, AsyncSubject<Boolean>> missingDigestSubjects =
new ConcurrentHashMap<>();

List<ListenableFuture<Void>> uploadFutures = new ArrayList<>();
for (Digest digest : allDigests) {
Completable upload =
casUploadCache.execute(
digest,
Completable.defer(
() -> {
// The digest hasn't been processed, add it to the collection which will be used
// later for findMissingDigests call
AsyncSubject<Boolean> missingDigestSubject = AsyncSubject.create();
missingDigestSubjects.put(digest, missingDigestSubject);

return missingDigestSubject.flatMapCompletable(
missing -> {
if (!missing) {
return Completable.complete();
}
return RxFutures.toCompletable(
() -> uploadBlob(context, digest, merkleTree, additionalInputs),
MoreExecutors.directExecutor());
});
}),
force);
uploadFutures.add(RxFutures.toListenableFuture(upload));
if (allDigests.isEmpty()) {
return;
}

ImmutableSet<Digest> missingDigests;
try {
missingDigests = getFromFuture(findMissingDigests(context, missingDigestSubjects.keySet()));
} catch (IOException | InterruptedException e) {
for (Map.Entry<Digest, AsyncSubject<Boolean>> entry : missingDigestSubjects.entrySet()) {
entry.getValue().onError(e);
}
MissingDigestFinder missingDigestFinder = new MissingDigestFinder(context, allDigests.size());
Flowable<TransferResult> uploads =
Flowable.fromIterable(allDigests)
.flatMapSingle(
digest ->
uploadBlobIfMissing(
context, merkleTree, additionalInputs, force, missingDigestFinder, digest));

if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
try {
mergeBulkTransfer(uploads).blockingAwait();
} catch (RuntimeException e) {
Throwable cause = e.getCause();
if (cause != null) {
Throwables.throwIfInstanceOf(cause, InterruptedException.class);
Throwables.throwIfInstanceOf(cause, IOException.class);
}
throw e;
}
}

for (Map.Entry<Digest, AsyncSubject<Boolean>> entry : missingDigestSubjects.entrySet()) {
AsyncSubject<Boolean> missingSubject = entry.getValue();
if (missingDigests.contains(entry.getKey())) {
missingSubject.onNext(true);
} else {
// The digest is already existed in the remote cache, skip the upload.
missingSubject.onNext(false);
}
missingSubject.onComplete();
}

waitForBulkTransfer(uploadFutures, /* cancelRemainingOnInterrupt=*/ false);
private Single<TransferResult> uploadBlobIfMissing(
RemoteActionExecutionContext context,
MerkleTree merkleTree,
Map<Digest, Message> additionalInputs,
boolean force,
MissingDigestFinder missingDigestFinder,
Digest digest) {
Completable upload =
casUploadCache.execute(
digest,
Completable.defer(
() ->
// Only reach here if the digest is missing and is not being uploaded.
missingDigestFinder
.registerAndCount(digest)
.flatMapCompletable(
missingDigests -> {
if (missingDigests.contains(digest)) {
return toCompletable(
() -> uploadBlob(context, digest, merkleTree, additionalInputs),
directExecutor());
} else {
return Completable.complete();
}
})),
/* onIgnored= */ missingDigestFinder::count,
force);
return toTransferResult(upload);
}

private ListenableFuture<Void> uploadBlob(
Expand Down Expand Up @@ -160,4 +164,93 @@ private ListenableFuture<Void> uploadBlob(
"findMissingDigests returned a missing digest that has not been requested: %s",
digest)));
}

/**
* A missing digest finder that initiates the request when the internal counter reaches an
* expected count.
*/
class MissingDigestFinder {
private final int expectedCount;

private final AsyncSubject<ImmutableSet<Digest>> digestsSubject;
private final Single<ImmutableSet<Digest>> resultSingle;

@GuardedBy("this")
private final Set<Digest> digests;

@GuardedBy("this")
private int currentCount = 0;

MissingDigestFinder(RemoteActionExecutionContext context, int expectedCount) {
checkArgument(expectedCount > 0, "expectedCount should be greater than 0");
this.expectedCount = expectedCount;
this.digestsSubject = AsyncSubject.create();
this.digests = new HashSet<>();

AtomicBoolean findMissingDigestsCalled = new AtomicBoolean(false);
this.resultSingle =
Single.fromObservable(
digestsSubject
.flatMapSingle(
digests -> {
boolean wasCalled = findMissingDigestsCalled.getAndSet(true);
// Make sure we don't have re-subscription caused by refCount() below.
checkState(!wasCalled, "FindMissingDigests is called more than once");
return toSingle(
() -> findMissingDigests(context, digests), directExecutor());
})
// Use replay here because we could have a race condition that downstream hasn't
// been added to the subscription list (to receive the upstream result) while
// upstream is completed.
.replay(1)
.refCount());
}

/**
* Register the {@code digest} and increase the counter.
*
* <p>Returned Single cannot be subscribed more than once.
*
* @return Single that emits the result of the {@code FindMissingDigest} request.
*/
Single<ImmutableSet<Digest>> registerAndCount(Digest digest) {
AtomicBoolean subscribed = new AtomicBoolean(false);
// count() will potentially trigger the findMissingDigests call. Adding and counting before
// returning the Single could introduce a race that the result of findMissingDigests is
// available but the consumer doesn't get it because it hasn't subscribed the returned
// Single. In this case, it subscribes after upstream is completed resulting a re-run of
// findMissingDigests (due to refCount()).
//
// Calling count() inside doOnSubscribe to ensure the consumer already subscribed to the
// returned Single to avoid a re-execution of findMissingDigests.
return resultSingle.doOnSubscribe(
d -> {
boolean wasSubscribed = subscribed.getAndSet(true);
checkState(!wasSubscribed, "Single is subscribed more than once");
synchronized (this) {
digests.add(digest);
}
count();
});
}

/** Increase the counter. */
void count() {
ImmutableSet<Digest> digestsResult = null;

synchronized (this) {
if (currentCount < expectedCount) {
currentCount++;
if (currentCount == expectedCount) {
digestsResult = ImmutableSet.copyOf(digests);
}
}
}

if (digestsResult != null) {
digestsSubject.onNext(digestsResult);
digestsSubject.onComplete();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.core.SingleObserver;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.functions.Action;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -256,14 +257,25 @@ public boolean isDisposed() {
/**
* Executes a task.
*
* @see #execute(Object, Single, Action, boolean).
*/
public Single<ValueT> execute(KeyT key, Single<ValueT> task, boolean force) {
return execute(key, task, () -> {}, force);
}

/**
* Executes a task. If the task has already finished, this execution of the task is ignored unless
* `force` is true. If the task is in progress this execution of the task is always ignored.
*
* <p>If the cache is already shutdown, a {@link CancellationException} will be emitted.
*
* @param key identifies the task.
* @param onIgnored callback called when provided task is ignored.
* @param force re-execute a finished task if set to {@code true}.
* @return a {@link Single} which turns to completed once the task is finished or propagates the
* error if any.
*/
public Single<ValueT> execute(KeyT key, Single<ValueT> task, boolean force) {
public Single<ValueT> execute(KeyT key, Single<ValueT> task, Action onIgnored, boolean force) {
return Single.create(
emitter -> {
synchronized (lock) {
Expand All @@ -273,14 +285,20 @@ public Single<ValueT> execute(KeyT key, Single<ValueT> task, boolean force) {
}

if (!force && finished.containsKey(key)) {
onIgnored.run();
emitter.onSuccess(finished.get(key));
return;
}

finished.remove(key);

Execution execution =
inProgress.computeIfAbsent(key, ignoredKey -> new Execution(key, task));
Execution execution = inProgress.get(key);
if (execution != null) {
onIgnored.run();
} else {
execution = new Execution(key, task);
inProgress.put(key, execution);
}

// We must subscribe the execution within the scope of lock to avoid race condition
// that:
Expand Down Expand Up @@ -425,10 +443,15 @@ public Completable executeIfNot(KeyT key, Completable task) {
cache.executeIfNot(key, task.toSingleDefault(Optional.empty())));
}

/** Same as {@link AsyncTaskCache#executeIfNot} but operates on {@link Completable}. */
/** Same as {@link AsyncTaskCache#execute} but operates on {@link Completable}. */
public Completable execute(KeyT key, Completable task, boolean force) {
return execute(key, task, () -> {}, force);
}

/** Same as {@link AsyncTaskCache#execute} but operates on {@link Completable}. */
public Completable execute(KeyT key, Completable task, Action onIgnored, boolean force) {
return Completable.fromSingle(
cache.execute(key, task.toSingleDefault(Optional.empty()), force));
cache.execute(key, task.toSingleDefault(Optional.empty()), onIgnored, force));
}

/** Returns a set of keys for tasks which is finished. */
Expand Down
Loading

0 comments on commit 702df84

Please sign in to comment.