
Fix Concurrent Task Insertion in pendingCompletionTaskGroups #16834

Merged · 6 commits into apache:master on Aug 8, 2024

Conversation

@hardikbajaj (Contributor) commented Aug 2, 2024

Fix Concurrent Task Insertion in pendingCompletionTaskGroups

Fixes #16727

Description

Fixed a thread-synchronisation issue in addDiscoveredTaskToPendingCompletionTaskGroups so that the pendingCompletionTaskGroups map is properly locked while a task group is being initialised.

When multiple threads with different task ids hit this method, they can each read a stale copy of the concurrent hash map and create separate pending-completion task groups for tasks that share a group id, even though those tasks should all be added to a single TaskGroup.
To properly synchronise pendingCompletionTaskGroups across threads, all updates need to happen inside a compute block, which locks the map entry for the key and performs the write based on a locked read. This keeps the value consistent across all running threads (see the sketch below).
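
As a rough illustration of that approach, here is a minimal, self-contained sketch. It is not the actual Druid patch: TaskGroup is a simplified stand-in, PendingCompletionSketch and addDiscoveredTask are invented names, and the map shape (a ConcurrentHashMap keyed by group id holding a CopyOnWriteArrayList of groups) is assumed from the description and the test further down.

  import java.util.Map;
  import java.util.Set;
  import java.util.concurrent.ConcurrentHashMap;
  import java.util.concurrent.CopyOnWriteArrayList;

  // Simplified stand-in for Druid's TaskGroup; illustrative only.
  class TaskGroup
  {
    final Map<String, String> startingOffsets;
    final Set<String> taskIds = ConcurrentHashMap.newKeySet();

    TaskGroup(Map<String, String> startingOffsets)
    {
      this.startingOffsets = startingOffsets;
    }
  }

  class PendingCompletionSketch
  {
    private final ConcurrentHashMap<Integer, CopyOnWriteArrayList<TaskGroup>> pendingCompletionTaskGroups =
        new ConcurrentHashMap<>();

    void addDiscoveredTask(int groupId, String taskId, Map<String, String> startingOffsets)
    {
      // compute() locks the map entry for groupId, so the read-check-write
      // sequence below is atomic with respect to other threads calling this
      // method with the same group id.
      pendingCompletionTaskGroups.compute(groupId, (key, groups) -> {
        if (groups == null) {
          groups = new CopyOnWriteArrayList<>();
        }
        // This read happens under the entry lock, so no other thread can
        // concurrently create a duplicate group for the same offsets.
        for (TaskGroup group : groups) {
          if (group.startingOffsets.equals(startingOffsets)) {
            group.taskIds.add(taskId);
            return groups;
          }
        }
        // No group with matching starting offsets exists; create one while
        // still holding the lock.
        TaskGroup newGroup = new TaskGroup(startingOffsets);
        newGroup.taskIds.add(taskId);
        groups.add(newGroup);
        return groups;
      });
    }
  }

The key point is that both the membership check and the insertion happen inside the locked compute block, so two threads can never both conclude that no matching group exists.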

Fixed the bug ...

From #16727:
addDiscoveredTaskToPendingCompletionTaskGroups is not properly thread-synchronised: updates are made on the basis of stale reads. When a supervisor config is submitted, there are cases where, instead of adding all tasks to the same TaskGroup, it creates multiple copies of the TaskGroup. For example, if A1 and A2 are consuming from the same partition and belong to the same group, then:

ideal_behavior: pendingCompletionTaskGroup[P] = { TaskGroup{A1,A2}  }
behavior_seen: pendingCompletionTaskGroup[P] = { TaskGroup{A1}, TaskGroup{A2} }

This happens because, while initialising a new TaskGroup, each thread relies on a stale read; multiple threads executing simultaneously and adding tasks to pendingCompletionTaskGroups can therefore create new task groups instead of adding to existing ones (an illustrative sketch of this pattern follows below). This behaviour defeats the purpose of task replication: if one of these single-task task groups fails for any reason, the Overlord treats the entire task group as failed and kills the actively reading tasks too, in order to resume ingestion from the last published segment.
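
To make the race concrete, here is an illustrative check-then-act sketch (not the literal pre-patch code), written as another method on the hypothetical PendingCompletionSketch class from the sketch above:

  void addDiscoveredTaskRacy(int groupId, String taskId, Map<String, String> startingOffsets)
  {
    CopyOnWriteArrayList<TaskGroup> groups =
        pendingCompletionTaskGroups.computeIfAbsent(groupId, k -> new CopyOnWriteArrayList<>());

    // Unlocked read: two threads can both see "no matching group" here...
    TaskGroup match = null;
    for (TaskGroup group : groups) {
      if (group.startingOffsets.equals(startingOffsets)) {
        match = group;
        break;
      }
    }

    if (match == null) {
      // ...and both reach this point before either adds its group, yielding
      // pendingCompletionTaskGroup[P] = { TaskGroup{A1}, TaskGroup{A2} }
      // instead of { TaskGroup{A1, A2} }.
      match = new TaskGroup(startingOffsets);
      groups.add(match);
    }
    match.taskIds.add(taskId);
  }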

Renamed the class ...

Added a forbidden-apis entry ...

Release note


Key changed/added classes in this PR
  • MyFoo
  • OurBar
  • TheirBaz

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

Commit: Fix Concurrent Task Insertion in pendingCompletionTaskGroups
@AmatyaAvadhanula (Contributor) commented:

I think the logic may be incorrect.

We should expect the following test (which I have written) to pass with the current logic, and it does.
However, the changes in this patch cause it to fail.


  @Test
  public void testAddDiscoveredTaskToPendingCompletionTaskGroupsSync()
  {
    EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
    EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes();
    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
    EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andReturn(true).anyTimes();

    replayAll();

    SeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
    // Two distinct sets of starting offsets: tasks that share offsets should
    // land in the same TaskGroup, so we expect exactly two groups below.
    Map<String, String> startingPartitions = new HashMap<>();
    startingPartitions.put("partition", "offset");

    Map<String, String> startingPartitions1 = new HashMap<>();
    startingPartitions1.put("partition", "offset1");

    supervisor.addDiscoveredTaskToPendingCompletionTaskGroups(0, "task_1", startingPartitions);
    supervisor.addDiscoveredTaskToPendingCompletionTaskGroups(0, "task_2", startingPartitions);
    supervisor.addDiscoveredTaskToPendingCompletionTaskGroups(0, "task_3", startingPartitions);

    supervisor.addDiscoveredTaskToPendingCompletionTaskGroups(0, "task_7", startingPartitions1);
    supervisor.addDiscoveredTaskToPendingCompletionTaskGroups(0, "task_8", startingPartitions1);
    supervisor.addDiscoveredTaskToPendingCompletionTaskGroups(0, "task_9", startingPartitions1);


    CopyOnWriteArrayList<SeekableStreamSupervisor.TaskGroup> taskGroups = supervisor.getPendingCompletionTaskGroups(0);
    Assert.assertEquals(2, taskGroups.size());
    Assert.assertEquals(3, taskGroups.get(0).tasks.size());
    Assert.assertEquals(3, taskGroups.get(1).tasks.size());
  }

I think it is because you're handling the task group but not the starting offsets as part of your logic.
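
Purely as a hypothetical illustration of this kind of failure (the intermediate version of the patch is not reproduced in this thread): if the compute block matched only on group id and ignored starting offsets, every task would join the first group, the test above would see one group of six tasks, and Assert.assertEquals(2, taskGroups.size()) would fail. In the PendingCompletionSketch stand-in from earlier, that mistake would look like:

  void addDiscoveredTaskOffsetUnaware(int groupId, String taskId, Map<String, String> startingOffsets)
  {
    pendingCompletionTaskGroups.compute(groupId, (key, groups) -> {
      if (groups == null) {
        groups = new CopyOnWriteArrayList<>();
      }
      if (groups.isEmpty()) {
        groups.add(new TaskGroup(startingOffsets));
      }
      // Bug: starting offsets are never compared, so tasks with different
      // offsets all collapse into the first TaskGroup.
      groups.get(0).taskIds.add(taskId);
      return groups;
    });
  }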

@AmatyaAvadhanula (Contributor) commented:

It might also be helpful to add comments explaining what this method does, why concurrent insertion was previously failing, and why changing the order of steps helps.

@hardikbajaj (Contributor, Author) commented:

Hey @AmatyaAvadhanula, thanks for identifying this case, which I missed.
I have improved the logic to handle it.

@AmatyaAvadhanula (Contributor) left a review:

Thanks for the fix and the changes, @hardikbajaj!

I was just wondering if we could somehow test this without adding a test-only method. LGTM otherwise.

@AmatyaAvadhanula (Contributor) left a review:

LGTM! Thank you, @hardikbajaj

@AmatyaAvadhanula AmatyaAvadhanula merged commit 1cf3f4b into apache:master Aug 8, 2024
90 checks passed
airlock-confluentinc bot pushed a commit to confluentinc/druid that referenced this pull request Aug 8, 2024
…16834)

Fix streaming task failures that may arise due to concurrent task insertion in pendingCompletionTaskGroups
hardikbajaj added a commit to confluentinc/druid that referenced this pull request Aug 8, 2024
…16834) (#219)

Fix streaming task failures that may arise due to concurrent task insertion in pendingCompletionTaskGroups
apache@1cf3f4b
@kfaraz kfaraz added this to the 31.0.0 milestone Oct 4, 2024

Linked issue: Actively Reading tasks fails even though Pending completion tasks are a success (#16727)