Skip to content

Commit 9dce020

Browse files
Demogorgon314Technoboy-
authored andcommitted
[improve][broker] Handle get owned namespaces admin API in ExtensibleLoadManager (#20552)
1 parent 32ba747 commit 9dce020

File tree

5 files changed

+133
-25
lines changed

5 files changed

+133
-25
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java

+24-1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import static org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.Role.Leader;
2424
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Success;
2525
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Admin;
26+
import static org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.getNamespaceBundle;
2627
import com.google.common.annotations.VisibleForTesting;
2728
import java.io.IOException;
2829
import java.util.ArrayList;
@@ -37,16 +38,20 @@
3738
import java.util.concurrent.ScheduledFuture;
3839
import java.util.concurrent.TimeUnit;
3940
import java.util.concurrent.atomic.AtomicReference;
41+
import java.util.stream.Collectors;
4042
import lombok.Getter;
4143
import lombok.extern.slf4j.Slf4j;
44+
import org.apache.commons.lang.StringUtils;
4245
import org.apache.pulsar.broker.PulsarServerException;
4346
import org.apache.pulsar.broker.PulsarService;
4447
import org.apache.pulsar.broker.ServiceConfiguration;
4548
import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
4649
import org.apache.pulsar.broker.loadbalance.LeaderElectionService;
4750
import org.apache.pulsar.broker.loadbalance.LoadManager;
51+
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState;
4852
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel;
4953
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
54+
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData;
5055
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData;
5156
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
5257
import org.apache.pulsar.broker.loadbalance.extensions.data.TopBundlesLoadData;
@@ -170,7 +175,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
170175
private final SplitCounter splitCounter = new SplitCounter();
171176

172177
// record unload metrics
173-
private final AtomicReference<List<Metrics>> unloadMetrics = new AtomicReference();
178+
private final AtomicReference<List<Metrics>> unloadMetrics = new AtomicReference<>();
174179
// record split metrics
175180
private final AtomicReference<List<Metrics>> splitMetrics = new AtomicReference<>();
176181

@@ -180,6 +185,24 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
180185
.build();
181186
private final CountDownLatch initWaiter = new CountDownLatch(1);
182187

188+
/**
189+
* Get all the bundles that are owned by this broker.
190+
*/
191+
public Set<NamespaceBundle> getOwnedServiceUnits() {
192+
Set<Map.Entry<String, ServiceUnitStateData>> entrySet = serviceUnitStateChannel.getOwnershipEntrySet();
193+
String brokerId = brokerRegistry.getBrokerId();
194+
return entrySet.stream()
195+
.filter(entry -> {
196+
var stateData = entry.getValue();
197+
return stateData.state() == ServiceUnitState.Owned
198+
&& StringUtils.isNotBlank(stateData.dstBroker())
199+
&& stateData.dstBroker().equals(brokerId);
200+
}).map(entry -> {
201+
var bundle = entry.getKey();
202+
return getNamespaceBundle(pulsar, bundle);
203+
}).collect(Collectors.toSet());
204+
}
205+
183206
public enum Role {
184207
Leader,
185208
Follower

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java

+6-10
Original file line numberDiff line numberDiff line change
@@ -703,7 +703,8 @@ private void handleOwnEvent(String serviceUnit, ServiceUnitStateData data) {
703703
stateChangeListeners.notify(serviceUnit, data, null);
704704
if (isTargetBroker(data.dstBroker())) {
705705
log(null, serviceUnit, data, null);
706-
pulsar.getNamespaceService().onNamespaceBundleOwned(getNamespaceBundle(serviceUnit));
706+
pulsar.getNamespaceService()
707+
.onNamespaceBundleOwned(LoadManagerShared.getNamespaceBundle(pulsar, serviceUnit));
707708
lastOwnEventHandledAt = System.currentTimeMillis();
708709
} else if (data.force() && isTargetBroker(data.sourceBroker())) {
709710
closeServiceUnit(serviceUnit);
@@ -803,12 +804,6 @@ private boolean isTargetBroker(String broker) {
803804
return broker.equals(lookupServiceAddress);
804805
}
805806

806-
private NamespaceBundle getNamespaceBundle(String bundle) {
807-
final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundle);
808-
final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle);
809-
return pulsar.getNamespaceService().getNamespaceBundleFactory().getBundle(namespaceName, bundleRange);
810-
}
811-
812807
private CompletableFuture<String> deferGetOwnerRequest(String serviceUnit) {
813808
return getOwnerRequests
814809
.computeIfAbsent(serviceUnit, k -> {
@@ -829,7 +824,7 @@ private CompletableFuture<String> deferGetOwnerRequest(String serviceUnit) {
829824
private CompletableFuture<Integer> closeServiceUnit(String serviceUnit) {
830825
long startTime = System.nanoTime();
831826
MutableInt unloadedTopics = new MutableInt();
832-
NamespaceBundle bundle = getNamespaceBundle(serviceUnit);
827+
NamespaceBundle bundle = LoadManagerShared.getNamespaceBundle(pulsar, serviceUnit);
833828
return pulsar.getBrokerService().unloadServiceUnit(
834829
bundle,
835830
true,
@@ -860,7 +855,7 @@ private CompletableFuture<Void> splitServiceUnit(String serviceUnit, ServiceUnit
860855
long startTime = System.nanoTime();
861856
NamespaceService namespaceService = pulsar.getNamespaceService();
862857
NamespaceBundleFactory bundleFactory = namespaceService.getNamespaceBundleFactory();
863-
NamespaceBundle bundle = getNamespaceBundle(serviceUnit);
858+
NamespaceBundle bundle = LoadManagerShared.getNamespaceBundle(pulsar, serviceUnit);
864859
CompletableFuture<Void> completionFuture = new CompletableFuture<>();
865860
Map<String, Optional<String>> bundleToDestBroker = data.splitServiceUnitToDestBroker();
866861
List<Long> boundaries = null;
@@ -1275,7 +1270,8 @@ private synchronized void doCleanup(String broker) {
12751270

12761271
private Optional<String> selectBroker(String serviceUnit, String inactiveBroker) {
12771272
try {
1278-
return loadManager.selectAsync(getNamespaceBundle(serviceUnit), Set.of(inactiveBroker))
1273+
return loadManager.selectAsync(
1274+
LoadManagerShared.getNamespaceBundle(pulsar, serviceUnit), Set.of(inactiveBroker))
12791275
.get(inFlightStateWaitingTimeInMillis, MILLISECONDS);
12801276
} catch (Throwable e) {
12811277
log.error("Failed to select a broker for serviceUnit:{}", serviceUnit);

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java

+6
Original file line numberDiff line numberDiff line change
@@ -711,4 +711,10 @@ public static void refreshBrokerToFailureDomainMap(PulsarService pulsar,
711711
LOG.warn("Failed to get domain-list for cluster {}", e.getMessage());
712712
}
713713
}
714+
715+
public static NamespaceBundle getNamespaceBundle(PulsarService pulsar, String bundle) {
716+
final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundle);
717+
final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle);
718+
return pulsar.getNamespaceService().getNamespaceBundleFactory().getBundle(namespaceName, bundleRange);
719+
}
714720
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java

+46-11
Original file line numberDiff line numberDiff line change
@@ -757,21 +757,42 @@ public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle bundle) {
757757
}
758758

759759
public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle bundle, Optional<String> destinationBroker) {
760-
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
761-
return ExtensibleLoadManagerImpl.get(loadManager.get())
762-
.unloadNamespaceBundleAsync(bundle, destinationBroker);
763-
}
760+
764761
// unload namespace bundle
765-
return unloadNamespaceBundle(bundle, config.getNamespaceBundleUnloadingTimeoutMs(), TimeUnit.MILLISECONDS);
762+
return unloadNamespaceBundle(bundle, destinationBroker,
763+
config.getNamespaceBundleUnloadingTimeoutMs(), TimeUnit.MILLISECONDS);
764+
}
765+
766+
public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle bundle,
767+
Optional<String> destinationBroker,
768+
long timeout,
769+
TimeUnit timeoutUnit) {
770+
return unloadNamespaceBundle(bundle, destinationBroker, timeout, timeoutUnit, true);
771+
}
772+
773+
public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle bundle,
774+
long timeout,
775+
TimeUnit timeoutUnit) {
776+
return unloadNamespaceBundle(bundle, Optional.empty(), timeout, timeoutUnit, true);
766777
}
767778

768-
public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle bundle, long timeout, TimeUnit timeoutUnit) {
769-
return unloadNamespaceBundle(bundle, timeout, timeoutUnit, true);
779+
public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle bundle,
780+
long timeout,
781+
TimeUnit timeoutUnit,
782+
boolean closeWithoutWaitingClientDisconnect) {
783+
return unloadNamespaceBundle(bundle, Optional.empty(), timeout,
784+
timeoutUnit, closeWithoutWaitingClientDisconnect);
770785
}
771786

772-
public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle bundle, long timeout,
787+
public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle bundle,
788+
Optional<String> destinationBroker,
789+
long timeout,
773790
TimeUnit timeoutUnit,
774791
boolean closeWithoutWaitingClientDisconnect) {
792+
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
793+
return ExtensibleLoadManagerImpl.get(loadManager.get())
794+
.unloadNamespaceBundleAsync(bundle, destinationBroker);
795+
}
775796
// unload namespace bundle
776797
OwnedBundle ob = ownershipCache.getOwnedBundle(bundle);
777798
if (ob == null) {
@@ -790,24 +811,34 @@ public CompletableFuture<Map<String, NamespaceOwnershipStatus>> getOwnedNameSpac
790811
.getIsolationDataPoliciesAsync(pulsar.getConfiguration().getClusterName())
791812
.thenApply(nsIsolationPoliciesOpt -> nsIsolationPoliciesOpt.orElseGet(NamespaceIsolationPolicies::new))
792813
.thenCompose(namespaceIsolationPolicies -> {
814+
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
815+
ExtensibleLoadManagerImpl extensibleLoadManager =
816+
ExtensibleLoadManagerImpl.get(loadManager.get());
817+
var statusMap = extensibleLoadManager.getOwnedServiceUnits().stream()
818+
.collect(Collectors.toMap(NamespaceBundle::toString,
819+
bundle -> getNamespaceOwnershipStatus(true,
820+
namespaceIsolationPolicies.getPolicyByNamespace(
821+
bundle.getNamespaceObject()))));
822+
return CompletableFuture.completedFuture(statusMap);
823+
}
793824
Collection<CompletableFuture<OwnedBundle>> futures =
794825
ownershipCache.getOwnedBundlesAsync().values();
795826
return FutureUtil.waitForAll(futures)
796827
.thenApply(__ -> futures.stream()
797828
.map(CompletableFuture::join)
798829
.collect(Collectors.toMap(bundle -> bundle.getNamespaceBundle().toString(),
799-
bundle -> getNamespaceOwnershipStatus(bundle,
830+
bundle -> getNamespaceOwnershipStatus(bundle.isActive(),
800831
namespaceIsolationPolicies.getPolicyByNamespace(
801832
bundle.getNamespaceBundle().getNamespaceObject()))
802833
))
803834
);
804835
});
805836
}
806837

807-
private NamespaceOwnershipStatus getNamespaceOwnershipStatus(OwnedBundle nsObj,
838+
private NamespaceOwnershipStatus getNamespaceOwnershipStatus(boolean isActive,
808839
NamespaceIsolationPolicy nsIsolationPolicy) {
809840
NamespaceOwnershipStatus nsOwnedStatus = new NamespaceOwnershipStatus(BrokerAssignment.shared, false,
810-
nsObj.isActive());
841+
isActive);
811842
if (nsIsolationPolicy == null) {
812843
// no matching policy found, this namespace must be an uncontrolled one and using shared broker
813844
return nsOwnedStatus;
@@ -1103,6 +1134,10 @@ public OwnershipCache getOwnershipCache() {
11031134
}
11041135

11051136
public Set<NamespaceBundle> getOwnedServiceUnits() {
1137+
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
1138+
ExtensibleLoadManagerImpl extensibleLoadManager = ExtensibleLoadManagerImpl.get(loadManager.get());
1139+
return extensibleLoadManager.getOwnedServiceUnits();
1140+
}
11061141
return ownershipCache.getOwnedBundles().values().stream().map(OwnedBundle::getNamespaceBundle)
11071142
.collect(Collectors.toSet());
11081143
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java

+51-3
Original file line numberDiff line numberDiff line change
@@ -96,8 +96,10 @@
9696
import org.apache.pulsar.common.naming.NamespaceName;
9797
import org.apache.pulsar.common.naming.ServiceUnitId;
9898
import org.apache.pulsar.common.naming.TopicName;
99+
import org.apache.pulsar.common.policies.data.BrokerAssignment;
99100
import org.apache.pulsar.common.policies.data.BundlesData;
100101
import org.apache.pulsar.common.policies.data.ClusterData;
102+
import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus;
101103
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
102104
import org.apache.pulsar.common.policies.data.TopicType;
103105
import org.apache.pulsar.common.stats.Metrics;
@@ -568,7 +570,7 @@ public void testDeployAndRollbackLoadManager() throws Exception {
568570
assertEquals(lookupResult1, lookupResult2);
569571
assertEquals(lookupResult1, lookupResult3);
570572

571-
NamespaceBundle bundle = getBundleAsync(pulsar1, TopicName.get("test")).get();
573+
NamespaceBundle bundle = getBundleAsync(pulsar1, TopicName.get(topic)).get();
572574
LookupOptions options = LookupOptions.builder()
573575
.authoritative(false)
574576
.requestHttps(false)
@@ -964,10 +966,10 @@ public void testDisableBroker() throws Exception {
964966
var pulsar3 = additionalPulsarTestContext.getPulsarService();
965967
ExtensibleLoadManagerImpl ternaryLoadManager = spy((ExtensibleLoadManagerImpl)
966968
FieldUtils.readField(pulsar3.getLoadManager().get(), "loadManager", true));
967-
String topic = "persistent://public/default/test";
969+
String topic = "persistent://" + defaultTestNamespace +"/test";
968970

969971
String lookupResult1 = pulsar3.getAdminClient().lookups().lookupTopic(topic);
970-
TopicName topicName = TopicName.get("test");
972+
TopicName topicName = TopicName.get(topic);
971973
NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get();
972974
if (!pulsar3.getBrokerServiceUrl().equals(lookupResult1)) {
973975
admin.namespaces().unloadNamespaceBundle(topicName.getNamespace(), bundle.getBundleRange(),
@@ -1035,6 +1037,52 @@ public void testListTopic() throws Exception {
10351037
admin.namespaces().deleteNamespace(namespace, true);
10361038
}
10371039

1040+
@Test(timeOut = 30 * 1000)
1041+
public void testGetOwnedServiceUnitsAndGetOwnedNamespaceStatus() throws PulsarAdminException {
1042+
Set<NamespaceBundle> ownedServiceUnitsByPulsar1 = primaryLoadManager.getOwnedServiceUnits();
1043+
log.info("Owned service units: {}", ownedServiceUnitsByPulsar1);
1044+
assertTrue(ownedServiceUnitsByPulsar1.isEmpty());
1045+
Set<NamespaceBundle> ownedServiceUnitsByPulsar2 = secondaryLoadManager.getOwnedServiceUnits();
1046+
log.info("Owned service units: {}", ownedServiceUnitsByPulsar2);
1047+
assertTrue(ownedServiceUnitsByPulsar2.isEmpty());
1048+
Map<String, NamespaceOwnershipStatus> ownedNamespacesByPulsar1 =
1049+
admin.brokers().getOwnedNamespaces(conf.getClusterName(), pulsar1.getLookupServiceAddress());
1050+
Map<String, NamespaceOwnershipStatus> ownedNamespacesByPulsar2 =
1051+
admin.brokers().getOwnedNamespaces(conf.getClusterName(), pulsar2.getLookupServiceAddress());
1052+
assertTrue(ownedNamespacesByPulsar1.isEmpty());
1053+
assertTrue(ownedNamespacesByPulsar2.isEmpty());
1054+
1055+
String topic = "persistent://" + defaultTestNamespace + "/test-get-owned-service-units";
1056+
admin.topics().createPartitionedTopic(topic, 1);
1057+
NamespaceBundle bundle = getBundleAsync(pulsar1, TopicName.get(topic)).join();
1058+
CompletableFuture<Optional<BrokerLookupData>> owner = primaryLoadManager.assign(Optional.empty(), bundle);
1059+
assertFalse(owner.join().isEmpty());
1060+
1061+
BrokerLookupData brokerLookupData = owner.join().get();
1062+
if (brokerLookupData.getWebServiceUrl().equals(pulsar1.getWebServiceAddress())) {
1063+
assertOwnedServiceUnits(pulsar1, primaryLoadManager, bundle);
1064+
} else {
1065+
assertOwnedServiceUnits(pulsar2, secondaryLoadManager, bundle);
1066+
}
1067+
}
1068+
1069+
private void assertOwnedServiceUnits(
1070+
PulsarService pulsar,
1071+
ExtensibleLoadManagerImpl extensibleLoadManager,
1072+
NamespaceBundle bundle) throws PulsarAdminException {
1073+
Awaitility.await().untilAsserted(() -> {
1074+
Set<NamespaceBundle> ownedBundles = extensibleLoadManager.getOwnedServiceUnits();
1075+
assertTrue(ownedBundles.contains(bundle));
1076+
});
1077+
Map<String, NamespaceOwnershipStatus> ownedNamespaces =
1078+
admin.brokers().getOwnedNamespaces(conf.getClusterName(), pulsar.getLookupServiceAddress());
1079+
assertTrue(ownedNamespaces.containsKey(bundle.toString()));
1080+
NamespaceOwnershipStatus status = ownedNamespaces.get(bundle.toString());
1081+
assertTrue(status.is_active);
1082+
assertFalse(status.is_controlled);
1083+
assertEquals(status.broker_assignment, BrokerAssignment.shared);
1084+
}
1085+
10381086
private static abstract class MockBrokerFilter implements BrokerFilter {
10391087

10401088
@Override

0 commit comments

Comments
 (0)