KAFKA-17515: Fix flaky RestoreIntegrationTest.shouldInvokeUserDefinedGlobalStateRestoreListener #17187

Open
wants to merge 2 commits into
base: trunk

@chenyulin0719 chenyulin0719 commented Sep 13, 2024

This PR addresses KAFKA-17515.
I found two issues in the flaky test (the log analysis is in the Jira comments):

  1. The error "java.nio.file.DirectoryNotEmptyException" occurs if the flush() in kafkaStreams.close() and purgeLocalStreamsState() run at the same time. (The current close timeout is 5 seconds, which is too short because CI is unstable and slow.)
  2. Race condition: tasks to be restored in ks-1 are rebalanced to ks-2 before they enter the active restoring state, so onRestoreSuspended() is never triggered.
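
To illustrate the first issue, here is a minimal sketch in plain java.nio, not the Kafka Streams code path. It simulates a file appearing in a state directory after a purge has already removed the directory's children, which is one way a DirectoryNotEmptyException can surface. The class name, directory prefix, and file names are hypothetical, and the "late flush" is simulated sequentially rather than with a real second thread so the outcome is deterministic.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryNotEmptyException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class PurgeRaceSketch {

    // Returns true if deleting the directory failed with DirectoryNotEmptyException.
    static boolean purgeWhileFlushing() {
        try {
            // Hypothetical stand-in for a local state directory.
            Path stateDir = Files.createTempDirectory("state-dir-sketch");
            Files.createFile(stateDir.resolve("existing.sst"));

            // First half of a recursive delete: snapshot and remove the current children.
            List<Path> children;
            try (Stream<Path> listing = Files.list(stateDir)) {
                children = listing.collect(Collectors.toList());
            }
            for (Path child : children) {
                Files.delete(child);
            }

            // Simulated late flush: a file shows up after the children were removed
            // but before the directory itself is deleted.
            Path lateFile = Files.createFile(stateDir.resolve("late-flush.tmp"));

            try {
                // Second half of the recursive delete: remove the directory itself.
                Files.delete(stateDir);
                return false;
            } catch (DirectoryNotEmptyException e) {
                return true;
            } finally {
                // Clean up the temporary files created by this sketch.
                Files.deleteIfExists(lateFile);
                Files.deleteIfExists(stateDir);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("DirectoryNotEmptyException thrown: " + purgeWhileFlushing());
    }
}
```

In the real test the late write would come from a stream thread that is still shutting down, so the failure only shows up when close() returns before that thread has finished.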

To solve the issues:

  1. Remove the timeout in kafkaStreams.close().
  2. Ensure all tasks in ks-1 are actively restoring before starting the second KafkaStreams instance (ks-2).
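
The second fix amounts to polling for a condition before moving on. The sketch below shows that pattern in plain Java; it is not the helper added in this PR. pollUntil, the restoringTasks counter, and the background thread standing in for ks-1 are all hypothetical, and no Kafka API is used.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

final class WaitForRestoringSketch {

    // Polls the condition until it holds or the timeout elapses; returns whether it held.
    static boolean pollUntil(BooleanSupplier condition, Duration timeout, Duration pollInterval) {
        final long deadline = System.nanoTime() + timeout.toNanos();
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    // Returns true once every expected task has reported that it is restoring.
    static boolean startSecondInstanceOnlyAfterRestoring(int expectedTasks) {
        // Hypothetical counter standing in for "tasks of ks-1 that are actively restoring".
        final AtomicInteger restoringTasks = new AtomicInteger();

        // Hypothetical stand-in for ks-1: tasks enter the restoring state one by one.
        Thread firstInstance = new Thread(() -> {
            for (int i = 0; i < expectedTasks; i++) {
                try {
                    Thread.sleep(20);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                restoringTasks.incrementAndGet();
            }
        });
        firstInstance.start();

        // Only after this returns true would the test start the second instance (ks-2).
        return pollUntil(() -> restoringTasks.get() == expectedTasks,
                Duration.ofSeconds(5), Duration.ofMillis(10));
    }

    public static void main(String[] args) {
        System.out.println("All tasks restoring: " + startSecondInstanceOnlyAfterRestoring(5));
    }
}
```

Starting ks-2 only after the wait succeeds means the rebalance interrupts tasks that are already restoring, which is the situation in which a suspend callback can be expected.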

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@chenyulin0719 chenyulin0719 marked this pull request as draft September 15, 2024 09:57

@chia7712 chia7712 left a comment

@chenyulin0719 Thanks for digging into this flaky test again. I left a couple of comments. PTAL

@@ -579,12 +580,15 @@ public void shouldInvokeUserDefinedGlobalStateRestoreListener() throws Exception
validateReceivedMessages(sampleData, outputTopic);

// Close kafkaStreams1 (with cleanup) and start it again to force the restoration of the state.
- kafkaStreams.close(Duration.ofMillis(5000L));
+ kafkaStreams.close(Duration.ofMillis(IntegrationTestUtils.DEFAULT_TIMEOUT));
Contributor

This test already has a timeout, so maybe we can remove the timeout from close()?

Author

It makes sense to me. Thanks!
Removed the timeout.
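
As a rough illustration of why a bounded wait can be riskier here than an unbounded one, the sketch below compares Thread.join with and without a timeout. It uses no Kafka API; the worker thread is a hypothetical stand-in for a stream thread that is still shutting down, and the class and method names are made up for this example.

```java
final class BoundedCloseSketch {

    // Returns { workerAliveAfterBoundedWait, workerAliveAfterUnboundedWait }.
    static boolean[] compareWaits() {
        // Hypothetical slow shutdown work, for example a final flush.
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        try {
            // Bounded wait: returns after about 50 ms even though the worker is still running.
            worker.join(50);
            boolean aliveAfterBounded = worker.isAlive();

            // Unbounded wait: returns only once the worker has finished.
            worker.join();
            boolean aliveAfterUnbounded = worker.isAlive();

            return new boolean[] {aliveAfterBounded, aliveAfterUnbounded};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] result = compareWaits();
        System.out.println("alive after bounded wait: " + result[0]);
        System.out.println("alive after unbounded wait: " + result[1]);
    }
}
```

Any cleanup that runs right after the bounded wait may overlap with the worker, whereas after the unbounded wait it cannot. The overall test timeout still guards against a hang.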

public static void waitForActiveRestoringTask(final KafkaStreams streams,
final int expectedTasks,
final long timeoutMilliseconds) throws Exception {
TestUtils.waitForCondition(() -> {
Contributor

Could you please use a lambda function?

Author

Updated.

IntegrationTestUtils.purgeLocalStreamsState(streamsConfigurations);

final TestStateRestoreListener kafkaStreams1StateRestoreListener = new TestStateRestoreListener("ks1", RESTORATION_DELAY);
kafkaStreams = startKafkaStreams(builder, kafkaStreams1StateRestoreListener, kafkaStreams1Configuration);

// Must ensure all the restoring tasks are in active state before starting the new instance.
waitForActiveRestoringTask(kafkaStreams, 5, IntegrationTestUtils.DEFAULT_TIMEOUT);
Contributor

Could you include "suspend" in the comment? For example: all restoring tasks must be in the active state; otherwise, the first KafkaStreams instance won't encounter "suspend" after another instance starts.

Author

@chenyulin0719 chenyulin0719 Sep 18, 2024

Updated. I tried to rephrase it in my words. Please let me know if it's still not clear.

@chenyulin0719 chenyulin0719 marked this pull request as ready for review September 18, 2024 11:13

2 participants