-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[improve][broker] Optimize PersistentTopic.getLastDispatchablePosition #22707
[improve][broker] Optimize PersistentTopic.getLastDispatchablePosition #22707
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice job! LGTM
@coderzc PTAL |
...r/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
Outdated
Show resolved
Hide resolved
...r/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
Outdated
Show resolved
Hide resolved
...r/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
Show resolved
Hide resolved
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #22707 +/- ##
============================================
- Coverage 73.57% 73.29% -0.29%
- Complexity 32624 33015 +391
============================================
Files 1877 1891 +14
Lines 139502 141969 +2467
Branches 15299 15571 +272
============================================
+ Hits 102638 104051 +1413
- Misses 28908 29893 +985
- Partials 7956 8025 +69
Flags with carried forward coverage won't be shown. Click here to find out more.
|
@@ -459,6 +464,8 @@ void removeTxnAndUpdateMaxReadPosition(TxnID txnID) { | |||
} else { | |||
updateMaxReadPosition((PositionImpl) topic.getManagedLedger().getLastConfirmedEntry(), false); | |||
} | |||
// Update the last dispatchable position to null if there is a TXN finished. | |||
updateLastDispatchablePosition(null); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, Sorry, I overlooked this. If you do not set it to null here, but set a flag that requires reading BK, you can reduce the response time of the next request. I mentioned here before, but maybe not in detail.
- end txn.
- set a flag 'readFromBK' as true
- update normal message
- set a flag 'readFromBK' as false
- when call
getLastDispatchablePosition
, if the flag == true, thelastDispatchablePosition
is valid to return. And read from BK after return. - Call
getLastDispatchablePosition
again, it can get the lastgetLastDispatchablePosition
.
How do you think about this?
#22707) [PersistentTopic#getLastDispatchablePosition](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L3776-L3788) is using by [Reader#hasMessageAvailable](https://github.com/apache/pulsar/blob/master/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Reader.java#L116) , [ConsumerImpl#hasMessageAvailable](https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L2440-L2448), [Consumer#getLastMessageIdAsync](https://github.com/apache/pulsar/blob/master/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java#L591-L615). The current implementation is read entries from Bookkeeper(or sth else), which leads to low throughput, high latency and heavy load, this PR is for the purpose of optimization. (cherry picked from commit 266243c)
apache#22707) [PersistentTopic#getLastDispatchablePosition](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L3776-L3788) is using by [Reader#hasMessageAvailable](https://github.com/apache/pulsar/blob/master/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Reader.java#L116) , [ConsumerImpl#hasMessageAvailable](https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L2440-L2448), [Consumer#getLastMessageIdAsync](https://github.com/apache/pulsar/blob/master/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java#L591-L615). The current implementation is read entries from Bookkeeper(or sth else), which leads to low throughput, high latency and heavy load, this PR is for the purpose of optimization. (cherry picked from commit 266243c) (cherry picked from commit 912ae3c)
#22707) [PersistentTopic#getLastDispatchablePosition](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L3776-L3788) is using by [Reader#hasMessageAvailable](https://github.com/apache/pulsar/blob/master/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Reader.java#L116) , [ConsumerImpl#hasMessageAvailable](https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L2440-L2448), [Consumer#getLastMessageIdAsync](https://github.com/apache/pulsar/blob/master/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java#L591-L615). The current implementation is read entries from Bookkeeper(or sth else), which leads to low throughput, high latency and heavy load, this PR is for the purpose of optimization. (cherry picked from commit 266243c)
apache#22707) [PersistentTopic#getLastDispatchablePosition](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L3776-L3788) is using by [Reader#hasMessageAvailable](https://github.com/apache/pulsar/blob/master/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Reader.java#L116) , [ConsumerImpl#hasMessageAvailable](https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L2440-L2448), [Consumer#getLastMessageIdAsync](https://github.com/apache/pulsar/blob/master/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java#L591-L615). The current implementation is read entries from Bookkeeper(or sth else), which leads to low throughput, high latency and heavy load, this PR is for the purpose of optimization. (cherry picked from commit 266243c) (cherry picked from commit 912ae3c)
apache#22707) [PersistentTopic#getLastDispatchablePosition](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L3776-L3788) is using by [Reader#hasMessageAvailable](https://github.com/apache/pulsar/blob/master/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Reader.java#L116) , [ConsumerImpl#hasMessageAvailable](https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L2440-L2448), [Consumer#getLastMessageIdAsync](https://github.com/apache/pulsar/blob/master/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java#L591-L615). The current implementation is read entries from Bookkeeper(or sth else), which leads to low throughput, high latency and heavy load, this PR is for the purpose of optimization. (cherry picked from commit 266243c) (cherry picked from commit 912ae3c)
Motivation
PersistentTopic#getLastDispatchablePosition is using by
Reader#hasMessageAvailable , ConsumerImpl#hasMessageAvailable, Consumer#getLastMessageIdAsync.
The current implementation is read entries from Bookkeeper(or sth else), which leads to low throughput, high latency and heavy load, this PR is for the purpose of optimization.
Modifications
Verifying this change
(Please pick either of the following options)
This change is a trivial rework / code cleanup without any test coverage.
(or)
This change is already covered by existing tests, such as (please describe tests).
(or)
This change added tests and can be verified as follows:
(example:)
Does this pull request potentially affect one of the following parts:
If the box was checked, please highlight the changes
Documentation
doc
doc-required
doc-not-needed
doc-complete
Matching PR in forked repository
PR in forked repository: