From f2920b2477d77fd52d3d828012c600e4c2ed20a5 Mon Sep 17 00:00:00 2001 From: sunshujie1990 Date: Wed, 30 Mar 2022 20:58:58 +0800 Subject: [PATCH] KAFKA-13719: Fix connector restart cause duplicate tasks (#11869) Reviewers: Mickael Maison , Luke Chen , Chris Egerton Co-authored-by: Chris Egerton --- .../runtime/distributed/DistributedHerder.java | 6 +++--- .../distributed/DistributedHerderTest.java | 15 ++++++++------- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java index 357796c9d1af..65a8e7e15b81 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java @@ -1180,8 +1180,8 @@ protected synchronized void doRestartConnectorAndTasks(RestartRequest request) { } } if (restartTasks) { - log.debug("Restarting {} of {} tasks for {}", plan.restartTaskCount(), plan.totalTaskCount(), request); - plan.taskIdsToRestart().forEach(taskId -> { + log.debug("Restarting {} of {} tasks for {}", assignedIdsToRestart.size(), plan.totalTaskCount(), request); + assignedIdsToRestart.forEach(taskId -> { try { if (startTask(taskId)) { log.info("Task '{}' restart successful", taskId); @@ -1192,7 +1192,7 @@ protected synchronized void doRestartConnectorAndTasks(RestartRequest request) { log.error("Task '{}' restart failed", taskId, t); } }); - log.debug("Restarted {} of {} tasks for {} as requested", plan.restartTaskCount(), plan.totalTaskCount(), request); + log.debug("Restarted {} of {} tasks for {} as requested", assignedIdsToRestart.size(), plan.totalTaskCount(), request); } log.info("Completed {}", plan); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java index 6ddf04776bfd..996c8407c6ec 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java @@ -1289,24 +1289,25 @@ public void testDoRestartConnectorAndTasksOnlyConnector() { @Test public void testDoRestartConnectorAndTasksOnlyTasks() { - ConnectorTaskId taskId = new ConnectorTaskId(CONN1, 0); RestartRequest restartRequest = new RestartRequest(CONN1, false, true); RestartPlan restartPlan = PowerMock.createMock(RestartPlan.class); EasyMock.expect(restartPlan.shouldRestartConnector()).andReturn(true).anyTimes(); EasyMock.expect(restartPlan.shouldRestartTasks()).andReturn(true).anyTimes(); - EasyMock.expect(restartPlan.taskIdsToRestart()).andReturn(Collections.singletonList(taskId)).anyTimes(); - EasyMock.expect(restartPlan.restartTaskCount()).andReturn(1).anyTimes(); - EasyMock.expect(restartPlan.totalTaskCount()).andReturn(1).anyTimes(); + // The connector has three tasks + EasyMock.expect(restartPlan.taskIdsToRestart()).andReturn(Arrays.asList(TASK0, TASK1, TASK2)).anyTimes(); + EasyMock.expect(restartPlan.restartTaskCount()).andReturn(3).anyTimes(); + EasyMock.expect(restartPlan.totalTaskCount()).andReturn(3).anyTimes(); EasyMock.expect(herder.buildRestartPlan(restartRequest)).andReturn(Optional.of(restartPlan)).anyTimes(); herder.assignment = PowerMock.createMock(ExtendedAssignment.class); EasyMock.expect(herder.assignment.connectors()).andReturn(Collections.emptyList()).anyTimes(); - EasyMock.expect(herder.assignment.tasks()).andReturn(Collections.singletonList(taskId)).anyTimes(); + // But only one task is assigned to this worker + EasyMock.expect(herder.assignment.tasks()).andReturn(Collections.singletonList(TASK0)).anyTimes(); - worker.stopAndAwaitTasks(Collections.singletonList(taskId)); + worker.stopAndAwaitTasks(Collections.singletonList(TASK0)); PowerMock.expectLastCall(); - herder.onRestart(taskId); + herder.onRestart(TASK0); EasyMock.expectLastCall(); worker.startTask(EasyMock.eq(TASK0), EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject(),