Skip to content

Commit

Permalink
Cherry-picking #11342 Close the replicator and replication client whe…
Browse files Browse the repository at this point in the history
…n delete cluster in branch-2.7. (#11390)
  • Loading branch information
Technoboy- authored Jul 23, 2021
1 parent 0ecda73 commit 0adc65d
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public class ConfigurationCacheService {
public static final String FAILURE_DOMAIN = "failureDomain";
public final String CLUSTER_FAILURE_DOMAIN_ROOT;
public static final String POLICIES_ROOT = "/admin/policies";
private static final String CLUSTERS_ROOT = "/admin/clusters";
public static final String CLUSTERS_ROOT = "/admin/clusters";

public static final String PARTITIONED_TOPICS_ROOT = "/admin/partitioned-topics";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@
import org.apache.pulsar.zookeeper.LocalZooKeeperCache;
import org.apache.pulsar.zookeeper.LocalZooKeeperConnectionService;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
import org.apache.pulsar.zookeeper.ZooKeeperCacheListener;
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
import org.apache.pulsar.zookeeper.ZookeeperBkClientFactoryImpl;
import org.apache.pulsar.zookeeper.ZooKeeperSessionWatcher.ShutdownService;
Expand All @@ -148,6 +149,7 @@
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
import org.slf4j.Logger;
Expand Down Expand Up @@ -724,6 +726,23 @@ protected void startZkCacheService() throws PulsarServerException {

this.configurationCacheService = new ConfigurationCacheService(globalZkCache, this.config.getClusterName());
this.localZkCacheService = new LocalZooKeeperCacheService(getLocalZkCache(), this.configurationCacheService);
this.configurationCacheService.clustersCache().registerListener(new DeleteClusterListener());
}

class DeleteClusterListener implements ZooKeeperCacheListener {

@Override
public void onUpdate(String path, Object data, Stat stat) {

}

@Override
public void onDelete(String path) {
if (path.startsWith(ConfigurationCacheService.CLUSTERS_ROOT)) {
final String clusterName = path.substring(ConfigurationCacheService.CLUSTERS_ROOT.length() + 1);
getBrokerService().closeAndRemoveReplicationClient(clusterName);
}
}
}

protected void startNamespaceService() throws PulsarServerException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -2496,6 +2497,32 @@ private void checkTopicLevelPolicyEnable() {
}
}

public CompletableFuture<Void> closeAndRemoveReplicationClient(String clusterName) {
List<CompletableFuture<Void>> futures = new ArrayList<>((int) topics.size());
topics.forEach((__, future) -> {
CompletableFuture<Void> f = new CompletableFuture<>();
futures.add(f);
future.whenComplete((ot, ex) -> {
if (ot.isPresent()) {
Replicator r = ot.get().getReplicators().get(clusterName);
if (r != null && r.isConnected()) {
r.disconnect(false).whenComplete((v, e) -> f.complete(null));
return;
}
}
f.complete(null);
});
});

return FutureUtil.waitForAll(futures).thenCompose(__ -> {
PulsarClient client = replicationClients.remove(clusterName);
if (client == null) {
return CompletableFuture.completedFuture(null);
}
return client.closeAsync();
});
}

public void setInterceptor(BrokerInterceptor interceptor) {
this.interceptor = interceptor;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
import org.apache.pulsar.common.policies.data.BacklogQuota.RetentionPolicy;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ReplicatorStats;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.awaitility.Awaitility;
Expand Down Expand Up @@ -1109,6 +1110,47 @@ public void createPartitionedTopicTest() throws Exception {
checkListContainExpectedTopic(admin3, namespace, expectedTopicList);
}

@Test(priority = 100)
public void testRemoveClusterFromNamespace() throws Exception {
final String cluster3 = "r3";

admin1.tenants().createTenant("pulsar1",
new TenantInfo(Sets.newHashSet("appid1", "appid2", "appid3"),
Sets.newHashSet("r1", "r3")));

admin1.namespaces().createNamespace("pulsar1/ns1", Sets.newHashSet("r1", "r3"));

PulsarClient repClient1 = pulsar1.getBrokerService().getReplicationClient(cluster3);
Assert.assertNotNull(repClient1);
Assert.assertFalse(repClient1.isClosed());

PulsarClient client = PulsarClient.builder()
.serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS)
.build();

final String topicName = "persistent://pulsar1/ns1/testRemoveClusterFromNamespace-" + UUID.randomUUID();

Producer<byte[]> producer = client.newProducer()
.topic(topicName)
.create();

producer.send("Pulsar".getBytes());

producer.close();
client.close();

Replicator replicator = pulsar1.getBrokerService().getTopicReference(topicName)
.get().getReplicators().get(cluster3);

admin1.clusters().deleteCluster(cluster3);

Awaitility.await().untilAsserted(() -> Assert.assertFalse(replicator.isConnected()));
Awaitility.await().untilAsserted(() -> Assert.assertTrue(repClient1.isClosed()));

Awaitility.await().untilAsserted(() -> Assert.assertNull(
pulsar1.getBrokerService().getReplicationClients().get(cluster3)));
}

private void checkListContainExpectedTopic(PulsarAdmin admin, String namespace, List<String> expectedTopicList) {
// wait non-partitioned topics replicators created finished
final List<String> list = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,6 @@

public interface ZooKeeperCacheListener<T> {
public void onUpdate(String path, T data, Stat stat);

default void onDelete(String path) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ public void reloadCache(final String path) {
if (LOG.isDebugEnabled()) {
LOG.debug("Node [{}] does not exist", path);
}
listeners.forEach(listener -> listener.onDelete(path));
return;
}

Expand Down

0 comments on commit 0adc65d

Please sign in to comment.