Skip to content

Commit

Permalink
deflake FanOutStreamingEngineWorkerHarnessTest.testOnNewWorkerMetadat…
Browse files Browse the repository at this point in the history
…a_redistributesBudget()
  • Loading branch information
m-trieu committed Sep 27, 2024
1 parent d0a199d commit 01aee62
Showing 1 changed file with 11 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -71,7 +69,6 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableCollection;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
import org.junit.After;
import org.junit.Before;
Expand Down Expand Up @@ -333,11 +330,8 @@ public void testOnNewWorkerMetadata_redistributesBudget() throws InterruptedExce
.putAllGlobalDataEndpoints(DEFAULT)
.build();

List<WorkerMetadataResponse> workerMetadataResponses =
Lists.newArrayList(firstWorkerMetadata, secondWorkerMetadata);

TestGetWorkBudgetDistributor getWorkBudgetDistributor =
spy(new TestGetWorkBudgetDistributor(workerMetadataResponses.size()));
spy(new TestGetWorkBudgetDistributor(1));
fanOutStreamingEngineWorkProvider =
newFanOutStreamingEngineWorkerHarness(
GetWorkBudget.builder().setItems(1).setBytes(1).build(),
Expand All @@ -346,15 +340,13 @@ public void testOnNewWorkerMetadata_redistributesBudget() throws InterruptedExce

getWorkerMetadataReady.await();

// Make sure we are injecting the metadata from smallest to largest.
workerMetadataResponses.stream()
.sorted(Comparator.comparingLong(WorkerMetadataResponse::getMetadataVersion))
.forEach(fakeGetWorkerMetadataStub::injectWorkerMetadata);

fakeGetWorkerMetadataStub.injectWorkerMetadata(firstWorkerMetadata);
waitForWorkerMetadataToBeConsumed(getWorkBudgetDistributor);
getWorkBudgetDistributor.expectNumDistributions(1);
fakeGetWorkerMetadataStub.injectWorkerMetadata(secondWorkerMetadata);
waitForWorkerMetadataToBeConsumed(getWorkBudgetDistributor);

verify(getWorkBudgetDistributor, atLeast(workerMetadataResponses.size()))
.distributeBudget(any(), any());
verify(getWorkBudgetDistributor, times(2)).distributeBudget(any(), any());
}

private void waitForWorkerMetadataToBeConsumed(
Expand Down Expand Up @@ -404,7 +396,7 @@ private void injectWorkerMetadata(WorkerMetadataResponse response) {
}

private static class TestGetWorkBudgetDistributor implements GetWorkBudgetDistributor {
private final CountDownLatch getWorkBudgetDistributorTriggered;
private CountDownLatch getWorkBudgetDistributorTriggered;

private TestGetWorkBudgetDistributor(int numBudgetDistributionsExpected) {
this.getWorkBudgetDistributorTriggered = new CountDownLatch(numBudgetDistributionsExpected);
Expand All @@ -415,6 +407,10 @@ private void waitForBudgetDistribution() throws InterruptedException {
getWorkBudgetDistributorTriggered.await(5, TimeUnit.SECONDS);
}

private void expectNumDistributions(int numBudgetDistributionsExpected) {
this.getWorkBudgetDistributorTriggered = new CountDownLatch(numBudgetDistributionsExpected);
}

@Override
public <T extends GetWorkBudgetSpender> void distributeBudget(
ImmutableCollection<T> streams, GetWorkBudget getWorkBudget) {
Expand Down

0 comments on commit 01aee62

Please sign in to comment.