Skip to content

Commit

Permalink
Introduce --experimental_parallel_aquery_output.
Browse files Browse the repository at this point in the history
This flag enables Blaze to process the analysis graph in parallel while
constructing an aquery result, instead of sequentially. The implementation follows a producer-consumer pattern to reduce the impact of the I/O bottleneck.

PiperOrigin-RevId: 518804027
Change-Id: I912a0170767c1a3a0184bbd024d48b037ec7de19
  • Loading branch information
joeleba authored and copybara-github committed Mar 23, 2023
1 parent 70ce837 commit 6a17457
Show file tree
Hide file tree
Showing 8 changed files with 388 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,29 +13,42 @@
// limitations under the License.
package com.google.devtools.build.lib.query2.aquery;

import com.google.common.collect.ImmutableList;
import com.google.devtools.build.lib.actions.CommandLineExpansionException;
import com.google.devtools.build.lib.analysis.AspectValue;
import com.google.devtools.build.lib.analysis.ConfiguredTargetValue;
import com.google.devtools.build.lib.analysis.actions.TemplateExpansionException;
import com.google.devtools.build.lib.concurrent.NamedForkJoinPool;
import com.google.devtools.build.lib.events.ExtendedEventHandler;
import com.google.devtools.build.lib.profiler.Profiler;
import com.google.devtools.build.lib.profiler.SilentCloseable;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.TargetAccessor;
import com.google.devtools.build.lib.skyframe.RuleConfiguredTargetValue;
import com.google.devtools.build.lib.skyframe.SkyframeExecutor;
import com.google.devtools.build.lib.skyframe.actiongraph.v2.ActionGraphDump;
import com.google.devtools.build.lib.skyframe.actiongraph.v2.AqueryConsumingOutputHandler;
import com.google.devtools.build.lib.skyframe.actiongraph.v2.AqueryOutputHandler;
import com.google.devtools.build.lib.skyframe.actiongraph.v2.AqueryOutputHandler.OutputType;
import com.google.devtools.build.lib.skyframe.actiongraph.v2.MonolithicOutputHandler;
import com.google.devtools.build.lib.skyframe.actiongraph.v2.StreamedConsumingOutputHandler;
import com.google.devtools.build.lib.skyframe.actiongraph.v2.StreamedOutputHandler;
import com.google.protobuf.CodedOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

