diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistry.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistry.java index 94ac87f7cf704..8133d4c482752 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistry.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistry.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.loadbalance.extensions; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.function.BiConsumer; @@ -38,6 +39,11 @@ public interface BrokerRegistry extends AutoCloseable { */ void start() throws PulsarServerException; + /** + * Return the broker registry is started. + */ + boolean isStarted(); + /** * Register local broker to metadata store. */ @@ -51,11 +57,11 @@ public interface BrokerRegistry extends AutoCloseable { void unregister() throws MetadataStoreException; /** - * Get the current broker lookup service address. + * Get the current broker ID. * * @return The service url without the protocol prefix, 'http://'. e.g. broker-xyz:port */ - String getLookupServiceAddress(); + String getBrokerId(); /** * Async get available brokers. @@ -72,18 +78,19 @@ public interface BrokerRegistry extends AutoCloseable { CompletableFuture> lookupAsync(String broker); /** - * For each the broker lookup data. - * The key is lookupServiceAddress{@link BrokerRegistry#getLookupServiceAddress()} + * Get the map of brokerId->brokerLookupData. + * + * @return Map of broker lookup data. */ - void forEach(BiConsumer action); + CompletableFuture> getAvailableBrokerLookupDataAsync(); /** - * Listen the broker register change. + * Add listener to listen the broker register change. * - * @param listener Key is lookup service address{@link BrokerRegistry#getLookupServiceAddress()} + * @param listener Key is broker Id{@link BrokerRegistry#getBrokerId()} * Value is notification type. */ - void listen(BiConsumer listener); + void addListener(BiConsumer listener); /** * Close the broker registry. diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java new file mode 100644 index 0000000000000..de0d361316d8d --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.loadbalance.extensions; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.BiConsumer; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.PulsarServerException; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData; +import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.metadata.api.MetadataStoreException; +import org.apache.pulsar.metadata.api.Notification; +import org.apache.pulsar.metadata.api.NotificationType; +import org.apache.pulsar.metadata.api.coordination.LockManager; +import org.apache.pulsar.metadata.api.coordination.ResourceLock; + +/** + * The broker registry impl, base on the LockManager. + */ +@Slf4j +public class BrokerRegistryImpl implements BrokerRegistry { + + protected static final String LOOKUP_DATA_PATH = "/loadbalance/brokers"; + + private final PulsarService pulsar; + + private final ServiceConfiguration conf; + + private final BrokerLookupData brokerLookupData; + + private final LockManager brokerLookupDataLockManager; + + private final String brokerId; + + private final ScheduledExecutorService scheduler; + + private final List> listeners; + + private volatile ResourceLock brokerLookupDataLock; + + protected enum State { + Init, + Started, + Registered, + Closed + } + + private State state; + + public BrokerRegistryImpl(PulsarService pulsar) { + this.pulsar = pulsar; + this.conf = pulsar.getConfiguration(); + this.brokerLookupDataLockManager = pulsar.getCoordinationService().getLockManager(BrokerLookupData.class); + this.scheduler = pulsar.getLoadManagerExecutor(); + this.listeners = new ArrayList<>(); + this.brokerId = pulsar.getLookupServiceAddress(); + this.brokerLookupData = new BrokerLookupData( + pulsar.getSafeWebServiceAddress(), + pulsar.getWebServiceAddressTls(), + pulsar.getBrokerServiceUrl(), + pulsar.getBrokerServiceUrlTls(), + pulsar.getAdvertisedListeners(), + pulsar.getProtocolDataToAdvertise(), + pulsar.getConfiguration().isEnablePersistentTopics(), + pulsar.getConfiguration().isEnableNonPersistentTopics(), + pulsar.getBrokerVersion()); + this.state = State.Init; + } + + @Override + public synchronized void start() throws PulsarServerException { + if (this.state != State.Init) { + return; + } + pulsar.getLocalMetadataStore().registerListener(this::handleMetadataStoreNotification); + try { + this.state = State.Started; + this.register(); + } catch (MetadataStoreException e) { + throw new PulsarServerException(e); + } + } + + @Override + public boolean isStarted() { + return this.state == State.Started || this.state == State.Registered; + } + + @Override + public synchronized void register() throws MetadataStoreException { + if (this.state == State.Started) { + try { + this.brokerLookupDataLock = brokerLookupDataLockManager.acquireLock(keyPath(brokerId), brokerLookupData) + .get(conf.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS); + this.state = State.Registered; + } catch (InterruptedException | ExecutionException | TimeoutException e) { + throw MetadataStoreException.unwrap(e); + } + } + } + + @Override + public synchronized void unregister() throws MetadataStoreException { + if (this.state == State.Registered) { + try { + this.brokerLookupDataLock.release() + .get(conf.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS); + this.state = State.Started; + } catch (CompletionException | InterruptedException | ExecutionException | TimeoutException e) { + throw MetadataStoreException.unwrap(e); + } + } + } + + @Override + public String getBrokerId() { + return this.brokerId; + } + + @Override + public CompletableFuture> getAvailableBrokersAsync() { + this.checkState(); + return brokerLookupDataLockManager.listLocks(LOOKUP_DATA_PATH).thenApply(Lists::newArrayList); + } + + @Override + public CompletableFuture> lookupAsync(String broker) { + this.checkState(); + return brokerLookupDataLockManager.readLock(keyPath(broker)); + } + + public CompletableFuture> getAvailableBrokerLookupDataAsync() { + this.checkState(); + return this.getAvailableBrokersAsync().thenCompose(availableBrokers -> { + Map map = new ConcurrentHashMap<>(); + List> futures = new ArrayList<>(); + for (String brokerId : availableBrokers) { + futures.add(this.lookupAsync(brokerId).thenAccept(lookupDataOpt -> { + if (lookupDataOpt.isPresent()) { + map.put(brokerId, lookupDataOpt.get()); + } else { + log.warn("Got an empty lookup data, brokerId: {}", brokerId); + } + })); + } + return FutureUtil.waitForAll(futures).thenApply(__ -> map); + }); + } + + public synchronized void addListener(BiConsumer listener) { + this.checkState(); + this.listeners.add(listener); + } + + @Override + public synchronized void close() throws PulsarServerException { + if (this.state == State.Closed) { + return; + } + try { + this.listeners.clear(); + this.unregister(); + this.brokerLookupDataLockManager.close(); + } catch (Exception ex) { + if (ex.getCause() instanceof MetadataStoreException.NotFoundException) { + throw new PulsarServerException.NotFoundException(MetadataStoreException.unwrap(ex)); + } else { + throw new PulsarServerException(MetadataStoreException.unwrap(ex)); + } + } finally { + this.state = State.Closed; + } + } + + private void handleMetadataStoreNotification(Notification t) { + if (!this.isStarted() || !isVerifiedNotification(t)) { + return; + } + try { + if (log.isDebugEnabled()) { + log.debug("Handle notification: [{}]", t); + } + if (listeners.isEmpty()) { + return; + } + this.scheduler.submit(() -> { + String brokerId = t.getPath().substring(LOOKUP_DATA_PATH.length() + 1); + for (BiConsumer listener : listeners) { + listener.accept(brokerId, t.getType()); + } + }); + } catch (RejectedExecutionException e) { + // Executor is shutting down + } + } + + @VisibleForTesting + protected static boolean isVerifiedNotification(Notification t) { + return t.getPath().startsWith(LOOKUP_DATA_PATH + "/") && t.getPath().length() > LOOKUP_DATA_PATH.length() + 1; + } + + @VisibleForTesting + protected static String keyPath(String brokerId) { + return String.format("%s/%s", LOOKUP_DATA_PATH, brokerId); + } + + private void checkState() throws IllegalStateException { + if (this.state == State.Closed) { + throw new IllegalStateException("The registry already closed."); + } + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java new file mode 100644 index 0000000000000..23cd1257f6f09 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java @@ -0,0 +1,395 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.loadbalance.extensions; + +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.spy; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; + +import java.time.Duration; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.PulsarServerException; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.loadbalance.LoadManager; +import org.apache.pulsar.broker.loadbalance.ResourceUnit; +import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData; +import org.apache.pulsar.common.naming.ServiceUnitId; +import org.apache.pulsar.common.stats.Metrics; +import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.metadata.api.Notification; +import org.apache.pulsar.metadata.api.NotificationType; +import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport; +import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; +import org.awaitility.Awaitility; +import org.testcontainers.shaded.org.awaitility.reflect.WhiteboxImpl; +import org.testng.annotations.AfterClass; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + + +/** + * Unit test for {@link BrokerRegistry}. + */ +@Slf4j +@Test(groups = "broker") +public class BrokerRegistryTest { + + private final List pulsarServices = new CopyOnWriteArrayList<>(); + private final List brokerRegistries = new CopyOnWriteArrayList<>(); + + private ExecutorService executor; + + private LocalBookkeeperEnsemble bkEnsemble; + + + // Make sure the load manager don't register itself to `/loadbalance/brokers/{lookupServiceAddress}` + public static class MockLoadManager implements LoadManager { + + @Override + public void start() throws PulsarServerException { + // No-op + } + + @Override + public boolean isCentralized() { + return false; + } + + @Override + public Optional getLeastLoaded(ServiceUnitId su) throws Exception { + return Optional.empty(); + } + + @Override + public LoadManagerReport generateLoadReport() throws Exception { + return null; + } + + @Override + public void setLoadReportForceUpdateFlag() { + // No-op + } + + @Override + public void writeLoadReportOnZookeeper() throws Exception { + // No-op + } + + @Override + public void writeResourceQuotasToZooKeeper() throws Exception { + // No-op + } + + @Override + public List getLoadBalancingMetrics() { + return null; + } + + @Override + public void doLoadShedding() { + // No-op + } + + @Override + public void doNamespaceBundleSplit() throws Exception { + // No-op + } + + @Override + public void disableBroker() throws Exception { + // No-op + } + + @Override + public Set getAvailableBrokers() throws Exception { + return null; + } + + @Override + public CompletableFuture> getAvailableBrokersAsync() { + return null; + } + + @Override + public String setNamespaceBundleAffinity(String bundle, String broker) { + return null; + } + + @Override + public void stop() throws PulsarServerException { + // No-op + } + + @Override + public void initialize(PulsarService pulsar) { + // No-op + } + } + + @BeforeClass + void setup() throws Exception { + executor = new ThreadPoolExecutor(5, 20, 30, TimeUnit.SECONDS, + new LinkedBlockingQueue<>()); + // Start local bookkeeper ensemble + bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0); + bkEnsemble.start(); + } + + @SneakyThrows + private PulsarService createPulsarService() { + ServiceConfiguration config = new ServiceConfiguration(); + config.setLoadBalancerEnabled(false); + config.setLoadManagerClassName(MockLoadManager.class.getName()); + config.setClusterName("use"); + config.setWebServicePort(Optional.of(0)); + config.setMetadataStoreUrl("zk:127.0.0.1" + ":" + bkEnsemble.getZookeeperPort()); + config.setBrokerShutdownTimeoutMs(0L); + config.setBrokerServicePort(Optional.of(0)); + config.setAdvertisedAddress("localhost"); + PulsarService pulsar = spy(new PulsarService(config)); + pulsarServices.add(pulsar); + return pulsar; + } + + private BrokerRegistryImpl createBrokerRegistryImpl(PulsarService pulsar) { + BrokerRegistryImpl brokerRegistry = spy(new BrokerRegistryImpl(pulsar)); + brokerRegistries.add(brokerRegistry); + return brokerRegistry; + } + + @AfterClass(alwaysRun = true) + void shutdown() throws Exception { + executor.shutdownNow(); + bkEnsemble.stop(); + } + + @AfterMethod(alwaysRun = true) + void cleanUp() { + log.info("Cleaning up the broker registry..."); + brokerRegistries.forEach(brokerRegistry -> { + if (brokerRegistry.isStarted()) { + try { + brokerRegistry.close(); + } catch (PulsarServerException e) { + throw new RuntimeException(e); + } + } + }); + brokerRegistries.clear(); + log.info("Cleaning up the pulsar services..."); + pulsarServices.forEach(pulsarService -> { + if (pulsarService.isRunning()) { + pulsarService.shutdownNow(); + } + }); + pulsarServices.clear(); + } + + @Test(timeOut = 30 * 1000) + public void testRegisterAndLookup() throws Exception { + PulsarService pulsar1 = createPulsarService(); + PulsarService pulsar2 = createPulsarService(); + PulsarService pulsar3 = createPulsarService(); + pulsar1.start(); + pulsar2.start(); + pulsar3.start(); + BrokerRegistryImpl brokerRegistry1 = createBrokerRegistryImpl(pulsar1); + BrokerRegistryImpl brokerRegistry2 = createBrokerRegistryImpl(pulsar2); + BrokerRegistryImpl brokerRegistry3 = createBrokerRegistryImpl(pulsar3); + + Set brokerIds = new HashSet<>(); + brokerRegistry1.addListener((brokerId, type) -> { + brokerIds.add(brokerId); + }); + + brokerRegistry1.start(); + brokerRegistry2.start(); + + Awaitility.await().atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> assertEquals(brokerIds.size(), 2)); + + assertEquals(brokerRegistry1.getAvailableBrokersAsync().get().size(), 2); + assertEquals(brokerRegistry2.getAvailableBrokersAsync().get().size(), 2); + + // Check three broker cache are flush successes. + brokerRegistry3.start(); + assertEquals(brokerRegistry3.getAvailableBrokersAsync().get().size(), 3); + Awaitility.await().atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> assertEquals(brokerIds.size(), 3)); + + assertEquals(brokerIds, new HashSet<>(brokerRegistry1.getAvailableBrokersAsync().get())); + assertEquals(brokerIds, new HashSet<>(brokerRegistry2.getAvailableBrokersAsync().get())); + assertEquals(brokerIds, new HashSet<>(brokerRegistry3.getAvailableBrokersAsync().get())); + assertEquals(brokerIds, brokerRegistry1.getAvailableBrokerLookupDataAsync().get().keySet()); + assertEquals(brokerIds, brokerRegistry2.getAvailableBrokerLookupDataAsync().get().keySet()); + assertEquals(brokerIds, brokerRegistry3.getAvailableBrokerLookupDataAsync().get().keySet()); + + Optional lookupDataOpt = + brokerRegistry1.lookupAsync(brokerRegistry2.getBrokerId()).get(); + assertTrue(lookupDataOpt.isPresent()); + assertEquals(lookupDataOpt.get().getWebServiceUrl(), pulsar2.getSafeWebServiceAddress()); + assertEquals(lookupDataOpt.get().getWebServiceUrlTls(), pulsar2.getWebServiceAddressTls()); + assertEquals(lookupDataOpt.get().getPulsarServiceUrl(), pulsar2.getBrokerServiceUrl()); + assertEquals(lookupDataOpt.get().getPulsarServiceUrlTls(), pulsar2.getBrokerServiceUrlTls()); + assertEquals(lookupDataOpt.get().advertisedListeners(), pulsar2.getAdvertisedListeners()); + assertEquals(lookupDataOpt.get().protocols(), pulsar2.getProtocolDataToAdvertise()); + assertEquals(lookupDataOpt.get().persistentTopicsEnabled(), pulsar2.getConfiguration() + .isEnablePersistentTopics()); + assertEquals(lookupDataOpt.get().nonPersistentTopicsEnabled(), pulsar2.getConfiguration() + .isEnableNonPersistentTopics()); + assertEquals(lookupDataOpt.get().brokerVersion(), pulsar2.getBrokerVersion()); + + // Unregister and see the available brokers. + brokerRegistry1.unregister(); + assertEquals(brokerRegistry2.getAvailableBrokersAsync().get().size(), 2); + + } + + @Test + public void testRegisterFailWithSameBrokerId() throws Exception { + PulsarService pulsar1 = createPulsarService(); + PulsarService pulsar2 = createPulsarService(); + pulsar1.start(); + pulsar2.start(); + + doReturn(pulsar1.getLookupServiceAddress()).when(pulsar2).getLookupServiceAddress(); + BrokerRegistryImpl brokerRegistry1 = createBrokerRegistryImpl(pulsar1); + BrokerRegistryImpl brokerRegistry2 = createBrokerRegistryImpl(pulsar2); + brokerRegistry1.start(); + try { + brokerRegistry2.start(); + fail(); + } catch (Exception ex) { + log.info("Broker registry start failed.", ex); + assertTrue(ex instanceof PulsarServerException); + assertTrue(ex.getMessage().contains("LockBusyException")); + } + } + + @Test + public void testCloseRegister() throws Exception { + PulsarService pulsar1 = createPulsarService(); + pulsar1.start(); + BrokerRegistryImpl brokerRegistry = createBrokerRegistryImpl(pulsar1); + assertEquals(getState(brokerRegistry), BrokerRegistryImpl.State.Init); + + // Check state after stated. + brokerRegistry.start(); + assertEquals(getState(brokerRegistry), BrokerRegistryImpl.State.Registered); + + // Add a listener + brokerRegistry.addListener((brokerId, type) -> { + // Ignore. + }); + assertTrue(brokerRegistry.isStarted()); + List> listeners = + WhiteboxImpl.getInternalState(brokerRegistry, "listeners"); + assertFalse(listeners.isEmpty()); + + // Check state after unregister. + brokerRegistry.unregister(); + assertEquals(getState(brokerRegistry), BrokerRegistryImpl.State.Started); + + // Check state after re-register. + brokerRegistry.register(); + assertEquals(getState(brokerRegistry), BrokerRegistryImpl.State.Registered); + + // Check state after close. + brokerRegistry.close(); + assertFalse(brokerRegistry.isStarted()); + assertEquals(getState(brokerRegistry), BrokerRegistryImpl.State.Closed); + listeners = WhiteboxImpl.getInternalState(brokerRegistry, "listeners"); + assertTrue(listeners.isEmpty()); + + try { + brokerRegistry.getAvailableBrokersAsync().get(); + fail(); + } catch (Exception ex) { + log.info("Failed to getAvailableBrokersAsync.", ex); + assertTrue(FutureUtil.unwrapCompletionException(ex) instanceof IllegalStateException); + } + + try { + brokerRegistry.getAvailableBrokerLookupDataAsync().get(); + fail(); + } catch (Exception ex) { + log.info("Failed to getAvailableBrokerLookupDataAsync.", ex); + assertTrue(FutureUtil.unwrapCompletionException(ex) instanceof IllegalStateException); + } + + try { + brokerRegistry.lookupAsync("test").get(); + fail(); + } catch (Exception ex) { + log.info("Failed to lookupAsync.", ex); + assertTrue(FutureUtil.unwrapCompletionException(ex) instanceof IllegalStateException); + } + + try { + brokerRegistry.addListener((brokerId, type) -> { + // Ignore. + }); + fail(); + } catch (Exception ex) { + log.info("Failed to lookupAsync.", ex); + assertTrue(FutureUtil.unwrapCompletionException(ex) instanceof IllegalStateException); + } + } + + @Test + public void testIsVerifiedNotification() { + assertFalse(BrokerRegistryImpl.isVerifiedNotification(new Notification(NotificationType.Created, "/"))); + assertFalse(BrokerRegistryImpl.isVerifiedNotification(new Notification(NotificationType.Created, + BrokerRegistryImpl.LOOKUP_DATA_PATH + "xyz"))); + assertFalse(BrokerRegistryImpl.isVerifiedNotification(new Notification(NotificationType.Created, + BrokerRegistryImpl.LOOKUP_DATA_PATH))); + assertTrue(BrokerRegistryImpl.isVerifiedNotification( + new Notification(NotificationType.Created, BrokerRegistryImpl.LOOKUP_DATA_PATH + "/brokerId"))); + assertTrue(BrokerRegistryImpl.isVerifiedNotification( + new Notification(NotificationType.Created, BrokerRegistryImpl.LOOKUP_DATA_PATH + "/brokerId/xyz"))); + } + + @Test + public void testKeyPath() { + String keyPath = BrokerRegistryImpl.keyPath("brokerId"); + assertEquals(keyPath, BrokerRegistryImpl.LOOKUP_DATA_PATH + "/brokerId"); + } + + public BrokerRegistryImpl.State getState(BrokerRegistryImpl brokerRegistry) { + return WhiteboxImpl.getInternalState(brokerRegistry, BrokerRegistryImpl.State.class); + } +} +