KAFKA-17509: Introduce a delayed action queue to complete purgatory actions outside purgatory. #17177

Open
wants to merge 2 commits into
base: trunk
from

Contributor

@adixitconfluent adixitconfluent commented Sep 12, 2024

About

In reference to comment #16969 (comment), I have introduced a DelayedActionQueue that collects purgatory actions and tries to complete them outside the purgatory.

  1. I've added code that adds purgatory actions to the DelayedActionQueue when partition locks are released after a fetch in forceComplete. I've also added code to onExpiration that checks the delayed action queue and tries to complete its actions. Since onExpiration serves as a callback for forceComplete, this should not lead to an infinite call stack.
  2. I've also fixed a few warnings in some tests in DelayedShareFetchTest, which were occurring due to insufficient mocking.
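To illustrate the idea in item 1, here is a minimal sketch of how deferring work to an action queue keeps the call stack flat. It is not the code in this PR: `SketchActionQueue` and `SketchActionQueueDemo` are hypothetical names, and the chained `Runnable` merely stands in for "completing one delayed fetch releases a lock, which makes another fetch completable".

```java
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical stand-in for a delayed action queue; not Kafka's actual class.
class SketchActionQueue {
    private final ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();

    // Record an action to run later instead of running it on the current stack.
    void add(Runnable action) {
        queue.add(action);
    }

    // Drain the queue in a loop. Actions added while draining are picked up
    // by the same loop, so the stack depth does not grow with the chain length.
    void tryCompleteActions() {
        Runnable action;
        while ((action = queue.poll()) != null) {
            action.run();
        }
    }
}

class SketchActionQueueDemo {
    // Simulates chainLength completions where each completion triggers the next one.
    static int runChain(int chainLength) {
        SketchActionQueue actions = new SketchActionQueue();
        int[] completed = {0};
        Runnable[] step = new Runnable[1];
        step[0] = () -> {
            completed[0]++;
            if (completed[0] < chainLength) {
                actions.add(step[0]); // deferred, not a nested call
            }
        };
        actions.add(step[0]);
        actions.tryCompleteActions();
        return completed[0];
    }

    public static void main(String[] args) {
        System.out.println(runChain(100_000)); // prints 100000
    }
}
```

If each step called the next one directly instead of enqueueing it, the stack depth would grow with the length of the chain.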

Testing

The code has been tested with the help of unit tests.

@apoorvmittal10 apoorvmittal10 added the KIP-932 Queues for Kafka label Sep 12, 2024
@adixitconfluent
Contributor Author

I've checked the 3 test failures. They are unrelated to the PR. I ran all of them locally and they all passed.

Contributor

@mumrah mumrah left a comment

Thanks for the patch @adixitconfluent!

Here's my understanding of the current share fetch handling

  • KafkaApis is calling into SPM to enqueue a share request
  • SPM#maybeProcessFetchQueue runs recursively (! 🙀) until the queue is empty
  • On each iteration, we get a share fetch request off the queue, do some validation and enqueue a DelayedShareFetch

Since adding the DelayedShareFetch to the purgatory is non-blocking, I'm pretty sure we are essentially not using the fetch queue any more. Or rather, we are now using the DelayedShareFetch purgatory as a fetch queue (which was the goal of the refactoring, after all).

For fetchQueue I don't see too many remaining usages:

  • Adding in SPM#fetchMessages (from KafkaApis)
  • completeExceptionally in SPM#close
  • Polling in SPM#maybeProcessFetchQueue

Since this closely matches our DelayedShareFetch usage, I'm wondering if we can remove the fetchQueue code in this PR.

WDYT?
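A rough, single-threaded sketch of the observation above: when the handoff to the purgatory is non-blocking, an intermediate queue that is drained right after every enqueue behaves like a direct handoff. All names here (`SketchFetchFlow`, `fetchViaQueue`, `fetchDirect`) are hypothetical, the `List` only stands in for the DelayedShareFetch purgatory, and the real SPM code involves validation and concurrency that this omits.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical simplification of the flow described above; not the real SPM.
class SketchFetchFlow {
    final Queue<String> fetchQueue = new ArrayDeque<>();
    final List<String> purgatory = new ArrayList<>(); // stand-in for the purgatory

    // With the intermediate queue: enqueue, then drain immediately.
    void fetchViaQueue(String request) {
        fetchQueue.add(request);
        String next;
        // A loop is used here in place of the recursive drain.
        while ((next = fetchQueue.poll()) != null) {
            purgatory.add(next); // non-blocking handoff
        }
    }

    // Without the queue: hand the request to the purgatory directly.
    void fetchDirect(String request) {
        purgatory.add(request);
    }
}
```

In this simplified model the queue is empty again after every call, which is the sense in which the purgatory has taken over the queue's role.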

// then we should check if there is a pending share fetch request for the topic-partition and complete it.
// We add the action to delayed actions queue to avoid an infinite call stack, which could happen if
// we directly call delayedShareFetchPurgatory.checkAndComplete
delayedActionQueue.add(() -> {
Contributor

@adixitconfluent I'm a little confused by the async code here. We are gathering some futures in ShareFetchUtils#processFetchResponse, but when I look down into SharePartition#acquire it's all synchronous/blocking code (it just returns a completed CompletableFuture).

Is this some leftovers from the refactoring? Or do we intend to make SharePartition#acquire async?

I ask this because if we're not keeping the CompletableFuture return type in SharePartition#acquire, we can fix it in this PR and avoid some complexity here.
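To make the complexity concrete, here is a hedged sketch comparing a caller that gathers already-completed futures with one that calls a synchronous method directly. `SketchAcquire` and its methods are hypothetical, and the returned list is a placeholder rather than real acquired records; only `CompletableFuture` is the actual JDK API.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical illustration; not the real SharePartition or ShareFetchUtils.
class SketchAcquire {
    // Synchronous work; the returned list is a placeholder value.
    static List<Long> acquire(String partition) {
        return List.of((long) partition.length());
    }

    // The shape described above: blocking work wrapped in a completed future.
    static CompletableFuture<List<Long>> acquireAsFuture(String partition) {
        return CompletableFuture.completedFuture(acquire(partition));
    }

    // Caller that gathers futures and then joins them.
    static Map<String, List<Long>> collectWithFutures(List<String> partitions) {
        Map<String, CompletableFuture<List<Long>>> futures = new LinkedHashMap<>();
        for (String p : partitions) {
            futures.put(p, acquireAsFuture(p));
        }
        CompletableFuture.allOf(futures.values().toArray(new CompletableFuture[0])).join();
        Map<String, List<Long>> result = new LinkedHashMap<>();
        futures.forEach((p, f) -> result.put(p, f.join()));
        return result;
    }

    // Caller once acquire is plainly synchronous: no future bookkeeping.
    static Map<String, List<Long>> collectDirectly(List<String> partitions) {
        Map<String, List<Long>> result = new LinkedHashMap<>();
        for (String p : partitions) {
            result.put(p, acquire(p));
        }
        return result;
    }
}
```

Both callers produce the same map; the future-based one only adds bookkeeping as long as the underlying work is synchronous.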

Contributor Author

@mumrah, we created a JIRA https://issues.apache.org/jira/browse/KAFKA-17522 earlier to track this issue. Yes, it makes sense that the share partition acquire functionality need not return a future. I am not sure whether I should cover it in this PR itself.
@apoorvmittal10 any thoughts on whether we should cover it in this PR? Or, since the JIRA is assigned to you, if you're already working on it, we can resolve it in a separate PR.

@adixitconfluent
Contributor Author

adixitconfluent commented Sep 13, 2024

hi @mumrah,

I'm wondering if we can remove the fetchQueue code in this PR.

You're right, we don't need the fetch queue. I have created a JIRA https://issues.apache.org/jira/browse/KAFKA-17509 for it, and will prioritize it in the coming PRs.

Contributor

@junrao junrao left a comment

@adixitconfluent : Thanks for the PR. Added a few comments.

super(shareFetchPartitionData.fetchParams().maxWaitMs, Option.empty());
this.shareFetchPartitionData = shareFetchPartitionData;
this.replicaManager = replicaManager;
this.partitionCacheMap = partitionCacheMap;
this.delayedActionQueue = delayedActionQueue;
this.delayedShareFetchPurgatory = delayedShareFetchPurgatory;
Contributor

Instead of creating a separate DelayedActionQueue, it would be useful to share with the one already in ReplicaManager.

@Override
public void onExpiration() {
delayedActionQueue.tryCompleteActions();
Contributor

I am not sure why we need this. We only need to call tryCompleteActions() after a new item has been added to the queue. In KafkaApis, after processing each request, an item could be added to the queue. So we need to call tryCompleteActions() there (we are already doing that for the existing action queue).

In #8657, we also call tryCompleteActions() in DelayedJoin.onExpiration(). This is because that method calls coordinator.onExpireJoin(), which could call ReplicaManager.appendRecords() and thus add an item to the queue. Here, since onExpiration() has no additional logic that adds an item to the queue, there is no need to call tryCompleteActions().
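A small sketch of the pattern described above, under the assumption that the drain happens once at the end of request handling: the queue is drained right after the only place that can add to it, so no extra drain is needed in expiration callbacks that add nothing. `SketchRequestHandler` and its members are hypothetical and do not mirror the real KafkaApis or ReplicaManager signatures.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical request handler; illustrates where the drain call sits.
class SketchRequestHandler {
    final Queue<Runnable> actionQueue = new ArrayDeque<>();
    final List<String> log = new ArrayList<>();

    void handle(String request) {
        try {
            log.add("handled " + request);
            // Handling a request may enqueue follow-up work.
            actionQueue.add(() -> log.add("completed action for " + request));
        } finally {
            // Drain after every request, since this is where items get added.
            tryCompleteActions();
        }
    }

    void tryCompleteActions() {
        Runnable action;
        while ((action = actionQueue.poll()) != null) {
            action.run();
        }
    }
}
```

A callback that itself enqueues work, as described for DelayedJoin.onExpiration(), would need its own drain call; one that enqueues nothing would not.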

@@ -524,6 +533,7 @@ private void addDelayedShareFetch(DelayedShareFetch delayedShareFetch, Set<Objec

@Override
public void close() throws Exception {
this.delayedActionsQueue.tryCompleteActions();
Contributor

This seems unnecessary since we don't do that when closing the ReplicaManager.

Labels
KIP-932 Queues for Kafka
4 participants