diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java index ded833da59914..388bfa4218a73 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java @@ -1732,9 +1732,7 @@ private boolean startTask(ConnectorTaskId taskId) { throw ConnectUtils.maybeWrap(cause, "Failed to perform round of zombie fencing"); } }, - () -> { - verifyTaskGenerationAndOwnership(taskId, taskGeneration); - } + () -> verifyTaskGenerationAndOwnership(taskId, taskGeneration) ); } else { return worker.startSourceTask( @@ -1941,8 +1939,8 @@ private void reconfigureConnector(final String connName, final Callback cb } } - // Currently unused, but will be invoked by exactly-once source tasks after they have successfully - // initialized their transactional producer + // Invoked by exactly-once worker source tasks after they have successfully initialized their transactional + // producer to ensure that it is still safe to bring up the task private void verifyTaskGenerationAndOwnership(ConnectorTaskId id, int initialTaskGen) { log.debug("Reading to end of config topic to ensure it is still safe to bring up source task {} with exactly-once support", id); if (!refreshConfigSnapshot(Long.MAX_VALUE)) {