-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[improve][broker] PIP-192 updated metrics and cleanup broker selector #19945
Conversation
92a9f59
to
131868d
Compare
Raised a load-balance dashboard PR here. streamnative/apache-pulsar-grafana-dashboard#93 |
131868d
to
05d3e4b
Compare
Optional<String> selectedBroker = brokerSelector.select(availableBrokers, null, context); | ||
private Optional<String> selectBroker(String serviceUnit) { | ||
var namespaceBundle = getNamespaceBundle(serviceUnit); | ||
if (namespaceBundle.getNamespaceObject().equals(SYSTEM_NAMESPACE)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why need to skip bundles of the system namespace? I found the method ExtensibleLoadManagerImpl#assign
will ignore some internal topics and use the channel owner as the owner broker.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice catch. We shouldn't. Updated.
private final PulsarService pulsar; | ||
private final ServiceConfiguration config; | ||
private final Schema<ServiceUnitStateData> schema; | ||
private final ConcurrentOpenHashMap<String, CompletableFuture<String>> getOwnerRequests; | ||
private final String lookupServiceAddress; | ||
private final ConcurrentOpenHashMap<String, CompletableFuture<Void>> cleanupJobs; | ||
private final StateChangeListeners stateChangeListeners; | ||
private BrokerSelectionStrategy brokerSelector; | ||
private ExtensibleLoadManagerImpl brokerSelector; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should change the field name.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated.
@@ -329,9 +337,9 @@ protected LoadManagerContext getContext() { | |||
} | |||
|
|||
@VisibleForTesting | |||
protected BrokerSelectionStrategy getBrokerSelector() { | |||
protected ExtensibleLoadManagerImpl getBrokerSelector() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as the previous comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated.
+ " This broker is not the owner.", bundle)); | ||
counter.update(Failure, Unknown); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need update counter?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No. This can happen a lot, and we don't want to add noise in failure count.
@gaoran10 @Demogorgon314 PTAL by any chance. |
@@ -465,7 +472,7 @@ public CompletableFuture<Optional<String>> getOwnerAsync(String serviceUnit) { | |||
|
|||
ServiceUnitStateData data = tableview.get(serviceUnit); | |||
ServiceUnitState state = state(data); | |||
ownerLookUpCounters.get(state).incrementAndGet(); | |||
ownerLookUpCounters.get(state).total.incrementAndGet(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ownerLookUpCounters.get(state).total.incrementAndGet(); | |
ownerLookUpCounters.get(state).getTotal().incrementAndGet(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated.
return deferGetOwnerRequest(serviceUnit).thenApply( | ||
return deferGetOwnerRequest(serviceUnit).whenComplete((__, e) -> { | ||
if (e != null) { | ||
ownerLookUpCounters.get(state).failure.incrementAndGet(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ownerLookUpCounters.get(state).failure.incrementAndGet(); | |
ownerLookUpCounters.get(state).getFailure().incrementAndGet(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated.
broker -> broker == null ? Optional.empty() : Optional.of(broker)); | ||
} | ||
case Init, Free -> { | ||
return CompletableFuture.completedFuture(Optional.empty()); | ||
} | ||
case Deleted -> { | ||
ownerLookUpCounters.get(state).failure.incrementAndGet(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ownerLookUpCounters.get(state).failure.incrementAndGet(); | |
ownerLookUpCounters.get(state).getFailure().incrementAndGet(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated.
return CompletableFuture.failedFuture(new IllegalArgumentException(serviceUnit + " is deleted.")); | ||
} | ||
default -> { | ||
ownerLookUpCounters.get(state).failure.incrementAndGet(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ownerLookUpCounters.get(state).failure.incrementAndGet(); | |
ownerLookUpCounters.get(state).getFailure().incrementAndGet(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated.
@gaoran10 @Technoboy- PTAL by any chance. |
/pulsarbot rerun-failure-checks |
Master Issue: #16691
Motivation
Raising a PR to implement: #16691
Modifications
This PR
Override
EventType metricsVerifying this change
This change added tests and can be verified as follows:
Does this pull request potentially affect one of the following parts:
If the box was checked, please highlight the changes
Documentation
doc
doc-required
doc-not-needed
doc-complete
We will have separate PRs to update the Doc later.
Matching PR in forked repository
PR in forked repository: heesung-sn#40