-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[improve][broker] Handle get owned namespaces admin API in ExtensibleLoadManager #20552
Changes from 3 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -757,21 +757,42 @@ public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle bundle) { | |
} | ||
|
||
public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle bundle, Optional<String> destinationBroker) { | ||
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) { | ||
return ExtensibleLoadManagerImpl.get(loadManager.get()) | ||
.unloadNamespaceBundleAsync(bundle, destinationBroker); | ||
} | ||
|
||
// unload namespace bundle | ||
return unloadNamespaceBundle(bundle, config.getNamespaceBundleUnloadingTimeoutMs(), TimeUnit.MILLISECONDS); | ||
return unloadNamespaceBundle(bundle, destinationBroker, | ||
config.getNamespaceBundleUnloadingTimeoutMs(), TimeUnit.MILLISECONDS); | ||
} | ||
|
||
public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle bundle, | ||
Optional<String> destinationBroker, | ||
long timeout, | ||
TimeUnit timeoutUnit) { | ||
return unloadNamespaceBundle(bundle, destinationBroker, timeout, timeoutUnit, true); | ||
} | ||
|
||
public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle bundle, | ||
long timeout, | ||
TimeUnit timeoutUnit) { | ||
return unloadNamespaceBundle(bundle, Optional.empty(), timeout, timeoutUnit, true); | ||
} | ||
|
||
public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle bundle, long timeout, TimeUnit timeoutUnit) { | ||
return unloadNamespaceBundle(bundle, timeout, timeoutUnit, true); | ||
public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle bundle, | ||
long timeout, | ||
TimeUnit timeoutUnit, | ||
boolean closeWithoutWaitingClientDisconnect) { | ||
return unloadNamespaceBundle(bundle, Optional.empty(), timeout, | ||
timeoutUnit, closeWithoutWaitingClientDisconnect); | ||
} | ||
|
||
public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle bundle, long timeout, | ||
public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle bundle, | ||
Optional<String> destinationBroker, | ||
long timeout, | ||
TimeUnit timeoutUnit, | ||
boolean closeWithoutWaitingClientDisconnect) { | ||
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) { | ||
return ExtensibleLoadManagerImpl.get(loadManager.get()) | ||
.unloadNamespaceBundleAsync(bundle, destinationBroker); | ||
} | ||
// unload namespace bundle | ||
OwnedBundle ob = ownershipCache.getOwnedBundle(bundle); | ||
if (ob == null) { | ||
|
@@ -790,24 +811,34 @@ public CompletableFuture<Map<String, NamespaceOwnershipStatus>> getOwnedNameSpac | |
.getIsolationDataPoliciesAsync(pulsar.getConfiguration().getClusterName()) | ||
.thenApply(nsIsolationPoliciesOpt -> nsIsolationPoliciesOpt.orElseGet(NamespaceIsolationPolicies::new)) | ||
.thenCompose(namespaceIsolationPolicies -> { | ||
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) { | ||
ExtensibleLoadManagerImpl extensibleLoadManager = | ||
ExtensibleLoadManagerImpl.get(loadManager.get()); | ||
var statusMap = extensibleLoadManager.getOwnedServiceUnits().stream() | ||
.collect(Collectors.toMap(NamespaceBundle::toString, | ||
bundle -> getNamespaceOwnershipStatus(true, | ||
namespaceIsolationPolicies.getPolicyByNamespace( | ||
bundle.getNamespaceObject())))); | ||
return CompletableFuture.completedFuture(statusMap); | ||
} | ||
Collection<CompletableFuture<OwnedBundle>> futures = | ||
ownershipCache.getOwnedBundlesAsync().values(); | ||
return FutureUtil.waitForAll(futures) | ||
.thenApply(__ -> futures.stream() | ||
.map(CompletableFuture::join) | ||
.collect(Collectors.toMap(bundle -> bundle.getNamespaceBundle().toString(), | ||
bundle -> getNamespaceOwnershipStatus(bundle, | ||
bundle -> getNamespaceOwnershipStatus(bundle.isActive(), | ||
namespaceIsolationPolicies.getPolicyByNamespace( | ||
bundle.getNamespaceBundle().getNamespaceObject())) | ||
)) | ||
); | ||
}); | ||
} | ||
|
||
private NamespaceOwnershipStatus getNamespaceOwnershipStatus(OwnedBundle nsObj, | ||
private NamespaceOwnershipStatus getNamespaceOwnershipStatus(boolean isActive, | ||
NamespaceIsolationPolicy nsIsolationPolicy) { | ||
NamespaceOwnershipStatus nsOwnedStatus = new NamespaceOwnershipStatus(BrokerAssignment.shared, false, | ||
nsObj.isActive()); | ||
isActive); | ||
if (nsIsolationPolicy == null) { | ||
// no matching policy found, this namespace must be an uncontrolled one and using shared broker | ||
return nsOwnedStatus; | ||
|
@@ -1103,6 +1134,10 @@ public OwnershipCache getOwnershipCache() { | |
} | ||
|
||
public Set<NamespaceBundle> getOwnedServiceUnits() { | ||
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Honestly, having all these references to a specific implementation (ExtensibleLoadManagerImpl) is a code smell. We should rely on object oriented programming principals. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, we must do some refactoring in the feature... We might need to do some abstract for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It appears that the new vs old LM logic variations in NamespaceService are more than what we originally estimated. Yes, we can define NamespaceServiceExtension extends NamespaceService. If the community decides to only maintain the extension logic in the future(I assume this wont happen in the near future), I think we can clean the old LMlogic in NamespaceService too. |
||
ExtensibleLoadManagerImpl extensibleLoadManager = ExtensibleLoadManagerImpl.get(loadManager.get()); | ||
return extensibleLoadManager.getOwnedServiceUnits(); | ||
} | ||
return ownershipCache.getOwnedBundles().values().stream().map(OwnedBundle::getNamespaceBundle) | ||
.collect(Collectors.toSet()); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the
var
keyword necessary here?While I don't have any objections to using the
var
keyword (I actually use it at work), it's worth considering the need for consistency in the codebase. As I haven't come across this usage in the project so far, it would be beneficial to reach a consensus on the usage ofvar
.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"As I haven't come across this usage in the project so far"
In fact, Pulsar already has been using "var".
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, is that so? Well, if that's the case, I will take back what I said 👍🏻 Thank you for letting me know.
But, may I ask in which class there is
var
usage? I also saw some in about 10 test classes, but not in production code yet. 🤔There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java#L1141
You can see the
ServiceUnitStateChannelImpl
class.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Demogorgon314 thanks🙏🏽
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree, we are not using "var" yet.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMO, It's ok to use
var
in the new features. Since we are already in the JDK17 andvar
will help avoid very long code causes us to have to break code into multiple lines to followcheckstyle
rule.For existing code, we should avoid using any new API&Feature to prevent getting a compile error when cherry-picking patches to previous branches.