From a4b61a6f168814771381966906d2113695e6d146 Mon Sep 17 00:00:00 2001 From: penghui Date: Fri, 16 Jul 2021 22:00:30 +0800 Subject: [PATCH 1/3] Close the replicator and replication client when delete cluster. Currently, when a cluster been deleted, the replicator and the replication client will not been closed. The producer of the replicator will try to reconnect to the deleted cluster continuously. We should close and cleanup the replicator and the replication client for the deleted cluster. --- .../apache/pulsar/broker/PulsarService.java | 13 +++++ .../pulsar/broker/service/BrokerService.java | 26 ++++++++++ .../pulsar/broker/service/ReplicatorTest.java | 50 +++++++++++++++++++ 3 files changed, 89 insertions(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 1934bcca9eceb..b783744fd1219 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -96,6 +96,7 @@ import org.apache.pulsar.broker.resourcegroup.ResourceGroupService; import org.apache.pulsar.broker.resourcegroup.ResourceUsageTopicTransportManager; import org.apache.pulsar.broker.resourcegroup.ResourceUsageTransportManager; +import org.apache.pulsar.broker.resources.ClusterResources; import org.apache.pulsar.broker.resources.PulsarResources; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.GracefulExecutorServicesShutdown; @@ -141,6 +142,8 @@ import org.apache.pulsar.functions.worker.WorkerService; import org.apache.pulsar.metadata.api.MetadataStoreConfig; import org.apache.pulsar.metadata.api.MetadataStoreException; +import org.apache.pulsar.metadata.api.Notification; +import org.apache.pulsar.metadata.api.NotificationType; import org.apache.pulsar.metadata.api.coordination.CoordinationService; import org.apache.pulsar.metadata.api.coordination.LeaderElectionState; import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; @@ -602,6 +605,8 @@ public void start() throws PulsarServerException { configurationMetadataStore = createConfigurationMetadataStore(); pulsarResources = new PulsarResources(localMetadataStore, configurationMetadataStore, config.getZooKeeperOperationTimeoutSeconds()); + + pulsarResources.getClusterResources().getStore().registerListener(this::handleDeleteCluster); orderedExecutor = OrderedExecutor.newBuilder() .numThreads(config.getNumOrderedExecutorThreads()) @@ -834,6 +839,14 @@ config, localMetadataStore, getZkClient(), bkClientFactory, ioEventLoopGroup } } + private void handleDeleteCluster(Notification notification) { + if (notification.getPath().startsWith(ClusterResources.CLUSTERS_ROOT) + && notification.getType() == NotificationType.Deleted) { + final String clusterName = notification.getPath().substring(ClusterResources.CLUSTERS_ROOT.length() + 1); + getBrokerService().closeAndRemoveReplicationClient(clusterName); + } + } + public MetadataStoreExtended createLocalMetadataStore() throws MetadataStoreException { return MetadataStoreExtended.create(config.getZookeeperServers(), MetadataStoreConfig.builder() diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 04815d21de81a..1251a98c37ac4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -637,6 +637,32 @@ public void close() throws IOException { } } + public CompletableFuture closeAndRemoveReplicationClient(String clusterName) { + List> futures = new ArrayList<>((int) topics.size()); + topics.forEach((__, future) -> { + CompletableFuture f = new CompletableFuture<>(); + futures.add(f); + future.whenComplete((ot, ex) -> { + if (ot.isPresent()) { + Replicator r = ot.get().getReplicators().get(clusterName); + if (r != null && r.isConnected()) { + r.disconnect(true).whenComplete((v, e) -> f.complete(null)); + return; + } + } + f.complete(null); + }); + }); + + return FutureUtil.waitForAll(futures).thenCompose(__ -> { + PulsarClient client = replicationClients.remove(clusterName); + if (client == null) { + return CompletableFuture.completedFuture(null); + } + return client.closeAsync(); + }); + } + public CompletableFuture closeAsync() { try { log.info("Shutting down Pulsar Broker service"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java index 825d800459677..1d85cb47cce48 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java @@ -83,6 +83,7 @@ import org.apache.pulsar.common.policies.data.BacklogQuota.RetentionPolicy; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.ReplicatorStats; +import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.awaitility.Awaitility; @@ -1168,6 +1169,55 @@ public void createPartitionedTopicTest() throws Exception { checkListContainExpectedTopic(admin3, namespace, expectedTopicList); } + @Test + public void testRemoveClusterFromNamespace() throws Exception { + final String cluster4 = "r4"; + admin1.clusters().createCluster(cluster4, ClusterData.builder() + .serviceUrl(url3.toString()) + .serviceUrlTls(urlTls3.toString()) + .brokerServiceUrl(pulsar3.getSafeBrokerServiceUrl()) + .brokerServiceUrlTls(pulsar3.getBrokerServiceUrlTls()) + .build()); + + admin1.tenants().createTenant("pulsar1", + new TenantInfoImpl(Sets.newHashSet("appid1", "appid2", "appid3"), + Sets.newHashSet("r1", "r3", cluster4))); + + admin1.namespaces().createNamespace("pulsar1/ns1", Sets.newHashSet("r1", "r3", cluster4)); + + PulsarClient repClient1 = pulsar1.getBrokerService().getReplicationClient(cluster4); + Assert.assertNotNull(repClient1); + Assert.assertFalse(repClient1.isClosed()); + + PulsarClient client = PulsarClient.builder() + .serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS) + .build(); + + final String topicName = "persistent://pulsar1/ns1/testRemoveClusterFromNamespace-" + UUID.randomUUID(); + + Producer producer = client.newProducer() + .topic(topicName) + .create(); + + producer.send("Pulsar".getBytes()); + + producer.close(); + client.close(); + + Replicator replicator = pulsar1.getBrokerService().getTopicReference(topicName) + .get().getReplicators().get(cluster4); + + Awaitility.await().untilAsserted(() -> Assert.assertTrue(replicator.isConnected())); + + admin1.clusters().deleteCluster(cluster4); + + Awaitility.await().untilAsserted(() -> Assert.assertFalse(replicator.isConnected())); + Awaitility.await().untilAsserted(() -> Assert.assertTrue(repClient1.isClosed())); + + Awaitility.await().untilAsserted(() -> Assert.assertNull( + pulsar1.getBrokerService().getReplicationClients().get(cluster4))); + } + private void checkListContainExpectedTopic(PulsarAdmin admin, String namespace, List expectedTopicList) { // wait non-partitioned topics replicators created finished final List list = new ArrayList<>(); From 91b6aac18ea643b4bcda3d79177a8c0f5f4fe655 Mon Sep 17 00:00:00 2001 From: penghui Date: Sat, 17 Jul 2021 19:11:19 +0800 Subject: [PATCH 2/3] Fix checkstyle --- .../src/main/java/org/apache/pulsar/broker/PulsarService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index b783744fd1219..84395a0f138a5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -605,7 +605,7 @@ public void start() throws PulsarServerException { configurationMetadataStore = createConfigurationMetadataStore(); pulsarResources = new PulsarResources(localMetadataStore, configurationMetadataStore, config.getZooKeeperOperationTimeoutSeconds()); - + pulsarResources.getClusterResources().getStore().registerListener(this::handleDeleteCluster); orderedExecutor = OrderedExecutor.newBuilder() From 4fef86219dc8ef7a4d3c49b6d15302f943321672 Mon Sep 17 00:00:00 2001 From: penghui Date: Sun, 18 Jul 2021 18:16:32 +0800 Subject: [PATCH 3/3] Address comment. --- .../java/org/apache/pulsar/broker/service/BrokerService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 1251a98c37ac4..110975df19511 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -646,7 +646,7 @@ public CompletableFuture closeAndRemoveReplicationClient(String clusterNam if (ot.isPresent()) { Replicator r = ot.get().getReplicators().get(clusterName); if (r != null && r.isConnected()) { - r.disconnect(true).whenComplete((v, e) -> f.complete(null)); + r.disconnect(false).whenComplete((v, e) -> f.complete(null)); return; } }