From 66b0d7cf7d97575ac5f27fde8285fb40c3e3a90d Mon Sep 17 00:00:00 2001
From: Demogorgon314 <kwang@streamnative.io>
Date: Wed, 7 Dec 2022 21:47:01 +0800
Subject: [PATCH 01/15] Implement broker registry for new load manager

---
 .../extensions/BrokerRegistryImpl.java        | 218 ++++++++++++++++
 .../extensions/BrokerRegistryTest.java        | 244 ++++++++++++++++++
 2 files changed, 462 insertions(+)
 create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
 create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
new file mode 100644
index 0000000000000..d5698dd9fd5c4
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiConsumer;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.Notification;
+import org.apache.pulsar.metadata.api.NotificationType;
+import org.apache.pulsar.metadata.api.coordination.LockManager;
+import org.apache.pulsar.metadata.api.coordination.ResourceLock;
+
+/**
+ * The broker registry impl, base on the LockManager.
+ */
+@Slf4j
+public class BrokerRegistryImpl implements BrokerRegistry {
+
+    private static final String LOOKUP_DATA_PATH = "/loadbalance/brokers";
+
+    private final PulsarService pulsar;
+
+    private final ServiceConfiguration conf;
+
+    private final BrokerLookupData brokerLookupData;
+
+    private final LockManager<BrokerLookupData> brokerLookupDataLockManager;
+
+    private final String brokerZNodePath;
+
+    private final String lookupServiceAddress;
+
+    @VisibleForTesting
+    protected final Map<String, BrokerLookupData> brokerLookupDataMap;
+
+    private final ScheduledExecutorService scheduler;
+
+    private final List<BiConsumer<String, NotificationType>> listeners;
+
+    private final AtomicBoolean registered;
+
+    private volatile ResourceLock<BrokerLookupData> brokerLookupDataLock;
+
+    public BrokerRegistryImpl(PulsarService pulsar) {
+        this.pulsar = pulsar;
+        this.conf = pulsar.getConfiguration();
+        this.brokerLookupDataLockManager = pulsar.getCoordinationService().getLockManager(BrokerLookupData.class);
+        this.scheduler = pulsar.getLoadManagerExecutor();
+        this.brokerLookupDataMap = new ConcurrentHashMap<>();
+        this.listeners = new ArrayList<>();
+
+        this.registered = new AtomicBoolean(false);
+        this.lookupServiceAddress = pulsar.getAdvertisedAddress() + ":"
+                + conf.getWebServicePort().orElseGet(() -> conf.getWebServicePortTls().get());
+        this.brokerZNodePath = LOOKUP_DATA_PATH + "/" + lookupServiceAddress;
+        this.brokerLookupData = new BrokerLookupData(
+                pulsar.getSafeWebServiceAddress(),
+                pulsar.getWebServiceAddressTls(),
+                pulsar.getBrokerServiceUrl(),
+                pulsar.getBrokerServiceUrlTls(),
+                pulsar.getAdvertisedListeners(),
+                pulsar.getProtocolDataToAdvertise(),
+                pulsar.getConfiguration().isEnablePersistentTopics(),
+                pulsar.getConfiguration().isEnableNonPersistentTopics(),
+                pulsar.getBrokerVersion());
+    }
+
+    @Override
+    public void start() {
+        pulsar.getLocalMetadataStore().registerListener(this::handleMetadataStoreNotification);
+    }
+
+    @Override
+    public void register() {
+        if (registered.compareAndSet(false, true)) {
+            this.brokerLookupDataLock =
+                    brokerLookupDataLockManager.acquireLock(brokerZNodePath, brokerLookupData).join();
+        }
+    }
+
+    @Override
+    public void unregister() throws MetadataStoreException {
+        if (registered.compareAndSet(true, false)) {
+            try {
+                brokerLookupDataLock.release().join();
+            } catch (CompletionException e) {
+                throw MetadataStoreException.unwrap(e);
+            }
+        }
+    }
+
+    @Override
+    public String getLookupServiceAddress() {
+        return this.lookupServiceAddress;
+    }
+
+    @Override
+    public CompletableFuture<List<String>> getAvailableBrokersAsync() {
+        CompletableFuture<List<String>> future = new CompletableFuture<>();
+        brokerLookupDataLockManager.listLocks(LOOKUP_DATA_PATH)
+                .thenAccept(listLocks -> future.complete(Lists.newArrayList(listLocks)))
+                .exceptionally(ex -> {
+                    Throwable realCause = FutureUtil.unwrapCompletionException(ex);
+                    log.warn("Error when trying to get active brokers", realCause);
+                    future.complete(Lists.newArrayList(this.brokerLookupDataMap.keySet()));
+                    return null;
+                });
+        return future;
+    }
+
+    @Override
+    public CompletableFuture<Optional<BrokerLookupData>> lookupAsync(String broker) {
+        return brokerLookupDataLockManager.readLock(keyPath(broker));
+    }
+
+    @Override
+    public void forEach(BiConsumer<String, BrokerLookupData> action) {
+        this.brokerLookupDataMap.forEach(action);
+    }
+
+    public void listen(BiConsumer<String, NotificationType> listener) {
+        this.listeners.add(listener);
+    }
+
+    @Override
+    public void close() throws PulsarServerException {
+        try {
+            this.unregister();
+        } catch (MetadataStoreException ex) {
+            if (ex.getCause() instanceof MetadataStoreException.NotFoundException) {
+                throw new PulsarServerException.NotFoundException(MetadataStoreException.unwrap(ex));
+            } else {
+                throw new PulsarServerException(MetadataStoreException.unwrap(ex));
+            }
+        }
+
+        this.listeners.clear();
+    }
+
+    private void handleMetadataStoreNotification(Notification t) {
+        if (t.getPath().startsWith(LOOKUP_DATA_PATH) && t.getPath().length() > LOOKUP_DATA_PATH.length()) {
+            try {
+                if (log.isDebugEnabled()) {
+                    log.debug("Handle notification: [{}]", t);
+                }
+                this.scheduler.submit(() -> {
+                    String lookupServiceAddress = t.getPath().substring(LOOKUP_DATA_PATH.length() + 1);
+                    this.updateBrokerLookupDataToLocalCache(lookupServiceAddress, t.getType());
+                    for (BiConsumer<String, NotificationType> listener : listeners) {
+                        listener.accept(lookupServiceAddress, t.getType());
+                    }
+                });
+            } catch (RejectedExecutionException e) {
+                // Executor is shutting down
+            }
+        }
+    }
+
+    private void updateBrokerLookupDataToLocalCache(String lookupServiceAddress, NotificationType type) {
+        switch (type) {
+            case Created, Modified, ChildrenChanged -> {
+                try {
+                    Optional<BrokerLookupData> lookupData =
+                            brokerLookupDataLockManager.readLock(keyPath(lookupServiceAddress))
+                                    .get(conf.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
+                    if (lookupData.isEmpty()) {
+                        brokerLookupDataMap.remove(lookupServiceAddress);
+                        log.info("[{}] Broker lookup data is not present", lookupServiceAddress);
+                        break;
+                    }
+                    brokerLookupDataMap.put(lookupServiceAddress, lookupData.get());
+                } catch (Exception e) {
+                    log.warn("Error reading broker data from cache for broker - [{}], [{}]",
+                            lookupServiceAddress, e.getMessage());
+                }
+            }
+            case Deleted -> brokerLookupDataMap.remove(lookupServiceAddress);
+        }
+    }
+
+    private String keyPath(String lookupServiceAddress) {
+        return String.format("%s/%s", LOOKUP_DATA_PATH, lookupServiceAddress);
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java
new file mode 100644
index 0000000000000..642a62c100218
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.LoadManager;
+import org.apache.pulsar.broker.loadbalance.ResourceUnit;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.common.naming.ServiceUnitId;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.stats.Metrics;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
+import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+
+/**
+ * Unit test for {@link BrokerRegistry}.
+ */
+public class BrokerRegistryTest {
+
+    private ExecutorService executor;
+
+    private LocalBookkeeperEnsemble bkEnsemble;
+
+    private PulsarService pulsar1;
+
+    private PulsarService pulsar2;
+
+    // Make sure the load manager don't register itself to `/loadbalance/brokers/{lookupServiceAddress}`
+    public static class MockLoadManager implements LoadManager {
+
+        @Override
+        public void start() throws PulsarServerException {
+            // No-op
+        }
+
+        @Override
+        public boolean isCentralized() {
+            return false;
+        }
+
+        @Override
+        public Optional<ResourceUnit> getLeastLoaded(ServiceUnitId su) throws Exception {
+            return Optional.empty();
+        }
+
+        @Override
+        public LoadManagerReport generateLoadReport() throws Exception {
+            return null;
+        }
+
+        @Override
+        public void setLoadReportForceUpdateFlag() {
+            // No-op
+        }
+
+        @Override
+        public void writeLoadReportOnZookeeper() throws Exception {
+            // No-op
+        }
+
+        @Override
+        public void writeResourceQuotasToZooKeeper() throws Exception {
+            // No-op
+        }
+
+        @Override
+        public List<Metrics> getLoadBalancingMetrics() {
+            return null;
+        }
+
+        @Override
+        public void doLoadShedding() {
+            // No-op
+        }
+
+        @Override
+        public void doNamespaceBundleSplit() throws Exception {
+            // No-op
+        }
+
+        @Override
+        public void disableBroker() throws Exception {
+            // No-op
+        }
+
+        @Override
+        public Set<String> getAvailableBrokers() throws Exception {
+            return null;
+        }
+
+        @Override
+        public CompletableFuture<Set<String>> getAvailableBrokersAsync() {
+            return null;
+        }
+
+        @Override
+        public void stop() throws PulsarServerException {
+            // No-op
+        }
+
+        @Override
+        public void initialize(PulsarService pulsar) {
+            // No-op
+        }
+    }
+
+    @BeforeMethod
+    void setup() throws Exception {
+        executor = new ThreadPoolExecutor(5, 20, 30, TimeUnit.SECONDS,
+                new LinkedBlockingQueue<>());
+        // Start local bookkeeper ensemble
+        bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+        bkEnsemble.start();
+
+        // Start broker 1
+        ServiceConfiguration config1 = new ServiceConfiguration();
+        config1.setLoadBalancerEnabled(false);
+        config1.setLoadManagerClassName(MockLoadManager.class.getName());
+        config1.setClusterName("use");
+        config1.setWebServicePort(Optional.of(0));
+        config1.setMetadataStoreUrl("zk:127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+        config1.setBrokerShutdownTimeoutMs(0L);
+        config1.setBrokerServicePort(Optional.of(0));
+        config1.setAdvertisedAddress("localhost");
+        createCluster(bkEnsemble.getZkClient(), config1);
+        pulsar1 = new PulsarService(config1);
+        pulsar1.start();
+
+        // Start broker 2
+        ServiceConfiguration config2 = new ServiceConfiguration();
+        config2.setLoadBalancerEnabled(false);
+        config2.setLoadManagerClassName(MockLoadManager.class.getName());
+        config2.setClusterName("use");
+        config2.setWebServicePort(Optional.of(0));
+        config2.setMetadataStoreUrl("zk:127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+        config2.setBrokerShutdownTimeoutMs(0L);
+        config2.setBrokerServicePort(Optional.of(0));
+        config2.setAdvertisedAddress("localhost");
+        pulsar2 = new PulsarService(config2);
+        pulsar2.start();
+    }
+
+    @AfterMethod(alwaysRun = true)
+    void shutdown() throws Exception {
+        executor.shutdownNow();
+
+        pulsar2.close();
+        pulsar1.close();
+
+        bkEnsemble.stop();
+    }
+
+    @Test(timeOut = 30 * 1000)
+    public void testRegister() throws Exception {
+        BrokerRegistryImpl brokerRegistry1 = new BrokerRegistryImpl(pulsar1);
+        BrokerRegistryImpl brokerRegistry2 = new BrokerRegistryImpl(pulsar2);
+
+        Set<String> address = new HashSet<>();
+        brokerRegistry1.listen((lookupServiceAddress, type) -> {
+            address.add(lookupServiceAddress);
+        });
+
+        brokerRegistry1.start();
+        brokerRegistry2.start();
+        brokerRegistry1.register();
+        brokerRegistry2.register();
+
+        assertEquals(brokerRegistry1.getAvailableBrokersAsync().get().size(), 2);
+        assertEquals(brokerRegistry2.getAvailableBrokersAsync().get().size(), 2);
+        Awaitility.await().atMost(Duration.ofSeconds(5))
+                        .untilAsserted(() -> {
+                            assertEquals(brokerRegistry1.brokerLookupDataMap.size(), 2);
+                            assertEquals(brokerRegistry2.brokerLookupDataMap.size(), 2);
+                            assertEquals(address.size(), 2);
+                        });
+
+        assertEquals(address, brokerRegistry1.brokerLookupDataMap.keySet());
+        assertEquals(address, brokerRegistry2.brokerLookupDataMap.keySet());
+
+        Optional<BrokerLookupData> lookupDataOpt =
+                brokerRegistry1.lookupAsync(brokerRegistry2.getLookupServiceAddress()).get();
+        assertTrue(lookupDataOpt.isPresent());
+        assertEquals(lookupDataOpt.get().brokerVersion(), pulsar2.getBrokerVersion());
+
+        brokerRegistry1.unregister();
+        assertEquals(brokerRegistry2.getAvailableBrokersAsync().get().size(), 1);
+
+        brokerRegistry1.close();
+        brokerRegistry2.close();
+    }
+
+
+    private void createCluster(ZooKeeper zk, ServiceConfiguration config) throws Exception {
+        ZkUtils.createFullPathOptimistic(zk, "/admin/clusters/" + config.getClusterName(),
+                ObjectMapperFactory.getThreadLocal().writeValueAsBytes(
+                        ClusterData.builder()
+                                .serviceUrl("http://" + config.getAdvertisedAddress() + ":" + config.getWebServicePort()
+                                        .get())
+                                .build()),
+                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+    }
+}
+

From 723f1546ef6e18b8f469f5264d5925c8fb19e3bf Mon Sep 17 00:00:00 2001
From: Demogorgon314 <kwang@streamnative.io>
Date: Mon, 12 Dec 2022 22:41:00 +0800
Subject: [PATCH 02/15] Refactor

---
 .../extensions/BrokerRegistryImpl.java        | 114 +++++++++++++----
 .../extensions/BrokerRegistryTest.java        | 119 +++++++++---------
 2 files changed, 150 insertions(+), 83 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
index d5698dd9fd5c4..ff5f1593fe8fd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
@@ -27,10 +27,11 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.TimeoutException;
 import java.util.function.BiConsumer;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.PulsarServerException;
@@ -65,27 +66,27 @@ public class BrokerRegistryImpl implements BrokerRegistry {
     private final String lookupServiceAddress;
 
     @VisibleForTesting
-    protected final Map<String, BrokerLookupData> brokerLookupDataMap;
+    protected final Map<String, BrokerLookupData> brokerLookupDataCache;
 
     private final ScheduledExecutorService scheduler;
 
     private final List<BiConsumer<String, NotificationType>> listeners;
 
-    private final AtomicBoolean registered;
-
     private volatile ResourceLock<BrokerLookupData> brokerLookupDataLock;
 
+    private volatile boolean registered = false;
+
+    private volatile CompletableFuture<Void> cacheReloadFuture;
+
     public BrokerRegistryImpl(PulsarService pulsar) {
         this.pulsar = pulsar;
         this.conf = pulsar.getConfiguration();
         this.brokerLookupDataLockManager = pulsar.getCoordinationService().getLockManager(BrokerLookupData.class);
         this.scheduler = pulsar.getLoadManagerExecutor();
-        this.brokerLookupDataMap = new ConcurrentHashMap<>();
+        this.brokerLookupDataCache = new ConcurrentHashMap<>();
         this.listeners = new ArrayList<>();
-
-        this.registered = new AtomicBoolean(false);
-        this.lookupServiceAddress = pulsar.getAdvertisedAddress() + ":"
-                + conf.getWebServicePort().orElseGet(() -> conf.getWebServicePortTls().get());
+        this.cacheReloadFuture = CompletableFuture.completedFuture(null);
+        this.lookupServiceAddress = pulsar.getLookupServiceAddress();
         this.brokerZNodePath = LOOKUP_DATA_PATH + "/" + lookupServiceAddress;
         this.brokerLookupData = new BrokerLookupData(
                 pulsar.getSafeWebServiceAddress(),
@@ -100,24 +101,38 @@ public BrokerRegistryImpl(PulsarService pulsar) {
     }
 
     @Override
-    public void start() {
+    public void start() throws PulsarServerException {
         pulsar.getLocalMetadataStore().registerListener(this::handleMetadataStoreNotification);
+        try {
+            this.register();
+            // Update all lookup data to cache
+            this.reloadAllBrokerLookupCacheAsync()
+                    .get(conf.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
+        } catch (MetadataStoreException | ExecutionException | InterruptedException | TimeoutException e) {
+            throw new PulsarServerException(e);
+        }
     }
 
     @Override
-    public void register() {
-        if (registered.compareAndSet(false, true)) {
-            this.brokerLookupDataLock =
-                    brokerLookupDataLockManager.acquireLock(brokerZNodePath, brokerLookupData).join();
+    public synchronized void register() throws MetadataStoreException {
+        if (!registered) {
+            try {
+                this.brokerLookupDataLock = brokerLookupDataLockManager.acquireLock(brokerZNodePath, brokerLookupData)
+                        .get(conf.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
+                registered = true;
+            } catch (InterruptedException | ExecutionException | TimeoutException e) {
+                throw MetadataStoreException.unwrap(e);
+            }
         }
     }
 
     @Override
-    public void unregister() throws MetadataStoreException {
-        if (registered.compareAndSet(true, false)) {
+    public synchronized void unregister() throws MetadataStoreException {
+        if (registered) {
             try {
-                brokerLookupDataLock.release().join();
-            } catch (CompletionException e) {
+                brokerLookupDataLock.release().get(conf.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
+                registered = false;
+            } catch (CompletionException | InterruptedException | ExecutionException | TimeoutException e) {
                 throw MetadataStoreException.unwrap(e);
             }
         }
@@ -132,11 +147,17 @@ public String getLookupServiceAddress() {
     public CompletableFuture<List<String>> getAvailableBrokersAsync() {
         CompletableFuture<List<String>> future = new CompletableFuture<>();
         brokerLookupDataLockManager.listLocks(LOOKUP_DATA_PATH)
-                .thenAccept(listLocks -> future.complete(Lists.newArrayList(listLocks)))
+                .thenAccept(listLocks ->  {
+                    if (this.brokerLookupDataCache.size() != listLocks.size()
+                            || !this.brokerLookupDataCache.keySet().containsAll(listLocks)) {
+                        this.triggerCacheReload();
+                    }
+                    future.complete(Lists.newArrayList(listLocks));
+                })
                 .exceptionally(ex -> {
                     Throwable realCause = FutureUtil.unwrapCompletionException(ex);
                     log.warn("Error when trying to get active brokers", realCause);
-                    future.complete(Lists.newArrayList(this.brokerLookupDataMap.keySet()));
+                    future.complete(Lists.newArrayList(this.brokerLookupDataCache.keySet()));
                     return null;
                 });
         return future;
@@ -149,7 +170,7 @@ public CompletableFuture<Optional<BrokerLookupData>> lookupAsync(String broker)
 
     @Override
     public void forEach(BiConsumer<String, BrokerLookupData> action) {
-        this.brokerLookupDataMap.forEach(action);
+        this.brokerLookupDataCache.forEach(action);
     }
 
     public void listen(BiConsumer<String, NotificationType> listener) {
@@ -198,17 +219,62 @@ private void updateBrokerLookupDataToLocalCache(String lookupServiceAddress, Not
                             brokerLookupDataLockManager.readLock(keyPath(lookupServiceAddress))
                                     .get(conf.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
                     if (lookupData.isEmpty()) {
-                        brokerLookupDataMap.remove(lookupServiceAddress);
+                        brokerLookupDataCache.remove(lookupServiceAddress);
                         log.info("[{}] Broker lookup data is not present", lookupServiceAddress);
                         break;
                     }
-                    brokerLookupDataMap.put(lookupServiceAddress, lookupData.get());
+                    brokerLookupDataCache.put(lookupServiceAddress, lookupData.get());
                 } catch (Exception e) {
                     log.warn("Error reading broker data from cache for broker - [{}], [{}]",
                             lookupServiceAddress, e.getMessage());
                 }
             }
-            case Deleted -> brokerLookupDataMap.remove(lookupServiceAddress);
+            case Deleted -> brokerLookupDataCache.remove(lookupServiceAddress);
+        }
+    }
+
+    private synchronized CompletableFuture<Void> reloadAllBrokerLookupCacheAsync() {
+        if (!cacheReloadFuture.isDone()) {
+            return cacheReloadFuture;
+        }
+        cacheReloadFuture = new CompletableFuture<>();
+        brokerLookupDataLockManager.listLocks(LOOKUP_DATA_PATH).thenAccept(activeBrokers -> {
+            final List<CompletableFuture<Void>> futures = new ArrayList<>();
+            for (String broker : activeBrokers) {
+                futures.add(brokerLookupDataLockManager.readLock(keyPath(broker)).thenAccept(lookupData -> {
+                    if (lookupData.isEmpty()) {
+                        brokerLookupDataCache.remove(broker);
+                        log.info("[{}] Broker lookup data is not present", broker);
+                        return;
+                    }
+                    // Replace or initialize the lookup data.
+                    brokerLookupDataCache.put(broker, lookupData.get());
+                }).exceptionally(ex -> {
+                    log.warn("Error reading broker lookup data from cache for broker - [{}], [{}]",
+                            broker, ex.getMessage());
+                    return null;
+                }));
+            }
+            FutureUtil.waitForAll(futures).thenAccept(__ -> {
+                // Remove obsolete brokers.
+                for (final String broker : brokerLookupDataCache.keySet()) {
+                    if (!activeBrokers.contains(broker)) {
+                        brokerLookupDataCache.remove(broker);
+                    }
+                }
+                cacheReloadFuture.complete(null);
+            }).exceptionally(e -> {
+                log.warn("Error to reload the broker lookup cache, [{}]", e.getMessage());
+                cacheReloadFuture.complete(null);
+                return null;
+            });
+        });
+        return cacheReloadFuture;
+    }
+
+    private synchronized void triggerCacheReload() {
+        if (cacheReloadFuture.isDone()) {
+            scheduler.submit(this::reloadAllBrokerLookupCacheAsync);
         }
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java
index 642a62c100218..52abce91646cf 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java
@@ -27,11 +27,12 @@
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import org.apache.bookkeeper.util.ZkUtils;
+import lombok.SneakyThrows;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
@@ -39,17 +40,13 @@
 import org.apache.pulsar.broker.loadbalance.ResourceUnit;
 import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
 import org.apache.pulsar.common.naming.ServiceUnitId;
-import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.stats.Metrics;
-import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
 import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.ZooKeeper;
 import org.awaitility.Awaitility;
+import org.testng.annotations.AfterClass;
 import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 
@@ -58,13 +55,12 @@
  */
 public class BrokerRegistryTest {
 
+    private final List<PulsarService> pulsarServices = new CopyOnWriteArrayList<>();
+
     private ExecutorService executor;
 
     private LocalBookkeeperEnsemble bkEnsemble;
 
-    private PulsarService pulsar1;
-
-    private PulsarService pulsar2;
 
     // Make sure the load manager don't register itself to `/loadbalance/brokers/{lookupServiceAddress}`
     public static class MockLoadManager implements LoadManager {
@@ -145,56 +141,58 @@ public void initialize(PulsarService pulsar) {
         }
     }
 
-    @BeforeMethod
+    @BeforeClass
     void setup() throws Exception {
         executor = new ThreadPoolExecutor(5, 20, 30, TimeUnit.SECONDS,
                 new LinkedBlockingQueue<>());
         // Start local bookkeeper ensemble
         bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
         bkEnsemble.start();
+    }
 
-        // Start broker 1
-        ServiceConfiguration config1 = new ServiceConfiguration();
-        config1.setLoadBalancerEnabled(false);
-        config1.setLoadManagerClassName(MockLoadManager.class.getName());
-        config1.setClusterName("use");
-        config1.setWebServicePort(Optional.of(0));
-        config1.setMetadataStoreUrl("zk:127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
-        config1.setBrokerShutdownTimeoutMs(0L);
-        config1.setBrokerServicePort(Optional.of(0));
-        config1.setAdvertisedAddress("localhost");
-        createCluster(bkEnsemble.getZkClient(), config1);
-        pulsar1 = new PulsarService(config1);
-        pulsar1.start();
-
-        // Start broker 2
-        ServiceConfiguration config2 = new ServiceConfiguration();
-        config2.setLoadBalancerEnabled(false);
-        config2.setLoadManagerClassName(MockLoadManager.class.getName());
-        config2.setClusterName("use");
-        config2.setWebServicePort(Optional.of(0));
-        config2.setMetadataStoreUrl("zk:127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
-        config2.setBrokerShutdownTimeoutMs(0L);
-        config2.setBrokerServicePort(Optional.of(0));
-        config2.setAdvertisedAddress("localhost");
-        pulsar2 = new PulsarService(config2);
-        pulsar2.start();
+    @SneakyThrows
+    private PulsarService createPulsarService() {
+        ServiceConfiguration config = new ServiceConfiguration();
+        config.setLoadBalancerEnabled(false);
+        config.setLoadManagerClassName(MockLoadManager.class.getName());
+        config.setClusterName("use");
+        config.setWebServicePort(Optional.of(0));
+        config.setMetadataStoreUrl("zk:127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+        config.setBrokerShutdownTimeoutMs(0L);
+        config.setBrokerServicePort(Optional.of(0));
+        config.setAdvertisedAddress("localhost");
+        PulsarService pulsar = new PulsarService(config);
+        pulsarServices.add(pulsar);
+        return pulsar;
     }
 
-    @AfterMethod(alwaysRun = true)
+    @AfterClass(alwaysRun = true)
     void shutdown() throws Exception {
         executor.shutdownNow();
-
-        pulsar2.close();
-        pulsar1.close();
-
         bkEnsemble.stop();
     }
 
+    @AfterMethod(alwaysRun = true)
+    void cleanUp() {
+        pulsarServices.forEach(pulsarService -> {
+            if (pulsarService.isRunning()) {
+                pulsarService.shutdownNow();
+            }
+        });
+        pulsarServices.clear();
+    }
+
     @Test(timeOut = 30 * 1000)
-    public void testRegister() throws Exception {
+    public void testRegisterAndLookupCache() throws Exception {
+        PulsarService pulsar1 = createPulsarService();
+        PulsarService pulsar2 = createPulsarService();
+        PulsarService pulsar3 = createPulsarService();
+        pulsar1.start();
+        pulsar2.start();
+        pulsar3.start();
         BrokerRegistryImpl brokerRegistry1 = new BrokerRegistryImpl(pulsar1);
         BrokerRegistryImpl brokerRegistry2 = new BrokerRegistryImpl(pulsar2);
+        BrokerRegistryImpl brokerRegistry3 = new BrokerRegistryImpl(pulsar3);
 
         Set<String> address = new HashSet<>();
         brokerRegistry1.listen((lookupServiceAddress, type) -> {
@@ -203,20 +201,28 @@ public void testRegister() throws Exception {
 
         brokerRegistry1.start();
         brokerRegistry2.start();
-        brokerRegistry1.register();
-        brokerRegistry2.register();
+
+        assertEquals(brokerRegistry1.brokerLookupDataCache.size(), 2);
+        assertEquals(brokerRegistry2.brokerLookupDataCache.size(), 2);
+        assertEquals(address.size(), 2);
 
         assertEquals(brokerRegistry1.getAvailableBrokersAsync().get().size(), 2);
         assertEquals(brokerRegistry2.getAvailableBrokersAsync().get().size(), 2);
+
+        // Check three broker cache are flush successes.
+        brokerRegistry3.start();
+        assertEquals(brokerRegistry3.brokerLookupDataCache.size(), 3);
+        assertEquals(brokerRegistry3.getAvailableBrokersAsync().get().size(), 3);
         Awaitility.await().atMost(Duration.ofSeconds(5))
                         .untilAsserted(() -> {
-                            assertEquals(brokerRegistry1.brokerLookupDataMap.size(), 2);
-                            assertEquals(brokerRegistry2.brokerLookupDataMap.size(), 2);
-                            assertEquals(address.size(), 2);
+                            assertEquals(brokerRegistry1.brokerLookupDataCache.size(), 3);
+                            assertEquals(brokerRegistry2.brokerLookupDataCache.size(), 3);
+                            assertEquals(address.size(), 3);
                         });
 
-        assertEquals(address, brokerRegistry1.brokerLookupDataMap.keySet());
-        assertEquals(address, brokerRegistry2.brokerLookupDataMap.keySet());
+        assertEquals(address, brokerRegistry1.brokerLookupDataCache.keySet());
+        assertEquals(address, brokerRegistry2.brokerLookupDataCache.keySet());
+        assertEquals(address, brokerRegistry3.brokerLookupDataCache.keySet());
 
         Optional<BrokerLookupData> lookupDataOpt =
                 brokerRegistry1.lookupAsync(brokerRegistry2.getLookupServiceAddress()).get();
@@ -224,21 +230,16 @@ public void testRegister() throws Exception {
         assertEquals(lookupDataOpt.get().brokerVersion(), pulsar2.getBrokerVersion());
 
         brokerRegistry1.unregister();
-        assertEquals(brokerRegistry2.getAvailableBrokersAsync().get().size(), 1);
+        assertEquals(brokerRegistry2.getAvailableBrokersAsync().get().size(), 2);
 
         brokerRegistry1.close();
         brokerRegistry2.close();
+        brokerRegistry3.close();
     }
 
+    @Test
+    public void test() {
 
-    private void createCluster(ZooKeeper zk, ServiceConfiguration config) throws Exception {
-        ZkUtils.createFullPathOptimistic(zk, "/admin/clusters/" + config.getClusterName(),
-                ObjectMapperFactory.getThreadLocal().writeValueAsBytes(
-                        ClusterData.builder()
-                                .serviceUrl("http://" + config.getAdvertisedAddress() + ":" + config.getWebServicePort()
-                                        .get())
-                                .build()),
-                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
     }
 }
 

From 3fe33e1f7708b17003f4b876be751d69e980ff2d Mon Sep 17 00:00:00 2001
From: Demogorgon314 <kwang@streamnative.io>
Date: Mon, 12 Dec 2022 22:42:48 +0800
Subject: [PATCH 03/15] Cleanup the listeners first when close

---
 .../broker/loadbalance/extensions/BrokerRegistryImpl.java      | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
index ff5f1593fe8fd..76a42081f3546 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
@@ -180,6 +180,7 @@ public void listen(BiConsumer<String, NotificationType> listener) {
     @Override
     public void close() throws PulsarServerException {
         try {
+            this.listeners.clear();
             this.unregister();
         } catch (MetadataStoreException ex) {
             if (ex.getCause() instanceof MetadataStoreException.NotFoundException) {
@@ -188,8 +189,6 @@ public void close() throws PulsarServerException {
                 throw new PulsarServerException(MetadataStoreException.unwrap(ex));
             }
         }
-
-        this.listeners.clear();
     }
 
     private void handleMetadataStoreNotification(Notification t) {

From c6f8f13e8c65a8e5486cbaa570d57224c8399851 Mon Sep 17 00:00:00 2001
From: Demogorgon314 <kwang@streamnative.io>
Date: Mon, 12 Dec 2022 23:43:06 +0800
Subject: [PATCH 04/15] Add more unit tests

---
 .../extensions/BrokerRegistry.java            |  5 ++
 .../extensions/BrokerRegistryImpl.java        | 12 ++++
 .../extensions/BrokerRegistryTest.java        | 55 ++++++++++++++++---
 3 files changed, 64 insertions(+), 8 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistry.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistry.java
index 94ac87f7cf704..be751a1e2340e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistry.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistry.java
@@ -38,6 +38,11 @@ public interface BrokerRegistry extends AutoCloseable {
      */
     void start() throws PulsarServerException;
 
+    /**
+     * Return the broker registry is started.
+     */
+    boolean isStarted();
+
     /**
      * Register local broker to metadata store.
      */
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
index 76a42081f3546..7c9c41ab66345 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
@@ -32,6 +32,7 @@
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.BiConsumer;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.PulsarServerException;
@@ -76,6 +77,8 @@ public class BrokerRegistryImpl implements BrokerRegistry {
 
     private volatile boolean registered = false;
 
+    private AtomicBoolean started = new AtomicBoolean(false);
+
     private volatile CompletableFuture<Void> cacheReloadFuture;
 
     public BrokerRegistryImpl(PulsarService pulsar) {
@@ -108,11 +111,17 @@ public void start() throws PulsarServerException {
             // Update all lookup data to cache
             this.reloadAllBrokerLookupCacheAsync()
                     .get(conf.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
+            this.started.set(true);
         } catch (MetadataStoreException | ExecutionException | InterruptedException | TimeoutException e) {
             throw new PulsarServerException(e);
         }
     }
 
+    @Override
+    public boolean isStarted() {
+        return this.started.get();
+    }
+
     @Override
     public synchronized void register() throws MetadataStoreException {
         if (!registered) {
@@ -179,6 +188,9 @@ public void listen(BiConsumer<String, NotificationType> listener) {
 
     @Override
     public void close() throws PulsarServerException {
+        if (!started.compareAndSet(true, false)) {
+            return;
+        }
         try {
             this.listeners.clear();
             this.unregister();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java
index 52abce91646cf..8a3362bd51833 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java
@@ -18,8 +18,11 @@
  */
 package org.apache.pulsar.broker.loadbalance.extensions;
 
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
 
 import java.time.Duration;
 import java.util.HashSet;
@@ -33,6 +36,7 @@
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
@@ -53,9 +57,11 @@
 /**
  * Unit test for {@link BrokerRegistry}.
  */
+@Slf4j
 public class BrokerRegistryTest {
 
     private final List<PulsarService> pulsarServices = new CopyOnWriteArrayList<>();
+    private final List<BrokerRegistryImpl> brokerRegistries = new CopyOnWriteArrayList<>();
 
     private ExecutorService executor;
 
@@ -161,11 +167,17 @@ private PulsarService createPulsarService() {
         config.setBrokerShutdownTimeoutMs(0L);
         config.setBrokerServicePort(Optional.of(0));
         config.setAdvertisedAddress("localhost");
-        PulsarService pulsar = new PulsarService(config);
+        PulsarService pulsar = spy(new PulsarService(config));
         pulsarServices.add(pulsar);
         return pulsar;
     }
 
+    private BrokerRegistryImpl createBrokerRegistryImpl(PulsarService pulsar) {
+        BrokerRegistryImpl brokerRegistry = spy(new BrokerRegistryImpl(pulsar));
+        brokerRegistries.add(brokerRegistry);
+        return brokerRegistry;
+    }
+
     @AfterClass(alwaysRun = true)
     void shutdown() throws Exception {
         executor.shutdownNow();
@@ -174,6 +186,18 @@ void shutdown() throws Exception {
 
     @AfterMethod(alwaysRun = true)
     void cleanUp() {
+        log.info("Cleaning up the broker registry...");
+        brokerRegistries.forEach(brokerRegistry -> {
+            if (brokerRegistry.isStarted()) {
+                try {
+                    brokerRegistry.close();
+                } catch (PulsarServerException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        });
+        brokerRegistries.clear();
+        log.info("Cleaning up the pulsar services...");
         pulsarServices.forEach(pulsarService -> {
             if (pulsarService.isRunning()) {
                 pulsarService.shutdownNow();
@@ -190,9 +214,9 @@ public void testRegisterAndLookupCache() throws Exception {
         pulsar1.start();
         pulsar2.start();
         pulsar3.start();
-        BrokerRegistryImpl brokerRegistry1 = new BrokerRegistryImpl(pulsar1);
-        BrokerRegistryImpl brokerRegistry2 = new BrokerRegistryImpl(pulsar2);
-        BrokerRegistryImpl brokerRegistry3 = new BrokerRegistryImpl(pulsar3);
+        BrokerRegistryImpl brokerRegistry1 = createBrokerRegistryImpl(pulsar1);
+        BrokerRegistryImpl brokerRegistry2 = createBrokerRegistryImpl(pulsar2);
+        BrokerRegistryImpl brokerRegistry3 = createBrokerRegistryImpl(pulsar3);
 
         Set<String> address = new HashSet<>();
         brokerRegistry1.listen((lookupServiceAddress, type) -> {
@@ -232,14 +256,29 @@ public void testRegisterAndLookupCache() throws Exception {
         brokerRegistry1.unregister();
         assertEquals(brokerRegistry2.getAvailableBrokersAsync().get().size(), 2);
 
-        brokerRegistry1.close();
-        brokerRegistry2.close();
-        brokerRegistry3.close();
     }
 
     @Test
-    public void test() {
+    public void testRegisterFail() throws Exception {
+        PulsarService pulsar1 = createPulsarService();
+        PulsarService pulsar2 = createPulsarService();
+        pulsar1.start();
+        pulsar2.start();
 
+        doReturn(pulsar1.getLookupServiceAddress()).when(pulsar2).getLookupServiceAddress();
+        BrokerRegistryImpl brokerRegistry1 = createBrokerRegistryImpl(pulsar1);
+        BrokerRegistryImpl brokerRegistry2 = createBrokerRegistryImpl(pulsar2);
+        brokerRegistry1.start();
+        try {
+            brokerRegistry2.start();
+            fail();
+        } catch (Exception ex) {
+            log.info("Broker registry start failed.", ex);
+            assertTrue(ex instanceof PulsarServerException);
+            assertTrue(ex.getMessage().contains("LockBusyException"));
+        }
     }
+
+    // TODO: Add more unit tests
 }
 

From ec723fbf8bc628a800b318a6fdabefb99317348c Mon Sep 17 00:00:00 2001
From: Demogorgon314 <kwang@streamnative.io>
Date: Tue, 13 Dec 2022 13:04:27 +0800
Subject: [PATCH 05/15] Add more unit tests

---
 .../extensions/BrokerRegistryImpl.java        | 17 +++-
 .../extensions/BrokerRegistryTest.java        | 95 ++++++++++++++++++-
 2 files changed, 106 insertions(+), 6 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
index 7c9c41ab66345..c4d941ec78a13 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
@@ -77,7 +77,7 @@ public class BrokerRegistryImpl implements BrokerRegistry {
 
     private volatile boolean registered = false;
 
-    private AtomicBoolean started = new AtomicBoolean(false);
+    private final AtomicBoolean started = new AtomicBoolean(false);
 
     private volatile CompletableFuture<Void> cacheReloadFuture;
 
@@ -105,6 +105,9 @@ public BrokerRegistryImpl(PulsarService pulsar) {
 
     @Override
     public void start() throws PulsarServerException {
+        if (started.get()) {
+            return;
+        }
         pulsar.getLocalMetadataStore().registerListener(this::handleMetadataStoreNotification);
         try {
             this.register();
@@ -165,7 +168,7 @@ public CompletableFuture<List<String>> getAvailableBrokersAsync() {
                 })
                 .exceptionally(ex -> {
                     Throwable realCause = FutureUtil.unwrapCompletionException(ex);
-                    log.warn("Error when trying to get active brokers", realCause);
+                    log.warn("Error when trying to get active brokers, return cached active brokers.", realCause);
                     future.complete(Lists.newArrayList(this.brokerLookupDataCache.keySet()));
                     return null;
                 });
@@ -192,9 +195,12 @@ public void close() throws PulsarServerException {
             return;
         }
         try {
-            this.listeners.clear();
             this.unregister();
-        } catch (MetadataStoreException ex) {
+            brokerLookupDataLockManager.close();
+            scheduler.shutdown();
+            this.brokerLookupDataCache.clear();
+            this.listeners.clear();
+        } catch (Exception ex) {
             if (ex.getCause() instanceof MetadataStoreException.NotFoundException) {
                 throw new PulsarServerException.NotFoundException(MetadataStoreException.unwrap(ex));
             } else {
@@ -244,7 +250,8 @@ private void updateBrokerLookupDataToLocalCache(String lookupServiceAddress, Not
         }
     }
 
-    private synchronized CompletableFuture<Void> reloadAllBrokerLookupCacheAsync() {
+    @VisibleForTesting
+    protected synchronized CompletableFuture<Void> reloadAllBrokerLookupCacheAsync() {
         if (!cacheReloadFuture.isDone()) {
             return cacheReloadFuture;
         }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java
index 8a3362bd51833..4352a72dd657b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java
@@ -18,8 +18,13 @@
  */
 package org.apache.pulsar.broker.loadbalance.extensions;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
@@ -33,6 +38,7 @@
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import lombok.SneakyThrows;
@@ -45,6 +51,10 @@
 import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
 import org.apache.pulsar.common.naming.ServiceUnitId;
 import org.apache.pulsar.common.stats.Metrics;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.coordination.CoordinationService;
+import org.apache.pulsar.metadata.api.coordination.LockManager;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
 import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
 import org.awaitility.Awaitility;
@@ -253,6 +263,7 @@ public void testRegisterAndLookupCache() throws Exception {
         assertTrue(lookupDataOpt.isPresent());
         assertEquals(lookupDataOpt.get().brokerVersion(), pulsar2.getBrokerVersion());
 
+        // Unregister and see the available brokers.
         brokerRegistry1.unregister();
         assertEquals(brokerRegistry2.getAvailableBrokersAsync().get().size(), 2);
 
@@ -279,6 +290,88 @@ public void testRegisterFail() throws Exception {
         }
     }
 
-    // TODO: Add more unit tests
+    @Test
+    public void testGetAvailableBrokersAsyncAndReadFromCache() throws Exception {
+        PulsarService pulsar1 = createPulsarService();
+        PulsarService pulsar2 = createPulsarService();
+        pulsar1.start();
+        pulsar2.start();
+
+        CoordinationService coordinationService = spy(pulsar1.getCoordinationService());
+
+        doReturn(coordinationService).when(pulsar1).getCoordinationService();
+
+        LockManager<BrokerLookupData> lockManager = spy(coordinationService.getLockManager(BrokerLookupData.class));
+
+        doReturn(lockManager).when(coordinationService).getLockManager(BrokerLookupData.class);
+
+        BrokerRegistryImpl brokerRegistry1 = createBrokerRegistryImpl(pulsar1);
+        Set<String> address = new HashSet<>();
+        brokerRegistry1.listen((lookupServiceAddress, type) -> {
+            address.add(lookupServiceAddress);
+        });
+        BrokerRegistryImpl brokerRegistry2 = createBrokerRegistryImpl(pulsar2);
+        brokerRegistry1.start();
+        List<String> availableBrokers1 = brokerRegistry1.getAvailableBrokersAsync().get();
+        assertEquals(availableBrokers1.size(), 1);
+
+        CompletableFuture<List<String>> future = new CompletableFuture<>();
+        future.completeExceptionally(new MetadataStoreException("Mock exception"));
+        doReturn(future).when(lockManager).listLocks(anyString());
+
+        brokerRegistry2.start();
+
+        Awaitility.await().untilAsserted(() -> {
+            // The lock manager list locks will get exception, it will read the list from cache.
+            List<String> availableBrokers2 = brokerRegistry1.getAvailableBrokersAsync().get();
+            assertEquals(address.size(), 2);
+            assertEquals(availableBrokers2.size(), 2);
+        });
+    }
+
+    @Test
+    public void testGetAvailableBrokersAsyncAndReloadCache() throws Exception {
+        PulsarService pulsar1 = createPulsarService();
+        PulsarService pulsar2 = createPulsarService();
+        pulsar1.start();
+        pulsar2.start();
+
+        ScheduledExecutorService spyExecutor = spy(pulsar1.getLoadManagerExecutor());
+
+        doReturn(spyExecutor).when(pulsar1).getLoadManagerExecutor();
+        MetadataStoreExtended localMetadataStore = spy(pulsar1.getLocalMetadataStore());
+        doReturn(localMetadataStore).when(pulsar1).getLocalMetadataStore();
+
+        // Disable the register listener method, so the cache will not update.
+        doNothing().when(localMetadataStore).registerListener(any());
+
+        BrokerRegistryImpl brokerRegistry1 = createBrokerRegistryImpl(pulsar1);
+        brokerRegistry1.start();
+
+        assertEquals(brokerRegistry1.brokerLookupDataCache.size(), 1);
+
+        BrokerRegistryImpl brokerRegistry2 = createBrokerRegistryImpl(pulsar2);
+        brokerRegistry2.start();
+        assertEquals(brokerRegistry2.brokerLookupDataCache.size(), 2);
+        assertEquals(brokerRegistry1.brokerLookupDataCache.size(), 1);
+
+        // Sleep and check the cache not update.
+        TimeUnit.SECONDS.sleep(1);
+        assertEquals(brokerRegistry1.brokerLookupDataCache.size(), 1);
+
+        // Verify the cache reload only called in broker registry start method.
+        verify(brokerRegistry1, times(1)).reloadAllBrokerLookupCacheAsync();
+
+        // Trigger cache update.
+        List<String> availableBrokers = brokerRegistry1.getAvailableBrokersAsync().get();
+        assertEquals(availableBrokers.size(), 2);
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(brokerRegistry1.brokerLookupDataCache.size(), 2);
+        });
+
+        // Verify the cache reloaded.
+        verify(brokerRegistry1, times(2)).reloadAllBrokerLookupCacheAsync();
+    }
+
 }
 

From 01335ab384fa0680aaf247f46482d6cbf05a3fc8 Mon Sep 17 00:00:00 2001
From: Demogorgon314 <kwang@streamnative.io>
Date: Tue, 13 Dec 2022 17:22:01 +0800
Subject: [PATCH 06/15] Add test group

---
 .../pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java | 1 +
 1 file changed, 1 insertion(+)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java
index 4352a72dd657b..f01a9883b396b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java
@@ -68,6 +68,7 @@
  * Unit test for {@link BrokerRegistry}.
  */
 @Slf4j
+@Test(groups = "broker")
 public class BrokerRegistryTest {
 
     private final List<PulsarService> pulsarServices = new CopyOnWriteArrayList<>();

From 582925b14e5f9cd91acde7eebb2d25c6f423f246 Mon Sep 17 00:00:00 2001
From: Demogorgon314 <kwang@streamnative.io>
Date: Thu, 15 Dec 2022 10:21:05 +0800
Subject: [PATCH 07/15] Change the lookup address to broker id

---
 .../extensions/BrokerRegistry.java            |  8 ++---
 .../extensions/BrokerRegistryImpl.java        | 34 +++++++++----------
 .../extensions/BrokerRegistryTest.java        |  2 +-
 3 files changed, 22 insertions(+), 22 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistry.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistry.java
index be751a1e2340e..37347bfe1d482 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistry.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistry.java
@@ -56,11 +56,11 @@ public interface BrokerRegistry extends AutoCloseable {
     void unregister() throws MetadataStoreException;
 
     /**
-     * Get the current broker lookup service address.
+     * Get the current broker ID.
      *
      * @return The service url without the protocol prefix, 'http://'. e.g. broker-xyz:port
      */
-    String getLookupServiceAddress();
+    String getBrokerId();
 
     /**
      * Async get available brokers.
@@ -78,14 +78,14 @@ public interface BrokerRegistry extends AutoCloseable {
 
     /**
      * For each the broker lookup data.
-     * The key is lookupServiceAddress{@link BrokerRegistry#getLookupServiceAddress()}
+     * The key is lookupServiceAddress{@link BrokerRegistry#getBrokerId()}
      */
     void forEach(BiConsumer<String, BrokerLookupData> action);
 
     /**
      * Listen the broker register change.
      *
-     * @param listener Key is lookup service address{@link BrokerRegistry#getLookupServiceAddress()}
+     * @param listener Key is lookup service address{@link BrokerRegistry#getBrokerId()}
      *                 Value is notification type.
      */
     void listen(BiConsumer<String, NotificationType> listener);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
index c4d941ec78a13..b7b4727882f1e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
@@ -64,7 +64,7 @@ public class BrokerRegistryImpl implements BrokerRegistry {
 
     private final String brokerZNodePath;
 
-    private final String lookupServiceAddress;
+    private final String brokerId;
 
     @VisibleForTesting
     protected final Map<String, BrokerLookupData> brokerLookupDataCache;
@@ -89,8 +89,8 @@ public BrokerRegistryImpl(PulsarService pulsar) {
         this.brokerLookupDataCache = new ConcurrentHashMap<>();
         this.listeners = new ArrayList<>();
         this.cacheReloadFuture = CompletableFuture.completedFuture(null);
-        this.lookupServiceAddress = pulsar.getLookupServiceAddress();
-        this.brokerZNodePath = LOOKUP_DATA_PATH + "/" + lookupServiceAddress;
+        this.brokerId = pulsar.getLookupServiceAddress();
+        this.brokerZNodePath = LOOKUP_DATA_PATH + "/" + brokerId;
         this.brokerLookupData = new BrokerLookupData(
                 pulsar.getSafeWebServiceAddress(),
                 pulsar.getWebServiceAddressTls(),
@@ -151,8 +151,8 @@ public synchronized void unregister() throws MetadataStoreException {
     }
 
     @Override
-    public String getLookupServiceAddress() {
-        return this.lookupServiceAddress;
+    public String getBrokerId() {
+        return this.brokerId;
     }
 
     @Override
@@ -216,10 +216,10 @@ private void handleMetadataStoreNotification(Notification t) {
                     log.debug("Handle notification: [{}]", t);
                 }
                 this.scheduler.submit(() -> {
-                    String lookupServiceAddress = t.getPath().substring(LOOKUP_DATA_PATH.length() + 1);
-                    this.updateBrokerLookupDataToLocalCache(lookupServiceAddress, t.getType());
+                    String brokerId = t.getPath().substring(LOOKUP_DATA_PATH.length() + 1);
+                    this.updateBrokerLookupDataToLocalCache(brokerId, t.getType());
                     for (BiConsumer<String, NotificationType> listener : listeners) {
-                        listener.accept(lookupServiceAddress, t.getType());
+                        listener.accept(brokerId, t.getType());
                     }
                 });
             } catch (RejectedExecutionException e) {
@@ -228,25 +228,25 @@ private void handleMetadataStoreNotification(Notification t) {
         }
     }
 
-    private void updateBrokerLookupDataToLocalCache(String lookupServiceAddress, NotificationType type) {
+    private void updateBrokerLookupDataToLocalCache(String brokerId, NotificationType type) {
         switch (type) {
             case Created, Modified, ChildrenChanged -> {
                 try {
                     Optional<BrokerLookupData> lookupData =
-                            brokerLookupDataLockManager.readLock(keyPath(lookupServiceAddress))
+                            brokerLookupDataLockManager.readLock(keyPath(brokerId))
                                     .get(conf.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
                     if (lookupData.isEmpty()) {
-                        brokerLookupDataCache.remove(lookupServiceAddress);
-                        log.info("[{}] Broker lookup data is not present", lookupServiceAddress);
+                        brokerLookupDataCache.remove(brokerId);
+                        log.info("[{}] Broker lookup data is not present", brokerId);
                         break;
                     }
-                    brokerLookupDataCache.put(lookupServiceAddress, lookupData.get());
+                    brokerLookupDataCache.put(brokerId, lookupData.get());
                 } catch (Exception e) {
                     log.warn("Error reading broker data from cache for broker - [{}], [{}]",
-                            lookupServiceAddress, e.getMessage());
+                            brokerId, e.getMessage());
                 }
             }
-            case Deleted -> brokerLookupDataCache.remove(lookupServiceAddress);
+            case Deleted -> brokerLookupDataCache.remove(brokerId);
         }
     }
 
@@ -296,7 +296,7 @@ private synchronized void triggerCacheReload() {
         }
     }
 
-    private String keyPath(String lookupServiceAddress) {
-        return String.format("%s/%s", LOOKUP_DATA_PATH, lookupServiceAddress);
+    private String keyPath(String brokerId) {
+        return String.format("%s/%s", LOOKUP_DATA_PATH, brokerId);
     }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java
index f01a9883b396b..1fea69d466f62 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java
@@ -260,7 +260,7 @@ public void testRegisterAndLookupCache() throws Exception {
         assertEquals(address, brokerRegistry3.brokerLookupDataCache.keySet());
 
         Optional<BrokerLookupData> lookupDataOpt =
-                brokerRegistry1.lookupAsync(brokerRegistry2.getLookupServiceAddress()).get();
+                brokerRegistry1.lookupAsync(brokerRegistry2.getBrokerId()).get();
         assertTrue(lookupDataOpt.isPresent());
         assertEquals(lookupDataOpt.get().brokerVersion(), pulsar2.getBrokerVersion());
 

From b66cd7f433a0867dbec038708f58b219a1664635 Mon Sep 17 00:00:00 2001
From: Demogorgon314 <kwang@streamnative.io>
Date: Thu, 15 Dec 2022 10:52:16 +0800
Subject: [PATCH 08/15] Remove cache and add more tests

---
 .../extensions/BrokerRegistry.java            |   6 -
 .../extensions/BrokerRegistryImpl.java        | 143 +++---------------
 .../extensions/BrokerRegistryTest.java        | 132 ++++------------
 3 files changed, 51 insertions(+), 230 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistry.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistry.java
index 37347bfe1d482..c534c583521f5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistry.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistry.java
@@ -76,12 +76,6 @@ public interface BrokerRegistry extends AutoCloseable {
      */
     CompletableFuture<Optional<BrokerLookupData>> lookupAsync(String broker);
 
-    /**
-     * For each the broker lookup data.
-     * The key is lookupServiceAddress{@link BrokerRegistry#getBrokerId()}
-     */
-    void forEach(BiConsumer<String, BrokerLookupData> action);
-
     /**
      * Listen the broker register change.
      *
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
index b7b4727882f1e..0b171865f0d48 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
@@ -22,11 +22,9 @@
 import com.google.common.collect.Lists;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
@@ -39,7 +37,6 @@
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
-import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.Notification;
 import org.apache.pulsar.metadata.api.NotificationType;
@@ -52,7 +49,7 @@
 @Slf4j
 public class BrokerRegistryImpl implements BrokerRegistry {
 
-    private static final String LOOKUP_DATA_PATH = "/loadbalance/brokers";
+    protected static final String LOOKUP_DATA_PATH = "/loadbalance/brokers";
 
     private final PulsarService pulsar;
 
@@ -62,13 +59,8 @@ public class BrokerRegistryImpl implements BrokerRegistry {
 
     private final LockManager<BrokerLookupData> brokerLookupDataLockManager;
 
-    private final String brokerZNodePath;
-
     private final String brokerId;
 
-    @VisibleForTesting
-    protected final Map<String, BrokerLookupData> brokerLookupDataCache;
-
     private final ScheduledExecutorService scheduler;
 
     private final List<BiConsumer<String, NotificationType>> listeners;
@@ -79,18 +71,13 @@ public class BrokerRegistryImpl implements BrokerRegistry {
 
     private final AtomicBoolean started = new AtomicBoolean(false);
 
-    private volatile CompletableFuture<Void> cacheReloadFuture;
-
     public BrokerRegistryImpl(PulsarService pulsar) {
         this.pulsar = pulsar;
         this.conf = pulsar.getConfiguration();
         this.brokerLookupDataLockManager = pulsar.getCoordinationService().getLockManager(BrokerLookupData.class);
         this.scheduler = pulsar.getLoadManagerExecutor();
-        this.brokerLookupDataCache = new ConcurrentHashMap<>();
         this.listeners = new ArrayList<>();
-        this.cacheReloadFuture = CompletableFuture.completedFuture(null);
         this.brokerId = pulsar.getLookupServiceAddress();
-        this.brokerZNodePath = LOOKUP_DATA_PATH + "/" + brokerId;
         this.brokerLookupData = new BrokerLookupData(
                 pulsar.getSafeWebServiceAddress(),
                 pulsar.getWebServiceAddressTls(),
@@ -111,11 +98,8 @@ public void start() throws PulsarServerException {
         pulsar.getLocalMetadataStore().registerListener(this::handleMetadataStoreNotification);
         try {
             this.register();
-            // Update all lookup data to cache
-            this.reloadAllBrokerLookupCacheAsync()
-                    .get(conf.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
             this.started.set(true);
-        } catch (MetadataStoreException | ExecutionException | InterruptedException | TimeoutException e) {
+        } catch (MetadataStoreException e) {
             throw new PulsarServerException(e);
         }
     }
@@ -129,7 +113,7 @@ public boolean isStarted() {
     public synchronized void register() throws MetadataStoreException {
         if (!registered) {
             try {
-                this.brokerLookupDataLock = brokerLookupDataLockManager.acquireLock(brokerZNodePath, brokerLookupData)
+                this.brokerLookupDataLock = brokerLookupDataLockManager.acquireLock(keyPath(brokerId), brokerLookupData)
                         .get(conf.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
                 registered = true;
             } catch (InterruptedException | ExecutionException | TimeoutException e) {
@@ -157,22 +141,8 @@ public String getBrokerId() {
 
     @Override
     public CompletableFuture<List<String>> getAvailableBrokersAsync() {
-        CompletableFuture<List<String>> future = new CompletableFuture<>();
-        brokerLookupDataLockManager.listLocks(LOOKUP_DATA_PATH)
-                .thenAccept(listLocks ->  {
-                    if (this.brokerLookupDataCache.size() != listLocks.size()
-                            || !this.brokerLookupDataCache.keySet().containsAll(listLocks)) {
-                        this.triggerCacheReload();
-                    }
-                    future.complete(Lists.newArrayList(listLocks));
-                })
-                .exceptionally(ex -> {
-                    Throwable realCause = FutureUtil.unwrapCompletionException(ex);
-                    log.warn("Error when trying to get active brokers, return cached active brokers.", realCause);
-                    future.complete(Lists.newArrayList(this.brokerLookupDataCache.keySet()));
-                    return null;
-                });
-        return future;
+        return brokerLookupDataLockManager.listLocks(LOOKUP_DATA_PATH)
+                .thenCompose(listLocks -> CompletableFuture.completedFuture(Lists.newArrayList(listLocks)));
     }
 
     @Override
@@ -180,11 +150,6 @@ public CompletableFuture<Optional<BrokerLookupData>> lookupAsync(String broker)
         return brokerLookupDataLockManager.readLock(keyPath(broker));
     }
 
-    @Override
-    public void forEach(BiConsumer<String, BrokerLookupData> action) {
-        this.brokerLookupDataCache.forEach(action);
-    }
-
     public void listen(BiConsumer<String, NotificationType> listener) {
         this.listeners.add(listener);
     }
@@ -198,7 +163,6 @@ public void close() throws PulsarServerException {
             this.unregister();
             brokerLookupDataLockManager.close();
             scheduler.shutdown();
-            this.brokerLookupDataCache.clear();
             this.listeners.clear();
         } catch (Exception ex) {
             if (ex.getCause() instanceof MetadataStoreException.NotFoundException) {
@@ -210,93 +174,34 @@ public void close() throws PulsarServerException {
     }
 
     private void handleMetadataStoreNotification(Notification t) {
-        if (t.getPath().startsWith(LOOKUP_DATA_PATH) && t.getPath().length() > LOOKUP_DATA_PATH.length()) {
-            try {
-                if (log.isDebugEnabled()) {
-                    log.debug("Handle notification: [{}]", t);
-                }
-                this.scheduler.submit(() -> {
-                    String brokerId = t.getPath().substring(LOOKUP_DATA_PATH.length() + 1);
-                    this.updateBrokerLookupDataToLocalCache(brokerId, t.getType());
-                    for (BiConsumer<String, NotificationType> listener : listeners) {
-                        listener.accept(brokerId, t.getType());
-                    }
-                });
-            } catch (RejectedExecutionException e) {
-                // Executor is shutting down
-            }
+        if (!isVerifiedNotification(t)) {
+            return;
         }
-    }
-
-    private void updateBrokerLookupDataToLocalCache(String brokerId, NotificationType type) {
-        switch (type) {
-            case Created, Modified, ChildrenChanged -> {
-                try {
-                    Optional<BrokerLookupData> lookupData =
-                            brokerLookupDataLockManager.readLock(keyPath(brokerId))
-                                    .get(conf.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
-                    if (lookupData.isEmpty()) {
-                        brokerLookupDataCache.remove(brokerId);
-                        log.info("[{}] Broker lookup data is not present", brokerId);
-                        break;
-                    }
-                    brokerLookupDataCache.put(brokerId, lookupData.get());
-                } catch (Exception e) {
-                    log.warn("Error reading broker data from cache for broker - [{}], [{}]",
-                            brokerId, e.getMessage());
-                }
+        try {
+            if (log.isDebugEnabled()) {
+                log.debug("Handle notification: [{}]", t);
             }
-            case Deleted -> brokerLookupDataCache.remove(brokerId);
-        }
-    }
-
-    @VisibleForTesting
-    protected synchronized CompletableFuture<Void> reloadAllBrokerLookupCacheAsync() {
-        if (!cacheReloadFuture.isDone()) {
-            return cacheReloadFuture;
-        }
-        cacheReloadFuture = new CompletableFuture<>();
-        brokerLookupDataLockManager.listLocks(LOOKUP_DATA_PATH).thenAccept(activeBrokers -> {
-            final List<CompletableFuture<Void>> futures = new ArrayList<>();
-            for (String broker : activeBrokers) {
-                futures.add(brokerLookupDataLockManager.readLock(keyPath(broker)).thenAccept(lookupData -> {
-                    if (lookupData.isEmpty()) {
-                        brokerLookupDataCache.remove(broker);
-                        log.info("[{}] Broker lookup data is not present", broker);
-                        return;
-                    }
-                    // Replace or initialize the lookup data.
-                    brokerLookupDataCache.put(broker, lookupData.get());
-                }).exceptionally(ex -> {
-                    log.warn("Error reading broker lookup data from cache for broker - [{}], [{}]",
-                            broker, ex.getMessage());
-                    return null;
-                }));
+            if (listeners.isEmpty()) {
+                return;
             }
-            FutureUtil.waitForAll(futures).thenAccept(__ -> {
-                // Remove obsolete brokers.
-                for (final String broker : brokerLookupDataCache.keySet()) {
-                    if (!activeBrokers.contains(broker)) {
-                        brokerLookupDataCache.remove(broker);
-                    }
+            this.scheduler.submit(() -> {
+                String brokerId = t.getPath().substring(LOOKUP_DATA_PATH.length() + 1);
+                for (BiConsumer<String, NotificationType> listener : listeners) {
+                    listener.accept(brokerId, t.getType());
                 }
-                cacheReloadFuture.complete(null);
-            }).exceptionally(e -> {
-                log.warn("Error to reload the broker lookup cache, [{}]", e.getMessage());
-                cacheReloadFuture.complete(null);
-                return null;
             });
-        });
-        return cacheReloadFuture;
+        } catch (RejectedExecutionException e) {
+            // Executor is shutting down
+        }
     }
 
-    private synchronized void triggerCacheReload() {
-        if (cacheReloadFuture.isDone()) {
-            scheduler.submit(this::reloadAllBrokerLookupCacheAsync);
-        }
+    @VisibleForTesting
+    protected static boolean isVerifiedNotification(Notification t) {
+       return t.getPath().startsWith(LOOKUP_DATA_PATH + "/") && t.getPath().length() > LOOKUP_DATA_PATH.length() + 1;
     }
 
-    private String keyPath(String brokerId) {
+    @VisibleForTesting
+    protected static String keyPath(String brokerId) {
         return String.format("%s/%s", LOOKUP_DATA_PATH, brokerId);
     }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java
index 1fea69d466f62..13b846a4f5eba 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java
@@ -18,14 +18,10 @@
  */
 package org.apache.pulsar.broker.loadbalance.extensions;
 
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
@@ -38,7 +34,6 @@
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import lombok.SneakyThrows;
@@ -51,10 +46,8 @@
 import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
 import org.apache.pulsar.common.naming.ServiceUnitId;
 import org.apache.pulsar.common.stats.Metrics;
-import org.apache.pulsar.metadata.api.MetadataStoreException;
-import org.apache.pulsar.metadata.api.coordination.CoordinationService;
-import org.apache.pulsar.metadata.api.coordination.LockManager;
-import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.apache.pulsar.metadata.api.Notification;
+import org.apache.pulsar.metadata.api.NotificationType;
 import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
 import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
 import org.awaitility.Awaitility;
@@ -218,7 +211,7 @@ void cleanUp() {
     }
 
     @Test(timeOut = 30 * 1000)
-    public void testRegisterAndLookupCache() throws Exception {
+    public void testRegisterAndLookup() throws Exception {
         PulsarService pulsar1 = createPulsarService();
         PulsarService pulsar2 = createPulsarService();
         PulsarService pulsar3 = createPulsarService();
@@ -229,35 +222,29 @@ public void testRegisterAndLookupCache() throws Exception {
         BrokerRegistryImpl brokerRegistry2 = createBrokerRegistryImpl(pulsar2);
         BrokerRegistryImpl brokerRegistry3 = createBrokerRegistryImpl(pulsar3);
 
-        Set<String> address = new HashSet<>();
-        brokerRegistry1.listen((lookupServiceAddress, type) -> {
-            address.add(lookupServiceAddress);
+        Set<String> brokerIds = new HashSet<>();
+        brokerRegistry1.listen((brokerId, type) -> {
+            brokerIds.add(brokerId);
         });
 
         brokerRegistry1.start();
         brokerRegistry2.start();
 
-        assertEquals(brokerRegistry1.brokerLookupDataCache.size(), 2);
-        assertEquals(brokerRegistry2.brokerLookupDataCache.size(), 2);
-        assertEquals(address.size(), 2);
+        Awaitility.await().atMost(Duration.ofSeconds(5))
+                .untilAsserted(() -> assertEquals(brokerIds.size(), 2));
 
         assertEquals(brokerRegistry1.getAvailableBrokersAsync().get().size(), 2);
         assertEquals(brokerRegistry2.getAvailableBrokersAsync().get().size(), 2);
 
         // Check three broker cache are flush successes.
         brokerRegistry3.start();
-        assertEquals(brokerRegistry3.brokerLookupDataCache.size(), 3);
         assertEquals(brokerRegistry3.getAvailableBrokersAsync().get().size(), 3);
         Awaitility.await().atMost(Duration.ofSeconds(5))
-                        .untilAsserted(() -> {
-                            assertEquals(brokerRegistry1.brokerLookupDataCache.size(), 3);
-                            assertEquals(brokerRegistry2.brokerLookupDataCache.size(), 3);
-                            assertEquals(address.size(), 3);
-                        });
+                .untilAsserted(() -> assertEquals(brokerIds.size(), 3));
 
-        assertEquals(address, brokerRegistry1.brokerLookupDataCache.keySet());
-        assertEquals(address, brokerRegistry2.brokerLookupDataCache.keySet());
-        assertEquals(address, brokerRegistry3.brokerLookupDataCache.keySet());
+        assertEquals(brokerIds, new HashSet<>(brokerRegistry1.getAvailableBrokersAsync().get()));
+        assertEquals(brokerIds, new HashSet<>(brokerRegistry1.getAvailableBrokersAsync().get()));
+        assertEquals(brokerIds, new HashSet<>(brokerRegistry1.getAvailableBrokersAsync().get()));
 
         Optional<BrokerLookupData> lookupDataOpt =
                 brokerRegistry1.lookupAsync(brokerRegistry2.getBrokerId()).get();
@@ -271,7 +258,7 @@ public void testRegisterAndLookupCache() throws Exception {
     }
 
     @Test
-    public void testRegisterFail() throws Exception {
+    public void testRegisterFailWithSameBrokerId() throws Exception {
         PulsarService pulsar1 = createPulsarService();
         PulsarService pulsar2 = createPulsarService();
         pulsar1.start();
@@ -292,87 +279,22 @@ public void testRegisterFail() throws Exception {
     }
 
     @Test
-    public void testGetAvailableBrokersAsyncAndReadFromCache() throws Exception {
-        PulsarService pulsar1 = createPulsarService();
-        PulsarService pulsar2 = createPulsarService();
-        pulsar1.start();
-        pulsar2.start();
-
-        CoordinationService coordinationService = spy(pulsar1.getCoordinationService());
-
-        doReturn(coordinationService).when(pulsar1).getCoordinationService();
-
-        LockManager<BrokerLookupData> lockManager = spy(coordinationService.getLockManager(BrokerLookupData.class));
-
-        doReturn(lockManager).when(coordinationService).getLockManager(BrokerLookupData.class);
-
-        BrokerRegistryImpl brokerRegistry1 = createBrokerRegistryImpl(pulsar1);
-        Set<String> address = new HashSet<>();
-        brokerRegistry1.listen((lookupServiceAddress, type) -> {
-            address.add(lookupServiceAddress);
-        });
-        BrokerRegistryImpl brokerRegistry2 = createBrokerRegistryImpl(pulsar2);
-        brokerRegistry1.start();
-        List<String> availableBrokers1 = brokerRegistry1.getAvailableBrokersAsync().get();
-        assertEquals(availableBrokers1.size(), 1);
-
-        CompletableFuture<List<String>> future = new CompletableFuture<>();
-        future.completeExceptionally(new MetadataStoreException("Mock exception"));
-        doReturn(future).when(lockManager).listLocks(anyString());
-
-        brokerRegistry2.start();
-
-        Awaitility.await().untilAsserted(() -> {
-            // The lock manager list locks will get exception, it will read the list from cache.
-            List<String> availableBrokers2 = brokerRegistry1.getAvailableBrokersAsync().get();
-            assertEquals(address.size(), 2);
-            assertEquals(availableBrokers2.size(), 2);
-        });
+    public void testIsVerifiedNotification() {
+        assertFalse(BrokerRegistryImpl.isVerifiedNotification(new Notification(NotificationType.Created, "/")));
+        assertFalse(BrokerRegistryImpl.isVerifiedNotification(new Notification(NotificationType.Created,
+                BrokerRegistryImpl.LOOKUP_DATA_PATH + "xyz")));
+        assertFalse(BrokerRegistryImpl.isVerifiedNotification(new Notification(NotificationType.Created,
+                BrokerRegistryImpl.LOOKUP_DATA_PATH)));
+        assertTrue(BrokerRegistryImpl.isVerifiedNotification(
+                new Notification(NotificationType.Created, BrokerRegistryImpl.LOOKUP_DATA_PATH + "/brokerId")));
+        assertTrue(BrokerRegistryImpl.isVerifiedNotification(
+                new Notification(NotificationType.Created, BrokerRegistryImpl.LOOKUP_DATA_PATH + "/brokerId/xyz")));
     }
 
     @Test
-    public void testGetAvailableBrokersAsyncAndReloadCache() throws Exception {
-        PulsarService pulsar1 = createPulsarService();
-        PulsarService pulsar2 = createPulsarService();
-        pulsar1.start();
-        pulsar2.start();
-
-        ScheduledExecutorService spyExecutor = spy(pulsar1.getLoadManagerExecutor());
-
-        doReturn(spyExecutor).when(pulsar1).getLoadManagerExecutor();
-        MetadataStoreExtended localMetadataStore = spy(pulsar1.getLocalMetadataStore());
-        doReturn(localMetadataStore).when(pulsar1).getLocalMetadataStore();
-
-        // Disable the register listener method, so the cache will not update.
-        doNothing().when(localMetadataStore).registerListener(any());
-
-        BrokerRegistryImpl brokerRegistry1 = createBrokerRegistryImpl(pulsar1);
-        brokerRegistry1.start();
-
-        assertEquals(brokerRegistry1.brokerLookupDataCache.size(), 1);
-
-        BrokerRegistryImpl brokerRegistry2 = createBrokerRegistryImpl(pulsar2);
-        brokerRegistry2.start();
-        assertEquals(brokerRegistry2.brokerLookupDataCache.size(), 2);
-        assertEquals(brokerRegistry1.brokerLookupDataCache.size(), 1);
-
-        // Sleep and check the cache not update.
-        TimeUnit.SECONDS.sleep(1);
-        assertEquals(brokerRegistry1.brokerLookupDataCache.size(), 1);
-
-        // Verify the cache reload only called in broker registry start method.
-        verify(brokerRegistry1, times(1)).reloadAllBrokerLookupCacheAsync();
-
-        // Trigger cache update.
-        List<String> availableBrokers = brokerRegistry1.getAvailableBrokersAsync().get();
-        assertEquals(availableBrokers.size(), 2);
-        Awaitility.await().untilAsserted(() -> {
-            assertEquals(brokerRegistry1.brokerLookupDataCache.size(), 2);
-        });
-
-        // Verify the cache reloaded.
-        verify(brokerRegistry1, times(2)).reloadAllBrokerLookupCacheAsync();
+    public void testKeyPath() {
+        String keyPath = BrokerRegistryImpl.keyPath("brokerId");
+        assertEquals(keyPath, BrokerRegistryImpl.LOOKUP_DATA_PATH + "/brokerId");
     }
-
 }
 

From 48c1cbce35c3df6b2f6cbcda100f437cea69b5ed Mon Sep 17 00:00:00 2001
From: Demogorgon314 <kwang@streamnative.io>
Date: Fri, 16 Dec 2022 09:58:52 +0800
Subject: [PATCH 09/15] Address reviewer's comments

---
 .../broker/loadbalance/extensions/BrokerRegistryImpl.java   | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
index 0b171865f0d48..24d3c3a6a46b4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
@@ -141,8 +141,7 @@ public String getBrokerId() {
 
     @Override
     public CompletableFuture<List<String>> getAvailableBrokersAsync() {
-        return brokerLookupDataLockManager.listLocks(LOOKUP_DATA_PATH)
-                .thenCompose(listLocks -> CompletableFuture.completedFuture(Lists.newArrayList(listLocks)));
+        return brokerLookupDataLockManager.listLocks(LOOKUP_DATA_PATH).thenApply(Lists::newArrayList);
     }
 
     @Override
@@ -156,7 +155,7 @@ public void listen(BiConsumer<String, NotificationType> listener) {
 
     @Override
     public void close() throws PulsarServerException {
-        if (!started.compareAndSet(true, false)) {
+        if (!started.get()) {
             return;
         }
         try {
@@ -164,6 +163,7 @@ public void close() throws PulsarServerException {
             brokerLookupDataLockManager.close();
             scheduler.shutdown();
             this.listeners.clear();
+            started.set(false);
         } catch (Exception ex) {
             if (ex.getCause() instanceof MetadataStoreException.NotFoundException) {
                 throw new PulsarServerException.NotFoundException(MetadataStoreException.unwrap(ex));

From 1d20a5fad0deb6fbca8b5695a37c35aec3a9f9d5 Mon Sep 17 00:00:00 2001
From: Demogorgon314 <kwang@streamnative.io>
Date: Thu, 22 Dec 2022 09:39:03 +0800
Subject: [PATCH 10/15] Add new available broker lookup data method

---
 .../loadbalance/extensions/BrokerRegistry.java  |  8 ++++++++
 .../extensions/BrokerRegistryImpl.java          | 15 +++++++++++++++
 .../extensions/BrokerRegistryTest.java          | 17 +++++++++++++++--
 3 files changed, 38 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistry.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistry.java
index c534c583521f5..36622a9c73d7f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistry.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistry.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker.loadbalance.extensions;
 
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.BiConsumer;
@@ -76,6 +77,13 @@ public interface BrokerRegistry extends AutoCloseable {
      */
     CompletableFuture<Optional<BrokerLookupData>> lookupAsync(String broker);
 
+    /**
+     * Get the map of brokerId->brokerLookupData.
+     *
+     * @return Map of broker lookup data.
+     */
+    CompletableFuture<Map<String, BrokerLookupData>> getAvailableBrokerLookupDataAsync();
+
     /**
      * Listen the broker register change.
      *
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
index 24d3c3a6a46b4..25ff4c9fb891f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
@@ -22,9 +22,11 @@
 import com.google.common.collect.Lists;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
@@ -37,6 +39,7 @@
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.Notification;
 import org.apache.pulsar.metadata.api.NotificationType;
@@ -149,6 +152,18 @@ public CompletableFuture<Optional<BrokerLookupData>> lookupAsync(String broker)
         return brokerLookupDataLockManager.readLock(keyPath(broker));
     }
 
+    public CompletableFuture<Map<String, BrokerLookupData>> getAvailableBrokerLookupDataAsync() {
+        return this.getAvailableBrokersAsync().thenCompose(availableBrokers -> {
+            Map<String, BrokerLookupData> map = new ConcurrentHashMap<>();
+            List<CompletableFuture<Void>> futures = new ArrayList<>();
+            for (String brokerId : availableBrokers) {
+                futures.add(this.lookupAsync(brokerId).thenAccept(lookupDataOpt ->
+                        lookupDataOpt.ifPresent(lookupData -> map.put(brokerId, lookupData))));
+            }
+            return FutureUtil.waitForAll(futures).thenCompose(__ -> CompletableFuture.completedFuture(map));
+        });
+    }
+
     public void listen(BiConsumer<String, NotificationType> listener) {
         this.listeners.add(listener);
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java
index 13b846a4f5eba..13f5c3ee80fb3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java
@@ -243,12 +243,25 @@ public void testRegisterAndLookup() throws Exception {
                 .untilAsserted(() -> assertEquals(brokerIds.size(), 3));
 
         assertEquals(brokerIds, new HashSet<>(brokerRegistry1.getAvailableBrokersAsync().get()));
-        assertEquals(brokerIds, new HashSet<>(brokerRegistry1.getAvailableBrokersAsync().get()));
-        assertEquals(brokerIds, new HashSet<>(brokerRegistry1.getAvailableBrokersAsync().get()));
+        assertEquals(brokerIds, new HashSet<>(brokerRegistry2.getAvailableBrokersAsync().get()));
+        assertEquals(brokerIds, new HashSet<>(brokerRegistry3.getAvailableBrokersAsync().get()));
+        assertEquals(brokerIds, brokerRegistry1.getAvailableBrokerLookupDataAsync().get().keySet());
+        assertEquals(brokerIds, brokerRegistry2.getAvailableBrokerLookupDataAsync().get().keySet());
+        assertEquals(brokerIds, brokerRegistry3.getAvailableBrokerLookupDataAsync().get().keySet());
 
         Optional<BrokerLookupData> lookupDataOpt =
                 brokerRegistry1.lookupAsync(brokerRegistry2.getBrokerId()).get();
         assertTrue(lookupDataOpt.isPresent());
+        assertEquals(lookupDataOpt.get().getWebServiceUrl(), pulsar2.getSafeWebServiceAddress());
+        assertEquals(lookupDataOpt.get().getWebServiceUrlTls(), pulsar2.getWebServiceAddressTls());
+        assertEquals(lookupDataOpt.get().getPulsarServiceUrl(), pulsar2.getBrokerServiceUrl());
+        assertEquals(lookupDataOpt.get().getPulsarServiceUrlTls(), pulsar2.getBrokerServiceUrlTls());
+        assertEquals(lookupDataOpt.get().advertisedListeners(), pulsar2.getAdvertisedListeners());
+        assertEquals(lookupDataOpt.get().protocols(), pulsar2.getProtocolDataToAdvertise());
+        assertEquals(lookupDataOpt.get().persistentTopicsEnabled(), pulsar2.getConfiguration()
+                .isEnablePersistentTopics());
+        assertEquals(lookupDataOpt.get().nonPersistentTopicsEnabled(), pulsar2.getConfiguration()
+                .isEnableNonPersistentTopics());
         assertEquals(lookupDataOpt.get().brokerVersion(), pulsar2.getBrokerVersion());
 
         // Unregister and see the available brokers.

From 99ba81b096085d571a8962affe3c325d7a18c53e Mon Sep 17 00:00:00 2001
From: Demogorgon314 <kwang@streamnative.io>
Date: Fri, 30 Dec 2022 09:51:04 +0800
Subject: [PATCH 11/15] Add log to warn empty lookup data

---
 .../loadbalance/extensions/BrokerRegistryImpl.java    | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
index 25ff4c9fb891f..88dc5a171e1b9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
@@ -157,10 +157,15 @@ public CompletableFuture<Map<String, BrokerLookupData>> getAvailableBrokerLookup
             Map<String, BrokerLookupData> map = new ConcurrentHashMap<>();
             List<CompletableFuture<Void>> futures = new ArrayList<>();
             for (String brokerId : availableBrokers) {
-                futures.add(this.lookupAsync(brokerId).thenAccept(lookupDataOpt ->
-                        lookupDataOpt.ifPresent(lookupData -> map.put(brokerId, lookupData))));
+                futures.add(this.lookupAsync(brokerId).thenAccept(lookupDataOpt -> {
+                    if (lookupDataOpt.isPresent()) {
+                        map.put(brokerId, lookupDataOpt.get());
+                    } else {
+                        log.warn("Got an empty lookup data, brokerId: {}", brokerId);
+                    }
+                }));
             }
-            return FutureUtil.waitForAll(futures).thenCompose(__ -> CompletableFuture.completedFuture(map));
+            return FutureUtil.waitForAll(futures).thenApply(__ -> map);
         });
     }
 

From 0fa339201651e08b651d3031d4a8256fe12b0445 Mon Sep 17 00:00:00 2001
From: Demogorgon314 <kwang@streamnative.io>
Date: Tue, 3 Jan 2023 16:11:30 +0800
Subject: [PATCH 12/15] Use state field to store states.

---
 .../extensions/BrokerRegistryImpl.java        | 55 ++++++++-----
 .../extensions/BrokerRegistryTest.java        | 77 +++++++++++++++++++
 2 files changed, 113 insertions(+), 19 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
index 88dc5a171e1b9..80389073f8108 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
@@ -32,7 +32,6 @@
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.BiConsumer;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.PulsarServerException;
@@ -70,9 +69,14 @@ public class BrokerRegistryImpl implements BrokerRegistry {
 
     private volatile ResourceLock<BrokerLookupData> brokerLookupDataLock;
 
-    private volatile boolean registered = false;
+    protected enum State {
+        Init,
+        Started,
+        Registered,
+        Closed
+    }
 
-    private final AtomicBoolean started = new AtomicBoolean(false);
+    private State state;
 
     public BrokerRegistryImpl(PulsarService pulsar) {
         this.pulsar = pulsar;
@@ -91,17 +95,18 @@ public BrokerRegistryImpl(PulsarService pulsar) {
                 pulsar.getConfiguration().isEnablePersistentTopics(),
                 pulsar.getConfiguration().isEnableNonPersistentTopics(),
                 pulsar.getBrokerVersion());
+        this.state = State.Init;
     }
 
     @Override
-    public void start() throws PulsarServerException {
-        if (started.get()) {
+    public synchronized void start() throws PulsarServerException {
+        if (this.state != State.Init) {
             return;
         }
         pulsar.getLocalMetadataStore().registerListener(this::handleMetadataStoreNotification);
         try {
+            this.state = State.Started;
             this.register();
-            this.started.set(true);
         } catch (MetadataStoreException e) {
             throw new PulsarServerException(e);
         }
@@ -109,16 +114,16 @@ public void start() throws PulsarServerException {
 
     @Override
     public boolean isStarted() {
-        return this.started.get();
+        return this.state == State.Started || this.state == State.Registered;
     }
 
     @Override
     public synchronized void register() throws MetadataStoreException {
-        if (!registered) {
+        if (this.state == State.Started) {
             try {
                 this.brokerLookupDataLock = brokerLookupDataLockManager.acquireLock(keyPath(brokerId), brokerLookupData)
                         .get(conf.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
-                registered = true;
+                this.state = State.Registered;
             } catch (InterruptedException | ExecutionException | TimeoutException e) {
                 throw MetadataStoreException.unwrap(e);
             }
@@ -127,10 +132,11 @@ public synchronized void register() throws MetadataStoreException {
 
     @Override
     public synchronized void unregister() throws MetadataStoreException {
-        if (registered) {
+        if (this.state == State.Registered) {
             try {
-                brokerLookupDataLock.release().get(conf.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
-                registered = false;
+                this.brokerLookupDataLock.release()
+                        .get(conf.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
+                this.state = State.Started;
             } catch (CompletionException | InterruptedException | ExecutionException | TimeoutException e) {
                 throw MetadataStoreException.unwrap(e);
             }
@@ -144,15 +150,24 @@ public String getBrokerId() {
 
     @Override
     public CompletableFuture<List<String>> getAvailableBrokersAsync() {
+        if (!this.isStarted()) {
+            return FutureUtil.failedFuture(new IllegalStateException("The registry already closed."));
+        }
         return brokerLookupDataLockManager.listLocks(LOOKUP_DATA_PATH).thenApply(Lists::newArrayList);
     }
 
     @Override
     public CompletableFuture<Optional<BrokerLookupData>> lookupAsync(String broker) {
+        if (!this.isStarted()) {
+            return FutureUtil.failedFuture(new IllegalStateException("The registry already closed."));
+        }
         return brokerLookupDataLockManager.readLock(keyPath(broker));
     }
 
     public CompletableFuture<Map<String, BrokerLookupData>> getAvailableBrokerLookupDataAsync() {
+        if (!this.isStarted()) {
+            return FutureUtil.failedFuture(new IllegalStateException("The registry already closed."));
+        }
         return this.getAvailableBrokersAsync().thenCompose(availableBrokers -> {
             Map<String, BrokerLookupData> map = new ConcurrentHashMap<>();
             List<CompletableFuture<Void>> futures = new ArrayList<>();
@@ -169,21 +184,23 @@ public CompletableFuture<Map<String, BrokerLookupData>> getAvailableBrokerLookup
         });
     }
 
-    public void listen(BiConsumer<String, NotificationType> listener) {
+    public synchronized void listen(BiConsumer<String, NotificationType> listener) {
+        if (this.state == State.Closed) {
+            throw new IllegalStateException("The registry already closed.");
+        }
         this.listeners.add(listener);
     }
 
     @Override
-    public void close() throws PulsarServerException {
-        if (!started.get()) {
+    public synchronized void close() throws PulsarServerException {
+        if (this.state == State.Closed) {
             return;
         }
         try {
             this.unregister();
-            brokerLookupDataLockManager.close();
-            scheduler.shutdown();
+            this.brokerLookupDataLockManager.close();
             this.listeners.clear();
-            started.set(false);
+            this.state = State.Closed;
         } catch (Exception ex) {
             if (ex.getCause() instanceof MetadataStoreException.NotFoundException) {
                 throw new PulsarServerException.NotFoundException(MetadataStoreException.unwrap(ex));
@@ -194,7 +211,7 @@ public void close() throws PulsarServerException {
     }
 
     private void handleMetadataStoreNotification(Notification t) {
-        if (!isVerifiedNotification(t)) {
+        if (!this.isStarted() || !isVerifiedNotification(t)) {
             return;
         }
         try {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java
index 13f5c3ee80fb3..8a836294bd333 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java
@@ -36,6 +36,7 @@
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.PulsarServerException;
@@ -46,11 +47,13 @@
 import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
 import org.apache.pulsar.common.naming.ServiceUnitId;
 import org.apache.pulsar.common.stats.Metrics;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.metadata.api.Notification;
 import org.apache.pulsar.metadata.api.NotificationType;
 import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
 import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
 import org.awaitility.Awaitility;
+import org.testcontainers.shaded.org.awaitility.reflect.WhiteboxImpl;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
@@ -291,6 +294,76 @@ public void testRegisterFailWithSameBrokerId() throws Exception {
         }
     }
 
+    @Test
+    public void testCloseRegister() throws Exception {
+        PulsarService pulsar1 = createPulsarService();
+        pulsar1.start();
+        BrokerRegistryImpl brokerRegistry = createBrokerRegistryImpl(pulsar1);
+        assertEquals(getState(brokerRegistry), BrokerRegistryImpl.State.Init);
+
+        // Check state after stated.
+        brokerRegistry.start();
+        assertEquals(getState(brokerRegistry), BrokerRegistryImpl.State.Registered);
+
+        // Add a listener
+        brokerRegistry.listen((brokerId, type) -> {
+            // Ignore.
+        });
+        assertTrue(brokerRegistry.isStarted());
+        List<BiConsumer<String, NotificationType>> listeners =
+                WhiteboxImpl.getInternalState(brokerRegistry, "listeners");
+        assertFalse(listeners.isEmpty());
+
+        // Check state after unregister.
+        brokerRegistry.unregister();
+        assertEquals(getState(brokerRegistry), BrokerRegistryImpl.State.Started);
+
+        // Check state after re-register.
+        brokerRegistry.register();
+        assertEquals(getState(brokerRegistry), BrokerRegistryImpl.State.Registered);
+
+        // Check state after close.
+        brokerRegistry.close();
+        assertFalse(brokerRegistry.isStarted());
+        assertEquals(getState(brokerRegistry), BrokerRegistryImpl.State.Closed);
+        listeners = WhiteboxImpl.getInternalState(brokerRegistry, "listeners");
+        assertTrue(listeners.isEmpty());
+
+        try {
+            brokerRegistry.getAvailableBrokersAsync().get();
+            fail();
+        } catch (Exception ex) {
+            log.info("Failed to getAvailableBrokersAsync.", ex);
+            assertTrue(FutureUtil.unwrapCompletionException(ex) instanceof IllegalStateException);
+        }
+
+        try {
+            brokerRegistry.getAvailableBrokerLookupDataAsync().get();
+            fail();
+        } catch (Exception ex) {
+            log.info("Failed to getAvailableBrokerLookupDataAsync.", ex);
+            assertTrue(FutureUtil.unwrapCompletionException(ex) instanceof IllegalStateException);
+        }
+
+        try {
+            brokerRegistry.lookupAsync("test").get();
+            fail();
+        } catch (Exception ex) {
+            log.info("Failed to lookupAsync.", ex);
+            assertTrue(FutureUtil.unwrapCompletionException(ex) instanceof IllegalStateException);
+        }
+
+        try {
+            brokerRegistry.listen((brokerId, type) -> {
+                // Ignore.
+            });
+            fail();
+        } catch (Exception ex) {
+            log.info("Failed to lookupAsync.", ex);
+            assertTrue(FutureUtil.unwrapCompletionException(ex) instanceof IllegalStateException);
+        }
+    }
+
     @Test
     public void testIsVerifiedNotification() {
         assertFalse(BrokerRegistryImpl.isVerifiedNotification(new Notification(NotificationType.Created, "/")));
@@ -309,5 +382,9 @@ public void testKeyPath() {
         String keyPath = BrokerRegistryImpl.keyPath("brokerId");
         assertEquals(keyPath, BrokerRegistryImpl.LOOKUP_DATA_PATH + "/brokerId");
     }
+
+    public BrokerRegistryImpl.State getState(BrokerRegistryImpl brokerRegistry) {
+        return WhiteboxImpl.getInternalState(brokerRegistry, BrokerRegistryImpl.State.class);
+    }
 }
 

From d4c2326932cb99e5fb83478b7dae89778ca00ea9 Mon Sep 17 00:00:00 2001
From: Demogorgon314 <kwang@streamnative.io>
Date: Wed, 4 Jan 2023 18:58:50 +0800
Subject: [PATCH 13/15] Use try finally when close

---
 .../broker/loadbalance/extensions/BrokerRegistryImpl.java    | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
index 80389073f8108..d2294897f614b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
@@ -197,16 +197,17 @@ public synchronized void close() throws PulsarServerException {
             return;
         }
         try {
+            this.listeners.clear();
             this.unregister();
             this.brokerLookupDataLockManager.close();
-            this.listeners.clear();
-            this.state = State.Closed;
         } catch (Exception ex) {
             if (ex.getCause() instanceof MetadataStoreException.NotFoundException) {
                 throw new PulsarServerException.NotFoundException(MetadataStoreException.unwrap(ex));
             } else {
                 throw new PulsarServerException(MetadataStoreException.unwrap(ex));
             }
+        } finally {
+            this.state = State.Closed;
         }
     }
 

From fdb9ebdafb6e57101c0f07208469d396cb4550ab Mon Sep 17 00:00:00 2001
From: Demogorgon314 <kwang@streamnative.io>
Date: Wed, 1 Feb 2023 11:21:15 +0800
Subject: [PATCH 14/15] Merge master into current branch

---
 .../broker/loadbalance/extensions/BrokerRegistryTest.java    | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java
index 8a836294bd333..231fa0f8f7ccc 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java
@@ -143,6 +143,11 @@ public CompletableFuture<Set<String>> getAvailableBrokersAsync() {
             return null;
         }
 
+        @Override
+        public String setNamespaceBundleAffinity(String bundle, String broker) {
+            return null;
+        }
+
         @Override
         public void stop() throws PulsarServerException {
             // No-op

From 9c6d71561f8d842473cd306e33484ea9737d313d Mon Sep 17 00:00:00 2001
From: Demogorgon314 <kwang@streamnative.io>
Date: Wed, 1 Feb 2023 14:16:04 +0800
Subject: [PATCH 15/15] Address review comments

---
 .../extensions/BrokerRegistry.java            |  6 ++---
 .../extensions/BrokerRegistryImpl.java        | 24 +++++++++----------
 .../extensions/BrokerRegistryTest.java        |  6 ++---
 3 files changed, 17 insertions(+), 19 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistry.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistry.java
index 36622a9c73d7f..8133d4c482752 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistry.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistry.java
@@ -85,12 +85,12 @@ public interface BrokerRegistry extends AutoCloseable {
     CompletableFuture<Map<String, BrokerLookupData>> getAvailableBrokerLookupDataAsync();
 
     /**
-     * Listen the broker register change.
+     * Add listener to listen the broker register change.
      *
-     * @param listener Key is lookup service address{@link BrokerRegistry#getBrokerId()}
+     * @param listener Key is broker Id{@link BrokerRegistry#getBrokerId()}
      *                 Value is notification type.
      */
-    void listen(BiConsumer<String, NotificationType> listener);
+    void addListener(BiConsumer<String, NotificationType> listener);
 
     /**
      * Close the broker registry.
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
index d2294897f614b..de0d361316d8d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
@@ -150,24 +150,18 @@ public String getBrokerId() {
 
     @Override
     public CompletableFuture<List<String>> getAvailableBrokersAsync() {
-        if (!this.isStarted()) {
-            return FutureUtil.failedFuture(new IllegalStateException("The registry already closed."));
-        }
+        this.checkState();
         return brokerLookupDataLockManager.listLocks(LOOKUP_DATA_PATH).thenApply(Lists::newArrayList);
     }
 
     @Override
     public CompletableFuture<Optional<BrokerLookupData>> lookupAsync(String broker) {
-        if (!this.isStarted()) {
-            return FutureUtil.failedFuture(new IllegalStateException("The registry already closed."));
-        }
+        this.checkState();
         return brokerLookupDataLockManager.readLock(keyPath(broker));
     }
 
     public CompletableFuture<Map<String, BrokerLookupData>> getAvailableBrokerLookupDataAsync() {
-        if (!this.isStarted()) {
-            return FutureUtil.failedFuture(new IllegalStateException("The registry already closed."));
-        }
+        this.checkState();
         return this.getAvailableBrokersAsync().thenCompose(availableBrokers -> {
             Map<String, BrokerLookupData> map = new ConcurrentHashMap<>();
             List<CompletableFuture<Void>> futures = new ArrayList<>();
@@ -184,10 +178,8 @@ public CompletableFuture<Map<String, BrokerLookupData>> getAvailableBrokerLookup
         });
     }
 
-    public synchronized void listen(BiConsumer<String, NotificationType> listener) {
-        if (this.state == State.Closed) {
-            throw new IllegalStateException("The registry already closed.");
-        }
+    public synchronized void addListener(BiConsumer<String, NotificationType> listener) {
+        this.checkState();
         this.listeners.add(listener);
     }
 
@@ -242,4 +234,10 @@ protected static boolean isVerifiedNotification(Notification t) {
     protected static String keyPath(String brokerId) {
         return String.format("%s/%s", LOOKUP_DATA_PATH, brokerId);
     }
+
+    private void checkState() throws IllegalStateException {
+        if (this.state == State.Closed) {
+            throw new IllegalStateException("The registry already closed.");
+        }
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java
index 231fa0f8f7ccc..23cd1257f6f09 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java
@@ -231,7 +231,7 @@ public void testRegisterAndLookup() throws Exception {
         BrokerRegistryImpl brokerRegistry3 = createBrokerRegistryImpl(pulsar3);
 
         Set<String> brokerIds = new HashSet<>();
-        brokerRegistry1.listen((brokerId, type) -> {
+        brokerRegistry1.addListener((brokerId, type) -> {
             brokerIds.add(brokerId);
         });
 
@@ -311,7 +311,7 @@ public void testCloseRegister() throws Exception {
         assertEquals(getState(brokerRegistry), BrokerRegistryImpl.State.Registered);
 
         // Add a listener
-        brokerRegistry.listen((brokerId, type) -> {
+        brokerRegistry.addListener((brokerId, type) -> {
             // Ignore.
         });
         assertTrue(brokerRegistry.isStarted());
@@ -359,7 +359,7 @@ public void testCloseRegister() throws Exception {
         }
 
         try {
-            brokerRegistry.listen((brokerId, type) -> {
+            brokerRegistry.addListener((brokerId, type) -> {
                 // Ignore.
             });
             fail();