-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Fix send offset to tx, when no active tx #4096
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
876763a
to
6fa0e01
Compare
} | ||
|
||
private void sendOffsetsToTransaction() { | ||
if (this.kafkaTxManager != null && TransactionSynchronizationManager.getResource(this.kafkaTxManager.getProducerFactory()) == null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What if the kafkaTxManager
is null? Don't we need to address that also? If it is null, we can return as there is no point in sending the offsets to transaction. Do I miss anything on that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@sobychacko You're right. I fixed it. Thanks for the feedback.
. Signed-off-by: moonyougnCHAE <xpf_fl@naver.com>
. Signed-off-by: moonyougnCHAE <xpf_fl@naver.com>
. Signed-off-by: moonyougnCHAE <xpf_fl@naver.com>
1e3e9a6
to
a108a33
Compare
template.send(new ProducerRecord<>(topic11, 0, 0, "bar2")); | ||
return null; | ||
}); | ||
assertThat(successLatch.get().await(30, TimeUnit.SECONDS)).isTrue(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How is this test ensuring that offsets are sent to only active transactions? I mean, i see that you are asserting that two records are consumed transactionally, but how do we ensure the case where there is no active txn? Thanks!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@sobychacko Thanks for the feedback. I modified the test to verify whether the commit occurred within a transaction by using TransactionExecutionListener
.
. Signed-off-by: moonyougnCHAE <xpf_fl@naver.com>
Fixes #4088
Description
When a message is filtered out by the Kafka interceptor while the previous message passes through the interceptor and the producer is created, a commit occurs while the Kafka transaction is not active.
structure: Interceptor → Listener (a transactional listener that consumes a message and produces another message)