From 745d87ce5a5c60cb356f35800fffc1efa37de68c Mon Sep 17 00:00:00 2001 From: m-trieu Date: Mon, 30 Sep 2024 14:13:53 -0700 Subject: [PATCH] address PR comments --- .../worker/StreamingDataflowWorker.java | 224 ++++++++++-------- .../FanOutStreamingEngineWorkerHarness.java | 64 ++--- .../harness/WindmillStreamSender.java | 7 +- .../worker/windmill/WindmillEndpoints.java | 1 + .../client/AbstractWindmillStream.java | 9 +- .../client/grpc/GrpcCommitWorkStream.java | 2 +- .../budget/EvenGetWorkBudgetDistributor.java | 4 +- .../work/budget/GetWorkBudgetSpender.java | 6 +- ...anOutStreamingEngineWorkerHarnessTest.java | 2 +- .../EvenGetWorkBudgetDistributorTest.java | 20 +- 10 files changed, 191 insertions(+), 148 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index 075168d6ff27c..06b889cc306b4 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -21,6 +21,7 @@ import com.google.api.services.dataflow.model.MapTask; import com.google.auto.value.AutoValue; +import java.io.PrintWriter; import java.util.List; import java.util.Map; import java.util.Optional; @@ -32,6 +33,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; import javax.annotation.Nullable; @@ -63,7 +65,6 @@ import org.apache.beam.runners.dataflow.worker.windmill.Windmill; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.JobHeader; import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub; -import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress; import org.apache.beam.runners.dataflow.worker.windmill.appliance.JniWindmillApplianceServer; import org.apache.beam.runners.dataflow.worker.windmill.client.CloseableStream; import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream; @@ -82,8 +83,6 @@ import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcWindmillStreamFactory; import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCache; import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCachingRemoteStubFactory; -import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCachingStubFactory; -import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.IsolationChannel; import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactoryFactory; import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactoryFactoryImpl; import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache; @@ -105,10 +104,11 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQuerySinkMetrics; import org.apache.beam.sdk.metrics.MetricsEnvironment; import org.apache.beam.sdk.util.construction.CoderTranslation; -import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheStats; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.joda.time.Duration; import org.joda.time.Instant; @@ -131,6 +131,17 @@ public final class StreamingDataflowWorker { private static final Logger LOG = LoggerFactory.getLogger(StreamingDataflowWorker.class); + ////////// Beam Experiments ////////// + private static final String BEAM_FN_API_EXPERIMENT = "beam_fn_api"; + private static final String ENABLE_IPV6_EXPERIMENT = "enable_private_ipv6_google_access"; + private static final String DISABLE_HEARTBEAT_REQUESTS_EXPERIMENT = + "streaming_engine_disable_new_heartbeat_requests"; + private static final String DISABLE_BIQ_QUERY_PER_WORKER_METRICS_EXPERIMENT = + "disable_per_worker_metrics"; + private static final String STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_HEARTBEAT_POOL = + "streaming_engine_use_job_settings_for_heartbeat_pool"; + ////////////////////////////////////// + /** * Maximum number of threads for processing. Currently, each thread processes one key at a time. */ @@ -151,10 +162,6 @@ public final class StreamingDataflowWorker { private static final int DEFAULT_STATUS_PORT = 8081; private static final Random CLIENT_ID_GENERATOR = new Random(); private static final String CHANNELZ_PATH = "/channelz"; - public static final String STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_HEARTBEAT_POOL = - "streaming_engine_use_job_settings_for_heartbeat_pool"; - private static final String BEAM_FN_API_EXPERIMENT = "beam_fn_api"; - private static final String ENABLE_IPV6_EXPERIMENT = "enable_private_ipv6_google_access"; private final WindmillStateCache stateCache; private final StreamingWorkerStatusPages statusPages; @@ -203,6 +210,11 @@ private StreamingDataflowWorker( this.options = options; this.workUnitExecutor = workUnitExecutor; this.memoryMonitor = BackgroundMemoryMonitor.create(memoryMonitor); + this.numCommitThreads = + options.isEnableStreamingEngine() + ? Math.max(options.getWindmillServiceCommitThreads(), 1) + : 1; + StreamingWorkScheduler streamingWorkScheduler = StreamingWorkScheduler.create( options, @@ -219,65 +231,32 @@ private StreamingDataflowWorker( ID_GENERATOR, configFetcher.getGlobalConfigHandle(), stageInfoMap); - ThrottlingGetDataMetricTracker getDataMetricTracker = new ThrottlingGetDataMetricTracker(memoryMonitor); - WorkerStatusPages workerStatusPages = - WorkerStatusPages.create(DEFAULT_STATUS_PORT, memoryMonitor); - StreamingWorkerStatusPages.Builder statusPagesBuilder = StreamingWorkerStatusPages.builder(); - ChannelCachingStubFactory stubFactory = createStubFactory(options); - int stuckCommitDurationMillis = - options.isEnableStreamingEngine() ? Math.max(options.getStuckCommitDurationMillis(), 0) : 0; - this.numCommitThreads = - options.isEnableStreamingEngine() - ? Math.max(options.getWindmillServiceCommitThreads(), 1) - : 1; - + Runnable waitForResources = () -> memoryMonitor.waitForResources("GetWork"); + // Status page members. Different implementations on whether the harness is streaming engine + // direct path, streaming engine cloud path, or streaming appliance. + @Nullable ChannelzServlet channelzServlet = null; + Consumer getDataStatusProvider; + Supplier currentActiveCommitBytesProvider; if (isDirectPathPipeline(options)) { FanOutStreamingEngineWorkerHarness fanOutStreamingEngineWorkerHarness = - FanOutStreamingEngineWorkerHarness.create( - createJobHeader(options, clientId), - GetWorkBudget.builder() - .setItems(chooseMaxBundlesOutstanding(options)) - .setBytes(MAX_GET_WORK_FETCH_BYTES) - .build(), + createFanOutStreamingEngineWorkerHarness( + options, + clientId, windmillStreamFactory, - (workItem, watermarks, processingContext, getWorkStreamLatencies) -> - computationStateCache - .get(processingContext.computationId()) - .ifPresent( - computationState -> { - memoryMonitor.waitForResources("GetWork"); - streamingWorkScheduler.scheduleWork( - computationState, - workItem, - watermarks, - processingContext, - getWorkStreamLatencies); - }), - stubFactory, - GetWorkBudgetDistributors.distributeEvenly(), + computationStateCache, + waitForResources, + streamingWorkScheduler, Preconditions.checkNotNull(dispatcherClient), - commitWorkStream -> - StreamingEngineWorkCommitter.builder() - .setBackendWorkerToken(commitWorkStream.backendWorkerToken()) - .setOnCommitComplete(this::onCompleteCommit) - .setNumCommitSenders(Math.max(options.getWindmillServiceCommitThreads(), 1)) - .setCommitWorkStreamFactory( - () -> CloseableStream.create(commitWorkStream, () -> {})) - .build(), - getDataMetricTracker); - statusPagesBuilder - .setGetDataStatusProvider(getDataMetricTracker::printHtml) - .setCurrentActiveCommitBytes(fanOutStreamingEngineWorkerHarness::currentActiveCommitBytes) - .setDebugCapture( - new DebugCapture.Manager(options, workerStatusPages.getDebugCapturePages())) - .setChannelzServlet( - new ChannelzServlet( - CHANNELZ_PATH, - options, - fanOutStreamingEngineWorkerHarness::currentWindmillEndpoints)) - .setWindmillStreamFactory(windmillStreamFactory); + getDataMetricTracker, + this::onCompleteCommit); + getDataStatusProvider = getDataMetricTracker::printHtml; + currentActiveCommitBytesProvider = + fanOutStreamingEngineWorkerHarness::currentActiveCommitBytes; + channelzServlet = + createChannelZServlet( + options, fanOutStreamingEngineWorkerHarness::currentWindmillEndpoints); this.streamingWorkerHarness = fanOutStreamingEngineWorkerHarness; } else { Windmill.GetWorkRequest request = createGetWorkRequest(clientId, options); @@ -295,13 +274,8 @@ private StreamingDataflowWorker( heartbeatSender = createStreamingEngineHeartbeatSender( options, windmillServer, getDataStreamPool, configFetcher.getGlobalConfigHandle()); - statusPagesBuilder - .setDebugCapture( - new DebugCapture.Manager(options, workerStatusPages.getDebugCapturePages())) - .setChannelzServlet( - new ChannelzServlet( - CHANNELZ_PATH, options, windmillServer::getWindmillServiceEndpoints)) - .setWindmillStreamFactory(windmillStreamFactory); + channelzServlet = + createChannelZServlet(options, windmillServer::getWindmillServiceEndpoints); workCommitter = StreamingEngineWorkCommitter.builder() .setCommitWorkStreamFactory( @@ -325,9 +299,8 @@ private StreamingDataflowWorker( getWorkSender = GetWorkSender.forAppliance(() -> windmillServer.getWork(request)); } - statusPagesBuilder - .setGetDataStatusProvider(getDataClient::printHtml) - .setCurrentActiveCommitBytes(workCommitter::currentActiveCommitBytes); + getDataStatusProvider = getDataClient::printHtml; + currentActiveCommitBytesProvider = workCommitter::currentActiveCommitBytes; this.streamingWorkerHarness = SingleSourceWorkerHarness.builder() @@ -349,31 +322,108 @@ private StreamingDataflowWorker( new ActiveWorkRefresher( clock, options.getActiveWorkRefreshPeriodMillis(), - stuckCommitDurationMillis, + options.isEnableStreamingEngine() + ? Math.max(options.getStuckCommitDurationMillis(), 0) + : 0, computationStateCache::getAllPresentComputations, sampler, executorSupplier.apply("RefreshWork"), getDataMetricTracker::trackHeartbeats); this.statusPages = - statusPagesBuilder + createStatusPageBuilder(options, windmillStreamFactory, memoryMonitor) .setClock(clock) .setClientId(clientId) .setIsRunning(running) - .setStatusPages(workerStatusPages) .setStateCache(stateCache) .setComputationStateCache(this.computationStateCache) .setWorkUnitExecutor(workUnitExecutor) .setGlobalConfigHandle(configFetcher.getGlobalConfigHandle()) + .setChannelzServlet(channelzServlet) + .setGetDataStatusProvider(getDataStatusProvider) + .setCurrentActiveCommitBytes(currentActiveCommitBytesProvider) .build(); LOG.debug("isDirectPathEnabled: {}", options.getIsWindmillServiceDirectPathEnabled()); - LOG.debug("windmillServiceEnabled: {}", options.isEnableStreamingEngine()); + LOG.debug("isWindmillServiceEnabled: {}", options.isEnableStreamingEngine()); LOG.debug("WindmillServiceEndpoint: {}", options.getWindmillServiceEndpoint()); LOG.debug("WindmillServicePort: {}", options.getWindmillServicePort()); LOG.debug("LocalWindmillHostport: {}", options.getLocalWindmillHostport()); } + private static FanOutStreamingEngineWorkerHarness createFanOutStreamingEngineWorkerHarness( + DataflowWorkerHarnessOptions options, + long clientId, + GrpcWindmillStreamFactory windmillStreamFactory, + ComputationStateCache computationStateCache, + Runnable waitForResources, + StreamingWorkScheduler streamingWorkScheduler, + GrpcDispatcherClient dispatcherClient, + ThrottlingGetDataMetricTracker getDataMetricTracker, + Consumer onCommitComplete) { + return FanOutStreamingEngineWorkerHarness.create( + createJobHeader(options, clientId), + GetWorkBudget.builder() + .setItems(chooseMaxBundlesOutstanding(options)) + .setBytes(MAX_GET_WORK_FETCH_BYTES) + .build(), + windmillStreamFactory, + (workItem, watermarks, processingContext, getWorkStreamLatencies) -> + computationStateCache + .get(processingContext.computationId()) + .ifPresent( + computationState -> { + waitForResources.run(); + streamingWorkScheduler.scheduleWork( + computationState, + workItem, + watermarks, + processingContext, + getWorkStreamLatencies); + }), + ChannelCachingRemoteStubFactory.create( + options.getGcpCredential(), + ChannelCache.create( + serviceAddress -> + remoteChannel( + serviceAddress, options.getWindmillServiceRpcChannelAliveTimeoutSec()))), + GetWorkBudgetDistributors.distributeEvenly(), + Preconditions.checkNotNull(dispatcherClient), + commitWorkStream -> + StreamingEngineWorkCommitter.builder() + .setBackendWorkerToken(commitWorkStream.backendWorkerToken()) + .setOnCommitComplete(onCommitComplete) + .setNumCommitSenders(Math.max(options.getWindmillServiceCommitThreads(), 1)) + .setCommitWorkStreamFactory( + () -> CloseableStream.create(commitWorkStream, () -> {})) + .build(), + getDataMetricTracker); + } + + private static StreamingWorkerStatusPages.Builder createStatusPageBuilder( + DataflowWorkerHarnessOptions options, + GrpcWindmillStreamFactory windmillStreamFactory, + MemoryMonitor memoryMonitor) { + WorkerStatusPages workerStatusPages = + WorkerStatusPages.create(DEFAULT_STATUS_PORT, memoryMonitor); + + StreamingWorkerStatusPages.Builder streamingStatusPages = + StreamingWorkerStatusPages.builder().setStatusPages(workerStatusPages); + + return options.isEnableStreamingEngine() + ? streamingStatusPages + .setDebugCapture( + new DebugCapture.Manager(options, workerStatusPages.getDebugCapturePages())) + .setWindmillStreamFactory(windmillStreamFactory) + : streamingStatusPages; + } + + private static ChannelzServlet createChannelZServlet( + DataflowWorkerHarnessOptions options, + Supplier> windmillEndpointProvider) { + return new ChannelzServlet(CHANNELZ_PATH, options, windmillEndpointProvider); + } + private static HeartbeatSender createStreamingEngineHeartbeatSender( DataflowWorkerHarnessOptions options, WindmillServerStub windmillClient, @@ -582,24 +632,6 @@ private static void validateWorkerOptions(DataflowWorkerHarnessOptions options) StreamingDataflowWorker.class.getSimpleName()); } - private static ChannelCachingStubFactory createStubFactory( - DataflowWorkerHarnessOptions workerOptions) { - Function channelFactory = - serviceAddress -> - remoteChannel( - serviceAddress, workerOptions.getWindmillServiceRpcChannelAliveTimeoutSec()); - ChannelCache channelCache = - ChannelCache.create( - serviceAddress -> - // IsolationChannel will create and manage separate RPC channels to the same - // serviceAddress via calling the channelFactory, else just directly return the - // RPC channel. - workerOptions.getUseWindmillIsolatedChannels() - ? IsolationChannel.create(() -> channelFactory.apply(serviceAddress)) - : channelFactory.apply(serviceAddress)); - return ChannelCachingRemoteStubFactory.create(workerOptions.getGcpCredential(), channelCache); - } - @VisibleForTesting static StreamingDataflowWorker forTesting( Map prePopulatedStateNameMappings, @@ -736,8 +768,7 @@ private static GrpcWindmillStreamFactory.Builder createGrpcwindmillStreamFactory .setStreamingRpcBatchLimit(options.getWindmillServiceStreamingRpcBatchLimit()) .setSendKeyedGetDataRequests( !options.isEnableStreamingEngine() - || DataflowRunner.hasExperiment( - options, "streaming_engine_disable_new_heartbeat_requests")); + || DataflowRunner.hasExperiment(options, DISABLE_HEARTBEAT_REQUESTS_EXPERIMENT)); } private static JobHeader createJobHeader(DataflowWorkerHarnessOptions options, long clientId) { @@ -779,7 +810,8 @@ public static void main(String[] args) throws Exception { MetricsEnvironment.setProcessWideContainer(new MetricsLogger(null)); if (options.isEnableStreamingEngine() - && !DataflowRunner.hasExperiment(options, "disable_per_worker_metrics")) { + && !DataflowRunner.hasExperiment( + options, DISABLE_BIQ_QUERY_PER_WORKER_METRICS_EXPERIMENT)) { enableBigQueryMetrics(); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java index e77f238bf152a..a29be5955c44c 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java @@ -26,7 +26,6 @@ import java.util.Map.Entry; import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicReference; @@ -102,8 +101,11 @@ public final class FanOutStreamingEngineWorkerHarness implements StreamingWorker /** Writes are guarded by synchronization, reads are lock free. */ private final AtomicReference connections; + @GuardedBy("this") + private long activeMetadataVersion; + @GuardedBy("metadataLock") - private long metadataVersion; + private long pendingMetadataVersion; @GuardedBy("this") private boolean started; @@ -132,10 +134,18 @@ private FanOutStreamingEngineWorkerHarness( new ThreadFactoryBuilder().setNameFormat(STREAM_MANAGER_THREAD_NAME).build()); this.workerMetadataConsumer = Executors.newSingleThreadScheduledExecutor( - new ThreadFactoryBuilder().setNameFormat(WORKER_METADATA_CONSUMER_THREAD_NAME).build()); + new ThreadFactoryBuilder() + .setNameFormat(WORKER_METADATA_CONSUMER_THREAD_NAME) + .setUncaughtExceptionHandler( + (t, e) -> + LOG.error( + "{} failed due to uncaught exception during execution.", + t.getName(), + e)) + .build()); this.getWorkBudgetDistributor = getWorkBudgetDistributor; this.totalGetWorkBudget = totalGetWorkBudget; - this.metadataVersion = Long.MIN_VALUE; + this.activeMetadataVersion = Long.MIN_VALUE; this.getWorkerMetadataStream = Suppliers.memoize(createGetWorkerMetadataStream()::get); this.workCommitterFactory = workCommitterFactory; this.metadataLock = new Object(); @@ -259,9 +269,9 @@ private Supplier createGetWorkerMetadataStream( private void consumeWorkerMetadata(WindmillEndpoints windmillEndpoints) { synchronized (metadataLock) { // Only process versions greater than what we currently have to prevent double processing of - // metadata. - if (windmillEndpoints.version() > metadataVersion) { - metadataVersion = windmillEndpoints.version(); + // metadata. workerMetadataConsumer is single-threaded so we maintain ordering. + if (windmillEndpoints.version() > pendingMetadataVersion) { + pendingMetadataVersion = windmillEndpoints.version(); workerMetadataConsumer.execute(() -> consumeWindmillWorkerEndpoints(windmillEndpoints)); } } @@ -272,11 +282,12 @@ private synchronized void consumeWindmillWorkerEndpoints(WindmillEndpoints newWi // queued up while a previous version of the windmillEndpoints were being consumed. Only consume // the endpoints if they are the most current version. synchronized (metadataLock) { - if (newWindmillEndpoints.version() < metadataVersion) { + if (newWindmillEndpoints.version() < pendingMetadataVersion) { return; } } + long previousMetadataVersion = activeMetadataVersion; LOG.info("Consuming new windmill endpoints: {}", newWindmillEndpoints); ImmutableMap newWindmillConnections = createNewWindmillConnections(newWindmillEndpoints.windmillEndpoints()); @@ -296,6 +307,11 @@ private synchronized void consumeWindmillWorkerEndpoints(WindmillEndpoints newWi connections.get()); connections.set(newConnectionsState); getWorkBudgetDistributor.distributeBudget(newStreams.values(), totalGetWorkBudget); + activeMetadataVersion = newWindmillEndpoints.version(); + LOG.info( + "Consumed new endpoints. previous metadata version: {}, current metadata version: {}", + previousMetadataVersion, + activeMetadataVersion); } /** Close the streams that are no longer valid asynchronously. */ @@ -329,25 +345,19 @@ private void closeStaleStreams( createAndStartNewStreams(Collection newWindmillConnections) { ImmutableMap currentStreams = connections.get().windmillStreams(); - CompletionStage>> - connectionAndSenderFuture = - MoreFutures.allAsList( - newWindmillConnections.stream() - .map( - connection -> - MoreFutures.supplyAsync( - () -> - Pair.of( - connection, - Optional.ofNullable(currentStreams.get(connection)) - .orElseGet( - () -> - createAndStartWindmillStreamSender( - connection))), - windmillStreamManager)) - .collect(Collectors.toList())); - - return connectionAndSenderFuture + return MoreFutures.allAsList( + newWindmillConnections.stream() + .map( + connection -> + MoreFutures.supplyAsync( + () -> + Pair.of( + connection, + Optional.ofNullable(currentStreams.get(connection)) + .orElseGet( + () -> createAndStartWindmillStreamSender(connection))), + windmillStreamManager)) + .collect(Collectors.toList())) .thenApply( connectionsAndSenders -> connectionsAndSenders.stream() diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSender.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSender.java index 3d93629951ea9..8932063e22c03 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSender.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSender.java @@ -154,10 +154,9 @@ void closeAllStreams() { } @Override - public void adjustBudget(long itemsDelta, long bytesDelta) { - GetWorkBudget adjustment = - GetWorkBudget.builder().setItems(itemsDelta).setBytes(bytesDelta).build(); - getWorkBudget.getAndSet(adjustment); + public void setBudget(long items, long bytes) { + GetWorkBudget adjustment = GetWorkBudget.builder().setItems(items).setBytes(bytes).build(); + getWorkBudget.set(adjustment); if (started.get()) { getWorkStream.get().setBudget(adjustment); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillEndpoints.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillEndpoints.java index cca37c20b32f1..c38824ae42c9d 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillEndpoints.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillEndpoints.java @@ -124,6 +124,7 @@ private static Optional tryParseDirectEndpointIntoIpV6Address( directEndpointAddress.getHostAddress(), (int) endpointProto.getPort())); } + /** Version of the endpoints which increases with every modification. */ public abstract long version(); /** diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java index 8a5efbd2da965..fa306ce0d93e4 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java @@ -244,7 +244,8 @@ protected final void startStream() { } } - // We were never able to start the stream, remove it from the stream registry. + // We were never able to start the stream, remove it from the stream registry. Otherwise, it is + // removed when closed. streamRegistry.remove(this); } @@ -396,7 +397,7 @@ public void onError(Throwable throwable) { } @Override - public synchronized void onCompleted() { + public void onCompleted() { delegate().onCompleted(); } } @@ -473,8 +474,8 @@ private void recordStreamStatus(Status status) { : "received response " + (nowMillis - lastResponseTimeMs.get()) + "ms ago"; logger.debug( - "{} has been restarted {} times. Streaming Windmill RPC Error Count: {}; last was: {} with status: {}." - + " created {}ms ago; {}. This is normal with autoscaling.", + "{} has been restarted {} times. Streaming Windmill RPC Error Count: {}; last was: {}" + + " with status: {}. created {}ms ago; {}. This is normal with autoscaling.", AbstractWindmillStream.this.getClass(), currentRestartCount, currentErrorCount, diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java index 87a459eedca9c..a89d178155a3e 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java @@ -280,7 +280,7 @@ private void issueMultiChunkRequest(final long id, PendingRequest pendingRequest synchronized (this) { pending.put(id, pendingRequest); for (int i = 0; - i < serializedCommit.size() && !isShutdown(); + i < serializedCommit.size(); i += AbstractWindmillStream.RPC_STREAM_CHUNK_SIZE) { int end = i + AbstractWindmillStream.RPC_STREAM_CHUNK_SIZE; ByteString chunk = serializedCommit.substring(i, Math.min(end, serializedCommit.size())); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/EvenGetWorkBudgetDistributor.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/EvenGetWorkBudgetDistributor.java index 9cc6d5e01aecb..c7f02552cfc3d 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/EvenGetWorkBudgetDistributor.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/EvenGetWorkBudgetDistributor.java @@ -69,10 +69,10 @@ public void distributeBudget( GetWorkBudget remaining = getWorkBudgetSpender.remainingBudget(); if (isBelowFiftyPercentOfTarget(remaining, desiredBudget)) { GetWorkBudget adjustment = desiredBudget.subtract(remaining); - getWorkBudgetSpender.adjustBudget(adjustment); + getWorkBudgetSpender.setBudget(adjustment); } } else { - getWorkBudgetSpender.adjustBudget(desiredBudget); + getWorkBudgetSpender.setBudget(desiredBudget); } }); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudgetSpender.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudgetSpender.java index 254b2589062ef..9fbf66d0ada25 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudgetSpender.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudgetSpender.java @@ -22,10 +22,10 @@ * org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudget} */ public interface GetWorkBudgetSpender { - void adjustBudget(long itemsDelta, long bytesDelta); + void setBudget(long items, long bytes); - default void adjustBudget(GetWorkBudget adjustment) { - adjustBudget(adjustment.items(), adjustment.bytes()); + default void setBudget(GetWorkBudget adjustment) { + setBudget(adjustment.items(), adjustment.bytes()); } GetWorkBudget remainingBudget(); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarnessTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarnessTest.java index 747ba10b3c254..5518e6f214559 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarnessTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarnessTest.java @@ -414,7 +414,7 @@ private void expectNumDistributions(int numBudgetDistributionsExpected) { @Override public void distributeBudget( ImmutableCollection streams, GetWorkBudget getWorkBudget) { - streams.forEach(stream -> stream.adjustBudget(getWorkBudget.items(), getWorkBudget.bytes())); + streams.forEach(stream -> stream.setBudget(getWorkBudget.items(), getWorkBudget.bytes())); getWorkBudgetDistributorTriggered.countDown(); } } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/EvenGetWorkBudgetDistributorTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/EvenGetWorkBudgetDistributorTest.java index b4058f0ce22e9..95756c6a88b5a 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/EvenGetWorkBudgetDistributorTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/EvenGetWorkBudgetDistributorTest.java @@ -57,7 +57,7 @@ private static GetWorkBudgetSpender createGetWorkBudgetOwnerWithRemainingBudgetO return spy( new GetWorkBudgetSpender() { @Override - public void adjustBudget(long itemsDelta, long bytesDelta) {} + public void setBudget(long items, long bytes) {} @Override public GetWorkBudget remainingBudget() { @@ -93,7 +93,7 @@ public void testDistributeBudget_doesNotAdjustStreamBudgetWhenRemainingBudgetHig ImmutableList.of(getWorkBudgetSpender), GetWorkBudget.builder().setItems(10L).setBytes(10L).build()); - verify(getWorkBudgetSpender, never()).adjustBudget(anyLong(), anyLong()); + verify(getWorkBudgetSpender, never()).setBudget(anyLong(), anyLong()); } @Test @@ -108,7 +108,7 @@ public void testDistributeBudget_doesNotAdjustStreamBudgetWhenRemainingBudgetHig ImmutableList.of(getWorkBudgetSpender), GetWorkBudget.builder().setItems(20L).setBytes(20L).build()); - verify(getWorkBudgetSpender, never()).adjustBudget(anyLong(), anyLong()); + verify(getWorkBudgetSpender, never()).setBudget(anyLong(), anyLong()); } @Test @@ -123,7 +123,7 @@ public void testDistributeBudget_doesNotAdjustStreamBudgetWhenRemainingBudgetHig .distributeBudget(ImmutableList.of(getWorkBudgetSpender), totalGetWorkBudget); verify(getWorkBudgetSpender, times(1)) - .adjustBudget( + .setBudget( eq(totalGetWorkBudget.items() - streamRemainingBudget.items()), eq(totalGetWorkBudget.bytes() - streamRemainingBudget.bytes())); } @@ -141,7 +141,7 @@ public void testDistributeBudget_doesNotAdjustStreamBudgetWhenRemainingBudgetHig .distributeBudget(ImmutableList.of(getWorkBudgetSpender), totalGetWorkBudget); verify(getWorkBudgetSpender, times(1)) - .adjustBudget( + .setBudget( eq( totalGetWorkBudget.items() - streamRemainingBudget.items() @@ -160,7 +160,7 @@ public void testDistributeBudget_adjustsStreamBudgetWhenRemainingByteBudgetTooLo .distributeBudget(ImmutableList.of(getWorkBudgetSpender), totalGetWorkBudget); verify(getWorkBudgetSpender, times(1)) - .adjustBudget( + .setBudget( eq(totalGetWorkBudget.items() - streamRemainingBudget.items()), eq(totalGetWorkBudget.bytes() - streamRemainingBudget.bytes())); } @@ -179,7 +179,7 @@ public void testDistributeBudget_adjustsStreamBudgetWhenRemainingByteBudgetTooLo .distributeBudget(ImmutableList.of(getWorkBudgetSpender), totalGetWorkBudget); verify(getWorkBudgetSpender, times(1)) - .adjustBudget( + .setBudget( eq(totalGetWorkBudget.items() - streamRemainingBudget.items()), eq( totalGetWorkBudget.bytes() @@ -206,7 +206,7 @@ public void testDistributeBudget_distributesBudgetEvenlyIfPossible() { streams.forEach( stream -> verify(stream, times(1)) - .adjustBudget(eq(itemsAndBytesPerStream), eq(itemsAndBytesPerStream))); + .setBudget(eq(itemsAndBytesPerStream), eq(itemsAndBytesPerStream))); } @Test @@ -228,7 +228,7 @@ public void testDistributeBudget_distributesFairlyWhenNotEven() { streams.forEach( stream -> verify(stream, times(1)) - .adjustBudget(eq(itemsAndBytesPerStream), eq(itemsAndBytesPerStream))); + .setBudget(eq(itemsAndBytesPerStream), eq(itemsAndBytesPerStream))); } @Test @@ -258,6 +258,6 @@ public void testDistributeBudget_distributesFairlyWhenNotEven() { streams.forEach( stream -> verify(stream, times(1)) - .adjustBudget(eq(itemsAndBytesPerStream), eq(itemsAndBytesPerStream))); + .setBudget(eq(itemsAndBytesPerStream), eq(itemsAndBytesPerStream))); } }