diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 139f59ef7b3ac..151c499dac94c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -4585,115 +4585,119 @@ protected void internalSetReplicatedSubscriptionStatus(AsyncResponse asyncRespon return; } - // Permission to consume this topic is required - try { - validateTopicOperation(topicName, TopicOperation.SET_REPLICATED_SUBSCRIPTION_STATUS, subName); - } catch (Exception e) { - resumeAsyncResponseExceptionally(asyncResponse, e); - return; - } + // 1.Permission to consume this topic is required + // 2.Redirect the request to the peer-cluster if the local cluster is not included in the replication clusters + CompletableFuture validateFuture = + validateTopicOperationAsync(topicName, TopicOperation.SET_REPLICATED_SUBSCRIPTION_STATUS, subName) + .thenCompose(__ -> validateGlobalNamespaceOwnershipAsync(namespaceName)); - // Redirect the request to the peer-cluster if the local cluster is not included in the replication clusters - try { - validateGlobalNamespaceOwnership(namespaceName); - } catch (Exception e) { - resumeAsyncResponseExceptionally(asyncResponse, e); - return; - } + CompletableFuture resultFuture; // If the topic name is a partition name, no need to get partition topic metadata again if (topicName.isPartitioned()) { - internalSetReplicatedSubscriptionStatusForNonPartitionedTopic(asyncResponse, subName, authoritative, - enabled); + resultFuture = validateFuture.thenAccept( + __ -> internalSetReplicatedSubscriptionStatusForNonPartitionedTopic(asyncResponse, subName, + authoritative, enabled)); } else { - getPartitionedTopicMetadataAsync(topicName, authoritative, false).thenAccept(partitionMetadata -> { - if (partitionMetadata.partitions > 0) { - final List> futures = Lists.newArrayList(); + resultFuture = validateFuture. + thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, authoritative, false)) + .thenAccept(partitionMetadata -> { + if (partitionMetadata.partitions > 0) { + final List> futures = Lists.newArrayList(); - for (int i = 0; i < partitionMetadata.partitions; i++) { - TopicName topicNamePartition = topicName.getPartition(i); - try { - futures.add(pulsar().getAdminClient().topics().setReplicatedSubscriptionStatusAsync( - topicNamePartition.toString(), subName, enabled)); - } catch (Exception e) { - log.warn("[{}] Failed to change replicated subscription status to {} - {} {}", - clientAppId(), enabled, topicNamePartition, subName, e); - resumeAsyncResponseExceptionally(asyncResponse, e); - return; - } - } + for (int i = 0; i < partitionMetadata.partitions; i++) { + TopicName topicNamePartition = topicName.getPartition(i); + try { + futures.add(pulsar().getAdminClient().topics().setReplicatedSubscriptionStatusAsync( + topicNamePartition.toString(), subName, enabled)); + } catch (Exception e) { + log.warn("[{}] Failed to change replicated subscription status to {} - {} {}", + clientAppId(), enabled, topicNamePartition, subName, e); + resumeAsyncResponseExceptionally(asyncResponse, e); + return; + } + } - FutureUtil.waitForAll(futures).handle((result, exception) -> { - if (exception != null) { - Throwable t = exception.getCause(); - if (t instanceof NotFoundException) { - asyncResponse - .resume(new RestException(Status.NOT_FOUND, "Topic or subscription not found")); - return null; - } else if (t instanceof PreconditionFailedException) { - asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED, - "Cannot enable/disable replicated subscriptions on non-global topics")); - return null; - } else { - log.warn("[{}] Failed to change replicated subscription status to {} - {} {}", - clientAppId(), enabled, topicName, subName, t); - asyncResponse.resume(new RestException(t)); + FutureUtil.waitForAll(futures).handle((result, exception) -> { + if (exception != null) { + Throwable t = exception.getCause(); + if (t instanceof NotFoundException) { + asyncResponse + .resume(new RestException(Status.NOT_FOUND, + "Topic or subscription not found")); + return null; + } else if (t instanceof PreconditionFailedException) { + asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED, + "Cannot enable/disable replicated subscriptions on non-global topics")); + return null; + } else { + log.warn("[{}] Failed to change replicated subscription status to {} - {} {}", + clientAppId(), enabled, topicName, subName, t); + asyncResponse.resume(new RestException(t)); + return null; + } + } + + asyncResponse.resume(Response.noContent().build()); return null; - } + }); + } else { + internalSetReplicatedSubscriptionStatusForNonPartitionedTopic(asyncResponse, subName, + authoritative, enabled); } - - asyncResponse.resume(Response.noContent().build()); - return null; }); - } else { - internalSetReplicatedSubscriptionStatusForNonPartitionedTopic(asyncResponse, subName, authoritative, - enabled); - } - }).exceptionally(ex -> { - log.warn("[{}] Failed to change replicated subscription status to {} - {} {}", clientAppId(), enabled, - topicName, subName, ex); - resumeAsyncResponseExceptionally(asyncResponse, ex); - return null; - }); } - } - private void internalSetReplicatedSubscriptionStatusForNonPartitionedTopic(AsyncResponse asyncResponse, - String subName, boolean authoritative, boolean enabled) { - try { - // Redirect the request to the appropriate broker if this broker is not the owner of the topic - validateTopicOwnership(topicName, authoritative); + resultFuture.exceptionally(ex -> { + Throwable cause = FutureUtil.unwrapCompletionException(ex); + log.warn("[{}] Failed to change replicated subscription status to {} - {} {}", clientAppId(), enabled, + topicName, subName, cause); + resumeAsyncResponseExceptionally(asyncResponse, cause); + return null; + }); + } - Topic topic = getTopicReference(topicName); - if (topic == null) { - asyncResponse.resume(new RestException(Status.NOT_FOUND, "Topic not found")); - return; - } + private void internalSetReplicatedSubscriptionStatusForNonPartitionedTopic( + AsyncResponse asyncResponse, String subName, boolean authoritative, boolean enabled) { + // Redirect the request to the appropriate broker if this broker is not the owner of the topic + validateTopicOwnershipAsync(topicName, authoritative) + .thenCompose(__ -> getTopicReferenceAsync(topicName)) + .thenAccept(topic -> { + if (topic == null) { + asyncResponse.resume(new RestException(Status.NOT_FOUND, "Topic not found")); + return; + } - Subscription sub = topic.getSubscription(subName); - if (sub == null) { - asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found")); - return; - } + Subscription sub = topic.getSubscription(subName); + if (sub == null) { + asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found")); + return; + } - if (topic instanceof PersistentTopic && sub instanceof PersistentSubscription) { - if (!((PersistentSubscription) sub).setReplicated(enabled)) { - asyncResponse.resume( - new RestException(Status.INTERNAL_SERVER_ERROR, "Failed to update cursor properties")); - return; - } + if (topic instanceof PersistentTopic && sub instanceof PersistentSubscription) { + if (!((PersistentSubscription) sub).setReplicated(enabled)) { + asyncResponse.resume( + new RestException(Status.INTERNAL_SERVER_ERROR, + "Failed to update cursor properties")); + return; + } - ((PersistentTopic) topic).checkReplicatedSubscriptionControllerState(); - log.info("[{}] Changed replicated subscription status to {} - {} {}", clientAppId(), enabled, topicName, - subName); - asyncResponse.resume(Response.noContent().build()); - } else { - asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED, - "Cannot enable/disable replicated subscriptions on non-persistent topics")); - } - } catch (Exception e) { - resumeAsyncResponseExceptionally(asyncResponse, e); - } + ((PersistentTopic) topic).checkReplicatedSubscriptionControllerState(); + log.info("[{}] Changed replicated subscription status to {} - {} {}", clientAppId(), + enabled, topicName, subName); + asyncResponse.resume(Response.noContent().build()); + } else { + asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED, + "Cannot enable/disable replicated subscriptions on non-persistent topics")); + } + } + ).exceptionally(ex -> { + Throwable cause = FutureUtil.unwrapCompletionException(ex); + log.error("[{}] Failed to set replicated subscription status on {} {}", clientAppId(), + topicName, subName, cause); + resumeAsyncResponseExceptionally(asyncResponse, cause); + return null; + }); } protected void internalGetReplicatedSubscriptionStatus(AsyncResponse asyncResponse,