[broker] Optimize blocking backlogQuotaCheck to non-blocking in ServerCnx#handleProducer #12874
9fad6cc to 1adf3fc
/pulsarbot run-failure-checks
@codelipenghui @hangc0276 @congbobo184 Please help take a look.
@Jason918 Do you want to resolve the conflicts?
a2bbd56 to b918092
Resolved, sorry for missing this.
/pulsarbot run-failure-checks
This is a good change. I think we should probably discuss it on the mailing list before moving forward since it includes breaking changes to the Topic interface. @eolivelli, @codelipenghui, @hangc0276 - do you agree, or is it minor enough that we can move forward here?
topic.checkBacklogQuotaExceeded(producerName, BacklogQuotaType.message_age));
backlogQuotaCheckFuture.exceptionally(throwable -> {
//throwable should be CompletionException holding TopicBacklogQuotaExceededException
BrokerServiceException.TopicBacklogQuotaExceededException exception =
What happens if it is something else?
We would get a ClassCastException or an NPE.
This will break the system.
I believe we should handle the case in which this expectation is not met.
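One way to address the concern above is to unwrap the `CompletionException` and check the cause type before casting. The following is only a minimal sketch: `QuotaExceededException`, `unwrap`, and `describe` are hypothetical names standing in for the broker's real exception and handling code, and are not part of this PR.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class QuotaFailureHandling {
    // Hypothetical stand-in for BrokerServiceException.TopicBacklogQuotaExceededException.
    static class QuotaExceededException extends Exception {
        QuotaExceededException(String message) {
            super(message);
        }
    }

    // Strip the CompletionException wrapper if there is one; otherwise return the throwable as-is.
    static Throwable unwrap(Throwable t) {
        return (t instanceof CompletionException && t.getCause() != null) ? t.getCause() : t;
    }

    // Classify the failure with instanceof instead of an unchecked cast,
    // so an unexpected exception type cannot cause a ClassCastException.
    static String describe(Throwable t) {
        Throwable cause = unwrap(t);
        if (cause instanceof QuotaExceededException) {
            return "quota-exceeded: " + cause.getMessage();
        }
        return "unexpected: " + cause.getClass().getSimpleName();
    }

    public static void main(String[] args) {
        CompletableFuture<Void> quota =
                CompletableFuture.failedFuture(new QuotaExceededException("message_age"));
        CompletableFuture<Void> other =
                CompletableFuture.failedFuture(new IllegalStateException("boom"));
        // allOf completes exceptionally with a CompletionException wrapping the original cause.
        System.out.println(CompletableFuture.allOf(quota).handle((v, t) -> describe(t)).join());
        System.out.println(CompletableFuture.allOf(other).handle((v, t) -> describe(t)).join());
    }
}
```

The unexpected branch would presumably map to a generic error response rather than a quota-specific one.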
Two situations:
- If preciseTimeBasedBacklogQuotaCheck is set to true (the default is false) and some other constraints are met, this will be executed in the callback thread of ManagedLedgerImpl#asyncReadEntry(...).
- Otherwise everything is executed in the previous thread. Nothing is async.
CompletableFuture<Void> backlogQuotaCheckFuture = CompletableFuture.allOf(
topic.checkBacklogQuotaExceeded(producerName, BacklogQuotaType.destination_storage),
topic.checkBacklogQuotaExceeded(producerName, BacklogQuotaType.message_age));
backlogQuotaCheckFuture.exceptionally(throwable -> {
Which thread will execute this code?
I am changing the outer thenAccept to thenCompose, so the outside exceptionally will handle all unexpected exceptions.
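The difference between the two combinators can be sketched as follows. This is an illustration under assumptions, not the PR's code: `quotaCheck` is a hypothetical stand-in for the backlog quota check, and `handle` is used in place of `exceptionally` only so that the outcome can be returned as a string.

```java
import java.util.concurrent.CompletableFuture;

public class ComposeVsAccept {
    // Hypothetical stand-in for the quota check: fails when the quota is exceeded.
    static CompletableFuture<Void> quotaCheck(boolean exceeded) {
        return exceeded
                ? CompletableFuture.failedFuture(new IllegalStateException("quota exceeded"))
                : CompletableFuture.completedFuture(null);
    }

    // thenAccept: the inner future is created but dropped,
    // so its failure never reaches the outer chain.
    static String withThenAccept(boolean exceeded) {
        return CompletableFuture.completedFuture("topic")
                .thenAccept(topic -> quotaCheck(exceeded))
                .handle((v, t) -> t == null ? "ok" : "failed")
                .join();
    }

    // thenCompose: the inner future becomes part of the chain,
    // so its failure is visible to the outer handler.
    static String withThenCompose(boolean exceeded) {
        return CompletableFuture.completedFuture("topic")
                .thenCompose(topic -> quotaCheck(exceeded))
                .handle((v, t) -> t == null ? "ok" : "failed")
                .join();
    }

    public static void main(String[] args) {
        System.out.println("thenAccept, quota exceeded:  " + withThenAccept(true));
        System.out.println("thenCompose, quota exceeded: " + withThenCompose(true));
    }
}
```

With `thenCompose`, a single handler at the end of the chain sees failures from both the outer and the inner stage, which appears to be the intent of the change described above.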
disableTcpNoDelayIfNeeded(topicName.toString(), producerName);
backlogQuotaCheckFuture.thenRun(() -> {
Which thread will execute this code?
The current thread (in case we reach here and the futures are already completed) or some other thread on completion of one of the two futures above.
I am not sure we have control over what's happening here.
The trigger thread is the same as for backlogQuotaCheckFuture.exceptionally above: it is either the current thread or the ReadEntry callback thread.
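The threading behavior discussed here follows from how non-async `CompletableFuture` callbacks are dispatched. A small sketch, assuming a hypothetical thread named `read-entry-callback` to play the role of the ReadEntry callback thread:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

public class CallbackThreadDemo {
    // Returns the name of the thread that ran a thenRun callback.
    static String threadRunningCallback(boolean completeBeforeAttach) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        AtomicReference<String> seen = new AtomicReference<>();
        if (completeBeforeAttach) {
            // Already completed: the callback runs immediately on the attaching thread.
            future.complete(null);
            future.thenRun(() -> seen.set(Thread.currentThread().getName()));
        } else {
            // Not yet completed: the callback runs on whichever thread completes the future.
            future.thenRun(() -> seen.set(Thread.currentThread().getName()));
            Thread completer = new Thread(() -> future.complete(null), "read-entry-callback");
            completer.start();
            try {
                completer.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println("completed before attach: " + threadRunningCallback(true));
        System.out.println("completed after attach:  " + threadRunningCallback(false));
    }
}
```

If a specific executor is required, the `thenRunAsync(Runnable, Executor)` variant pins the callback to it regardless of which thread completes the future.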
@@ -2518,17 +2528,26 @@ public boolean isSizeBacklogExceeded() {
* @return determine if backlog quota enforcement needs to be done for topic based on time limit
*/
public boolean isTimeBacklogExceeded() {
try {
return checkTimeBacklogExceeded().get();
} catch (Throwable e) {
This is a code smell
This is just for keeping the same behavior of isTimeBacklogExceeded as before. This method is called somewhere else.
The main problem with this approach is that it completely hides from the caller the fact that the method is internally blocking. We have many other examples of this in the code base.
I get it. Thx.
I will remove this isTimeBacklogExceeded.
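To illustrate the point about hidden blocking, here is a rough sketch contrasting a synchronous wrapper with an API that exposes the future. All names are hypothetical (`checkExceededAsync` only plays the role of checkTimeBacklogExceeded()), and returning `false` on failure is an assumption made for the example, not the behavior of the original method.

```java
import java.util.concurrent.CompletableFuture;

public class BlockingWrapperDemo {
    // Hypothetical async check standing in for checkTimeBacklogExceeded().
    static CompletableFuture<Boolean> checkExceededAsync(long oldestAgeSeconds, long limitSeconds) {
        return CompletableFuture.supplyAsync(() -> oldestAgeSeconds > limitSeconds);
    }

    // Blocking wrapper: the boolean signature gives the caller no hint that
    // the calling thread may park inside get() until the check finishes.
    static boolean isExceededBlocking(long oldestAgeSeconds, long limitSeconds) {
        try {
            return checkExceededAsync(oldestAgeSeconds, limitSeconds).get();
        } catch (Exception e) {
            return false; // assumption for this sketch: treat a failed check as "not exceeded"
        }
    }

    // Non-blocking alternative: the caller receives a future and chains on it,
    // so the asynchronous nature of the check stays visible in the signature.
    static CompletableFuture<String> admitProducer(long oldestAgeSeconds, long limitSeconds) {
        return checkExceededAsync(oldestAgeSeconds, limitSeconds)
                .thenApply(exceeded -> exceeded ? "rejected" : "admitted");
    }

    public static void main(String[] args) {
        System.out.println(isExceededBlocking(10, 5));
        System.out.println(admitProducer(10, 5).join());
    }
}
```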
/pulsarbot run-failure-checks
da61c1b to e1196d1
@eolivelli PTAL
CompletableFuture<Void> backlogQuotaCheckFuture = CompletableFuture.allOf(
topic.checkBacklogQuotaExceeded(producerName, BacklogQuotaType.destination_storage),
topic.checkBacklogQuotaExceeded(producerName, BacklogQuotaType.message_age));
backlogQuotaCheckFuture.exceptionally(throwable -> {
thenCompose returns the backlogQuotaCheckFuture to the next stage. Is it better to handle the TopicBacklogQuotaExceededException here https://github.com/apache/pulsar/pull/12874/files#diff-1e0e8195fb5ec5a6d79acbc7d859c025a9b711f94e6ab37c94439e99b3202e84R1262 ?
@codelipenghui Moved TopicBacklogQuotaExceededException handling outside. PTAL.
@eolivelli Please help review this PR again
e1196d1 to 1b8ea00
/pulsarbot run-failure-checks
@eolivelli Please help review this PR again
LGTM
…he#13949)
### Motivation
We should only send the error response to the client when the code is able to complete the `producerFuture`. This logic is described here: https://github.com/apache/pulsar/blob/2285d02aa9957af7877b9d3d3c628a750d813ca7/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L1286-L1293
Edit: in a previous version of this motivation section, I attributed the current behavior to #12874. That PR did not introduce this behavior, though.
### Modifications
* Move the response to the client into a conditional block that only runs when this section of the code is able to complete the future.
Motivation
Currently, when the broker receives a "Producer" command, it checks the topic with "isBacklogQuotaExceeded".
PersistentTopic#isBacklogQuotaExceeded uses isTimeBacklogExceeded, which becomes a blocking operation if "isPreciseTimeBasedBacklogQuotaCheck" is set to true.
Blocking operations in Pulsar IO threads may impact broker performance, so this PR changes this blocking procedure to async mode.
Modifications
Add "CompletableFuture<Boolean> checkTimeBacklogExceeded()" in PersistentTopic for the async check procedure.
Update the corresponding method calls to async mode in ServerCnx#handleProducer.
Verifying this change
This change is already covered by existing tests, such as org.apache.pulsar.broker.admin.TopicPoliciesTest and org.apache.pulsar.broker.service.BacklogQuotaManagerTest
Does this pull request potentially affect one of the following parts:
If yes was chosen, please highlight the changes
Documentation
Check the box below and label this PR (if you have committer privilege).
Need to update docs?
no-need-doc
Code optimization only.