Skip to content

Commit

Permalink
Close the replicator and replication client when delete cluster. (apa…
Browse files Browse the repository at this point in the history
…che#11342)

### Motivation

Currently, when a cluster has been deleted, the replicator and the replication client will not be closed.
The producer of the replicator will try to reconnect to the deleted cluster continuously.
We should close and clean up the replicator and the replication client for the deleted cluster.

### Verifying this change

A new test was added for verifying the replicator and the replication client will be closed after the cluster is deleted.
  • Loading branch information
codelipenghui authored Jul 19, 2021
1 parent eb4d8aa commit 8a4147e
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@
import org.apache.pulsar.broker.resourcegroup.ResourceGroupService;
import org.apache.pulsar.broker.resourcegroup.ResourceUsageTopicTransportManager;
import org.apache.pulsar.broker.resourcegroup.ResourceUsageTransportManager;
import org.apache.pulsar.broker.resources.ClusterResources;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.GracefulExecutorServicesShutdown;
Expand Down Expand Up @@ -141,6 +142,8 @@
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.coordination.CoordinationService;
import org.apache.pulsar.metadata.api.coordination.LeaderElectionState;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
Expand Down Expand Up @@ -603,6 +606,8 @@ public void start() throws PulsarServerException {
pulsarResources = new PulsarResources(localMetadataStore, configurationMetadataStore,
config.getZooKeeperOperationTimeoutSeconds());

pulsarResources.getClusterResources().getStore().registerListener(this::handleDeleteCluster);

orderedExecutor = OrderedExecutor.newBuilder()
.numThreads(config.getNumOrderedExecutorThreads())
.name("pulsar-ordered")
Expand Down Expand Up @@ -834,6 +839,14 @@ config, localMetadataStore, getZkClient(), bkClientFactory, ioEventLoopGroup
}
}

private void handleDeleteCluster(Notification notification) {
if (notification.getPath().startsWith(ClusterResources.CLUSTERS_ROOT)
&& notification.getType() == NotificationType.Deleted) {
final String clusterName = notification.getPath().substring(ClusterResources.CLUSTERS_ROOT.length() + 1);
getBrokerService().closeAndRemoveReplicationClient(clusterName);
}
}

public MetadataStoreExtended createLocalMetadataStore() throws MetadataStoreException {
return MetadataStoreExtended.create(config.getZookeeperServers(),
MetadataStoreConfig.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,32 @@ public void close() throws IOException {
}
}

public CompletableFuture<Void> closeAndRemoveReplicationClient(String clusterName) {
List<CompletableFuture<Void>> futures = new ArrayList<>((int) topics.size());
topics.forEach((__, future) -> {
CompletableFuture<Void> f = new CompletableFuture<>();
futures.add(f);
future.whenComplete((ot, ex) -> {
if (ot.isPresent()) {
Replicator r = ot.get().getReplicators().get(clusterName);
if (r != null && r.isConnected()) {
r.disconnect(false).whenComplete((v, e) -> f.complete(null));
return;
}
}
f.complete(null);
});
});

return FutureUtil.waitForAll(futures).thenCompose(__ -> {
PulsarClient client = replicationClients.remove(clusterName);
if (client == null) {
return CompletableFuture.completedFuture(null);
}
return client.closeAsync();
});
}

public CompletableFuture<Void> closeAsync() {
try {
log.info("Shutting down Pulsar Broker service");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
import org.apache.pulsar.common.policies.data.BacklogQuota.RetentionPolicy;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ReplicatorStats;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.awaitility.Awaitility;
Expand Down Expand Up @@ -1168,6 +1169,55 @@ public void createPartitionedTopicTest() throws Exception {
checkListContainExpectedTopic(admin3, namespace, expectedTopicList);
}

@Test
public void testRemoveClusterFromNamespace() throws Exception {
final String cluster4 = "r4";
admin1.clusters().createCluster(cluster4, ClusterData.builder()
.serviceUrl(url3.toString())
.serviceUrlTls(urlTls3.toString())
.brokerServiceUrl(pulsar3.getSafeBrokerServiceUrl())
.brokerServiceUrlTls(pulsar3.getBrokerServiceUrlTls())
.build());

admin1.tenants().createTenant("pulsar1",
new TenantInfoImpl(Sets.newHashSet("appid1", "appid2", "appid3"),
Sets.newHashSet("r1", "r3", cluster4)));

admin1.namespaces().createNamespace("pulsar1/ns1", Sets.newHashSet("r1", "r3", cluster4));

PulsarClient repClient1 = pulsar1.getBrokerService().getReplicationClient(cluster4);
Assert.assertNotNull(repClient1);
Assert.assertFalse(repClient1.isClosed());

PulsarClient client = PulsarClient.builder()
.serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS)
.build();

final String topicName = "persistent://pulsar1/ns1/testRemoveClusterFromNamespace-" + UUID.randomUUID();

Producer<byte[]> producer = client.newProducer()
.topic(topicName)
.create();

producer.send("Pulsar".getBytes());

producer.close();
client.close();

Replicator replicator = pulsar1.getBrokerService().getTopicReference(topicName)
.get().getReplicators().get(cluster4);

Awaitility.await().untilAsserted(() -> Assert.assertTrue(replicator.isConnected()));

admin1.clusters().deleteCluster(cluster4);

Awaitility.await().untilAsserted(() -> Assert.assertFalse(replicator.isConnected()));
Awaitility.await().untilAsserted(() -> Assert.assertTrue(repClient1.isClosed()));

Awaitility.await().untilAsserted(() -> Assert.assertNull(
pulsar1.getBrokerService().getReplicationClients().get(cluster4)));
}

private void checkListContainExpectedTopic(PulsarAdmin admin, String namespace, List<String> expectedTopicList) {
// wait non-partitioned topics replicators created finished
final List<String> list = new ArrayList<>();
Expand Down

0 comments on commit 8a4147e

Please sign in to comment.