
add multiple caches for accelerating available container count calculation #667

Merged
merged 69 commits into confluentinc:master on Feb 22, 2024

Conversation

@sangreal
Contributor

sangreal commented Nov 19, 2023

Description
This PR reopens the previously approved PR #644.

Explanation

  1. Introduce a cache for the count of ready-to-be-retried containers (containers that could be selected for retry). The actually available count = all available container count - (count of retry containers still pending their delay). A minimal sketch of this arithmetic follows the list below.
  2. The cache updates happen when containers are selected to run the user function (before / after selection).
  3. The set of available work containers changes only after container selection, therefore it is accurate to update the cache at this point.
  4. No extra threading is needed in this solution compared to the previous one.
  5. The improvement is substantial: the performance of lessKeysThanThreads has improved from 01:24 to 00:38.
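
To make the arithmetic concrete, here is a minimal sketch of the counting idea. The counter and method names are illustrative (the PR itself tracks the retry side with its own retryItemCnt / expiredRetryContainerCnt counters); this is not the actual ProcessingShard / ShardManager code.

    import java.util.concurrent.atomic.AtomicLong;

    // Illustration only - the real bookkeeping lives in ProcessingShard / ShardManager.
    class AvailableCountSketch {
        // containers in the shard that are not in flight and have not succeeded yet
        private final AtomicLong availableWorkContainerCnt = new AtomicLong();
        // retry containers whose retry delay has not elapsed yet
        private final AtomicLong pendingRetryContainerCnt = new AtomicLong();

        void onWorkAdded()              { availableWorkContainerCnt.incrementAndGet(); }
        void onWorkTakenForProcessing() { availableWorkContainerCnt.decrementAndGet(); }
        void onWorkFailedAndQueuedForRetry() {
            availableWorkContainerCnt.incrementAndGet();
            pendingRetryContainerCnt.incrementAndGet();
        }
        void onRetryDelayElapsed()      { pendingRetryContainerCnt.decrementAndGet(); }

        // actually available = all available containers - retry containers still waiting on their delay
        long getCountOfWorkAwaitingSelection() {
            return availableWorkContainerCnt.get() - pendingRetryContainerCnt.get();
        }
    }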

prev flow

[diagram: cache_improvement_prev.drawio]

current flow

[diagram: cache_improvement_now.drawio]

Possible Question

Q : Previously the expired retries were calculated on the fly, which looks more accurate than the new flow?
A : They will eventually be the same, since the controlLoop will wait for at most the latest retry time and then update the caches. In the previous flow, the available work container count was also only updated after the latest retry time.

Checklist

  • Documentation (if applicable)
  • Changelog

@rkolesnev
Contributor

rkolesnev commented Feb 9, 2024

There is a bit missing for tracking the counts in ProcessingShard.removeStaleWorkContainersFromShard() - I think it would need to remove the stale container from the retryQueue, then check and decrement expiredRetryContainerCnt and the main availableWorkContainerCnt.

@rkolesnev
Contributor

Hmm, the more I get my head around this, the more I think we should solve it with a two-fold approach.
Basically we have 3 conditions that make up that count - a container meets:
isNotInFlight() && !isUserFunctionSucceeded() && isDelayPassed()
We can directly track isNotInFlight and !isUserFunctionSucceeded through the counter - as there are specific events that trigger their change - taken as work, succeeded, failed, etc.
But the last one - isDelayPassed() - is time based, so we cannot really track its state from events - as there are no events - we really should just be checking it by observing during the call to getNumberOfWorkQueuedInShardsAwaitingSelection().

So what I am proposing is to keep the availableWorkContainerCnt and use it to track addition / removal of work containers from the shard, tracking isNotInFlight and !isUserFunctionSucceeded() - which it basically does now in this PR - but at the same time remove expiredRetryContainerCnt and retryItemCnt and still scan the retryQueue - at least up to the first container whose delay has not yet expired (as it's ordered by retry delay). A sketch of this shape follows below.
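
For illustration only, that two-fold shape could look roughly like this - a counter for the event-driven conditions plus a bounded scan of the sorted retry queue for the time-based one. RetryEntry is a hypothetical stand-in for WorkContainer, and this is not the actual ShardManager code:

    import java.time.Instant;
    import java.util.Comparator;
    import java.util.concurrent.ConcurrentSkipListSet;
    import java.util.concurrent.atomic.AtomicLong;

    class TwoFoldCountSketch {
        // hypothetical stand-in for WorkContainer - only the bit this sketch needs
        record RetryEntry(long id, Instant nextRetryDueAt) {}

        private static final Comparator<RetryEntry> BY_DUE_TIME =
                Comparator.comparing(RetryEntry::nextRetryDueAt).thenComparingLong(RetryEntry::id);

        // tracks isNotInFlight() && !isUserFunctionSucceeded() via add / take / fail events
        private final AtomicLong availableWorkContainerCnt = new AtomicLong();
        // ordered by retry due time (with an id tiebreaker), like the real retryQueue
        private final ConcurrentSkipListSet<RetryEntry> retryQueue = new ConcurrentSkipListSet<>(BY_DUE_TIME);

        long getNumberOfWorkQueuedInShardsAwaitingSelection(Instant now) {
            long dueForRetry = 0;
            for (RetryEntry entry : retryQueue) {
                if (entry.nextRetryDueAt().isAfter(now)) {
                    break; // ordered by due time: everything after this entry is not due either
                }
                dueForRetry++;
            }
            // entries still waiting on their retry delay are not selectable yet
            long notYetDue = retryQueue.size() - dueForRetry;
            return availableWorkContainerCnt.get() - notYetDue;
        }
    }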

@sangreal
Contributor Author

@rkolesnev Thanks a lot for your review. I think your solution is also a good approach, but it still has to scan through the retryQueue.

I totally understand your concern, but please take a look at the flow diagram above and the explanations below.
The updates in getNumberOfWorkQueuedInShardsAwaitingSelection are covered by the controlLoop, because it will only wait for at most the latest retry time (= first retryable time) and will then update all the caches.
In the previous flow, the available work container count was also only updated after the latest retry time.

https://github.com/confluentinc/parallel-consumer/blob/master/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java#L771

Let me give you an example.
Say it is now 00:00, there are two retry items due to be retried at 00:02 and 00:05, and there are no pending containers.

For the old flow:

  1. The count is checked during every poll in BrokerPollSystem, but it stays at 0 until 00:02, when it becomes 1; after that item is processed it goes back to 0. At 00:05 the second item is picked up and the count becomes 1 again.

For the new flow:

  1. During every poll in BrokerPollSystem, the count does not change, since the cache is unchanged.
  2. In the controlLoop, Duration timeToBlockFor = shouldTryCommitNow ? Duration.ZERO : getTimeToBlockFor(); therefore timeToBlockFor will be <= min(all retry container waiting times). So at 00:02 the controlLoop runs and updates the caches (there is no blocking call in this loop), and the count (expiredRetryContainerCnt -> 1) is updated to 1. The same applies to the second item: at 00:05 the controlLoop runs and updates the count to 1.

Conclusion

  1. As you can see, the update timing is the same between the previous and the new flow. A rough sketch of how the controlLoop bounds its wait follows below.
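
For illustration, here is a rough sketch of the timing argument - the control loop blocks for no longer than the time until the earliest retry becomes due, so the cache refresh happens no later than when the old flow would have reported the change. The method and names are illustrative, not the real getTimeToBlockFor():

    import java.time.Duration;
    import java.time.Instant;
    import java.util.SortedSet;
    import java.util.TreeSet;

    class BlockTimeSketch {
        static Duration timeToBlockFor(SortedSet<Instant> retryDueTimes, Instant now, Duration defaultBlock) {
            if (retryDueTimes.isEmpty()) {
                return defaultBlock; // nothing scheduled for retry - block for the normal duration
            }
            Duration untilEarliestRetry = Duration.between(now, retryDueTimes.first());
            if (untilEarliestRetry.isNegative()) {
                return Duration.ZERO; // a retry is already due - do not block at all
            }
            // never block past the earliest retry due time
            return untilEarliestRetry.compareTo(defaultBlock) < 0 ? untilEarliestRetry : defaultBlock;
        }

        public static void main(String[] args) {
            // the 00:00 / 00:02 / 00:05 example from above: the loop blocks at most until 00:02
            Instant now = Instant.parse("2024-01-01T00:00:00Z");
            SortedSet<Instant> dueTimes = new TreeSet<>();
            dueTimes.add(now.plusSeconds(120)); // retry due at 00:02
            dueTimes.add(now.plusSeconds(300)); // retry due at 00:05
            System.out.println(timeToBlockFor(dueTimes, now, Duration.ofSeconds(300))); // prints PT2M
        }
    }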

@rkolesnev
Contributor

rkolesnev commented Feb 12, 2024

@sangreal - I tested the calculations using integration tests and they match my explanation below; I can provide the test if needed. I modified an existing test in place to have a large retry queue that is drained slowly, and observed the getNumberOfWorkQueuedInShardsAwaitingSelection() count staying at 0 during processing of the retryQueue.

The problem with updating the available work that is in the retry queue and has its delay elapsed is that expiredRetryContainerCnt is only ever increased in getWorkIfAvailable() - and that only scans up to the number of work items requested (workToGetDelta):

         while (workTaken.size() < workToGetDelta && iterator.hasNext()) {
            var workContainer = iterator.next().getValue();

            if (pm.couldBeTakenAsWork(workContainer)) {
                if (workContainer.isAvailableToTakeAsWork()) {
                    log.trace("Taking {} as work", workContainer);

                    // only increase the ExpiredRetryContainerCnt when this is retry due since already added in the new container creation
                    if (workContainer.isDelayExistsExpired()) {
                        expiredRetryContainerCnt.incrementAndGet();
                    }
                    workContainer.onQueueingForExecution();
                    workTaken.add(workContainer);
                } else {
                ...

For example, if we have 1000 items in the retry queue, all with a delay of 5 seconds - once the delay has elapsed, all 1000 are available for work - but we would only update the available work count as we take items into processing; say with 16 processing threads, we would take only 16, and the rest would still be in the retry queue but not counted as available work.
On top of that, as we take them as work, they get marked as inFlight and are excluded from the available work count straight away.
So effectively they will never show up as available work.
The implication is that BrokerPoller will keep polling for more work instead of applying back-pressure and pausing consumers - which may lead to OOM / really deep queues etc. in extreme / edge cases.

It all goes back to the fact that the retry delay is time based - the only way to really know which / how many containers in the retry queue have their retry delay elapsed and are available as work is to scan the retry queue.

It does not have to be a full scan - as the retry queue is sorted by nextRetryDueAt, it is enough to scan up to the first container whose delay has not yet elapsed to get the count of available / not-available work in the retry queue.

In general I don't think scanning the retry queue will have big performance implications - if the retry queue is small, the scan is fast, and if the retry queue is large, then we are in a bad state anyway and probably not that concerned about the overhead introduced by scanning it, as processing is already slowed down by having a lot of messages to retry.

@sangreal
Contributor Author

@rkolesnev Thanks for the detailed explanation. I get your points.
I missed the workToGetDelta in getWorkIfAvailable, which will indeed lead to some containers' state updates being missed.
Let me work on updates based on your suggestions.

@sangreal
Contributor Author

@rkolesnev I have updated the PR according to your suggestions. Meanwhile, I have kept retryItemCnt, since this count is accurate and helps accelerate the expired-items calculation, while expiredRetryContainerCnt is removed.
Please review again.

@sangreal
Contributor Author

@rkolesnev Thanks for your detailed review. I have made fixes according to your comments, except for one. Please review again.

@sangreal
Contributor Author

@rkolesnev

// 2. the container has been selected and it is inflight but we already slashed them from availableWorkContainerCnt, so should be counted in
This is not correct - we do not count inflight containers as available for selection.
The intent of the getCountOfWorkAwaitingSelection is to get a number for all work that is ready to be processed - to determine if Consumer should poll for more messages - or there is already enough queued for processing - so that excludes work that is already being processed / inflight.

Regarding this, getCountOfWorkAwaitingSelection does indeed return the number of all work that is ready to be processed, and it excludes inflight messages in the updated logic. This is reflected in the unit tests.
I think this PR's goals are:
(1) ensure getCountOfWorkAwaitingSelection returns the correct number
(2) improve performance
(3) keep the code as concise and clean as possible.
But simply copying the old logic might not be the final goal. Looking forward to hearing your ideas on this one.

@rkolesnev
Contributor

rkolesnev commented Feb 14, 2024

@rkolesnev

// 2. the container has been selected and it is inflight but we already slashed them from availableWorkContainerCnt, so should be counted in
This is not correct - we do not count inflight containers as available for selection.
The intent of the getCountOfWorkAwaitingSelection is to get a number for all work that is ready to be processed - to determine if Consumer should poll for more messages - or there is already enough queued for processing - so that excludes work that is already being processed / inflight.

Regarding this, getCountOfWorkAwaitingSelection does indeed return the number of all work that is ready to be processed, and it excludes inflight messages in the updated logic. This is reflected in the unit tests. I think this PR's goals are (1) ensure getCountOfWorkAwaitingSelection returns the correct number (2) improve performance (3) keep the code as concise and clean as possible. But simply copying the old logic might not be the final goal. Looking forward to hearing your ideas on this one.

Sure - but we have to keep the logic uniform - we cannot exclude inflight work that was never retried but include inflight work that is being retried - that would just give a weird number / behaviour that differs based on whether the messages were retried or not.
The check would be done only on WorkContainers that are in the retry queue and are ready to be retried, to determine whether they are in fact still in the queue (thus available to be taken as work) or already in flight / being processed (thus not ready to be taken as work).

Let me have another look at the code - I am wondering whether availableWorkContainerCnt is already being decremented when we take work inflight - regardless of whether it is from the retry queue or not - so maybe we are already accounting for them that way...

@rkolesnev
Contributor

Let me have another look at the code - I am wondering whether availableWorkContainerCnt is already being decremented when we take work inflight - regardless of whether it is from the retry queue or not - so maybe we are already accounting for them that way...

Ok - yeah - that is already taken care of by decrementing availableWorkContainerCnt in the ProcessingShard.getWorkIfAvailable(...) call - when we take work into processing. So you are right - we do not need to exclude them again when counting items in retry queue.

@rkolesnev
Contributor

OK - so I am happy enough with it - thank you very much for going back and forth on this PR with me.
The only outstanding bit left is to take care of the retryQueue in ShardManager when removing stale work.

@sangreal
Contributor Author

OK - so I am happy enough with it - thank you very much for going back and forth on this PR with me. The only outstanding bit left is to take care of the retryQueue in ShardManager when removing stale work.

Thanks for taking the time to review! Let me adjust my previous PR's code related to stale-container handling and get back to you.

@sangreal
Contributor Author

@rkolesnev please check the updates on stale-container removal for the retryQueue - thanks a lot for your review again.

@rkolesnev rkolesnev self-requested a review February 22, 2024 13:07
@rkolesnev
Contributor

Hi @sangreal - the PR is ready to be merged - can you please sign the Contributor License Agreement (CLA)?
You can access it through license/cla status - Details.

@sangreal
Contributor Author

sangreal commented Feb 22, 2024

Hi @sangreal - the PR is ready to be merged - can you please sign the Contributor License Agreement (CLA)? You can access it through license/cla status - Details.

@rkolesnev I find it quite weird, since I already signed it last year when I first contributed. And when I try to check, I cannot sign again because it shows I have already signed. Please let me know if this is a blocker; if it is, I will revoke this one and try to sign again.

@rkolesnev rkolesnev merged commit 20f8b27 into confluentinc:master Feb 22, 2024
1 of 3 checks passed