diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 91920c1db6707..0171e5b447882 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -1812,39 +1812,104 @@ protected void internalSkipMessages(String subName, int numMessages, boolean aut protected void internalExpireMessagesForAllSubscriptions(AsyncResponse asyncResponse, int expireTimeInSeconds, boolean authoritative) { + CompletableFuture future; if (topicName.isGlobal()) { - try { - validateGlobalNamespaceOwnership(namespaceName); - } catch (Exception e) { - log.error("[{}] Failed to expire messages for all subscription on topic {}", - clientAppId(), topicName, e); - resumeAsyncResponseExceptionally(asyncResponse, e); - return; - } - } - // If the topic name is a partition name, no need to get partition topic metadata again - if (topicName.isPartitioned()) { - internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(asyncResponse, - expireTimeInSeconds, authoritative); + future = validateGlobalNamespaceOwnershipAsync(namespaceName); } else { - getPartitionedTopicMetadataAsync(topicName, - authoritative, false).thenAccept(partitionMetadata -> { - if (partitionMetadata.partitions > 0) { - final List> futures = Lists.newArrayList(); + future = CompletableFuture.completedFuture(null); + } + future.thenCompose(__ -> + getPartitionedTopicMetadataAsync(topicName, authoritative, false) + .thenAccept(partitionMetadata -> { + if (topicName.isPartitioned()) { + internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(asyncResponse, + partitionMetadata, expireTimeInSeconds, authoritative); + } else { + if (partitionMetadata.partitions > 0) { + final List> futures = + Lists.newArrayListWithCapacity(partitionMetadata.partitions); - // expire messages for each partition topic - for (int i = 0; i < partitionMetadata.partitions; i++) { - TopicName topicNamePartition = topicName.getPartition(i); + // expire messages for each partition topic + for (int i = 0; i < partitionMetadata.partitions; i++) { + TopicName topicNamePartition = topicName.getPartition(i); + try { + futures.add(pulsar() + .getAdminClient() + .topics() + .expireMessagesForAllSubscriptionsAsync( + topicNamePartition.toString(), expireTimeInSeconds)); + } catch (Exception e) { + log.error("[{}] Failed to expire messages up to {} on {}", + clientAppId(), expireTimeInSeconds, + topicNamePartition, e); + asyncResponse.resume(new RestException(e)); + return; + } + } + + FutureUtil.waitForAll(futures).handle((result, exception) -> { + if (exception != null) { + Throwable t = exception.getCause(); + log.error("[{}] Failed to expire messages up to {} on {}", + clientAppId(), expireTimeInSeconds, + topicName, t); + asyncResponse.resume(new RestException(t)); + return null; + } + asyncResponse.resume(Response.noContent().build()); + return null; + }); + } else { + internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(asyncResponse, + partitionMetadata, expireTimeInSeconds, authoritative); + } + } + } + ) + ).exceptionally(ex -> { + Throwable cause = FutureUtil.unwrapCompletionException(ex); + log.error("[{}] Failed to expire messages for all subscription on topic {}", clientAppId(), topicName, + cause); + resumeAsyncResponseExceptionally(asyncResponse, cause); + return null; + }); + + } + + private void internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(AsyncResponse asyncResponse, + PartitionedTopicMetadata + partitionMetadata, + int expireTimeInSeconds, + boolean authoritative) { + // validate ownership and redirect if current broker is not owner + validateTopicOperationAsync(topicName, TopicOperation.EXPIRE_MESSAGES) + .thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative)) + .thenCompose(__ -> getTopicReferenceAsync(topicName).thenAccept(t -> { + if (t == null) { + resumeAsyncResponseExceptionally(asyncResponse, new RestException(Status.NOT_FOUND, + "Topic not found")); + return; + } + if (!(t instanceof PersistentTopic)) { + resumeAsyncResponseExceptionally(asyncResponse, new RestException(Status.METHOD_NOT_ALLOWED, + "Expire messages for all subscriptions on a non-persistent topic is not allowed")); + return; + } + PersistentTopic topic = (PersistentTopic) t; + final List> futures = + Lists.newArrayListWithCapacity((int) topic.getReplicators().size()); + List subNames = + Lists.newArrayListWithCapacity((int) topic.getReplicators().size() + + (int) topic.getSubscriptions().size()); + subNames.addAll(topic.getReplicators().keys()); + subNames.addAll(topic.getSubscriptions().keys()); + for (int i = 0; i < subNames.size(); i++) { try { - futures.add(pulsar() - .getAdminClient() - .topics() - .expireMessagesForAllSubscriptionsAsync( - topicNamePartition.toString(), expireTimeInSeconds)); + futures.add(internalExpireMessagesByTimestampForSinglePartitionAsync(partitionMetadata, + subNames.get(i), expireTimeInSeconds, authoritative)); } catch (Exception e) { - log.error("[{}] Failed to expire messages up to {} on {}", - clientAppId(), expireTimeInSeconds, - topicNamePartition, e); + log.error("[{}] Failed to expire messages for all subscription up to {} on {}", + clientAppId(), expireTimeInSeconds, topicName, e); asyncResponse.resume(new RestException(e)); return; } @@ -1852,83 +1917,23 @@ protected void internalExpireMessagesForAllSubscriptions(AsyncResponse asyncResp FutureUtil.waitForAll(futures).handle((result, exception) -> { if (exception != null) { - Throwable t = exception.getCause(); - log.error("[{}] Failed to expire messages up to {} on {}", - clientAppId(), expireTimeInSeconds, - topicName, t); - asyncResponse.resume(new RestException(t)); + Throwable throwable = FutureUtil.unwrapCompletionException(exception); + log.error("[{}] Failed to expire messages for all subscription up to {} on {}", + clientAppId(), expireTimeInSeconds, topicName, throwable); + asyncResponse.resume(new RestException(throwable)); return null; } - asyncResponse.resume(Response.noContent().build()); return null; }); - } else { - internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(asyncResponse, - expireTimeInSeconds, authoritative); - } - }).exceptionally(ex -> { - log.error("[{}] Failed to expire messages for all subscription on topic {}", - clientAppId(), topicName, ex); - resumeAsyncResponseExceptionally(asyncResponse, ex); - return null; - }); - } - } - - private void internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(AsyncResponse asyncResponse, - int expireTimeInSeconds, - boolean authoritative) { - // validate ownership and redirect if current broker is not owner - PersistentTopic topic; - try { - validateTopicOwnership(topicName, authoritative); - validateTopicOperation(topicName, TopicOperation.EXPIRE_MESSAGES); - topic = (PersistentTopic) getTopicReference(topicName); - } catch (WebApplicationException wae) { - if (log.isDebugEnabled()) { - log.debug("[{}] Failed to expire messages for all subscription on topic {}," - + " redirecting to other brokers.", - clientAppId(), topicName, wae); - } - resumeAsyncResponseExceptionally(asyncResponse, wae); - return; - } catch (Exception e) { - log.error("[{}] Failed to expire messages for all subscription on topic {}", - clientAppId(), topicName, e); - resumeAsyncResponseExceptionally(asyncResponse, e); - return; - } - final AtomicReference exception = new AtomicReference<>(); - - topic.getReplicators().forEach((subName, replicator) -> { - try { - internalExpireMessagesByTimestampForSinglePartition(subName, expireTimeInSeconds, authoritative); - } catch (Throwable t) { - exception.set(t); - } - }); - - topic.getSubscriptions().forEach((subName, subscriber) -> { - try { - internalExpireMessagesByTimestampForSinglePartition(subName, expireTimeInSeconds, authoritative); - } catch (Throwable t) { - exception.set(t); - } + }) + ).exceptionally(e -> { + Throwable throwable = FutureUtil.unwrapCompletionException(e); + log.error("[{}] Failed to expire messages for all subscription up to {} on {}", clientAppId(), + expireTimeInSeconds, topicName, throwable); + asyncResponse.resume(new RestException(throwable)); + return null; }); - - if (exception.get() != null) { - if (exception.get() instanceof WebApplicationException) { - WebApplicationException wae = (WebApplicationException) exception.get(); - asyncResponse.resume(wae); - return; - } else { - asyncResponse.resume(new RestException(exception.get())); - return; - } - } - - asyncResponse.resume(Response.noContent().build()); } protected void internalResetCursor(AsyncResponse asyncResponse, String subName, long timestamp, @@ -3367,131 +3372,137 @@ protected void internalTerminatePartitionedTopic(AsyncResponse asyncResponse, bo protected void internalExpireMessagesByTimestamp(AsyncResponse asyncResponse, String subName, int expireTimeInSeconds, boolean authoritative) { + CompletableFuture future; if (topicName.isGlobal()) { - validateGlobalNamespaceOwnership(namespaceName); - } - // If the topic name is a partition name, no need to get partition topic metadata again - if (topicName.isPartitioned()) { - try { - internalExpireMessagesByTimestampForSinglePartition(subName, expireTimeInSeconds, authoritative); - } catch (WebApplicationException wae) { - asyncResponse.resume(wae); - return; - } catch (Exception e) { - asyncResponse.resume(new RestException(e)); - return; - } - asyncResponse.resume(Response.noContent().build()); + future = validateGlobalNamespaceOwnershipAsync(namespaceName); } else { - PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false); - if (partitionMetadata.partitions > 0) { - final List> futures = Lists.newArrayList(); - - // expire messages for each partition topic - for (int i = 0; i < partitionMetadata.partitions; i++) { - TopicName topicNamePartition = topicName.getPartition(i); - try { - futures.add(pulsar() - .getAdminClient() - .topics() - .expireMessagesAsync(topicNamePartition.toString(), - subName, expireTimeInSeconds)); - } catch (Exception e) { - log.error("[{}] Failed to expire messages up to {} on {}", clientAppId(), expireTimeInSeconds, - topicNamePartition, e); - asyncResponse.resume(new RestException(e)); - return; - } - } - - FutureUtil.waitForAll(futures).handle((result, exception) -> { - if (exception != null) { - Throwable t = exception.getCause(); - if (t instanceof NotFoundException) { - asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found")); - return null; + future = CompletableFuture.completedFuture(null); + } + future.thenCompose(__ -> + // If the topic name is a partition name, no need to get partition topic metadata again + getPartitionedTopicMetadataAsync(topicName, authoritative, false) + .thenCompose(partitionMetadata -> { + if (topicName.isPartitioned()) { + return internalExpireMessagesByTimestampForSinglePartitionAsync(partitionMetadata, subName, + expireTimeInSeconds, authoritative) + .thenAccept(unused -> asyncResponse.resume(Response.noContent().build())); } else { - log.error("[{}] Failed to expire messages up to {} on {}", - clientAppId(), expireTimeInSeconds, - topicName, t); - asyncResponse.resume(new RestException(t)); - return null; - } - } + if (partitionMetadata.partitions > 0) { + return CompletableFuture.completedFuture(null).thenAccept(unused -> { + final List> futures = Lists.newArrayList(); + + // expire messages for each partition topic + for (int i = 0; i < partitionMetadata.partitions; i++) { + TopicName topicNamePartition = topicName.getPartition(i); + try { + futures.add(pulsar() + .getAdminClient() + .topics() + .expireMessagesAsync(topicNamePartition.toString(), + subName, expireTimeInSeconds)); + } catch (Exception e) { + log.error("[{}] Failed to expire messages up to {} on {}", clientAppId(), + expireTimeInSeconds, topicNamePartition, e); + asyncResponse.resume(new RestException(e)); + return; + } + } - asyncResponse.resume(Response.noContent().build()); - return null; - }); - } else { - try { - internalExpireMessagesByTimestampForSinglePartition(subName, expireTimeInSeconds, authoritative); - } catch (WebApplicationException wae) { - asyncResponse.resume(wae); - return; - } catch (Exception e) { - asyncResponse.resume(new RestException(e)); - return; - } - asyncResponse.resume(Response.noContent().build()); - } - } + FutureUtil.waitForAll(futures).handle((result, exception) -> { + if (exception != null) { + Throwable t = exception.getCause(); + if (t instanceof NotFoundException) { + asyncResponse.resume(new RestException(Status.NOT_FOUND, + "Subscription not found")); + return null; + } else { + log.error("[{}] Failed to expire messages up to {} on {}", + clientAppId(), expireTimeInSeconds, + topicName, t); + asyncResponse.resume(new RestException(t)); + return null; + } + } + asyncResponse.resume(Response.noContent().build()); + return null; + }); + }); + } else { + return internalExpireMessagesByTimestampForSinglePartitionAsync(partitionMetadata, + subName, expireTimeInSeconds, authoritative) + .thenAccept(unused -> asyncResponse.resume(Response.noContent().build())); + } + } + }) + ).exceptionally(ex -> { + Throwable cause = FutureUtil.unwrapCompletionException(ex); + log.error("[{}] Failed to expire messages up to {} on {}", clientAppId(), expireTimeInSeconds, topicName, + cause); + resumeAsyncResponseExceptionally(asyncResponse, cause); + return null; + }); } - private void internalExpireMessagesByTimestampForSinglePartition(String subName, int expireTimeInSeconds, + private CompletableFuture internalExpireMessagesByTimestampForSinglePartitionAsync( + PartitionedTopicMetadata partitionMetadata, String subName, int expireTimeInSeconds, boolean authoritative) { - if (topicName.isGlobal()) { - validateGlobalNamespaceOwnership(namespaceName); - } - // If the topic name is a partition name, no need to get partition topic metadata again - if (!topicName.isPartitioned() && getPartitionedTopicMetadata(topicName, authoritative, false).partitions > 0) { + if (!topicName.isPartitioned() && partitionMetadata.partitions > 0) { String msg = "This method should not be called for partitioned topic"; - log.error("[{}] {} {} {}", clientAppId(), msg, topicName, subName); - throw new IllegalStateException(msg); - } - - validateTopicOwnership(topicName, authoritative); - validateTopicOperation(topicName, TopicOperation.EXPIRE_MESSAGES); - - if (!(getTopicReference(topicName) instanceof PersistentTopic)) { - log.error("[{}] Not supported operation of non-persistent topic {} {}", clientAppId(), topicName, subName); - throw new RestException(Status.METHOD_NOT_ALLOWED, - "Expire messages on a non-persistent topic is not allowed"); - } + return FutureUtil.failedFuture(new IllegalStateException(msg)); + } else { + final CompletableFuture resultFuture = new CompletableFuture<>(); + validateTopicOperationAsync(topicName, TopicOperation.EXPIRE_MESSAGES) + .thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative)) + .thenCompose(__ -> getTopicReferenceAsync(topicName).thenAccept(t -> { + if (t == null) { + resultFuture.completeExceptionally(new RestException(Status.NOT_FOUND, "Topic not found")); + return; + } + if (!(t instanceof PersistentTopic)) { + resultFuture.completeExceptionally(new RestException(Status.METHOD_NOT_ALLOWED, + "Expire messages on a non-persistent topic is not allowed")); + return; + } + PersistentTopic topic = (PersistentTopic) t; - PersistentTopic topic = (PersistentTopic) getTopicReference(topicName); - try { - boolean issued; - if (subName.startsWith(topic.getReplicatorPrefix())) { - String remoteCluster = PersistentReplicator.getRemoteCluster(subName); - PersistentReplicator repl = (PersistentReplicator) topic.getPersistentReplicator(remoteCluster); - checkNotNull(repl); - issued = repl.expireMessages(expireTimeInSeconds); - } else { - PersistentSubscription sub = topic.getSubscription(subName); - checkNotNull(sub); - issued = sub.expireMessages(expireTimeInSeconds); - } - if (issued) { - log.info("[{}] Message expire started up to {} on {} {}", clientAppId(), expireTimeInSeconds, topicName, - subName); - } else { - if (log.isDebugEnabled()) { - log.debug("Expire message by timestamp not issued on topic {} for subscription {} due to ongoing " - + "message expiration not finished or subscription almost catch up. If it's performed on " - + "a partitioned topic operation might succeeded on other partitions, please check " - + "stats of individual partition.", topicName, subName); - } - throw new RestException(Status.CONFLICT, "Expire message by timestamp not issued on topic " - + topicName + " for subscription " + subName + " due to ongoing message expiration not finished or " - + " subscription almost catch up. If it's performed on a partitioned topic operation might succeeded " - + "on other partitions, please check stats of individual partition."); - } - } catch (NullPointerException npe) { - throw new RestException(Status.NOT_FOUND, "Subscription not found"); - } catch (Exception exception) { - log.error("[{}] Failed to expire messages up to {} on {} with subscription {} {}", clientAppId(), - expireTimeInSeconds, topicName, subName, exception); - throw new RestException(exception); + boolean issued; + if (subName.startsWith(topic.getReplicatorPrefix())) { + String remoteCluster = PersistentReplicator.getRemoteCluster(subName); + PersistentReplicator repl = (PersistentReplicator) topic + .getPersistentReplicator(remoteCluster); + checkNotNull(repl); + issued = repl.expireMessages(expireTimeInSeconds); + } else { + PersistentSubscription sub = topic.getSubscription(subName); + checkNotNull(sub); + issued = sub.expireMessages(expireTimeInSeconds); + } + if (issued) { + log.info("[{}] Message expire started up to {} on {} {}", clientAppId(), + expireTimeInSeconds, topicName, subName); + resultFuture.complete(__); + } else { + if (log.isDebugEnabled()) { + log.debug("Expire message by timestamp not issued on topic {} for subscription {} " + + "due to ongoing message expiration not finished or subscription almost" + + " catch up. If it's performed on a partitioned topic operation might " + + "succeeded on other partitions, please check stats of individual " + + "partition.", topicName, subName); + } + resultFuture.completeExceptionally(new RestException(Status.CONFLICT, "Expire message " + + "by timestamp not issued on topic " + topicName + " for subscription " + + subName + " due to ongoing message expiration not finished or subscription " + + "almost catch up. If it's performed on a partitioned topic operation might" + + " succeeded on other partitions, please check stats of individual partition." + )); + return; + } + }) + ).exceptionally(e -> { + resultFuture.completeExceptionally(FutureUtil.unwrapCompletionException(e)); + return null; + }); + return resultFuture; } }