Skip to content

Commit

Permalink
Fix flakiness in MasterServiceTests.testThrottlingForMultipleTaskTypes (
Browse files Browse the repository at this point in the history
opensearch-project#8901)

* Fix flakiness in MasterServiceTests.testThrottlingForMultipleTaskTypes

The test configured a [timeout duration of zero][1] for certain tasks
and asserted that all tasks were throttled or timed out. This is not a
valid assertion because it is possible for a task to complete before the
[asynchronous timeout operation runs][2], which means the task would
complete successfully. The fix is to adjust the assertion to allow for
successful tasks in this case.

[1]: https://github.com/opensearch-project/OpenSearch/blob/60985bc300d9eafd36c1ab25d46235e1c925c565/server/src/test/java/org/opensearch/cluster/service/MasterServiceTests.java#L941
[2]: https://github.com/opensearch-project/OpenSearch/blob/9fc3f4096958159ec9b53012fc7ced19fd793e1b/server/src/main/java/org/opensearch/common/util/concurrent/PrioritizedOpenSearchThreadPoolExecutor.java#L266

Signed-off-by: Andrew Ross <andrross@amazon.com>

* Add a deterministic test case for timeout

Signed-off-by: Andrew Ross <andrross@amazon.com>

---------

Signed-off-by: Andrew Ross <andrross@amazon.com>
Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>
  • Loading branch information
andrross authored and kaushalmahi12 committed Sep 12, 2023
1 parent e79df00 commit 1a7fa50
Showing 1 changed file with 76 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import com.carrotsearch.randomizedtesting.annotations.Timeout;

import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.hamcrest.Matchers.anyOf;
Expand Down Expand Up @@ -863,6 +865,7 @@ public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey(
AtomicInteger throttledTask3 = new AtomicInteger();
AtomicInteger succeededTask1 = new AtomicInteger();
AtomicInteger succeededTask2 = new AtomicInteger();
AtomicInteger succeededTask3 = new AtomicInteger();
AtomicInteger timedOutTask3 = new AtomicInteger();

final ClusterStateTaskListener listener = new ClusterStateTaskListener() {
Expand All @@ -880,6 +883,8 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
succeededTask1.incrementAndGet();
} else if (source.equals(task2)) {
succeededTask2.incrementAndGet();
} else if (source.equals(task3)) {
succeededTask3.incrementAndGet();
}
latch.countDown();
}
Expand Down Expand Up @@ -955,7 +960,7 @@ public void run() {
assertEquals(numberOfTask1, throttledTask1.get() + succeededTask1.get());
assertEquals(numberOfTask2, succeededTask2.get());
assertEquals(0, throttledTask2.get());
assertEquals(numberOfTask3, throttledTask3.get() + timedOutTask3.get());
assertEquals(numberOfTask3, throttledTask3.get() + timedOutTask3.get() + succeededTask3.get());
masterService.close();
}

Expand Down Expand Up @@ -1378,6 +1383,76 @@ public void testDeprecatedMasterServiceUpdateTaskThreadName() {
assertThrows(AssertionError.class, () -> MasterService.assertClusterManagerUpdateThread());
}

@Timeout(millis = 5_000)
public void testTaskTimeout() throws InterruptedException {
try (ClusterManagerService clusterManagerService = createClusterManagerService(true)) {
final AtomicInteger failureCount = new AtomicInteger();
final AtomicInteger successCount = new AtomicInteger();
final CountDownLatch taskStartLatch = new CountDownLatch(1);
final CountDownLatch blockingTaskLatch = new CountDownLatch(1);
final CountDownLatch timeoutLatch = new CountDownLatch(1);
final ClusterStateTaskListener blockingListener = new ClusterStateTaskListener() {
@Override
public void onFailure(String source, Exception e) {
fail("Unexpected failure");
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
successCount.incrementAndGet();
taskStartLatch.countDown();
try {
blockingTaskLatch.await();
} catch (InterruptedException e) {
fail("Interrupted");
}
}
};
final ClusterStateTaskListener timeoutListener = new ClusterStateTaskListener() {
@Override
public void onFailure(String source, Exception e) {
assertEquals("timeout", source);
failureCount.incrementAndGet();
timeoutLatch.countDown();
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
fail("Unexpected success");
}
};

final ClusterStateTaskExecutor<Object> executor = (currentState, tasks) -> ClusterStateTaskExecutor.ClusterTasksResult.builder()
.successes(tasks)
.build(currentState);

// start a task and wait for it to start and block on the clusterStateProcessed callback
clusterManagerService.submitStateUpdateTask(
"success",
new Object(),
ClusterStateTaskConfig.build(randomFrom(Priority.values())),
executor,
blockingListener
);
taskStartLatch.await();

// start a second task that is guaranteed to timeout as the first task is still running
clusterManagerService.submitStateUpdateTask(
"timeout",
new Object(),
ClusterStateTaskConfig.build(randomFrom(Priority.values()), TimeValue.timeValueMillis(1L)),
executor,
timeoutListener
);

// wait for the timeout to happen, then unblock and assert one success and one failure
timeoutLatch.await();
blockingTaskLatch.countDown();
assertEquals(1, failureCount.get());
assertEquals(1, successCount.get());
}
}

/**
* Returns the cluster state that the cluster-manager service uses (and that is provided by the discovery layer)
*/
Expand Down

0 comments on commit 1a7fa50

Please sign in to comment.