Skip to content

Commit

Permalink
address PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
m-trieu committed Sep 24, 2024
1 parent b5cf5d5 commit 08a9c1e
Show file tree
Hide file tree
Showing 17 changed files with 212 additions and 151 deletions.
1 change: 0 additions & 1 deletion runners/google-cloud-dataflow-java/worker/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,6 @@ dependencies {
implementation project(":runners:core-java")
implementation project(":runners:java-fn-execution")
implementation project(path: ":runners:google-cloud-dataflow-java:worker:windmill", configuration: "shadow")
implementation project(path: ':sdks:java:core')
shadow library.java.vendored_guava_32_1_2_jre

implementation library.java.google_auth_library_credentials
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,6 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheStats;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.joda.time.Duration;
import org.joda.time.Instant;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,18 +240,20 @@ synchronized Optional<ExecutableWork> completeWorkAndGetNextWorkForKey(
@Nullable Queue<ExecutableWork> workQueue = activeWork.get(shardedKey);
if (workQueue == null) {
// Work may have been completed due to clearing of stuck commits.
LOG.warn("Unable to complete inactive work for key {} and token {}.", shardedKey, workId);
LOG.warn(
"Unable to complete inactive work for key={} and token={}. Work queue for key does not exist.",
shardedKey,
workId);
return Optional.empty();
}

removeCompletedWorkFromQueue(workQueue, shardedKey, workId);
return getNextWork(workQueue, shardedKey);
}

private synchronized void removeCompletedWorkFromQueue(
Queue<ExecutableWork> workQueue, ShardedKey shardedKey, WorkId workId) {
// avoid Preconditions.checkState here to prevent eagerly evaluating the
// format string parameters for the error message.
ExecutableWork completedWork = workQueue.peek();
@Nullable ExecutableWork completedWork = workQueue.peek();
if (completedWork == null) {
// Work may have been completed due to clearing of stuck commits.
LOG.warn("Active key {} without work, expected token {}", shardedKey, workId);
Expand Down Expand Up @@ -346,7 +348,7 @@ synchronized void printActiveWork(PrintWriter writer, Instant now) {
+ "<th>Active For</th>"
+ "<th>State</th>"
+ "<th>State Active For</th>"
+ "<th>Produced By</th>"
+ "<th>Backend</th>"
+ "</tr>");
// Use StringBuilder because we are appending in loop.
StringBuilder activeWorkStatus = new StringBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,59 +269,53 @@ public synchronized void shutdown() {
channelCachingStubFactory.shutdown();
}

// NOTE(review): this span is a diff rendering whose +/- markers were lost. Going by the hunk
// header line counts (-269,59 +269,53), the first definition below is the pre-commit version
// (removed by the commit) and the second is the post-commit version (added). Both share the
// single closing brace at the end of the span.

// ---- Pre-commit version (removed) ----
// Swaps the active connection state for one built from newWindmillEndpoints. The swap runs
// inside synchronized (this); the future returned by closeStaleStreams is joined after the
// synchronized block.
private void consumeWindmillWorkerEndpoints(WindmillEndpoints newWindmillEndpoints) {
CompletableFuture<Void> closeStaleStreams;

synchronized (this) {
LOG.info("Consuming new windmill endpoints: {}", newWindmillEndpoints);
ImmutableMap<Endpoint, WindmillConnection> newWindmillConnections =
createNewWindmillConnections(newWindmillEndpoints.windmillEndpoints());
// Kick off closing of streams whose connection is not in the new set; joined further down.
closeStaleStreams =
closeStaleStreams(newWindmillConnections.values(), connections.get().windmillStreams());
// join() waits here for the new streams to be created and started.
ImmutableMap<WindmillConnection, WindmillStreamSender> newStreams =
createAndStartNewStreams(newWindmillConnections.values()).join();
StreamingEngineConnectionState newConnectionsState =
StreamingEngineConnectionState.builder()
.setWindmillConnections(newWindmillConnections)
.setWindmillStreams(newStreams)
.setGlobalDataStreams(
createNewGlobalDataStreams(newWindmillEndpoints.globalDataEndpoints()))
.build();
LOG.info(
"Setting new connections: {}. Previous connections: {}.",
newConnectionsState,
connections.get());
// Publish the new state, then hand totalGetWorkBudget to the distributor for the new streams.
connections.set(newConnectionsState);
getWorkBudgetDistributor.distributeBudget(newStreams.values(), totalGetWorkBudget);
}

// Close the streams outside the lock.
closeStaleStreams.join();
// ---- Post-commit version (added) ----
// Same sequence of steps, but the whole method is synchronized and the call to
// closeStaleStreams is a plain statement: nothing is assigned or joined here.
private synchronized void consumeWindmillWorkerEndpoints(WindmillEndpoints newWindmillEndpoints) {
LOG.info("Consuming new windmill endpoints: {}", newWindmillEndpoints);
ImmutableMap<Endpoint, WindmillConnection> newWindmillConnections =
createNewWindmillConnections(newWindmillEndpoints.windmillEndpoints());
// Streams whose connection is not in the new set are handed off for closing.
closeStaleStreams(newWindmillConnections.values(), connections.get().windmillStreams());
// join() waits here for the new streams to be created and started.
ImmutableMap<WindmillConnection, WindmillStreamSender> newStreams =
createAndStartNewStreams(newWindmillConnections.values()).join();
StreamingEngineConnectionState newConnectionsState =
StreamingEngineConnectionState.builder()
.setWindmillConnections(newWindmillConnections)
.setWindmillStreams(newStreams)
.setGlobalDataStreams(
createNewGlobalDataStreams(newWindmillEndpoints.globalDataEndpoints()))
.build();
LOG.info(
"Setting new connections: {}. Previous connections: {}.",
newConnectionsState,
connections.get());
// Publish the new state, then hand totalGetWorkBudget to the distributor for the new streams.
connections.set(newConnectionsState);
getWorkBudgetDistributor.distributeBudget(newStreams.values(), totalGetWorkBudget);
}

/**
 * Close the streams that are no longer valid asynchronously.
 *
 * <p>A stream is treated as stale when its {@link WindmillConnection} key is not contained in
 * {@code newWindmillConnections}. For each stale entry a task is submitted to {@code
 * windmillStreamManager} that closes all of the sender's streams and, when the connection has a
 * direct endpoint, removes it from {@code channelCachingStubFactory}.
 *
 * <p>NOTE(review): this span is a diff rendering whose +/- markers were lost. Going by the hunk
 * header line counts, each "pre-commit" run below was removed by the commit and the
 * "post-commit" run that follows it was added.
 *
 * @param newWindmillConnections connections that remain valid after the endpoint update.
 * @param currentStreams the currently active streams, keyed by connection.
 */
// Pre-commit signature (removed): returned a future that the caller joined.
private CompletableFuture<Void> closeStaleStreams(
// Post-commit signature (added): returns nothing; the futures from runAsync are dropped on
// purpose, hence the suppression.
@SuppressWarnings("FutureReturnValueIgnored")
private void closeStaleStreams(
Collection<WindmillConnection> newWindmillConnections,
ImmutableMap<WindmillConnection, WindmillStreamSender> currentStreams) {
// Pre-commit body (removed): one future per stale entry, combined with allOf.
return CompletableFuture.allOf(
currentStreams.entrySet().stream()
.filter(
connectionAndStream ->
!newWindmillConnections.contains(connectionAndStream.getKey()))
.map(
entry ->
CompletableFuture.runAsync(
() -> {
LOG.debug("Closing streams to {}", entry);
entry.getValue().closeAllStreams();
entry
.getKey()
.directEndpoint()
.ifPresent(channelCachingStubFactory::remove);
LOG.debug("Successfully closed streams to {}", entry);
},
windmillStreamManager))
.toArray(CompletableFuture[]::new));
// Post-commit body (added): the tasks are submitted and not awaited.
currentStreams.entrySet().stream()
.filter(
connectionAndStream -> !newWindmillConnections.contains(connectionAndStream.getKey()))
.forEach(
entry ->
CompletableFuture.runAsync(
() -> {
LOG.debug("Closing streams to {}", entry);
try {
entry.getValue().closeAllStreams();
entry
.getKey()
.directEndpoint()
.ifPresent(channelCachingStubFactory::remove);
LOG.debug("Successfully closed streams to {}", entry);
} catch (Exception e) {
// Nobody observes the discarded future, so this log line is the only record of
// a failed close. Pass the exception as the final argument so the cause and
// stack trace are logged rather than dropped.
LOG.error("Error closing streams to {}", entry, e);
}
},
windmillStreamManager));
}

private synchronized CompletableFuture<ImmutableMap<WindmillConnection, WindmillStreamSender>>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ public void adjustBudget(long itemsDelta, long bytesDelta) {
GetWorkBudget.builder().setItems(itemsDelta).setBytes(bytesDelta).build();
getWorkBudget.getAndSet(adjustment);
if (started.get()) {
getWorkStream.get().adjustBudget(adjustment);
getWorkStream.get().setBudget(adjustment);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ public abstract class AbstractWindmillStream<RequestT, ResponseT> implements Win
private final String backendWorkerToken;
private final ResettableRequestObserver<RequestT> requestObserver;
private final AtomicBoolean isShutdown;
private final AtomicReference<DateTime> shutdownTime;

/**
* Indicates if the current {@link ResettableRequestObserver} was closed by calling {@link
Expand Down Expand Up @@ -140,6 +141,7 @@ protected AbstractWindmillStream(
new AbstractWindmillStream<RequestT, ResponseT>.ResponseObserver()));
this.sleeper = Sleeper.DEFAULT;
this.logger = logger;
this.shutdownTime = new AtomicReference<>();
}

private static String createThreadName(String streamType, String backendWorkerToken) {
Expand Down Expand Up @@ -293,11 +295,14 @@ public final void appendSummaryHtml(PrintWriter writer) {
writer.format(", %dms backoff remaining", sleepLeft);
}
writer.format(
", current stream is %dms old, last send %dms, last response %dms, closed: %s",
", current stream is %dms old, last send %dms, last response %dms, closed: %s, "
+ "isShutdown: %s, shutdown time: %s",
debugDuration(nowMs, startTimeMs.get()),
debugDuration(nowMs, lastSendTimeMs.get()),
debugDuration(nowMs, lastResponseTimeMs.get()),
streamClosed.get());
streamClosed.get(),
isShutdown.get(),
shutdownTime.get());
}

/**
Expand All @@ -307,7 +312,7 @@ public final void appendSummaryHtml(PrintWriter writer) {
protected abstract void appendSpecificHtml(PrintWriter writer);

@Override
public final void halfClose() {
public final synchronized void halfClose() {
clientClosed.set(true);
requestObserver.onCompleted();
streamClosed.set(true);
Expand Down Expand Up @@ -336,6 +341,7 @@ public final void shutdown() {
requestObserver()
.onError(new WindmillStreamShutdownException("Explicit call to shutdown stream."));
shutdownInternal();
shutdownTime.set(DateTime.now());
}
}

Expand All @@ -362,7 +368,7 @@ private static class ResettableRequestObserver<RequestT> implements StreamObserv
private final Supplier<StreamObserver<RequestT>> requestObserverSupplier;

@GuardedBy("this")
private volatile @Nullable StreamObserver<RequestT> delegateRequestObserver;
private @Nullable StreamObserver<RequestT> delegateRequestObserver;

private ResettableRequestObserver(Supplier<StreamObserver<RequestT>> requestObserverSupplier) {
this.requestObserverSupplier = requestObserverSupplier;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,10 @@ public interface WindmillStream {
@ThreadSafe
interface GetWorkStream extends WindmillStream {
/**
 * Adjusts the {@link GetWorkBudget} for the stream.
 *
 * <p>NOTE(review): two declarations follow because this text is a diff rendering whose +/-
 * markers were lost; going by the hunk header line counts, {@code adjustBudget} is the
 * pre-commit name (removed) and {@code setBudget} is the post-commit name (added). The summary
 * sentence above and the {@code itemsDelta}/{@code bytesDelta} parameter names were not changed
 * along with the method name. Confirm whether the values are meant as deltas or as the new
 * absolute budget, and align the wording.
 *
 * @param itemsDelta items component of the budget passed to the stream.
 * @param bytesDelta bytes component of the budget passed to the stream.
 */
void adjustBudget(long itemsDelta, long bytesDelta);
void setBudget(long itemsDelta, long bytesDelta);

/**
 * Convenience overload: unpacks {@code newBudget.items()} and {@code newBudget.bytes()} and
 * delegates to the two-{@code long} overload.
 *
 * <p>NOTE(review): old and new lines are interleaved here because this text is a diff rendering
 * whose +/- markers were lost; going by the hunk header line counts, the {@code adjustBudget}
 * pair is the pre-commit code (removed) and the {@code setBudget} pair is the post-commit code
 * (added). Both share the closing brace.
 *
 * @param newBudget the budget whose items and bytes are forwarded.
 */
// Pre-commit (removed):
default void adjustBudget(GetWorkBudget newBudget) {
adjustBudget(newBudget.items(), newBudget.bytes());
// Post-commit (added):
default void setBudget(GetWorkBudget newBudget) {
setBudget(newBudget.items(), newBudget.bytes());
}

/** Returns the remaining in-flight {@link GetWorkBudget}. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ abstract static class ComputationMetadata {
private static ComputationMetadata fromProto(
Windmill.ComputationWorkItemMetadata metadataProto) {
return new AutoValue_GetWorkResponseChunkAssembler_ComputationMetadata(
metadataProto.getComputationId(),
Preconditions.checkNotNull(metadataProto.getComputationId()),
WindmillTimeUtils.windmillToHarnessWatermark(metadataProto.getInputDataWatermark()),
WindmillTimeUtils.windmillToHarnessWatermark(
metadataProto.getDependentRealtimeInputWatermark()));
Expand Down
Loading

0 comments on commit 08a9c1e

Please sign in to comment.