Skip to content

Commit 19face6

Browse files
[fix][broker] Fix redirect loop when using ExtensibleLoadManager and list in bundle admin API (#20528)
PIP: #16691 ### Motivation When using `ExtensibleLoadManager` and list in bundle admin API, it will redirect forever because `isServiceUnitOwned` method is checking the `ownershipCache` as the ownership storage, however, when using `ExtensibleLoadManager`, it stored the ownership to table view. ### Modifications * Call `isServiceUnitOwnedAsync ` when using `isServiceUnitOwned `. * Add unit test to cover this case.
1 parent ac46e2e commit 19face6

File tree

2 files changed

+39
-17
lines changed

2 files changed

+39
-17
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java

+1-17
Original file line numberDiff line numberDiff line change
@@ -1108,19 +1108,7 @@ public Set<NamespaceBundle> getOwnedServiceUnits() {
11081108
}
11091109

11101110
public boolean isServiceUnitOwned(ServiceUnitId suName) throws Exception {
1111-
if (suName instanceof TopicName) {
1112-
return isTopicOwnedAsync((TopicName) suName).get();
1113-
}
1114-
1115-
if (suName instanceof NamespaceName) {
1116-
return isNamespaceOwned((NamespaceName) suName);
1117-
}
1118-
1119-
if (suName instanceof NamespaceBundle) {
1120-
return ownershipCache.isNamespaceBundleOwned((NamespaceBundle) suName);
1121-
}
1122-
1123-
throw new IllegalArgumentException("Invalid class of NamespaceBundle: " + suName.getClass().getName());
1111+
return isServiceUnitOwnedAsync(suName).get(config.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
11241112
}
11251113

11261114
public CompletableFuture<Boolean> isServiceUnitOwnedAsync(ServiceUnitId suName) {
@@ -1174,10 +1162,6 @@ public CompletableFuture<Boolean> isServiceUnitActiveAsync(TopicName topicName)
11741162
});
11751163
}
11761164

1177-
private boolean isNamespaceOwned(NamespaceName fqnn) throws Exception {
1178-
return ownershipCache.getOwnedBundle(getFullBundle(fqnn)) != null;
1179-
}
1180-
11811165
private CompletableFuture<Boolean> isNamespaceOwnedAsync(NamespaceName fqnn) {
11821166
// TODO: Add unit tests cover it.
11831167
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {

pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java

+38
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import java.util.Map;
5656
import java.util.Optional;
5757
import java.util.Set;
58+
import java.util.UUID;
5859
import java.util.concurrent.CompletableFuture;
5960
import java.util.concurrent.ExecutionException;
6061
import java.util.concurrent.TimeUnit;
@@ -926,6 +927,43 @@ public void testDisableBroker() throws Exception {
926927
}
927928
}
928929

930+
@Test(timeOut = 30 * 1000)
931+
public void testListTopic() throws Exception {
932+
final String namespace = "public/testListTopic";
933+
admin.namespaces().createNamespace(namespace, 3);
934+
935+
final String persistentTopicName = TopicName.get(
936+
"persistent", NamespaceName.get(namespace),
937+
"get_topics_mode_" + UUID.randomUUID()).toString();
938+
939+
final String nonPersistentTopicName = TopicName.get(
940+
"non-persistent", NamespaceName.get(namespace),
941+
"get_topics_mode_" + UUID.randomUUID()).toString();
942+
admin.topics().createPartitionedTopic(persistentTopicName, 3);
943+
admin.topics().createPartitionedTopic(nonPersistentTopicName, 3);
944+
pulsarClient.newProducer().topic(persistentTopicName).create().close();
945+
pulsarClient.newProducer().topic(nonPersistentTopicName).create().close();
946+
947+
BundlesData bundlesData = admin.namespaces().getBundles(namespace);
948+
List<String> boundaries = bundlesData.getBoundaries();
949+
int topicNum = 0;
950+
for (int i = 0; i < boundaries.size() - 1; i++) {
951+
String bundle = String.format("%s_%s", boundaries.get(i), boundaries.get(i + 1));
952+
List<String> topic = admin.topics().getListInBundle(namespace, bundle);
953+
if (topic == null) {
954+
continue;
955+
}
956+
topicNum += topic.size();
957+
for (String s : topic) {
958+
assertFalse(TopicName.get(s).isPersistent());
959+
}
960+
}
961+
assertEquals(topicNum, 3);
962+
963+
List<String> list = admin.topics().getList(namespace);
964+
assertEquals(list.size(), 6);
965+
}
966+
929967
private static abstract class MockBrokerFilter implements BrokerFilter {
930968

931969
@Override

0 commit comments

Comments
 (0)