/** Default output callback for aquery, prints proto output. */
public class ActionGraphProtoOutputFormatterCallback extends AqueryThreadsafeCallback {
// TODO(b/274595070): Clean this up after flag flip.

// Arbitrarily chosen. Large enough for good performance, small enough not to cause OOMs.
private static final int BLOCKING_QUEUE_SIZE = Runtime.getRuntime().availableProcessors() * 2;
private final OutputType outputType;
private final ActionGraphDump actionGraphDump;
private final AqueryActionFilter actionFilters;
Expand All @@ -58,7 +71,8 @@ public class ActionGraphProtoOutputFormatterCallback extends AqueryThreadsafeCal
super(eventHandler, options, out, skyframeExecutor, accessor);
this.outputType = outputType;
this.actionFilters = actionFilters;
this.aqueryOutputHandler = constructAqueryOutputHandler(outputType, out, printStream);
this.aqueryOutputHandler =
constructAqueryOutputHandler(outputType, out, printStream, options.parallelAqueryOutput);
this.actionGraphDump =
new ActionGraphDump(
options.includeCommandline,
Expand All @@ -73,11 +87,22 @@ public class ActionGraphProtoOutputFormatterCallback extends AqueryThreadsafeCal

/**
 * Builds the output handler for {@code outputType} without parallel output.
 *
 * <p>Equivalent to calling the four-argument overload with {@code parallelized} set to {@code
 * false}.
 */
public static AqueryOutputHandler constructAqueryOutputHandler(
    OutputType outputType, OutputStream out, PrintStream printStream) {
  // This public overload never opts into the parallel handler.
  final boolean parallelized = false;
  return constructAqueryOutputHandler(outputType, out, printStream, parallelized);
}

private static AqueryOutputHandler constructAqueryOutputHandler(
OutputType outputType, OutputStream out, PrintStream printStream, boolean parallelized) {
switch (outputType) {
case BINARY:
case TEXT:
return new StreamedOutputHandler(
outputType, CodedOutputStream.newInstance(out, OUTPUT_BUFFER_SIZE), printStream);
return parallelized
? new StreamedConsumingOutputHandler(
outputType,
CodedOutputStream.newInstance(out, OUTPUT_BUFFER_SIZE),
printStream,
new LinkedBlockingQueue<>(BLOCKING_QUEUE_SIZE))
: new StreamedOutputHandler(
outputType, CodedOutputStream.newInstance(out, OUTPUT_BUFFER_SIZE), printStream);
case JSON:
return new MonolithicOutputHandler(printStream);
}
Expand All @@ -90,39 +115,118 @@ public String getName() {
return outputType.formatName();
}

/**
 * Closes the underlying {@code aqueryOutputHandler} unless {@code failFast} is set.
 *
 * @param failFast when {@code true}, the handler is left untouched and nothing is closed
 * @throws IOException if closing the handler fails
 */
@Override
public void close(boolean failFast) throws IOException {
  if (failFast) {
    // Nothing to do on the fail-fast path.
    return;
  }
  try (SilentCloseable c = Profiler.instance().profile("aqueryOutputHandler.close")) {
    aqueryOutputHandler.close();
  }
}

/**
 * Dumps the actions of every entry in {@code partialResult}.
 *
 * <p>When {@code options.parallelAqueryOutput} is set and the output handler is an {@link
 * AqueryConsumingOutputHandler}, the work is handed to {@code processOutputInParallel}. Otherwise
 * each entry is processed sequentially, exactly once, via {@code processSingleEntry}.
 *
 * @param partialResult the configured target values to dump
 * @throws IOException if writing fails, or if command-line or template expansion fails (the
 *     expansion exception is attached as the cause)
 * @throws InterruptedException if processing is interrupted
 */
@Override
public void processOutput(Iterable<KeyedConfiguredTargetValue> partialResult)
    throws IOException, InterruptedException {
  if (options.parallelAqueryOutput
      && aqueryOutputHandler instanceof AqueryConsumingOutputHandler) {
    processOutputInParallel(partialResult);
    return;
  }

  try (SilentCloseable c = Profiler.instance().profile("process partial result")) {
    // Enabling includeParamFiles should enable includeCommandline by default.
    options.includeCommandline |= options.includeParamFiles;

    for (KeyedConfiguredTargetValue keyedConfiguredTargetValue : partialResult) {
      // The per-entry logic lives solely in processSingleEntry; dumping here as well would emit
      // every configured target and aspect twice.
      processSingleEntry(keyedConfiguredTargetValue);
    }
  } catch (CommandLineExpansionException | TemplateExpansionException e) {
    // Same exception type and message as before, but keep the original exception as the cause.
    throw new IOException(e.getMessage(), e);
  }
}

@Override
public void close(boolean failFast) throws IOException {
if (!failFast) {
try (SilentCloseable c = Profiler.instance().profile("aqueryOutputHandler.close")) {
aqueryOutputHandler.close();
/**
 * Dumps one configured target and, when {@code options.useAspects} is set, its aspects.
 *
 * <p>Entries whose value is not a {@link RuleConfiguredTargetValue} are ignored.
 */
private void processSingleEntry(KeyedConfiguredTargetValue keyedConfiguredTargetValue)
    throws CommandLineExpansionException,
        InterruptedException,
        IOException,
        TemplateExpansionException {
  ConfiguredTargetValue value = keyedConfiguredTargetValue.getConfiguredTargetValue();
  boolean isRule = value instanceof RuleConfiguredTargetValue;
  if (!isRule) {
    // Non-rule values must be part of the graph so their dependencies get visited, but they
    // carry no actions to print.
    return;
  }

  RuleConfiguredTargetValue ruleValue = (RuleConfiguredTargetValue) value;
  actionGraphDump.dumpConfiguredTarget(ruleValue);

  if (!options.useAspects) {
    return;
  }
  for (AspectValue aspect : accessor.getAspectValues(keyedConfiguredTargetValue)) {
    actionGraphDump.dumpAspect(aspect, value);
  }
}

/**
 * Processes {@code partialResult} on a fork-join pool, one task per entry.
 *
 * <p>The consumer of the output handler is started before the tasks are submitted and stopped in
 * a {@code finally} block, together with the pool shutdown, whether or not the tasks succeed.
 *
 * @param partialResult the configured target values to dump
 * @throws IOException if a task fails with an {@link IOException}, or with a command-line or
 *     template expansion exception (rewrapped, with the original attached as the cause)
 * @throws InterruptedException if waiting for the tasks is interrupted, or a task itself failed
 *     with an {@link InterruptedException}
 * @throws IllegalStateException if a task fails with any other kind of throwable
 */
private void processOutputInParallel(Iterable<KeyedConfiguredTargetValue> partialResult)
    throws IOException, InterruptedException {
  AqueryConsumingOutputHandler aqueryConsumingOutputHandler =
      (AqueryConsumingOutputHandler) aqueryOutputHandler;
  try (SilentCloseable c = Profiler.instance().profile("process partial result")) {
    // Enabling includeParamFiles should enable includeCommandline by default.
    options.includeCommandline |= options.includeParamFiles;
    aqueryConsumingOutputHandler.startConsumer();
    ForkJoinPool executor =
        NamedForkJoinPool.newNamedPool("aquery", Runtime.getRuntime().availableProcessors());

    try {
      List<Future<Void>> futures = executor.invokeAll(toTasks(partialResult));
      for (Future<Void> future : futures) {
        // Surfaces the first task failure as an ExecutionException.
        future.get();
      }
    } catch (ExecutionException e) {
      Throwable cause = e.getCause();
      if (cause instanceof CommandLineExpansionException
          || cause instanceof TemplateExpansionException) {
        // This is kinda weird, but keeping it in line with the status quo for now: same exception
        // type and message as the sequential path, with the original kept as the cause.
        // TODO(b/266179316): Clean this up.
        throw new IOException(cause.getMessage(), cause);
      }
      if (cause instanceof IOException) {
        throw (IOException) cause;
      }
      if (cause instanceof InterruptedException) {
        throw (InterruptedException) cause;
      }
      // Name the offending type in the message so the failure is diagnosable at a glance.
      String causeType = cause == null ? "null" : cause.getClass().getName();
      throw new IllegalStateException("Unexpected exception type: " + causeType, e);
    } finally {
      aqueryConsumingOutputHandler.stopConsumer();
      executor.shutdown();
    }
  }
}

/** Wraps every element of {@code values} in an {@link AqueryOutputTask}, in iteration order. */
private ImmutableList<AqueryOutputTask> toTasks(Iterable<KeyedConfiguredTargetValue> values) {
  ImmutableList.Builder<AqueryOutputTask> builder = ImmutableList.builder();
  values.forEach(entry -> builder.add(new AqueryOutputTask(entry)));
  return builder.build();
}

/**
 * Unit of work that runs {@code processSingleEntry} of the enclosing callback on a single entry.
 */
private final class AqueryOutputTask implements Callable<Void> {

  private final KeyedConfiguredTargetValue entry;

  AqueryOutputTask(KeyedConfiguredTargetValue entry) {
    this.entry = entry;
  }

  /** Processes the wrapped entry; always returns {@code null}. */
  @Override
  public Void call()
      throws IOException,
          InterruptedException,
          CommandLineExpansionException,
          TemplateExpansionException {
    processSingleEntry(entry);
    return null;
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,4 +89,14 @@ public class AqueryOptions extends CommonQueryOptions {
+ " output. This does not deduplicate depsets that don't share an immediate parent."
+ " This does not affect the final effective list of input artifacts of the actions.")
public boolean deduplicateDepsets;

// Experimental flag, off by default. Per its help text it only concerns the proto/textproto
// aquery output formats and is a no-op for the others.
@Option(
name = "experimental_parallel_aquery_output",
defaultValue = "false",
documentationCategory = OptionDocumentationCategory.QUERY,
effectTags = {OptionEffectTag.UNKNOWN},
help =
"Whether aquery proto/textproto output should be written in parallel. No-op for the "
+ "other output formats.")
public boolean parallelAqueryOutput;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright 2023 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.skyframe.actiongraph.v2;

/** AqueryOutputHandler that receives and consumes tasks via a work queue. */
public interface AqueryConsumingOutputHandler extends AqueryOutputHandler {

/**
 * Starts the consumer side of this handler.
 *
 * <p>NOTE(review): presumably this begins draining the work queue; the exact threading behavior
 * is defined by the implementations — confirm there.
 */
void startConsumer();

/**
 * Stops the consumer side of this handler.
 *
 * <p>NOTE(review): whether already-queued tasks are flushed before stopping is up to the
 * implementations — confirm there.
 */
void stopConsumer();
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ java_library(
"//src/main/java/com/google/devtools/build/lib/vfs:pathfragment",
"//src/main/java/net/starlark/java/eval",
"//src/main/protobuf:analysis_v2_java_proto",
"//third_party:auto_value",
"//third_party:guava",
"//third_party:jsr305",
"//third_party/protobuf:protobuf_java",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@
package com.google.devtools.build.lib.skyframe.actiongraph.v2;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* Basic class to abstract action graph cache functionality.
*/
abstract class BaseCache<K, P> {
private final Map<K, Integer> cache = new HashMap<>();
private final Map<K, Integer> cache = new ConcurrentHashMap<>();
protected final AqueryOutputHandler aqueryOutputHandler;

BaseCache(AqueryOutputHandler aqueryOutputHandler) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright 2023 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.skyframe.actiongraph.v2;

import com.google.auto.value.AutoValue;
import com.google.protobuf.Message;
import javax.annotation.Nullable;

/**
 * Represents a task to be consumed by an {@link AqueryConsumingOutputHandler}.
 *
 * <p>We have separate Proto/TextProto subclasses to reduce some memory waste: we'll never need both
 * the fieldNumber and the messageLabel in a PrintTask.
 */
@SuppressWarnings("InterfaceWithOnlyStatics")
public interface PrintTask {

  /** A task for the proto format: a message plus the field number associated with it. */
  @AutoValue
  abstract class ProtoPrintTask implements PrintTask {
    /** The message to print; may be {@code null}. */
    @Nullable
    abstract Message message();

    abstract int fieldNumber();

    /**
     * Creates a proto print task.
     *
     * @param message the message to print; {@code null} is accepted, matching the nullable {@link
     *     #message()} property
     * @param fieldNumber the field number associated with the message
     */
    public static ProtoPrintTask create(@Nullable Message message, int fieldNumber) {
      return new AutoValue_PrintTask_ProtoPrintTask(message, fieldNumber);
    }
  }

  /** A task for the textproto format: a message plus the label associated with it. */
  @AutoValue
  abstract class TextProtoPrintTask implements PrintTask {
    /** The message to print; may be {@code null}. */
    @Nullable
    abstract Message message();

    abstract String messageLabel();

    /**
     * Creates a textproto print task.
     *
     * @param message the message to print; {@code null} is accepted, matching the nullable {@link
     *     #message()} property
     * @param messageLabel the label associated with the message
     */
    public static TextProtoPrintTask create(@Nullable Message message, String messageLabel) {
      return new AutoValue_PrintTask_TextProtoPrintTask(message, messageLabel);
    }
  }
}
Loading

0 comments on commit 6a17457

Please sign in to comment.