diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 1934bcca9eceb..84395a0f138a5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -96,6 +96,7 @@ import org.apache.pulsar.broker.resourcegroup.ResourceGroupService; import org.apache.pulsar.broker.resourcegroup.ResourceUsageTopicTransportManager; import org.apache.pulsar.broker.resourcegroup.ResourceUsageTransportManager; +import org.apache.pulsar.broker.resources.ClusterResources; import org.apache.pulsar.broker.resources.PulsarResources; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.GracefulExecutorServicesShutdown; @@ -141,6 +142,8 @@ import org.apache.pulsar.functions.worker.WorkerService; import org.apache.pulsar.metadata.api.MetadataStoreConfig; import org.apache.pulsar.metadata.api.MetadataStoreException; +import org.apache.pulsar.metadata.api.Notification; +import org.apache.pulsar.metadata.api.NotificationType; import org.apache.pulsar.metadata.api.coordination.CoordinationService; import org.apache.pulsar.metadata.api.coordination.LeaderElectionState; import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; @@ -603,6 +606,8 @@ public void start() throws PulsarServerException { pulsarResources = new PulsarResources(localMetadataStore, configurationMetadataStore, config.getZooKeeperOperationTimeoutSeconds()); + pulsarResources.getClusterResources().getStore().registerListener(this::handleDeleteCluster); + orderedExecutor = OrderedExecutor.newBuilder() .numThreads(config.getNumOrderedExecutorThreads()) .name("pulsar-ordered") @@ -834,6 +839,14 @@ config, localMetadataStore, getZkClient(), bkClientFactory, ioEventLoopGroup } } + private void handleDeleteCluster(Notification notification) { + if (notification.getPath().startsWith(ClusterResources.CLUSTERS_ROOT) + && notification.getType() == NotificationType.Deleted) { + final String clusterName = notification.getPath().substring(ClusterResources.CLUSTERS_ROOT.length() + 1); + getBrokerService().closeAndRemoveReplicationClient(clusterName); + } + } + public MetadataStoreExtended createLocalMetadataStore() throws MetadataStoreException { return MetadataStoreExtended.create(config.getZookeeperServers(), MetadataStoreConfig.builder() diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 04815d21de81a..110975df19511 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -637,6 +637,32 @@ public void close() throws IOException { } } + public CompletableFuture closeAndRemoveReplicationClient(String clusterName) { + List> futures = new ArrayList<>((int) topics.size()); + topics.forEach((__, future) -> { + CompletableFuture f = new CompletableFuture<>(); + futures.add(f); + future.whenComplete((ot, ex) -> { + if (ot.isPresent()) { + Replicator r = ot.get().getReplicators().get(clusterName); + if (r != null && r.isConnected()) { + r.disconnect(false).whenComplete((v, e) -> f.complete(null)); + return; + } + } + f.complete(null); + }); + }); + + return FutureUtil.waitForAll(futures).thenCompose(__ -> { + PulsarClient client = replicationClients.remove(clusterName); + if (client == null) { + return CompletableFuture.completedFuture(null); + } + return client.closeAsync(); + }); + } + public CompletableFuture closeAsync() { try { log.info("Shutting down Pulsar Broker service"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java index 825d800459677..1d85cb47cce48 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java @@ -83,6 +83,7 @@ import org.apache.pulsar.common.policies.data.BacklogQuota.RetentionPolicy; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.ReplicatorStats; +import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.awaitility.Awaitility; @@ -1168,6 +1169,55 @@ public void createPartitionedTopicTest() throws Exception { checkListContainExpectedTopic(admin3, namespace, expectedTopicList); } + @Test + public void testRemoveClusterFromNamespace() throws Exception { + final String cluster4 = "r4"; + admin1.clusters().createCluster(cluster4, ClusterData.builder() + .serviceUrl(url3.toString()) + .serviceUrlTls(urlTls3.toString()) + .brokerServiceUrl(pulsar3.getSafeBrokerServiceUrl()) + .brokerServiceUrlTls(pulsar3.getBrokerServiceUrlTls()) + .build()); + + admin1.tenants().createTenant("pulsar1", + new TenantInfoImpl(Sets.newHashSet("appid1", "appid2", "appid3"), + Sets.newHashSet("r1", "r3", cluster4))); + + admin1.namespaces().createNamespace("pulsar1/ns1", Sets.newHashSet("r1", "r3", cluster4)); + + PulsarClient repClient1 = pulsar1.getBrokerService().getReplicationClient(cluster4); + Assert.assertNotNull(repClient1); + Assert.assertFalse(repClient1.isClosed()); + + PulsarClient client = PulsarClient.builder() + .serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS) + .build(); + + final String topicName = "persistent://pulsar1/ns1/testRemoveClusterFromNamespace-" + UUID.randomUUID(); + + Producer producer = client.newProducer() + .topic(topicName) + .create(); + + producer.send("Pulsar".getBytes()); + + producer.close(); + client.close(); + + Replicator replicator = pulsar1.getBrokerService().getTopicReference(topicName) + .get().getReplicators().get(cluster4); + + Awaitility.await().untilAsserted(() -> Assert.assertTrue(replicator.isConnected())); + + admin1.clusters().deleteCluster(cluster4); + + Awaitility.await().untilAsserted(() -> Assert.assertFalse(replicator.isConnected())); + Awaitility.await().untilAsserted(() -> Assert.assertTrue(repClient1.isClosed())); + + Awaitility.await().untilAsserted(() -> Assert.assertNull( + pulsar1.getBrokerService().getReplicationClients().get(cluster4))); + } + private void checkListContainExpectedTopic(PulsarAdmin admin, String namespace, List expectedTopicList) { // wait non-partitioned topics replicators created finished final List list = new ArrayList<>();