Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Broker] Fix call sync method in async rest api for internalDeletePartitionedTopic #13805

Merged
merged 22 commits into from
Jan 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
4168b9c
[Broker] Fix call sync method in async rest api for ``internalDeleteP…
mattisonchao Jan 18, 2022
269b6eb
Update PulsarWebResource.java
mattisonchao Jan 18, 2022
19f533e
fixes checkstyle
mattisonchao Jan 18, 2022
fbb0aaf
Use ``thenAccept`` instead of new CompletableFuture
mattisonchao Jan 19, 2022
702545e
Revert "Use ``thenAccept`` instead of new CompletableFuture"
mattisonchao Jan 19, 2022
be05da0
Change apply to accept
mattisonchao Jan 19, 2022
c6ab9fd
fixes wrong error catch.
mattisonchao Jan 19, 2022
74ccf9f
Add ``FutureUtil.getOriginalException`` to hanlde various situations …
mattisonchao Jan 20, 2022
f9dd4cd
fixes wrong throw exception.
mattisonchao Jan 20, 2022
66dec57
fixes checkstyle
mattisonchao Jan 20, 2022
59c8519
Update pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/imp…
mattisonchao Jan 21, 2022
e370c98
Update pulsar-common/src/main/java/org/apache/pulsar/common/util/Futu…
mattisonchao Jan 21, 2022
eed6d3e
Refactor ``internalDeletePartitionedTopic`` method
mattisonchao Jan 25, 2022
6917840
Sync method add timeout
mattisonchao Jan 25, 2022
beabb49
Use lambda instead of for loop.
mattisonchao Jan 25, 2022
8dbbd6c
Use unwrapCompletionException instead of unwrapException
mattisonchao Jan 25, 2022
dc8d3ac
fix checkstyle
mattisonchao Jan 25, 2022
77ce9f3
Fixes exception handle
mattisonchao Jan 25, 2022
ed37e1e
Fixes missing return and method variable name.
mattisonchao Jan 25, 2022
b5a3b17
fixes method name.
mattisonchao Jan 25, 2022
28cc73e
Fixes method name
mattisonchao Jan 25, 2022
0a8c280
Fixes checkstyle
mattisonchao Jan 25, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -448,21 +448,6 @@ public CompletableFuture<Boolean> allowNamespaceOperationAsync(NamespaceName nam
}
}

public boolean allowNamespaceOperation(NamespaceName namespaceName,
NamespaceOperation operation,
String originalRole,
String role,
AuthenticationDataSource authData) {
try {
return allowNamespaceOperationAsync(
namespaceName, operation, originalRole, role, authData).get();
} catch (InterruptedException e) {
throw new RestException(e);
} catch (ExecutionException e) {
throw new RestException(e.getCause());
}
}

