Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1732,9 +1732,7 @@ private boolean startTask(ConnectorTaskId taskId) {
throw ConnectUtils.maybeWrap(cause, "Failed to perform round of zombie fencing");
}
},
() -> {
verifyTaskGenerationAndOwnership(taskId, taskGeneration);
}
() -> verifyTaskGenerationAndOwnership(taskId, taskGeneration)
);
} else {
return worker.startSourceTask(
Expand Down Expand Up @@ -1941,8 +1939,8 @@ private void reconfigureConnector(final String connName, final Callback<Void> cb
}
}

// Currently unused, but will be invoked by exactly-once source tasks after they have successfully
// initialized their transactional producer
// Invoked by exactly-once worker source tasks after they have successfully initialized their transactional
// producer to ensure that it is still safe to bring up the task
private void verifyTaskGenerationAndOwnership(ConnectorTaskId id, int initialTaskGen) {
log.debug("Reading to end of config topic to ensure it is still safe to bring up source task {} with exactly-once support", id);
if (!refreshConfigSnapshot(Long.MAX_VALUE)) {
Expand Down