Skip to content

Commit

Permalink
[Improve][Broker]Reduce GetReplicatedSubscriptionStatus local REST ca…
Browse files Browse the repository at this point in the history
…ll (apache#16946)
  • Loading branch information
AnonHxy authored and tisonkun committed Sep 14, 2022
1 parent c7a0eec commit 17b620a
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5255,27 +5255,35 @@ protected void internalGetReplicatedSubscriptionStatus(AsyncResponse asyncRespon
.thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, authoritative, false))
.thenAccept(partitionMetadata -> {
if (partitionMetadata.partitions > 0) {
final List<CompletableFuture<Map<String, Boolean>>> futures =
Lists.newArrayListWithCapacity(partitionMetadata.partitions);
final Map<String, Boolean> status = Maps.newHashMap();
List<CompletableFuture<Void>> futures = new ArrayList<>(partitionMetadata.partitions);
Map<String, Boolean> status = Maps.newHashMap();

for (int i = 0; i < partitionMetadata.partitions; i++) {
TopicName partition = topicName.getPartition(i);
try {
futures.add(pulsar().getAdminClient().topics().getReplicatedSubscriptionStatusAsync(
partition.toString(), subName).whenComplete((response, throwable) -> {
if (throwable != null) {
log.error("[{}] Failed to get replicated subscriptions on {} {}",
clientAppId(), partition, subName, throwable);
asyncResponse.resume(new RestException(throwable));
futures.add(
pulsar().getNamespaceService().isServiceUnitOwnedAsync(partition)
.thenCompose(owned -> {
if (owned) {
return getReplicatedSubscriptionStatusFromLocalBroker(partition, subName);
} else {
try {
return pulsar().getAdminClient().topics()
.getReplicatedSubscriptionStatusAsync(partition.toString(), subName)
.whenComplete((__, throwable) -> {
if (throwable != null) {
log.error("[{}] Failed to get replicated subscriptions on"
+ " {} {}",
clientAppId(), partition, subName, throwable);
}
});
} catch (Exception e) {
log.warn("[{}] Failed to get replicated subscription status on {} {}",
clientAppId(), partition, subName, e);
return FutureUtil.failedFuture(e);
}
}
status.putAll(response);
}));
} catch (Exception e) {
log.warn("[{}] Failed to get replicated subscription status on {} {}",
clientAppId(), partition, subName, e);
throw new RestException(e);
}
}).thenAccept(status::putAll)
);
}

FutureUtil.waitForAll(futures).handle((result, exception) -> {
Expand Down Expand Up @@ -5313,42 +5321,41 @@ protected void internalGetReplicatedSubscriptionStatus(AsyncResponse asyncRespon
});
}

private CompletableFuture<Map<String, Boolean>> getReplicatedSubscriptionStatusFromLocalBroker(
TopicName localTopicName,
String subName) {
return getTopicReferenceAsync(localTopicName).thenCompose(topic -> {
Subscription sub = topic.getSubscription(subName);
if (sub == null) {
return FutureUtil.failedFuture(new RestException(Status.NOT_FOUND,
getSubNotFoundErrorMessage(localTopicName.toString(), subName)));
}
if (topic instanceof PersistentTopic && sub instanceof PersistentSubscription) {
return CompletableFuture.completedFuture(
Collections.singletonMap(localTopicName.toString(), sub.isReplicated()));
} else {
return FutureUtil.failedFuture(new RestException(Status.METHOD_NOT_ALLOWED,
"Cannot get replicated subscriptions on non-persistent topics"));
}
});
}

private void internalGetReplicatedSubscriptionStatusForNonPartitionedTopic(
AsyncResponse asyncResponse,
String subName,
boolean authoritative) {
// Redirect the request to the appropriate broker if this broker is not the owner of the topic
validateTopicOwnershipAsync(topicName, authoritative)
.thenCompose(__ -> getTopicReferenceAsync(topicName))
.thenAccept(topic -> {
if (topic == null) {
asyncResponse.resume(new RestException(Status.NOT_FOUND,
getTopicNotFoundErrorMessage(topicName.toString())));
return;
}

Subscription sub = topic.getSubscription(subName);
if (sub == null) {
asyncResponse.resume(new RestException(Status.NOT_FOUND,
getSubNotFoundErrorMessage(topicName.toString(), subName)));
return;
}

if (topic instanceof PersistentTopic && sub instanceof PersistentSubscription) {
Map res = Maps.newHashMap();
res.put(topicName.toString(), sub.isReplicated());
asyncResponse.resume(res);
.thenCompose(__ -> getReplicatedSubscriptionStatusFromLocalBroker(topicName, subName))
.whenComplete((res, e) -> {
if (e != null) {
Throwable cause = FutureUtil.unwrapCompletionException(e);
log.error("[{}] Failed to get replicated subscription status on {} {}", clientAppId(),
topicName, subName, cause);
resumeAsyncResponseExceptionally(asyncResponse, e);
} else {
asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
"Cannot get replicated subscriptions on non-persistent topics"));
asyncResponse.resume(res);
}
})
.exceptionally(e -> {
Throwable cause = FutureUtil.unwrapCompletionException(e);
log.error("[{}] Failed to get replicated subscription status on {} {}", clientAppId(),
topicName, subName, cause);
resumeAsyncResponseExceptionally(asyncResponse, e);
return null;
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import com.google.common.collect.Lists;
Expand Down Expand Up @@ -55,14 +56,17 @@
import org.apache.pulsar.broker.admin.v2.PersistentTopics;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.resources.NamespaceResources;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.resources.TopicResources;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Topics;
import org.apache.pulsar.client.admin.internal.TopicsImpl;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Message;
Expand Down Expand Up @@ -1617,4 +1621,36 @@ public void testUpdatePartitionedTopic()
partitionedTopicMetadata = metaCaptor.getValue();
Assert.assertEquals(partitionedTopicMetadata.partitions, 4);
}

@Test
public void testInternalGetReplicatedSubscriptionStatusFromLocal() throws Exception {
String topicName = "persistent://" + testTenant + "/" + testNamespaceLocal
+ "/testInternalGetReplicatedSubscriptionStatusFromLocal";
String subName = "sub_testInternalGetReplicatedSubscriptionStatusFromLocal";
TopicName topic = TopicName.get(topicName);
admin.topics().createPartitionedTopic(topicName, 2);
admin.topics().createSubscription(topicName, subName, MessageId.latest);

// partition-0 call from local and partition-1 call from admin.
NamespaceService namespaceService = spy(pulsar.getNamespaceService());
doReturn(CompletableFuture.completedFuture(true))
.when(namespaceService).isServiceUnitOwnedAsync(topic.getPartition(0));
doReturn(CompletableFuture.completedFuture(false))
.when(namespaceService).isServiceUnitOwnedAsync(topic.getPartition(1));

doReturn(namespaceService).when(pulsar).getNamespaceService();

PulsarAdmin adminFromPulsar = spy(pulsar.getAdminClient());
doReturn(adminFromPulsar).when(pulsar).getAdminClient();
Topics topics = spy(adminFromPulsar.topics());
doReturn(topics).when(adminFromPulsar).topics();

AsyncResponse response = mock(AsyncResponse.class);
persistentTopics.getReplicatedSubscriptionStatus(response, testTenant, testNamespaceLocal, topic.getLocalName(),
subName, false);
verify(response, timeout(5000).times(1)).resume(any());

// verify we only call getReplicatedSubscriptionStatusAsync once.
verify(topics, times(1)).getReplicatedSubscriptionStatusAsync(any(), any());
}
}

0 comments on commit 17b620a

Please sign in to comment.