From 110add00c1cdc59542e7ef906cd5f4409e63dc04 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=81=93=E5=90=9B?= Date: Thu, 4 Apr 2024 23:08:45 +0800 Subject: [PATCH] [admin][broker] Fix force delete subscription not working (#22423) --- .../admin/impl/PersistentTopicsBase.java | 5 ++-- .../broker/admin/PersistentTopicsTest.java | 30 +++++++++++++++++++ 2 files changed, 32 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 00c2eed27635e..602cd47e5950b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -1656,7 +1656,7 @@ protected CompletableFuture internalDeleteSubscriptionAsync(String subName for (int i = 0; i < partitionMetadata.partitions; i++) { TopicName topicNamePartition = topicName.getPartition(i); futures.add(adminClient.topics() - .deleteSubscriptionAsync(topicNamePartition.toString(), subName, false)); + .deleteSubscriptionAsync(topicNamePartition.toString(), subName, force)); } return FutureUtil.waitForAll(futures).handle((result, exception) -> { @@ -1675,8 +1675,7 @@ protected CompletableFuture internalDeleteSubscriptionAsync(String subName return null; }); } - return internalDeleteSubscriptionForNonPartitionedTopicAsync(subName, authoritative, - force); + return internalDeleteSubscriptionForNonPartitionedTopicAsync(subName, authoritative, force); }); } }); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java index 59c3dbf6ff3ca..d7ffa656bdb6b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java @@ -77,11 +77,13 @@ import org.apache.pulsar.client.admin.Topics; import org.apache.pulsar.client.admin.internal.TopicsImpl; import org.apache.pulsar.client.api.CompressionType; +import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.api.interceptor.ProducerInterceptor; import org.apache.pulsar.client.impl.BatchMessageIdImpl; import org.apache.pulsar.client.impl.MessageIdImpl; @@ -1696,6 +1698,34 @@ public void testCreateMissingPartitions() throws Exception { assertThrows(PulsarAdminException.NotFoundException.class, () -> admin.topics().createMissedPartitions(topicName)); } + @Test + public void testForceDeleteSubscription() throws Exception { + try { + pulsar.getConfiguration().setAllowAutoSubscriptionCreation(false); + String topicName = "persistent://" + testTenant + "/" + testNamespaceLocal + "/testForceDeleteSubscription"; + String subName = "sub1"; + admin.topics().createNonPartitionedTopic(topicName); + admin.topics().createSubscription(topicName, subName, MessageId.latest); + + @Cleanup + Consumer c0 = pulsarClient.newConsumer(Schema.STRING) + .topic(topicName) + .subscriptionName(subName) + .subscriptionType(SubscriptionType.Shared) + .subscribe(); + @Cleanup + Consumer c1 = pulsarClient.newConsumer(Schema.STRING) + .topic(topicName) + .subscriptionName(subName) + .subscriptionType(SubscriptionType.Shared) + .subscribe(); + + admin.topics().deleteSubscription(topicName, subName, true); + } finally { + pulsar.getConfiguration().setAllowAutoSubscriptionCreation(true); + } + } + @Test public void testUpdatePropertiesOnNonDurableSub() throws Exception { String topic = "persistent://" + testTenant + "/" + testNamespaceLocal + "/testUpdatePropertiesOnNonDurableSub";