-
Notifications
You must be signed in to change notification settings - Fork 14k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-16331: remove EOSv1 config from StreamsConfig #17170
Conversation
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
Outdated
Show resolved
Hide resolved
@SuppressWarnings("deprecation") | ||
|
||
// TODO cleanup |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will be addressed in a follow up PR -- for now, just replacing removed variables with hard-coded strings below.
@@ -56,12 +56,12 @@ public static ProcessingMode processingMode(final StreamsConfig config) { | |||
} | |||
} | |||
|
|||
@SuppressWarnings("deprecation") | |||
// TODO cleanup |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same -- follow up PR will refactor this further and remove hard coded string replacements.
public enum ProcessingMode { | ||
AT_LEAST_ONCE(StreamsConfig.AT_LEAST_ONCE), | ||
|
||
EXACTLY_ONCE_ALPHA(StreamsConfig.EXACTLY_ONCE), | ||
// TODO cleanup | ||
EXACTLY_ONCE_ALPHA("exactly_once"), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will be addressed in a follow up refactoring PR. Just a workaround to keep the scope contained.
assertThat(producerConfigs.get("internal.auto.downgrade.txn.commit"), is(nullValue())); | ||
} | ||
|
||
// TODO: should we keep this test? (cf other TODO) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Relates to the question above, about the remove code which sets the internal config.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, let's remove in this PR
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for PR @mjsax, overall this lgtm, just left a couple of comments
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
Outdated
Show resolved
Hide resolved
public static ProcessingMode processingMode(final StreamsConfig config) { | ||
if (StreamsConfig.EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG))) { | ||
if ("exactly_once".equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG))) { | ||
return ProcessingMode.EXACTLY_ONCE_ALPHA; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe take the same approach with EXACTLY_ONCE_ALPHA
? Same for the other occurances
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, will cleanup ProcessingMode
in a follow up PR an remove it.
assertThat(producerConfigs.get("internal.auto.downgrade.txn.commit"), is(nullValue())); | ||
} | ||
|
||
// TODO: should we keep this test? (cf other TODO) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, let's remove in this PR
@@ -176,13 +175,13 @@ public void createTopics() throws Exception { | |||
} | |||
|
|||
@ParameterizedTest | |||
@ValueSource(strings = {StreamsConfig.AT_LEAST_ONCE, StreamsConfig.EXACTLY_ONCE, StreamsConfig.EXACTLY_ONCE_V2}) | |||
@ValueSource(strings = {StreamsConfig.AT_LEAST_ONCE, StreamsConfig.EXACTLY_ONCE_V2}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't have to address in this PR, but why do we have AT_LEAST_ONCE
parameters in the EOS integration test?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As discussed on the other PRs... need to clean this up. Filed https://issues.apache.org/jira/browse/KAFKA-17558
fbdf59e
to
0680317
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
@mjsax looks like we have a single related test failure |
@@ -102,11 +101,11 @@ public class StreamsProducerTest { | |||
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234")) | |||
); | |||
|
|||
@SuppressWarnings("deprecation") | |||
// TODO cleanup |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Addressed via #17212
Stacked on #17110
This PR add a couple of workaround (marked as TODO) to limit the scope. TODOs will be resolved via follow up PRs.