Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
b030479
KAFKA-16637: AsyncKafkaConsumer removes offset fetch responses from c…
kirktrue Jun 7, 2024
3a4ad22
Removed unnecessary code from test
kirktrue Jun 7, 2024
5b1473b
Clearing out expired inflight fetch requests if not being reused
kirktrue Jun 7, 2024
3741802
CommitRequestManagerTest updates
kirktrue Jun 7, 2024
5aac85e
More test cleanup
kirktrue Jun 7, 2024
21c0572
Re-adding accidentally-removed whitespace
kirktrue Jun 7, 2024
7ee84f3
Minor formatting changes
kirktrue Jun 7, 2024
83dff8e
Enabling KafkaConsumerTest's testFetchStableOffsetThrowInPoll
kirktrue Jun 7, 2024
74ef706
Updated list of tests
kirktrue Jun 7, 2024
7427b97
Merge branch 'trunk' into KAFKA-16637-cache-offset-fetch-result
kirktrue Jun 10, 2024
9c5b98d
WIP: use an explicit cache of results vs. inflight
kirktrue Jun 10, 2024
a468c1c
Reverting KafkaConsumerTest changes
kirktrue Jun 10, 2024
e248ff0
Stupid whitespace
kirktrue Jun 10, 2024
04a8e69
Update KafkaConsumerTest.java
kirktrue Jun 10, 2024
9c537d2
WIP: more testing
kirktrue Jun 10, 2024
7680893
More refactoring :(
kirktrue Jun 11, 2024
b52f21a
Updates to make tests more thorough and concise
kirktrue Jun 11, 2024
18d75ba
Minor tidying
kirktrue Jun 11, 2024
1e3ef7d
First pass at comments
kirktrue Jun 11, 2024
fe2a126
Merge branch 'apache:trunk' into KAFKA-16637-cache-offset-fetch-result
kirktrue Jun 11, 2024
f3be9e2
Added clearing of the cache and cleaned up tests
kirktrue Jun 12, 2024
2a2a7ee
More clean up
kirktrue Jun 12, 2024
a9b110e
Test renaming
kirktrue Jun 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
Expand Down Expand Up @@ -47,6 +48,7 @@
import org.apache.kafka.common.utils.Timer;
import org.slf4j.Logger;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -61,6 +63,7 @@
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -94,6 +97,7 @@ public class CommitRequestManager implements RequestManager, MemberStateListener
* group anymore.
*/
private final MemberInfo memberInfo;
final OffsetFetchResultCache offsetFetchResultCache;

public CommitRequestManager(
final Time time,
Expand Down Expand Up @@ -156,6 +160,7 @@ public CommitRequestManager(
this.memberInfo = new MemberInfo();
this.metricsManager = new OffsetCommitMetricsManager(metrics);
this.offsetCommitCallbackInvoker = offsetCommitCallbackInvoker;
this.offsetFetchResultCache = new OffsetFetchResultCache();
}

/**
Expand Down Expand Up @@ -519,6 +524,7 @@ private void fetchOffsetsWithRetries(final OffsetFetchRequestState fetchRequest,
log.warn("A duplicated, inflight, request was identified, but unable to find it in the " +
"outbound buffer:" + fetchRequest);
}
offsetFetchResultCache.maybeCache(fetchRequest, res);
Copy link
Member

@lianetm lianetm Jun 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this here means that we're caching results instead of inflight requests, which I'm worried could be risky. Let's review this sequence, I may be missing something:

  • call to fetch offsets for tp0 (fetch1) + times out without response
  • fetch1 completes after it is expired -> we save the results in cache
  • commit offsets for tp0 -> completes successfully
  • call to fetch offsets for tp0 again (fetch2) => won't this reuse the cached result received for fetch1 which is not correct?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added a unit test for this case. Without the commit cache clearing it fails, but with it, it passes.

if (error == null) {
result.complete(res);
} else {
Expand Down Expand Up @@ -1116,6 +1122,7 @@ boolean hasUnsentRequests() {
/**
 * Enqueues the given offset commit request for sending and invalidates any cached offset fetch
 * result. Clearing the cache here is intentional: offsets fetched (and cached) before this commit
 * could otherwise be served to a later fetch and appear stale once the commit completes.
 *
 * @param request the commit request to enqueue
 * @return the same {@code request} instance, for caller convenience
 */
OffsetCommitRequestState addOffsetCommitRequest(OffsetCommitRequestState request) {
log.debug("Enqueuing OffsetCommit request for offsets: {}", request.offsets);
unsentOffsetCommits.add(request);
// A commit of any sort makes the cached fetch result untrustworthy, so drop it.
offsetFetchResultCache.clear();
return request;
}

Expand All @@ -1127,6 +1134,9 @@ OffsetCommitRequestState addOffsetCommitRequest(OffsetCommitRequestState request
* upon completion.
*/
private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> addOffsetFetchRequest(final OffsetFetchRequestState request) {
if (offsetFetchResultCache.maybeCompleteRequest(request))
return request.future;

Optional<OffsetFetchRequestState> dupe =
unsentOffsetFetches.stream().filter(r -> r.sameRequest(request)).findAny();
Optional<OffsetFetchRequestState> inflight =
Expand Down Expand Up @@ -1279,4 +1289,123 @@ static class MemberInfo {
this.memberEpoch = Optional.empty();
}
}

/**
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is now crystal clear. Thanks for putting in the extra work.

* As the name suggests, this caches the result of an {@link OffsetFetchRequestState offset fetch request}
* <em>in specific cases</em>. The logic resembles that of {@link ConsumerCoordinator}'s
* <code>PendingCommittedOffsetRequest</code> mechanism.
*
* <p/>
*
* This handles the case where a user calls {@link Consumer#poll(Duration)} with a low timeout value, such as 0.
* As the timeout value approaches zero, the likelihood that the client will be able to fetch the offsets from the
* broker within the user's timeout also decreases. But we can take advantage of the fact that {@code poll()}
* is typically invoked in a tight loop, so the user's application will likely call {@code poll()} again to make
* a second attempt with the same set of {@link TopicPartition}s as the first attempt.
*
* <p/>
*
* The idea here is to cache the results of the last successful--but expired--response. An operation may exceed
* the time the user allotted, but that doesn't mean that the network request is aborted. In this scenario, it is
* often the case that the client receives a successful response, though it has technically exceeded the amount
* of time the user specified. However, as mentioned above, {@code poll()} is likely to be invoked again with the
* same set of partitions. By caching the successful response from attempt #1--though it timed out from the user's
* perspective--the client is able to satisfy attempt #2 from the cache.
Comment on lines +1308 to +1313
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the expected behavior if there is a commit between the two calls to poll()?
If the first offset fetch returns successfully before the commit is done, the returned offsets would be wrong, wouldn't they?

Copy link
Member

@lianetm lianetm Jun 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a very good point. If I get it right, the legacy logic does not cache the fetch responses. It caches only the fetch request for which it hasn't received a response yet, with the future response (that can be safely done if the set of partitions requested are the same). As soon as it gets a response it clears the cached request (here). We should never cache the fetch responses because it could definitely lead to the gap that @cadonna is pointing out, if a commit happens in between.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lianetm in the legacy consumer, does the fact that the offset fetch request and the offset commit request use the same network connection ensure that the requests have an order and the commit request cannot overtake the fetch request if commit was sent after fetch? Sorry if this question seems dumb, but I am not familiar with the network part.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe so, but mainly because I expect that the broker will handle them one at a time, so it will receive a fetch first, a commit after, and it won't handle the commit until it's done with the previous fetch request. So legacy and new logic, it's all safe I would say until the moment we start caching on the client side. There we have to be very careful not to return cached fetch results (that we may have received before a commit).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added a call to clear the cache whenever a commit (of any sort) is enqueued.

*
* <p/>
*
* The cached results are only used if the set of {@link TopicPartition}s and the client's member ID and epoch
* for attempt #2 matches that of attempt #1.
*/
/**
 * Caches the result of a <em>successful but expired</em> {@link OffsetFetchRequestState offset fetch
 * request}. When {@link Consumer#poll(Duration)} is called with a very small timeout, the network
 * round trip frequently outlives the user's timeout even though the broker ultimately responds
 * successfully. Because {@code poll()} is typically invoked in a tight loop with the same set of
 * {@link TopicPartition}s, the next attempt can often be satisfied directly from this cache.
 * A cached entry is only reused when the partition set, member ID, and member epoch all match the
 * new request; it is discarded after a single use, and whenever a commit is enqueued.
 */
class OffsetFetchResultCache {

    // Single-slot cache; holds at most the most recent expired-but-successful fetch result.
    private final AtomicReference<OffsetFetchResultDetail> cache = new AtomicReference<>();

    /**
     * Stores the result iff the request {@link OffsetFetchRequestState#isExpired() expired} and the
     * result is non-null. Any other outcome invalidates whatever was previously cached, since the
     * newer result supersedes it.
     */
    void maybeCache(final OffsetFetchRequestState request, final Map<TopicPartition, OffsetAndMetadata> result) {
        if (!request.isExpired() || result == null) {
            // Nothing worth keeping, and any earlier cached entry is now out of date too.
            clear();
            return;
        }

        final OffsetFetchResultDetail entry = new OffsetFetchResultDetail(request.requestedPartitions, result);
        log.debug("A new offset fetch result was cached: {}", entry);
        cache.set(entry);
    }

    /**
     * Completes the request's {@link CompletableFuture future} from the cache when the cached entry
     * matches the request's partitions (and member ID/epoch). The cache slot is emptied
     * unconditionally: the entry has either just been consumed or is irrelevant.
     *
     * @return {@code true} if the request was completed from the cache, {@code false} otherwise
     */
    boolean maybeCompleteRequest(final OffsetFetchRequestState request) {
        final OffsetFetchResultDetail cached = clear();

        if (cached == null) {
            log.debug("There are no cached offset fetch results to attempt to use");
            return false;
        }

        if (!cached.sameRequest(request.requestedPartitions)) {
            log.debug(
                "The requested partitions ({}) do not match the cached offset fetch response's partitions ({}); ignoring cached results",
                request.requestedPartitions,
                cached.requestedPartitions
            );
            return false;
        }

        log.debug("The cached offset fetch response matches the requested partitions ({})", request.requestedPartitions);
        request.future.complete(cached.result);
        return true;
    }

    /**
     * Exposes the raw cached value, primarily for tests; safe because
     * {@link OffsetFetchResultDetail} is immutable.
     */
    OffsetFetchResultDetail get() {
        return cache.get();
    }

    /** Atomically empties the cache and returns whatever entry (possibly null) it held. */
    OffsetFetchResultDetail clear() {
        return cache.getAndSet(null);
    }
}

/**
 * An immutable snapshot of a completed offset fetch: the partitions that were requested, the
 * member ID and epoch in effect when the snapshot was taken, and the fetched offsets. Used as
 * the single entry of {@code OffsetFetchResultCache}.
 */
class OffsetFetchResultDetail {

    final Set<TopicPartition> requestedPartitions;
    final Optional<String> requestedMemberId;
    final Optional<Integer> requestedMemberEpoch;
    final Map<TopicPartition, OffsetAndMetadata> result;

    OffsetFetchResultDetail(final Set<TopicPartition> partitions,
                            final Map<TopicPartition, OffsetAndMetadata> result) {
        // Defensive copies are required for true immutability: unmodifiableSet/unmodifiableMap
        // alone are only *views*, so later mutation of the caller's collections would silently
        // alter this cache entry after it was stored.
        this.requestedPartitions = Collections.unmodifiableSet(new HashSet<>(partitions));
        this.requestedMemberId = memberInfo.memberId;
        this.requestedMemberEpoch = memberInfo.memberEpoch;
        this.result = Collections.unmodifiableMap(new HashMap<>(result));
    }

    /**
     * Returns {@code true} only when the given partitions equal the snapshot's partitions AND the
     * member ID/epoch captured at snapshot time still match the current {@code memberInfo} —
     * guarding against reuse across a rebalance or membership change.
     */
    boolean sameRequest(final Set<TopicPartition> currentPartitions) {
        return Objects.equals(requestedPartitions, currentPartitions) &&
            Objects.equals(requestedMemberId, memberInfo.memberId) &&
            Objects.equals(requestedMemberEpoch, memberInfo.memberEpoch);
    }

    @Override
    public String toString() {
        return "OffsetFetchResultDetail{" +
            "requestedPartitions=" + requestedPartitions +
            ", requestedMemberId=" + requestedMemberId +
            ", requestedMemberEpoch=" + requestedMemberEpoch +
            ", result=" + result +
            '}';
    }
}
}
Loading