-
Notifications
You must be signed in to change notification settings - Fork 14.9k
MINOR: Port RestoreIntegrationTest to the new protocol #20347
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
aliehsaeedii
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, @RaidenE1. Except calling streams-related admin API, I don't see any other concern.
|
|
||
| consumer.commitSync(); | ||
| consumer.close(); | ||
| admin.alterConsumerGroupOffsets(appId, offsetsToCommit).all().get(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please call admin.alterStreamsGroupOffsets
kirktrue
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR @RaidenE1!
Any reason to re-throw the failure in setCommittedOffset() vs. calling fail() directly?
| if (admin != null) { | ||
| admin.close(); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FYI: there's a utility method in Kafka called Utils.closeQuietly() which can make closing resources a one-liner.
|
|
||
| consumer.commitSync(); | ||
| consumer.close(); | ||
| admin.alterStreamsGroupOffsets(appId, offsetsToCommit).all().get(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't we need to make this configurable? If the test runs with classic protocol we would still need to use the corresponding consumer group admit-call?
In general, it seems that the whole test must be parametrized, to run with both classic and streams protocol? Atm, it seem that the test is only running with classic, and thus using this streams specific call, should actually break the test?
|
|
||
| consumer.commitSync(); | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should there be an else here?
| @ParameterizedTest | ||
| @ValueSource(booleans = {true, false}) | ||
| public void shouldRestoreStateFromSourceTopicForReadOnlyStore(final boolean stateUpdaterEnabled) throws Exception { | ||
| @CsvSource({ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should do the same for all other test case, like shouldRestoreNullRecord above. There is 8 test methods I did not miscount, but you enable the new protocol only for two.
bbejeck
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR @RaidenE1 I've made a pass
|
|
||
| if (useNewProtocol) { | ||
| // For new protocol, we need to stop the streams instance before altering offsets | ||
| kafkaStreams.close(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably best to use close(Duration timeout) overload, otherwise the test can get stuck
| } | ||
| }); | ||
| kafkaStreams.setGlobalStateRestoreListener(new TrackingStateRestoreListener(restored)); | ||
| kafkaStreams.start(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's a IntegrationTestUtils.startApplicationAndWaitUntilRunning(KafkaStreams instance) method you can use that returns your KS instance once it's in a running state which will simplify your logic here - no need for CountDownLatch
|
|
||
| // Restart the streams instance with a new startup latch | ||
| final CountDownLatch restartLatch = new CountDownLatch(1); | ||
| kafkaStreams = new KafkaStreams(builder.build(props), props); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here
| assertThat(CloseCountingInMemoryStore.numStoresClosed(), equalTo(expectedAfterStreams2Close)); | ||
| } finally { | ||
| streams1.close(); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know this was pre-existing but I have the same comment about closing with the overload that accepts a timeout.
|
|
||
| admin.alterStreamsGroupOffsets(appId, offsetsToCommit).all().get(); | ||
| } catch (final Exception e) { | ||
| throw new RuntimeException("Failed to set committed offsets", e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe use fail(...._
bbejeck
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @RaidenE1! LGTM
|
merged #20347 into trunk |
The test uses regular consumer to commit offsets. New protocol requires
a streams consumer since we are using streams groups, otherwise we run
into group ID conflicts.
Followed the addition of the KafkaAdmin interface for setting offsets, a
Kafka Admin client is created and used the interface in to set the
committed offsets instead of instantiating a consumer.
Also enable all tests for stream new protocol.
Reviewers: Alieh Saeediasaeedi@confluent.io, Kirk True
ktrue@confluent.io, Matthias Sax mjsax@apache.org, Bill Bejeck
bbejeck@apache.org