-
Notifications
You must be signed in to change notification settings - Fork 14.9k
KAFKA-12499: add transaction timeout verification #10482
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-12499: add transaction timeout verification #10482
Conversation
58983d0 to
067e5dc
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It might be better to do this check in StreamsConfig ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1, we should fail-fast at the level of the KafkaStreams. Otherwise in eos-alpha we actually have to wait for all the individual StreamThreads to be created, started, complete a rebalance to get a task assignment, and then start throwing IllegalArgumentException on each thread when it goes to create a task producer. (Can we also log an error in addition to the IllegalArgumentException?)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good!
8974909 to
961c2fb
Compare
guozhangwang
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
|
Re-triggered unit tests |
66ba7c2 to
9a87b6a
Compare
mjsax
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One more comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should use long as commitInterval is declared a LONG in StreamsConfig:
.define(COMMIT_INTERVAL_MS_CONFIG,
Type.LONG,
DEFAULT_COMMIT_INTERVAL_MS,
atLeast(0),
Importance.LOW,
COMMIT_INTERVAL_MS_DOC)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to either convert transaction timeout or commit interval I assume. Will switch to long comparison instead.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That makes sense -- a long -> int cast is safe, but not the other way around.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we update all this? While not incorrect, should not be necessary.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just realized that it's safer to use the correct long type instead of having int/long/string mixed as much as possible.
|
Build failed with |
7475fdb to
7b84ad4
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the explicit cast required?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yea, I think so.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It might be simpler to just use int transactionTimeout -- Java will auto-cast to long in
if (transactionTimeout < commitInterval) {
b4e2d74 to
6bfa1c8
Compare
mjsax
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry for being picky... LGTM.
6bfa1c8 to
7050bef
Compare
|
@abbccdda it looks like some eos test failed in the PR build, guessing it's related |
7050bef to
c01bd12
Compare
61d43d3 to
0e7b264
Compare
|
Only unrelated test fails, merging. |
…e-allocations-lz4 * apache-github/trunk: (43 commits) KAFKA-12800: Configure generator to fail on trailing JSON tokens (apache#10717) MINOR: clarify message ordering with max in-flight requests and idempotent producer (apache#10690) MINOR: Add log identifier/prefix printing in Log layer static functions (apache#10742) MINOR: update java doc for deprecated methods (apache#10722) MINOR: Fix deprecation warnings in SlidingWindowedCogroupedKStreamImplTest (apache#10703) KAFKA-12499: add transaction timeout verification (apache#10482) KAFKA-12620 Allocate producer ids on the controller (apache#10504) MINOR: Kafka Streams code samples formating unification (apache#10651) KAFKA-12808: Remove Deprecated Methods under StreamsMetrics (apache#10724) KAFKA-12522: Cast SMT should allow null value records to pass through (apache#10375) KAFKA-12820: Upgrade maven-artifact dependency to resolve CVE-2021-26291 HOTFIX: fix checkstyle issue in KAFKA-12697 KAFKA-12697: Add OfflinePartitionCount and PreferredReplicaImbalanceCount metrics to Quorum Controller (apache#10572) KAFKA-12342: Remove MetaLogShim and use RaftClient directly (apache#10705) KAFKA-12779: KIP-740, Clean up public API in TaskId and fix TaskMetadata#taskId() (apache#10735) KAFKA-12814: Remove Deprecated Method StreamsConfig getConsumerConfigs (apache#10737) MINOR: Eliminate redundant functions in LogTest suite (apache#10732) MINOR: Remove unused maxProducerIdExpirationMs parameter in Log constructor (apache#10723) MINOR: Updating files with release 2.7.1 (apache#10660) KAFKA-12809: Remove deprecated methods of Stores factory (apache#10729) ...
This PR tries to add the check for transaction timeout for a comparison against commit interval of streams. If transaction timeout is smaller than commit interval, stream should crash and inform user to update their commit interval to match the given transaction timeout, or vise versa.
Committer Checklist (excluded from commit message)