-
Notifications
You must be signed in to change notification settings - Fork 13.9k
[FLINK-11364][tests] Port TaskManagerFailsITCase to new code base #7676
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. DetailsBot commandsThe @flinkbot bot supports the following commands:
|
flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
Outdated
Show resolved
Hide resolved
flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
Outdated
Show resolved
Hide resolved
flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
Outdated
Show resolved
Hide resolved
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
Outdated
Show resolved
Hide resolved
| } | ||
|
|
||
| @Override | ||
| public void startTaskExecutor(boolean localCommunication) throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Doesn't look like user friendly API. Shouldn't MiniCluster set this flag depending on the configuration? Wouldn't it be enough to expose a signature such as public void startTaskExecutor()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well, MiniCluster cannot know whether to use local communication or not at start time, because TestingMiniCluster allows to start new TaskExecutors. Thus, the only option would be to always set localCommunication to false in the case of the MiniCluster.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I introduced a new method useLocalCommunication which can be overriden by the TestingMiniCluster to always set local communication to false.
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
Outdated
Show resolved
Hide resolved
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
Show resolved
Hide resolved
|
Thanks for the review @GJL. I've addressed your comments and pushed a fixup. |
e921816 to
248f105
Compare
| } | ||
|
|
||
| private void runJobFailureWhenTaskExecutorTerminatesTest( | ||
| Supplier<HeartbeatServices> heartbeatSupplier, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does HeartbeatServices have to be lazily supplied?
| Supplier<HeartbeatServices> heartbeatSupplier, | |
| HeartbeatServices heartbeatServices, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, this is not correct. Will change.
| jobGraph, | ||
| haServices, | ||
| new TestingJobManagerSharedServicesBuilder().build(), | ||
| heartbeatSupplier.get(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| heartbeatSupplier.get(), | |
| heartbeatServices, |
| public void testJobFailureWhenTaskExecutorHeartbeatTimeout() throws Exception { | ||
| final AtomicBoolean respondToHeartbeats = new AtomicBoolean(true); | ||
| runJobFailureWhenTaskExecutorTerminatesTest( | ||
| () -> fastHeartbeatServices, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| () -> fastHeartbeatServices, | |
| fastHeartbeatServices, |
| @Test | ||
| public void testJobFailureWhenGracefulTaskExecutorTermination() throws Exception { | ||
| runJobFailureWhenTaskExecutorTerminatesTest( | ||
| () -> heartbeatServices, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| () -> heartbeatServices, | |
| heartbeatServices, |
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
Show resolved
Hide resolved
| private static final class TestingOnCompletionActions implements OnCompletionActions { | ||
|
|
||
| private final CompletableFuture<ArchivedExecutionGraph> jobReachedGloballyTerminalStateFuture = new CompletableFuture<>(); | ||
| private final CompletableFuture<Void> jobFinishedByOtherFuture = new CompletableFuture<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This unused field is for future extensions?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes.
| * Create a new {@link TerminatingFatalErrorHandler} for the {@link TaskExecutor} with | ||
| * the given index. | ||
| * | ||
| * @param index into the {{@link #taskManagers}} collection to identify the correct {@link TaskExecutor}. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are the double curly braces {{}} intended?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unintended. will change it.
| } | ||
|
|
||
| @Override | ||
| public void startTaskExecutor(boolean localCommunication) throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it crucial for testing to be able to set the right localCommunication flag? If yes, a method overload that sets localCommunication to false would have been enough but I am not insisting on it.
edit: alternatively always use false with no option to override (if possible)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
are you suggesting to always start the TaskExecutors with localCommunication = false? Or only for the TestingMiniCluster? The latter should now be the case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, this functionality was needed for #7690.
|
Thanks for the second round of review @GJL. I addressed your comments except for the last one for which I didn't understand your proposal yet. |
|
@flinkbot approve all |
TaskExecutorITCase is actually covered by TaskExecutorTest#testOfferSlotToJobMasterAfterTimeout.
- "detect a failing task manager" --> JobMaster#testHeartbeatTimeoutWithTaskManager - "handle gracefully failing task manager" --> JobMasterTest#testJobFailureWhenGracefulTaskExecutorTermination - "handle hard failing task manager" --> JobMasterTest#testJobFailureWhenTaskExecutorHeartbeatTimeout - "go into a clean state in case of a TaskManager failure" --> TaskExecutorITCase#testNewTaskExecutorJoinsCluster This closes apache#7676.
9aa6dd3 to
2172ab3
Compare
|
Thanks for the review @GJL. Merging once Travis gives green light. |
- "detect a failing task manager" --> JobMaster#testHeartbeatTimeoutWithTaskManager - "handle gracefully failing task manager" --> JobMasterTest#testJobFailureWhenGracefulTaskExecutorTermination - "handle hard failing task manager" --> JobMasterTest#testJobFailureWhenTaskExecutorHeartbeatTimeout - "go into a clean state in case of a TaskManager failure" --> TaskExecutorITCase#testNewTaskExecutorJoinsCluster This closes #7676.
What is the purpose of the change
Port
TaskManagerFailsITCaseto new code base.Brief change log
"detect a failing task manager" --> JobMaster#testHeartbeatTimeoutWithTaskManager
"handle gracefully failing task manager" --> JobMasterTest#testJobFailureWhenGracefulTaskExecutorTermination
"handle hard failing task manager" --> JobMasterTest#testJobFailureWhenTaskExecutorHeartbeatTimeout
"go into a clean state in case of a TaskManager failure" --> TaskExecutorITCase#testNewTaskExecutorJoinsCluster
Verifying this change
Does this pull request potentially affect one of the following parts:
@Public(Evolving): (no)Documentation