Skip to content

Commit

Permalink
KAFKA-13719: Fix connector restart cause duplicate tasks (#11869)
Browse files Browse the repository at this point in the history
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Luke Chen <showuon@gmail.com>, Chris Egerton <fearthecellos@gmail.com>
Co-authored-by: Chris Egerton <fearthecellos@gmail.com>
  • Loading branch information
2 people authored and mimaison committed Mar 30, 2022
1 parent ecff741 commit f2920b2
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1180,8 +1180,8 @@ protected synchronized void doRestartConnectorAndTasks(RestartRequest request) {
}
}
if (restartTasks) {
log.debug("Restarting {} of {} tasks for {}", plan.restartTaskCount(), plan.totalTaskCount(), request);
plan.taskIdsToRestart().forEach(taskId -> {
log.debug("Restarting {} of {} tasks for {}", assignedIdsToRestart.size(), plan.totalTaskCount(), request);
assignedIdsToRestart.forEach(taskId -> {
try {
if (startTask(taskId)) {
log.info("Task '{}' restart successful", taskId);
Expand All @@ -1192,7 +1192,7 @@ protected synchronized void doRestartConnectorAndTasks(RestartRequest request) {
log.error("Task '{}' restart failed", taskId, t);
}
});
log.debug("Restarted {} of {} tasks for {} as requested", plan.restartTaskCount(), plan.totalTaskCount(), request);
log.debug("Restarted {} of {} tasks for {} as requested", assignedIdsToRestart.size(), plan.totalTaskCount(), request);
}
log.info("Completed {}", plan);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1289,24 +1289,25 @@ public void testDoRestartConnectorAndTasksOnlyConnector() {

@Test
public void testDoRestartConnectorAndTasksOnlyTasks() {
ConnectorTaskId taskId = new ConnectorTaskId(CONN1, 0);
RestartRequest restartRequest = new RestartRequest(CONN1, false, true);
RestartPlan restartPlan = PowerMock.createMock(RestartPlan.class);
EasyMock.expect(restartPlan.shouldRestartConnector()).andReturn(true).anyTimes();
EasyMock.expect(restartPlan.shouldRestartTasks()).andReturn(true).anyTimes();
EasyMock.expect(restartPlan.taskIdsToRestart()).andReturn(Collections.singletonList(taskId)).anyTimes();
EasyMock.expect(restartPlan.restartTaskCount()).andReturn(1).anyTimes();
EasyMock.expect(restartPlan.totalTaskCount()).andReturn(1).anyTimes();
// The connector has three tasks
EasyMock.expect(restartPlan.taskIdsToRestart()).andReturn(Arrays.asList(TASK0, TASK1, TASK2)).anyTimes();
EasyMock.expect(restartPlan.restartTaskCount()).andReturn(3).anyTimes();
EasyMock.expect(restartPlan.totalTaskCount()).andReturn(3).anyTimes();
EasyMock.expect(herder.buildRestartPlan(restartRequest)).andReturn(Optional.of(restartPlan)).anyTimes();

herder.assignment = PowerMock.createMock(ExtendedAssignment.class);
EasyMock.expect(herder.assignment.connectors()).andReturn(Collections.emptyList()).anyTimes();
EasyMock.expect(herder.assignment.tasks()).andReturn(Collections.singletonList(taskId)).anyTimes();
// But only one task is assigned to this worker
EasyMock.expect(herder.assignment.tasks()).andReturn(Collections.singletonList(TASK0)).anyTimes();

worker.stopAndAwaitTasks(Collections.singletonList(taskId));
worker.stopAndAwaitTasks(Collections.singletonList(TASK0));
PowerMock.expectLastCall();

herder.onRestart(taskId);
herder.onRestart(TASK0);
EasyMock.expectLastCall();

worker.startTask(EasyMock.eq(TASK0), EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject(),
Expand Down

0 comments on commit f2920b2

Please sign in to comment.