Skip to content

Commit

Permalink
address PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
m-trieu committed Oct 3, 2024
1 parent 01aee62 commit 745d87c
Show file tree
Hide file tree
Showing 10 changed files with 191 additions and 148 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import java.util.Map.Entry;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -102,8 +101,11 @@ public final class FanOutStreamingEngineWorkerHarness implements StreamingWorker
/** Writes are guarded by synchronization, reads are lock free. */
private final AtomicReference<StreamingEngineConnectionState> connections;

@GuardedBy("this")
private long activeMetadataVersion;

@GuardedBy("metadataLock")
private long metadataVersion;
private long pendingMetadataVersion;

@GuardedBy("this")
private boolean started;
Expand Down Expand Up @@ -132,10 +134,18 @@ private FanOutStreamingEngineWorkerHarness(
new ThreadFactoryBuilder().setNameFormat(STREAM_MANAGER_THREAD_NAME).build());
this.workerMetadataConsumer =
Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat(WORKER_METADATA_CONSUMER_THREAD_NAME).build());
new ThreadFactoryBuilder()
.setNameFormat(WORKER_METADATA_CONSUMER_THREAD_NAME)
.setUncaughtExceptionHandler(
(t, e) ->
LOG.error(
"{} failed due to uncaught exception during execution.",
t.getName(),
e))
.build());
this.getWorkBudgetDistributor = getWorkBudgetDistributor;
this.totalGetWorkBudget = totalGetWorkBudget;
this.metadataVersion = Long.MIN_VALUE;
this.activeMetadataVersion = Long.MIN_VALUE;
this.getWorkerMetadataStream = Suppliers.memoize(createGetWorkerMetadataStream()::get);
this.workCommitterFactory = workCommitterFactory;
this.metadataLock = new Object();
Expand Down Expand Up @@ -259,9 +269,9 @@ private Supplier<GetWorkerMetadataStream> createGetWorkerMetadataStream(
private void consumeWorkerMetadata(WindmillEndpoints windmillEndpoints) {
synchronized (metadataLock) {
// Only process versions greater than what we currently have to prevent double processing of
// metadata.
if (windmillEndpoints.version() > metadataVersion) {
metadataVersion = windmillEndpoints.version();
// metadata. workerMetadataConsumer is single-threaded so we maintain ordering.
if (windmillEndpoints.version() > pendingMetadataVersion) {
pendingMetadataVersion = windmillEndpoints.version();
workerMetadataConsumer.execute(() -> consumeWindmillWorkerEndpoints(windmillEndpoints));
}
}
Expand All @@ -272,11 +282,12 @@ private synchronized void consumeWindmillWorkerEndpoints(WindmillEndpoints newWi
// queued up while a previous version of the windmillEndpoints were being consumed. Only consume
// the endpoints if they are the most current version.
synchronized (metadataLock) {
if (newWindmillEndpoints.version() < metadataVersion) {
if (newWindmillEndpoints.version() < pendingMetadataVersion) {
return;
}
}

long previousMetadataVersion = activeMetadataVersion;
LOG.info("Consuming new windmill endpoints: {}", newWindmillEndpoints);
ImmutableMap<Endpoint, WindmillConnection> newWindmillConnections =
createNewWindmillConnections(newWindmillEndpoints.windmillEndpoints());
Expand All @@ -296,6 +307,11 @@ private synchronized void consumeWindmillWorkerEndpoints(WindmillEndpoints newWi
connections.get());
connections.set(newConnectionsState);
getWorkBudgetDistributor.distributeBudget(newStreams.values(), totalGetWorkBudget);
activeMetadataVersion = newWindmillEndpoints.version();
LOG.info(
"Consumed new endpoints. previous metadata version: {}, current metadata version: {}",
previousMetadataVersion,
activeMetadataVersion);
}

/** Close the streams that are no longer valid asynchronously. */
Expand Down Expand Up @@ -329,25 +345,19 @@ private void closeStaleStreams(
createAndStartNewStreams(Collection<WindmillConnection> newWindmillConnections) {
ImmutableMap<WindmillConnection, WindmillStreamSender> currentStreams =
connections.get().windmillStreams();
CompletionStage<List<Pair<WindmillConnection, WindmillStreamSender>>>
connectionAndSenderFuture =
MoreFutures.allAsList(
newWindmillConnections.stream()
.map(
connection ->
MoreFutures.supplyAsync(
() ->
Pair.of(
connection,
Optional.ofNullable(currentStreams.get(connection))
.orElseGet(
() ->
createAndStartWindmillStreamSender(
connection))),
windmillStreamManager))
.collect(Collectors.toList()));

return connectionAndSenderFuture
return MoreFutures.allAsList(
newWindmillConnections.stream()
.map(
connection ->
MoreFutures.supplyAsync(
() ->
Pair.of(
connection,
Optional.ofNullable(currentStreams.get(connection))
.orElseGet(
() -> createAndStartWindmillStreamSender(connection))),
windmillStreamManager))
.collect(Collectors.toList()))
.thenApply(
connectionsAndSenders ->
connectionsAndSenders.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,10 +154,9 @@ void closeAllStreams() {
}

@Override
public void adjustBudget(long itemsDelta, long bytesDelta) {
GetWorkBudget adjustment =
GetWorkBudget.builder().setItems(itemsDelta).setBytes(bytesDelta).build();
getWorkBudget.getAndSet(adjustment);
public void setBudget(long items, long bytes) {
GetWorkBudget adjustment = GetWorkBudget.builder().setItems(items).setBytes(bytes).build();
getWorkBudget.set(adjustment);
if (started.get()) {
getWorkStream.get().setBudget(adjustment);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ private static Optional<HostAndPort> tryParseDirectEndpointIntoIpV6Address(
directEndpointAddress.getHostAddress(), (int) endpointProto.getPort()));
}

/** Version of the endpoints which increases with every modification. */
public abstract long version();

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,8 @@ protected final void startStream() {
}
}

// We were never able to start the stream, remove it from the stream registry.
// We were never able to start the stream, remove it from the stream registry. Otherwise, it is
// removed when closed.
streamRegistry.remove(this);
}

Expand Down Expand Up @@ -396,7 +397,7 @@ public void onError(Throwable throwable) {
}

@Override
public synchronized void onCompleted() {
public void onCompleted() {
delegate().onCompleted();
}
}
Expand Down Expand Up @@ -473,8 +474,8 @@ private void recordStreamStatus(Status status) {
: "received response " + (nowMillis - lastResponseTimeMs.get()) + "ms ago";

logger.debug(
"{} has been restarted {} times. Streaming Windmill RPC Error Count: {}; last was: {} with status: {}."
+ " created {}ms ago; {}. This is normal with autoscaling.",
"{} has been restarted {} times. Streaming Windmill RPC Error Count: {}; last was: {}"
+ " with status: {}. created {}ms ago; {}. This is normal with autoscaling.",
AbstractWindmillStream.this.getClass(),
currentRestartCount,
currentErrorCount,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ private void issueMultiChunkRequest(final long id, PendingRequest pendingRequest
synchronized (this) {
pending.put(id, pendingRequest);
for (int i = 0;
i < serializedCommit.size() && !isShutdown();
i < serializedCommit.size();
i += AbstractWindmillStream.RPC_STREAM_CHUNK_SIZE) {
int end = i + AbstractWindmillStream.RPC_STREAM_CHUNK_SIZE;
ByteString chunk = serializedCommit.substring(i, Math.min(end, serializedCommit.size()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,10 @@ public <T extends GetWorkBudgetSpender> void distributeBudget(
GetWorkBudget remaining = getWorkBudgetSpender.remainingBudget();
if (isBelowFiftyPercentOfTarget(remaining, desiredBudget)) {
GetWorkBudget adjustment = desiredBudget.subtract(remaining);
getWorkBudgetSpender.adjustBudget(adjustment);
getWorkBudgetSpender.setBudget(adjustment);
}
} else {
getWorkBudgetSpender.adjustBudget(desiredBudget);
getWorkBudgetSpender.setBudget(desiredBudget);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@
* org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudget}
*/
public interface GetWorkBudgetSpender {
void adjustBudget(long itemsDelta, long bytesDelta);
void setBudget(long items, long bytes);

default void adjustBudget(GetWorkBudget adjustment) {
adjustBudget(adjustment.items(), adjustment.bytes());
default void setBudget(GetWorkBudget adjustment) {
setBudget(adjustment.items(), adjustment.bytes());
}

GetWorkBudget remainingBudget();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ private void expectNumDistributions(int numBudgetDistributionsExpected) {
@Override
public <T extends GetWorkBudgetSpender> void distributeBudget(
ImmutableCollection<T> streams, GetWorkBudget getWorkBudget) {
streams.forEach(stream -> stream.adjustBudget(getWorkBudget.items(), getWorkBudget.bytes()));
streams.forEach(stream -> stream.setBudget(getWorkBudget.items(), getWorkBudget.bytes()));
getWorkBudgetDistributorTriggered.countDown();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ private static GetWorkBudgetSpender createGetWorkBudgetOwnerWithRemainingBudgetO
return spy(
new GetWorkBudgetSpender() {
@Override
public void adjustBudget(long itemsDelta, long bytesDelta) {}
public void setBudget(long items, long bytes) {}

@Override
public GetWorkBudget remainingBudget() {
Expand Down Expand Up @@ -93,7 +93,7 @@ public void testDistributeBudget_doesNotAdjustStreamBudgetWhenRemainingBudgetHig
ImmutableList.of(getWorkBudgetSpender),
GetWorkBudget.builder().setItems(10L).setBytes(10L).build());

verify(getWorkBudgetSpender, never()).adjustBudget(anyLong(), anyLong());
verify(getWorkBudgetSpender, never()).setBudget(anyLong(), anyLong());
}

@Test
Expand All @@ -108,7 +108,7 @@ public void testDistributeBudget_doesNotAdjustStreamBudgetWhenRemainingBudgetHig
ImmutableList.of(getWorkBudgetSpender),
GetWorkBudget.builder().setItems(20L).setBytes(20L).build());

verify(getWorkBudgetSpender, never()).adjustBudget(anyLong(), anyLong());
verify(getWorkBudgetSpender, never()).setBudget(anyLong(), anyLong());
}

@Test
Expand All @@ -123,7 +123,7 @@ public void testDistributeBudget_doesNotAdjustStreamBudgetWhenRemainingBudgetHig
.distributeBudget(ImmutableList.of(getWorkBudgetSpender), totalGetWorkBudget);

verify(getWorkBudgetSpender, times(1))
.adjustBudget(
.setBudget(
eq(totalGetWorkBudget.items() - streamRemainingBudget.items()),
eq(totalGetWorkBudget.bytes() - streamRemainingBudget.bytes()));
}
Expand All @@ -141,7 +141,7 @@ public void testDistributeBudget_doesNotAdjustStreamBudgetWhenRemainingBudgetHig
.distributeBudget(ImmutableList.of(getWorkBudgetSpender), totalGetWorkBudget);

verify(getWorkBudgetSpender, times(1))
.adjustBudget(
.setBudget(
eq(
totalGetWorkBudget.items()
- streamRemainingBudget.items()
Expand All @@ -160,7 +160,7 @@ public void testDistributeBudget_adjustsStreamBudgetWhenRemainingByteBudgetTooLo
.distributeBudget(ImmutableList.of(getWorkBudgetSpender), totalGetWorkBudget);

verify(getWorkBudgetSpender, times(1))
.adjustBudget(
.setBudget(
eq(totalGetWorkBudget.items() - streamRemainingBudget.items()),
eq(totalGetWorkBudget.bytes() - streamRemainingBudget.bytes()));
}
Expand All @@ -179,7 +179,7 @@ public void testDistributeBudget_adjustsStreamBudgetWhenRemainingByteBudgetTooLo
.distributeBudget(ImmutableList.of(getWorkBudgetSpender), totalGetWorkBudget);

verify(getWorkBudgetSpender, times(1))
.adjustBudget(
.setBudget(
eq(totalGetWorkBudget.items() - streamRemainingBudget.items()),
eq(
totalGetWorkBudget.bytes()
Expand All @@ -206,7 +206,7 @@ public void testDistributeBudget_distributesBudgetEvenlyIfPossible() {
streams.forEach(
stream ->
verify(stream, times(1))
.adjustBudget(eq(itemsAndBytesPerStream), eq(itemsAndBytesPerStream)));
.setBudget(eq(itemsAndBytesPerStream), eq(itemsAndBytesPerStream)));
}

@Test
Expand All @@ -228,7 +228,7 @@ public void testDistributeBudget_distributesFairlyWhenNotEven() {
streams.forEach(
stream ->
verify(stream, times(1))
.adjustBudget(eq(itemsAndBytesPerStream), eq(itemsAndBytesPerStream)));
.setBudget(eq(itemsAndBytesPerStream), eq(itemsAndBytesPerStream)));
}

@Test
Expand Down Expand Up @@ -258,6 +258,6 @@ public void testDistributeBudget_distributesFairlyWhenNotEven() {
streams.forEach(
stream ->
verify(stream, times(1))
.adjustBudget(eq(itemsAndBytesPerStream), eq(itemsAndBytesPerStream)));
.setBudget(eq(itemsAndBytesPerStream), eq(itemsAndBytesPerStream)));
}
}

0 comments on commit 745d87c

Please sign in to comment.