Skip to content

Commit

Permalink
[Issue 13854][broker] Fix call sync method in async rest api for inte…
Browse files Browse the repository at this point in the history
…rnalSetReplicatedSubscriptionStatus (#13887)
  • Loading branch information
suiyuzeng authored Jan 28, 2022
1 parent 0f19132 commit 7ea2448
Showing 1 changed file with 97 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4585,115 +4585,119 @@ protected void internalSetReplicatedSubscriptionStatus(AsyncResponse asyncRespon
return;
}

// Permission to consume this topic is required
try {
validateTopicOperation(topicName, TopicOperation.SET_REPLICATED_SUBSCRIPTION_STATUS, subName);
} catch (Exception e) {
resumeAsyncResponseExceptionally(asyncResponse, e);
return;
}
// 1.Permission to consume this topic is required
// 2.Redirect the request to the peer-cluster if the local cluster is not included in the replication clusters
CompletableFuture<Void> validateFuture =
validateTopicOperationAsync(topicName, TopicOperation.SET_REPLICATED_SUBSCRIPTION_STATUS, subName)
.thenCompose(__ -> validateGlobalNamespaceOwnershipAsync(namespaceName));

// Redirect the request to the peer-cluster if the local cluster is not included in the replication clusters
try {
validateGlobalNamespaceOwnership(namespaceName);
} catch (Exception e) {
resumeAsyncResponseExceptionally(asyncResponse, e);
return;
}

CompletableFuture<Void> resultFuture;
// If the topic name is a partition name, no need to get partition topic metadata again
if (topicName.isPartitioned()) {
internalSetReplicatedSubscriptionStatusForNonPartitionedTopic(asyncResponse, subName, authoritative,
enabled);
resultFuture = validateFuture.thenAccept(
__ -> internalSetReplicatedSubscriptionStatusForNonPartitionedTopic(asyncResponse, subName,
authoritative, enabled));
} else {
getPartitionedTopicMetadataAsync(topicName, authoritative, false).thenAccept(partitionMetadata -> {
if (partitionMetadata.partitions > 0) {
final List<CompletableFuture<Void>> futures = Lists.newArrayList();
resultFuture = validateFuture.
thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, authoritative, false))
.thenAccept(partitionMetadata -> {
if (partitionMetadata.partitions > 0) {
final List<CompletableFuture<Void>> futures = Lists.newArrayList();

for (int i = 0; i < partitionMetadata.partitions; i++) {
TopicName topicNamePartition = topicName.getPartition(i);
try {
futures.add(pulsar().getAdminClient().topics().setReplicatedSubscriptionStatusAsync(
topicNamePartition.toString(), subName, enabled));
} catch (Exception e) {
log.warn("[{}] Failed to change replicated subscription status to {} - {} {}",
clientAppId(), enabled, topicNamePartition, subName, e);
resumeAsyncResponseExceptionally(asyncResponse, e);
return;
}
}
for (int i = 0; i < partitionMetadata.partitions; i++) {
TopicName topicNamePartition = topicName.getPartition(i);
try {
futures.add(pulsar().getAdminClient().topics().setReplicatedSubscriptionStatusAsync(
topicNamePartition.toString(), subName, enabled));
} catch (Exception e) {
log.warn("[{}] Failed to change replicated subscription status to {} - {} {}",
clientAppId(), enabled, topicNamePartition, subName, e);
resumeAsyncResponseExceptionally(asyncResponse, e);
return;
}
}

FutureUtil.waitForAll(futures).handle((result, exception) -> {
if (exception != null) {
Throwable t = exception.getCause();
if (t instanceof NotFoundException) {
asyncResponse
.resume(new RestException(Status.NOT_FOUND, "Topic or subscription not found"));
return null;
} else if (t instanceof PreconditionFailedException) {
asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
"Cannot enable/disable replicated subscriptions on non-global topics"));
return null;
} else {
log.warn("[{}] Failed to change replicated subscription status to {} - {} {}",
clientAppId(), enabled, topicName, subName, t);
asyncResponse.resume(new RestException(t));
FutureUtil.waitForAll(futures).handle((result, exception) -> {
if (exception != null) {
Throwable t = exception.getCause();
if (t instanceof NotFoundException) {
asyncResponse
.resume(new RestException(Status.NOT_FOUND,
"Topic or subscription not found"));
return null;
} else if (t instanceof PreconditionFailedException) {
asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
"Cannot enable/disable replicated subscriptions on non-global topics"));
return null;
} else {
log.warn("[{}] Failed to change replicated subscription status to {} - {} {}",
clientAppId(), enabled, topicName, subName, t);
asyncResponse.resume(new RestException(t));
return null;
}
}

asyncResponse.resume(Response.noContent().build());
return null;
}
});
} else {
internalSetReplicatedSubscriptionStatusForNonPartitionedTopic(asyncResponse, subName,
authoritative, enabled);
}

asyncResponse.resume(Response.noContent().build());
return null;
});
} else {
internalSetReplicatedSubscriptionStatusForNonPartitionedTopic(asyncResponse, subName, authoritative,
enabled);
}
}).exceptionally(ex -> {
log.warn("[{}] Failed to change replicated subscription status to {} - {} {}", clientAppId(), enabled,
topicName, subName, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}
}

private void internalSetReplicatedSubscriptionStatusForNonPartitionedTopic(AsyncResponse asyncResponse,
String subName, boolean authoritative, boolean enabled) {
try {
// Redirect the request to the appropriate broker if this broker is not the owner of the topic
validateTopicOwnership(topicName, authoritative);
resultFuture.exceptionally(ex -> {
Throwable cause = FutureUtil.unwrapCompletionException(ex);
log.warn("[{}] Failed to change replicated subscription status to {} - {} {}", clientAppId(), enabled,
topicName, subName, cause);
resumeAsyncResponseExceptionally(asyncResponse, cause);
return null;
});
}

Topic topic = getTopicReference(topicName);
if (topic == null) {
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Topic not found"));
return;
}
private void internalSetReplicatedSubscriptionStatusForNonPartitionedTopic(
AsyncResponse asyncResponse, String subName, boolean authoritative, boolean enabled) {
// Redirect the request to the appropriate broker if this broker is not the owner of the topic
validateTopicOwnershipAsync(topicName, authoritative)
.thenCompose(__ -> getTopicReferenceAsync(topicName))
.thenAccept(topic -> {
if (topic == null) {
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Topic not found"));
return;
}

Subscription sub = topic.getSubscription(subName);
if (sub == null) {
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found"));
return;
}
Subscription sub = topic.getSubscription(subName);
if (sub == null) {
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found"));
return;
}

if (topic instanceof PersistentTopic && sub instanceof PersistentSubscription) {
if (!((PersistentSubscription) sub).setReplicated(enabled)) {
asyncResponse.resume(
new RestException(Status.INTERNAL_SERVER_ERROR, "Failed to update cursor properties"));
return;
}
if (topic instanceof PersistentTopic && sub instanceof PersistentSubscription) {
if (!((PersistentSubscription) sub).setReplicated(enabled)) {
asyncResponse.resume(
new RestException(Status.INTERNAL_SERVER_ERROR,
"Failed to update cursor properties"));
return;
}

((PersistentTopic) topic).checkReplicatedSubscriptionControllerState();
log.info("[{}] Changed replicated subscription status to {} - {} {}", clientAppId(), enabled, topicName,
subName);
asyncResponse.resume(Response.noContent().build());
} else {
asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
"Cannot enable/disable replicated subscriptions on non-persistent topics"));
}
} catch (Exception e) {
resumeAsyncResponseExceptionally(asyncResponse, e);
}
((PersistentTopic) topic).checkReplicatedSubscriptionControllerState();
log.info("[{}] Changed replicated subscription status to {} - {} {}", clientAppId(),
enabled, topicName, subName);
asyncResponse.resume(Response.noContent().build());
} else {
asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
"Cannot enable/disable replicated subscriptions on non-persistent topics"));
}
}
).exceptionally(ex -> {
Throwable cause = FutureUtil.unwrapCompletionException(ex);
log.error("[{}] Failed to set replicated subscription status on {} {}", clientAppId(),
topicName, subName, cause);
resumeAsyncResponseExceptionally(asyncResponse, cause);
return null;
});
}

protected void internalGetReplicatedSubscriptionStatus(AsyncResponse asyncResponse,
Expand Down

0 comments on commit 7ea2448

Please sign in to comment.