Skip to content

Commit

Permalink
Add support for fetching repos using a worker thread
Browse files Browse the repository at this point in the history
Added flag `--experimental_worker_for_repo_fetching`, whose value can be `off` (no change), `platform` (uses a platform/OS thread to do repo fetching), `virtual` (uses a virtual thread feat. Project Loom).

When a worker thread is used, the Skyframe thread does not perform any fetching itself. Instead, it gives its Environment object to the worker thread, and let it do the bulk of the work. When a new Skyframe dependency is registered and is found missing, the worker thread doesn't restart; instead, it signals to the host thread that it should restart, and waits for the host thread to give back a fresh Environment object.

This weird dance can work with both platform threads and virtual threads. Right now, passing `--experimental_worker_for_repo_fetching=virtual` results in a fatal error, but that should be easily fixable when JDK20+ lands.

Work towards #10515

PiperOrigin-RevId: 538909616
Change-Id: Iacc8b84ca2f90821fba5add171a4778ed2b48ab9
  • Loading branch information
Wyverald authored and copybara-github committed Jun 8, 2023
1 parent 655849b commit 1590dbc
Show file tree
Hide file tree
Showing 11 changed files with 455 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -154,6 +156,7 @@ public class BazelRepositoryModule extends BlazeModule {
private LockfileMode bazelLockfileMode = LockfileMode.OFF;
private List<String> allowedYankedVersions = ImmutableList.of();
private SingleExtensionEvalFunction singleExtensionEvalFunction;
private final ExecutorService repoFetchingWorkerThreadPool = Executors.newFixedThreadPool(100);

@Nullable private CredentialModule credentialModule;

Expand Down Expand Up @@ -312,6 +315,19 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {

RepositoryOptions repoOptions = env.getOptions().getOptions(RepositoryOptions.class);
if (repoOptions != null) {
switch (repoOptions.workerForRepoFetching) {
case OFF:
starlarkRepositoryFunction.setWorkerExecutorService(null);
break;
case PLATFORM:
starlarkRepositoryFunction.setWorkerExecutorService(repoFetchingWorkerThreadPool);
break;
case VIRTUAL:
throw new AbruptExitException(
detailedExitCode(
"using a virtual worker thread for repo fetching is not yet supported",
Code.BAD_DOWNLOADER_CONFIG));
}
downloadManager.setDisableDownload(repoOptions.disableDownload);
if (repoOptions.repositoryDownloaderRetries >= 0) {
downloadManager.setRetries(repoOptions.repositoryDownloaderRetries);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,31 @@ public class RepositoryOptions extends OptionsBase {
+ "give, and in this case multiple URLs will be returned.")
public String downloaderConfig;

/** See {@link #workerForRepoFetching}. */
public enum WorkerForRepoFetching {
OFF,
PLATFORM,
VIRTUAL;

static class Converter extends EnumConverter<WorkerForRepoFetching> {
public Converter() {
super(WorkerForRepoFetching.class, "worker for repo fetching");
}
}
}

@Option(
name = "experimental_worker_for_repo_fetching",
defaultValue = "off",
converter = WorkerForRepoFetching.Converter.class,
documentationCategory = OptionDocumentationCategory.REMOTE,
effectTags = {OptionEffectTag.UNKNOWN},
help =
"The threading mode to use for repo fetching. If set to 'off', no worker thread is used,"
+ " and the repo fetching is subject to restarts. Otherwise, uses a platform thread"
+ " (i.e. OS thread) if set to 'platform' or a virtual thread if set to 'virtual'.")
public WorkerForRepoFetching workerForRepoFetching;

@Option(
name = "ignore_dev_dependency",
defaultValue = "false",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ java_library(
"//src/main/java/net/starlark/java/annot",
"//src/main/java/net/starlark/java/eval",
"//src/main/java/net/starlark/java/syntax",
"//third_party:auto_value",
"//third_party:error_prone_annotations",
"//third_party:guava",
"//third_party:java-diff-utils",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright 2023 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.google.devtools.build.lib.bazel.repository.starlark;

import com.google.devtools.build.lib.rules.repository.RepositoryDirectoryValue;
import com.google.devtools.build.skyframe.SkyFunction;
import com.google.devtools.build.skyframe.SkyFunction.Environment.SkyKeyComputeState;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import javax.annotation.Nullable;

/**
* Captures state that persists across different invocations of {@link
* com.google.devtools.build.lib.rules.repository.RepositoryDelegatorFunction}, specifically {@link
* StarlarkRepositoryFunction}.
*
* <p>This class is used to hold on to a worker thread (in reality just a {@link Future} object)
* when fetching repos using a worker thread is enabled. The worker thread uses a {@link
* SkyFunction.Environment} object acquired from the host thread, and can signal the host thread to
* restart to get a fresh environment object.
*/
class RepoFetchingSkyKeyComputeState implements SkyKeyComputeState {

/** A signal that the worker thread can send to the host Skyframe thread. */
enum Signal {
/**
* Indicates that the host thread should return {@code null}, causing a Skyframe restart. After
* sending this signal, the client will immediately block on {@code delegateEnvQueue}, waiting
* for the host thread to send a fresh {@link SkyFunction.Environment} over.
*/
RESTART,
/**
* Indicates that the worker thread has finished running, either yielding a result or an
* exception.
*/
DONE
}

/** The channel for the worker thread to send a signal to the host Skyframe thread. */
final BlockingQueue<Signal> signalQueue = new SynchronousQueue<>();
/**
* The channel for the host Skyframe thread to send fresh {@link SkyFunction.Environment} objects
* back to the worker thread.
*/
final BlockingQueue<SkyFunction.Environment> delegateEnvQueue = new SynchronousQueue<>();
/**
* This future holds on to the worker thread in order to cancel it when necessary; it also serves
* to tell whether a worker thread is already running.
*/
// This is volatile since we set it to null to indicate the worker thread isn't running, and this
// could happen on multiple threads. Canceling a future multiple times is safe, though, so we
// only need to worry about nullness. Using a mutex/synchronization is an alternative but it means
// we might block in `close()`, which is potentially bad (see its javadoc).
@Nullable volatile Future<RepositoryDirectoryValue.Builder> workerFuture = null;

SkyFunction.Environment signalForFreshEnv() throws InterruptedException {
signalQueue.put(Signal.RESTART);
return delegateEnvQueue.take();
}

@Override
public void close() {
var myWorkerFuture = workerFuture;
workerFuture = null;
if (myWorkerFuture != null) {
myWorkerFuture.cancel(true);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
// Copyright 2023 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.google.devtools.build.lib.bazel.repository.starlark;

import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.devtools.build.lib.events.Event;
import com.google.devtools.build.lib.events.ExtendedEventHandler;
import com.google.devtools.build.skyframe.SkyFunction;
import com.google.devtools.build.skyframe.SkyKey;
import com.google.devtools.build.skyframe.SkyValue;
import com.google.devtools.build.skyframe.SkyframeLookupResult;
import com.google.devtools.build.skyframe.Version;
import java.util.function.Supplier;
import javax.annotation.Nullable;

/**
* A {@link SkyFunction.Environment} implementation designed to be used in a different thread than
* the corresponding SkyFunction runs in. It relies on a delegate Environment object to do
* underlying work. Its {@link #getValue} and {@link #getValueOrThrow} methods do not return {@code
* null} when the {@link SkyValue} in question is not available. Instead, it blocks and waits for
* the host Skyframe thread to restart, and replaces the delegate Environment with a fresh one from
* the restarted SkyFunction before continuing. (Note that those methods <em>do</em> return {@code
* null} if the SkyValue was evaluated but found to be in error.)
*
* <p>Crucially, the delegate Environment object must not be used by multiple threads at the same
* time. In effect, this is guaranteed by only one of the worker thread and host thread being active
* at any given time.
*/
class RepoFetchingWorkerSkyFunctionEnvironment
implements SkyFunction.Environment, ExtendedEventHandler, SkyframeLookupResult {
private final RepoFetchingSkyKeyComputeState state;
private SkyFunction.Environment delegate;

RepoFetchingWorkerSkyFunctionEnvironment(
RepoFetchingSkyKeyComputeState state, SkyFunction.Environment delegate) {
this.state = state;
this.delegate = delegate;
}

@Override
public boolean valuesMissing() {
return delegate.valuesMissing();
}

@Override
public SkyframeLookupResult getValuesAndExceptions(Iterable<? extends SkyKey> depKeys)
throws InterruptedException {
delegate.getValuesAndExceptions(depKeys);
if (!delegate.valuesMissing()) {
// Do NOT just return the return value of `delegate.getValuesAndExceptions` here! That would
// cause anyone holding onto the returned // result object to potentially use a stale version
// of it after a skyfunction restart.
return this;
}
// We null out `delegate` before blocking for the fresh env so that the old one becomes
// eligible for GC.
delegate = null;
delegate = state.signalForFreshEnv();
delegate.getValuesAndExceptions(depKeys);
return this;
}

@Nullable
@Override
public <E1 extends Exception, E2 extends Exception, E3 extends Exception> SkyValue getOrThrow(
SkyKey skyKey, Class<E1> e1, Class<E2> e2, Class<E3> e3) throws E1, E2, E3 {
return delegate.getLookupHandleForPreviouslyRequestedDeps().getOrThrow(skyKey, e1, e2, e3);
}

@Override
public boolean queryDep(SkyKey key, QueryDepCallback resultCallback) {
return delegate.getLookupHandleForPreviouslyRequestedDeps().queryDep(key, resultCallback);
}

@Nullable
@Override
public SkyValue getValue(SkyKey depKey) throws InterruptedException {
return getValuesAndExceptions(ImmutableList.of(depKey)).get(depKey);
}

@Nullable
@Override
public <E1 extends Exception> SkyValue getValueOrThrow(SkyKey depKey, Class<E1> e1)
throws E1, InterruptedException {
return getValuesAndExceptions(ImmutableList.of(depKey)).getOrThrow(depKey, e1);
}

@Nullable
@Override
public <E1 extends Exception, E2 extends Exception> SkyValue getValueOrThrow(
SkyKey depKey, Class<E1> e1, Class<E2> e2) throws E1, E2, InterruptedException {
return getValuesAndExceptions(ImmutableList.of(depKey)).getOrThrow(depKey, e1, e2);
}

@Nullable
@Override
public <E1 extends Exception, E2 extends Exception, E3 extends Exception>
SkyValue getValueOrThrow(SkyKey depKey, Class<E1> e1, Class<E2> e2, Class<E3> e3)
throws E1, E2, E3, InterruptedException {
return getValuesAndExceptions(ImmutableList.of(depKey)).getOrThrow(depKey, e1, e2, e3);
}

@Nullable
@Override
public <E1 extends Exception, E2 extends Exception, E3 extends Exception, E4 extends Exception>
SkyValue getValueOrThrow(
SkyKey depKey, Class<E1> e1, Class<E2> e2, Class<E3> e3, Class<E4> e4)
throws E1, E2, E3, E4, InterruptedException {
SkyValue value = delegate.getValueOrThrow(depKey, e1, e2, e3, e4);
if (value != null) {
return value;
}
// We null out `delegate` before blocking for the fresh env so that the old one becomes
// eligible for GC.
delegate = null;
delegate = state.signalForFreshEnv();
return delegate.getValueOrThrow(depKey, e1, e2, e3, e4);
}

@Override
public ExtendedEventHandler getListener() {
// Do NOT just return `delegate.getListener()` here! That would cause anyone holding onto the
// returned listener to potentially post events to a stale listener.
return this;
}

@Override
public void post(Postable obj) {
delegate.getListener().post(obj);
}

@Override
public void handle(Event event) {
delegate.getListener().handle(event);
}

@Override
public void registerDependencies(Iterable<SkyKey> keys) {
delegate.registerDependencies(keys);
}

@Override
public boolean inErrorBubblingForSkyFunctionsThatCanFullyRecoverFromErrors() {
return delegate.inErrorBubblingForSkyFunctionsThatCanFullyRecoverFromErrors();
}

@Override
public void dependOnFuture(ListenableFuture<?> future) {
delegate.dependOnFuture(future);
}

@Override
public boolean restartPermitted() {
return delegate.restartPermitted();
}

@Override
public SkyframeLookupResult getLookupHandleForPreviouslyRequestedDeps() {
return delegate.getLookupHandleForPreviouslyRequestedDeps();
}

@Override
public <T extends SkyKeyComputeState> T getState(Supplier<T> stateSupplier) {
return delegate.getState(stateSupplier);
}

@Nullable
@Override
public Version getMaxTransitiveSourceVersionSoFar() {
return delegate.getMaxTransitiveSourceVersionSoFar();
}
}
Loading

0 comments on commit 1590dbc

Please sign in to comment.