KAFKA-17783: Adding listeners to remove share partition on partition changes #17796

Open
wants to merge 6 commits into
base: trunk

apoorvmittal10
Collaborator

@apoorvmittal10 apoorvmittal10 commented Nov 13, 2024

This PR adds listeners to the Partition class, which can trigger cleanup of share partitions when the Partition changes.

I considered introducing a new Listener class that defines methods per SharePartitionKey, but avoided it because that would have required wiring up the calls at several places in ReplicaManager. ReplicaManager would also have needed handling to store the new listener, etc. I found the current approach better: SharePartition is an abstracted shared view of Partition, so attaching the change events to Partition is helpful.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@github-actions github-actions bot added the core (Kafka Broker), KIP-932 (Queues for Kafka), and small (Small PRs) labels on Nov 13, 2024
Collaborator

@AndrewJSchofield AndrewJSchofield left a comment

This looks like a promising approach. It's in an area of code that I really don't know, so I'm not qualified to do a proper review. I do have a couple of comments:

  • Can the partition become a leader after it has previously become a follower? I just wonder whether the state machine is more complicated than this PR implies?
  • The listener is called under a lock in the Partition. The listener implementation needs to be very careful not to do anything brave under that lock. I expect there is potential for a deadlock here.

@apoorvmittal10
Collaborator Author

apoorvmittal10 commented Nov 13, 2024

Thanks @AndrewJSchofield for the review and the good points.

> Can the partition become a leader after it has previously become a follower? I just wonder whether the state machine is more complicated than this PR implies?

Yes, that can always happen. Once it does, the next share fetch request should load the share partition. The scenario is not very different from a partition being moved across brokers: whichever broker is the leader of the partition should be able to load the share partition.

> The listener is called under a lock in the Partition. The listener implementation needs to be very careful not to do anything brave under that lock. I expect there is potential for a deadlock here.

The lock is per Partition, but I do get the point. I can make the call outside the lock as well. I'll wait for @junrao's comments, as he can help.
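To illustrate the locking concern discussed above, here is a minimal sketch of invoking listeners only after the state lock has been released. `ToyPartition` and `LeadershipListener` are hypothetical stand-ins made up for this example; they are not Kafka's `Partition` or `PartitionListener`, and the PR's actual change may look different.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical listener interface (assumption, not the Kafka API).
interface LeadershipListener {
    void onFollower(String topicPartition);
}

// Toy stand-in for a partition whose state is guarded by a lock.
class ToyPartition {
    private final String name;
    private final ReentrantLock stateLock = new ReentrantLock();
    private final List<LeadershipListener> listeners = new CopyOnWriteArrayList<>();
    private boolean leader = true;

    ToyPartition(String name) {
        this.name = name;
    }

    void addListener(LeadershipListener listener) {
        listeners.add(listener);
    }

    boolean isLeader() {
        stateLock.lock();
        try {
            return leader;
        } finally {
            stateLock.unlock();
        }
    }

    // Exposed only so the example can check where callbacks run.
    boolean isStateLockHeldByCurrentThread() {
        return stateLock.isHeldByCurrentThread();
    }

    void makeFollower() {
        // Mutate state while holding the lock...
        stateLock.lock();
        try {
            leader = false;
        } finally {
            stateLock.unlock();
        }
        // ...and call listeners only after releasing it, so a slow or
        // lock-taking listener cannot block or deadlock partition state changes.
        for (LeadershipListener listener : listeners) {
            listener.onFollower(name);
        }
    }
}
```

The trade-off in this sketch is that a listener may observe a state that has already changed again by the time it runs, so listener implementations should not assume the partition is still in the state they were notified about.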

@mumrah
Contributor

mumrah commented Nov 13, 2024

@apoorvmittal10 what is the purpose of this listener? Is it so that SPM can clean up its in-memory state when it is no longer the leader for a partition?

If that's the main use case, I think we should consider tying into the metadata system directly rather than coupling ourselves to Partition.

@dajac, since it looks like you added it, maybe you can comment on the intended usage of PartitionListener?

@apoorvmittal10
Collaborator Author

> @apoorvmittal10 what is the purpose of this listener? Is it so that SPM can clean up its in-memory state when it is no longer the leader for a partition?

Yes, that's the purpose.

> If that's the main use case, I think we should consider tying into the metadata system directly rather than coupling ourselves to Partition.

As far as I can see in the code, there are two approaches. 1) The approach taken by the coordinators: ReplicaManager has a method becomeLeaderOrFollower, to which we pass an onLeadershipChange method. The onLeadershipChange method triggers the onElection and onResignation methods of groupCoordinator and txnCoordinator. 2) Invoking the listeners on partition change.

I am not sure if we do get leadership change notifications from the metadata system, or whether we could have some kind of leaderEpochChange listener to observe the partition epoch bump and act on it. I found the listeners approach more sensible.

@github-actions github-actions bot removed the small (Small PRs) label on Nov 14, 2024
Contributor

@mumrah mumrah left a comment

Thanks for the details, @apoorvmittal10. As we discussed offline, those methods in ReplicaManager are driven by the metadata system (BrokerMetadataPublisher). In the end, if we're just acting on one partition at a time, it shouldn't matter if we use the partition listener or the metadata publisher.

> I am not sure if we do get leadership change notifications from metadata system

Partition leadership, ISR, etc are all controller-managed metadata which flow through the metadata system. We can reliably learn if a partition became a leader or if it became a follower through MetadataPublisher (this is how ReplicaManager and the coordinators work).

--

Thinking a bit more about this, I'd like to use a metadata publisher rather than modifying the PartitionListener. It seems that PartitionListener is more geared towards the log state rather than the partition metadata. In fact, the only production usage of this listener I can find is kafka.coordinator.group.ListenerAdapter#onHighWatermarkUpdated.

Since we are interested in subscribing to changes in the partition's leadership state, I think it is better to directly subscribe to the metadata system. This also has the benefit of eliminating the (potentially numerous) SharePartitionListener objects.

*/
@Override
public void onFollower(TopicPartition topicPartition) {
log.info("The share partition leader change listener is invoked for the topic-partition: {}, share-partition: {}",
Contributor

INFO is probably too noisy for this as we expect it any time the topic leadership changes.

Collaborator Author

Sure, I can make it DEBUG instead.

@apoorvmittal10
Collaborator Author

Thanks @mumrah, that works for me. I was initially thinking of tying it to the Partition listener, but it can work either way. @dajac @junrao, what do you think?

Contributor

@junrao junrao left a comment

@apoorvmittal10 : Thanks for the PR. Left a comment.

Regarding whether to use a listener at the Partition level or from BrokerMetadataPublisher, it's probably better to use a Partition level listener. It's true that the source of truth of the metadata is BrokerMetadataPublisher. However, the share partition requests only interact with metadata at the Partition level. So, it's more natural to act on share partitions after the metadata has been reflected at the Partition level. Otherwise, what could happen is that we remove a share partition after the metadata changes in BrokerMetadataPublisher, but before it changes in Partition. When we then try to create the new SharePartition, it's still created with the outdated leaderEpoch.
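The ordering problem described above can be shown with a toy model. This is only a sketch under assumptions: `ToyBroker`, its two epoch fields, and the string key are hypothetical and do not correspond to real Kafka classes. It models a share partition that is created lazily using whatever leader epoch the Partition level currently reflects.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model (assumption): two views of the same leader epoch that are
// updated at different times, plus a lazily populated share partition cache.
class ToyBroker {
    int metadataLeaderEpoch;   // what the metadata publisher has already seen
    int partitionLeaderEpoch;  // what the Partition object currently reflects
    private final Map<String, Integer> sharePartitionEpochs = new HashMap<>();

    ToyBroker(int epoch) {
        this.metadataLeaderEpoch = epoch;
        this.partitionLeaderEpoch = epoch;
    }

    // A share fetch creates the share partition on demand, using the
    // Partition-level epoch, and returns the epoch it was created with.
    int loadSharePartition(String key) {
        return sharePartitionEpochs.computeIfAbsent(key, k -> partitionLeaderEpoch);
    }

    void removeSharePartition(String key) {
        sharePartitionEpochs.remove(key);
    }
}
```

In this model, if the share partition is removed as soon as `metadataLeaderEpoch` moves from 5 to 6 and a share fetch arrives before `partitionLeaderEpoch` is updated, `loadSharePartition` recreates it with epoch 5. Removing it only after `partitionLeaderEpoch` has been updated means the next load picks up epoch 6.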

/**
 * Called when the Partition on this broker is marked as follower.
 */
def onFollower(partition: TopicPartition): Unit = {}
Contributor

If a broker switches from a follower to a follower, there is no need to remove the share partition. A share partition should be removed only when a broker is no longer the leader. So we could add a more specific listener.
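As a rough illustration of that distinction, the sketch below reports a removal only on a leader to follower transition and ignores follower to follower updates. `Role` and `RoleTracker` are hypothetical names invented for this example, and the counter is just a stand-in for removing the share partition.

```java
// Hypothetical role model (assumption, not Kafka code).
enum Role { LEADER, FOLLOWER }

class RoleTracker {
    private Role role;
    private int removals = 0;

    RoleTracker(Role initial) {
        this.role = initial;
    }

    // Applies the follower role and returns true only if the previous
    // role was LEADER, i.e. only on a leader -> follower transition.
    boolean becomeFollower() {
        boolean wasLeader = role == Role.LEADER;
        role = Role.FOLLOWER;
        if (wasLeader) {
            removals++; // stand-in for removing the share partition
        }
        return wasLeader;
    }

    void becomeLeader() {
        role = Role.LEADER;
    }

    int removals() {
        return removals;
    }
}
```

A caller could use the boolean result to decide whether to notify a more specific listener, so repeated follower updates do not trigger redundant cleanup.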

Collaborator Author

Do you think onLeaderToFollower or onBecomeFollower would be a better name?

Collaborator Author

Done.

@apoorvmittal10
Collaborator Author

> @apoorvmittal10 : Thanks for the PR. Left a comment.
>
> Regarding whether to use a listener at the Partition level or from BrokerMetadataPublisher, it's probably better to use a Partition level listener. It's true that the source of truth of the metadata is BrokerMetadataPublisher. However, the share partition requests only interact with metadata at the Partition level. So, it's more natural to take an action to share partitions after the metadata has been reflected at the Partition level. Otherwise, what could happen is that we remove a partition after the metadata changes in BrokerMetadataPublisher, but not in Partition yet. When we try to create the new SharePartition, it's still created with the outdated leaderEpoch.

Thanks a lot for confirming. I will go ahead with this approach and I'll add tests to the PR.
cc: @mumrah @dajac

@apoorvmittal10
Collaborator Author

@AndrewJSchofield @mumrah @junrao I have updated the code with tests. I also removed the listener call from the locked markFollower method, as @AndrewJSchofield mentioned, and moved it to ReplicaManager itself. Please review.

Labels
core Kafka Broker KIP-932 Queues for Kafka
4 participants