Skip to content
This repository has been archived by the owner on Jan 11, 2024. It is now read-only.

Commit

Permalink
Automated g4 rollback of commit d9fea57.
Browse files Browse the repository at this point in the history
*** Reason for rollback ***

Rollforward with fix for test flakiness. BEP transport closed
events are delivered via their own threadpool and thus might
not have been sent immediately. BuildEventStreamerTest#testSimpleStream
now waits for a bit until the event has been delivered. I ran the test
with --runs_per_test=1000 several times and had no further failures.

*** Original change description ***

Automated g4 rollback of commit 3d596d6.

*** Reason for rollback ***

Made BuildEventStreamerTest#testSimpleStream 3% flaky based on --runs_per_test=1000.
RELNOTES: None
PiperOrigin-RevId: 154170833
  • Loading branch information
buchgr authored and vladmos committed Apr 25, 2017
1 parent 941a4ca commit 9e0308e
Show file tree
Hide file tree
Showing 13 changed files with 470 additions and 32 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright 2017 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.buildeventstream;

import com.google.common.collect.ImmutableSet;
import com.google.devtools.build.lib.events.ExtendedEventHandler.Postable;
import java.util.Collection;
import java.util.Set;

/**
* An event announcing a list of all active {@link BuildEventTransport}s.
*/
public class AnnounceBuildEventTransportsEvent implements Postable {

private final Set<BuildEventTransport> transports;

public AnnounceBuildEventTransportsEvent(Collection<BuildEventTransport> transports) {
this.transports = ImmutableSet.copyOf(transports);
}

/**
* Returns a list of all active build event transports.
*/
public Set<BuildEventTransport> transports() {
return transports;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.
package com.google.devtools.build.lib.buildeventstream;

import java.util.concurrent.Future;
import com.google.common.util.concurrent.ListenableFuture;
import javax.annotation.concurrent.ThreadSafe;

/**
Expand All @@ -27,6 +27,11 @@
@ThreadSafe
public interface BuildEventTransport {

/**
* The name of this transport as can be displayed to a user.
*/
String name();

/**
* Writes a build event to an endpoint. This method will always return quickly and will not
* wait for the write to complete.
Expand All @@ -49,5 +54,5 @@ public interface BuildEventTransport {
*
* <p>This method should not throw any exceptions.
*/
Future<Void> close();
ListenableFuture<Void> close();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright 2017 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.buildeventstream;

import com.google.devtools.build.lib.events.ExtendedEventHandler.Postable;

/**
* An event signaling that a {@link BuildEventTransport} has been closed.
*/
public class BuildEventTransportClosedEvent implements Postable {

private final BuildEventTransport transport;

public BuildEventTransportClosedEvent(BuildEventTransport transport) {
this.transport = transport;
}

/**
* Returns the {@link BuildEventTransport} that has been closed.
*/
public BuildEventTransport transport() {
return transport;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ public final class BinaryFormatFileTransport extends FileTransport {
this.pathConverter = pathConverter;
}

@Override
public String name() {
return this.getClass().getSimpleName();
}

@Override
public synchronized void sendBuildEvent(BuildEvent event, final ArtifactGroupNamer namer) {
BuildEventConverters converters =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static com.google.common.base.Preconditions.checkState;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.devtools.build.lib.buildeventstream.BuildEvent;
import com.google.devtools.build.lib.buildeventstream.BuildEventTransport;
Expand All @@ -27,7 +28,6 @@
import java.nio.channels.CompletionHandler;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.Future;
import java.util.logging.Level;
import java.util.logging.Logger;

Expand Down Expand Up @@ -87,7 +87,7 @@ synchronized void writeData(byte[] data) {
}

@Override
public synchronized Future<Void> close() {
public synchronized ListenableFuture<Void> close() {
if (closing()) {
return closeFuture;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ public final class TextFormatFileTransport extends FileTransport {
this.pathConverter = pathConverter;
}

@Override
public String name() {
return this.getClass().getSimpleName();
}

@Override
public synchronized void sendBuildEvent(BuildEvent event, final ArtifactGroupNamer namer) {
BuildEventConverters converters =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,31 @@

package com.google.devtools.build.lib.runtime;

import static com.google.devtools.build.lib.events.Event.of;
import static com.google.devtools.build.lib.events.EventKind.PROGRESS;
import static com.google.devtools.build.lib.util.Preconditions.checkArgument;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import com.google.common.eventbus.Subscribe;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.devtools.build.lib.actions.ActionExecutedEvent;
import com.google.devtools.build.lib.actions.Artifact;
import com.google.devtools.build.lib.actions.EventReportingArtifacts;
import com.google.devtools.build.lib.analysis.NoBuildEvent;
import com.google.devtools.build.lib.buildeventstream.AbortedEvent;
import com.google.devtools.build.lib.buildeventstream.AnnounceBuildEventTransportsEvent;
import com.google.devtools.build.lib.buildeventstream.ArtifactGroupNamer;
import com.google.devtools.build.lib.buildeventstream.BuildEvent;
import com.google.devtools.build.lib.buildeventstream.BuildEventId;
import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos.Aborted.AbortReason;
import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos.BuildEventId.NamedSetOfFilesId;
import com.google.devtools.build.lib.buildeventstream.BuildEventTransport;
import com.google.devtools.build.lib.buildeventstream.BuildEventTransportClosedEvent;
import com.google.devtools.build.lib.buildeventstream.BuildEventWithOrderConstraint;
import com.google.devtools.build.lib.buildeventstream.ProgressEvent;
import com.google.devtools.build.lib.buildtool.buildevent.BuildCompleteEvent;
Expand All @@ -39,6 +47,7 @@
import com.google.devtools.build.lib.collect.nestedset.NestedSetView;
import com.google.devtools.build.lib.events.Event;
import com.google.devtools.build.lib.events.EventHandler;
import com.google.devtools.build.lib.events.Reporter;
import com.google.devtools.build.lib.rules.extra.ExtraAction;
import java.util.ArrayList;
import java.util.Collection;
Expand All @@ -47,13 +56,26 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

/** Listen for {@link BuildEvent} and stream them to the provided {@link BuildEventTransport}. */
/**
* Listens for {@link BuildEvent}s and streams them to the provided {@link BuildEventTransport}s.
*
* <p>The streamer takes care of closing all {@link BuildEventTransport}s. It does so after having
* received a {@link BuildCompleteEvent}. Furthermore, it emits two event types to the
* {@code eventBus}. After having received the first {@link BuildEvent} it emits a
* {@link AnnounceBuildEventTransportsEvent} that contains a list of all its transports.
* Furthermore, after a transport has been closed, it emits
* a {@link BuildEventTransportClosedEvent}.
*/
public class BuildEventStreamer implements EventHandler {

private final Collection<BuildEventTransport> transports;
private final Reporter reporter;
private Set<BuildEventId> announcedEvents;
private final Set<BuildEventId> postedEvents = new HashSet<>();
private final Multimap<BuildEventId, BuildEvent> pendingEvents = HashMultimap.create();
Expand Down Expand Up @@ -93,8 +115,10 @@ synchronized String maybeName(NestedSetView<Artifact> view) {
}
}

public BuildEventStreamer(Collection<BuildEventTransport> transports) {
public BuildEventStreamer(Collection<BuildEventTransport> transports, Reporter reporter) {
checkArgument(transports.size() > 0);
this.transports = transports;
this.reporter = reporter;
this.announcedEvents = null;
this.progressCount = 0;
}
Expand All @@ -119,6 +143,10 @@ private void post(BuildEvent event) {
announcedEvents.addAll(linkEvent.getChildrenEvents());
postedEvents.add(linkEvent.getEventId());
}

if (reporter != null) {
reporter.post(new AnnounceBuildEventTransportsEvent(transports));
}
} else {
if (!announcedEvents.contains(id)) {
linkEvent = ProgressEvent.progressChainIn(progressCount, id);
Expand Down Expand Up @@ -165,20 +193,57 @@ private void clearAnnouncedEvents() {
}
}

private void close() {
List<Future<Void>> shutdownFutures = new ArrayList<>(transports.size());
private ScheduledFuture<?> bepUploadWaitEvent(ScheduledExecutorService executor) {
final long startNanos = System.nanoTime();
return executor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
long deltaNanos = System.nanoTime() - startNanos;
long deltaSeconds = TimeUnit.NANOSECONDS.toSeconds(deltaNanos);
Event waitEvt =
of(PROGRESS, null, "Waiting for build event protocol upload: " + deltaSeconds + "s");
if (reporter != null) {
reporter.handle(waitEvt);
}
}
}, 0, 1, TimeUnit.SECONDS);
}

for (BuildEventTransport transport : transports) {
shutdownFutures.add(transport.close());
}
private void close() {
ScheduledExecutorService executor = null;
try {
executor = Executors.newSingleThreadScheduledExecutor();
List<ListenableFuture<Void>> closeFutures = new ArrayList<>(transports.size());
for (final BuildEventTransport transport : transports) {
ListenableFuture<Void> closeFuture = transport.close();
closeFuture.addListener(new Runnable() {
@Override
public void run() {
if (reporter != null) {
reporter.post(new BuildEventTransportClosedEvent(transport));
}
}
}, executor);
closeFutures.add(closeFuture);
}

// Wait for all transports to close.
for (Future<Void> f : shutdownFutures) {
try {
f.get();
if (closeFutures.isEmpty()) {
// Don't spam events if there is nothing to close.
return;
}

ScheduledFuture<?> f = bepUploadWaitEvent(executor);
// Wait for all transports to close.
Futures.allAsList(closeFutures).get();
f.cancel(true);
} catch (Exception e) {
log.severe("Failed to close a build event transport: " + e);
}
} finally {
if (executor != null) {
executor.shutdown();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ public String apply(Path path) {
ImmutableSet<BuildEventTransport> buildEventTransports
= createFromOptions(besOptions, pathConverter);
if (!buildEventTransports.isEmpty()) {
BuildEventStreamer streamer = new BuildEventStreamer(buildEventTransports);
BuildEventStreamer streamer = new BuildEventStreamer(buildEventTransports,
commandEnvironment != null ? commandEnvironment.getReporter() : null);
return Optional.of(streamer);
}
} catch (IOException e) {
Expand Down
Loading

0 comments on commit 9e0308e

Please sign in to comment.