Skip to content

Commit

Permalink
Pulsar Admin List Subscription lists only subscriptions created for P…
Browse files Browse the repository at this point in the history
…artition-0 when partition specific subscriptions are created (#11339).
  • Loading branch information
Technoboy- committed Jul 18, 2021
1 parent eb4d8aa commit 071d2ac
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1030,34 +1030,46 @@ protected void internalGetSubscriptions(AsyncResponse asyncResponse, boolean aut
false).thenAccept(partitionMetadata -> {
if (partitionMetadata.partitions > 0) {
try {
// get the subscriptions only from the 1st partition
// since all the other partitions will have the same
// subscriptions
pulsar().getAdminClient().topics().getSubscriptionsAsync(topicName.getPartition(0).toString())
.whenComplete((r, ex) -> {
if (ex != null) {
log.warn("[{}] Failed to get list of subscriptions for {}: {}", clientAppId(),
topicName, ex.getMessage());

if (ex instanceof PulsarAdminException) {
PulsarAdminException pae = (PulsarAdminException) ex;
if (pae.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
asyncResponse.resume(new RestException(Status.NOT_FOUND,
"Internal topics have not been generated yet"));
return;
} else {
asyncResponse.resume(new RestException(pae));
return;
}
} else {
asyncResponse.resume(new RestException(ex));
return;
}
final Set<String> subscriptions = Sets.newConcurrentHashSet();
final List<CompletableFuture<Object>> subscriptionFutures = Lists.newArrayList();
String path = String.format("/managed-ledgers/%s/%s", namespaceName.toString(), domain());
List<String> children = getLocalPolicies().getChildren(path);
List<String> activeTopics = Lists.newArrayList();
for (String topic : children) {
if (topic.contains(topicName.getLocalName())) {
activeTopics.add(topic);
}
}
if (log.isDebugEnabled()) {
log.debug("activeTopics : {}", activeTopics);
}
for (String topic : activeTopics) {
CompletableFuture<List<String>> subscriptionsAsync = pulsar().getAdminClient().topics()
.getSubscriptionsAsync(TopicName.get(domain(), namespaceName, topic).toString());
subscriptionFutures.add(subscriptionsAsync.thenApply(r -> subscriptions.addAll(r)));
}
FutureUtil.waitForAll(subscriptionFutures).whenComplete((r, ex) -> {
if (ex != null) {
log.warn("[{}] Failed to get list of subscriptions for {}: {}", clientAppId(),
topicName, ex.getMessage());
if (ex instanceof PulsarAdminException) {
PulsarAdminException pae = (PulsarAdminException) ex;
if (pae.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
asyncResponse.resume(new RestException(Status.NOT_FOUND,
"Internal topics have not been generated yet"));
return;
} else {
asyncResponse.resume(new RestException(pae));
return;
}
final List<String> subscriptions = Lists.newArrayList();
subscriptions.addAll(r);
asyncResponse.resume(subscriptions);
});
} else {
asyncResponse.resume(new RestException(ex));
return;
}
} else {
asyncResponse.resume(new ArrayList<>(subscriptions));
}
});
} catch (Exception e) {
log.error("[{}] Failed to get list of subscriptions for {}", clientAppId(), topicName, e);
asyncResponse.resume(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,27 @@ public void testGetSubscriptions() {
true);
verify(response, timeout(5000).times(1)).resume(Lists.newArrayList());

// 8) Delete the partitioned topic
// 8) Create a sub of partitioned-topic
response = mock(AsyncResponse.class);
persistentTopics.createSubscription(response, testTenant, testNamespace, testLocalTopicName + "-partition-1", "test", true,
(MessageIdImpl) MessageId.earliest, false);
responseCaptor = ArgumentCaptor.forClass(Response.class);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
//
response = mock(AsyncResponse.class);
persistentTopics.getSubscriptions(response, testTenant, testNamespace, testLocalTopicName + "-partition-1", true);
verify(response, timeout(5000).times(1)).resume(Lists.newArrayList("test"));
//
response = mock(AsyncResponse.class);
persistentTopics.getSubscriptions(response, testTenant, testNamespace, testLocalTopicName + "-partition-0", true);
verify(response, timeout(5000).times(1)).resume(Lists.newArrayList());
//
response = mock(AsyncResponse.class);
persistentTopics.getSubscriptions(response, testTenant, testNamespace, testLocalTopicName, true);
verify(response, timeout(5000).times(1)).resume(Lists.newArrayList("test"));

// 9) Delete the partitioned topic
response = mock(AsyncResponse.class);
persistentTopics.deletePartitionedTopic(response, testTenant, testNamespace, testLocalTopicName, true, true, false);
responseCaptor = ArgumentCaptor.forClass(Response.class);
Expand Down

0 comments on commit 071d2ac

Please sign in to comment.