
KAFKA-16637: AsyncKafkaConsumer removes offset fetch responses from cache too aggressively #16241

Closed
kirktrue wants to merge 23 commits into apache:trunk from kirktrue:KAFKA-16637-cache-offset-fetch-result

Conversation

@kirktrue
Contributor

@kirktrue kirktrue commented Jun 7, 2024

The LegacyKafkaConsumer employs a simple caching scheme when fetching committed offsets. ConsumerCoordinator.fetchCommittedOffsets() reuses the same request across multiple attempts, as long as the set of partitions remains the same for each successive attempt. This is critical for cases where the user invokes Consumer.poll(0), because the first few offset fetch attempts will not have enough time to complete.

The AsyncKafkaConsumer takes a similar approach, but a logic bug prevented the request from being reused across multiple attempts: the cached request was removed as soon as it completed.

This change keeps the offset fetch request in the inflight list when it succeeds, so the next offset fetch attempt can find the previous attempt there and reuse its result.

Some existing tests that were disabled for CONSUMER are now enabled by this change.
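The reuse scheme described above can be sketched as follows. This is an illustrative reduction, not the actual AsyncKafkaConsumer internals: the class and method names here are hypothetical, and the real code manages request objects rather than bare futures.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Sketch: keep an inflight offset fetch request keyed by its partition set, and
// reuse it for subsequent attempts that request the same partitions. The entry
// is NOT removed when the request completes, so a later attempt can still pick
// up its result. Names are illustrative only.
public class OffsetFetchCache {
    private final Map<Set<String>, CompletableFuture<Map<String, Long>>> inflight = new HashMap<>();

    public CompletableFuture<Map<String, Long>> fetch(Set<String> partitions) {
        // Reuse the inflight request if one exists for the same partitions;
        // otherwise register a new one.
        return inflight.computeIfAbsent(new HashSet<>(partitions), p -> new CompletableFuture<>());
    }

    // Invoked when a new attempt arrives for a different partition set:
    // entries for other partition sets are no longer needed.
    public void clearStale(Set<String> partitions) {
        inflight.keySet().removeIf(p -> !p.equals(partitions));
    }
}
```

With this shape, two successive `fetch` calls for the same partitions share one future, which is the behavior the fix restores.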

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

kirktrue added 2 commits June 7, 2024 07:19
…ache too aggressively

Don't remove the successful-but-expired offset fetch request from the inflight list until the next request.
@kirktrue kirktrue force-pushed the KAFKA-16637-cache-offset-fetch-result branch from 0c16425 to 3a4ad22 Compare June 7, 2024 17:55
@kirktrue kirktrue marked this pull request as ready for review June 7, 2024 22:38
@cadonna cadonna self-requested a review June 10, 2024 11:05
Member

@AndrewJSchofield AndrewJSchofield left a comment

Thanks for the PR. I'm sure the change resolves the previously undesirable behaviour, but I suggest it needs further refactoring for maintainability. For example, in the addOffsetFetchRequest method, retries, expiration, and chaining of futures are all mixed together.

log.debug("Duplicated unsent offset fetch request found for partitions: {}", request.requestedPartitions);
dupe.orElseGet(inflight::get).chainFuture(request.future);
dupe.get().chainFuture(request.future);
} else if (inflight.isPresent()) {
Member

I think this area of de-duplicating requests needs a good deal of simplification still. This piece of code in particular is more tricky to understand than is necessary. Here's an example.

  • There's a duplicate request in the in-flight requests
  • We chain the new request's future onto the existing request
  • But the existing request might already be done
  • So, then we remove the existing request from the in-flight requests

Does the chaining of futures make the slightest sense in this case?
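For background on the pattern being questioned: chaining of this kind typically just propagates one future's completion to another, and it works even when the source future is already done, because the completion callback fires immediately. A minimal sketch with a hypothetical `chainFuture` (not the actual Kafka helper):

```java
import java.util.concurrent.CompletableFuture;

public class FutureChain {
    // Propagate the outcome of `source` to `target`. If `source` is already
    // complete, whenComplete runs immediately, so chaining onto a finished
    // request still delivers its result to the new attempt.
    public static <T> void chainFuture(CompletableFuture<T> source, CompletableFuture<T> target) {
        source.whenComplete((value, error) -> {
            if (error != null)
                target.completeExceptionally(error);
            else
                target.complete(value);
        });
    }
}
```

Whether that mechanically-correct propagation is also the *right* design here is exactly the reviewer's question.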

Contributor Author

@AndrewJSchofield—I totally agree 😄

The initial pass was my attempt to shoehorn in a fix while disturbing the surrounding code as little as possible. I wanted to keep things as localized as possible, given the deadline.

I am working on an approach that is hopefully more sane.

Member

@cadonna cadonna left a comment

Thanks for the PR, @kirktrue !

I made a first pass.


// The incoming offset fetch request isn't in the unsent or inflight buffers, which means we don't
// need to keep track of the entry in the inflight buffer any longer.
inflightOffsetFetches.removeIf(r -> request.isExpired());
Member

Does this line not remove all elements of inflightOffsetFetches if the request in request is expired? Should this rather be:

inflightOffsetFetches.removeIf(r -> r.isExpired());

?
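The distinction matters because the first predicate ignores its argument: if `request` happens to be expired, *every* element of the list is removed. A standalone illustration with a hypothetical `Req` type:

```java
import java.util.ArrayList;
import java.util.List;

public class RemoveIfDemo {
    static class Req {
        final boolean expired;
        Req(boolean expired) { this.expired = expired; }
        boolean isExpired() { return expired; }
    }

    // Buggy form: the predicate captures `request` and ignores `r`, so when
    // `request` is expired the whole list is cleared. Returns remaining size.
    static int buggyRemove(List<Req> inflight, Req request) {
        List<Req> copy = new ArrayList<>(inflight);
        copy.removeIf(r -> request.isExpired());
        return copy.size();
    }

    // Intended form: test each element itself; only expired entries go.
    static int intendedRemove(List<Req> inflight) {
        List<Req> copy = new ArrayList<>(inflight);
        copy.removeIf(r -> r.isExpired());
        return copy.size();
    }
}
```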

Member

What do you try to remove here?

Contributor Author

🤦‍♂️

Comment on lines 517 to 521
if (!fetchRequest.isExpired()) {
boolean inflightRemoved = pendingRequests.inflightOffsetFetches.remove(fetchRequest);
if (!inflightRemoved) {
log.warn("The response for the offset fetch request for partitions {} was not found in the inflight buffer", fetchRequest.requestedPartitions);
}
Member

@cadonna cadonna Jun 10, 2024

When is the request removed from the inflightOffsetFetches when it is expired?

Contributor Author

It is removed later, when the user calls addOffsetFetchRequest() with a set of partitions that doesn't match any in the unsent or inflight queues.

That said, this change has been reverted; I'll double-check how the new approach handles it.

@kirktrue kirktrue marked this pull request as draft June 11, 2024 02:12
}
}

/**
Member

This is now crystal clear. Thanks for putting in the extra work.

Comment on lines +1307 to +1312
* The idea here is to cache the results of the last successful--but expired--response. An operation may exceed
* the time the user allotted, but that doesn't mean that the network request is aborted. In this scenario, it is
* often the case that the client receives a successful response, though it has technically exceeded the amount
* of time the user specified. However, as mentioned above, {@code poll()} is likely to be invoked again with the
* same set of partitions. By caching the successful response from attempt #1--though it timed out from the user's
* perspective--the client is able to satisfy attempt #2 from the cache.
Member

What is the expected behavior if there is a commit between the two calls to poll()?
If the first offset fetch returns successfully before the commit is done, the returned offsets would be wrong, wouldn't they?

Member

@lianetm lianetm Jun 11, 2024

That's a very good point. If I understand correctly, the legacy logic does not cache fetch responses. It caches only the fetch request for which it hasn't yet received a response, along with the future response (which can be done safely as long as the set of requested partitions is the same). As soon as it gets a response, it clears the cached request (here). We should never cache fetch responses, because that could definitely lead to the gap @cadonna is pointing out if a commit happens in between.

Member

@lianetm in the legacy consumer, does the fact that the offset fetch request and the offset commit request use the same network connection ensure that the requests have an order and the commit request cannot overtake the fetch request if commit was sent after fetch? Sorry if this question seems dumb, but I am not familiar with the network part.

Member

I believe so, but mainly because I expect that the broker will handle them one at a time, so it will receive a fetch first, a commit after, and it won't handle the commit until it's done with the previous fetch request. So legacy and new logic, it's all safe I would say until the moment we start caching on the client side. There we have to be very careful not to return cached fetch results (that we may have received before a commit).

Contributor Author

I've added a call to clear the cache whenever a commit (of any sort) is enqueued.
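A sketch of the invalidation being described (illustrative names, not the actual PR code): any enqueued commit clears the cached fetch results, so a later fetch attempt cannot observe pre-commit offsets.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: a result cache that is invalidated whenever a commit is
// enqueued, so fetch attempts after a commit never see stale cached offsets.
public class CommittedOffsetCache {
    private final Map<String, Long> cachedResults = new HashMap<>();

    public void cacheResult(String partition, long offset) {
        cachedResults.put(partition, offset);
    }

    // Returns the cached offset, or null if nothing (or nothing fresh) is cached.
    public Long cachedOffset(String partition) {
        return cachedResults.get(partition);
    }

    // Called when any commit (sync, async, or auto-commit) is enqueued.
    public void onCommitEnqueued() {
        cachedResults.clear();
    }
}
```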

log.warn("A duplicated, inflight, request was identified, but unable to find it in the " +
"outbound buffer:" + fetchRequest);
}
offsetFetchResultCache.maybeCache(fetchRequest, res);
Member

@lianetm lianetm Jun 11, 2024

This means we're caching results instead of inflight requests, which I worry could be risky. Let's review this sequence; I may be missing something:

  • call to fetch offsets for tp0 (fetch1) + times out without response
  • fetch1 completes after it is expired -> we save the results in cache
  • commit offsets for tp0 -> completes successfully
  • call to fetch offsets for tp0 again (fetch2) => won't this reuse the cached result received for fetch1 which is not correct?

Contributor Author

I've added a unit test for this case. Without the commit cache clearing it fails, but with it, it passes.
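The sequence above can be condensed into a tiny self-contained simulation (hypothetical names; the PR's actual unit test exercises the real consumer classes): a fetch result cached before a commit must not be served after the commit.

```java
import java.util.HashMap;
import java.util.Map;

public class StaleFetchScenario {
    // Simulates lianetm's sequence. Returns true if a stale (pre-commit)
    // cached result would be served to fetch2.
    public static boolean staleResultServed(boolean clearOnCommit) {
        Map<String, Long> cache = new HashMap<>();
        cache.put("tp0", 10L);            // fetch1 completes after expiry; result cached
        long committed = 20L;             // commit for tp0 then completes successfully
        if (clearOnCommit)
            cache.clear();                // the fix: enqueued commit invalidates the cache
        long fetched = cache.getOrDefault("tp0", committed); // fetch2
        return fetched != committed;
    }
}
```

Without the clearing step the stale value 10 is returned; with it, fetch2 falls through to the committed offset.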

@lianetm
Member

lianetm commented Jun 11, 2024

High level comment about the approach. Getting into caching results of fetch requests seems very tricky, given how easily they can become stale. The simple dedup approach to ensure we don't send a fetch request if there is another one in-flight for the same partitions seemed simple and safe (but not enough for poll, agreed). The problem was: fetch requests issued from within the poll loop were being expired with the same poll timeout, which could prevent the consumer from ever fetching offsets on low poll timeout.

Sorry if I'm missing something, but couldn't we simply ensure that fetch requests issued from the poll have no timeout, and we apply the timeout only when getting responses. This means that we would leave the fetch request running, and on the next call to poll it would be either completed (so we would get the results), or in-flight (as long as the assignment does not change, we wouldn't issue a new fetch). The change I refer to is this snippet from initWithCommittedOffsetsIfNeeded. It currently does fetch with timeout + get results.
But what about just moving the timeout to the get result?

  1. issue fetch (max_value as timeout) -> will run across poll iterations for the same set of partitions
  2. get result with poll timeout -> this ensures that we respect the poll iteration timeout, while leaving fetch request running to be used on the next poll.

Would this work to solve our problem? (while avoiding caching offsets on the client).

@kirktrue
Contributor Author

@lianetm—thanks for your suggestion for handling it at the AsyncKafkaConsumer layer.

But what about just moving the timeout to the get result?

  1. issue fetch (max_value as timeout) -> will run across poll iterations for the same set of partitions
  2. get result with poll timeout -> this ensures that we respect the poll iteration timeout, while leaving fetch request running to be used on the next poll.

Here's the version of initWithCommittedOffsetsIfNeeded() in trunk:

final FetchCommittedOffsetsEvent event =
    new FetchCommittedOffsetsEvent(
        initializingPartitions,
        calculateDeadlineMs(timer));
wakeupTrigger.setActiveTask(event.future());
final Map<TopicPartition, OffsetAndMetadata> offsets = applicationEventHandler.addAndGet(event);
refreshCommittedOffsets(offsets, metadata, subscriptions);
return true;

Attempt #1: change to use a "timed" version of Future.get() with the user's Timer:

final FetchCommittedOffsetsEvent event =
    new FetchCommittedOffsetsEvent(
        initializingPartitions,
        Long.MAX_VALUE);
wakeupTrigger.setActiveTask(event.future());
applicationEventHandler.add(event);
final Map<TopicPartition, OffsetAndMetadata> offsets = ConsumerUtils.getResult(event.future(), timer);
refreshCommittedOffsets(offsets, metadata, subscriptions);
return true;

When I run the new integration test, it prints 'couldn't refresh committed offsets' hundreds of times and eventually fails.

Attempt #2: change to use a "timed" version of Future.get() with a Timer of 1 millisecond:

final FetchCommittedOffsetsEvent event =
    new FetchCommittedOffsetsEvent(
        initializingPartitions,
        Long.MAX_VALUE);
wakeupTrigger.setActiveTask(event.future());
applicationEventHandler.add(event);
final Map<TopicPartition, OffsetAndMetadata> offsets = ConsumerUtils.getResult(event.future(), time.timer(1));
refreshCommittedOffsets(offsets, metadata, subscriptions);
return true;

That passes the new integration test 🤷‍♂️

I haven't worked out why the Timer of 0 milliseconds fails, but I believe it's because the code considers a timeout of 0 to be an automatic hard fail, even if we have the result on hand 😢
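For what it's worth, a plain timed `Future.get` does return immediately when the future is already complete, even with a zero timeout, so the hard failure would have to come from the surrounding helper's timeout handling (e.g. treating an already-expired Timer as a failure before consulting the future) rather than from `Future.get` itself. A quick check:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class ZeroTimeoutDemo {
    // A zero timeout on get() does not fail when the result is already
    // available; TimeoutException is only thrown for incomplete futures.
    public static String getCompleted() {
        try {
            CompletableFuture<String> f = CompletableFuture.completedFuture("offsets");
            return f.get(0, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            return "timeout";
        }
    }
}
```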

@lianetm
Member

lianetm commented Jun 12, 2024

Hey @kirktrue, I don't expect the code snippet you shared in attempt #1 to work, because in the end it creates a new event every time.

The approach I had in mind is more like what the legacy consumer does by keeping a PendingCommittedOffsetRequest. We should keep a pendingFetchEvent that we reuse across polls as long as the set of partitions is the same. Then we simply need to update initWithCommittedOffsetsIfNeeded; instead of:

        final FetchCommittedOffsetsEvent event =
            new FetchCommittedOffsetsEvent(
                initializingPartitions,
                calculateDeadlineMs(timer));
        wakeupTrigger.setActiveTask(event.future());
        final Map<TopicPartition, OffsetAndMetadata> offsets = applicationEventHandler.addAndGet(event);

We would:

       // TODO: also consider if the set of initializingPartitions are not the same, would need a new request
        if (pendingFetchEvent == null) {
            pendingFetchEvent = new FetchCommittedOffsetsEvent(
                initializingPartitions,
                Long.MAX_VALUE);
            wakeupTrigger.setActiveTask(pendingFetchEvent.future());
            applicationEventHandler.add(pendingFetchEvent);
        }

        final Map<TopicPartition, OffsetAndMetadata> offsets = ConsumerUtils.getResult(pendingFetchEvent.future(), timer);
        pendingFetchEvent = null;

With that, the new integration test passes! This was a rough first try; it still needs the TODO, and I guess we should also consider how to expire that fetch event with a MAX_VALUE timeout, which may otherwise stay in the background forever.

With this we would have a really simple approach, very similar to the legacy one, which feels safe, but I could be missing details of why it wouldn't work? Just sharing in case it helps but happy to review the current PR approach and align on it if you think it's best at this point.

@kirktrue
Contributor Author

@lianetm @cadonna @AndrewJSchofield—PTAL at an alternate implementation of this Jira: #16310. Thanks!

@kirktrue
Contributor Author

Closing this PR as we have achieved consensus around the simpler solution in #16310.

@kirktrue kirktrue closed this Jun 13, 2024
@kirktrue kirktrue deleted the KAFKA-16637-cache-offset-fetch-result branch June 19, 2024 19:35