From 073a4205ec889807e2558b296090466302cd79d4 Mon Sep 17 00:00:00 2001 From: Ruguo Yu Date: Tue, 8 Mar 2022 23:20:43 +0800 Subject: [PATCH 1/3] Ensure the deletion consistency of topic and schema --- .../admin/impl/PersistentTopicsBase.java | 24 +++++-------- .../broker/admin/v1/PersistentTopics.java | 8 ++--- .../broker/admin/v2/PersistentTopics.java | 8 ++--- .../pulsar/broker/service/BrokerService.java | 22 ++---------- .../nonpersistent/NonPersistentTopic.java | 14 ++++---- .../service/persistent/PersistentTopic.java | 17 +++------ .../broker/admin/PersistentTopicsTest.java | 6 ++-- .../broker/service/PersistentTopicTest.java | 6 ++++ .../org/apache/pulsar/schema/SchemaTest.java | 10 +++--- .../apache/pulsar/client/admin/Topics.java | 36 ++++++++++++------- .../client/admin/internal/TopicsImpl.java | 8 ++--- .../pulsar/admin/cli/PulsarAdminToolTest.java | 8 ++--- .../apache/pulsar/admin/cli/CmdTopics.java | 12 ++----- 13 files changed, 76 insertions(+), 103 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index a50f54dfa5d9e..a79014a6ecc2d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -309,12 +309,12 @@ protected void internalGrantPermissionsOnTopic(final AsyncResponse asyncResponse }); } - protected void internalDeleteTopicForcefully(boolean authoritative, boolean deleteSchema) { + protected void internalDeleteTopicForcefully(boolean authoritative) { validateTopicOwnership(topicName, authoritative); validateNamespaceOperation(topicName.getNamespaceObject(), NamespaceOperation.DELETE_TOPIC); try { - pulsar().getBrokerService().deleteTopic(topicName.toString(), true, deleteSchema).get(); + pulsar().getBrokerService().deleteTopic(topicName.toString(), true).get(); } catch (Exception e) { if (isManagedLedgerNotFoundException(e)) { log.info("[{}] Topic was already not existing {}", clientAppId(), topicName, e); @@ -581,7 +581,7 @@ protected PartitionedTopicMetadata internalGetPartitionedMetadata(boolean author } protected void internalDeletePartitionedTopic(AsyncResponse asyncResponse, boolean authoritative, - boolean force, boolean deleteSchema) { + boolean force) { validateTopicOwnershipAsync(topicName, authoritative) .thenCompose(__ -> validateNamespaceOperationAsync(topicName.getNamespaceObject(), NamespaceOperation.DELETE_TOPIC)) @@ -592,14 +592,6 @@ protected void internalDeletePartitionedTopic(AsyncResponse asyncResponse, boole if (numPartitions < 1){ return CompletableFuture.completedFuture(null); } - if (deleteSchema) { - return pulsar().getBrokerService() - .deleteSchemaStorage(topicName.getPartition(0).toString()) - .thenCompose(unused -> - internalRemovePartitionsAuthenticationPoliciesAsync(numPartitions)) - .thenCompose(unused2 -> - internalRemovePartitionsTopicAsync(numPartitions, force)); - } return internalRemovePartitionsAuthenticationPoliciesAsync(numPartitions) .thenCompose(unused -> internalRemovePartitionsTopicAsync(numPartitions, force)); }) @@ -1015,20 +1007,20 @@ private void internalUnloadTransactionCoordinatorAsync(AsyncResponse asyncRespon }); } - protected void internalDeleteTopic(boolean authoritative, boolean force, boolean deleteSchema) { + protected void internalDeleteTopic(boolean authoritative, boolean force) { if (force) { - internalDeleteTopicForcefully(authoritative, deleteSchema); + internalDeleteTopicForcefully(authoritative); } else { - internalDeleteTopic(authoritative, deleteSchema); + internalDeleteTopic(authoritative); } } - protected void internalDeleteTopic(boolean authoritative, boolean deleteSchema) { + protected void internalDeleteTopic(boolean authoritative) { validateNamespaceOperation(topicName.getNamespaceObject(), NamespaceOperation.DELETE_TOPIC); validateTopicOwnership(topicName, authoritative); try { - pulsar().getBrokerService().deleteTopic(topicName.toString(), false, deleteSchema).get(); + pulsar().getBrokerService().deleteTopic(topicName.toString(), false).get(); log.info("[{}] Successfully removed topic {}", clientAppId(), topicName); } catch (Exception e) { Throwable t = e.getCause(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java index 61f785c5a3479..fa9a206624ac3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java @@ -295,10 +295,10 @@ public void deletePartitionedTopic(@Suspended final AsyncResponse asyncResponse, @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic, @QueryParam("force") @DefaultValue("false") boolean force, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, - @QueryParam("deleteSchema") @DefaultValue("false") boolean deleteSchema) { + @QueryParam("deleteSchema") @DefaultValue("true") boolean deleteSchema) { try { validateTopicName(property, cluster, namespace, encodedTopic); - internalDeletePartitionedTopic(asyncResponse, authoritative, force, deleteSchema); + internalDeletePartitionedTopic(asyncResponse, authoritative, force); } catch (WebApplicationException wae) { asyncResponse.resume(wae); } catch (Exception e) { @@ -334,9 +334,9 @@ public void deleteTopic(@PathParam("property") String property, @PathParam("clus @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic, @QueryParam("force") @DefaultValue("false") boolean force, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, - @QueryParam("deleteSchema") @DefaultValue("false") boolean deleteSchema) { + @QueryParam("deleteSchema") @DefaultValue("true") boolean deleteSchema) { validateTopicName(property, cluster, namespace, encodedTopic); - internalDeleteTopic(authoritative, force, deleteSchema); + internalDeleteTopic(authoritative, force); } @GET diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index 5b1544fd06959..7e10ec9e9d660 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -913,10 +913,10 @@ public void deletePartitionedTopic( @ApiParam(value = "Is authentication required to perform this operation") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, @ApiParam(value = "Delete the topic's schema storage") - @QueryParam("deleteSchema") @DefaultValue("false") boolean deleteSchema) { + @QueryParam("deleteSchema") @DefaultValue("true") boolean deleteSchema) { try { validatePartitionedTopicName(tenant, namespace, encodedTopic); - internalDeletePartitionedTopic(asyncResponse, authoritative, force, deleteSchema); + internalDeletePartitionedTopic(asyncResponse, authoritative, force); } catch (WebApplicationException wae) { asyncResponse.resume(wae); } catch (Exception e) { @@ -981,9 +981,9 @@ public void deleteTopic( @ApiParam(value = "Is authentication required to perform this operation") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, @ApiParam(value = "Delete the topic's schema storage") - @QueryParam("deleteSchema") @DefaultValue("false") boolean deleteSchema) { + @QueryParam("deleteSchema") @DefaultValue("true") boolean deleteSchema) { validateTopicName(tenant, namespace, encodedTopic); - internalDeleteTopic(authoritative, force, deleteSchema); + internalDeleteTopic(authoritative, force); } @GET diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index f6af11f1e651d..9b36fbec11e88 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1023,22 +1023,11 @@ public CompletableFuture deleteSchemaStorage(String topic) { } public CompletableFuture deleteTopic(String topic, boolean forceDelete) { - return deleteTopic(topic, forceDelete, false); - } - - public CompletableFuture deleteTopic(String topic, boolean forceDelete, boolean deleteSchema) { Optional optTopic = getTopicReference(topic); if (optTopic.isPresent()) { Topic t = optTopic.get(); if (forceDelete) { - if (deleteSchema) { - return t.deleteSchema().thenCompose(schemaVersion -> { - log.info("Successfully delete topic {}'s schema of version {}", t.getName(), schemaVersion); - return t.deleteForcefully(); - }); - } else { - return t.deleteForcefully(); - } + return t.deleteForcefully(); } // v2 topics have a global name so check if the topic is replicated. @@ -1050,14 +1039,7 @@ public CompletableFuture deleteTopic(String topic, boolean forceDelete, bo new IllegalStateException("Delete forbidden topic is replicated on clusters " + clusters)); } - if (deleteSchema) { - return t.deleteSchema().thenCompose(schemaVersion -> { - log.info("Successfully delete topic {}'s schema of version {}", t.getName(), schemaVersion); - return t.delete(); - }); - } else { - return t.delete(); - } + return t.delete(); } if (log.isDebugEnabled()) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 1edc7ae12e8f0..b7422bca96275 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -370,7 +370,7 @@ public CompletableFuture createSubscription(String subscriptionNam @Override public CompletableFuture delete() { - return delete(false, false, false); + return delete(false, false); } /** @@ -380,11 +380,10 @@ public CompletableFuture delete() { */ @Override public CompletableFuture deleteForcefully() { - return delete(false, true, false); + return delete(false, true); } - private CompletableFuture delete(boolean failIfHasSubscriptions, boolean closeIfClientsConnected, - boolean deleteSchema) { + private CompletableFuture delete(boolean failIfHasSubscriptions, boolean closeIfClientsConnected) { CompletableFuture deleteFuture = new CompletableFuture<>(); lock.writeLock().lock(); @@ -429,9 +428,8 @@ private CompletableFuture delete(boolean failIfHasSubscriptions, boolean c } else { subscriptions.forEach((s, sub) -> futures.add(sub.delete())); } - if (deleteSchema) { - futures.add(deleteSchema().thenApply(schemaVersion -> null)); - } + + futures.add(deleteSchema().thenApply(schemaVersion -> null)); futures.add(deleteTopicPolicies()); FutureUtil.waitForAll(futures).whenComplete((v, ex) -> { if (ex != null) { @@ -920,7 +918,7 @@ public void checkGC() { maxInactiveDurationInSec); } - stopReplProducers().thenCompose(v -> delete(true, false, true)) + stopReplProducers().thenCompose(v -> delete(true, false)) .thenCompose(__ -> tryToDeletePartitionedMetadata()) .thenRun(() -> log.info("[{}] Topic deleted successfully due to inactivity", topic)) .exceptionally(e -> { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index bfe0b6441091e..2f0fed880da8e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1094,11 +1094,6 @@ public CompletableFuture delete() { return delete(false, false, false); } - private CompletableFuture delete(boolean failIfHasSubscriptions, - boolean failIfHasBacklogs, boolean deleteSchema) { - return delete(failIfHasSubscriptions, failIfHasBacklogs, false, deleteSchema); - } - /** * Forcefully close all producers/consumers/replicators and deletes the topic. this function is used when local * cluster is removed from global-namespace replication list. Because broker doesn't allow lookup if local cluster @@ -1108,7 +1103,7 @@ private CompletableFuture delete(boolean failIfHasSubscriptions, */ @Override public CompletableFuture deleteForcefully() { - return delete(false, false, true, false); + return delete(false, false, true); } /** @@ -1122,16 +1117,13 @@ public CompletableFuture deleteForcefully() { * Flag indicate whether explicitly close connected * producers/consumers/replicators before trying to delete topic. * If any client is connected to a topic and if this flag is disable then this operation fails. - * @param deleteSchema - * Flag indicating whether delete the schema defined for topic if exist. * * @return Completable future indicating completion of delete operation Completed exceptionally with: * IllegalStateException if topic is still active ManagedLedgerException if ledger delete operation fails */ private CompletableFuture delete(boolean failIfHasSubscriptions, boolean failIfHasBacklogs, - boolean closeIfClientsConnected, - boolean deleteSchema) { + boolean closeIfClientsConnected) { CompletableFuture deleteFuture = new CompletableFuture<>(); lock.writeLock().lock(); @@ -1173,8 +1165,7 @@ private CompletableFuture delete(boolean failIfHasSubscriptions, CompletableFuture deleteTopicAuthenticationFuture = new CompletableFuture<>(); brokerService.deleteTopicAuthenticationWithRetry(topic, deleteTopicAuthenticationFuture, 5); - deleteTopicAuthenticationFuture.thenCompose( - __ -> deleteSchema ? deleteSchema() : CompletableFuture.completedFuture(null)) + deleteTopicAuthenticationFuture.thenCompose(__ -> deleteSchema()) .thenAccept(__ -> deleteTopicPolicies()) .thenCompose(__ -> transactionBufferCleanupAndClose()) .whenComplete((v, ex) -> { @@ -2245,7 +2236,7 @@ public void checkGC() { } replCloseFuture.thenCompose(v -> delete(deleteMode == InactiveTopicDeleteMode.delete_when_no_subscriptions, - deleteMode == InactiveTopicDeleteMode.delete_when_subscriptions_caught_up, true)) + deleteMode == InactiveTopicDeleteMode.delete_when_subscriptions_caught_up, false)) .thenApply((res) -> tryToDeletePartitionedMetadata()) .thenRun(() -> log.info("[{}] Topic deleted successfully due to inactivity", topic)) .exceptionally(e -> { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java index 6426dcd600129..0ae7e5494d974 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java @@ -1231,12 +1231,12 @@ public void testDeleteTopic() throws Exception { persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, topicName, false, null); CompletableFuture deleteTopicFuture = new CompletableFuture<>(); deleteTopicFuture.completeExceptionally(new MetadataStoreException.NotFoundException()); - doReturn(deleteTopicFuture).when(brokerService).deleteTopic(anyString(), anyBoolean(), anyBoolean()); + doReturn(deleteTopicFuture).when(brokerService).deleteTopic(anyString(), anyBoolean()); persistentTopics.deleteTopic(testTenant, testNamespace, topicName, true, true, true); // CompletableFuture deleteTopicFuture2 = new CompletableFuture<>(); deleteTopicFuture2.completeExceptionally(new MetadataStoreException("test exception")); - doReturn(deleteTopicFuture2).when(brokerService).deleteTopic(anyString(), anyBoolean(), anyBoolean()); + doReturn(deleteTopicFuture2).when(brokerService).deleteTopic(anyString(), anyBoolean()); try { persistentTopics.deleteTopic(testTenant, testNamespace, topicName, true, true, true); } catch (Exception e) { @@ -1245,7 +1245,7 @@ public void testDeleteTopic() throws Exception { // CompletableFuture deleteTopicFuture3 = new CompletableFuture<>(); deleteTopicFuture3.completeExceptionally(new MetadataStoreException.NotFoundException()); - doReturn(deleteTopicFuture3).when(brokerService).deleteTopic(anyString(), anyBoolean(), anyBoolean()); + doReturn(deleteTopicFuture3).when(brokerService).deleteTopic(anyString(), anyBoolean()); try { persistentTopics.deleteTopic(testTenant, testNamespace, topicName, false, true, true); } catch (RestException e) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index d1aba97ee9032..82313394d90ab 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -108,6 +108,8 @@ import org.apache.pulsar.broker.service.persistent.PersistentReplicator; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.broker.service.schema.DefaultSchemaRegistryService; +import org.apache.pulsar.broker.service.schema.SchemaRegistryService; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; @@ -153,6 +155,7 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase { protected PulsarService pulsar; private BrokerService brokerService; + private SchemaRegistryService schemaRegistryService; private ManagedLedgerFactory mlFactoryMock; private ServerCnx serverCnx; private MetadataStore store; @@ -183,6 +186,9 @@ public void setup() throws Exception { doReturn(svcConfig).when(pulsar).getConfiguration(); doReturn(mock(Compactor.class)).when(pulsar).getCompactor(); + schemaRegistryService = spyWithClassAndConstructorArgs(DefaultSchemaRegistryService.class); + doReturn(schemaRegistryService).when(pulsar).getSchemaRegistryService(); + mlFactoryMock = mock(ManagedLedgerFactory.class); doReturn(mlFactoryMock).when(pulsar).getManagedLedgerFactory(); doReturn(mock(PulsarClientImpl.class)).when(pulsar).getClient(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java index 217e76820f15c..3aab8ec6f0e15 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java @@ -839,9 +839,9 @@ public void testDeleteTopicAndSchemaForV1() throws Exception { assertNotNull(schema); } - // not-force and not-delete-schema when delete topic + // not-force delete topic try { - admin.topics().delete(topic1, false, false); + admin.topics().delete(topic1, false); fail(); } catch (Exception e) { assertTrue(e.getMessage().startsWith("Topic has active producers/subscriptions")); @@ -849,7 +849,7 @@ public void testDeleteTopicAndSchemaForV1() throws Exception { assertEquals(this.getPulsar().getSchemaRegistryService() .trimDeletedSchemaAndGetList(TopicName.get(topic1).getSchemaName()).get().size(), 2); try { - admin.topics().deletePartitionedTopic(topic2, false, false); + admin.topics().deletePartitionedTopic(topic2, false); fail(); } catch (Exception e) { assertTrue(e.getMessage().startsWith("Topic has active producers/subscriptions")); @@ -858,10 +858,10 @@ public void testDeleteTopicAndSchemaForV1() throws Exception { .trimDeletedSchemaAndGetList(TopicName.get(topic2).getSchemaName()).get().size(), 1); // force and delete-schema when delete topic - admin.topics().delete(topic1, true, true); + admin.topics().delete(topic1, true); assertEquals(this.getPulsar().getSchemaRegistryService() .trimDeletedSchemaAndGetList(TopicName.get(topic1).getSchemaName()).get().size(), 0); - admin.topics().deletePartitionedTopic(topic2, true, true); + admin.topics().deletePartitionedTopic(topic2, true); assertEquals(this.getPulsar().getSchemaRegistryService() .trimDeletedSchemaAndGetList(TopicName.get(topic2).getSchemaName()).get().size(), 0); } diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java index 1110591eeb2b6..f8565cdb26cac 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java @@ -664,7 +664,7 @@ void updatePartitionedTopic(String topic, int numPartitions, boolean updateLocal CompletableFuture getPartitionedTopicMetadataAsync(String topic); /** - * Delete a partitioned topic. + * Delete a partitioned topic and its schemas. *

* It will also delete all the partitions of the topic if it exists. *

@@ -674,21 +674,24 @@ void updatePartitionedTopic(String topic, int numPartitions, boolean updateLocal * @param force * Delete topic forcefully * @param deleteSchema - * Delete topic's schema storage + * Delete topic's schema storage and it is always true even if it is specified as false * * @throws PulsarAdminException + * @deprecated Use {@link Topics#deletePartitionedTopic(String, boolean)} instead because parameter + * `deleteSchema` is always true */ + @Deprecated void deletePartitionedTopic(String topic, boolean force, boolean deleteSchema) throws PulsarAdminException; /** * @see Topics#deletePartitionedTopic(String, boolean, boolean) */ default void deletePartitionedTopic(String topic, boolean force) throws PulsarAdminException { - deletePartitionedTopic(topic, force, false); + deletePartitionedTopic(topic, force, true); } /** - * Delete a partitioned topic asynchronously. + * Delete a partitioned topic and its schemas asynchronously. *

* It will also delete all the partitions of the topic if it exists. *

@@ -698,17 +701,20 @@ default void deletePartitionedTopic(String topic, boolean force) throws PulsarAd * @param force * Delete topic forcefully * @param deleteSchema - * Delete topic's schema storage + * Delete topic's schema storage and it is always true even if it is specified as false * * @return a future that can be used to track when the partitioned topic is deleted + * @deprecated Use {@link Topics#deletePartitionedTopicAsync(String, boolean)} instead because parameter + * `deleteSchema` is always true */ + @Deprecated CompletableFuture deletePartitionedTopicAsync(String topic, boolean force, boolean deleteSchema); /** * @see Topics#deletePartitionedTopic(String, boolean, boolean) */ default CompletableFuture deletePartitionedTopicAsync(String topic, boolean force) { - return deletePartitionedTopicAsync(topic, force, false); + return deletePartitionedTopicAsync(topic, force, true); } /** @@ -736,7 +742,7 @@ default CompletableFuture deletePartitionedTopicAsync(String topic, boolea CompletableFuture deletePartitionedTopicAsync(String topic); /** - * Delete a topic. + * Delete a topic and its schemas. *

* Delete a topic. The topic cannot be deleted if force flag is disable and there's any active * subscription or producer connected to the it. Force flag deletes topic forcefully by closing @@ -748,7 +754,7 @@ default CompletableFuture deletePartitionedTopicAsync(String topic, boolea * @param force * Delete topic forcefully * @param deleteSchema - * Delete topic's schema storage + * Delete topic's schema storage and it is always true even if it is specified as false * * @throws NotAuthorizedException * Don't have admin permission @@ -758,18 +764,21 @@ default CompletableFuture deletePartitionedTopicAsync(String topic, boolea * Topic has active subscriptions or producers * @throws PulsarAdminException * Unexpected error + * @deprecated Use {@link Topics#delete(String, boolean)} instead because + * parameter `deleteSchema` is always true */ + @Deprecated void delete(String topic, boolean force, boolean deleteSchema) throws PulsarAdminException; /** * @see Topics#delete(String, boolean, boolean) */ default void delete(String topic, boolean force) throws PulsarAdminException { - delete(topic, force, false); + delete(topic, force, true); } /** - * Delete a topic asynchronously. + * Delete a topic and its schemas asynchronously. *

* Delete a topic asynchronously. The topic cannot be deleted if force flag is disable and there's any active * subscription or producer connected to the it. Force flag deletes topic forcefully by closing all active producers @@ -781,17 +790,20 @@ default void delete(String topic, boolean force) throws PulsarAdminException { * @param force * Delete topic forcefully * @param deleteSchema - * Delete topic's schema storage + * Delete topic's schema storage and it is always true even if it is specified as false * * @return a future that can be used to track when the topic is deleted + * @deprecated Use {@link Topics#deleteAsync(String, boolean)} instead because + * parameter `deleteSchema` is always true */ + @Deprecated CompletableFuture deleteAsync(String topic, boolean force, boolean deleteSchema); /** * @see Topics#deleteAsync(String, boolean, boolean) */ default CompletableFuture deleteAsync(String topic, boolean force) { - return deleteAsync(topic, force, false); + return deleteAsync(topic, force, true); } /** diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index 1f2232068f3ec..08b7bd5f2e100 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -460,7 +460,7 @@ public CompletableFuture deletePartitionedTopicAsync(String topic) { @Override public void deletePartitionedTopic(String topic, boolean force, boolean deleteSchema) throws PulsarAdminException { - sync(() -> deletePartitionedTopicAsync(topic, force, deleteSchema)); + sync(() -> deletePartitionedTopicAsync(topic, force, true)); } @Override @@ -468,7 +468,7 @@ public CompletableFuture deletePartitionedTopicAsync(String topic, boolean TopicName tn = validateTopic(topic); WebTarget path = topicPath(tn, "partitions") // .queryParam("force", Boolean.toString(force)) // - .queryParam("deleteSchema", Boolean.toString(deleteSchema)); + .queryParam("deleteSchema", "true"); return asyncDeleteRequest(path); } @@ -484,7 +484,7 @@ public CompletableFuture deleteAsync(String topic) { @Override public void delete(String topic, boolean force, boolean deleteSchema) throws PulsarAdminException { - sync(() -> deleteAsync(topic, force, deleteSchema)); + sync(() -> deleteAsync(topic, force, true)); } @Override @@ -492,7 +492,7 @@ public CompletableFuture deleteAsync(String topic, boolean force, boolean TopicName tn = validateTopic(topic); WebTarget path = topicPath(tn) // .queryParam("force", Boolean.toString(force)) // - .queryParam("deleteSchema", Boolean.toString(deleteSchema)); + .queryParam("deleteSchema", "true"); return asyncDeleteRequest(path); } diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index b8a8612fa27cc..c23d6f47cd581 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -1300,8 +1300,8 @@ public void topics() throws Exception { cmdTopics.run(split("truncate persistent://myprop/clust/ns1/ds1")); verify(mockTopics).truncate("persistent://myprop/clust/ns1/ds1"); - cmdTopics.run(split("delete persistent://myprop/clust/ns1/ds1 -f -d")); - verify(mockTopics).delete("persistent://myprop/clust/ns1/ds1", true, true); + cmdTopics.run(split("delete persistent://myprop/clust/ns1/ds1 -f")); + verify(mockTopics).delete("persistent://myprop/clust/ns1/ds1", true); cmdTopics.run(split("unload persistent://myprop/clust/ns1/ds1")); verify(mockTopics).unload("persistent://myprop/clust/ns1/ds1"); @@ -1433,8 +1433,8 @@ public void topics() throws Exception { cmdTopics.run(split("get-partitioned-topic-metadata persistent://myprop/clust/ns1/ds1")); verify(mockTopics).getPartitionedTopicMetadata("persistent://myprop/clust/ns1/ds1"); - cmdTopics.run(split("delete-partitioned-topic persistent://myprop/clust/ns1/ds1 -d -f")); - verify(mockTopics).deletePartitionedTopic("persistent://myprop/clust/ns1/ds1", true, true); + cmdTopics.run(split("delete-partitioned-topic persistent://myprop/clust/ns1/ds1 -f")); + verify(mockTopics).deletePartitionedTopic("persistent://myprop/clust/ns1/ds1", true); cmdTopics.run(split("peek-messages persistent://myprop/clust/ns1/ds1 -s sub1 -n 3")); verify(mockTopics).peekMessages("persistent://myprop/clust/ns1/ds1", "sub1", 3); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java index 69823d07d91bc..f215a4bde72f5 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java @@ -632,14 +632,10 @@ private class DeletePartitionedCmd extends CliCommand { "--force" }, description = "Close all producer/consumer/replicator and delete topic forcefully") private boolean force = false; - @Parameter(names = { "-d", - "--deleteSchema" }, description = "Delete schema while deleting topic") - private boolean deleteSchema = false; - @Override void run() throws Exception { String topic = validateTopicName(params); - getTopics().deletePartitionedTopic(topic, force, deleteSchema); + getTopics().deletePartitionedTopic(topic, force); } } @@ -653,14 +649,10 @@ private class DeleteCmd extends CliCommand { "--force" }, description = "Close all producer/consumer/replicator and delete topic forcefully") private boolean force = false; - @Parameter(names = { "-d", - "--deleteSchema" }, description = "Delete schema while deleting topic") - private boolean deleteSchema = false; - @Override void run() throws PulsarAdminException { String topic = validateTopicName(params); - getTopics().delete(topic, force, deleteSchema); + getTopics().delete(topic, force); } } From b3e74de632cd35ae3137497223be3378fd97c85c Mon Sep 17 00:00:00 2001 From: Ruguo Yu Date: Fri, 11 Mar 2022 16:17:59 +0800 Subject: [PATCH 2/3] Address comment --- .../admin/impl/PersistentTopicsBase.java | 3 ++- .../broker/admin/v1/PersistentTopics.java | 6 ++---- .../broker/admin/v2/PersistentTopics.java | 8 ++------ .../pulsar/broker/service/BrokerService.java | 10 ---------- .../broker/admin/PersistentTopicsTest.java | 12 ++++++------ .../broker/service/PersistentTopicTest.java | 1 + .../org/apache/pulsar/client/admin/Topics.java | 6 ++++++ .../org/apache/pulsar/admin/cli/CmdTopics.java | 18 ++++++++++++++++-- 8 files changed, 35 insertions(+), 29 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index a79014a6ecc2d..72d7fcfe3cbc1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -580,7 +580,8 @@ protected PartitionedTopicMetadata internalGetPartitionedMetadata(boolean author return metadata; } - protected void internalDeletePartitionedTopic(AsyncResponse asyncResponse, boolean authoritative, + protected void internalDeletePartitionedTopic(AsyncResponse asyncResponse, + boolean authoritative, boolean force) { validateTopicOwnershipAsync(topicName, authoritative) .thenCompose(__ -> validateNamespaceOperationAsync(topicName.getNamespaceObject(), diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java index fa9a206624ac3..986ef9bdb2421 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java @@ -294,8 +294,7 @@ public void deletePartitionedTopic(@Suspended final AsyncResponse asyncResponse, @PathParam("property") String property, @PathParam("cluster") String cluster, @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic, @QueryParam("force") @DefaultValue("false") boolean force, - @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, - @QueryParam("deleteSchema") @DefaultValue("true") boolean deleteSchema) { + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { try { validateTopicName(property, cluster, namespace, encodedTopic); internalDeletePartitionedTopic(asyncResponse, authoritative, force); @@ -333,8 +332,7 @@ public void unloadTopic(@Suspended final AsyncResponse asyncResponse, @PathParam public void deleteTopic(@PathParam("property") String property, @PathParam("cluster") String cluster, @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic, @QueryParam("force") @DefaultValue("false") boolean force, - @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, - @QueryParam("deleteSchema") @DefaultValue("true") boolean deleteSchema) { + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(property, cluster, namespace, encodedTopic); internalDeleteTopic(authoritative, force); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index 7e10ec9e9d660..f1c17502870c8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -911,9 +911,7 @@ public void deletePartitionedTopic( defaultValue = "false", type = "boolean") @QueryParam("force") @DefaultValue("false") boolean force, @ApiParam(value = "Is authentication required to perform this operation") - @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, - @ApiParam(value = "Delete the topic's schema storage") - @QueryParam("deleteSchema") @DefaultValue("true") boolean deleteSchema) { + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { try { validatePartitionedTopicName(tenant, namespace, encodedTopic); internalDeletePartitionedTopic(asyncResponse, authoritative, force); @@ -979,9 +977,7 @@ public void deleteTopic( defaultValue = "false", type = "boolean") @QueryParam("force") @DefaultValue("false") boolean force, @ApiParam(value = "Is authentication required to perform this operation") - @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, - @ApiParam(value = "Delete the topic's schema storage") - @QueryParam("deleteSchema") @DefaultValue("true") boolean deleteSchema) { + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); internalDeleteTopic(authoritative, force); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 9b36fbec11e88..5d988330e6a97 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -155,7 +155,6 @@ import org.apache.pulsar.common.policies.data.TopicPolicies; import org.apache.pulsar.common.policies.data.TopicType; import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl; -import org.apache.pulsar.common.protocol.schema.SchemaVersion; import org.apache.pulsar.common.stats.Metrics; import org.apache.pulsar.common.util.FieldParser; import org.apache.pulsar.common.util.FutureUtil; @@ -1013,15 +1012,6 @@ public CompletableFuture> getTopic(final String topic, boolean c } } - public CompletableFuture deleteSchemaStorage(String topic) { - Optional optTopic = getTopicReference(topic); - if (optTopic.isPresent()) { - return optTopic.get().deleteSchema(); - } else { - return CompletableFuture.completedFuture(null); - } - } - public CompletableFuture deleteTopic(String topic, boolean forceDelete) { Optional optTopic = getTopicReference(topic); if (optTopic.isPresent()) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java index 0ae7e5494d974..bb56f9a59c67f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java @@ -241,7 +241,7 @@ public void testGetSubscriptions() { // 9) Delete the partitioned topic response = mock(AsyncResponse.class); - persistentTopics.deletePartitionedTopic(response, testTenant, testNamespace, testLocalTopicName, true, true, false); + persistentTopics.deletePartitionedTopic(response, testTenant, testNamespace, testLocalTopicName, true, true); responseCaptor = ArgumentCaptor.forClass(Response.class); verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); @@ -577,7 +577,7 @@ public void testUnloadTopic() { // 4) delete partitioned topic response = mock(AsyncResponse.class); - persistentTopics.deletePartitionedTopic(response, testTenant, testNamespace, partitionTopicName, true, true, false); + persistentTopics.deletePartitionedTopic(response, testTenant, testNamespace, partitionTopicName, true, true); responseCaptor = ArgumentCaptor.forClass(Response.class); verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); @@ -1040,7 +1040,7 @@ public void testSetReplicatedSubscriptionStatus() { // 10) Delete the partitioned topic response = mock(AsyncResponse.class); - persistentTopics.deletePartitionedTopic(response, testTenant, testNamespace, topicName, true, true, false); + persistentTopics.deletePartitionedTopic(response, testTenant, testNamespace, topicName, true, true); responseCaptor = ArgumentCaptor.forClass(Response.class); verify(response, timeout(10000).times(1)).resume(responseCaptor.capture()); Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); @@ -1232,13 +1232,13 @@ public void testDeleteTopic() throws Exception { CompletableFuture deleteTopicFuture = new CompletableFuture<>(); deleteTopicFuture.completeExceptionally(new MetadataStoreException.NotFoundException()); doReturn(deleteTopicFuture).when(brokerService).deleteTopic(anyString(), anyBoolean()); - persistentTopics.deleteTopic(testTenant, testNamespace, topicName, true, true, true); + persistentTopics.deleteTopic(testTenant, testNamespace, topicName, true, true); // CompletableFuture deleteTopicFuture2 = new CompletableFuture<>(); deleteTopicFuture2.completeExceptionally(new MetadataStoreException("test exception")); doReturn(deleteTopicFuture2).when(brokerService).deleteTopic(anyString(), anyBoolean()); try { - persistentTopics.deleteTopic(testTenant, testNamespace, topicName, true, true, true); + persistentTopics.deleteTopic(testTenant, testNamespace, topicName, true, true); } catch (Exception e) { Assert.assertTrue(e instanceof RestException); } @@ -1247,7 +1247,7 @@ public void testDeleteTopic() throws Exception { deleteTopicFuture3.completeExceptionally(new MetadataStoreException.NotFoundException()); doReturn(deleteTopicFuture3).when(brokerService).deleteTopic(anyString(), anyBoolean()); try { - persistentTopics.deleteTopic(testTenant, testNamespace, topicName, false, true, true); + persistentTopics.deleteTopic(testTenant, testNamespace, topicName, false, true); } catch (RestException e) { Assert.assertEquals(e.getResponse().getStatus(), 404); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index 82313394d90ab..c3f9ba5cd3b39 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -186,6 +186,7 @@ public void setup() throws Exception { doReturn(svcConfig).when(pulsar).getConfiguration(); doReturn(mock(Compactor.class)).when(pulsar).getCompactor(); + // create SchemaRegistryService for testDeleteTopic() otherwise it will fail schemaRegistryService = spyWithClassAndConstructorArgs(DefaultSchemaRegistryService.class); doReturn(schemaRegistryService).when(pulsar).getSchemaRegistryService(); diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java index f8565cdb26cac..508066a4311ff 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java @@ -685,6 +685,9 @@ void updatePartitionedTopic(String topic, int numPartitions, boolean updateLocal /** * @see Topics#deletePartitionedTopic(String, boolean, boolean) + * IMPORTANT NOTICE: the application is not able to connect to the topic(delete then re-create with same name) again + * if the schema auto uploading is disabled. Besides, users should to use the truncate method to clean up + * data of the topic instead of delete method if users continue to use this topic later. */ default void deletePartitionedTopic(String topic, boolean force) throws PulsarAdminException { deletePartitionedTopic(topic, force, true); @@ -772,6 +775,9 @@ default CompletableFuture deletePartitionedTopicAsync(String topic, boolea /** * @see Topics#delete(String, boolean, boolean) + * IMPORTANT NOTICE: the application is not able to connect to the topic(delete then re-create with same name) again + * if the schema auto uploading is disabled. Besides, users should to use the truncate method to clean up + * data of the topic instead of delete method if users continue to use this topic later. */ default void delete(String topic, boolean force) throws PulsarAdminException { delete(topic, force, true); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java index f215a4bde72f5..9c87c0f0f5761 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java @@ -622,7 +622,10 @@ void run() throws Exception { } @Parameters(commandDescription = "Delete a partitioned topic. " - + "It will also delete all the partitions of the topic if it exists.") + + "It will also delete all the partitions of the topic if it exists." + + "And the application is not able to connect to the topic(delete then re-create with same name) again " + + "if the schema auto uploading is disabled. Besides, users should to use the truncate cmd to clean up " + + "data of the topic instead of delete cmd if users continue to use this topic later.") private class DeletePartitionedCmd extends CliCommand { @Parameter(description = "persistent://tenant/namespace/topic", required = true) @@ -632,6 +635,10 @@ private class DeletePartitionedCmd extends CliCommand { "--force" }, description = "Close all producer/consumer/replicator and delete topic forcefully") private boolean force = false; + @Parameter(names = {"-d", "--deleteSchema"}, description = "Delete schema while deleting topic, " + + "but the parameter is invalid and the schema is always deleted", hidden = true) + private boolean deleteSchema = false; + @Override void run() throws Exception { String topic = validateTopicName(params); @@ -640,7 +647,10 @@ void run() throws Exception { } @Parameters(commandDescription = "Delete a topic. " - + "The topic cannot be deleted if there's any active subscription or producers connected to it.") + + "The topic cannot be deleted if there's any active subscription or producers connected to it." + + "And the application is not able to connect to the topic(delete then re-create with same name) again " + + "if the schema auto uploading is disabled. Besides, users should to use the truncate cmd to clean up " + + "data of the topic instead of delete cmd if users continue to use this topic later.") private class DeleteCmd extends CliCommand { @Parameter(description = "persistent://tenant/namespace/topic", required = true) private java.util.List params; @@ -649,6 +659,10 @@ private class DeleteCmd extends CliCommand { "--force" }, description = "Close all producer/consumer/replicator and delete topic forcefully") private boolean force = false; + @Parameter(names = {"-d", "--deleteSchema"}, description = "Delete schema while deleting topic, " + + "but the parameter is invalid and the schema is always deleted", hidden = true) + private boolean deleteSchema = false; + @Override void run() throws PulsarAdminException { String topic = validateTopicName(params); From 9dedaef8ff41b50b463ebe57b0da366ae68d2d9b Mon Sep 17 00:00:00 2001 From: Ruguo Yu Date: Fri, 25 Mar 2022 17:12:10 +0800 Subject: [PATCH 3/3] Add test --- .../org/apache/pulsar/schema/SchemaTest.java | 90 ++++++++++++++++++- 1 file changed, 89 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java index 3aab8ec6f0e15..db2c5fc193349 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java @@ -784,7 +784,7 @@ public void testDeleteTopicAndSchemaForV1() throws Exception { final String topicOne = "not-partitioned-topic"; final String topic2 = "persistent://" + tenant + "/" + cluster + "/" + namespace + "/partitioned-topic"; - // persistent, not-partitioned v1/topic + // persistent, non-partitioned v1/topic final String topic1 = TopicName.get( TopicDomain.persistent.value(), tenant, @@ -866,6 +866,94 @@ public void testDeleteTopicAndSchemaForV1() throws Exception { .trimDeletedSchemaAndGetList(TopicName.get(topic2).getSchemaName()).get().size(), 0); } + @Test + public void testDeleteTopicAndSchemaForV2() throws Exception { + final String tenant = PUBLIC_TENANT; + final String namespace = "test-namespace-" + randomName(16); + final String topicOne = "persistent://" + tenant + "/" + namespace + "/non-partitioned-topic"; + final String topicTwo = "persistent://" + tenant + "/" + namespace + "/partitioned-topic"; + + admin.namespaces().createNamespace( + tenant + "/" + namespace, + Sets.newHashSet(CLUSTER_NAME)); + // persistent, non-partitioned v2/topic + admin.topics().createNonPartitionedTopic(topicOne); + // persistent, partitioned v2/topic + admin.topics().createPartitionedTopic(topicTwo, 1); + + @Cleanup + Producer p1_1 = pulsarClient.newProducer(Schema.JSON(Schemas.PersonOne.class)) + .topic(topicOne) + .create(); + + @Cleanup + Producer p1_2 = pulsarClient.newProducer(Schema.JSON(Schemas.PersonThree.class)) + .topic(topicOne) + .create(); + @Cleanup + Producer p2_1 = pulsarClient.newProducer(Schema.JSON(Schemas.PersonThree.class)) + .topic(topicTwo) + .create(); + + // Get 2 schemas of topicOne + List> schemaFutures1 = + this.getPulsar().getSchemaRegistryService().getAllSchemas(TopicName.get(topicOne).getSchemaName()).get(); + FutureUtil.waitForAll(schemaFutures1).get(); + List schemas1 = schemaFutures1.stream().map(future -> { + try { + return future.get(); + } catch (Exception e) { + return null; + } + }).collect(Collectors.toList()); + assertEquals(schemas1.size(), 2); + for (SchemaRegistry.SchemaAndMetadata schema : schemas1) { + assertNotNull(schema); + } + + // Get 1 schema of topicTwo + List> schemaFutures2 = + this.getPulsar().getSchemaRegistryService().getAllSchemas(TopicName.get(topicTwo).getSchemaName()).get(); + FutureUtil.waitForAll(schemaFutures2).get(); + List schemas2 = schemaFutures2.stream().map(future -> { + try { + return future.get(); + } catch (Exception e) { + return null; + } + }).collect(Collectors.toList()); + assertEquals(schemas2.size(), 1); + for (SchemaRegistry.SchemaAndMetadata schema : schemas2) { + assertNotNull(schema); + } + + // not force delete topic and will fail because it has active producers or subscriptions + try { + admin.topics().delete(topicOne, false); + fail(); + } catch (Exception e) { + assertTrue(e.getMessage().startsWith("Topic has active producers/subscriptions")); + } + assertEquals(this.getPulsar().getSchemaRegistryService() + .trimDeletedSchemaAndGetList(TopicName.get(topicOne).getSchemaName()).get().size(), 2); + try { + admin.topics().deletePartitionedTopic(topicTwo, false); + fail(); + } catch (Exception e) { + assertTrue(e.getMessage().startsWith("Topic has active producers/subscriptions")); + } + assertEquals(this.getPulsar().getSchemaRegistryService() + .trimDeletedSchemaAndGetList(TopicName.get(topicTwo).getSchemaName()).get().size(), 1); + + // force delete topic and will delete schema by default + admin.topics().delete(topicOne, true); + assertEquals(this.getPulsar().getSchemaRegistryService() + .trimDeletedSchemaAndGetList(TopicName.get(topicOne).getSchemaName()).get().size(), 0); + admin.topics().deletePartitionedTopic(topicTwo, true); + assertEquals(this.getPulsar().getSchemaRegistryService() + .trimDeletedSchemaAndGetList(TopicName.get(topicTwo).getSchemaName()).get().size(), 0); + } + @Test public void testProducerMultipleSchemaMessages() throws Exception { final String tenant = PUBLIC_TENANT;