Investigate potential race conditions in KafkaSupervisor #5919
I would add some more details. Here is a snippet of `KafkaSupervisor.checkpointTaskGroup()`:

```java
private ListenableFuture<Map<Integer, Long>> checkpointTaskGroup(final int groupId, final boolean finalize)
{
  final TaskGroup taskGroup = taskGroups.get(groupId);
  ...
  return Futures.transform(
      Futures.successfulAsList(pauseFutures), new Function<List<Map<Integer, Long>>, Map<Integer, Long>>()
      {
        @Nullable
        @Override
        public Map<Integer, Long> apply(List<Map<Integer, Long>> input)
        {
          // 3) Build a map of the highest offset read by any task in the group for each partition
          final Map<Integer, Long> endOffsets = new HashMap<>();
          for (int i = 0; i < input.size(); i++) {
            Map<Integer, Long> result = input.get(i);
            if (result == null || result.isEmpty()) { // kill tasks that didn't return a value
              String taskId = pauseTaskIds.get(i);
              log.warn("Task [%s] failed to respond to [pause] in a timely manner, killing task", taskId);
              killTask(taskId);
              // note: this mutates taskGroup from whichever thread runs this callback
              taskGroup.tasks.remove(taskId);
            } else { // otherwise build a map of the highest offsets seen
              ...
```

So, ...
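The truncated `else` branch in the snippet above builds the per-partition end offsets. As a hedged sketch of what step 3 computes, here is a standalone version of that loop: it keeps the highest offset seen for each partition and collects the IDs of tasks whose pause result was `null` or empty. `EndOffsetMergeSketch` and `mergeHighestOffsets` are hypothetical names, and recording task IDs in a list stands in for the real `killTask(taskId)` / `taskGroup.tasks.remove(taskId)` calls. The actual Druid code hidden behind the `...` is not shown in this thread and may differ.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class EndOffsetMergeSketch {
  // Hypothetical helper: merges per-task offset maps, keeping the highest offset per partition.
  // Tasks whose result is null or empty are recorded in failedTaskIds instead of being killed.
  static Map<Integer, Long> mergeHighestOffsets(
      List<Map<Integer, Long>> input, List<String> pauseTaskIds, List<String> failedTaskIds) {
    final Map<Integer, Long> endOffsets = new HashMap<>();
    for (int i = 0; i < input.size(); i++) {
      Map<Integer, Long> result = input.get(i);
      if (result == null || result.isEmpty()) {
        // stand-in for killTask(taskId) and taskGroup.tasks.remove(taskId)
        failedTaskIds.add(pauseTaskIds.get(i));
      } else {
        for (Map.Entry<Integer, Long> e : result.entrySet()) {
          // keep the larger of the existing and the new offset for this partition
          endOffsets.merge(e.getKey(), e.getValue(), Long::max);
        }
      }
    }
    return endOffsets;
  }

  public static void main(String[] args) {
    List<Map<Integer, Long>> input = new ArrayList<>();
    input.add(Map.of(0, 10L, 1, 5L));
    input.add(null); // Futures.successfulAsList puts null in place of a failed future
    input.add(Map.of(0, 7L, 1, 9L));
    List<String> failed = new ArrayList<>();
    Map<Integer, Long> endOffsets = mergeHighestOffsets(input, List.of("a", "b", "c"), failed);
    System.out.println(endOffsets); // {0=10, 1=9}
    System.out.println(failed);     // [b]
  }
}
```

Using `Map.merge` with `Long::max` folds each result into `endOffsets` in a single pass, without a separate contains/get/put sequence.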
This issue has been marked as stale due to 280 days of inactivity.

This issue has been closed due to lack of activity. If you think that ...
In `KafkaSupervisor`, `taskGroups` is accessed by multiple threads and modified in the main `exec` thread. In `checkTaskDuration`, the keys from `taskGroups` are retrieved and passed to `checkpointTaskGroup`, which executes in the `workerExec` thread and returns a `Future`. A potential race condition is therefore that a `groupId` gets removed from `taskGroups` while that future is still executing. The race could surface in `KafkaSupervisor.checkpointTaskGroup()` or `KafkaSupervisor.verifyAndMergeCheckpoints()`.

Related issue #5900
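To make the interleaving described above concrete, here is a minimal, self-contained sketch in plain Java. It is not Druid code: `TaskGroup`, `taskGroups`, `checkpointOnWorker`, and the `CountDownLatch` that forces the ordering are hypothetical stand-ins, and it uses the JDK's `CompletableFuture` instead of Guava's `ListenableFuture`. The worker task reads `taskGroups` only after the main thread has removed the key, so it has to treat a missing group as a no-op instead of dereferencing `null`.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TaskGroupRaceSketch {
  // Hypothetical stand-in for KafkaSupervisor's TaskGroup; not the real class.
  static final class TaskGroup {
    final Map<String, Object> tasks = new ConcurrentHashMap<>();
  }

  // Hypothetical stand-in for the supervisor's taskGroups map, shared across threads.
  static final Map<Integer, TaskGroup> taskGroups = new ConcurrentHashMap<>();

  // Looks the group up on the worker thread; returns empty if the group was removed meanwhile.
  static CompletableFuture<Optional<Integer>> checkpointOnWorker(
      int groupId, ExecutorService workerExec, CountDownLatch proceed) {
    return CompletableFuture.supplyAsync(() -> {
      try {
        proceed.await(); // force the interleaving: wait until the main thread signals
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return Optional.empty();
      }
      TaskGroup group = taskGroups.get(groupId);
      if (group == null) {
        return Optional.empty(); // the group vanished while this future was pending
      }
      return Optional.of(group.tasks.size());
    }, workerExec);
  }

  public static void main(String[] args) throws Exception {
    ExecutorService workerExec = Executors.newSingleThreadExecutor();
    CountDownLatch removed = new CountDownLatch(1);
    taskGroups.put(0, new TaskGroup());

    CompletableFuture<Optional<Integer>> f = checkpointOnWorker(0, workerExec, removed);
    taskGroups.remove(0); // the "exec" thread drops the group while the future is pending
    removed.countDown();

    System.out.println(f.get()); // Optional.empty
    workerExec.shutdown();
  }
}
```

The `null` check only prevents the crash. Whether a checkpoint for a group that has since been removed should be dropped or still applied is a supervisor design decision that this sketch does not settle.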