Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improve][Broker]Reduce GetReplicatedSubscriptionStatus local REST call #16946

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5254,27 +5254,35 @@ protected void internalGetReplicatedSubscriptionStatus(AsyncResponse asyncRespon
.thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, authoritative, false))
.thenAccept(partitionMetadata -> {
if (partitionMetadata.partitions > 0) {
final List<CompletableFuture<Map<String, Boolean>>> futures =
Lists.newArrayListWithCapacity(partitionMetadata.partitions);
final Map<String, Boolean> status = Maps.newHashMap();
List<CompletableFuture<Void>> futures = new ArrayList<>(partitionMetadata.partitions);
Map<String, Boolean> status = Maps.newHashMap();

for (int i = 0; i < partitionMetadata.partitions; i++) {
TopicName partition = topicName.getPartition(i);
try {
futures.add(pulsar().getAdminClient().topics().getReplicatedSubscriptionStatusAsync(
partition.toString(), subName).whenComplete((response, throwable) -> {
if (throwable != null) {
log.error("[{}] Failed to get replicated subscriptions on {} {}",
clientAppId(), partition, subName, throwable);
asyncResponse.resume(new RestException(throwable));
futures.add(
pulsar().getNamespaceService().isServiceUnitOwnedAsync(partition)
.thenCompose(owned -> {
if (owned) {
return getReplicatedSubscriptionStatusFromLocalBroker(partition, subName);
} else {
try {
return pulsar().getAdminClient().topics()
.getReplicatedSubscriptionStatusAsync(partition.toString(), subName)
.whenComplete((__, throwable) -> {
if (throwable != null) {
log.error("[{}] Failed to get replicated subscriptions on"
+ " {} {}",
clientAppId(), partition, subName, throwable);
}
});
} catch (Exception e) {
log.warn("[{}] Failed to get replicated subscription status on {} {}",
clientAppId(), partition, subName, e);
return FutureUtil.failedFuture(e);
}
}
status.putAll(response);
}));
} catch (Exception e) {
log.warn("[{}] Failed to get replicated subscription status on {} {}",
clientAppId(), partition, subName, e);
throw new RestException(e);
}
}).thenAccept(status::putAll)
);
}

FutureUtil.waitForAll(futures).handle((result, exception) -> {
Expand Down Expand Up @@ -5312,42 +5320,41 @@ protected void internalGetReplicatedSubscriptionStatus(AsyncResponse asyncRespon
});
}

private CompletableFuture<Map<String, Boolean>> getReplicatedSubscriptionStatusFromLocalBroker(
TopicName localTopicName,
String subName) {
return getTopicReferenceAsync(localTopicName).thenCompose(topic -> {
Subscription sub = topic.getSubscription(subName);
if (sub == null) {
return FutureUtil.failedFuture(new RestException(Status.NOT_FOUND,
getSubNotFoundErrorMessage(localTopicName.toString(), subName)));
}
if (topic instanceof PersistentTopic && sub instanceof PersistentSubscription) {
return CompletableFuture.completedFuture(
Collections.singletonMap(localTopicName.toString(), sub.isReplicated()));
} else {
return FutureUtil.failedFuture(new RestException(Status.METHOD_NOT_ALLOWED,
"Cannot get replicated subscriptions on non-persistent topics"));
}
});
}

private void internalGetReplicatedSubscriptionStatusForNonPartitionedTopic(
AsyncResponse asyncResponse,
String subName,
boolean authoritative) {
// Redirect the request to the appropriate broker if this broker is not the owner of the topic
validateTopicOwnershipAsync(topicName, authoritative)
.thenCompose(__ -> getTopicReferenceAsync(topicName))
.thenAccept(topic -> {
if (topic == null) {
Jason918 marked this conversation as resolved.
Show resolved Hide resolved
asyncResponse.resume(new RestException(Status.NOT_FOUND,
getTopicNotFoundErrorMessage(topicName.toString())));
return;
}

Subscription sub = topic.getSubscription(subName);
if (sub == null) {
asyncResponse.resume(new RestException(Status.NOT_FOUND,
getSubNotFoundErrorMessage(topicName.toString(), subName)));
return;
}

if (topic instanceof PersistentTopic && sub instanceof PersistentSubscription) {
Map res = Maps.newHashMap();
res.put(topicName.toString(), sub.isReplicated());
asyncResponse.resume(res);
.thenCompose(__ -> getReplicatedSubscriptionStatusFromLocalBroker(topicName, subName))
.whenComplete((res, e) -> {
if (e != null) {
Throwable cause = FutureUtil.unwrapCompletionException(e);
log.error("[{}] Failed to get replicated subscription status on {} {}", clientAppId(),
topicName, subName, cause);
resumeAsyncResponseExceptionally(asyncResponse, e);
} else {
asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
"Cannot get replicated subscriptions on non-persistent topics"));
asyncResponse.resume(res);
}
})
.exceptionally(e -> {
Throwable cause = FutureUtil.unwrapCompletionException(e);
log.error("[{}] Failed to get replicated subscription status on {} {}", clientAppId(),
topicName, subName, cause);
resumeAsyncResponseExceptionally(asyncResponse, e);
return null;
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import com.google.common.collect.Lists;
Expand Down Expand Up @@ -55,14 +56,17 @@
import org.apache.pulsar.broker.admin.v2.PersistentTopics;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.resources.NamespaceResources;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.resources.TopicResources;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Topics;
import org.apache.pulsar.client.admin.internal.TopicsImpl;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Message;
Expand Down Expand Up @@ -1601,4 +1605,36 @@ public void testUpdatePartitionedTopic()
partitionedTopicMetadata = metaCaptor.getValue();
Assert.assertEquals(partitionedTopicMetadata.partitions, 4);
}

@Test
public void testInternalGetReplicatedSubscriptionStatusFromLocal() throws Exception {
String topicName = "persistent://" + testTenant + "/" + testNamespaceLocal
+ "/testInternalGetReplicatedSubscriptionStatusFromLocal";
String subName = "sub_testInternalGetReplicatedSubscriptionStatusFromLocal";
TopicName topic = TopicName.get(topicName);
admin.topics().createPartitionedTopic(topicName, 2);
admin.topics().createSubscription(topicName, subName, MessageId.latest);

// partition-0 call from local and partition-1 call from admin.
NamespaceService namespaceService = spy(pulsar.getNamespaceService());
doReturn(CompletableFuture.completedFuture(true))
.when(namespaceService).isServiceUnitOwnedAsync(topic.getPartition(0));
doReturn(CompletableFuture.completedFuture(false))
.when(namespaceService).isServiceUnitOwnedAsync(topic.getPartition(1));

doReturn(namespaceService).when(pulsar).getNamespaceService();

PulsarAdmin adminFromPulsar = spy(pulsar.getAdminClient());
doReturn(adminFromPulsar).when(pulsar).getAdminClient();
Topics topics = spy(adminFromPulsar.topics());
doReturn(topics).when(adminFromPulsar).topics();

AsyncResponse response = mock(AsyncResponse.class);
persistentTopics.getReplicatedSubscriptionStatus(response, testTenant, testNamespaceLocal, topic.getLocalName(),
subName, false);
verify(response, timeout(5000).times(1)).resume(any());

// verify we only call getReplicatedSubscriptionStatusAsync once.
verify(topics, times(1)).getReplicatedSubscriptionStatusAsync(any(), any());
}
}