Skip to content

Commit

Permalink
[Dataflow] ActiveWorkRefresh (#30390)
Browse files Browse the repository at this point in the history
* factor out async work management

* add tests

* remove sleep from test

* add sleep in tests
  • Loading branch information
m-trieu authored Feb 29, 2024
1 parent 4ea3898 commit aebb34a
Show file tree
Hide file tree
Showing 5 changed files with 501 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcWindmillStreamFactory;
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache;
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateReader;
import org.apache.beam.runners.dataflow.worker.windmill.work.refresh.ActiveWorkRefresher;
import org.apache.beam.runners.dataflow.worker.windmill.work.refresh.ActiveWorkRefreshers;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.extensions.gcp.util.Transport;
Expand Down Expand Up @@ -293,6 +295,7 @@ public class StreamingDataflowWorker {
private int maxWorkItemCommitBytes = Integer.MAX_VALUE;

private final DataflowExecutionStateSampler sampler = DataflowExecutionStateSampler.instance();
private final ActiveWorkRefresher activeWorkRefresher;

private StreamingDataflowWorker(
WindmillServerStub windmillServer,
Expand Down Expand Up @@ -441,6 +444,20 @@ private StreamingDataflowWorker(

this.mapTaskToNetwork = mapTaskToBaseNetwork;

int stuckCommitDurationMillis =
windmillServiceEnabled && options.getStuckCommitDurationMillis() > 0
? options.getStuckCommitDurationMillis()
: 0;
this.activeWorkRefresher =
ActiveWorkRefreshers.createDispatchedActiveWorkRefresher(
clock,
options.getActiveWorkRefreshPeriodMillis(),
stuckCommitDurationMillis,
() -> Collections.unmodifiableCollection(computationMap.values()),
sampler,
metricTrackingWindmillServer::refreshActiveWork,
executorSupplier.apply("RefreshWork"));

LOG.debug("windmillServiceEnabled: {}", windmillServiceEnabled);
LOG.debug("WindmillServiceEndpoint: {}", options.getWindmillServiceEndpoint());
LOG.debug("WindmillServicePort: {}", options.getWindmillServicePort());
Expand Down Expand Up @@ -722,30 +739,7 @@ public void start() {
scheduledExecutors.add(workerMessageTimer);
}

ScheduledExecutorService refreshWorkTimer = executorSupplier.apply("RefreshWork");
if (options.getActiveWorkRefreshPeriodMillis() > 0) {
refreshWorkTimer.scheduleWithFixedDelay(
new Runnable() {
@Override
public void run() {
try {
refreshActiveWork();
} catch (RuntimeException e) {
LOG.warn("Failed to refresh active work: ", e);
}
}
},
options.getActiveWorkRefreshPeriodMillis(),
options.getActiveWorkRefreshPeriodMillis(),
TimeUnit.MILLISECONDS);
scheduledExecutors.add(refreshWorkTimer);
}
if (windmillServiceEnabled && options.getStuckCommitDurationMillis() > 0) {
int periodMillis = Math.max(options.getStuckCommitDurationMillis() / 10, 100);
refreshWorkTimer.scheduleWithFixedDelay(
this::invalidateStuckCommits, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
scheduledExecutors.add(refreshWorkTimer);
}
activeWorkRefresher.start();

if (options.getPeriodicStatusPageOutputDirectory() != null) {
ScheduledExecutorService statusPageTimer = executorSupplier.apply("DumpStatusPages");
Expand Down Expand Up @@ -832,6 +826,8 @@ public void stop() {
timer.awaitTermination(300, TimeUnit.SECONDS);
}
}

activeWorkRefresher.stop();
statusPages.stop();
if (debugCaptureManager != null) {
debugCaptureManager.stop();
Expand Down Expand Up @@ -2064,33 +2060,6 @@ private void sendWorkerUpdatesToDataflowService(
}
}

/**
* Sends a GetData request to Windmill for all sufficiently old active work.
*
* <p>This informs Windmill that processing is ongoing and the work should not be retried. The age
* threshold is determined by {@link
* StreamingDataflowWorkerOptions#getActiveWorkRefreshPeriodMillis}.
*/
private void refreshActiveWork() {
Map<String, List<Windmill.HeartbeatRequest>> heartbeats = new HashMap<>();
Instant refreshDeadline =
clock.get().minus(Duration.millis(options.getActiveWorkRefreshPeriodMillis()));

for (Map.Entry<String, ComputationState> entry : computationMap.entrySet()) {
heartbeats.put(entry.getKey(), entry.getValue().getKeyHeartbeats(refreshDeadline, sampler));
}

metricTrackingWindmillServer.refreshActiveWork(heartbeats);
}

private void invalidateStuckCommits() {
Instant stuckCommitDeadline =
clock.get().minus(Duration.millis(options.getStuckCommitDurationMillis()));
for (Map.Entry<String, ComputationState> entry : computationMap.entrySet()) {
entry.getValue().invalidateStuckCommits(stuckCommitDeadline);
}
}

private class HarnessDataProvider implements StatusDataProvider {

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow.worker.windmill.work.refresh;

import java.util.Collection;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.beam.runners.dataflow.worker.DataflowExecutionStateSampler;
import org.apache.beam.runners.dataflow.worker.streaming.ComputationState;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Asynchronously GetData requests to Streaming Engine for all sufficiently old active work and
* invalidates stuck commits.
*
* <p>This informs Windmill that processing is ongoing and the work should not be retried. The age
* threshold is determined by {@link #activeWorkRefreshPeriodMillis}
*/
@ThreadSafe
public abstract class ActiveWorkRefresher {
private static final Logger LOG = LoggerFactory.getLogger(ActiveWorkRefresher.class);

protected final Supplier<Instant> clock;
protected final int activeWorkRefreshPeriodMillis;
protected final Supplier<Collection<ComputationState>> computations;
protected final DataflowExecutionStateSampler sampler;
private final int stuckCommitDurationMillis;
private final ScheduledExecutorService activeWorkRefreshExecutor;

protected ActiveWorkRefresher(
Supplier<Instant> clock,
int activeWorkRefreshPeriodMillis,
int stuckCommitDurationMillis,
Supplier<Collection<ComputationState>> computations,
DataflowExecutionStateSampler sampler,
ScheduledExecutorService activeWorkRefreshExecutor) {
this.clock = clock;
this.activeWorkRefreshPeriodMillis = activeWorkRefreshPeriodMillis;
this.stuckCommitDurationMillis = stuckCommitDurationMillis;
this.computations = computations;
this.sampler = sampler;
this.activeWorkRefreshExecutor = activeWorkRefreshExecutor;
}

@SuppressWarnings("FutureReturnValueIgnored")
public void start() {
if (activeWorkRefreshPeriodMillis > 0) {
activeWorkRefreshExecutor.scheduleWithFixedDelay(
() -> {
try {
refreshActiveWork();
} catch (RuntimeException e) {
LOG.warn("Failed to refresh active work: ", e);
}
},
activeWorkRefreshPeriodMillis,
activeWorkRefreshPeriodMillis,
TimeUnit.MILLISECONDS);
}

if (stuckCommitDurationMillis > 0) {
int periodMillis = Math.max(stuckCommitDurationMillis / 10, 100);
activeWorkRefreshExecutor.scheduleWithFixedDelay(
this::invalidateStuckCommits, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
}
}

public void stop() {
if (activeWorkRefreshPeriodMillis > 0 || stuckCommitDurationMillis > 0) {
activeWorkRefreshExecutor.shutdown();
try {
activeWorkRefreshExecutor.awaitTermination(300, TimeUnit.SECONDS);
} catch (InterruptedException e) {
activeWorkRefreshExecutor.shutdownNow();
}
}
}

private void invalidateStuckCommits() {
Instant stuckCommitDeadline = clock.get().minus(Duration.millis(stuckCommitDurationMillis));
for (ComputationState computationState : computations.get()) {
computationState.invalidateStuckCommits(stuckCommitDeadline);
}
}

protected abstract void refreshActiveWork();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow.worker.windmill.work.refresh;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.beam.runners.dataflow.worker.DataflowExecutionStateSampler;
import org.apache.beam.runners.dataflow.worker.streaming.ComputationState;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.HeartbeatRequest;
import org.joda.time.Instant;

/** Utility class for {@link ActiveWorkRefresher}. */
public final class ActiveWorkRefreshers {
public static ActiveWorkRefresher createDispatchedActiveWorkRefresher(
Supplier<Instant> clock,
int activeWorkRefreshPeriodMillis,
int stuckCommitDurationMillis,
Supplier<Collection<ComputationState>> computations,
DataflowExecutionStateSampler sampler,
Consumer<Map<String, List<HeartbeatRequest>>> activeWorkRefresherFn,
ScheduledExecutorService scheduledExecutorService) {
return new DispatchedActiveWorkRefresher(
clock,
activeWorkRefreshPeriodMillis,
stuckCommitDurationMillis,
computations,
sampler,
activeWorkRefresherFn,
scheduledExecutorService);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow.worker.windmill.work.refresh;

import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.beam.runners.dataflow.worker.DataflowExecutionStateSampler;
import org.apache.beam.runners.dataflow.worker.streaming.ComputationState;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
import org.joda.time.Duration;
import org.joda.time.Instant;

final class DispatchedActiveWorkRefresher extends ActiveWorkRefresher {

private final Consumer<Map<String, List<Windmill.HeartbeatRequest>>> activeWorkRefresherFn;

DispatchedActiveWorkRefresher(
Supplier<Instant> clock,
int activeWorkRefreshPeriodMillis,
int stuckCommitDurationMillis,
Supplier<Collection<ComputationState>> computations,
DataflowExecutionStateSampler sampler,
Consumer<Map<String, List<Windmill.HeartbeatRequest>>> activeWorkRefresherFn,
ScheduledExecutorService scheduledExecutorService) {
super(
clock,
activeWorkRefreshPeriodMillis,
stuckCommitDurationMillis,
computations,
sampler,
scheduledExecutorService);
this.activeWorkRefresherFn = activeWorkRefresherFn;
}

@Override
protected void refreshActiveWork() {
Map<String, List<Windmill.HeartbeatRequest>> heartbeats = new HashMap<>();
Instant refreshDeadline = clock.get().minus(Duration.millis(activeWorkRefreshPeriodMillis));

for (ComputationState computationState : computations.get()) {
heartbeats.put(
computationState.getComputationId(),
computationState.getKeyHeartbeats(refreshDeadline, sampler));
}

activeWorkRefresherFn.accept(heartbeats);
}
}
Loading

0 comments on commit aebb34a

Please sign in to comment.