diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index e33f94a3f744d..503b21b430d58 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -638,19 +638,15 @@ protected void internalDeletePartitionedTopic(AsyncResponse asyncResponse, boole zkSync(path); log.info("[{}] Deleted partitioned topic {}", clientAppId(), topicName); asyncResponse.resume(Response.noContent().build()); - return; } catch (KeeperException.NoNodeException nne) { asyncResponse.resume(new RestException(Status.NOT_FOUND, "Partitioned topic does not exist")); - return; } catch (KeeperException.BadVersionException e) { log.warn("[{}] Failed to delete partitioned topic {}: concurrent modification", clientAppId(), topicName); asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification")); - return; } catch (Exception e) { log.error("[{}] Failed to delete partitioned topic {}", clientAppId(), topicName, e); asyncResponse.resume(new RestException(e)); - return; } }); } @@ -660,57 +656,65 @@ protected void internalUnloadTopic(AsyncResponse asyncResponse, boolean authorit if (topicName.isGlobal()) { validateGlobalNamespaceOwnership(namespaceName); } - - getPartitionedTopicMetadataAsync(topicName, authoritative, false).whenComplete((meta, t) -> { - if (meta.partitions > 0) { - final List> futures = Lists.newArrayList(); - - for (int i = 0; i < meta.partitions; i++) { - TopicName topicNamePartition = topicName.getPartition(i); - try { - futures.add(pulsar().getAdminClient().topics().unloadAsync(topicNamePartition.toString())); - } catch (Exception e) { - log.error("[{}] Failed to unload topic {}", clientAppId(), topicNamePartition, e); - asyncResponse.resume(new RestException(e)); - return; + // If the topic name is a partition name, no need to get partition topic metadata again + if (topicName.isPartitioned()) { + internalUnloadNonPartitionedTopic(asyncResponse, authoritative); + } else { + getPartitionedTopicMetadataAsync(topicName, authoritative, false).whenComplete((meta, t) -> { + if (meta.partitions > 0) { + final List> futures = Lists.newArrayList(); + + for (int i = 0; i < meta.partitions; i++) { + TopicName topicNamePartition = topicName.getPartition(i); + try { + futures.add(pulsar().getAdminClient().topics().unloadAsync(topicNamePartition.toString())); + } catch (Exception e) { + log.error("[{}] Failed to unload topic {}", clientAppId(), topicNamePartition, e); + asyncResponse.resume(new RestException(e)); + return; + } } - } - FutureUtil.waitForAll(futures).handle((result, exception) -> { - if (exception != null) { - Throwable th = exception.getCause(); - if (th instanceof NotFoundException) { - asyncResponse.resume(new RestException(Status.NOT_FOUND, th.getMessage())); - } else { - log.error("[{}] Failed to unload topic {}", clientAppId(), topicName, exception); - asyncResponse.resume(new RestException(exception)); + FutureUtil.waitForAll(futures).handle((result, exception) -> { + if (exception != null) { + Throwable th = exception.getCause(); + if (th instanceof NotFoundException) { + asyncResponse.resume(new RestException(Status.NOT_FOUND, th.getMessage())); + } else { + log.error("[{}] Failed to unload topic {}", clientAppId(), topicName, exception); + asyncResponse.resume(new RestException(exception)); + } + return null; } + + asyncResponse.resume(Response.noContent().build()); return null; - } + }); + } else { + internalUnloadNonPartitionedTopic(asyncResponse, authoritative); + } + }).exceptionally(t -> { + Throwable th = t.getCause(); + asyncResponse.resume(new RestException(th)); + return null; + }); + } + } - asyncResponse.resume(Response.noContent().build()); - return null; - }); - } else { - validateAdminAccessForTenant(topicName.getTenant()); - validateTopicOwnership(topicName, authoritative); + private void internalUnloadNonPartitionedTopic(AsyncResponse asyncResponse, boolean authoritative) { + validateAdminAccessForTenant(topicName.getTenant()); + validateTopicOwnership(topicName, authoritative); - Topic topic = getTopicReference(topicName); - topic.close(false).whenComplete((r, ex) -> { - if (ex != null) { - log.error("[{}] Failed to unload topic {}, {}", clientAppId(), topicName, ex.getMessage(), ex); - asyncResponse.resume(new RestException(ex)); + Topic topic = getTopicReference(topicName); + topic.close(false).whenComplete((r, ex) -> { + if (ex != null) { + log.error("[{}] Failed to unload topic {}, {}", clientAppId(), topicName, ex.getMessage(), ex); + asyncResponse.resume(new RestException(ex)); - } else { - log.info("[{}] Successfully unloaded topic {}", clientAppId(), topicName); - asyncResponse.resume(Response.noContent().build()); - } - }); + } else { + log.info("[{}] Successfully unloaded topic {}", clientAppId(), topicName); + asyncResponse.resume(Response.noContent().build()); } - }).exceptionally(t -> { - Throwable th = t.getCause(); - asyncResponse.resume(new RestException(th)); - return null; }); } @@ -753,25 +757,26 @@ protected void internalGetSubscriptions(AsyncResponse asyncResponse, boolean aut if (topicName.isGlobal()) { validateGlobalNamespaceOwnership(namespaceName); } - - final List subscriptions = Lists.newArrayList(); - - PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false); - if (partitionMetadata.partitions > 0) { - try { - // get the subscriptions only from the 1st partition since all the other partitions will have the same - // subscriptions - pulsar().getAdminClient().topics().getSubscriptionsAsync(topicName.getPartition(0).toString()) + // If the topic name is a partition name, no need to get partition topic metadata again + if (topicName.isPartitioned()) { + internalGetSubscriptionsForNonPartitionedTopic(asyncResponse, authoritative); + } else { + PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false); + if (partitionMetadata.partitions > 0) { + try { + // get the subscriptions only from the 1st partition since all the other partitions will have the same + // subscriptions + pulsar().getAdminClient().topics().getSubscriptionsAsync(topicName.getPartition(0).toString()) .whenComplete((r, ex) -> { if (ex != null) { log.warn("[{}] Failed to get list of subscriptions for {}: {}", clientAppId(), - topicName, ex.getMessage()); + topicName, ex.getMessage()); if (ex instanceof PulsarAdminException) { PulsarAdminException pae = (PulsarAdminException) ex; if (pae.getStatusCode() == Status.NOT_FOUND.getStatusCode()) { asyncResponse.resume(new RestException(Status.NOT_FOUND, - "Internal topics have not been generated yet")); + "Internal topics have not been generated yet")); return; } else { asyncResponse.resume(new RestException(pae)); @@ -782,29 +787,34 @@ protected void internalGetSubscriptions(AsyncResponse asyncResponse, boolean aut return; } } - + final List subscriptions = Lists.newArrayList(); subscriptions.addAll(r); asyncResponse.resume(subscriptions); return; }); - } catch (Exception e) { - log.error("[{}] Failed to get list of subscriptions for {}", clientAppId(), topicName, e); - asyncResponse.resume(e); - return; + } catch (Exception e) { + log.error("[{}] Failed to get list of subscriptions for {}", clientAppId(), topicName, e); + asyncResponse.resume(e); + return; + } + } else { + internalGetSubscriptionsForNonPartitionedTopic(asyncResponse, authoritative); } - } else { - validateAdminOperationOnTopic(authoritative); - Topic topic = getTopicReference(topicName); + } + } - try { - topic.getSubscriptions().forEach((subName, sub) -> subscriptions.add(subName)); - asyncResponse.resume(subscriptions); - return; - } catch (Exception e) { - log.error("[{}] Failed to get list of subscriptions for {}", clientAppId(), topicName, e); - asyncResponse.resume(new RestException(e)); - return; - } + private void internalGetSubscriptionsForNonPartitionedTopic(AsyncResponse asyncResponse, boolean authoritative) { + validateAdminOperationOnTopic(authoritative); + Topic topic = getTopicReference(topicName); + try { + final List subscriptions = Lists.newArrayList(); + topic.getSubscriptions().forEach((subName, sub) -> subscriptions.add(subName)); + asyncResponse.resume(subscriptions); + return; + } catch (Exception e) { + log.error("[{}] Failed to get list of subscriptions for {}", clientAppId(), topicName, e); + asyncResponse.resume(new RestException(e)); + return; } } @@ -952,67 +962,72 @@ protected void internalDeleteSubscription(AsyncResponse asyncResponse, String su if (topicName.isGlobal()) { validateGlobalNamespaceOwnership(namespaceName); } - PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false); - if (partitionMetadata.partitions > 0) { - final List> futures = Lists.newArrayList(); + // If the topic name is a partition name, no need to get partition topic metadata again + if (topicName.isPartitioned()) { + internalDeleteSubscriptionForNonPartitionedTopic(asyncResponse, subName, authoritative); + } else { + PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false); + if (partitionMetadata.partitions > 0) { + final List> futures = Lists.newArrayList(); - for (int i = 0; i < partitionMetadata.partitions; i++) { - TopicName topicNamePartition = topicName.getPartition(i); - try { - futures.add(pulsar().getAdminClient().topics() + for (int i = 0; i < partitionMetadata.partitions; i++) { + TopicName topicNamePartition = topicName.getPartition(i); + try { + futures.add(pulsar().getAdminClient().topics() .deleteSubscriptionAsync(topicNamePartition.toString(), subName)); - } catch (Exception e) { - log.error("[{}] Failed to delete subscription {} {}", clientAppId(), topicNamePartition, subName, + } catch (Exception e) { + log.error("[{}] Failed to delete subscription {} {}", clientAppId(), topicNamePartition, subName, e); - asyncResponse.resume(new RestException(e)); - return; + asyncResponse.resume(new RestException(e)); + return; + } } - } - FutureUtil.waitForAll(futures).handle((result, exception) -> { - if (exception != null) { - Throwable t = exception.getCause(); - if (t instanceof NotFoundException) { - asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found")); - return null; - } else if (t instanceof PreconditionFailedException) { - asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED, + FutureUtil.waitForAll(futures).handle((result, exception) -> { + if (exception != null) { + Throwable t = exception.getCause(); + if (t instanceof NotFoundException) { + asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found")); + return null; + } else if (t instanceof PreconditionFailedException) { + asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED, "Subscription has active connected consumers")); - return null; - } else { - log.error("[{}] Failed to delete subscription {} {}", clientAppId(), topicName, subName, t); - asyncResponse.resume(new RestException(t)); - return null; + return null; + } else { + log.error("[{}] Failed to delete subscription {} {}", clientAppId(), topicName, subName, t); + asyncResponse.resume(new RestException(t)); + return null; + } } - } - asyncResponse.resume(Response.noContent().build()); - return null; - }); - } else { - validateAdminAccessForSubscriber(subName, authoritative); - Topic topic = getTopicReference(topicName); - try { - Subscription sub = topic.getSubscription(subName); - checkNotNull(sub); - sub.delete().get(); - log.info("[{}][{}] Deleted subscription {}", clientAppId(), topicName, subName); - asyncResponse.resume(Response.noContent().build()); - return; - } catch (Exception e) { - Throwable t = e.getCause(); - if (e instanceof NullPointerException) { - asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found")); - return; - } else if (t instanceof SubscriptionBusyException) { - asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED, - "Subscription has active connected consumers")); - return; - } else { - log.error("[{}] Failed to delete subscription {} {}", clientAppId(), topicName, subName, e); - asyncResponse.resume(new RestException(t)); - return; - } + asyncResponse.resume(Response.noContent().build()); + return null; + }); + } else { + internalDeleteSubscriptionForNonPartitionedTopic(asyncResponse, subName, authoritative); + } + } + } + + private void internalDeleteSubscriptionForNonPartitionedTopic(AsyncResponse asyncResponse, String subName, boolean authoritative) { + validateAdminAccessForSubscriber(subName, authoritative); + Topic topic = getTopicReference(topicName); + try { + Subscription sub = topic.getSubscription(subName); + checkNotNull(sub); + sub.delete().get(); + log.info("[{}][{}] Deleted subscription {}", clientAppId(), topicName, subName); + asyncResponse.resume(Response.noContent().build()); + } catch (Exception e) { + Throwable t = e.getCause(); + if (e instanceof NullPointerException) { + asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found")); + } else if (t instanceof SubscriptionBusyException) { + asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED, + "Subscription has active connected consumers")); + } else { + log.error("[{}] Failed to delete subscription {} {}", clientAppId(), topicName, subName, e); + asyncResponse.resume(new RestException(t)); } } } @@ -1021,67 +1036,76 @@ protected void internalSkipAllMessages(AsyncResponse asyncResponse, String subNa if (topicName.isGlobal()) { validateGlobalNamespaceOwnership(namespaceName); } - PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false); - if (partitionMetadata.partitions > 0) { - final List> futures = Lists.newArrayList(); + // If the topic name is a partition name, no need to get partition topic metadata again + if (topicName.isPartitioned()) { + internalSkipAllMessagesForNonPartitionedTopic(asyncResponse, subName, authoritative); + } else { + PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false); + if (partitionMetadata.partitions > 0) { + final List> futures = Lists.newArrayList(); - for (int i = 0; i < partitionMetadata.partitions; i++) { - TopicName topicNamePartition = topicName.getPartition(i); - try { - futures.add(pulsar().getAdminClient().topics().skipAllMessagesAsync(topicNamePartition.toString(), + for (int i = 0; i < partitionMetadata.partitions; i++) { + TopicName topicNamePartition = topicName.getPartition(i); + try { + futures.add(pulsar().getAdminClient().topics().skipAllMessagesAsync(topicNamePartition.toString(), subName)); - } catch (Exception e) { - log.error("[{}] Failed to skip all messages {} {}", clientAppId(), topicNamePartition, subName, e); - asyncResponse.resume(new RestException(e)); - return; + } catch (Exception e) { + log.error("[{}] Failed to skip all messages {} {}", clientAppId(), topicNamePartition, subName, e); + asyncResponse.resume(new RestException(e)); + return; + } } - } - FutureUtil.waitForAll(futures).handle((result, exception) -> { - if (exception != null) { - Throwable t = exception.getCause(); - if (t instanceof NotFoundException) { - asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found")); - return null; - } else { - log.error("[{}] Failed to skip all messages {} {}", clientAppId(), topicName, subName, t); - asyncResponse.resume(new RestException(t)); - return null; + FutureUtil.waitForAll(futures).handle((result, exception) -> { + if (exception != null) { + Throwable t = exception.getCause(); + if (t instanceof NotFoundException) { + asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found")); + return null; + } else { + log.error("[{}] Failed to skip all messages {} {}", clientAppId(), topicName, subName, t); + asyncResponse.resume(new RestException(t)); + return null; + } } - } - asyncResponse.resume(Response.noContent().build()); - return null; - }); - } else { - validateAdminAccessForSubscriber(subName, authoritative); - PersistentTopic topic = (PersistentTopic) getTopicReference(topicName); - BiConsumer biConsumer = (v, ex) -> { - if (ex != null) { - asyncResponse.resume(new RestException(ex)); - log.error("[{}] Failed to skip all messages {} {}", clientAppId(), topicName, subName, ex); - } else { asyncResponse.resume(Response.noContent().build()); - log.info("[{}] Cleared backlog on {} {}", clientAppId(), topicName, subName); - } - }; - try { - if (subName.startsWith(topic.getReplicatorPrefix())) { - String remoteCluster = PersistentReplicator.getRemoteCluster(subName); - PersistentReplicator repl = (PersistentReplicator) topic.getPersistentReplicator(remoteCluster); - checkNotNull(repl); - repl.clearBacklog().whenComplete(biConsumer); - } else { - PersistentSubscription sub = topic.getSubscription(subName); - checkNotNull(sub); - sub.clearBacklog().whenComplete(biConsumer); - } - } catch (Exception e) { - if (e instanceof NullPointerException) { - asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found")); - } else { - asyncResponse.resume(new RestException(e)); - } + return null; + }); + } else { + internalSkipAllMessagesForNonPartitionedTopic(asyncResponse, subName, authoritative); + } + } + } + + private void internalSkipAllMessagesForNonPartitionedTopic(AsyncResponse asyncResponse, String subName, boolean authoritative) { + validateAdminAccessForSubscriber(subName, authoritative); + PersistentTopic topic = (PersistentTopic) getTopicReference(topicName); + BiConsumer biConsumer = (v, ex) -> { + if (ex != null) { + asyncResponse.resume(new RestException(ex)); + log.error("[{}] Failed to skip all messages {} {}", clientAppId(), topicName, subName, ex); + } else { + asyncResponse.resume(Response.noContent().build()); + log.info("[{}] Cleared backlog on {} {}", clientAppId(), topicName, subName); + } + }; + try { + if (subName.startsWith(topic.getReplicatorPrefix())) { + String remoteCluster = PersistentReplicator.getRemoteCluster(subName); + PersistentReplicator repl = (PersistentReplicator) topic.getPersistentReplicator(remoteCluster); + checkNotNull(repl); + repl.clearBacklog().whenComplete(biConsumer); + } else { + PersistentSubscription sub = topic.getSubscription(subName); + checkNotNull(sub); + sub.clearBacklog().whenComplete(biConsumer); + } + } catch (Exception e) { + if (e instanceof NullPointerException) { + asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found")); + } else { + asyncResponse.resume(new RestException(e)); } } } @@ -1122,73 +1146,82 @@ protected void internalExpireMessagesForAllSubscriptions(AsyncResponse asyncResp if (topicName.isGlobal()) { validateGlobalNamespaceOwnership(namespaceName); } - PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false); - if (partitionMetadata.partitions > 0) { - final List> futures = Lists.newArrayList(); + // If the topic name is a partition name, no need to get partition topic metadata again + if (topicName.isPartitioned()) { + internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(asyncResponse, expireTimeInSeconds, authoritative); + } else { + PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false); + if (partitionMetadata.partitions > 0) { + final List> futures = Lists.newArrayList(); - // expire messages for each partition topic - for (int i = 0; i < partitionMetadata.partitions; i++) { - TopicName topicNamePartition = topicName.getPartition(i); - try { - futures.add(pulsar().getAdminClient().topics().expireMessagesForAllSubscriptionsAsync( + // expire messages for each partition topic + for (int i = 0; i < partitionMetadata.partitions; i++) { + TopicName topicNamePartition = topicName.getPartition(i); + try { + futures.add(pulsar().getAdminClient().topics().expireMessagesForAllSubscriptionsAsync( topicNamePartition.toString(), expireTimeInSeconds)); - } catch (Exception e) { - log.error("[{}] Failed to expire messages up to {} on {}", clientAppId(), expireTimeInSeconds, + } catch (Exception e) { + log.error("[{}] Failed to expire messages up to {} on {}", clientAppId(), expireTimeInSeconds, topicNamePartition, e); - asyncResponse.resume(new RestException(e)); - return; + asyncResponse.resume(new RestException(e)); + return; + } } - } - FutureUtil.waitForAll(futures).handle((result, exception) -> { - if (exception != null) { - Throwable t = exception.getCause(); - log.error("[{}] Failed to expire messages up to {} on {}", clientAppId(), expireTimeInSeconds, + FutureUtil.waitForAll(futures).handle((result, exception) -> { + if (exception != null) { + Throwable t = exception.getCause(); + log.error("[{}] Failed to expire messages up to {} on {}", clientAppId(), expireTimeInSeconds, topicName, t); - asyncResponse.resume(new RestException(t)); - return null; - } + asyncResponse.resume(new RestException(t)); + return null; + } - asyncResponse.resume(Response.noContent().build()); - return null; - }); - } else { - // validate ownership and redirect if current broker is not owner - validateAdminOperationOnTopic(authoritative); + asyncResponse.resume(Response.noContent().build()); + return null; + }); + } else { + internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(asyncResponse, expireTimeInSeconds, authoritative); + } + } + } - PersistentTopic topic = (PersistentTopic) getTopicReference(topicName); - final AtomicReference exception = new AtomicReference<>(); + private void internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(AsyncResponse asyncResponse, int expireTimeInSeconds, + boolean authoritative) { + // validate ownership and redirect if current broker is not owner + validateAdminOperationOnTopic(authoritative); - topic.getReplicators().forEach((subName, replicator) -> { - try { - internalExpireMessagesForSinglePartition(subName, expireTimeInSeconds, authoritative); - } catch (Throwable t) { - exception.set(t); - } - }); + PersistentTopic topic = (PersistentTopic) getTopicReference(topicName); + final AtomicReference exception = new AtomicReference<>(); - topic.getSubscriptions().forEach((subName, subscriber) -> { - try { - internalExpireMessagesForSinglePartition(subName, expireTimeInSeconds, authoritative); - } catch (Throwable t) { - exception.set(t); - } - }); + topic.getReplicators().forEach((subName, replicator) -> { + try { + internalExpireMessagesForSinglePartition(subName, expireTimeInSeconds, authoritative); + } catch (Throwable t) { + exception.set(t); + } + }); - if (exception.get() != null) { - if (exception.get() instanceof WebApplicationException) { - WebApplicationException wae = (WebApplicationException) exception.get(); - asyncResponse.resume(wae); - return; - } else { - asyncResponse.resume(new RestException(exception.get())); - return; - } + topic.getSubscriptions().forEach((subName, subscriber) -> { + try { + internalExpireMessagesForSinglePartition(subName, expireTimeInSeconds, authoritative); + } catch (Throwable t) { + exception.set(t); } + }); - asyncResponse.resume(Response.noContent().build()); - return; + if (exception.get() != null) { + if (exception.get() instanceof WebApplicationException) { + WebApplicationException wae = (WebApplicationException) exception.get(); + asyncResponse.resume(wae); + return; + } else { + asyncResponse.resume(new RestException(exception.get())); + return; + } } + + asyncResponse.resume(Response.noContent().build()); } protected void internalResetCursor(AsyncResponse asyncResponse, String subName, long timestamp, @@ -1196,108 +1229,111 @@ protected void internalResetCursor(AsyncResponse asyncResponse, String subName, if (topicName.isGlobal()) { validateGlobalNamespaceOwnership(namespaceName); } - - PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false); - final int numPartitions = partitionMetadata.partitions; - if (numPartitions > 0) { - final CompletableFuture future = new CompletableFuture<>(); - final AtomicInteger count = new AtomicInteger(numPartitions); - final AtomicInteger failureCount = new AtomicInteger(0); - final AtomicReference partitionException = new AtomicReference<>(); - - for (int i = 0; i < numPartitions; i++) { - TopicName topicNamePartition = topicName.getPartition(i); - try { - pulsar().getAdminClient().topics() + // If the topic name is a partition name, no need to get partition topic metadata again + if (topicName.isPartitioned()) { + internalResetCursorForNonPartitionedTopic(asyncResponse, subName, timestamp, authoritative); + } else { + PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false); + final int numPartitions = partitionMetadata.partitions; + if (numPartitions > 0) { + final CompletableFuture future = new CompletableFuture<>(); + final AtomicInteger count = new AtomicInteger(numPartitions); + final AtomicInteger failureCount = new AtomicInteger(0); + final AtomicReference partitionException = new AtomicReference<>(); + + for (int i = 0; i < numPartitions; i++) { + TopicName topicNamePartition = topicName.getPartition(i); + try { + pulsar().getAdminClient().topics() .resetCursorAsync(topicNamePartition.toString(), subName, timestamp).handle((r, ex) -> { - if (ex != null) { - if (ex instanceof PreconditionFailedException) { - // throw the last exception if all partitions get this error - // any other exception on partition is reported back to user - failureCount.incrementAndGet(); - partitionException.set(ex); - } else { - log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}", - clientAppId(), topicNamePartition, subName, timestamp, ex); - future.completeExceptionally(ex); - return null; - } + if (ex != null) { + if (ex instanceof PreconditionFailedException) { + // throw the last exception if all partitions get this error + // any other exception on partition is reported back to user + failureCount.incrementAndGet(); + partitionException.set(ex); + } else { + log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}", + clientAppId(), topicNamePartition, subName, timestamp, ex); + future.completeExceptionally(ex); + return null; } + } - if (count.decrementAndGet() == 0) { - future.complete(null); - } + if (count.decrementAndGet() == 0) { + future.complete(null); + } - return null; - }); - } catch (Exception e) { - log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}", clientAppId(), + return null; + }); + } catch (Exception e) { + log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}", clientAppId(), topicNamePartition, subName, timestamp, e); - future.completeExceptionally(e); + future.completeExceptionally(e); + } } - } - future.whenComplete((r, ex) -> { - if (ex != null) { - if (ex instanceof PulsarAdminException) { - asyncResponse.resume(new RestException((PulsarAdminException) ex)); - return; - } else { - asyncResponse.resume(new RestException(ex)); - return; + future.whenComplete((r, ex) -> { + if (ex != null) { + if (ex instanceof PulsarAdminException) { + asyncResponse.resume(new RestException((PulsarAdminException) ex)); + return; + } else { + asyncResponse.resume(new RestException(ex)); + return; + } } - } - // report an error to user if unable to reset for all partitions - if (failureCount.get() == numPartitions) { - log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}", clientAppId(), topicName, + // report an error to user if unable to reset for all partitions + if (failureCount.get() == numPartitions) { + log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}", clientAppId(), topicName, subName, timestamp, partitionException.get()); - asyncResponse.resume( + asyncResponse.resume( new RestException(Status.PRECONDITION_FAILED, partitionException.get().getMessage())); - return; - } else if (failureCount.get() > 0) { - log.warn("[{}] [{}] Partial errors for reset cursor on subscription {} to time {}", clientAppId(), + return; + } else if (failureCount.get() > 0) { + log.warn("[{}] [{}] Partial errors for reset cursor on subscription {} to time {}", clientAppId(), topicName, subName, timestamp, partitionException.get()); - } + } - asyncResponse.resume(Response.noContent().build()); - return; - }); - } else { - validateAdminAccessForSubscriber(subName, authoritative); - log.info("[{}] [{}] Received reset cursor on subscription {} to time {}", clientAppId(), topicName, subName, - timestamp); - PersistentTopic topic = (PersistentTopic) getTopicReference(topicName); - if (topic == null) { - asyncResponse.resume(new RestException(Status.NOT_FOUND, "Topic not found")); - return; + asyncResponse.resume(Response.noContent().build()); + }); + } else { + internalResetCursorForNonPartitionedTopic(asyncResponse, subName, timestamp, authoritative); } - try { - PersistentSubscription sub = topic.getSubscription(subName); - checkNotNull(sub); - sub.resetCursor(timestamp).get(); - log.info("[{}] [{}] Reset cursor on subscription {} to time {}", clientAppId(), topicName, subName, - timestamp); - asyncResponse.resume(Response.noContent().build()); - return; - } catch (Exception e) { - Throwable t = e.getCause(); - log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}", clientAppId(), topicName, - subName, timestamp, e); - if (e instanceof NullPointerException) { - asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found")); - return; - } else if (e instanceof NotAllowedException) { - asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED, e.getMessage())); - return; - } else if (t instanceof SubscriptionInvalidCursorPosition) { - asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED, - "Unable to find position for timestamp specified -" + t.getMessage())); - return; - } else { - asyncResponse.resume(new RestException(e)); - return; - } + } + } + + private void internalResetCursorForNonPartitionedTopic(AsyncResponse asyncResponse, String subName, long timestamp, + boolean authoritative) { + validateAdminAccessForSubscriber(subName, authoritative); + log.info("[{}] [{}] Received reset cursor on subscription {} to time {}", clientAppId(), topicName, subName, + timestamp); + PersistentTopic topic = (PersistentTopic) getTopicReference(topicName); + if (topic == null) { + asyncResponse.resume(new RestException(Status.NOT_FOUND, "Topic not found")); + return; + } + try { + PersistentSubscription sub = topic.getSubscription(subName); + checkNotNull(sub); + sub.resetCursor(timestamp).get(); + log.info("[{}] [{}] Reset cursor on subscription {} to time {}", clientAppId(), topicName, subName, + timestamp); + asyncResponse.resume(Response.noContent().build()); + } catch (Exception e) { + Throwable t = e.getCause(); + log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}", clientAppId(), topicName, + subName, timestamp, e); + if (e instanceof NullPointerException) { + asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found")); + } else if (e instanceof NotAllowedException) { + asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED, e.getMessage())); + } else if (t instanceof SubscriptionInvalidCursorPosition) { + asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED, + "Unable to find position for timestamp specified -" + t.getMessage())); + } else { + asyncResponse.resume(new RestException(e)); } } } @@ -1310,27 +1346,30 @@ protected void internalCreateSubscription(AsyncResponse asyncResponse, String su final MessageIdImpl targetMessageId = messageId == null ? (MessageIdImpl) MessageId.earliest : messageId; log.info("[{}][{}] Creating subscription {} at message id {}", clientAppId(), topicName, subscriptionName, targetMessageId); - - PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false); - final int numPartitions = partitionMetadata.partitions; - if (numPartitions > 0) { - final CompletableFuture future = new CompletableFuture<>(); - final AtomicInteger count = new AtomicInteger(numPartitions); - final AtomicInteger failureCount = new AtomicInteger(0); - final AtomicReference partitionException = new AtomicReference<>(); - - // Create the subscription on each partition - for (int i = 0; i < numPartitions; i++) { - TopicName topicNamePartition = topicName.getPartition(i); - try { - pulsar().getAdminClient().topics() + // If the topic name is a partition name, no need to get partition topic metadata again + if (topicName.isPartitioned()) { + internalCreateSubscriptionForNonPartitionedTopic(asyncResponse, subscriptionName, targetMessageId, authoritative, replicated); + } else { + PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false); + final int numPartitions = partitionMetadata.partitions; + if (numPartitions > 0) { + final CompletableFuture future = new CompletableFuture<>(); + final AtomicInteger count = new AtomicInteger(numPartitions); + final AtomicInteger failureCount = new AtomicInteger(0); + final AtomicReference partitionException = new AtomicReference<>(); + + // Create the subscription on each partition + for (int i = 0; i < numPartitions; i++) { + TopicName topicNamePartition = topicName.getPartition(i); + try { + pulsar().getAdminClient().topics() .createSubscriptionAsync(topicNamePartition.toString(), subscriptionName, targetMessageId) .handle((r, ex) -> { if (ex != null) { // fail the operation on unknown exception or if all the partitioned failed due to // subscription-already-exist if (failureCount.incrementAndGet() == numPartitions - || !(ex instanceof PulsarAdminException.ConflictException)) { + || !(ex instanceof PulsarAdminException.ConflictException)) { partitionException.set(ex); } } @@ -1341,75 +1380,79 @@ protected void internalCreateSubscription(AsyncResponse asyncResponse, String su return null; }); - } catch (Exception e) { - log.warn("[{}] [{}] Failed to create subscription {} at message id {}", clientAppId(), + } catch (Exception e) { + log.warn("[{}] [{}] Failed to create subscription {} at message id {}", clientAppId(), topicNamePartition, subscriptionName, targetMessageId, e); - future.completeExceptionally(e); + future.completeExceptionally(e); + } } - } - future.whenComplete((r, ex) -> { - if (ex != null) { - if (ex instanceof PulsarAdminException) { - asyncResponse.resume(new RestException((PulsarAdminException) ex)); - return; - } else { - asyncResponse.resume(new RestException(ex)); - return; + future.whenComplete((r, ex) -> { + if (ex != null) { + if (ex instanceof PulsarAdminException) { + asyncResponse.resume(new RestException((PulsarAdminException) ex)); + return; + } else { + asyncResponse.resume(new RestException(ex)); + return; + } } - } - if (partitionException.get() != null) { - log.warn("[{}] [{}] Failed to create subscription {} at message id {}", clientAppId(), topicName, + if (partitionException.get() != null) { + log.warn("[{}] [{}] Failed to create subscription {} at message id {}", clientAppId(), topicName, subscriptionName, targetMessageId, partitionException.get()); - if (partitionException.get() instanceof PulsarAdminException) { - asyncResponse.resume(new RestException((PulsarAdminException) partitionException.get())); - return; - } else { - asyncResponse.resume(new RestException(partitionException.get())); - return; + if (partitionException.get() instanceof PulsarAdminException) { + asyncResponse.resume(new RestException((PulsarAdminException) partitionException.get())); + return; + } else { + asyncResponse.resume(new RestException(partitionException.get())); + return; + } } - } - - asyncResponse.resume(Response.noContent().build()); - return; - }); - } else { - validateAdminAccessForSubscriber(subscriptionName, authoritative); - - PersistentTopic topic = (PersistentTopic) getOrCreateTopic(topicName); - if (topic.getSubscriptions().containsKey(subscriptionName)) { - asyncResponse.resume(new RestException(Status.CONFLICT, "Subscription already exists for topic")); - return; + asyncResponse.resume(Response.noContent().build()); + }); + } else { + internalCreateSubscriptionForNonPartitionedTopic(asyncResponse, subscriptionName, targetMessageId, authoritative, replicated); } + } + } - try { - PersistentSubscription subscription = (PersistentSubscription) topic - .createSubscription(subscriptionName, InitialPosition.Latest, replicated).get(); - // Mark the cursor as "inactive" as it was created without a real consumer connected - subscription.deactivateCursor(); - subscription.resetCursor(PositionImpl.get(targetMessageId.getLedgerId(), targetMessageId.getEntryId())) - .get(); - } catch (Throwable e) { - Throwable t = e.getCause(); - log.warn("[{}] [{}] Failed to create subscription {} at message id {}", clientAppId(), topicName, - subscriptionName, targetMessageId, e); - if (t instanceof SubscriptionInvalidCursorPosition) { - asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED, - "Unable to find position for position specified: " + t.getMessage())); - return; - } else { - asyncResponse.resume(new RestException(e)); - return; - } - } + private void internalCreateSubscriptionForNonPartitionedTopic(AsyncResponse asyncResponse, String subscriptionName, + MessageIdImpl targetMessageId, boolean authoritative, boolean replicated) { + validateAdminAccessForSubscriber(subscriptionName, authoritative); - log.info("[{}][{}] Successfully created subscription {} at message id {}", clientAppId(), topicName, - subscriptionName, targetMessageId); - asyncResponse.resume(Response.noContent().build()); + PersistentTopic topic = (PersistentTopic) getOrCreateTopic(topicName); + + if (topic.getSubscriptions().containsKey(subscriptionName)) { + asyncResponse.resume(new RestException(Status.CONFLICT, "Subscription already exists for topic")); return; } + + try { + PersistentSubscription subscription = (PersistentSubscription) topic + .createSubscription(subscriptionName, InitialPosition.Latest, replicated).get(); + // Mark the cursor as "inactive" as it was created without a real consumer connected + subscription.deactivateCursor(); + subscription.resetCursor(PositionImpl.get(targetMessageId.getLedgerId(), targetMessageId.getEntryId())) + .get(); + } catch (Throwable e) { + Throwable t = e.getCause(); + log.warn("[{}] [{}] Failed to create subscription {} at message id {}", clientAppId(), topicName, + subscriptionName, targetMessageId, e); + if (t instanceof SubscriptionInvalidCursorPosition) { + asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED, + "Unable to find position for position specified: " + t.getMessage())); + return; + } else { + asyncResponse.resume(new RestException(e)); + return; + } + } + + log.info("[{}][{}] Successfully created subscription {} at message id {}", clientAppId(), topicName, + subscriptionName, targetMessageId); + asyncResponse.resume(Response.noContent().build()); } protected void internalResetCursorOnPosition(String subName, boolean authoritative, MessageIdImpl messageId) { @@ -1418,10 +1461,8 @@ protected void internalResetCursorOnPosition(String subName, boolean authoritati } log.info("[{}][{}] received reset cursor on subscription {} to position {}", clientAppId(), topicName, subName, messageId); - - PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false); - - if (partitionMetadata.partitions > 0) { + // If the topic name is a partition name, no need to get partition topic metadata again + if (!topicName.isPartitioned() && getPartitionedTopicMetadata(topicName, authoritative, false).partitions > 0) { log.warn("[{}] Not supported operation on partitioned-topic {} {}", clientAppId(), topicName, subName); throw new RestException(Status.METHOD_NOT_ALLOWED, @@ -1458,8 +1499,8 @@ protected Response internalPeekNthMessage(String subName, int messagePosition, b if (topicName.isGlobal()) { validateGlobalNamespaceOwnership(namespaceName); } - PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false); - if (partitionMetadata.partitions > 0) { + // If the topic name is a partition name, no need to get partition topic metadata again + if (!topicName.isPartitioned() && getPartitionedTopicMetadata(topicName, authoritative, false).partitions > 0) { throw new RestException(Status.METHOD_NOT_ALLOWED, "Peek messages on a partitioned topic is not allowed"); } validateAdminAccessForSubscriber(subName, authoritative); @@ -1602,43 +1643,8 @@ protected void internalExpireMessages(AsyncResponse asyncResponse, String subNam if (topicName.isGlobal()) { validateGlobalNamespaceOwnership(namespaceName); } - - PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false); - if (partitionMetadata.partitions > 0) { - final List> futures = Lists.newArrayList(); - - // expire messages for each partition topic - for (int i = 0; i < partitionMetadata.partitions; i++) { - TopicName topicNamePartition = topicName.getPartition(i); - try { - futures.add(pulsar().getAdminClient().topics().expireMessagesAsync(topicNamePartition.toString(), - subName, expireTimeInSeconds)); - } catch (Exception e) { - log.error("[{}] Failed to expire messages up to {} on {}", clientAppId(), expireTimeInSeconds, - topicNamePartition, e); - asyncResponse.resume(new RestException(e)); - return; - } - } - - FutureUtil.waitForAll(futures).handle((result, exception) -> { - if (exception != null) { - Throwable t = exception.getCause(); - if (t instanceof NotFoundException) { - asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found")); - return null; - } else { - log.error("[{}] Failed to expire messages up to {} on {}", clientAppId(), expireTimeInSeconds, - topicName, t); - asyncResponse.resume(new RestException(t)); - return null; - } - } - - asyncResponse.resume(Response.noContent().build()); - return null; - }); - } else { + // If the topic name is a partition name, no need to get partition topic metadata again + if (topicName.isPartitioned()) { try { internalExpireMessagesForSinglePartition(subName, expireTimeInSeconds, authoritative); } catch (WebApplicationException wae) { @@ -1649,7 +1655,54 @@ protected void internalExpireMessages(AsyncResponse asyncResponse, String subNam return; } asyncResponse.resume(Response.noContent().build()); - return; + } else { + PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false); + if (partitionMetadata.partitions > 0) { + final List> futures = Lists.newArrayList(); + + // expire messages for each partition topic + for (int i = 0; i < partitionMetadata.partitions; i++) { + TopicName topicNamePartition = topicName.getPartition(i); + try { + futures.add(pulsar().getAdminClient().topics().expireMessagesAsync(topicNamePartition.toString(), + subName, expireTimeInSeconds)); + } catch (Exception e) { + log.error("[{}] Failed to expire messages up to {} on {}", clientAppId(), expireTimeInSeconds, + topicNamePartition, e); + asyncResponse.resume(new RestException(e)); + return; + } + } + + FutureUtil.waitForAll(futures).handle((result, exception) -> { + if (exception != null) { + Throwable t = exception.getCause(); + if (t instanceof NotFoundException) { + asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found")); + return null; + } else { + log.error("[{}] Failed to expire messages up to {} on {}", clientAppId(), expireTimeInSeconds, + topicName, t); + asyncResponse.resume(new RestException(t)); + return null; + } + } + + asyncResponse.resume(Response.noContent().build()); + return null; + }); + } else { + try { + internalExpireMessagesForSinglePartition(subName, expireTimeInSeconds, authoritative); + } catch (WebApplicationException wae) { + asyncResponse.resume(wae); + return; + } catch (Exception e) { + asyncResponse.resume(new RestException(e)); + return; + } + asyncResponse.resume(Response.noContent().build()); + } } } @@ -1658,9 +1711,8 @@ private void internalExpireMessagesForSinglePartition(String subName, int expire if (topicName.isGlobal()) { validateGlobalNamespaceOwnership(namespaceName); } - - PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false); - if (partitionMetadata.partitions > 0) { + // If the topic name is a partition name, no need to get partition topic metadata again + if (!topicName.isPartitioned() && getPartitionedTopicMetadata(topicName, authoritative, false).partitions > 0) { String msg = "This method should not be called for partitioned topic"; log.error("[{}] {} {} {}", clientAppId(), msg, topicName, subName); throw new IllegalStateException(msg);