Skip to content

Commit

Permalink
Download toplevel artifacts within spawn execution
Browse files Browse the repository at this point in the history
so that after action execution, the outputs are already downloaded.

We have three types of downloads:
  1. Download within spawn execution: the download is part of the spawn execution. Similar to normal build that, during the spawn execution, outputs are written to local file system. Metadata are not injected to skyframe, but instead, are calculated, e.g RegularFileArtifactValue.
  2. For action that doesn't have spawn, e.g. SymlinkAction, the download is part of action finlization before injecting metadata into skyframe.
  3. Download afterwards: the download is not part of the spawn execution. During spawn execution, metadata of outputs are injected into skyframe as RemoteFileArtifactValue. Dynamic execution, for example, can then request the downloads later.

PiperOrigin-RevId: 529423585
Change-Id: If78663e22f0d36d621ffb35331c7ae08dc79fccd
  • Loading branch information
coeuvre authored and copybara-github committed May 4, 2023
1 parent d228ca0 commit a5dde12
Show file tree
Hide file tree
Showing 19 changed files with 344 additions and 117 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,5 @@ public interface RemoteArtifactChecker {
* Returns true if Bazel should trust (and not verify) build artifacts that were last seen
* remotely and do not exist locally.
*/
boolean shouldTrustRemoteArtifact(Artifact file, RemoteFileArtifactValue metadata);
boolean shouldTrustRemoteArtifact(ActionInput file, RemoteFileArtifactValue metadata);
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ public abstract class AbstractActionInputPrefetcher implements ActionInputPrefet
private final AsyncTaskCache.NoResult<Path> downloadCache = AsyncTaskCache.NoResult.create();
private final TempPathGenerator tempPathGenerator;
private final OutputPermissions outputPermissions;
protected final Set<Artifact> outputsAreInputs = Sets.newConcurrentHashSet();

protected final Path execRoot;
protected final RemoteOutputChecker remoteOutputChecker;
Expand Down Expand Up @@ -587,61 +586,34 @@ public List<Artifact> getArtifacts() {
}
}

