diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
index edc9edeed4f36..2337115564c36 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
@@ -64,11 +64,13 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
-import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
@@ -169,6 +171,9 @@ public static void shutdownMiniCluster() throws Exception {
 
     @Test
     public void test() throws Exception {
+        // this resets the variables shared across instances, recoveries, etc.
+        TestScript.reset();
+
         final int numEvents1 = 200;
         final int numEvents2 = 5;
         final int delay1 = 1;
@@ -296,19 +301,23 @@ private static final class EventSendingCoordinator implements OperatorCoordinato
         private final int delay;
         private final int maxNumber;
+        private final int failAtMessage;
 
         private int nextNumber;
 
         private CompletableFuture<byte[]> requestedCheckpoint;
         private CompletableFuture<byte[]> nextToComplete;
 
-        private final int failAtMessage;
-        private boolean failedBefore;
-
-        private final ArrayDeque<CountDownLatch> recoveredTaskRunning = new ArrayDeque<>();
-
         private SubtaskGateway subtaskGateway;
         private boolean workLoopRunning;
 
+        /**
+         * This contains all variables that are necessary to track the progress of the test,
+         * and which need to be kept across instances of this coordinator (some scheduler
+         * implementations may re-instantiate the ExecutionGraph and the coordinators around
+         * global failures).
+         */
+        private final TestScript testScript;
+
         private EventSendingCoordinator(Context context, String name, int numEvents, int delay) {
             checkArgument(delay > 0);
             checkArgument(numEvents >= 3);
@@ -316,6 +325,9 @@ private EventSendingCoordinator(Context context, String name, int numEvents, int
             this.context = context;
             this.maxNumber = numEvents;
             this.delay = delay;
+
+            this.testScript = TestScript.getForOperator(name);
+
             this.mailboxExecutor =
                     Executors.newSingleThreadExecutor(
                             new DispatcherThreadFactory(
@@ -349,17 +361,12 @@ public void handleEventFromOperator(int subtask, OperatorEvent event) throws Exc
                         String.format("Don't recognize event '%s' from task %d.", event, subtask));
             }
 
-            // We complete all events that were enqueued. We may need to complete
-            // multiple ones here, because it can happen that after a failure no real recovery
-            // happens that results in an event being sent (and this method being called), but that
-            // immediately another failure comes, triggered by the other operator coordinator (or
-            // its task).
-            synchronized (recoveredTaskRunning) {
-                for (CountDownLatch latch : recoveredTaskRunning) {
-                    latch.countDown();
-                }
-                recoveredTaskRunning.clear();
-            }
+            // this unblocks all the delayed actions that were kicked off while the previous
+            // task was still running (if there was a previous task). this is part of simulating
+            // the extreme race where the coordinator thread stalls for so long that a new
+            // task execution attempt gets deployed before the last events targeted at the old
+            // task were sent.
+            testScript.signalRecoveredTaskReady();
 
             // first, we hand this over to the mailbox thread, so we preserve order on operations,
             // even if the action is only to do a thread safe scheduling into the scheduledExecutor
@@ -375,13 +382,13 @@ public void handleEventFromOperator(int subtask, OperatorEvent event) throws Exc
 
         @Override
         public void subtaskFailed(int subtask, @Nullable Throwable reason) {
-            // we need to create and enqueue this outside the mailbox, so that the
-            // enqueuing is strictly ordered with the completion (which also happens outside
-            // the mail box executor).
+            // we need to create and register this outside the mailbox so that the
+            // registration is not affected by the artificial stall on the mailbox, but happens
+            // strictly before the tasks are restored and the operator events are received (to
+            // trigger the latches), which also happens outside the mailbox.
+
             final CountDownLatch successorIsRunning = new CountDownLatch(1);
-            synchronized (recoveredTaskRunning) {
-                recoveredTaskRunning.addLast(successorIsRunning);
-            }
+            testScript.registerHookToNotifyAfterTaskRecovered(successorIsRunning);
 
             // simulate a heavy thread race here: the mailbox has a last enqueued action before the
             // cancellation is processed. But through a race, the mailbox freezes for a while and in
@@ -483,7 +490,12 @@ private void executeSingleAction() {
                 System.exit(-1);
             }
 
-            // schedule the next step
+            // schedule the next step. we do this here, after the previous step concluded, rather
+            // than scheduling a periodic action. Otherwise, the periodic task would enqueue many
+            // actions while the mailbox stalls and process them all instantaneously after the
+            // un-stalling. That wouldn't break the test, but it erases the differences in event
+            // sending delays between the different coordinators, which are part of provoking the
+            // situation that requires checkpoint alignment between the coordinators' event streams.
             scheduleSingleAction();
         }
 
@@ -515,8 +527,8 @@ private void sendNextEvent() {
         }
 
         private void checkWhetherToTriggerFailure() {
-            if (nextNumber >= failAtMessage && !failedBefore) {
-                failedBefore = true;
+            if (nextNumber >= failAtMessage && !testScript.hasAlreadyFailed()) {
+                testScript.recordHasFailed();
                 context.failJob(new Exception("test failure"));
             }
         }
@@ -622,6 +634,54 @@ private void restoreState(List<Integer> target) throws Exception {
         }
     }
 
+    // ------------------------------------------------------------------------
+    //  dedicated class to hold the "test script"
+    // ------------------------------------------------------------------------
+
+    private static final class TestScript {
+
+        private static final Map<String, TestScript> MAP_FOR_OPERATOR = new HashMap<>();
+
+        static TestScript getForOperator(String operatorName) {
+            return MAP_FOR_OPERATOR.computeIfAbsent(operatorName, (key) -> new TestScript());
+        }
+
+        static void reset() {
+            MAP_FOR_OPERATOR.clear();
+        }
+
+        private final Collection<CountDownLatch> recoveredTaskRunning = new ArrayList<>();
+        private boolean failedBefore;
+
+        void recordHasFailed() {
+            this.failedBefore = true;
+        }
+
+        boolean hasAlreadyFailed() {
+            return failedBefore;
+        }
+
+        void registerHookToNotifyAfterTaskRecovered(CountDownLatch latch) {
+            synchronized (recoveredTaskRunning) {
+                recoveredTaskRunning.add(latch);
+            }
+        }
+
+        void signalRecoveredTaskReady() {
+            // We complete all latches that were registered. We may need to complete
+            // multiple ones here, because it can happen that after a previous failure, the next
+            // execution fails immediately again, before even registering at the coordinator.
+            // In that case, we have multiple latches from multiple failure notifications waiting
+            // to be completed.
+            synchronized (recoveredTaskRunning) {
+                for (CountDownLatch latch : recoveredTaskRunning) {
+                    latch.countDown();
+                }
+                recoveredTaskRunning.clear();
+            }
+        }
+    }
+
     // ------------------------------------------------------------------------
     //  serialization shenannigans
     // ------------------------------------------------------------------------
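
For review purposes, the property the new TestScript class provides can be demonstrated outside of Flink. Below is a minimal, self-contained sketch; the driver class, its main method, and the operator name "op-1" are invented for illustration, and the nested TestScript is a trimmed copy of the class added by this patch. It shows that two getForOperator lookups under the same name return the same instance, so a latch registered by one coordinator instance before a global failover is released when the successor instance signals that its recovered task is running.

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class TestScriptSketch {

    // trimmed copy of the TestScript class added by this patch
    static final class TestScript {

        private static final Map<String, TestScript> MAP_FOR_OPERATOR = new HashMap<>();

        static TestScript getForOperator(String operatorName) {
            return MAP_FOR_OPERATOR.computeIfAbsent(operatorName, (key) -> new TestScript());
        }

        static void reset() {
            MAP_FOR_OPERATOR.clear();
        }

        private final Collection<CountDownLatch> recoveredTaskRunning = new ArrayList<>();

        void registerHookToNotifyAfterTaskRecovered(CountDownLatch latch) {
            synchronized (recoveredTaskRunning) {
                recoveredTaskRunning.add(latch);
            }
        }

        void signalRecoveredTaskReady() {
            synchronized (recoveredTaskRunning) {
                for (CountDownLatch latch : recoveredTaskRunning) {
                    latch.countDown();
                }
                recoveredTaskRunning.clear();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        TestScript.reset(); // as done at the start of test()

        // "first" coordinator instance: a subtask failed, so it registers a hook
        // to be notified once the successor task is running (cf. subtaskFailed())
        CountDownLatch successorIsRunning = new CountDownLatch(1);
        TestScript.getForOperator("op-1").registerHookToNotifyAfterTaskRecovered(successorIsRunning);

        // "second" coordinator instance after a global failover: looking up the
        // same operator name yields the same TestScript, so the hook registered
        // by the predecessor instance is released (cf. handleEventFromOperator())
        TestScript.getForOperator("op-1").signalRecoveredTaskReady();

        // prints "true": the latch registered before the failover was counted down
        System.out.println(successorIsRunning.await(1, TimeUnit.SECONDS));
    }
}

This sharing is also why the failedBefore flag moved into TestScript: with a per-coordinator field, a scheduler that re-instantiates the coordinators on a global failure would see a fresh failedBefore = false and fail the job a second time.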