Skip to content

KAFKA-19110: Add missing unit test for Streams-consumer integration #19457

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Apr 24, 2025

Conversation

FrankYang0529
Copy link
Member

@FrankYang0529 FrankYang0529 commented Apr 13, 2025

  • Construct AsyncKafkaConsumer constructor and verify that the
    RequestManagers.supplier() contains Streams-specific data structures.
  • Verify that RequestManagers constructs the Streams request managers
    correctly
  • Test StreamsGroupHeartbeatManager#resetPollTimer()
  • Test StreamsOnTasksRevokedCallbackCompletedEvent,
    StreamsOnTasksAssignedCallbackCompletedEvent, and
    StreamsOnAllTasksLostCallbackCompletedEvent in
    ApplicationEventProcessor
  • Test DefaultStreamsRebalanceListener
  • Test StreamThread.
    • Test handleStreamsRebalanceData.
    • Test StreamsRebalanceData.

Reviewers: Lucas Brutschy lbrutschy@confluent.io

Signed-off-by: PoAn Yang <payang@apache.org>
@FrankYang0529 FrankYang0529 changed the title KAFKA-19110: Add missing unit test for Streams-consumer integration (wip) KAFKA-19110: Add missing unit test for Streams-consumer integration Apr 14, 2025
@FrankYang0529 FrankYang0529 requested a review from cadonna April 14, 2025 16:01
@cadonna cadonna requested review from bbejeck and lucasbru April 16, 2025 10:33
Copy link
Member

@lucasbru lucasbru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mostly looking good to me, just one minor thing. Could you fix this as part of this PR? Then I think it is ready to merge from my side.

HANDLER
)
);
assertEquals("Named topologies and the CONSUMER protocol cannot be used at the same time.", exception.getMessage());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This exception message is actually wrong. "Named topologies and the STREAMS protocol" should be what it says. Can you update the test and the production code?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed it. Thanks.

Signed-off-by: PoAn Yang <payang@apache.org>
Copy link
Member

@lucasbru lucasbru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

Copy link
Member

@bbejeck bbejeck left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM with a couple of optional comments to consider

@@ -220,7 +228,7 @@ private AsyncKafkaConsumer<String, String> newConsumer(Properties props) {
(a, b, c, d, e, f, g) -> fetchCollector,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know this is pre-existing, but I'm wondering if it's worth doing a follow-up PR to use more meaningful names than a, b etc.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will handle this in a follow-up PR. Thanks.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Follow-up PR: #19550

taskManager
);

private void setup(final StreamsRebalanceData streamsRebalanceData) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: It took a second to realize what was going on with creating a DefaultStreamsRebalanceListener in two places. Maybe change the name from setup() with something like createRebalanceListenerWithRebalanceData

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated it. Thanks.

@lucasbru lucasbru merged commit 3fae785 into apache:trunk Apr 24, 2025
23 checks passed
@FrankYang0529 FrankYang0529 deleted the KAFKA-19110 branch April 24, 2025 11:37
bbejeck pushed a commit that referenced this pull request Apr 24, 2025
Replace names like a, b, c, ... with meaningful names in
AsyncKafkaConsumerTest.

Follow-up:
#19457 (comment)

Signed-off-by: PoAn Yang <payang@apache.org>

Reviewers: Bill Bejeck <bbejeck@apache.org>, Ken Huang <s7133700@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants