KAFKA-16637: AsyncKafkaConsumer removes offset fetch responses from cache too aggressively #16310
Conversation
…ache too aggressively

Allow the committed offsets fetch to run for as long as needed. This handles the case where a user invokes Consumer.poll() with a very small timeout (including zero).
…than Long.MAX_VALUE
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@AndrewJSchofield @cadonna @lianetm @philipnee—please review this PR. It's an alternative take on #16241 that seems simpler and not fraught with peril. Thanks!
…hed or were unique, based on the test
AndrewJSchofield
left a comment
Thanks for the PR. I like the new approach and the PR looks pretty good. I have left one comment to do with exception handling.
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
cadonna
left a comment
Thanks for the PR, @kirktrue !
I like the approach.
Once @AndrewJSchofield's comments are addressed, I guess we are good to go.
WDYT, @lianetm ?
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
Thanks for the PR @kirktrue, I love the simple approach and the consistency with the legacy logic—thanks for your patience here :) Same as @cadonna, LGTM once the error handling on getResult that @AndrewJSchofield pointed out is addressed. Thanks!
…with application event queue
…clearing the pending event
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
I'm happy with the approach now. Thanks @kirktrue.
Thanks for the updates @kirktrue, happy with the approach too. Left an answer to your concerns on the tests; it can totally be done separately if you prefer.
@AndrewJSchofield @cadonna @lianetm @philipnee: this PR is ready to be re-reviewed. Thanks all for your input 😄
@jlprat—This Jira/PR is a blocker for the KIP-848 Java client work. It sounds like we're really close to merging this within the next day or two. Thanks!
```java
if (pendingOffsetFetchEvent == null)
    return false;

if (!pendingOffsetFetchEvent.partitions().equals(partitions))
```
Can it be reused if the partitions of the pending fetch request include all of the input partitions? It seems refreshCommittedOffsets can ignore the extra partitions.
@chia7712—that's definitely an interesting optimization!
IIUC, the suggestion is to relax the requirement to allow reuse if the partitions for the current request are a subset of (or equal to) the previous request, right? So basically:
```diff
-if (!pendingOffsetFetchEvent.partitions().equals(partitions))
+if (!pendingOffsetFetchEvent.partitions().containsAll(partitions))
```
The behavior of the existing LegacyKafkaConsumer is to allow reuse only if the partitions for the current request equal those of the previous request exactly (source). That logic is the basis for the behavior used in the AsyncKafkaConsumer. We've been very deliberate to try to match the behavior between the two Consumer implementations as closely as possible, unless there's a specific reason not to.
It's a small change, and it does make sense (to me). My main concern is that it introduces a subtle difference in behavior between the two Consumer implementations. Also, the specific case we're trying to solve with this change is when the user has passed in a very low timeout and we're in a tight poll() loop, which suggests the partitions wouldn't be changing between those loops (CMIIW).
If I understand correctly, this seems like an optimization, rather than something needed for correctness. If that's the case, can I file a new Jira to implement this when we have a little more time to investigate and test?
Thanks!
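The two checks under discussion differ only in strict equality versus superset coverage. A standalone illustration using plain string sets (the helper names here are hypothetical, not the actual TopicPartition-based code):

```java
import java.util.Set;

public class PartitionReuseCheck {

    // Strict check used today by both consumer implementations:
    // reuse the pending fetch only on an exact partition-set match.
    static boolean reusableStrict(Set<String> pending, Set<String> requested) {
        return pending.equals(requested);
    }

    // Relaxed check suggested above: reuse whenever the pending fetch
    // already covers every requested partition (superset or equal).
    static boolean reusableRelaxed(Set<String> pending, Set<String> requested) {
        return pending.containsAll(requested);
    }
}
```

Any set that passes the strict check also passes the relaxed one, so the relaxation only widens the reuse window.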
> can I file a new Jira to implement this when we have a little more time to investigate and test?
Sure, and thanks for sharing. I wasn't even aware of the "match the behavior" goal before 🥲
I've filed KAFKA-16966 to track this optimization.
```java
    refreshCommittedOffsets(offsets, metadata, subscriptions);
    return true;
} catch (TimeoutException e) {
    log.error("Couldn't refresh committed offsets before timeout expired");
```
Does poll(0) fill users' logs with this error message? If so, should we change the log level or add more explanation, in order to avoid freaking users out?
Good point! ERROR level seems excessive here. At least WARN should be used. However, WARN would still fill users' logs. So maybe DEBUG?
Changed to debug and updated the text of the log to be slightly more helpful.
I'm not sure where that ERROR-level logging crept in. I'd assumed it was a holdover from the LegacyKafkaConsumer, but its implementation doesn't log anything in the case of timeouts.
I think it's helpful to keep it there in DEBUG form.
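The resulting pattern—swallow the timeout, log quietly, and report failure so the caller retries on the next poll()—can be sketched like this (a simplified stand-in with assumed names, not the actual AsyncKafkaConsumer code):

```java
import java.util.concurrent.TimeoutException;

public class OffsetRefreshSketch {

    // Stand-in for the real committed-offsets refresh call.
    interface Refresher {
        void refresh() throws TimeoutException;
    }

    // Returns true on success; on timeout, logs at DEBUG and returns false
    // so a tight poll(0) loop is not flooded with ERROR entries.
    static boolean tryRefresh(Refresher refresher) {
        try {
            refresher.refresh();
            return true;
        } catch (TimeoutException e) {
            // DEBUG, not ERROR: a short poll timeout hits this path routinely.
            System.out.println("DEBUG: committed offsets not available yet; will retry on the next poll");
            return false;
        }
    }
}
```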
Hi @kirktrue. Let's check again when the PR is approved :)
```java
} catch (InterruptException e) {
    throw e;
} catch (Throwable t) {
    // Clear the pending event on errors that are not timeout- or interrupt-related.
```
nit: Could you please remove the comment? It does not really add any information because it just spells out what the code does.
```java
    throw ConsumerUtils.maybeWrapAsKafkaException(t);
} finally {
    if (shouldClearPendingEvent)
        pendingOffsetFetchEvent = null;
```
nit: Why not clear pendingOffsetFetchEvent where you set shouldClearPendingEvent = true? That would avoid the if here and the additional shouldClearPendingEvent variable.
I went back and forth on this a couple of times. The idea was to have a single place that the variable is assigned and a single place it is cleared, just for easier reasoning of the code for later troubleshooting/debugging/modifying. But it's clearly debatable if it's cleaner that way, so I went ahead and removed the flag.
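The shape after removing the flag—each exit path decides directly whether the pending event survives—looks roughly like this (simplified stand-in types and names, not the merged code):

```java
import java.util.concurrent.TimeoutException;

public class ClearPendingSketch {

    // Stand-in for the cached offset-fetch application event.
    Object pendingOffsetFetchEvent = new Object();

    interface Fetcher {
        void fetch() throws TimeoutException;
    }

    // Clears the pending event inline on success and on unexpected failure,
    // but deliberately keeps it cached when only the timeout expired.
    boolean refresh(Fetcher fetcher) {
        try {
            fetcher.fetch();
            pendingOffsetFetchEvent = null;   // success: done with the event
            return true;
        } catch (TimeoutException e) {
            return false;                     // keep the event for the next poll
        } catch (RuntimeException e) {
            pendingOffsetFetchEvent = null;   // real failure: discard the event
            throw e;
        }
    }
}
```

The trade-off discussed above: this version has no single clearing site, but it drops the extra flag and the extra branch in the finally block.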
@AndrewJSchofield @cadonna @chia7712 @lianetm @philipnee: this PR is ready to be re-reviewed. Thanks all for your continued input 😄
lianetm
left a comment
Thanks for the simplification and adjusted log level, that will definitely avoid misleading red flags (been there). LGTM.
…che too aggressively (#16310)

Allow the committed offsets fetch to run for as long as needed. This handles the case where a user invokes Consumer.poll() with a very small timeout (including zero).

Reviewers: Andrew Schofield <aschofield@confluent.io>, Lianet Magrans <lianetmr@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
@jlprat Can this get merged into 3.8.0?
Hi @chia7712, we can let it stay in the 3.8 branch.
Allow the committed offsets fetch to run for as long as needed. This handles the case where a user invokes Consumer.poll() with a very small timeout (including zero).

Committer Checklist (excluded from commit message)
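Putting the pieces together, the fix the PR describes—keep the in-flight offset fetch alive across short polls and consume its result only once it completes—can be sketched as follows (all names here are hypothetical stand-ins, not the actual AsyncKafkaConsumer internals):

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class PendingFetchSketch {

    // Stand-in application event: the partitions it covers plus its future result.
    static class OffsetFetchEvent {
        final Set<String> partitions;
        final CompletableFuture<String> future = new CompletableFuture<>();
        OffsetFetchEvent(Set<String> partitions) { this.partitions = partitions; }
    }

    OffsetFetchEvent pendingOffsetFetchEvent;

    // Returns the committed offsets if they arrive within timeoutMs; otherwise
    // leaves the event pending so the next poll() reuses the in-flight request
    // instead of issuing (and abandoning) a fresh one on every short poll.
    String fetchCommitted(Set<String> partitions, long timeoutMs) {
        if (pendingOffsetFetchEvent == null
                || !pendingOffsetFetchEvent.partitions.equals(partitions)) {
            pendingOffsetFetchEvent = new OffsetFetchEvent(partitions);
            // ...in the real consumer, the event is submitted to the background thread here...
        }
        try {
            String result = pendingOffsetFetchEvent.future.get(timeoutMs, TimeUnit.MILLISECONDS);
            pendingOffsetFetchEvent = null;   // completed: stop caching it
            return result;
        } catch (TimeoutException e) {
            return null;                      // still in flight: keep it cached
        } catch (Exception e) {
            pendingOffsetFetchEvent = null;   // failed: discard it
            throw new RuntimeException(e);
        }
    }
}
```

A poll with a tiny timeout returns empty-handed but leaves the cached event in place, so a later poll with the same partitions can pick up the completed result.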
Consumer.poll()with a very small timeout (including zero).Committer Checklist (excluded from commit message)