KAFKA-19479: at_least_once mode in Kafka Streams silently drops messages when the producer fails with MESSAGE_TOO_LARGE, violating delivery guarantees #20285
A label of 'needs-attention' was automatically added to this PR in order to raise the |
junrao
left a comment
@shashankhs11 : Thanks for the PR. Left a comment.
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
467de10 to
a51d19b
Compare
junrao
left a comment
@shashankhs11 : Thanks for the updated PR. A few more comments.
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
...java/org/apache/kafka/streams/integration/AtLeastOnceDeliveryMessageLossIntegrationTest.java
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
junrao
left a comment
@shashankhs11 : Thanks for the updated PR. A few more comments.
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
junrao
left a comment
@shashankhs11 : Thanks for the updated PR. A few more comments.
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java
junrao
left a comment
@shashankhs11 : Thanks for the updated PR. A few more comments.
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
...java/org/apache/kafka/streams/integration/AtLeastOnceDeliveryMessageLossIntegrationTest.java
...java/org/apache/kafka/streams/integration/AtLeastOnceDeliveryMessageLossIntegrationTest.java
...java/org/apache/kafka/streams/integration/AtLeastOnceDeliveryMessageLossIntegrationTest.java
@shashankhs11 : Could you fix the build error? Also, could you run the same JMH test again to make sure there is no regression in the flush() time?
@junrao: Fixed the build error and added the JMH test report in a separate comment above. There seems to be a regression when Also left a comment in reply to this - #20285 (comment)
@shashankhs11 : The small regression from 0.004 to 0.005 ms/op is negligible and is fine.
0e9c522 to
6a4745c
Compare
CI check with Java 25 seemed to be failing. Rebased with latest trunk.
junrao
left a comment
@shashankhs11 : Thanks for the updated PR. Just a minor comment. Also, could you rebase to pick up a flaky test fix #20713 ?
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java
6a4745c to
896ecc1
Compare
@junrao: Done :)
junrao
left a comment
@shashankhs11 : Thanks for the updated PR. LGTM. Could you update the description of the PR before merging?
@junrao: Done! Changed the title as well as the description. Thank you very much for your time and patience, Jun. I am very grateful for the opportunity to have worked closely with you over the past couple of weeks. I learned a lot from this :)
@shashankhs11 : I kept the PR name to match the Jira name, but adjusted the description in the commit message. Thanks for working on this issue!
Thanks for reviewing this PR @junrao!
…ges when the producer fails with MESSAGE_TOO_LARGE, violating delivery guarantees (#20285) Bug Fix in Producer where flush() does not wait for a batch to complete after splitting. Cf - #20254 (comment) and [KAFKA-19479](https://issues.apache.org/jira/browse/KAFKA-19479) for more details Reviewers: Jun Rao <junrao@gmail.com>
|
Cherry-picked to |
…ges when the producer fails with MESSAGE_TOO_LARGE, violating delivery guarantees (apache#20285) Bug Fix in Producer where flush() does not wait for a batch to complete after splitting. Cf - apache#20254 (comment) and [KAFKA-19479](https://issues.apache.org/jira/browse/KAFKA-19479) for more details Reviewers: Jun Rao <junrao@gmail.com>






Fixes a bug in the producer where flush() does not wait for a batch to complete after splitting.
Cf. #20254 (comment) and KAFKA-19479 for more details.
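
The commit message describes the bug as flush() not waiting for a batch to complete after splitting. The following is a minimal, self-contained sketch of that failure mode, not the code changed in this PR. `Batch`, `splitNaive`, `splitChained`, and `flushWouldReturn` are hypothetical names invented for illustration, and `CompletableFuture` stands in for whatever completion tracking the producer uses internally. The sketch assumes a flush that only watches the completion handle it captured before the split. If the original batch is marked done at split time, that flush can return while the split parts are still unsent. If completion of the original is chained to the parts, it cannot.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class SplitBatchFlushSketch {

    /** Hypothetical stand-in for a producer batch; this is NOT Kafka's ProducerBatch. */
    static final class Batch {
        final String name;
        final CompletableFuture<Void> done = new CompletableFuture<>();

        Batch(String name) {
            this.name = name;
        }
    }

    /** Creates the two smaller batches that replace an oversized one. */
    private static List<Batch> newParts(Batch original) {
        return List.of(new Batch(original.name + "-a"), new Batch(original.name + "-b"));
    }

    /** Problematic variant: the original is marked done as soon as it is split. */
    static List<Batch> splitNaive(Batch original) {
        List<Batch> parts = newParts(original);
        original.done.complete(null);
        return parts;
    }

    /** Chained variant: the original completes only once every part has completed. */
    static List<Batch> splitChained(Batch original) {
        List<Batch> parts = newParts(original);
        CompletableFuture<?>[] partFutures =
                parts.stream().map(b -> b.done).toArray(CompletableFuture[]::new);
        CompletableFuture.allOf(partFutures).whenComplete((ignored, error) -> {
            if (error != null) {
                original.done.completeExceptionally(error);
            } else {
                original.done.complete(null);
            }
        });
        return parts;
    }

    /** Models a flush that only tracks the handle it captured before the split. */
    static boolean flushWouldReturn(Batch tracked) {
        return tracked.done.isDone();
    }

    public static void main(String[] args) {
        Batch naive = new Batch("naive");
        splitNaive(naive);
        System.out.println("naive, parts unsent: " + flushWouldReturn(naive));

        Batch chained = new Batch("chained");
        List<Batch> parts = splitChained(chained);
        System.out.println("chained, parts unsent: " + flushWouldReturn(chained));

        parts.forEach(p -> p.done.complete(null));
        System.out.println("chained, parts sent: " + flushWouldReturn(chained));
    }
}
```

In the naive variant the flush check passes even though neither part has been sent. In an at_least_once pipeline that is the window in which progress could be recorded before the records are actually delivered.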