diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarnessTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarnessTest.java index 39d8cc2b35a0f..747ba10b3c254 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarnessTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarnessTest.java @@ -30,9 +30,7 @@ import java.io.IOException; import java.util.ArrayList; -import java.util.Comparator; import java.util.HashSet; -import java.util.List; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; @@ -71,7 +69,6 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableCollection; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort; import org.junit.After; import org.junit.Before; @@ -333,11 +330,8 @@ public void testOnNewWorkerMetadata_redistributesBudget() throws InterruptedExce .putAllGlobalDataEndpoints(DEFAULT) .build(); - List workerMetadataResponses = - Lists.newArrayList(firstWorkerMetadata, secondWorkerMetadata); - TestGetWorkBudgetDistributor getWorkBudgetDistributor = - spy(new TestGetWorkBudgetDistributor(workerMetadataResponses.size())); + spy(new TestGetWorkBudgetDistributor(1)); fanOutStreamingEngineWorkProvider = newFanOutStreamingEngineWorkerHarness( GetWorkBudget.builder().setItems(1).setBytes(1).build(), @@ -346,15 +340,13 @@ public void testOnNewWorkerMetadata_redistributesBudget() throws InterruptedExce getWorkerMetadataReady.await(); - // Make sure we are injecting the metadata from smallest to largest. - workerMetadataResponses.stream() - .sorted(Comparator.comparingLong(WorkerMetadataResponse::getMetadataVersion)) - .forEach(fakeGetWorkerMetadataStub::injectWorkerMetadata); - + fakeGetWorkerMetadataStub.injectWorkerMetadata(firstWorkerMetadata); + waitForWorkerMetadataToBeConsumed(getWorkBudgetDistributor); + getWorkBudgetDistributor.expectNumDistributions(1); + fakeGetWorkerMetadataStub.injectWorkerMetadata(secondWorkerMetadata); waitForWorkerMetadataToBeConsumed(getWorkBudgetDistributor); - verify(getWorkBudgetDistributor, atLeast(workerMetadataResponses.size())) - .distributeBudget(any(), any()); + verify(getWorkBudgetDistributor, times(2)).distributeBudget(any(), any()); } private void waitForWorkerMetadataToBeConsumed( @@ -404,7 +396,7 @@ private void injectWorkerMetadata(WorkerMetadataResponse response) { } private static class TestGetWorkBudgetDistributor implements GetWorkBudgetDistributor { - private final CountDownLatch getWorkBudgetDistributorTriggered; + private CountDownLatch getWorkBudgetDistributorTriggered; private TestGetWorkBudgetDistributor(int numBudgetDistributionsExpected) { this.getWorkBudgetDistributorTriggered = new CountDownLatch(numBudgetDistributionsExpected); @@ -415,6 +407,10 @@ private void waitForBudgetDistribution() throws InterruptedException { getWorkBudgetDistributorTriggered.await(5, TimeUnit.SECONDS); } + private void expectNumDistributions(int numBudgetDistributionsExpected) { + this.getWorkBudgetDistributorTriggered = new CountDownLatch(numBudgetDistributionsExpected); + } + @Override public void distributeBudget( ImmutableCollection streams, GetWorkBudget getWorkBudget) {