diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index f196852b22532..90f072be997ed 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -23,14 +23,13 @@ import com.google.api.services.dataflow.model.CounterUpdate; import com.google.api.services.dataflow.model.MapTask; import com.google.auto.value.AutoValue; -import java.util.Collection; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Random; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -48,26 +47,25 @@ import org.apache.beam.runners.dataflow.worker.streaming.ComputationState; import org.apache.beam.runners.dataflow.worker.streaming.ComputationStateCache; import org.apache.beam.runners.dataflow.worker.streaming.StageInfo; -import org.apache.beam.runners.dataflow.worker.streaming.Watermarks; -import org.apache.beam.runners.dataflow.worker.streaming.Work; import org.apache.beam.runners.dataflow.worker.streaming.WorkHeartbeatResponseProcessor; import org.apache.beam.runners.dataflow.worker.streaming.config.ComputationConfig; import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingApplianceComputationConfigFetcher; import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingEngineComputationConfigFetcher; import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingEnginePipelineConfig; +import org.apache.beam.runners.dataflow.worker.streaming.harness.SingleSourceWorkerHarness; +import org.apache.beam.runners.dataflow.worker.streaming.harness.SingleSourceWorkerHarness.GetWorkSender; import org.apache.beam.runners.dataflow.worker.streaming.harness.StreamingCounters; +import org.apache.beam.runners.dataflow.worker.streaming.harness.StreamingWorkerHarness; import org.apache.beam.runners.dataflow.worker.streaming.harness.StreamingWorkerStatusPages; import org.apache.beam.runners.dataflow.worker.streaming.harness.StreamingWorkerStatusReporter; import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor; import org.apache.beam.runners.dataflow.worker.util.MemoryMonitor; import org.apache.beam.runners.dataflow.worker.windmill.Windmill; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.JobHeader; -import org.apache.beam.runners.dataflow.worker.windmill.Windmill.LatencyAttribution; import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub; import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress; import org.apache.beam.runners.dataflow.worker.windmill.appliance.JniWindmillApplianceServer; import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream; -import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetWorkStream; import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStreamPool; import org.apache.beam.runners.dataflow.worker.windmill.client.commits.CompleteCommit; import org.apache.beam.runners.dataflow.worker.windmill.client.commits.StreamingApplianceWorkCommitter; @@ -104,12 +102,11 @@ import org.apache.beam.sdk.util.construction.CoderTranslation; import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheStats; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Uninterruptibles; import org.joda.time.Duration; import org.joda.time.Instant; import org.slf4j.Logger; @@ -119,7 +116,8 @@ @SuppressWarnings({ "nullness" // TODO(https://github.com/apache/beam/issues/20497) }) -public class StreamingDataflowWorker { +public final class StreamingDataflowWorker { + /** * Sinks are marked 'full' in {@link StreamingModeExecutionContext} once the amount of data sinked * (across all the sinks, if there are more than one) reaches this limit. This serves as hint for @@ -128,47 +126,44 @@ public class StreamingDataflowWorker { */ public static final int MAX_SINK_BYTES = 10_000_000; - // Maximum number of threads for processing. Currently, each thread processes one key at a time. - static final int MAX_PROCESSING_THREADS = 300; - static final long THREAD_EXPIRATION_TIME_SEC = 60; - static final int GET_WORK_STREAM_TIMEOUT_MINUTES = 3; - static final Duration COMMIT_STREAM_TIMEOUT = Duration.standardMinutes(1); private static final Logger LOG = LoggerFactory.getLogger(StreamingDataflowWorker.class); - private static final Duration GET_DATA_STREAM_TIMEOUT = Duration.standardSeconds(30); + + /** + * Maximum number of threads for processing. Currently, each thread processes one key at a time. + */ + private static final int MAX_PROCESSING_THREADS = 300; /** The idGenerator to generate unique id globally. */ private static final IdGenerator ID_GENERATOR = IdGenerators.decrementingLongs(); - private static final int DEFAULT_STATUS_PORT = 8081; - // Maximum size of the result of a GetWork request. + /** Maximum size of the result of a GetWork request. */ private static final long MAX_GET_WORK_FETCH_BYTES = 64L << 20; // 64m /** Maximum number of failure stacktraces to report in each update sent to backend. */ private static final int MAX_FAILURES_TO_REPORT_IN_UPDATE = 1000; - private static final Random clientIdGenerator = new Random(); + private static final long THREAD_EXPIRATION_TIME_SEC = 60; + private static final Duration COMMIT_STREAM_TIMEOUT = Duration.standardMinutes(1); + private static final Duration GET_DATA_STREAM_TIMEOUT = Duration.standardSeconds(30); + private static final int DEFAULT_STATUS_PORT = 8081; + private static final Random CLIENT_ID_GENERATOR = new Random(); private static final String CHANNELZ_PATH = "/channelz"; - final WindmillStateCache stateCache; + + private final WindmillStateCache stateCache; private final StreamingWorkerStatusPages statusPages; private final ComputationConfig.Fetcher configFetcher; private final ComputationStateCache computationStateCache; private final BoundedQueueExecutor workUnitExecutor; - private final WindmillServerStub windmillServer; - private final Thread dispatchThread; + private final StreamingWorkerHarness streamingWorkerHarness; private final AtomicBoolean running = new AtomicBoolean(); private final DataflowWorkerHarnessOptions options; - private final long clientId; - private final GetDataClient getDataClient; - private final MemoryMonitor memoryMonitor; - private final Thread memoryMonitorThread; + private final BackgroundMemoryMonitor memoryMonitor; private final ReaderCache readerCache; private final DataflowExecutionStateSampler sampler = DataflowExecutionStateSampler.instance(); private final ActiveWorkRefresher activeWorkRefresher; private final WorkCommitter workCommitter; private final StreamingWorkerStatusReporter workerStatusReporter; private final StreamingCounters streamingCounters; - private final StreamingWorkScheduler streamingWorkScheduler; - private final HeartbeatSender heartbeatSender; private StreamingDataflowWorker( WindmillServerStub windmillServer, @@ -226,39 +221,42 @@ private StreamingDataflowWorker( this.workUnitExecutor = workUnitExecutor; - memoryMonitorThread = new Thread(memoryMonitor); - memoryMonitorThread.setPriority(Thread.MIN_PRIORITY); - memoryMonitorThread.setName("MemoryMonitor"); - - dispatchThread = - new Thread( - () -> { - LOG.info("Dispatch starting"); - if (windmillServiceEnabled) { - streamingDispatchLoop(); - } else { - dispatchLoop(); - } - LOG.info("Dispatch done"); - }); - dispatchThread.setDaemon(true); - dispatchThread.setPriority(Thread.MIN_PRIORITY); - dispatchThread.setName("DispatchThread"); - this.clientId = clientId; - this.windmillServer = windmillServer; + this.workerStatusReporter = workerStatusReporter; + this.streamingCounters = streamingCounters; + this.memoryMonitor = BackgroundMemoryMonitor.create(memoryMonitor); + StreamingWorkScheduler streamingWorkScheduler = + StreamingWorkScheduler.create( + options, + clock, + readerCache, + mapTaskExecutorFactory, + workUnitExecutor, + stateCache::forComputation, + failureTracker, + workFailureProcessor, + streamingCounters, + hotKeyLogger, + sampler, + operationalLimits, + ID_GENERATOR, + stageInfoMap); ThrottlingGetDataMetricTracker getDataMetricTracker = new ThrottlingGetDataMetricTracker(memoryMonitor); - + WorkerStatusPages workerStatusPages = + WorkerStatusPages.create(DEFAULT_STATUS_PORT, memoryMonitor); + StreamingWorkerStatusPages.Builder statusPagesBuilder = StreamingWorkerStatusPages.builder(); int stuckCommitDurationMillis; + GetDataClient getDataClient; + HeartbeatSender heartbeatSender; if (windmillServiceEnabled) { WindmillStreamPool getDataStreamPool = WindmillStreamPool.create( Math.max(1, options.getWindmillGetDataStreamCount()), GET_DATA_STREAM_TIMEOUT, windmillServer::getDataStream); - this.getDataClient = new StreamPoolGetDataClient(getDataMetricTracker, getDataStreamPool); - this.heartbeatSender = + getDataClient = new StreamPoolGetDataClient(getDataMetricTracker, getDataStreamPool); + heartbeatSender = new StreamPoolHeartbeatSender( options.getUseSeparateWindmillHeartbeatStreams() ? WindmillStreamPool.create( @@ -266,9 +264,16 @@ private StreamingDataflowWorker( : getDataStreamPool); stuckCommitDurationMillis = options.getStuckCommitDurationMillis() > 0 ? options.getStuckCommitDurationMillis() : 0; + statusPagesBuilder + .setDebugCapture( + new DebugCapture.Manager(options, workerStatusPages.getDebugCapturePages())) + .setChannelzServlet( + new ChannelzServlet( + CHANNELZ_PATH, options, windmillServer::getWindmillServiceEndpoints)) + .setWindmillStreamFactory(windmillStreamFactory); } else { - this.getDataClient = new ApplianceGetDataClient(windmillServer, getDataMetricTracker); - this.heartbeatSender = new ApplianceHeartbeatSender(windmillServer::getData); + getDataClient = new ApplianceGetDataClient(windmillServer, getDataMetricTracker); + heartbeatSender = new ApplianceHeartbeatSender(windmillServer::getData); stuckCommitDurationMillis = 0; } @@ -282,49 +287,40 @@ private StreamingDataflowWorker( executorSupplier.apply("RefreshWork"), getDataMetricTracker::trackHeartbeats); - WorkerStatusPages workerStatusPages = - WorkerStatusPages.create(DEFAULT_STATUS_PORT, memoryMonitor); - StreamingWorkerStatusPages.Builder statusPagesBuilder = - StreamingWorkerStatusPages.builder() + this.statusPages = + statusPagesBuilder .setClock(clock) .setClientId(clientId) .setIsRunning(running) .setStatusPages(workerStatusPages) .setStateCache(stateCache) - .setComputationStateCache(computationStateCache) + .setComputationStateCache(this.computationStateCache) .setCurrentActiveCommitBytes(workCommitter::currentActiveCommitBytes) .setGetDataStatusProvider(getDataClient::printHtml) - .setWorkUnitExecutor(workUnitExecutor); + .setWorkUnitExecutor(workUnitExecutor) + .build(); - this.statusPages = - windmillServiceEnabled - ? statusPagesBuilder - .setDebugCapture( - new DebugCapture.Manager(options, workerStatusPages.getDebugCapturePages())) - .setChannelzServlet(new ChannelzServlet(CHANNELZ_PATH, options, windmillServer)) - .setWindmillStreamFactory(windmillStreamFactory) - .build() - : statusPagesBuilder.build(); + Windmill.GetWorkRequest request = + Windmill.GetWorkRequest.newBuilder() + .setClientId(clientId) + .setMaxItems(chooseMaximumBundlesOutstanding()) + .setMaxBytes(MAX_GET_WORK_FETCH_BYTES) + .build(); - this.workerStatusReporter = workerStatusReporter; - this.streamingCounters = streamingCounters; - this.memoryMonitor = memoryMonitor; - this.streamingWorkScheduler = - StreamingWorkScheduler.create( - options, - clock, - readerCache, - mapTaskExecutorFactory, - workUnitExecutor, - stateCache::forComputation, - failureTracker, - workFailureProcessor, - streamingCounters, - hotKeyLogger, - sampler, - operationalLimits, - ID_GENERATOR, - stageInfoMap); + this.streamingWorkerHarness = + SingleSourceWorkerHarness.builder() + .setStreamingWorkScheduler(streamingWorkScheduler) + .setWorkCommitter(workCommitter) + .setGetDataClient(getDataClient) + .setComputationStateFetcher(this.computationStateCache::get) + .setWaitForResources(() -> memoryMonitor.waitForResources("GetWork")) + .setHeartbeatSender(heartbeatSender) + .setGetWorkSender( + windmillServiceEnabled + ? GetWorkSender.forStreamingEngine( + receiver -> windmillServer.getWorkStream(request, receiver)) + : GetWorkSender.forAppliance(() -> windmillServer.getWork(request))) + .build(); LOG.debug("windmillServiceEnabled: {}", windmillServiceEnabled); LOG.debug("WindmillServiceEndpoint: {}", options.getWindmillServiceEndpoint()); @@ -333,7 +329,7 @@ private StreamingDataflowWorker( } public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions options) { - long clientId = clientIdGenerator.nextLong(); + long clientId = CLIENT_ID_GENERATOR.nextLong(); MemoryMonitor memoryMonitor = MemoryMonitor.fromOptions(options); ConcurrentMap stageInfo = new ConcurrentHashMap<>(); StreamingCounters streamingCounters = StreamingCounters.create(); @@ -438,9 +434,10 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o ComputationConfig.Fetcher configFetcher; WindmillServerStub windmillServer; ComputationStateCache computationStateCache; - GrpcDispatcherClient dispatcherClient = GrpcDispatcherClient.create(createStubFactory(options)); GrpcWindmillStreamFactory windmillStreamFactory; if (options.isEnableStreamingEngine()) { + GrpcDispatcherClient dispatcherClient = + GrpcDispatcherClient.create(createStubFactory(options)); configFetcher = StreamingEngineComputationConfigFetcher.create( options.getGlobalConfigRefreshPeriod().getMillis(), @@ -469,7 +466,10 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o options.getWindmillServiceStreamingRpcHealthCheckPeriodMs()) .build(); windmillServer = - GrpcWindmillServer.create(options, windmillStreamFactory, dispatcherClient); + GrpcWindmillServer.create( + options, + windmillStreamFactory, + GrpcDispatcherClient.create(createStubFactory(options))); } else { windmillStreamFactory = windmillStreamFactoryBuilder.build(); windmillServer = new JniWindmillApplianceServer(options.getLocalWindmillHostport()); @@ -704,10 +704,6 @@ private static ChannelCachingStubFactory createStubFactory( return ChannelCachingRemoteStubFactory.create(workerOptions.getGcpCredential(), channelCache); } - private static void sleep(int millis) { - Uninterruptibles.sleepUninterruptibly(millis, TimeUnit.MILLISECONDS); - } - private static int chooseMaxThreads(DataflowWorkerHarnessOptions options) { if (options.getNumberOfWorkerHarnessThreads() != 0) { return options.getNumberOfWorkerHarnessThreads(); @@ -736,7 +732,7 @@ private static void enableBigQueryMetrics() { } @VisibleForTesting - final void reportPeriodicWorkerUpdatesForTest() { + void reportPeriodicWorkerUpdatesForTest() { workerStatusReporter.reportPeriodicWorkerUpdates(); } @@ -765,6 +761,11 @@ int numCommitThreads() { return workCommitter.parallelism(); } + @VisibleForTesting + CacheStats getStateCacheStats() { + return stateCache.getCacheStats(); + } + @VisibleForTesting ComputationStateCache getComputationStateCache() { return computationStateCache; @@ -773,14 +774,10 @@ ComputationStateCache getComputationStateCache() { @SuppressWarnings("FutureReturnValueIgnored") public void start() { running.set(true); - configFetcher.start(); - - memoryMonitorThread.start(); - dispatchThread.start(); + memoryMonitor.start(); + streamingWorkerHarness.start(); sampler.start(); - - workCommitter.start(); workerStatusReporter.start(); activeWorkRefresher.start(); } @@ -794,121 +791,19 @@ private void startStatusPages() { void stop() { try { configFetcher.stop(); - activeWorkRefresher.stop(); statusPages.stop(); running.set(false); - dispatchThread.interrupt(); - dispatchThread.join(); - - workCommitter.stop(); - memoryMonitor.stop(); - memoryMonitorThread.join(); + streamingWorkerHarness.shutdown(); + memoryMonitor.shutdown(); workUnitExecutor.shutdown(); - computationStateCache.closeAndInvalidateAll(); - workerStatusReporter.stop(); } catch (Exception e) { LOG.warn("Exception while shutting down: ", e); } } - private void dispatchLoop() { - while (running.get()) { - memoryMonitor.waitForResources("GetWork"); - - int backoff = 1; - Windmill.GetWorkResponse workResponse = null; - do { - try { - workResponse = getWork(); - if (workResponse.getWorkCount() > 0) { - break; - } - } catch (WindmillServerStub.RpcException e) { - LOG.warn("GetWork failed, retrying:", e); - } - sleep(backoff); - backoff = Math.min(1000, backoff * 2); - } while (running.get()); - for (final Windmill.ComputationWorkItems computationWork : workResponse.getWorkList()) { - final String computationId = computationWork.getComputationId(); - Optional maybeComputationState = computationStateCache.get(computationId); - if (!maybeComputationState.isPresent()) { - continue; - } - - final ComputationState computationState = maybeComputationState.get(); - final Instant inputDataWatermark = - WindmillTimeUtils.windmillToHarnessWatermark(computationWork.getInputDataWatermark()); - Watermarks.Builder watermarks = - Watermarks.builder() - .setInputDataWatermark(Preconditions.checkNotNull(inputDataWatermark)) - .setSynchronizedProcessingTime( - WindmillTimeUtils.windmillToHarnessWatermark( - computationWork.getDependentRealtimeInputWatermark())); - - for (final Windmill.WorkItem workItem : computationWork.getWorkList()) { - streamingWorkScheduler.scheduleWork( - computationState, - workItem, - watermarks.setOutputDataWatermark(workItem.getOutputDataWatermark()).build(), - Work.createProcessingContext( - computationId, getDataClient, workCommitter::commit, heartbeatSender), - /* getWorkStreamLatencies= */ Collections.emptyList()); - } - } - } - } - - void streamingDispatchLoop() { - while (running.get()) { - GetWorkStream stream = - windmillServer.getWorkStream( - Windmill.GetWorkRequest.newBuilder() - .setClientId(clientId) - .setMaxItems(chooseMaximumBundlesOutstanding()) - .setMaxBytes(MAX_GET_WORK_FETCH_BYTES) - .build(), - (String computation, - Instant inputDataWatermark, - Instant synchronizedProcessingTime, - Windmill.WorkItem workItem, - Collection getWorkStreamLatencies) -> - computationStateCache - .get(computation) - .ifPresent( - computationState -> { - memoryMonitor.waitForResources("GetWork"); - streamingWorkScheduler.scheduleWork( - computationState, - workItem, - Watermarks.builder() - .setInputDataWatermark(inputDataWatermark) - .setSynchronizedProcessingTime(synchronizedProcessingTime) - .setOutputDataWatermark(workItem.getOutputDataWatermark()) - .build(), - Work.createProcessingContext( - computationState.getComputationId(), - getDataClient, - workCommitter::commit, - heartbeatSender), - getWorkStreamLatencies); - })); - try { - // Reconnect every now and again to enable better load balancing. - // If at any point the server closes the stream, we will reconnect immediately; otherwise - // we half-close the stream after some time and create a new one. - if (!stream.awaitTermination(GET_WORK_STREAM_TIMEOUT_MINUTES, TimeUnit.MINUTES)) { - stream.halfClose(); - } - } catch (InterruptedException e) { - // Continue processing until !running.get() - } - } - } - private void onCompleteCommit(CompleteCommit completeCommit) { if (completeCommit.status() != Windmill.CommitStatus.OK) { readerCache.invalidateReader( @@ -927,15 +822,6 @@ private void onCompleteCommit(CompleteCommit completeCommit) { completeCommit.shardedKey(), completeCommit.workId())); } - private Windmill.GetWorkResponse getWork() { - return windmillServer.getWork( - Windmill.GetWorkRequest.newBuilder() - .setClientId(clientId) - .setMaxItems(chooseMaximumBundlesOutstanding()) - .setMaxBytes(MAX_GET_WORK_FETCH_BYTES) - .build()); - } - @VisibleForTesting public Iterable buildCounters() { return Iterables.concat( @@ -967,4 +853,34 @@ private static ConfigFetcherComputationStateCacheAndWindmillClient create( abstract GrpcWindmillStreamFactory windmillStreamFactory(); } + + /** + * Monitors memory pressure on a background executor. May be used to throttle calls, blocking if + * there is memory pressure. + */ + @AutoValue + abstract static class BackgroundMemoryMonitor { + private static BackgroundMemoryMonitor create(MemoryMonitor memoryMonitor) { + return new AutoValue_StreamingDataflowWorker_BackgroundMemoryMonitor( + memoryMonitor, + Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder() + .setNameFormat("MemoryMonitor") + .setPriority(Thread.MIN_PRIORITY) + .build())); + } + + abstract MemoryMonitor memoryMonitor(); + + abstract ExecutorService executor(); + + private void start() { + executor().execute(memoryMonitor()); + } + + private void shutdown() { + memoryMonitor().stop(); + executor().shutdown(); + } + } } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/StreamingEngineClient.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java similarity index 91% rename from runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/StreamingEngineClient.java rename to runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java index b9573ff94cc9a..3556b7ce29198 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/StreamingEngineClient.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.runners.dataflow.worker.windmill.client.grpc; +package org.apache.beam.runners.dataflow.worker.streaming.harness; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap.toImmutableMap; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet.toImmutableSet; @@ -47,6 +47,8 @@ import org.apache.beam.runners.dataflow.worker.windmill.client.commits.WorkCommitter; import org.apache.beam.runners.dataflow.worker.windmill.client.getdata.StreamGetDataClient; import org.apache.beam.runners.dataflow.worker.windmill.client.getdata.ThrottlingGetDataMetricTracker; +import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcDispatcherClient; +import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcWindmillStreamFactory; import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCachingStubFactory; import org.apache.beam.runners.dataflow.worker.windmill.client.throttling.ThrottleTimer; import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemScheduler; @@ -68,16 +70,19 @@ import org.slf4j.LoggerFactory; /** - * Client for StreamingEngine. Given a {@link GetWorkBudget}, divides the budget and starts the - * {@link WindmillStream.GetWorkStream}(s). + * {@link StreamingWorkerHarness} implementation that manages fan out to multiple backend + * destinations. Given a {@link GetWorkBudget}, divides the budget and starts the {@link + * WindmillStream.GetWorkStream}(s). */ @Internal @CheckReturnValue @ThreadSafe -public final class StreamingEngineClient { - private static final Logger LOG = LoggerFactory.getLogger(StreamingEngineClient.class); +public final class FanOutStreamingEngineWorkerHarness implements StreamingWorkerHarness { + private static final Logger LOG = + LoggerFactory.getLogger(FanOutStreamingEngineWorkerHarness.class); private static final String PUBLISH_NEW_WORKER_METADATA_THREAD = "PublishNewWorkerMetadataThread"; private static final String CONSUME_NEW_WORKER_METADATA_THREAD = "ConsumeNewWorkerMetadataThread"; + private final JobHeader jobHeader; private final GrpcWindmillStreamFactory streamFactory; private final WorkItemScheduler workItemScheduler; @@ -101,7 +106,7 @@ public final class StreamingEngineClient { private volatile boolean started; @SuppressWarnings("FutureReturnValueIgnored") - private StreamingEngineClient( + private FanOutStreamingEngineWorkerHarness( JobHeader jobHeader, GetWorkBudget totalGetWorkBudget, GrpcWindmillStreamFactory streamFactory, @@ -152,23 +157,15 @@ private StreamingEngineClient( private static ExecutorService singleThreadedExecutorServiceOf(String threadName) { return Executors.newSingleThreadScheduledExecutor( - new ThreadFactoryBuilder() - .setNameFormat(threadName) - .setUncaughtExceptionHandler( - (t, e) -> { - LOG.error( - "{} failed due to uncaught exception during execution. ", t.getName(), e); - throw new StreamingEngineClientException(e); - }) - .build()); + new ThreadFactoryBuilder().setNameFormat(threadName).build()); } /** - * Creates an instance of {@link StreamingEngineClient} in a non-started state. + * Creates an instance of {@link FanOutStreamingEngineWorkerHarness} in a non-started state. * * @implNote Does not block the calling thread. Callers must explicitly call {@link #start()}. */ - public static StreamingEngineClient create( + public static FanOutStreamingEngineWorkerHarness create( JobHeader jobHeader, GetWorkBudget totalGetWorkBudget, GrpcWindmillStreamFactory streamingEngineStreamFactory, @@ -178,7 +175,7 @@ public static StreamingEngineClient create( GrpcDispatcherClient dispatcherClient, Function workCommitterFactory, ThrottlingGetDataMetricTracker getDataMetricTracker) { - return new StreamingEngineClient( + return new FanOutStreamingEngineWorkerHarness( jobHeader, totalGetWorkBudget, streamingEngineStreamFactory, @@ -192,7 +189,7 @@ public static StreamingEngineClient create( } @VisibleForTesting - static StreamingEngineClient forTesting( + static FanOutStreamingEngineWorkerHarness forTesting( JobHeader jobHeader, GetWorkBudget totalGetWorkBudget, GrpcWindmillStreamFactory streamFactory, @@ -203,8 +200,8 @@ static StreamingEngineClient forTesting( long clientId, Function workCommitterFactory, ThrottlingGetDataMetricTracker getDataMetricTracker) { - StreamingEngineClient streamingEngineClient = - new StreamingEngineClient( + FanOutStreamingEngineWorkerHarness fanOutStreamingEngineWorkProvider = + new FanOutStreamingEngineWorkerHarness( jobHeader, totalGetWorkBudget, streamFactory, @@ -215,11 +212,12 @@ static StreamingEngineClient forTesting( clientId, workCommitterFactory, getDataMetricTracker); - streamingEngineClient.start(); - return streamingEngineClient; + fanOutStreamingEngineWorkProvider.start(); + return fanOutStreamingEngineWorkProvider; } @SuppressWarnings("ReturnValueIgnored") + @Override public synchronized void start() { Preconditions.checkState(!started, "StreamingEngineClient cannot start twice."); // Starts the stream, this value is memoized. @@ -270,7 +268,8 @@ private void startWorkerMetadataConsumer() { } @VisibleForTesting - public synchronized void finish() { + @Override + public synchronized void shutdown() { Preconditions.checkState(started, "StreamingEngineClient never started."); getWorkerMetadataStream.get().halfClose(); getWorkBudgetRefresher.stop(); @@ -334,10 +333,13 @@ private synchronized ImmutableMap createNewWindmil .collect( toImmutableMap( Function.identity(), - // Reuse existing stubs if they exist. endpoint -> - currentConnections.getOrDefault( - endpoint, WindmillConnection.from(endpoint, this::createWindmillStub)))); + // Reuse existing stubs if they exist. Optional.orElseGet only calls the + // supplier if the value is not present, preventing constructing expensive + // objects. + Optional.ofNullable(currentConnections.get(endpoint)) + .orElseGet( + () -> WindmillConnection.from(endpoint, this::createWindmillStub)))); } private synchronized ImmutableMap @@ -423,11 +425,4 @@ private CloudWindmillServiceV1Alpha1Stub createWindmillStub(Endpoint endpoint) { .map(channelCachingStubFactory::createWindmillServiceStub) .orElseGet(dispatcherClient::getWindmillServiceStub); } - - private static class StreamingEngineClientException extends IllegalStateException { - - private StreamingEngineClientException(Throwable exception) { - super(exception); - } - } } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java new file mode 100644 index 0000000000000..bc93e6d89c415 --- /dev/null +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java @@ -0,0 +1,284 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.streaming.harness; + +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; + +import com.google.auto.value.AutoBuilder; +import com.google.auto.value.AutoOneOf; +import java.util.Collections; +import java.util.Optional; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; +import org.apache.beam.runners.dataflow.worker.WindmillTimeUtils; +import org.apache.beam.runners.dataflow.worker.streaming.ComputationState; +import org.apache.beam.runners.dataflow.worker.streaming.Watermarks; +import org.apache.beam.runners.dataflow.worker.streaming.Work; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill; +import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub.RpcException; +import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream; +import org.apache.beam.runners.dataflow.worker.windmill.client.commits.WorkCommitter; +import org.apache.beam.runners.dataflow.worker.windmill.client.getdata.GetDataClient; +import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemReceiver; +import org.apache.beam.runners.dataflow.worker.windmill.work.processing.StreamingWorkScheduler; +import org.apache.beam.runners.dataflow.worker.windmill.work.refresh.HeartbeatSender; +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Supplier; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * {@link StreamingWorkerHarness} implementations that fetch {@link + * org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItem}(s) from a single source. + */ +@Internal +public final class SingleSourceWorkerHarness implements StreamingWorkerHarness { + private static final Logger LOG = LoggerFactory.getLogger(SingleSourceWorkerHarness.class); + private static final int GET_WORK_STREAM_TIMEOUT_MINUTES = 3; + + private final AtomicBoolean isRunning; + private final WorkCommitter workCommitter; + private final GetDataClient getDataClient; + private final HeartbeatSender heartbeatSender; + private final StreamingWorkScheduler streamingWorkScheduler; + private final Runnable waitForResources; + private final Function> computationStateFetcher; + private final ExecutorService workProviderExecutor; + private final GetWorkSender getWorkSender; + + SingleSourceWorkerHarness( + WorkCommitter workCommitter, + GetDataClient getDataClient, + HeartbeatSender heartbeatSender, + StreamingWorkScheduler streamingWorkScheduler, + Runnable waitForResources, + Function> computationStateFetcher, + GetWorkSender getWorkSender) { + this.workCommitter = workCommitter; + this.getDataClient = getDataClient; + this.heartbeatSender = heartbeatSender; + this.streamingWorkScheduler = streamingWorkScheduler; + this.waitForResources = waitForResources; + this.computationStateFetcher = computationStateFetcher; + this.workProviderExecutor = + Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder() + .setDaemon(true) + .setPriority(Thread.MIN_PRIORITY) + .setNameFormat("DispatchThread") + .build()); + this.isRunning = new AtomicBoolean(false); + this.getWorkSender = getWorkSender; + } + + public static SingleSourceWorkerHarness.Builder builder() { + return new AutoBuilder_SingleSourceWorkerHarness_Builder(); + } + + @Override + public void start() { + Preconditions.checkState( + isRunning.compareAndSet(false, true), + "Multiple calls to {}.start() are not allowed.", + getClass()); + workCommitter.start(); + workProviderExecutor.execute( + () -> { + getDispatchLoop().run(); + LOG.info("Dispatch done"); + }); + } + + private Runnable getDispatchLoop() { + switch (getWorkSender.getKind()) { + case APPLIANCE: + LOG.info("Starting Dispatch in Appliance mode."); + return () -> applianceDispatchLoop(getWorkSender.appliance()); + case STREAMING_ENGINE: + LOG.info("Starting Dispatch in Streaming Engine mode."); + return () -> streamingEngineDispatchLoop(getWorkSender.streamingEngine()); + default: + // Will never happen switch is exhaustive. + throw new IllegalStateException("Invalid GetWorkSender.Kind: " + getWorkSender.getKind()); + } + } + + @Override + public void shutdown() { + Preconditions.checkState( + isRunning.compareAndSet(true, false), + "Multiple calls to {}.shutdown() are not allowed.", + getClass()); + workProviderExecutor.shutdown(); + boolean isTerminated = false; + try { + isTerminated = workProviderExecutor.awaitTermination(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + LOG.warn("Unable to shutdown {}", getClass()); + } + + if (!isTerminated) { + workProviderExecutor.shutdownNow(); + } + workCommitter.stop(); + } + + private void streamingEngineDispatchLoop( + Function getWorkStreamFactory) { + while (isRunning.get()) { + WindmillStream.GetWorkStream stream = + getWorkStreamFactory.apply( + (computationId, + inputDataWatermark, + synchronizedProcessingTime, + workItem, + getWorkStreamLatencies) -> + computationStateFetcher + .apply(computationId) + .ifPresent( + computationState -> { + waitForResources.run(); + streamingWorkScheduler.scheduleWork( + computationState, + workItem, + Watermarks.builder() + .setInputDataWatermark( + Preconditions.checkNotNull(inputDataWatermark)) + .setSynchronizedProcessingTime(synchronizedProcessingTime) + .setOutputDataWatermark(workItem.getOutputDataWatermark()) + .build(), + Work.createProcessingContext( + computationId, + getDataClient, + workCommitter::commit, + heartbeatSender), + getWorkStreamLatencies); + })); + try { + // Reconnect every now and again to enable better load balancing. + // If at any point the server closes the stream, we will reconnect immediately; otherwise + // we half-close the stream after some time and create a new one. + if (!stream.awaitTermination(GET_WORK_STREAM_TIMEOUT_MINUTES, TimeUnit.MINUTES)) { + stream.halfClose(); + } + } catch (InterruptedException e) { + // Continue processing until !running.get() + } + } + } + + private void applianceDispatchLoop(Supplier getWorkFn) { + while (isRunning.get()) { + waitForResources.run(); + int backoff = 1; + Windmill.GetWorkResponse workResponse = null; + do { + try { + workResponse = getWorkFn.get(); + if (workResponse.getWorkCount() > 0) { + break; + } + } catch (RpcException e) { + LOG.warn("GetWork failed, retrying:", e); + } + sleepUninterruptibly(backoff, TimeUnit.MILLISECONDS); + backoff = Math.min(1000, backoff * 2); + } while (isRunning.get()); + for (Windmill.ComputationWorkItems computationWork : + Preconditions.checkNotNull(workResponse).getWorkList()) { + String computationId = computationWork.getComputationId(); + Optional maybeComputationState = + computationStateFetcher.apply(computationId); + if (!maybeComputationState.isPresent()) { + continue; + } + + ComputationState computationState = maybeComputationState.get(); + Instant inputDataWatermark = + WindmillTimeUtils.windmillToHarnessWatermark(computationWork.getInputDataWatermark()); + Watermarks.Builder watermarks = + Watermarks.builder() + .setInputDataWatermark(Preconditions.checkNotNull(inputDataWatermark)) + .setSynchronizedProcessingTime( + WindmillTimeUtils.windmillToHarnessWatermark( + computationWork.getDependentRealtimeInputWatermark())); + + for (Windmill.WorkItem workItem : computationWork.getWorkList()) { + streamingWorkScheduler.scheduleWork( + computationState, + workItem, + watermarks.setOutputDataWatermark(workItem.getOutputDataWatermark()).build(), + Work.createProcessingContext( + computationId, getDataClient, workCommitter::commit, heartbeatSender), + /* getWorkStreamLatencies= */ Collections.emptyList()); + } + } + } + } + + @AutoBuilder + public interface Builder { + Builder setWorkCommitter(WorkCommitter workCommitter); + + Builder setGetDataClient(GetDataClient getDataClient); + + Builder setHeartbeatSender(HeartbeatSender heartbeatSender); + + Builder setStreamingWorkScheduler(StreamingWorkScheduler streamingWorkScheduler); + + Builder setWaitForResources(Runnable waitForResources); + + Builder setComputationStateFetcher( + Function> computationStateFetcher); + + Builder setGetWorkSender(GetWorkSender getWorkSender); + + SingleSourceWorkerHarness build(); + } + + @AutoOneOf(GetWorkSender.Kind.class) + public abstract static class GetWorkSender { + + public static GetWorkSender forStreamingEngine( + Function getWorkStreamFactory) { + return AutoOneOf_SingleSourceWorkerHarness_GetWorkSender.streamingEngine( + getWorkStreamFactory); + } + + public static GetWorkSender forAppliance(Supplier getWorkFn) { + return AutoOneOf_SingleSourceWorkerHarness_GetWorkSender.appliance(getWorkFn); + } + + abstract Function streamingEngine(); + + abstract Supplier appliance(); + + abstract Kind getKind(); + + enum Kind { + STREAMING_ENGINE, + APPLIANCE + } + } +} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/StreamingEngineConnectionState.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingEngineConnectionState.java similarity index 97% rename from runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/StreamingEngineConnectionState.java rename to runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingEngineConnectionState.java index 8d784456d655b..3c85ee6abe1f5 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/StreamingEngineConnectionState.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingEngineConnectionState.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.runners.dataflow.worker.windmill.client.grpc; +package org.apache.beam.runners.dataflow.worker.streaming.harness; import com.google.auto.value.AutoValue; import java.util.function.Supplier; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerHarness.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerHarness.java new file mode 100644 index 0000000000000..c1b4570e22600 --- /dev/null +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerHarness.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.streaming.harness; + +import org.apache.beam.sdk.annotations.Internal; + +/** Provides an interface to start streaming worker processing. */ +@Internal +public interface StreamingWorkerHarness { + void start(); + + void shutdown(); +} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/WindmillStreamSender.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSender.java similarity index 93% rename from runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/WindmillStreamSender.java rename to runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSender.java index 7d09726e4b28a..45aa403ee71b4 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/WindmillStreamSender.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSender.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.runners.dataflow.worker.windmill.client.grpc; +package org.apache.beam.runners.dataflow.worker.streaming.harness; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -29,9 +29,11 @@ import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetWorkStream; import org.apache.beam.runners.dataflow.worker.windmill.client.commits.WorkCommitter; import org.apache.beam.runners.dataflow.worker.windmill.client.getdata.GetDataClient; +import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcWindmillStreamFactory; import org.apache.beam.runners.dataflow.worker.windmill.client.throttling.StreamingEngineThrottleTimers; import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemScheduler; import org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudget; +import org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudgetSpender; import org.apache.beam.runners.dataflow.worker.windmill.work.refresh.FixedStreamHeartbeatSender; import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Suppliers; @@ -57,7 +59,7 @@ */ @Internal @ThreadSafe -public class WindmillStreamSender { +final class WindmillStreamSender implements GetWorkBudgetSpender { private final AtomicBoolean started; private final AtomicReference getWorkBudget; private final Supplier getWorkStream; @@ -107,7 +109,7 @@ private WindmillStreamSender( workItemScheduler)); } - public static WindmillStreamSender create( + static WindmillStreamSender create( WindmillConnection connection, GetWorkRequest getWorkRequest, GetWorkBudget getWorkBudget, @@ -151,6 +153,7 @@ void closeAllStreams() { } } + @Override public void adjustBudget(long itemsDelta, long bytesDelta) { getWorkBudget.set(getWorkBudget.get().apply(itemsDelta, bytesDelta)); if (started.get()) { @@ -158,19 +161,16 @@ public void adjustBudget(long itemsDelta, long bytesDelta) { } } - public void adjustBudget(GetWorkBudget adjustment) { - adjustBudget(adjustment.items(), adjustment.bytes()); - } - - public GetWorkBudget remainingGetWorkBudget() { + @Override + public GetWorkBudget remainingBudget() { return started.get() ? getWorkStream.get().remainingBudget() : getWorkBudget.get(); } - public long getAndResetThrottleTime() { + long getAndResetThrottleTime() { return streamingEngineThrottleTimers.getAndResetThrottleTime(); } - public long getCurrentActiveCommitBytes() { + long getCurrentActiveCommitBytes() { return started.get() ? workCommitter.get().currentActiveCommitBytes() : 0; } } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/ChannelzServlet.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/ChannelzServlet.java index e0f823d79ade5..adfb380d21647 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/ChannelzServlet.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/ChannelzServlet.java @@ -23,6 +23,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.function.Supplier; import java.util.stream.Collectors; import javax.annotation.Nullable; import javax.servlet.ServletException; @@ -31,7 +32,6 @@ import org.apache.beam.runners.dataflow.options.DataflowStreamingPipelineOptions; import org.apache.beam.runners.dataflow.worker.status.BaseStatusServlet; import org.apache.beam.runners.dataflow.worker.status.DebugCapture; -import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub; import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.channelz.v1.*; import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.protobuf.services.ChannelzService; @@ -47,16 +47,16 @@ public class ChannelzServlet extends BaseStatusServlet implements DebugCapture.C private static final int MAX_TOP_CHANNELS_TO_RETURN = 500; private final ChannelzService channelzService; - private final WindmillServerStub windmillServerStub; + private final Supplier> currentWindmillEndpoints; private final boolean showOnlyWindmillServiceChannels; public ChannelzServlet( String path, DataflowStreamingPipelineOptions options, - WindmillServerStub windmillServerStub) { + Supplier> currentWindmillEndpoints) { super(path); channelzService = ChannelzService.newInstance(MAX_TOP_CHANNELS_TO_RETURN); - this.windmillServerStub = windmillServerStub; + this.currentWindmillEndpoints = currentWindmillEndpoints; showOnlyWindmillServiceChannels = options.getChannelzShowOnlyWindmillServiceChannels(); } @@ -81,14 +81,6 @@ public void captureData(PrintWriter writer) { writer.println(""); } - // channelz proto says there won't be cycles in the ref graph. - // we track visited ids to be defensive and prevent any accidental cycles. - private static class VisitedSets { - - Set channels = new HashSet<>(); - Set subchannels = new HashSet<>(); - } - private void appendTopChannels(PrintWriter writer) { SettableFuture future = SettableFuture.create(); // IDEA: If there are more than MAX_TOP_CHANNELS_TO_RETURN top channels @@ -127,8 +119,7 @@ private void appendTopChannels(PrintWriter writer) { } private List filterWindmillChannels(List channels) { - ImmutableSet windmillServiceEndpoints = - windmillServerStub.getWindmillServiceEndpoints(); + ImmutableSet windmillServiceEndpoints = currentWindmillEndpoints.get(); Set windmillServiceHosts = windmillServiceEndpoints.stream().map(HostAndPort::getHost).collect(Collectors.toSet()); List windmillChannels = new ArrayList<>(); @@ -291,4 +282,12 @@ public void onCompleted() { } }; } + + // channelz proto says there won't be cycles in the ref graph. + // we track visited ids to be defensive and prevent any accidental cycles. + private static class VisitedSets { + + Set channels = new HashSet<>(); + Set subchannels = new HashSet<>(); + } } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GetWorkResponseChunkAssembler.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GetWorkResponseChunkAssembler.java new file mode 100644 index 0000000000000..9f30f75919f97 --- /dev/null +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GetWorkResponseChunkAssembler.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.client.grpc; + +import com.google.auto.value.AutoValue; +import java.io.IOException; +import java.util.List; +import java.util.Optional; +import javax.annotation.Nullable; +import javax.annotation.concurrent.NotThreadSafe; +import org.apache.beam.runners.dataflow.worker.WindmillTimeUtils; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItem; +import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * {@link ByteString} buffer of {@link + * org.apache.beam.runners.dataflow.worker.windmill.Windmill.StreamingGetWorkResponseChunk}(s). + * + *

Once all serialized chunks of an {@link WorkItem} have been received flushes (deserializes) + * the chunk of bytes and metadata into an {@link AssembledWorkItem}. + * + * @implNote This class is not thread safe, and provides no synchronization underneath. + */ +@NotThreadSafe +final class GetWorkResponseChunkAssembler { + private static final Logger LOG = LoggerFactory.getLogger(GetWorkResponseChunkAssembler.class); + + private final GetWorkTimingInfosTracker workTimingInfosTracker; + private @Nullable ComputationMetadata metadata; + private ByteString data; + private long bufferedSize; + + GetWorkResponseChunkAssembler() { + workTimingInfosTracker = new GetWorkTimingInfosTracker(System::currentTimeMillis); + data = ByteString.EMPTY; + bufferedSize = 0; + metadata = null; + } + + /** + * Appends the response chunk bytes to the {@link #data }byte buffer. Return the assembled + * WorkItem if all response chunks for a WorkItem have been received. + */ + Optional append(Windmill.StreamingGetWorkResponseChunk chunk) { + if (chunk.hasComputationMetadata()) { + metadata = ComputationMetadata.fromProto(chunk.getComputationMetadata()); + } + + data = data.concat(chunk.getSerializedWorkItem()); + bufferedSize += chunk.getSerializedWorkItem().size(); + workTimingInfosTracker.addTimingInfo(chunk.getPerWorkItemTimingInfosList()); + + // If the entire WorkItem has been received, assemble the WorkItem. + return chunk.getRemainingBytesForWorkItem() == 0 ? flushToWorkItem() : Optional.empty(); + } + + /** + * Attempt to flush the {@link #data} bytes into a {@link WorkItem} w/ it's metadata. Resets the + * data byte string and tracking metadata afterwards, whether the {@link WorkItem} deserialization + * was successful or not. + */ + private Optional flushToWorkItem() { + try { + return Optional.of( + AssembledWorkItem.create( + WorkItem.parseFrom(data.newInput()), + Preconditions.checkNotNull(metadata), + workTimingInfosTracker.getLatencyAttributions(), + bufferedSize)); + } catch (IOException e) { + LOG.error("Failed to parse work item from stream: ", e); + } finally { + workTimingInfosTracker.reset(); + data = ByteString.EMPTY; + bufferedSize = 0; + } + + return Optional.empty(); + } + + @AutoValue + abstract static class ComputationMetadata { + private static ComputationMetadata fromProto( + Windmill.ComputationWorkItemMetadata metadataProto) { + return new AutoValue_GetWorkResponseChunkAssembler_ComputationMetadata( + metadataProto.getComputationId(), + WindmillTimeUtils.windmillToHarnessWatermark(metadataProto.getInputDataWatermark()), + WindmillTimeUtils.windmillToHarnessWatermark( + metadataProto.getDependentRealtimeInputWatermark())); + } + + abstract String computationId(); + + abstract Instant inputDataWatermark(); + + abstract Instant synchronizedProcessingTime(); + } + + @AutoValue + abstract static class AssembledWorkItem { + + private static AssembledWorkItem create( + WorkItem workItem, + ComputationMetadata computationMetadata, + List latencyAttributions, + long size) { + return new AutoValue_GetWorkResponseChunkAssembler_AssembledWorkItem( + workItem, computationMetadata, latencyAttributions, size); + } + + abstract WorkItem workItem(); + + abstract ComputationMetadata computationMetadata(); + + abstract List latencyAttributions(); + + abstract long bufferedSize(); + } +} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java index 58f72610e2d35..45d010d7cfac5 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java @@ -17,8 +17,6 @@ */ package org.apache.beam.runners.dataflow.worker.windmill.client.grpc; -import com.google.auto.value.AutoValue; -import java.io.IOException; import java.io.PrintWriter; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -26,12 +24,9 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.Supplier; -import javax.annotation.Nullable; -import org.apache.beam.runners.dataflow.worker.WindmillTimeUtils; import org.apache.beam.runners.dataflow.worker.streaming.Watermarks; import org.apache.beam.runners.dataflow.worker.streaming.Work; import org.apache.beam.runners.dataflow.worker.windmill.Windmill; -import org.apache.beam.runners.dataflow.worker.windmill.Windmill.ComputationWorkItemMetadata; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetWorkRequest; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.StreamingGetWorkRequest; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.StreamingGetWorkResponseChunk; @@ -40,6 +35,7 @@ import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetWorkStream; import org.apache.beam.runners.dataflow.worker.windmill.client.commits.WorkCommitter; import org.apache.beam.runners.dataflow.worker.windmill.client.getdata.GetDataClient; +import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GetWorkResponseChunkAssembler.AssembledWorkItem; import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers.StreamObserverFactory; import org.apache.beam.runners.dataflow.worker.windmill.client.throttling.ThrottleTimer; import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemScheduler; @@ -47,13 +43,9 @@ import org.apache.beam.runners.dataflow.worker.windmill.work.refresh.HeartbeatSender; import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.util.BackOff; -import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString; import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Suppliers; -import org.joda.time.Instant; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Implementation of {@link GetWorkStream} that passes along a specific {@link @@ -66,7 +58,6 @@ public final class GrpcDirectGetWorkStream extends AbstractWindmillStream implements GetWorkStream { - private static final Logger LOG = LoggerFactory.getLogger(GrpcDirectGetWorkStream.class); private static final StreamingGetWorkRequest HEALTH_CHECK_REQUEST = StreamingGetWorkRequest.newBuilder() .setRequestExtension( @@ -90,8 +81,10 @@ public final class GrpcDirectGetWorkStream * Map of stream IDs to their buffers. Used to aggregate streaming gRPC response chunks as they * come in. Once all chunks for a response has been received, the chunk is processed and the * buffer is cleared. + * + * @implNote Buffers are not persisted across stream restarts. */ - private final ConcurrentMap workItemBuffers; + private final ConcurrentMap workItemAssemblers; private GrpcDirectGetWorkStream( String backendWorkerToken, @@ -120,7 +113,7 @@ private GrpcDirectGetWorkStream( this.request = request; this.getWorkThrottleTimer = getWorkThrottleTimer; this.workItemScheduler = workItemScheduler; - this.workItemBuffers = new ConcurrentHashMap<>(); + this.workItemAssemblers = new ConcurrentHashMap<>(); this.heartbeatSender = Suppliers.memoize(heartbeatSender::get); this.workCommitter = Suppliers.memoize(workCommitter::get); this.getDataClient = Suppliers.memoize(getDataClient::get); @@ -163,7 +156,8 @@ public static GrpcDirectGetWorkStream create( return getWorkStream; } - private static Watermarks createWatermarks(WorkItem workItem, ComputationMetadata metadata) { + private static Watermarks createWatermarks( + WorkItem workItem, GetWorkResponseChunkAssembler.ComputationMetadata metadata) { return Watermarks.builder() .setInputDataWatermark(metadata.inputDataWatermark()) .setOutputDataWatermark(workItem.getOutputDataWatermark()) @@ -171,14 +165,8 @@ private static Watermarks createWatermarks(WorkItem workItem, ComputationMetadat .build(); } - private synchronized GetWorkBudget getThenResetBudgetAdjustment() { - return nextBudgetAdjustment.getAndUpdate(unused -> GetWorkBudget.noBudget()); - } - - private void sendRequestExtension() { - // Just sent the request extension, reset the nextBudgetAdjustment. This will be set when - // adjustBudget is called. - GetWorkBudget adjustment = getThenResetBudgetAdjustment(); + private void sendRequestExtension(GetWorkBudget adjustment) { + inFlightBudget.getAndUpdate(budget -> budget.apply(adjustment)); StreamingGetWorkRequest extension = StreamingGetWorkRequest.newBuilder() .setRequestExtension( @@ -200,7 +188,7 @@ private void sendRequestExtension() { @Override protected synchronized void onNewStream() { - workItemBuffers.clear(); + workItemAssemblers.clear(); // Add the current in-flight budget to the next adjustment. Only positive values are allowed // here // with negatives defaulting to 0, since GetWorkBudgets cannot be created with negative values. @@ -229,7 +217,7 @@ public void appendSpecificHtml(PrintWriter writer) { // Number of buffers is same as distinct workers that sent work on this stream. writer.format( "GetWorkStream: %d buffers, %s inflight budget allowed.", - workItemBuffers.size(), inFlightBudget.get()); + workItemAssemblers.size(), inFlightBudget.get()); } @Override @@ -240,27 +228,49 @@ public void sendHealthCheck() { @Override protected void onResponse(StreamingGetWorkResponseChunk chunk) { getWorkThrottleTimer.stop(); - WorkItemBuffer workItemBuffer = - workItemBuffers.computeIfAbsent(chunk.getStreamId(), unused -> new WorkItemBuffer()); - workItemBuffer.append(chunk); + workItemAssemblers + .computeIfAbsent(chunk.getStreamId(), unused -> new GetWorkResponseChunkAssembler()) + .append(chunk) + .ifPresent(this::consumeAssembledWorkItem); + } - // The entire WorkItem has been received, it is ready to be processed. - if (chunk.getRemainingBytesForWorkItem() == 0) { - workItemBuffer.runAndReset(); - // Record the fact that there are now fewer outstanding messages and bytes on the stream. - inFlightBudget.updateAndGet(budget -> budget.subtract(1, workItemBuffer.bufferedSize())); + private void consumeAssembledWorkItem(AssembledWorkItem assembledWorkItem) { + // Record the fact that there are now fewer outstanding messages and bytes on the stream. + inFlightBudget.updateAndGet(budget -> budget.subtract(1, assembledWorkItem.bufferedSize())); + WorkItem workItem = assembledWorkItem.workItem(); + GetWorkResponseChunkAssembler.ComputationMetadata metadata = + assembledWorkItem.computationMetadata(); + pendingResponseBudget.getAndUpdate(budget -> budget.apply(1, workItem.getSerializedSize())); + try { + workItemScheduler.scheduleWork( + workItem, + createWatermarks(workItem, Preconditions.checkNotNull(metadata)), + createProcessingContext(Preconditions.checkNotNull(metadata.computationId())), + assembledWorkItem.latencyAttributions()); + } finally { + pendingResponseBudget.getAndUpdate(budget -> budget.apply(-1, -workItem.getSerializedSize())); } } + private Work.ProcessingContext createProcessingContext(String computationId) { + return Work.createProcessingContext( + computationId, getDataClient.get(), workCommitter.get()::commit, heartbeatSender.get()); + } + @Override protected void startThrottleTimer() { getWorkThrottleTimer.start(); } @Override - public synchronized void adjustBudget(long itemsDelta, long bytesDelta) { - nextBudgetAdjustment.set(nextBudgetAdjustment.get().apply(itemsDelta, bytesDelta)); - sendRequestExtension(); + public void adjustBudget(long itemsDelta, long bytesDelta) { + GetWorkBudget adjustment = + nextBudgetAdjustment + // Get the current value, and reset the nextBudgetAdjustment. This will be set again + // when adjustBudget is called. + .getAndUpdate(unused -> GetWorkBudget.noBudget()) + .apply(itemsDelta, bytesDelta); + sendRequestExtension(adjustment); } @Override @@ -274,74 +284,4 @@ public GetWorkBudget remainingBudget() { .apply(currentNextBudgetAdjustment) .apply(currentInflightBudget); } - - private synchronized void updatePendingResponseBudget(long itemsDelta, long bytesDelta) { - pendingResponseBudget.set(pendingResponseBudget.get().apply(itemsDelta, bytesDelta)); - } - - @AutoValue - abstract static class ComputationMetadata { - private static ComputationMetadata fromProto(ComputationWorkItemMetadata metadataProto) { - return new AutoValue_GrpcDirectGetWorkStream_ComputationMetadata( - metadataProto.getComputationId(), - WindmillTimeUtils.windmillToHarnessWatermark(metadataProto.getInputDataWatermark()), - WindmillTimeUtils.windmillToHarnessWatermark( - metadataProto.getDependentRealtimeInputWatermark())); - } - - abstract String computationId(); - - abstract Instant inputDataWatermark(); - - abstract Instant synchronizedProcessingTime(); - } - - private class WorkItemBuffer { - private final GetWorkTimingInfosTracker workTimingInfosTracker; - private ByteString data; - private @Nullable ComputationMetadata metadata; - - private WorkItemBuffer() { - workTimingInfosTracker = new GetWorkTimingInfosTracker(System::currentTimeMillis); - data = ByteString.EMPTY; - this.metadata = null; - } - - private void append(StreamingGetWorkResponseChunk chunk) { - if (chunk.hasComputationMetadata()) { - this.metadata = ComputationMetadata.fromProto(chunk.getComputationMetadata()); - } - - this.data = data.concat(chunk.getSerializedWorkItem()); - workTimingInfosTracker.addTimingInfo(chunk.getPerWorkItemTimingInfosList()); - } - - private long bufferedSize() { - return data.size(); - } - - private void runAndReset() { - try { - WorkItem workItem = WorkItem.parseFrom(data.newInput()); - updatePendingResponseBudget(1, workItem.getSerializedSize()); - workItemScheduler.scheduleWork( - workItem, - createWatermarks(workItem, Preconditions.checkNotNull(metadata)), - createProcessingContext(Preconditions.checkNotNull(metadata.computationId())), - // After the work item is successfully queued or dropped by ActiveWorkState, remove it - // from the pendingResponseBudget. - queuedWorkItem -> updatePendingResponseBudget(-1, -workItem.getSerializedSize()), - workTimingInfosTracker.getLatencyAttributions()); - } catch (IOException e) { - LOG.error("Failed to parse work item from stream: ", e); - } - workTimingInfosTracker.reset(); - data = ByteString.EMPTY; - } - - private Work.ProcessingContext createProcessingContext(String computationId) { - return Work.createProcessingContext( - computationId, getDataClient.get(), workCommitter.get()::commit, heartbeatSender.get()); - } - } } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java index 033990017b24c..cf2e7260592db 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java @@ -75,7 +75,7 @@ public static GrpcDispatcherClient create(WindmillStubFactory windmillStubFactor } @VisibleForTesting - static GrpcDispatcherClient forTesting( + public static GrpcDispatcherClient forTesting( WindmillStubFactory windmillGrpcStubFactory, List windmillServiceStubs, List windmillMetadataServiceStubs, @@ -106,7 +106,7 @@ ImmutableSet getDispatcherEndpoints() { } /** Will block the calling thread until the initial endpoints are present. */ - CloudWindmillMetadataServiceV1Alpha1Stub getWindmillMetadataServiceStubBlocking() { + public CloudWindmillMetadataServiceV1Alpha1Stub getWindmillMetadataServiceStubBlocking() { boolean initialized = false; long secondsWaited = 0; while (!initialized) { diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetWorkStream.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetWorkStream.java index 4b392e9190ed2..09ecbf3f30516 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetWorkStream.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetWorkStream.java @@ -17,45 +17,34 @@ */ package org.apache.beam.runners.dataflow.worker.windmill.client.grpc; -import java.io.IOException; import java.io.PrintWriter; -import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; -import javax.annotation.Nullable; -import org.apache.beam.runners.dataflow.worker.WindmillTimeUtils; -import org.apache.beam.runners.dataflow.worker.windmill.Windmill; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetWorkRequest; -import org.apache.beam.runners.dataflow.worker.windmill.Windmill.LatencyAttribution; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.StreamingGetWorkRequest; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.StreamingGetWorkRequestExtension; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.StreamingGetWorkResponseChunk; import org.apache.beam.runners.dataflow.worker.windmill.client.AbstractWindmillStream; import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetWorkStream; +import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GetWorkResponseChunkAssembler.AssembledWorkItem; import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers.StreamObserverFactory; import org.apache.beam.runners.dataflow.worker.windmill.client.throttling.ThrottleTimer; import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemReceiver; import org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudget; import org.apache.beam.sdk.util.BackOff; -import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString; import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver; -import org.joda.time.Instant; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -public final class GrpcGetWorkStream +final class GrpcGetWorkStream extends AbstractWindmillStream implements GetWorkStream { - private static final Logger LOG = LoggerFactory.getLogger(GrpcGetWorkStream.class); - private final GetWorkRequest request; private final WorkItemReceiver receiver; private final ThrottleTimer getWorkThrottleTimer; - private final Map buffers; + private final Map workItemAssemblers; private final AtomicLong inflightMessages; private final AtomicLong inflightBytes; @@ -83,7 +72,7 @@ private GrpcGetWorkStream( this.request = request; this.getWorkThrottleTimer = getWorkThrottleTimer; this.receiver = receiver; - this.buffers = new ConcurrentHashMap<>(); + this.workItemAssemblers = new ConcurrentHashMap<>(); this.inflightMessages = new AtomicLong(); this.inflightBytes = new AtomicLong(); } @@ -138,7 +127,7 @@ private void sendRequestExtension(long moreItems, long moreBytes) { @Override protected synchronized void onNewStream() { - buffers.clear(); + workItemAssemblers.clear(); inflightMessages.set(request.getMaxItems()); inflightBytes.set(request.getMaxBytes()); send(StreamingGetWorkRequest.newBuilder().setRequest(request).build()); @@ -154,7 +143,7 @@ public void appendSpecificHtml(PrintWriter writer) { // Number of buffers is same as distinct workers that sent work on this stream. writer.format( "GetWorkStream: %d buffers, %d inflight messages allowed, %d inflight bytes allowed", - buffers.size(), inflightMessages.intValue(), inflightBytes.intValue()); + workItemAssemblers.size(), inflightMessages.intValue(), inflightBytes.intValue()); } @Override @@ -169,30 +158,33 @@ public void sendHealthCheck() { @Override protected void onResponse(StreamingGetWorkResponseChunk chunk) { getWorkThrottleTimer.stop(); + workItemAssemblers + .computeIfAbsent(chunk.getStreamId(), unused -> new GetWorkResponseChunkAssembler()) + .append(chunk) + .ifPresent(this::consumeAssembledWorkItem); + } - GrpcGetWorkStream.WorkItemBuffer buffer = - buffers.computeIfAbsent( - chunk.getStreamId(), unused -> new GrpcGetWorkStream.WorkItemBuffer()); - buffer.append(chunk); - - if (chunk.getRemainingBytesForWorkItem() == 0) { - long size = buffer.bufferedSize(); - buffer.runAndReset(); - - // Record the fact that there are now fewer outstanding messages and bytes on the stream. - long numInflight = inflightMessages.decrementAndGet(); - long bytesInflight = inflightBytes.addAndGet(-size); - - // If the outstanding items or bytes limit has gotten too low, top both off with a - // GetWorkExtension. The goal is to keep the limits relatively close to their maximum - // values without sending too many extension requests. - if (numInflight < request.getMaxItems() / 2 || bytesInflight < request.getMaxBytes() / 2) { - long moreItems = request.getMaxItems() - numInflight; - long moreBytes = request.getMaxBytes() - bytesInflight; - inflightMessages.getAndAdd(moreItems); - inflightBytes.getAndAdd(moreBytes); - sendRequestExtension(moreItems, moreBytes); - } + private void consumeAssembledWorkItem(AssembledWorkItem assembledWorkItem) { + receiver.receiveWork( + assembledWorkItem.computationMetadata().computationId(), + assembledWorkItem.computationMetadata().inputDataWatermark(), + assembledWorkItem.computationMetadata().synchronizedProcessingTime(), + assembledWorkItem.workItem(), + assembledWorkItem.latencyAttributions()); + + // Record the fact that there are now fewer outstanding messages and bytes on the stream. + long numInflight = inflightMessages.decrementAndGet(); + long bytesInflight = inflightBytes.addAndGet(-assembledWorkItem.bufferedSize()); + + // If the outstanding items or bytes limit has gotten too low, top both off with a + // GetWorkExtension. The goal is to keep the limits relatively close to their maximum + // values without sending too many extension requests. + if (numInflight < request.getMaxItems() / 2 || bytesInflight < request.getMaxBytes() / 2) { + long moreItems = request.getMaxItems() - numInflight; + long moreBytes = request.getMaxBytes() - bytesInflight; + inflightMessages.getAndAdd(moreItems); + inflightBytes.getAndAdd(moreBytes); + sendRequestExtension(moreItems, moreBytes); } } @@ -213,63 +205,4 @@ public GetWorkBudget remainingBudget() { .setItems(request.getMaxItems() - inflightMessages.get()) .build(); } - - private class WorkItemBuffer { - private final GetWorkTimingInfosTracker workTimingInfosTracker; - private String computation; - @Nullable private Instant inputDataWatermark; - @Nullable private Instant synchronizedProcessingTime; - private ByteString data; - private long bufferedSize; - - @SuppressWarnings("initialization.fields.uninitialized") - WorkItemBuffer() { - workTimingInfosTracker = new GetWorkTimingInfosTracker(System::currentTimeMillis); - data = ByteString.EMPTY; - bufferedSize = 0; - } - - @SuppressWarnings("NullableProblems") - private void setMetadata(Windmill.ComputationWorkItemMetadata metadata) { - this.computation = metadata.getComputationId(); - this.inputDataWatermark = - WindmillTimeUtils.windmillToHarnessWatermark(metadata.getInputDataWatermark()); - this.synchronizedProcessingTime = - WindmillTimeUtils.windmillToHarnessWatermark( - metadata.getDependentRealtimeInputWatermark()); - } - - private void append(StreamingGetWorkResponseChunk chunk) { - if (chunk.hasComputationMetadata()) { - setMetadata(chunk.getComputationMetadata()); - } - - this.data = data.concat(chunk.getSerializedWorkItem()); - this.bufferedSize += chunk.getSerializedWorkItem().size(); - workTimingInfosTracker.addTimingInfo(chunk.getPerWorkItemTimingInfosList()); - } - - private long bufferedSize() { - return bufferedSize; - } - - private void runAndReset() { - try { - Windmill.WorkItem workItem = Windmill.WorkItem.parseFrom(data.newInput()); - List getWorkStreamLatencies = - workTimingInfosTracker.getLatencyAttributions(); - receiver.receiveWork( - computation, - inputDataWatermark, - synchronizedProcessingTime, - workItem, - getWorkStreamLatencies); - } catch (IOException e) { - LOG.error("Failed to parse work item from stream: ", e); - } - workTimingInfosTracker.reset(); - data = ByteString.EMPTY; - bufferedSize = 0; - } - } } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/WorkItemScheduler.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/WorkItemScheduler.java index 17c9f7d80d5da..00784493fe3df 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/WorkItemScheduler.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/WorkItemScheduler.java @@ -18,7 +18,6 @@ package org.apache.beam.runners.dataflow.worker.windmill.work; import java.util.Collection; -import java.util.function.Consumer; import javax.annotation.CheckReturnValue; import org.apache.beam.runners.dataflow.worker.streaming.Watermarks; import org.apache.beam.runners.dataflow.worker.streaming.Work; @@ -36,8 +35,6 @@ public interface WorkItemScheduler { * @param workItem {@link WorkItem} to be processed. * @param watermarks processing watermarks for the workItem. * @param processingContext for processing the workItem. - * @param ackWorkItemQueued Called after an attempt to queue the work item for processing. Used to - * free up pending budget. * @param getWorkStreamLatencies Latencies per processing stage for the WorkItem for reporting * back to Streaming Engine backend. */ @@ -45,6 +42,5 @@ void scheduleWork( WorkItem workItem, Watermarks watermarks, Work.ProcessingContext processingContext, - Consumer ackWorkItemQueued, Collection getWorkStreamLatencies); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/EvenGetWorkBudgetDistributor.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/EvenGetWorkBudgetDistributor.java index 3a17222d3e6bd..403bb99efb4c5 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/EvenGetWorkBudgetDistributor.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/EvenGetWorkBudgetDistributor.java @@ -26,14 +26,13 @@ import java.util.Map.Entry; import java.util.function.Function; import java.util.function.Supplier; -import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.WindmillStreamSender; import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableCollection; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** Evenly distributes the provided budget across the available {@link WindmillStreamSender}(s). */ +/** Evenly distributes the provided budget across the available {@link GetWorkBudgetSpender}(s). */ @Internal final class EvenGetWorkBudgetDistributor implements GetWorkBudgetDistributor { private static final Logger LOG = LoggerFactory.getLogger(EvenGetWorkBudgetDistributor.class); @@ -50,10 +49,10 @@ private static boolean isBelowFiftyPercentOfTarget( } @Override - public void distributeBudget( - ImmutableCollection streams, GetWorkBudget getWorkBudget) { - if (streams.isEmpty()) { - LOG.debug("Cannot distribute budget to no streams."); + public void distributeBudget( + ImmutableCollection budgetOwners, GetWorkBudget getWorkBudget) { + if (budgetOwners.isEmpty()) { + LOG.debug("Cannot distribute budget to no owners."); return; } @@ -62,23 +61,21 @@ public void distributeBudget( return; } - Map desiredBudgets = - computeDesiredBudgets(streams, getWorkBudget); + Map desiredBudgets = computeDesiredBudgets(budgetOwners, getWorkBudget); - for (Entry streamAndDesiredBudget : - desiredBudgets.entrySet()) { - WindmillStreamSender stream = streamAndDesiredBudget.getKey(); + for (Entry streamAndDesiredBudget : desiredBudgets.entrySet()) { + GetWorkBudgetSpender getWorkBudgetSpender = streamAndDesiredBudget.getKey(); GetWorkBudget desired = streamAndDesiredBudget.getValue(); - GetWorkBudget remaining = stream.remainingGetWorkBudget(); + GetWorkBudget remaining = getWorkBudgetSpender.remainingBudget(); if (isBelowFiftyPercentOfTarget(remaining, desired)) { GetWorkBudget adjustment = desired.subtract(remaining); - stream.adjustBudget(adjustment); + getWorkBudgetSpender.adjustBudget(adjustment); } } } - private ImmutableMap computeDesiredBudgets( - ImmutableCollection streams, GetWorkBudget totalGetWorkBudget) { + private ImmutableMap computeDesiredBudgets( + ImmutableCollection streams, GetWorkBudget totalGetWorkBudget) { GetWorkBudget activeWorkBudget = activeWorkBudgetSupplier.get(); LOG.info("Current active work budget: {}", activeWorkBudget); // TODO: Fix possibly non-deterministic handing out of budgets. diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudgetDistributor.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudgetDistributor.java index 3ec9718e041e5..d21de17e522c5 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudgetDistributor.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudgetDistributor.java @@ -17,7 +17,6 @@ */ package org.apache.beam.runners.dataflow.worker.windmill.work.budget; -import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.WindmillStreamSender; import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableCollection; @@ -28,6 +27,6 @@ */ @Internal public interface GetWorkBudgetDistributor { - void distributeBudget( - ImmutableCollection streams, GetWorkBudget getWorkBudget); + void distributeBudget( + ImmutableCollection streams, GetWorkBudget getWorkBudget); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudgetSpender.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudgetSpender.java new file mode 100644 index 0000000000000..254b2589062ef --- /dev/null +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudgetSpender.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.work.budget; + +/** + * Represents something that spends {@link + * org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudget} + */ +public interface GetWorkBudgetSpender { + void adjustBudget(long itemsDelta, long bytesDelta); + + default void adjustBudget(GetWorkBudget adjustment) { + adjustBudget(adjustment.items(), adjustment.bytes()); + } + + GetWorkBudget remainingBudget(); +} diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java index 5855057c4210c..d16ed2942fd9c 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java @@ -2182,7 +2182,7 @@ public void testMergeWindowsCaching() throws Exception { // No input messages assertEquals(0L, splitIntToLong(getCounter(counters, "WindmillShuffleBytesRead").getInteger())); - CacheStats stats = worker.stateCache.getCacheStats(); + CacheStats stats = worker.getStateCacheStats(); LOG.info("cache stats {}", stats); assertEquals(1, stats.hitCount()); assertEquals(4, stats.missCount()); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/StreamingEngineClientTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarnessTest.java similarity index 93% rename from runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/StreamingEngineClientTest.java rename to runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarnessTest.java index 1999dbe319027..aaa71b6598ea2 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/StreamingEngineClientTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarnessTest.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.runners.dataflow.worker.windmill.client.grpc; +package org.apache.beam.runners.dataflow.worker.streaming.harness; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -49,12 +49,15 @@ import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress; import org.apache.beam.runners.dataflow.worker.windmill.client.commits.WorkCommitter; import org.apache.beam.runners.dataflow.worker.windmill.client.getdata.ThrottlingGetDataMetricTracker; +import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcDispatcherClient; +import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcWindmillStreamFactory; import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCachingStubFactory; import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannelFactory; import org.apache.beam.runners.dataflow.worker.windmill.testing.FakeWindmillStubFactory; import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemScheduler; import org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudget; import org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudgetDistributor; +import org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudgetSpender; import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.Server; import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.inprocess.InProcessServerBuilder; import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.inprocess.InProcessSocketAddress; @@ -76,7 +79,7 @@ import org.junit.runners.JUnit4; @RunWith(JUnit4.class) -public class StreamingEngineClientTest { +public class FanOutStreamingEngineWorkerHarnessTest { private static final WindmillServiceAddress DEFAULT_WINDMILL_SERVICE_ADDRESS = WindmillServiceAddress.create(HostAndPort.fromParts(WindmillChannelFactory.LOCALHOST, 443)); private static final ImmutableMap DEFAULT = @@ -113,14 +116,10 @@ public class StreamingEngineClientTest { private Server fakeStreamingEngineServer; private CountDownLatch getWorkerMetadataReady; private GetWorkerMetadataTestStub fakeGetWorkerMetadataStub; - private StreamingEngineClient streamingEngineClient; + private FanOutStreamingEngineWorkerHarness fanOutStreamingEngineWorkProvider; private static WorkItemScheduler noOpProcessWorkItemFn() { - return (workItem, - watermarks, - processingContext, - ackWorkItemQueued, - getWorkStreamLatencies) -> {}; + return (workItem, watermarks, processingContext, getWorkStreamLatencies) -> {}; } private static GetWorkRequest getWorkRequest(long items, long bytes) { @@ -163,16 +162,16 @@ public void setUp() throws IOException { @After public void cleanUp() { - Preconditions.checkNotNull(streamingEngineClient).finish(); + Preconditions.checkNotNull(fanOutStreamingEngineWorkProvider).shutdown(); fakeStreamingEngineServer.shutdownNow(); stubFactory.shutdown(); } - private StreamingEngineClient newStreamingEngineClient( + private FanOutStreamingEngineWorkerHarness newStreamingEngineClient( GetWorkBudget getWorkBudget, GetWorkBudgetDistributor getWorkBudgetDistributor, WorkItemScheduler workItemScheduler) { - return StreamingEngineClient.forTesting( + return FanOutStreamingEngineWorkerHarness.forTesting( JOB_HEADER, getWorkBudget, streamFactory, @@ -194,7 +193,7 @@ public void testStreamsStartCorrectly() throws InterruptedException { TestGetWorkBudgetDistributor getWorkBudgetDistributor = spy(new TestGetWorkBudgetDistributor(numBudgetDistributionsExpected)); - streamingEngineClient = + fanOutStreamingEngineWorkProvider = newStreamingEngineClient( GetWorkBudget.builder().setItems(items).setBytes(bytes).build(), getWorkBudgetDistributor, @@ -216,7 +215,7 @@ public void testStreamsStartCorrectly() throws InterruptedException { waitForWorkerMetadataToBeConsumed(getWorkBudgetDistributor); StreamingEngineConnectionState currentConnections = - streamingEngineClient.getCurrentConnections(); + fanOutStreamingEngineWorkProvider.getCurrentConnections(); assertEquals(2, currentConnections.windmillConnections().size()); assertEquals(2, currentConnections.windmillStreams().size()); @@ -250,7 +249,7 @@ public void testStreamsStartCorrectly() throws InterruptedException { public void testScheduledBudgetRefresh() throws InterruptedException { TestGetWorkBudgetDistributor getWorkBudgetDistributor = spy(new TestGetWorkBudgetDistributor(2)); - streamingEngineClient = + fanOutStreamingEngineWorkProvider = newStreamingEngineClient( GetWorkBudget.builder().setItems(1L).setBytes(1L).build(), getWorkBudgetDistributor, @@ -273,7 +272,7 @@ public void testOnNewWorkerMetadata_correctlyRemovesStaleWindmillServers() int metadataCount = 2; TestGetWorkBudgetDistributor getWorkBudgetDistributor = spy(new TestGetWorkBudgetDistributor(metadataCount)); - streamingEngineClient = + fanOutStreamingEngineWorkProvider = newStreamingEngineClient( GetWorkBudget.builder().setItems(1).setBytes(1).build(), getWorkBudgetDistributor, @@ -311,11 +310,12 @@ public void testOnNewWorkerMetadata_correctlyRemovesStaleWindmillServers() fakeGetWorkerMetadataStub.injectWorkerMetadata(secondWorkerMetadata); waitForWorkerMetadataToBeConsumed(getWorkBudgetDistributor); StreamingEngineConnectionState currentConnections = - streamingEngineClient.getCurrentConnections(); + fanOutStreamingEngineWorkProvider.getCurrentConnections(); assertEquals(1, currentConnections.windmillConnections().size()); assertEquals(1, currentConnections.windmillStreams().size()); Set workerTokens = - streamingEngineClient.getCurrentConnections().windmillConnections().values().stream() + fanOutStreamingEngineWorkProvider.getCurrentConnections().windmillConnections().values() + .stream() .map(WindmillConnection::backendWorkerToken) .collect(Collectors.toSet()); @@ -362,7 +362,7 @@ public void testOnNewWorkerMetadata_redistributesBudget() throws InterruptedExce TestGetWorkBudgetDistributor getWorkBudgetDistributor = spy(new TestGetWorkBudgetDistributor(workerMetadataResponses.size())); - streamingEngineClient = + fanOutStreamingEngineWorkProvider = newStreamingEngineClient( GetWorkBudget.builder().setItems(1).setBytes(1).build(), getWorkBudgetDistributor, @@ -439,8 +439,8 @@ private void waitForBudgetDistribution() throws InterruptedException { } @Override - public void distributeBudget( - ImmutableCollection streams, GetWorkBudget getWorkBudget) { + public void distributeBudget( + ImmutableCollection streams, GetWorkBudget getWorkBudget) { streams.forEach(stream -> stream.adjustBudget(getWorkBudget.items(), getWorkBudget.bytes())); getWorkBudgetDistributorTriggered.countDown(); } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/WindmillStreamSenderTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSenderTest.java similarity index 97% rename from runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/WindmillStreamSenderTest.java rename to runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSenderTest.java index 9d49c3ef3146d..dc6cc5641055a 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/WindmillStreamSenderTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSenderTest.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.runners.dataflow.worker.windmill.client.grpc; +package org.apache.beam.runners.dataflow.worker.streaming.harness; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; @@ -35,6 +35,7 @@ import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetWorkStream; import org.apache.beam.runners.dataflow.worker.windmill.client.commits.WorkCommitter; import org.apache.beam.runners.dataflow.worker.windmill.client.getdata.GetDataClient; +import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcWindmillStreamFactory; import org.apache.beam.runners.dataflow.worker.windmill.client.throttling.ThrottleTimer; import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemScheduler; import org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudget; @@ -64,7 +65,7 @@ public class WindmillStreamSenderTest { .build()) .build()); private final WorkItemScheduler workItemScheduler = - (workItem, watermarks, processingContext, ackWorkItemQueued, getWorkStreamLatencies) -> {}; + (workItem, watermarks, processingContext, getWorkStreamLatencies) -> {}; @Rule public transient Timeout globalTimeout = Timeout.seconds(600); private ManagedChannel inProcessChannel; private WindmillConnection connection; diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/ChannelzServletTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/ChannelzServletTest.java index 96c675169a7d2..d234cf424767b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/ChannelzServletTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/ChannelzServletTest.java @@ -56,7 +56,8 @@ public void testRendersAllChannels() throws UnsupportedEncodingException { fakeWindmillServer.setWindmillServiceEndpoints( ImmutableSet.of(HostAndPort.fromHost(windmill1), HostAndPort.fromHost(windmill2))); options.setChannelzShowOnlyWindmillServiceChannels(false); - ChannelzServlet channelzServlet = new ChannelzServlet("/channelz", options, fakeWindmillServer); + ChannelzServlet channelzServlet = + new ChannelzServlet("/channelz", options, fakeWindmillServer::getWindmillServiceEndpoints); StringWriter stringWriter = new StringWriter(); PrintWriter writer = new PrintWriter(stringWriter); channelzServlet.captureData(writer); @@ -88,7 +89,8 @@ public void testRendersOnlyWindmillChannels() throws UnsupportedEncodingExceptio fakeWindmillServer.setWindmillServiceEndpoints( ImmutableSet.of(HostAndPort.fromHost(windmill1), HostAndPort.fromHost(windmill2))); options.setChannelzShowOnlyWindmillServiceChannels(true); - ChannelzServlet channelzServlet = new ChannelzServlet("/channelz", options, fakeWindmillServer); + ChannelzServlet channelzServlet = + new ChannelzServlet("/channelz", options, fakeWindmillServer::getWindmillServiceEndpoints); StringWriter stringWriter = new StringWriter(); PrintWriter writer = new PrintWriter(stringWriter); channelzServlet.captureData(writer); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java index 5cfc19ac07dfd..7e5801b65de47 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java @@ -1142,7 +1142,13 @@ public void onNext(StreamingGetWorkRequest request) { StreamingGetWorkResponseChunk.newBuilder() .setStreamId(id) .setSerializedWorkItem(serializedResponse) - .setRemainingBytesForWorkItem(0); + .setRemainingBytesForWorkItem(0) + .setComputationMetadata( + ComputationWorkItemMetadata.newBuilder() + .setComputationId("computation") + .setInputDataWatermark(1L) + .setDependentRealtimeInputWatermark(1L) + .build()); try { responseObserver.onNext(builder.build()); } catch (IllegalStateException e) { @@ -1175,9 +1181,7 @@ public void onCompleted() { @Nullable Instant inputDataWatermark, Instant synchronizedProcessingTime, Windmill.WorkItem workItem, - Collection getWorkStreamLatencies) -> { - latch.countDown(); - }); + Collection getWorkStreamLatencies) -> latch.countDown()); // Wait for 100 items or 30 seconds. assertTrue(latch.await(30, TimeUnit.SECONDS)); // Confirm that we report at least as much throttle time as our server sent errors for. We will diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/EvenGetWorkBudgetDistributorTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/EvenGetWorkBudgetDistributorTest.java index b0c305dc4ec45..3cda4559c100b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/EvenGetWorkBudgetDistributorTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/EvenGetWorkBudgetDistributorTest.java @@ -19,7 +19,6 @@ import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; @@ -28,20 +27,8 @@ import java.util.ArrayList; import java.util.List; -import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc; -import org.apache.beam.runners.dataflow.worker.windmill.Windmill; -import org.apache.beam.runners.dataflow.worker.windmill.Windmill.JobHeader; -import org.apache.beam.runners.dataflow.worker.windmill.WindmillConnection; -import org.apache.beam.runners.dataflow.worker.windmill.client.commits.WorkCommitter; -import org.apache.beam.runners.dataflow.worker.windmill.client.getdata.GetDataClient; -import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcWindmillStreamFactory; -import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.WindmillStreamSender; -import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel; -import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.inprocess.InProcessChannelBuilder; import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.testing.GrpcCleanupRule; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; -import org.junit.After; -import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.Timeout; @@ -52,8 +39,6 @@ public class EvenGetWorkBudgetDistributorTest { @Rule public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule(); @Rule public transient Timeout globalTimeout = Timeout.seconds(600); - private ManagedChannel inProcessChannel; - private CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1Stub stub; private static GetWorkBudgetDistributor createBudgetDistributor(GetWorkBudget activeWorkBudget) { return GetWorkBudgetDistributors.distributeEvenly(() -> activeWorkBudget); @@ -67,20 +52,6 @@ private static GetWorkBudgetDistributor createBudgetDistributor(long activeWorkI .build()); } - @Before - public void setUp() { - inProcessChannel = - grpcCleanup.register( - InProcessChannelBuilder.forName("WindmillStreamSenderTest").directExecutor().build()); - grpcCleanup.register(inProcessChannel); - stub = CloudWindmillServiceV1Alpha1Grpc.newStub(inProcessChannel); - } - - @After - public void cleanUp() { - inProcessChannel.shutdownNow(); - } - @Test public void testDistributeBudget_doesNothingWhenPassedInStreamsEmpty() { createBudgetDistributor(1L) @@ -90,38 +61,40 @@ public void testDistributeBudget_doesNothingWhenPassedInStreamsEmpty() { @Test public void testDistributeBudget_doesNothingWithNoBudget() { - WindmillStreamSender windmillStreamSender = - spy(createWindmillStreamSender(GetWorkBudget.noBudget())); + GetWorkBudgetSpender getWorkBudgetSpender = + spy(createGetWorkBudgetOwnerWithRemainingBudgetOf(GetWorkBudget.noBudget())); createBudgetDistributor(1L) - .distributeBudget(ImmutableList.of(windmillStreamSender), GetWorkBudget.noBudget()); - verifyNoInteractions(windmillStreamSender); + .distributeBudget(ImmutableList.of(getWorkBudgetSpender), GetWorkBudget.noBudget()); + verifyNoInteractions(getWorkBudgetSpender); } @Test public void testDistributeBudget_doesNotAdjustStreamBudgetWhenRemainingBudgetHighNoActiveWork() { - WindmillStreamSender windmillStreamSender = + GetWorkBudgetSpender getWorkBudgetSpender = spy( - createWindmillStreamSender( + createGetWorkBudgetOwnerWithRemainingBudgetOf( GetWorkBudget.builder().setItems(10L).setBytes(10L).build())); createBudgetDistributor(0L) .distributeBudget( - ImmutableList.of(windmillStreamSender), + ImmutableList.of(getWorkBudgetSpender), GetWorkBudget.builder().setItems(10L).setBytes(10L).build()); - verify(windmillStreamSender, never()).adjustBudget(anyLong(), anyLong()); + verify(getWorkBudgetSpender, never()).adjustBudget(anyLong(), anyLong()); } @Test public void testDistributeBudget_doesNotAdjustStreamBudgetWhenRemainingBudgetHighWithActiveWork() { - WindmillStreamSender windmillStreamSender = - spy(createWindmillStreamSender(GetWorkBudget.builder().setItems(5L).setBytes(5L).build())); + GetWorkBudgetSpender getWorkBudgetSpender = + spy( + createGetWorkBudgetOwnerWithRemainingBudgetOf( + GetWorkBudget.builder().setItems(5L).setBytes(5L).build())); createBudgetDistributor(10L) .distributeBudget( - ImmutableList.of(windmillStreamSender), + ImmutableList.of(getWorkBudgetSpender), GetWorkBudget.builder().setItems(20L).setBytes(20L).build()); - verify(windmillStreamSender, never()).adjustBudget(anyLong(), anyLong()); + verify(getWorkBudgetSpender, never()).adjustBudget(anyLong(), anyLong()); } @Test @@ -130,12 +103,12 @@ public void testDistributeBudget_doesNotAdjustStreamBudgetWhenRemainingBudgetHig GetWorkBudget streamRemainingBudget = GetWorkBudget.builder().setItems(1L).setBytes(10L).build(); GetWorkBudget totalGetWorkBudget = GetWorkBudget.builder().setItems(10L).setBytes(10L).build(); - WindmillStreamSender windmillStreamSender = - spy(createWindmillStreamSender(streamRemainingBudget)); + GetWorkBudgetSpender getWorkBudgetSpender = + spy(createGetWorkBudgetOwnerWithRemainingBudgetOf(streamRemainingBudget)); createBudgetDistributor(0L) - .distributeBudget(ImmutableList.of(windmillStreamSender), totalGetWorkBudget); + .distributeBudget(ImmutableList.of(getWorkBudgetSpender), totalGetWorkBudget); - verify(windmillStreamSender, times(1)) + verify(getWorkBudgetSpender, times(1)) .adjustBudget( eq(totalGetWorkBudget.items() - streamRemainingBudget.items()), eq(totalGetWorkBudget.bytes() - streamRemainingBudget.bytes())); @@ -148,12 +121,12 @@ public void testDistributeBudget_doesNotAdjustStreamBudgetWhenRemainingBudgetHig GetWorkBudget.builder().setItems(1L).setBytes(10L).build(); GetWorkBudget totalGetWorkBudget = GetWorkBudget.builder().setItems(10L).setBytes(10L).build(); long activeWorkItemsAndBytes = 2L; - WindmillStreamSender windmillStreamSender = - spy(createWindmillStreamSender(streamRemainingBudget)); + GetWorkBudgetSpender getWorkBudgetSpender = + spy(createGetWorkBudgetOwnerWithRemainingBudgetOf(streamRemainingBudget)); createBudgetDistributor(activeWorkItemsAndBytes) - .distributeBudget(ImmutableList.of(windmillStreamSender), totalGetWorkBudget); + .distributeBudget(ImmutableList.of(getWorkBudgetSpender), totalGetWorkBudget); - verify(windmillStreamSender, times(1)) + verify(getWorkBudgetSpender, times(1)) .adjustBudget( eq( totalGetWorkBudget.items() @@ -167,12 +140,12 @@ public void testDistributeBudget_adjustsStreamBudgetWhenRemainingByteBudgetTooLo GetWorkBudget streamRemainingBudget = GetWorkBudget.builder().setItems(10L).setBytes(1L).build(); GetWorkBudget totalGetWorkBudget = GetWorkBudget.builder().setItems(10L).setBytes(10L).build(); - WindmillStreamSender windmillStreamSender = - spy(createWindmillStreamSender(streamRemainingBudget)); + GetWorkBudgetSpender getWorkBudgetSpender = + spy(createGetWorkBudgetOwnerWithRemainingBudgetOf(streamRemainingBudget)); createBudgetDistributor(0L) - .distributeBudget(ImmutableList.of(windmillStreamSender), totalGetWorkBudget); + .distributeBudget(ImmutableList.of(getWorkBudgetSpender), totalGetWorkBudget); - verify(windmillStreamSender, times(1)) + verify(getWorkBudgetSpender, times(1)) .adjustBudget( eq(totalGetWorkBudget.items() - streamRemainingBudget.items()), eq(totalGetWorkBudget.bytes() - streamRemainingBudget.bytes())); @@ -186,12 +159,12 @@ public void testDistributeBudget_adjustsStreamBudgetWhenRemainingByteBudgetTooLo GetWorkBudget totalGetWorkBudget = GetWorkBudget.builder().setItems(10L).setBytes(10L).build(); long activeWorkItemsAndBytes = 2L; - WindmillStreamSender windmillStreamSender = - spy(createWindmillStreamSender(streamRemainingBudget)); + GetWorkBudgetSpender getWorkBudgetSpender = + spy(createGetWorkBudgetOwnerWithRemainingBudgetOf(streamRemainingBudget)); createBudgetDistributor(activeWorkItemsAndBytes) - .distributeBudget(ImmutableList.of(windmillStreamSender), totalGetWorkBudget); + .distributeBudget(ImmutableList.of(getWorkBudgetSpender), totalGetWorkBudget); - verify(windmillStreamSender, times(1)) + verify(getWorkBudgetSpender, times(1)) .adjustBudget( eq(totalGetWorkBudget.items() - streamRemainingBudget.items()), eq( @@ -203,9 +176,9 @@ public void testDistributeBudget_adjustsStreamBudgetWhenRemainingByteBudgetTooLo @Test public void testDistributeBudget_distributesBudgetEvenlyIfPossible() { long totalItemsAndBytes = 10L; - List streams = new ArrayList<>(); + List streams = new ArrayList<>(); for (int i = 0; i < totalItemsAndBytes; i++) { - streams.add(spy(createWindmillStreamSender(GetWorkBudget.noBudget()))); + streams.add(spy(createGetWorkBudgetOwnerWithRemainingBudgetOf(GetWorkBudget.noBudget()))); } createBudgetDistributor(0L) .distributeBudget( @@ -225,9 +198,9 @@ public void testDistributeBudget_distributesBudgetEvenlyIfPossible() { @Test public void testDistributeBudget_distributesFairlyWhenNotEven() { long totalItemsAndBytes = 10L; - List streams = new ArrayList<>(); + List streams = new ArrayList<>(); for (int i = 0; i < 3; i++) { - streams.add(spy(createWindmillStreamSender(GetWorkBudget.noBudget()))); + streams.add(spy(createGetWorkBudgetOwnerWithRemainingBudgetOf(GetWorkBudget.noBudget()))); } createBudgetDistributor(0L) .distributeBudget( @@ -244,24 +217,17 @@ public void testDistributeBudget_distributesFairlyWhenNotEven() { .adjustBudget(eq(itemsAndBytesPerStream), eq(itemsAndBytesPerStream))); } - private WindmillStreamSender createWindmillStreamSender(GetWorkBudget getWorkBudget) { - return WindmillStreamSender.create( - WindmillConnection.builder().setStub(stub).build(), - Windmill.GetWorkRequest.newBuilder() - .setClientId(1L) - .setJobId("job") - .setProjectId("project") - .build(), - getWorkBudget, - GrpcWindmillStreamFactory.of( - JobHeader.newBuilder() - .setJobId("job") - .setProjectId("project") - .setWorkerId("worker") - .build()) - .build(), - (workItem, watermarks, processingContext, ackWorkItemQueued, getWorkStreamLatencies) -> {}, - ignored -> mock(GetDataClient.class), - ignored -> mock(WorkCommitter.class)); + private GetWorkBudgetSpender createGetWorkBudgetOwnerWithRemainingBudgetOf( + GetWorkBudget getWorkBudget) { + return spy( + new GetWorkBudgetSpender() { + @Override + public void adjustBudget(long itemsDelta, long bytesDelta) {} + + @Override + public GetWorkBudget remainingBudget() { + return getWorkBudget; + } + }); } }