KAFKA-14274 [6, 7]: Introduction of fetch request manager #14406
junrao merged 77 commits into apache:trunk from kirktrue:KAFKA-14274-3-introduce-fetch-request-manager-take-2
Conversation
Changes:
1. Introduces FetchRequestManager, which implements the RequestManager API for fetching messages from brokers. Unlike Fetcher, record decompression and deserialization are performed on the application thread, inside CompletedFetch.
2. Restructures the code so that objects owned by the background thread are not instantiated until the background thread runs (via a Supplier), ensuring that no references to them are available to the application thread.
3. Ensures resources properly implement Closeable and use IdempotentCloser so they are only closed once.
4. Introduces ConsumerTestBuilder to reduce inconsistency in the way objects were built up for tests.
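The Supplier-based deferral in change 2 can be sketched roughly like this; the class and field names below are hypothetical stand-ins, not the actual consumer internals. The application thread only ever holds a recipe (the Supplier) for building the background thread's state, so the state object itself never leaks across threads.

```java
import java.util.function.Supplier;

// Illustrative sketch: the owned object is created only once the background
// thread actually runs, so the application thread cannot hold a reference to it.
class BackgroundWorker implements Runnable {
    private final Supplier<StringBuilder> stateSupplier;
    private volatile StringBuilder state; // created on the background thread only

    BackgroundWorker(Supplier<StringBuilder> stateSupplier) {
        this.stateSupplier = stateSupplier;
    }

    @Override
    public void run() {
        // Instantiation is deferred until this point, on the background thread.
        state = stateSupplier.get();
        state.append("initialized on ").append(Thread.currentThread().getName());
    }

    String state() {
        return state == null ? null : state.toString();
    }
}
```

The same pattern works for any object that must be confined to the thread that constructs it.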
@philipnee Can you add the …
…f github.com:kirktrue/kafka into KAFKA-14274-3-introduce-fetch-request-manager-take-2
Changes:
1. Introduced a new generic queue processing class that both the ApplicationEventProcessor and the BackgroundEventProcessor use.
2. Moved event processing to each iteration of the internal loop inside PrototypeAsyncConsumer.poll, instead of just at the start.
3. Removed the NOOP event types.
4. Introduced the CompletableEvent interface, which will be used by the BackgroundEvent hierarchy soon.
5. Provided missing documentation around the queue processors.
6. No longer returning a flag from the event processor method, as it was not used anywhere.
Refactored the close method so that it's much cleaner. Also fixed a confusing doc change to refreshCommittedOffsetsIfNeeded().
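The close-once guarantee that IdempotentCloser provides can be sketched with a plain AtomicBoolean guard. This is a minimal stand-in to illustrate the idea, not the actual IdempotentCloser API:

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Minimal sketch of close-once semantics: cleanup runs exactly once, no matter
// how many times (or from how many threads) close() is invoked.
class CloseOnceResource implements AutoCloseable {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private int cleanupRuns = 0;

    @Override
    public void close() {
        // Only the first caller wins the compare-and-set and performs cleanup.
        if (closed.compareAndSet(false, true)) {
            cleanupRuns++; // stand-in for releasing buffers, sockets, etc.
        }
    }

    int cleanupRuns() {
        return cleanupRuns;
    }
}
```

Double-close then becomes harmless, which matters when both the application thread and the background thread may reach a close path.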
FetchEvent now provides the Future to the FetchRequestManager, which responds directly with the results of the fetch. This allows the application thread to properly block in pollForFetches() as it waits for a response. A separate thread-safe blocking queue collects the results from the Future, rather than the fetch buffer, since the fetch buffer is not thread safe and should not be used across threads.
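The Future-plus-queue handoff described above can be sketched roughly as follows. All names here are hypothetical stand-ins for illustration, not the real FetchRequestManager or FetchBuffer code: the background thread completes the future, and a thread-safe queue carries the result back to the blocking application thread.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Sketch of the cross-thread handoff: results flow from the background
// thread's future into a thread-safe queue, never into a non-thread-safe buffer.
class FetchHandoffSketch {
    private final BlockingQueue<String> results = new LinkedBlockingQueue<>();

    CompletableFuture<String> submitFetch(ExecutorService backgroundThread) {
        CompletableFuture<String> future = new CompletableFuture<>();
        // When the future completes, move the result into the thread-safe queue.
        future.thenAccept(results::add);
        // The "network I/O" happens on the background thread.
        backgroundThread.execute(() -> future.complete("records-from-broker"));
        return future;
    }

    String pollForFetches(long timeoutMs) throws InterruptedException {
        // The application thread blocks here, bounded by the poll timeout.
        return results.poll(timeoutMs, TimeUnit.MILLISECONDS);
    }
}
```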
1. Renamed DefaultEventHandler to ApplicationEventHandler.
2. Renamed ErrorEventHandler to BackgroundEventHandler.
3. Deleted the EventHandler interface.
4. Moved ApplicationEventHandler and BackgroundEventHandler (and their unit tests) into the "events" sub-package for consistency.
5. Removed the use of mocks in DefaultBackgroundThreadTest, as they caused issues with various methods returning null.
    if (!fetch.isEmpty()) {
        // Notify the network thread to wake up and start the next round of fetching.
        applicationEventHandler.wakeup();
This doesn't seem quite right. It's possible for a fetch request to return no data. In that case, if we don't wake up the network thread, it may not be able to send the next fetch request for a long time.
The enqueuing of other events will also call wakeupNetworkThread, so it shouldn't be a problem. But I went ahead and changed it as requested.
    /**
     * Wakeup the {@link ConsumerNetworkThread network I/O thread} to pull the event from the queue.
     */
    public void wakeup() {
wakeup => wakeupNetworkThread ?
    Map<TopicPartition, Long> offsetResetTimestamps;

    try {
        offsetResetTimestamps = offsetFetcherUtils.getOffsetResetTimestamp();
auto.offset.reset has 3 options: latest, earliest, none. It's possible for a user to choose none, which means "throw exception to the consumer if no previous offset is found for the consumer's group". But in that case, offsetFetcherUtils.getOffsetResetTimestamp just ignores that partition and won't reset it forever. We need to throw an exception so that the user knows.
It's a bit of a weird configuration, since it means a consumer can't really consume for the very first time with this option. However, I tested it with the existing consumer and it does throw an exception.
bin/kafka-console-consumer.sh --consumer-property auto.offset.reset=none --bootstrap-server localhost:9092 --topic test
[2023-10-20 15:30:07,088] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$)
org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partitions: [test-0]
at org.apache.kafka.clients.consumer.internals.SubscriptionState.resetInitializingPositions(SubscriptionState.java:711)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:2459)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1243)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1198)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1178)
at kafka.tools.ConsoleConsumer$ConsumerWrapper.receive(ConsoleConsumer.scala:473)
at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:103)
at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:77)
at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:54)
at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
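For reference, the auto.offset.reset setting discussed here is an ordinary consumer property. A minimal sketch using plain java.util.Properties (the broker address and group id below are placeholders):

```java
import java.util.Properties;

// Sketch of the consumer configuration under discussion. With "none", the
// consumer throws NoOffsetForPartitionException instead of resetting when no
// committed offset exists for the group.
class OffsetResetConfigExample {
    static Properties consumerProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("group.id", "example-group");           // placeholder
        // Valid values: "latest" (the default), "earliest", "none".
        props.setProperty("auto.offset.reset", "none");
        return props;
    }
}
```

These properties correspond one-to-one to the --consumer-property flags passed to the console consumer above.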
…OffsetForPartitionException if there's no reset strategy
FYI: I rebased against …
        timer.update(pollTimer.currentTimeMs());
    }

    return collectFetch();
Could we just return empty here instead of calling collectFetch() again since the caller is in a loop and can call this method to collect fetch again?
Yes, we could update pollForFetches() to simply return Fetch.empty() at the end.
However, there's code that intentionally blocks (for a little bit) waiting for fetch data to arrive in the buffer:
    try {
        fetchBuffer.awaitNotEmpty(pollTimer);
    } catch (InterruptException e) {
        log.trace("Timeout during fetch", e);
    } finally {
        timer.update(pollTimer.currentTimeMs());
    }

    return collectFetch();

Also, each loop through poll() executes updateAssignmentMetadataIfNeeded() before checking the fetch buffer for any data. That method does a lot of work (network I/O) that we'd ideally skip if we already have the data.
Perhaps we could update awaitNotEmpty() with a return flag so that the tail end of pollForFetches() can look something like this:
    try {
        if (fetchBuffer.awaitNotEmpty(pollTimer))
            return collectFetch();
    } catch (InterruptException e) {
        log.trace("Timeout during fetch", e);
    } finally {
        timer.update(pollTimer.currentTimeMs());
    }

    return Fetch.empty();
Thanks for the explanation, Kirk. We can just leave the code as it is then.
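The awaitNotEmpty()-returns-a-flag idea floated above (and ultimately not adopted) could be sketched as follows. The buffer class, lock, and condition here are hypothetical stand-ins, not FetchBuffer's actual implementation:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Sketch of a buffer whose await method reports whether data arrived, so the
// caller can choose between collecting a fetch and returning empty.
class BufferSketch<T> {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Deque<T> queue = new ArrayDeque<>();

    void add(T item) {
        lock.lock();
        try {
            queue.addLast(item);
            notEmpty.signalAll();
        } finally {
            lock.unlock();
        }
    }

    // Returns true if data arrived before the deadline, false on timeout.
    boolean awaitNotEmpty(long timeoutMs) throws InterruptedException {
        lock.lock();
        try {
            long nanos = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
            while (queue.isEmpty()) {
                if (nanos <= 0)
                    return false;
                // awaitNanos returns the remaining wait time, guarding
                // against spurious wakeups in the loop.
                nanos = notEmpty.awaitNanos(nanos);
            }
            return true;
        } finally {
            lock.unlock();
        }
    }
}
```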
    // positions, so a consumer with manually assigned partitions can avoid a coordinator
    // dependence by always ensuring that assigned partitions have an initial position.
    if (isCommittedOffsetsManagementEnabled() && !refreshCommittedOffsetsIfNeeded(timer))
        return false;
Would it be clearer to rename refreshCommittedOffsetsIfNeeded to initWithCommittedOffsetsIfNeeded?
The refreshCommittedOffsetsIfNeeded() name is derived from existing code in KafkaConsumer that calls out to ConsumerCoordinator.refreshCommittedOffsetsIfNeeded(). If we change the method name in PrototypeAsyncConsumer, should we change ConsumerCoordinator's corresponding method name too?
Yes, in both cases, the method sets offsets for partitions in initializing state.
Updated and a bit of cleanup.
…Needed. Also removed the null check from ConsumerUtils.refreshCommittedOffsets() and moved it into ConsumerCoordinator.initWithCommittedOffsetsIfNeeded, which now checks for a null offsets map before calling.
No, I do not believe they are. There are 8 "new" failures, of which two are the same test with different parameters. Here are the tests and any Jiras for flakiness:
There are 22 "existing" failures, which are all related to the …
Closing and reopening to trigger another Jenkins test run.

Ugh. One of the builds failed with: No integration test failures due to threads, though 😏

Closing and reopening to restart test.

Closing and reopening to restart test. Jenkins doesn't seem happy lately.
     @Override
     public List<PartitionInfo> partitionsFor(String topic) {
-        throw new KafkaException("method not implemented");
+        return partitionsFor(topic, Duration.ofMillis(defaultApiTimeoutMs));
I had missed a detail here. We're only implementing this with the default timeout, but not the overloaded one below, that has the timeout provided by the user. Is there a reason or is it just that we missed the latter one? (same for the listTopics)
(I guess it will be all part of the integration with the metadata calls right?)
Good call out. I don't know that the necessary topic plumbing code was written at that time. Would you mind filing a bug to resolve?
@kirktrue: The build for JDK 21 and Scala 2.13 failed this time. Did it succeed before?
@junrao—yes. Here's a brief history for JDK 21, starting with the most recent build (build 74):

These same intra-Jenkins communication, …
Reviewers: Philip Nee <pnee@confluent.io>, Lianet Magrans <lianetmr@gmail.com>, Jun Rao <junrao@gmail.com>