/**
* Grant authorization-action permission on a namespace to the given client.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.Response;
Expand Down Expand Up @@ -560,156 +561,132 @@ protected PartitionedTopicMetadata internalGetPartitionedMetadata(boolean author

protected void internalDeletePartitionedTopic(AsyncResponse asyncResponse, boolean authoritative,
boolean force, boolean deleteSchema) {
try {
validateNamespaceOperation(topicName.getNamespaceObject(), NamespaceOperation.DELETE_TOPIC);
validateTopicOwnership(topicName, authoritative);
} catch (WebApplicationException wae) {
if (log.isDebugEnabled()) {
log.debug("[{}] Failed to delete partitioned topic {}, redirecting to other brokers.",
clientAppId(), topicName, wae);
}
resumeAsyncResponseExceptionally(asyncResponse, wae);
return;
} catch (Exception e) {
log.error("[{}] Failed to delete partitioned topic {}", clientAppId(), topicName, e);
resumeAsyncResponseExceptionally(asyncResponse, e);
return;
}
final CompletableFuture<Void> future = new CompletableFuture<>();
pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName).thenAccept(partitionMeta -> {
final int numPartitions = partitionMeta.partitions;
if (numPartitions > 0) {
final AtomicInteger count = new AtomicInteger(numPartitions);
if (deleteSchema) {
count.incrementAndGet();
pulsar().getBrokerService().deleteSchemaStorage(topicName.getPartition(0).toString())
.whenComplete((r, ex) -> {
if (ex != null) {
log.warn("Failed to delete schema storage of topic: {}", topicName);
}
if (count.decrementAndGet() == 0) {
future.complete(null);
}
});
}
// delete authentication policies of the partitioned topic
CompletableFuture<Void> deleteAuthFuture = new CompletableFuture<>();
pulsar().getPulsarResources().getNamespaceResources()
.setPoliciesAsync(topicName.getNamespaceObject(), p -> {
for (int i = 0; i < numPartitions; i++) {
p.auth_policies.getTopicAuthentication().remove(topicName.getPartition(i).toString());
validateNamespaceOperationAsync(topicName.getNamespaceObject(), NamespaceOperation.DELETE_TOPIC)
.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative))
.thenCompose(__ -> pulsar().getBrokerService()
.fetchPartitionedTopicMetadataAsync(topicName)
.thenCompose(partitionedMeta -> {
final int numPartitions = partitionedMeta.partitions;
if (numPartitions < 1){
return CompletableFuture.completedFuture(null);
}
p.auth_policies.getTopicAuthentication().remove(topicName.toString());
return p;
}).thenAccept(v -> {
log.info("Successfully delete authentication policies for partitioned topic {}", topicName);
deleteAuthFuture.complete(null);
}).exceptionally(ex -> {
if (ex.getCause() instanceof MetadataStoreException.NotFoundException) {
log.warn("Namespace policies of {} not found", topicName.getNamespaceObject());
deleteAuthFuture.complete(null);
} else {
log.error("Failed to delete authentication policies for partitioned topic {}",
topicName, ex);
deleteAuthFuture.completeExceptionally(ex);
if (deleteSchema) {
return pulsar().getBrokerService()
.deleteSchemaStorage(topicName.getPartition(0).toString())
.thenCompose(unused ->
internalRemovePartitionsAuthenticationPoliciesAsync(numPartitions))
.thenCompose(unused2 ->
internalRemovePartitionsTopicAsync(numPartitions, force));
}
return null;
});

deleteAuthFuture.whenComplete((r, ex) -> {
if (ex != null) {
future.completeExceptionally(ex);
return;
return internalRemovePartitionsAuthenticationPoliciesAsync(numPartitions)
.thenCompose(unused -> internalRemovePartitionsTopicAsync(numPartitions, force));
})
// Only tries to delete the znode for partitioned topic when all its partitions are successfully deleted
).thenCompose(__ -> namespaceResources()
.getPartitionedTopicResources().deletePartitionedTopicAsync(topicName))
.thenAccept(__ -> {
log.info("[{}] Deleted partitioned topic {}", clientAppId(), topicName);
asyncResponse.resume(Response.noContent().build());
}).exceptionally(ex -> {
Throwable realCause = FutureUtil.unwrapCompletionException(ex);
if (realCause instanceof WebApplicationException
&& ((WebApplicationException) realCause).getResponse().getStatus()
== Status.TEMPORARY_REDIRECT.getStatusCode()) {
if (log.isDebugEnabled()) {
log.debug("[{}] Failed to delete partitioned topic {}, redirecting to other brokers.",
clientAppId(), topicName, realCause);
}
} else if (realCause instanceof PreconditionFailedException) {
asyncResponse.resume(
new RestException(Status.PRECONDITION_FAILED,
"Topic has active producers/subscriptions"));
} else if (realCause instanceof WebApplicationException){
asyncResponse.resume(realCause);
} else if (realCause instanceof MetadataStoreException.NotFoundException) {
log.warn("Namespace policies of {} not found", topicName.getNamespaceObject());
asyncResponse.resume(new RestException(
new RestException(Status.NOT_FOUND, "Partitioned topic does not exist")));
} else if (realCause instanceof PulsarAdminException) {
asyncResponse.resume(new RestException((PulsarAdminException) realCause));
} else if (realCause instanceof MetadataStoreException.BadVersionException) {
asyncResponse.resume(new RestException(
new RestException(Status.CONFLICT, "Concurrent modification")));
} else {
log.warn("[{}] Fail to Delete partitioned topic {}", clientAppId(), topicName, realCause);
asyncResponse.resume(new RestException(realCause));
}
for (int i = 0; i < numPartitions; i++) {
TopicName topicNamePartition = topicName.getPartition(i);
try {
pulsar().getAdminClient().topics()
.deleteAsync(topicNamePartition.toString(), force)
.whenComplete((r1, ex1) -> {
if (ex1 != null) {
if (ex1 instanceof NotFoundException) {
// if the sub-topic is not found, the client might not have called
// create producer or it might have been deleted earlier,
//so we ignore the 404 error.
// For all other exception,
//we fail the delete partition method even if a single
// partition is failed to be deleted
if (log.isDebugEnabled()) {
log.debug("[{}] Partition not found: {}", clientAppId(),
topicNamePartition);
}
} else {
log.error("[{}] Failed to delete partition {}", clientAppId(),
topicNamePartition, ex1);
future.completeExceptionally(ex1);
return;
return null;
});
}

private CompletableFuture<Void> internalRemovePartitionsTopicAsync(int numPartitions, boolean force) {
return FutureUtil.waitForAll(IntStream.range(0, numPartitions)
.mapToObj(i -> {
TopicName topicNamePartition = topicName.getPartition(i);
try {
CompletableFuture<Void> future = new CompletableFuture<>();
pulsar().getAdminClient().topics()
.deleteAsync(topicNamePartition.toString(), force)
.whenComplete((r, ex) -> {
if (ex != null) {
Throwable realCause = FutureUtil.unwrapCompletionException(ex);
if (realCause instanceof NotFoundException){
// if the sub-topic is not found, the client might not have called
// create producer or it might have been deleted earlier,
// so we ignore the 404 error.
// For all other exception,
// we fail the delete partition method even if a single
// partition is failed to be deleted
if (log.isDebugEnabled()) {
log.debug("[{}] Partition not found: {}", clientAppId(),
topicNamePartition);
}
} else {
log.info("[{}] Deleted partition {}", clientAppId(), topicNamePartition);
}
if (count.decrementAndGet() == 0) {
future.complete(null);
} else {
log.error("[{}] Failed to delete partition {}", clientAppId(),
topicNamePartition, realCause);
future.completeExceptionally(realCause);
}
});
} catch (Exception e) {
log.error("[{}] Failed to delete partition {}", clientAppId(), topicNamePartition, e);
future.completeExceptionally(e);
}
} else {
future.complete(null);
}
});
return future;
} catch (PulsarServerException ex) {
log.error("[{}] Failed to get admin client while delete partition {}",
clientAppId(), topicNamePartition, ex);
return FutureUtil.failedFuture(ex);
}
});
} else {
future.complete(null);
}
}).exceptionally(ex -> {
future.completeExceptionally(ex);
return null;
});

future.whenComplete((r, ex) -> {
if (ex != null) {
if (ex instanceof PreconditionFailedException) {
asyncResponse.resume(
new RestException(Status.PRECONDITION_FAILED, "Topic has active producers/subscriptions"));
return;
} else if (ex instanceof PulsarAdminException) {
asyncResponse.resume(new RestException((PulsarAdminException) ex));
return;
} else if (ex instanceof WebApplicationException) {
asyncResponse.resume(ex);
return;
} else {
asyncResponse.resume(new RestException(ex));
return;
}
}
// Only tries to delete the znode for partitioned topic when all its partitions are successfully deleted
try {
namespaceResources().getPartitionedTopicResources()
.deletePartitionedTopicAsync(topicName).thenAccept(r2 -> {
log.info("[{}] Deleted partitioned topic {}", clientAppId(), topicName);
asyncResponse.resume(Response.noContent().build());
}).exceptionally(ex1 -> {
log.error("[{}] Failed to delete partitioned topic {}", clientAppId(), topicName, ex1.getCause());
if (ex1.getCause()
instanceof org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException) {
asyncResponse.resume(new RestException(
new RestException(Status.NOT_FOUND, "Partitioned topic does not exist")));
} else if (ex1
.getCause()
instanceof org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException) {
asyncResponse.resume(
new RestException(new RestException(Status.CONFLICT, "Concurrent modification")));
}).collect(Collectors.toList()));
}

private CompletableFuture<Void> internalRemovePartitionsAuthenticationPoliciesAsync(int numPartitions) {
CompletableFuture<Void> future = new CompletableFuture<>();
pulsar().getPulsarResources().getNamespaceResources()
.setPoliciesAsync(topicName.getNamespaceObject(), p -> {
IntStream.range(0, numPartitions)
.forEach(i -> p.auth_policies.getTopicAuthentication()
.remove(topicName.getPartition(i).toString()));
p.auth_policies.getTopicAuthentication().remove(topicName.toString());
return p;
})
.whenComplete((r, ex) -> {
if (ex != null){
Throwable realCause = FutureUtil.unwrapCompletionException(ex);
if (realCause instanceof MetadataStoreException.NotFoundException) {
log.warn("Namespace policies of {} not found", topicName.getNamespaceObject());
future.complete(null);
} else {
log.error("Failed to delete authentication policies for partitioned topic {}",
topicName, ex);
future.completeExceptionally(realCause);
}
} else {
asyncResponse.resume(new RestException((ex1.getCause())));
log.info("Successfully delete authentication policies for partitioned topic {}", topicName);
future.complete(null);
}
return null;
});
} catch (Exception e1) {
log.error("[{}] Failed to delete partitioned topic {}", clientAppId(), topicName, e1);
asyncResponse.resume(new RestException(e1));
}
});
return future;
}

protected void internalUnloadTopic(AsyncResponse asyncResponse, boolean authoritative) {
Expand Down
Loading