Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Return cached pausedPartitionSet #620

Merged
merged 4 commits into from
Aug 25, 2023
Merged

fix: Return cached pausedPartitionSet #620

merged 4 commits into from
Aug 25, 2023

Conversation

acktsap
Copy link
Contributor

@acktsap acktsap commented Aug 6, 2023

Resolves: #618

Checklist

  • Documentation (if applicable)
  • Changelog

@youribonnaffe
Copy link
Contributor

Isn't something we would like to handle in https://github.com/confluentinc/parallel-consumer/blob/master/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/ConsumerManager.java the same way metadata are cached between polls?

@jhollandus
Copy link

Isn't something we would like to handle in https://github.com/confluentinc/parallel-consumer/blob/master/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/ConsumerManager.java the same way metadata are cached between polls?

This is a good idea. It would be best to split the ToDoubleFunction away from the AtomicInteger and put the integer into the ConsumerManager poll loop. Finally reference the pauses partitions from the consumer manager in the ToDoubleFunction.

Copy link

@jhollandus jhollandus left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@what-the-diff
Copy link

what-the-diff bot commented Aug 11, 2023

PR Summary

  • Fixes added to CHANGELOG.adoc
    Two bug fixes have been implemented. They are primarily aimed at addressing the issue of the parallel consumer sometimes stopping data processing, and also facilitating the return of a cached paused partition set.

  • Update to BrokerPollSystem.java
    A new feature has been introduced in the ConsumerManager class. Specifically, a method named getPausedPartitionSize has been added. This addition should help to efficiently manage and comprehend the size of paused partitions.

  • Modifications in ConsumerManager.java
    Multiple changes have been made to the ConsumerManager class. A field named pausedPartitionSizeCache was added to boost the performance, given it provides caching capabilities. Additionally, the method updateMetadataCache was renamed to updateCache, aligning better with its functionality of routinely updating the cache. Also, a new method getPausedPartitionSize was added to handle the retrieval of paused partition sizes.

  • Change to PCModuleTestEnv.java
    To stay consistent with the main code, the method updateMetadataCache in this file has been renamed to updateCache. This reflects the broader scope of work this method undertakes beyond just updating the metadata.

@acktsap
Copy link
Contributor Author

acktsap commented Aug 11, 2023

Hi, thanks for comments. I moved cache logic into ConsumerManager.

@acktsap acktsap changed the title fix: Return previous value for 'partitions.paused' on ConcurrentModificationException fix: Return previous value for ConsumerManager#paused on ConcurrentModificationException Aug 11, 2023
@acktsap acktsap changed the title fix: Return previous value for ConsumerManager#paused on ConcurrentModificationException fix: Return cached pausedPartitionSet Aug 11, 2023
acktsap and others added 2 commits August 11, 2023 21:13
Signed-off-by: Taeik Lim <sibera21@gmail.com>
@cla-assistant
Copy link

cla-assistant bot commented Aug 11, 2023

CLA assistant check
All committers have signed the CLA.

Copy link
Member

@eddyv eddyv left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like the latest changes failed a few tests.

I offered some suggestions so that we just keep track of the size itself, which seems to address this issue.

Will need to add the following in BrokerPollSystem.initMetrics() if you choose to go that route.

-                this.consumerManager, consumerManager -> consumerManager.paused().size());
+                this.consumerManager, consumerManager -> consumerManager.getPausedPartitionSize().get());

Signed-off-by: Taeik Lim <sibera21@gmail.com>
@eddyv eddyv requested review from jhollandus and eddyv August 11, 2023 21:01
Copy link
Member

@eddyv eddyv left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm thanks 👍

Copy link

@jhollandus jhollandus left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@eddyv eddyv merged commit df2271a into confluentinc:master Aug 25, 2023
@acktsap acktsap deleted the fix/return-previous-value-for-partition branch August 28, 2023 07:53
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Metrics error with 0.5.2.6
4 participants