@SuppressWarnings({"CheckReturnValue", "FutureReturnValueIgnored"})
public void finalizeAction(Action action, OutputMetadataStore outputMetadataStore)
throws IOException, InterruptedException {
List<Artifact> inputsToDownload = new ArrayList<>();
List<Artifact> outputsToDownload = new ArrayList<>();

for (Artifact output : action.getOutputs()) {
if (outputMetadataStore.artifactOmitted(output)) {
continue;
}

var metadata = outputMetadataStore.getOutputMetadata(output);
if (!metadata.isRemote()) {
continue;
}

if (outputsAreInputs.remove(output)) {
if (output.isTreeArtifact()) {
var children = outputMetadataStore.getTreeArtifactChildren((SpecialArtifact) output);
inputsToDownload.addAll(children);
} else {
inputsToDownload.add(output);
}
} else if (output.isTreeArtifact()) {
if (output.isTreeArtifact()) {
var children = outputMetadataStore.getTreeArtifactChildren((SpecialArtifact) output);
for (var file : children) {
if (remoteOutputChecker.shouldDownloadFile(file)) {
if (remoteOutputChecker.shouldDownloadFileAfterActionExecution(file)) {
outputsToDownload.add(file);
}
}
} else if (remoteOutputChecker.shouldDownloadFile(output)) {
outputsToDownload.add(output);
} else {
if (remoteOutputChecker.shouldDownloadFileAfterActionExecution(output)) {
outputsToDownload.add(output);
}
}
}

if (!inputsToDownload.isEmpty()) {
// "input" here means "input to another action" (but an output of this one), so
// getOutputMetadata() is the right method to pass to prefetchFiles()
var future =
prefetchFiles(inputsToDownload, outputMetadataStore::getOutputMetadata, Priority.HIGH);
addCallback(
future,
new FutureCallback<Void>() {
@Override
public void onSuccess(Void unused) {
reporter.post(new InputsEagerlyPrefetched(inputsToDownload));
}

@Override
public void onFailure(Throwable throwable) {
reporter.handle(
Event.warn(
String.format(
"Failed to eagerly prefetch inputs: %s", throwable.getMessage())));
}
},
directExecutor());
}

if (!outputsToDownload.isEmpty()) {
var future =
prefetchFiles(outputsToDownload, outputMetadataStore::getOutputMetadata, Priority.LOW);
Expand Down Expand Up @@ -669,4 +641,8 @@ public void flushOutputTree() throws InterruptedException {
public ImmutableSet<ActionInput> getMissingActionInputs() {
return ImmutableSet.copyOf(missingActionInputs);
}

public RemoteOutputChecker getRemoteOutputChecker() {
return remoteOutputChecker;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ final class RemoteActionContextProvider {
private TempPathGenerator tempPathGenerator;
private RemoteExecutionService remoteExecutionService;
@Nullable private RemoteActionInputFetcher actionInputFetcher;
@Nullable private final RemoteOutputChecker remoteOutputChecker;

private RemoteActionContextProvider(
Executor executor,
Expand All @@ -56,14 +57,16 @@ private RemoteActionContextProvider(
@Nullable RemoteExecutionClient remoteExecutor,
@Nullable ListeningScheduledExecutorService retryScheduler,
DigestUtil digestUtil,
@Nullable Path logDir) {
@Nullable Path logDir,
@Nullable RemoteOutputChecker remoteOutputChecker) {
this.executor = executor;
this.env = Preconditions.checkNotNull(env, "env");
this.remoteCache = remoteCache;
this.remoteExecutor = remoteExecutor;
this.retryScheduler = retryScheduler;
this.digestUtil = digestUtil;
this.logDir = logDir;
this.remoteOutputChecker = remoteOutputChecker;
}

public static RemoteActionContextProvider createForPlaceholder(
Expand All @@ -73,27 +76,30 @@ public static RemoteActionContextProvider createForPlaceholder(
return new RemoteActionContextProvider(
directExecutor(),
env,
/*remoteCache=*/ null,
/*remoteExecutor=*/ null,
/* remoteCache= */ null,
/* remoteExecutor= */ null,
retryScheduler,
digestUtil,
/*logDir=*/ null);
/* logDir= */ null,
/* remoteOutputChecker= */ null);
}

public static RemoteActionContextProvider createForRemoteCaching(
Executor executor,
CommandEnvironment env,
RemoteCache remoteCache,
ListeningScheduledExecutorService retryScheduler,
DigestUtil digestUtil) {
DigestUtil digestUtil,
@Nullable RemoteOutputChecker remoteOutputChecker) {
return new RemoteActionContextProvider(
executor,
env,
remoteCache,
/*remoteExecutor=*/ null,
/* remoteExecutor= */ null,
retryScheduler,
digestUtil,
/*logDir=*/ null);
/* logDir= */ null,
remoteOutputChecker);
}

public static RemoteActionContextProvider createForRemoteExecution(
Expand All @@ -103,9 +109,17 @@ public static RemoteActionContextProvider createForRemoteExecution(
RemoteExecutionClient remoteExecutor,
ListeningScheduledExecutorService retryScheduler,
DigestUtil digestUtil,
Path logDir) {
Path logDir,
@Nullable RemoteOutputChecker remoteOutputChecker) {
return new RemoteActionContextProvider(
executor, env, remoteCache, remoteExecutor, retryScheduler, digestUtil, logDir);
executor,
env,
remoteCache,
remoteExecutor,
retryScheduler,
digestUtil,
logDir,
remoteOutputChecker);
}

private RemotePathResolver createRemotePathResolver() {
Expand Down Expand Up @@ -155,7 +169,8 @@ private RemoteExecutionService getRemoteExecutionService() {
remoteCache,
remoteExecutor,
tempPathGenerator,
captureCorruptedOutputsDir);
captureCorruptedOutputsDir,
remoteOutputChecker);
env.getEventBus().register(remoteExecutionService);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,20 +133,20 @@ void injectRemoteFile(PathFragment path, byte[] digest, long size, long expireAt
remoteOutputTree.injectRemoteFile(path, digest, size, expireAtEpochMilli);
}

void flush() throws IOException {
void flush() throws IOException, InterruptedException {
checkNotNull(metadataInjector, "metadataInjector is null");

for (Map.Entry<PathFragment, Artifact> entry : outputMapping.entrySet()) {
PathFragment path = execRoot.getRelative(entry.getKey());
Artifact output = entry.getValue();

maybeInjectMetadataForSymlink(path, output);
maybeInjectMetadataForSymlinkOrDownload(path, output);
}
}

/**
* Inject metadata for non-symlink outputs that were materialized as a symlink to a remote
* artifact.
* artifact, and download the target artifact if required by the remote output mode.
*
* <p>If a non-symlink output is materialized as a symlink, the symlink has "copy" semantics,
* i.e., the output metadata is identical to that of the symlink target. For these artifacts, we
Expand All @@ -162,8 +162,8 @@ void flush() throws IOException {
* fetching multiple copies when multiple symlinks to the same artifact are created in the
* same build.
*/
private void maybeInjectMetadataForSymlink(PathFragment linkPath, Artifact output)
throws IOException {
private void maybeInjectMetadataForSymlinkOrDownload(PathFragment linkPath, Artifact output)
throws IOException, InterruptedException {
if (output.isSymlink()) {
return;
}
Expand All @@ -185,6 +185,25 @@ private void maybeInjectMetadataForSymlink(PathFragment linkPath, Artifact outpu
targetPath.isAbsolute(),
"non-symlink artifact materialized as symlink must point to absolute path");

if (inputFetcher.getRemoteOutputChecker().shouldDownloadOutputDuringActionExecution(output)) {
var targetActionInput = getInput(targetPath.relativeTo(execRoot).getPathString());
if (targetActionInput != null) {
if (output.isTreeArtifact()) {
var metadata = getRemoteTreeMetadata(targetPath);
if (metadata != null) {
getFromFuture(
inputFetcher.prefetchFiles(
metadata.getChildren(), this::getInputMetadata, Priority.LOW));
}
} else {
getFromFuture(
inputFetcher.prefetchFiles(
ImmutableList.of(targetActionInput), this::getInputMetadata, Priority.LOW));
}
}
return;
}

if (output.isTreeArtifact()) {
TreeArtifactValue metadata = getRemoteTreeMetadata(targetPath);
if (metadata == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,8 @@ public class RemoteExecutionService {
private final AtomicBoolean shutdown = new AtomicBoolean(false);
private final AtomicBoolean buildInterrupted = new AtomicBoolean(false);

@Nullable private final RemoteOutputChecker remoteOutputChecker;

public RemoteExecutionService(
Executor executor,
Reporter reporter,
Expand All @@ -191,7 +193,8 @@ public RemoteExecutionService(
@Nullable RemoteCache remoteCache,
@Nullable RemoteExecutionClient remoteExecutor,
TempPathGenerator tempPathGenerator,
@Nullable Path captureCorruptedOutputsDir) {
@Nullable Path captureCorruptedOutputsDir,
@Nullable RemoteOutputChecker remoteOutputChecker) {
this.reporter = reporter;
this.verboseFailures = verboseFailures;
this.execRoot = execRoot;
Expand All @@ -214,6 +217,7 @@ public RemoteExecutionService(
this.captureCorruptedOutputsDir = captureCorruptedOutputsDir;

this.scheduler = Schedulers.from(executor, /* interruptibleWorker= */ true);
this.remoteOutputChecker = remoteOutputChecker;
}

static Command buildCommand(
Expand Down Expand Up @@ -1158,7 +1162,7 @@ public InMemoryOutput downloadOutputs(RemoteAction action, RemoteActionResult re

ImmutableList.Builder<ListenableFuture<FileMetadata>> downloadsBuilder =
ImmutableList.builder();
boolean downloadOutputs = shouldDownloadOutputsFor(result, metadata);
boolean downloadOutputs = shouldDownloadOutputsFor(action, result, metadata);

// Download into temporary paths, then move everything at the end.
// This avoids holding the output lock while downloading, which would prevent the local branch
Expand Down Expand Up @@ -1322,7 +1326,7 @@ public InMemoryOutput downloadOutputs(RemoteAction action, RemoteActionResult re
}

private boolean shouldDownloadOutputsFor(
RemoteActionResult result, ActionResultMetadata metadata) {
RemoteAction action, RemoteActionResult result, ActionResultMetadata metadata) {
if (remoteOptions.remoteOutputsMode.downloadAllOutputs()) {
return true;
}
Expand All @@ -1341,6 +1345,14 @@ private boolean shouldDownloadOutputsFor(
Iterables.get(metadata.symlinks(), 0).path())));
return true;
}

checkNotNull(remoteOutputChecker, "remoteOutputChecker must not be null");
for (var output : action.getSpawn().getOutputFiles()) {
if (remoteOutputChecker.shouldDownloadOutputDuringActionExecution(output)) {
return true;
}
}

return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,12 @@ private void initHttpAndDiskCache(
new RemoteCache(HTTP_AND_DISK_CACHE_CAPABILITIES, cacheClient, remoteOptions, digestUtil);
actionContextProvider =
RemoteActionContextProvider.createForRemoteCaching(
executorService, env, remoteCache, /* retryScheduler= */ null, digestUtil);
executorService,
env,
remoteCache,
/* retryScheduler= */ null,
digestUtil,
remoteOutputChecker);
}

@Override
Expand Down Expand Up @@ -657,7 +662,8 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
remoteExecutor,
retryScheduler,
digestUtil,
logDir);
logDir,
remoteOutputChecker);
repositoryRemoteExecutorFactoryDelegate.init(
new RemoteRepositoryRemoteExecutorFactory(
remoteCache,
Expand Down Expand Up @@ -689,7 +695,7 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
cacheCapabilities.getCacheCapabilities(), cacheClient, remoteOptions, digestUtil);
actionContextProvider =
RemoteActionContextProvider.createForRemoteCaching(
executorService, env, remoteCache, retryScheduler, digestUtil);
executorService, env, remoteCache, retryScheduler, digestUtil, remoteOutputChecker);
}

buildEventArtifactUploaderFactoryDelegate.init(
Expand Down
Loading

0 comments on commit a5dde12

Please sign in to comment.