diff --git a/runners/google-cloud-dataflow-java/worker/build.gradle b/runners/google-cloud-dataflow-java/worker/build.gradle index 62d02148e3a2d..b7e6e981effec 100644 --- a/runners/google-cloud-dataflow-java/worker/build.gradle +++ b/runners/google-cloud-dataflow-java/worker/build.gradle @@ -189,7 +189,6 @@ dependencies { implementation project(":runners:core-java") implementation project(":runners:java-fn-execution") implementation project(path: ":runners:google-cloud-dataflow-java:worker:windmill", configuration: "shadow") - implementation project(path: ':sdks:java:core') shadow library.java.vendored_guava_32_1_2_jre implementation library.java.google_auth_library_credentials diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index bd8257ff842bc..0e23fb145fce6 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -108,9 +108,6 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheStats; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.joda.time.Duration; import org.joda.time.Instant; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java index 99c3dd4558f6a..0f094e7895f20 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java @@ -240,18 +240,20 @@ synchronized Optional completeWorkAndGetNextWorkForKey( @Nullable Queue workQueue = activeWork.get(shardedKey); if (workQueue == null) { // Work may have been completed due to clearing of stuck commits. - LOG.warn("Unable to complete inactive work for key {} and token {}.", shardedKey, workId); + LOG.warn( + "Unable to complete inactive work for key={} and token={}. Work queue for key does not exist.", + shardedKey, + workId); return Optional.empty(); } + removeCompletedWorkFromQueue(workQueue, shardedKey, workId); return getNextWork(workQueue, shardedKey); } private synchronized void removeCompletedWorkFromQueue( Queue workQueue, ShardedKey shardedKey, WorkId workId) { - // avoid Preconditions.checkState here to prevent eagerly evaluating the - // format string parameters for the error message. - ExecutableWork completedWork = workQueue.peek(); + @Nullable ExecutableWork completedWork = workQueue.peek(); if (completedWork == null) { // Work may have been completed due to clearing of stuck commits. LOG.warn("Active key {} without work, expected token {}", shardedKey, workId); @@ -346,7 +348,7 @@ synchronized void printActiveWork(PrintWriter writer, Instant now) { + "Active For" + "State" + "State Active For" - + "Produced By" + + "Backend" + ""); // Use StringBuilder because we are appending in loop. StringBuilder activeWorkStatus = new StringBuilder(); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java index 156795f71e883..82b3e07db8d3e 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java @@ -269,59 +269,53 @@ public synchronized void shutdown() { channelCachingStubFactory.shutdown(); } - private void consumeWindmillWorkerEndpoints(WindmillEndpoints newWindmillEndpoints) { - CompletableFuture closeStaleStreams; - - synchronized (this) { - LOG.info("Consuming new windmill endpoints: {}", newWindmillEndpoints); - ImmutableMap newWindmillConnections = - createNewWindmillConnections(newWindmillEndpoints.windmillEndpoints()); - closeStaleStreams = - closeStaleStreams(newWindmillConnections.values(), connections.get().windmillStreams()); - ImmutableMap newStreams = - createAndStartNewStreams(newWindmillConnections.values()).join(); - StreamingEngineConnectionState newConnectionsState = - StreamingEngineConnectionState.builder() - .setWindmillConnections(newWindmillConnections) - .setWindmillStreams(newStreams) - .setGlobalDataStreams( - createNewGlobalDataStreams(newWindmillEndpoints.globalDataEndpoints())) - .build(); - LOG.info( - "Setting new connections: {}. Previous connections: {}.", - newConnectionsState, - connections.get()); - connections.set(newConnectionsState); - getWorkBudgetDistributor.distributeBudget(newStreams.values(), totalGetWorkBudget); - } - - // Close the streams outside the lock. - closeStaleStreams.join(); + private synchronized void consumeWindmillWorkerEndpoints(WindmillEndpoints newWindmillEndpoints) { + LOG.info("Consuming new windmill endpoints: {}", newWindmillEndpoints); + ImmutableMap newWindmillConnections = + createNewWindmillConnections(newWindmillEndpoints.windmillEndpoints()); + closeStaleStreams(newWindmillConnections.values(), connections.get().windmillStreams()); + ImmutableMap newStreams = + createAndStartNewStreams(newWindmillConnections.values()).join(); + StreamingEngineConnectionState newConnectionsState = + StreamingEngineConnectionState.builder() + .setWindmillConnections(newWindmillConnections) + .setWindmillStreams(newStreams) + .setGlobalDataStreams( + createNewGlobalDataStreams(newWindmillEndpoints.globalDataEndpoints())) + .build(); + LOG.info( + "Setting new connections: {}. Previous connections: {}.", + newConnectionsState, + connections.get()); + connections.set(newConnectionsState); + getWorkBudgetDistributor.distributeBudget(newStreams.values(), totalGetWorkBudget); } /** Close the streams that are no longer valid asynchronously. */ - private CompletableFuture closeStaleStreams( + @SuppressWarnings("FutureReturnValueIgnored") + private void closeStaleStreams( Collection newWindmillConnections, ImmutableMap currentStreams) { - return CompletableFuture.allOf( - currentStreams.entrySet().stream() - .filter( - connectionAndStream -> - !newWindmillConnections.contains(connectionAndStream.getKey())) - .map( - entry -> - CompletableFuture.runAsync( - () -> { - LOG.debug("Closing streams to {}", entry); - entry.getValue().closeAllStreams(); - entry - .getKey() - .directEndpoint() - .ifPresent(channelCachingStubFactory::remove); - LOG.debug("Successfully closed streams to {}", entry); - }, - windmillStreamManager)) - .toArray(CompletableFuture[]::new)); + currentStreams.entrySet().stream() + .filter( + connectionAndStream -> !newWindmillConnections.contains(connectionAndStream.getKey())) + .forEach( + entry -> + CompletableFuture.runAsync( + () -> { + LOG.debug("Closing streams to {}", entry); + try { + entry.getValue().closeAllStreams(); + entry + .getKey() + .directEndpoint() + .ifPresent(channelCachingStubFactory::remove); + LOG.debug("Successfully closed streams to {}", entry); + } catch (Exception e) { + LOG.error("Error closing streams to {}", entry); + } + }, + windmillStreamManager)); } private synchronized CompletableFuture> diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSender.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSender.java index c4b5d071d31ef..3d93629951ea9 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSender.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSender.java @@ -159,7 +159,7 @@ public void adjustBudget(long itemsDelta, long bytesDelta) { GetWorkBudget.builder().setItems(itemsDelta).setBytes(bytesDelta).build(); getWorkBudget.getAndSet(adjustment); if (started.get()) { - getWorkStream.get().adjustBudget(adjustment); + getWorkStream.get().setBudget(adjustment); } } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java index 1f7dfa5182c33..8a5efbd2da965 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java @@ -92,6 +92,7 @@ public abstract class AbstractWindmillStream implements Win private final String backendWorkerToken; private final ResettableRequestObserver requestObserver; private final AtomicBoolean isShutdown; + private final AtomicReference shutdownTime; /** * Indicates if the current {@link ResettableRequestObserver} was closed by calling {@link @@ -140,6 +141,7 @@ protected AbstractWindmillStream( new AbstractWindmillStream.ResponseObserver())); this.sleeper = Sleeper.DEFAULT; this.logger = logger; + this.shutdownTime = new AtomicReference<>(); } private static String createThreadName(String streamType, String backendWorkerToken) { @@ -293,11 +295,14 @@ public final void appendSummaryHtml(PrintWriter writer) { writer.format(", %dms backoff remaining", sleepLeft); } writer.format( - ", current stream is %dms old, last send %dms, last response %dms, closed: %s", + ", current stream is %dms old, last send %dms, last response %dms, closed: %s, " + + "isShutdown: %s, shutdown time: %s", debugDuration(nowMs, startTimeMs.get()), debugDuration(nowMs, lastSendTimeMs.get()), debugDuration(nowMs, lastResponseTimeMs.get()), - streamClosed.get()); + streamClosed.get(), + isShutdown.get(), + shutdownTime.get()); } /** @@ -307,7 +312,7 @@ public final void appendSummaryHtml(PrintWriter writer) { protected abstract void appendSpecificHtml(PrintWriter writer); @Override - public final void halfClose() { + public final synchronized void halfClose() { clientClosed.set(true); requestObserver.onCompleted(); streamClosed.set(true); @@ -336,6 +341,7 @@ public final void shutdown() { requestObserver() .onError(new WindmillStreamShutdownException("Explicit call to shutdown stream.")); shutdownInternal(); + shutdownTime.set(DateTime.now()); } } @@ -362,7 +368,7 @@ private static class ResettableRequestObserver implements StreamObserv private final Supplier> requestObserverSupplier; @GuardedBy("this") - private volatile @Nullable StreamObserver delegateRequestObserver; + private @Nullable StreamObserver delegateRequestObserver; private ResettableRequestObserver(Supplier> requestObserverSupplier) { this.requestObserverSupplier = requestObserverSupplier; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStream.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStream.java index a44ac7bd1cae9..526575c94e321 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStream.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStream.java @@ -57,10 +57,10 @@ public interface WindmillStream { @ThreadSafe interface GetWorkStream extends WindmillStream { /** Adjusts the {@link GetWorkBudget} for the stream. */ - void adjustBudget(long itemsDelta, long bytesDelta); + void setBudget(long itemsDelta, long bytesDelta); - default void adjustBudget(GetWorkBudget newBudget) { - adjustBudget(newBudget.items(), newBudget.bytes()); + default void setBudget(GetWorkBudget newBudget) { + setBudget(newBudget.items(), newBudget.bytes()); } /** Returns the remaining in-flight {@link GetWorkBudget}. */ diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GetWorkResponseChunkAssembler.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GetWorkResponseChunkAssembler.java index 9f30f75919f97..25c959e6d6034 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GetWorkResponseChunkAssembler.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GetWorkResponseChunkAssembler.java @@ -103,7 +103,7 @@ abstract static class ComputationMetadata { private static ComputationMetadata fromProto( Windmill.ComputationWorkItemMetadata metadataProto) { return new AutoValue_GetWorkResponseChunkAssembler_ComputationMetadata( - metadataProto.getComputationId(), + Preconditions.checkNotNull(metadataProto.getComputationId()), WindmillTimeUtils.windmillToHarnessWatermark(metadataProto.getInputDataWatermark()), WindmillTimeUtils.windmillToHarnessWatermark( metadataProto.getDependentRealtimeInputWatermark())); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java index e830b04e0f522..74d8814d20d56 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java @@ -17,10 +17,12 @@ */ package org.apache.beam.runners.dataflow.worker.windmill.client.grpc; +import com.google.auto.value.AutoValue; import java.io.PrintWriter; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.Supplier; @@ -44,7 +46,6 @@ import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.util.BackOff; import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Suppliers; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -71,7 +72,7 @@ public final class GrpcDirectGetWorkStream .build(); private final AtomicReference maxGetWorkBudget; - private final AtomicReference inFlightBudget; + private final GetWorkBudgetTracker budgetTracker; private final GetWorkRequest requestHeader; private final WorkItemScheduler workItemScheduler; private final ThrottleTimer getWorkThrottleTimer; @@ -127,8 +128,8 @@ private GrpcDirectGetWorkStream( .setItems(requestHeader.getMaxItems()) .setBytes(requestHeader.getMaxBytes()) .build()); - this.inFlightBudget = new AtomicReference<>(GetWorkBudget.noBudget()); this.lastRequest = new AtomicReference<>(); + this.budgetTracker = GetWorkBudgetTracker.create(); } public static GrpcDirectGetWorkStream create( @@ -179,31 +180,21 @@ private static Watermarks createWatermarks( * which can deadlock since we send on the stream beneath the synchronization. {@link * AbstractWindmillStream#send(Object)} is synchronized so the sends are already guarded. */ - private void sendRequestExtension() { - GetWorkBudget currentInFlightBudget = inFlightBudget.get(); - GetWorkBudget currentMaxBudget = maxGetWorkBudget.get(); - - // If the outstanding items or bytes limit has gotten too low, top both off with a - // GetWorkExtension. The goal is to keep the limits relatively close to their maximum - // values without sending too many extension requests. - if (currentInFlightBudget.items() < currentMaxBudget.items() / 2 - || currentInFlightBudget.bytes() < currentMaxBudget.bytes() / 2) { - GetWorkBudget extension = currentMaxBudget.subtract(currentInFlightBudget); - if (extension.items() > 0 || extension.bytes() > 0) { - inFlightBudget.getAndUpdate(budget -> budget.apply(extension)); - executeSafely( - () -> { - StreamingGetWorkRequest request = - StreamingGetWorkRequest.newBuilder() - .setRequestExtension( - Windmill.StreamingGetWorkRequestExtension.newBuilder() - .setMaxItems(extension.items()) - .setMaxBytes(extension.bytes())) - .build(); - lastRequest.getAndSet(request); - send(request); - }); - } + private void sendRequestExtension(GetWorkBudget extension) { + if (extension.items() > 0 || extension.bytes() > 0) { + executeSafely( + () -> { + StreamingGetWorkRequest request = + StreamingGetWorkRequest.newBuilder() + .setRequestExtension( + Windmill.StreamingGetWorkRequestExtension.newBuilder() + .setMaxItems(extension.items()) + .setMaxBytes(extension.bytes())) + .build(); + lastRequest.set(request); + budgetTracker.recordBudgetRequested(extension); + send(request); + }); } } @@ -211,18 +202,20 @@ private void sendRequestExtension() { protected synchronized void onNewStream() { workItemAssemblers.clear(); if (!isShutdown()) { - GetWorkBudget currentMaxGetWorkBudget = maxGetWorkBudget.get(); - inFlightBudget.getAndSet(currentMaxGetWorkBudget); + budgetTracker.reset(); + GetWorkBudget initialGetWorkBudget = + budgetTracker.computeBudgetExtension(maxGetWorkBudget.get()); StreamingGetWorkRequest request = StreamingGetWorkRequest.newBuilder() .setRequest( requestHeader .toBuilder() - .setMaxItems(currentMaxGetWorkBudget.items()) - .setMaxBytes(currentMaxGetWorkBudget.bytes()) + .setMaxItems(initialGetWorkBudget.items()) + .setMaxBytes(initialGetWorkBudget.bytes()) .build()) .build(); - lastRequest.getAndSet(request); + lastRequest.set(request); + budgetTracker.recordBudgetRequested(initialGetWorkBudget); send(request); } } @@ -236,8 +229,18 @@ protected boolean hasPendingRequests() { public void appendSpecificHtml(PrintWriter writer) { // Number of buffers is same as distinct workers that sent work on this stream. writer.format( - "GetWorkStream: %d buffers, in-flight budget: %s; last sent request: %s.", - workItemAssemblers.size(), inFlightBudget.get(), lastRequest.get()); + "GetWorkStream: %d buffers, " + + "max budget: %s, " + + "in-flight budget: %s, " + + "total budget requested: %s, " + + "total budget received: %s," + + "last sent request: %s. ", + workItemAssemblers.size(), + maxGetWorkBudget.get(), + budgetTracker.inFlightBudget(), + budgetTracker.totalRequestedBudget(), + budgetTracker.totalReceivedBudget(), + lastRequest.get()); } @Override @@ -255,17 +258,17 @@ protected void onResponse(StreamingGetWorkResponseChunk chunk) { } private void consumeAssembledWorkItem(AssembledWorkItem assembledWorkItem) { - // Record the fact that there are now fewer outstanding messages and bytes on the stream. - inFlightBudget.updateAndGet(budget -> budget.subtract(1, assembledWorkItem.bufferedSize())); WorkItem workItem = assembledWorkItem.workItem(); GetWorkResponseChunkAssembler.ComputationMetadata metadata = assembledWorkItem.computationMetadata(); workItemScheduler.scheduleWork( workItem, - createWatermarks(workItem, Preconditions.checkNotNull(metadata)), - createProcessingContext(Preconditions.checkNotNull(metadata.computationId())), + createWatermarks(workItem, metadata), + createProcessingContext(metadata.computationId()), assembledWorkItem.latencyAttributions()); - sendRequestExtension(); + budgetTracker.recordBudgetReceived(assembledWorkItem.bufferedSize()); + GetWorkBudget extension = budgetTracker.computeBudgetExtension(maxGetWorkBudget.get()); + sendRequestExtension(extension); } private Work.ProcessingContext createProcessingContext(String computationId) { @@ -283,19 +286,100 @@ protected void startThrottleTimer() { } @Override - public void adjustBudget(long itemsDelta, long bytesDelta) { - maxGetWorkBudget.getAndSet( - GetWorkBudget.builder().setItems(itemsDelta).setBytes(bytesDelta).build()); - sendRequestExtension(); + public void setBudget(long items, long bytes) { + GetWorkBudget currentMaxGetWorkBudget = + maxGetWorkBudget.updateAndGet( + ignored -> GetWorkBudget.builder().setItems(items).setBytes(bytes).build()); + GetWorkBudget extension = budgetTracker.computeBudgetExtension(currentMaxGetWorkBudget); + sendRequestExtension(extension); } @Override public GetWorkBudget remainingBudget() { - return maxGetWorkBudget.get().subtract(inFlightBudget.get()); + return maxGetWorkBudget.get().subtract(budgetTracker.inFlightBudget()); } @Override protected void shutdownInternal() { workItemAssemblers.clear(); } + + /** + * Tracks sent and received GetWorkBudget and uses this information to generate request + * extensions. + */ + @AutoValue + abstract static class GetWorkBudgetTracker { + + private static GetWorkBudgetTracker create() { + return new AutoValue_GrpcDirectGetWorkStream_GetWorkBudgetTracker( + new AtomicLong(), new AtomicLong(), new AtomicLong(), new AtomicLong()); + } + + abstract AtomicLong itemsRequested(); + + abstract AtomicLong bytesRequested(); + + abstract AtomicLong itemsReceived(); + + abstract AtomicLong bytesReceived(); + + private void reset() { + itemsRequested().set(0); + bytesRequested().set(0); + itemsReceived().set(0); + bytesReceived().set(0); + } + + private void recordBudgetRequested(GetWorkBudget budgetRequested) { + itemsRequested().addAndGet(budgetRequested.items()); + bytesRequested().addAndGet(budgetRequested.bytes()); + } + + private void recordBudgetReceived(long bytesReceived) { + itemsReceived().incrementAndGet(); + bytesReceived().addAndGet(bytesReceived); + } + + /** + * If the outstanding items or bytes limit has gotten too low, top both off with a + * GetWorkExtension. The goal is to keep the limits relatively close to their maximum values + * without sending too many extension requests. + */ + private GetWorkBudget computeBudgetExtension(GetWorkBudget maxGetWorkBudget) { + // Expected items and bytes can go negative here, since WorkItems returned might be larger + // than the initially requested budget. + long inFlightItems = itemsRequested().get() - itemsReceived().get(); + long inFlightBytes = bytesRequested().get() - bytesReceived().get(); + + // Don't send negative budget extensions. + long requestBytes = Math.max(0, maxGetWorkBudget.bytes() - inFlightBytes); + long requestItems = Math.max(0, maxGetWorkBudget.items() - inFlightItems); + + return (inFlightItems > requestItems / 2 && inFlightBytes > requestBytes / 2) + ? GetWorkBudget.noBudget() + : GetWorkBudget.builder().setItems(requestItems).setBytes(requestBytes).build(); + } + + private GetWorkBudget inFlightBudget() { + return GetWorkBudget.builder() + .setItems(itemsRequested().get() - itemsReceived().get()) + .setBytes(bytesRequested().get() - bytesReceived().get()) + .build(); + } + + private GetWorkBudget totalRequestedBudget() { + return GetWorkBudget.builder() + .setItems(itemsRequested().get()) + .setBytes(bytesRequested().get()) + .build(); + } + + private GetWorkBudget totalReceivedBudget() { + return GetWorkBudget.builder() + .setItems(itemsReceived().get()) + .setBytes(bytesReceived().get()) + .build(); + } + } } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java index 503312c291869..27d983f83c639 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java @@ -342,7 +342,7 @@ private ResponseT issueRequest(QueuedRequest request, ParseFn ResponseT issueRequest(QueuedRequest request, ParseFn ResponseT issueRequest(QueuedRequest request, ParseFn createStreamCancelledErrorMessage() { throw new IllegalStateException(); } }) + .limit(STREAM_CANCELLED_ERROR_LOG_LIMIT) .collect(toImmutableList()); } } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetWorkStream.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetWorkStream.java index 343758ce87a6a..a5f2685a2e898 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetWorkStream.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetWorkStream.java @@ -194,7 +194,7 @@ protected void startThrottleTimer() { } @Override - public void adjustBudget(long itemsDelta, long bytesDelta) { + public void setBudget(long itemsDelta, long bytesDelta) { // no-op } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetWorkerMetadataStream.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetWorkerMetadataStream.java index 9ced0139bfd00..da698865c86fd 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetWorkerMetadataStream.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetWorkerMetadataStream.java @@ -19,12 +19,10 @@ import com.google.errorprone.annotations.concurrent.GuardedBy; import java.io.PrintWriter; -import java.util.List; import java.util.Optional; import java.util.Set; import java.util.function.Consumer; import java.util.function.Function; -import java.util.stream.Collectors; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.JobHeader; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkerMetadataRequest; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkerMetadataResponse; @@ -171,13 +169,9 @@ protected void sendHealthCheck() { @Override protected void appendSpecificHtml(PrintWriter writer) { synchronized (metadataLock) { - List backendWorkerTokens = - latestResponse.getWorkEndpointsList().stream() - .map(WorkerMetadataResponse.Endpoint::getBackendWorkerToken) - .collect(Collectors.toList()); writer.format( "GetWorkerMetadataStream: job_header=[%s], current_metadata=[%s]", - workerMetadataRequest.getHeader(), backendWorkerTokens); + workerMetadataRequest.getHeader(), latestResponse); } } } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillStreamFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillStreamFactory.java index 4872c3878af60..2f4fbce9c6090 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillStreamFactory.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillStreamFactory.java @@ -22,7 +22,6 @@ import com.google.auto.value.AutoBuilder; import java.io.PrintWriter; import java.util.Collection; -import java.util.Comparator; import java.util.List; import java.util.Set; import java.util.Timer; @@ -325,7 +324,6 @@ private StreamObserverFactory newStreamObserverFactory(long deadline) { public void appendSummaryHtml(PrintWriter writer) { writer.write("Active Streams:
"); streamRegistry.stream() - .sorted(Comparator.comparing(AbstractWindmillStream::backendWorkerToken)) .collect( toImmutableListMultimap( AbstractWindmillStream::backendWorkerToken, Function.identity())) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java index eed1429112dbc..010d3d81e15ac 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java @@ -90,13 +90,13 @@ public void onNext(T value) { // buffer periodically. This reduces the overhead of blocking while still restricting // memory because there is a limited # of streams, and we have a max messages size of 2MB. if (++messagesSinceReady <= messagesBetweenIsReadyChecks) { - tryOnNext(value); + outboundObserver.onNext(value); return; } if (outboundObserver.isReady()) { messagesSinceReady = 0; - tryOnNext(value); + outboundObserver.onNext(value); return; } } @@ -116,7 +116,7 @@ public void onNext(T value) { synchronized (lock) { messagesSinceReady = 0; - tryOnNext(value); + outboundObserver.onNext(value); return; } } catch (TimeoutException e) { @@ -141,24 +141,7 @@ public void onNext(T value) { waitSeconds = waitSeconds * 2; } catch (InterruptedException e) { Thread.currentThread().interrupt(); - StreamObserverCancelledException ex = new StreamObserverCancelledException(e); - LOG.error("Interrupted while waiting for outboundObserver to become ready.", ex); - throw ex; - } - } - } - - /** - * Only send the next value if the phaser is not terminated by the time we acquire the lock since - * the phaser can be terminated at any time. - */ - private void tryOnNext(T value) { - if (isReadyNotifier.isTerminated()) { - return; - } - synchronized (lock) { - if (!isReadyNotifier.isTerminated()) { - outboundObserver.onNext(value); + throw new StreamObserverCancelledException(e); } } } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudget.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudget.java index 166806ecdc8f8..adb8ffe8753ff 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudget.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudget.java @@ -85,7 +85,7 @@ public GetWorkBudget subtract(long items, long bytes) { public abstract static class Builder { public abstract Builder setBytes(long bytes); - public abstract Builder setItems(long budget); + public abstract Builder setItems(long items); abstract long items(); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java index b3f7467cdbd34..f2edeb2b8c2ec 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java @@ -245,7 +245,7 @@ public void halfClose() { } @Override - public void adjustBudget(long itemsDelta, long bytesDelta) { + public void setBudget(long itemsDelta, long bytesDelta) { // no-op. }