diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 31b7fa391f245..5236efacb4dc2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -401,6 +401,10 @@ public void start() throws PulsarServerException { // Start load management service (even if load balancing is disabled) this.loadManager.set(LoadManager.create(this)); + // needs load management service and before start broker service, + this.startNamespaceService(); + schemaRegistryService = SchemaRegistryService.create(this); + this.defaultOffloader = createManagedLedgerOffloader( OffloadPolicies.create(this.getConfiguration().getProperties())); @@ -458,8 +462,6 @@ public Boolean get() { } this.webService.addStaticResources("/static", "/static"); - schemaRegistryService = SchemaRegistryService.create(this); - webService.start(); // Refresh addresses, since the port might have been dynamically assigned @@ -474,8 +476,8 @@ public Boolean get() { this.webSocketService.setLocalCluster(clusterData); } - // needs load management service - this.startNamespaceService(); + // Initialize namespace service, after service url assigned. Should init zk and refresh self owner info. + this.nsService.initialize(); // Start the leader election service startLeaderElectionService(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index d4e90be705d99..039ea6b3088f2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -73,7 +73,6 @@ import java.net.URI; import java.net.URL; -import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -158,13 +157,19 @@ public NamespaceService(PulsarService pulsar) { host = pulsar.getAdvertisedAddress(); this.config = pulsar.getConfiguration(); this.loadManager = pulsar.getLoadManager(); - ServiceUnitZkUtils.initZK(pulsar.getLocalZkCache().getZooKeeper(), pulsar.getSafeBrokerServiceUrl()); this.bundleFactory = new NamespaceBundleFactory(pulsar, Hashing.crc32()); this.ownershipCache = new OwnershipCache(pulsar, bundleFactory, this); this.namespaceClients = new ConcurrentOpenHashMap<>(); this.bundleOwnershipListeners = new CopyOnWriteArrayList<>(); } + public void initialize() { + ServiceUnitZkUtils.initZK(pulsar.getLocalZkCache().getZooKeeper(), pulsar.getSafeBrokerServiceUrl()); + if (!getOwnershipCache().refreshSelfOwnerInfo()) { + throw new RuntimeException("Failed to refresh self owner info."); + } + } + public CompletableFuture> getBrokerServiceUrlAsync(TopicName topic, boolean authoritative) { return getBundleAsync(topic) @@ -424,7 +429,7 @@ private void searchForCandidateBroker(NamespaceBundle bundle, try { checkNotNull(candidateBroker); - if (pulsar.getSafeWebServiceAddress().equals(candidateBroker)) { + if (candidateBroker.equals(pulsar.getSafeWebServiceAddress())) { // invalidate namespace policies and try to load latest policies to avoid data-discrepancy if broker // doesn't receive watch on policies changes final String policyPath = AdminResource.path(POLICIES, bundle.getNamespaceObject().toString()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java index 6b6ee36242dfc..50e96fadf51fb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java @@ -73,7 +73,7 @@ public class OwnershipCache { /** * The NamespaceEphemeralData objects that can be associated with the current owner */ - private final NamespaceEphemeralData selfOwnerInfo; + private NamespaceEphemeralData selfOwnerInfo; /** * The NamespaceEphemeralData objects that can be associated with the current owner, when the broker is disabled. @@ -111,6 +111,8 @@ public class OwnershipCache { */ private NamespaceService namespaceService; + private final PulsarService pulsar; + private class OwnedServiceUnitCacheLoader implements AsyncCacheLoader { @SuppressWarnings("deprecation") @@ -156,6 +158,7 @@ public CompletableFuture asyncLoad(String namespaceBundleZNode, Exe */ public OwnershipCache(PulsarService pulsar, NamespaceBundleFactory bundleFactory, NamespaceService namespaceService) { this.namespaceService = namespaceService; + this.pulsar = pulsar; this.ownerBrokerUrl = pulsar.getSafeBrokerServiceUrl(); this.ownerBrokerUrlTls = pulsar.getBrokerServiceUrlTls(); this.selfOwnerInfo = new NamespaceEphemeralData(ownerBrokerUrl, ownerBrokerUrlTls, @@ -211,6 +214,11 @@ public CompletableFuture tryAcquiringOwnership(Namespace CompletableFuture future = new CompletableFuture<>(); + if (!refreshSelfOwnerInfo()) { + future.completeExceptionally(new RuntimeException("Namespace service does not ready for acquiring ownership")); + return future; + } + LOG.info("Trying to acquire ownership of {}", bundle); // Doing a get() on the ownedBundlesCache will trigger an async ZK write to acquire the lock over the @@ -367,4 +375,12 @@ public void updateBundleState(NamespaceBundle bundle, boolean isActive) throws E public NamespaceEphemeralData getSelfOwnerInfo() { return selfOwnerInfo; } + + public synchronized boolean refreshSelfOwnerInfo() { + if (selfOwnerInfo.getNativeUrl() == null) { + this.selfOwnerInfo = new NamespaceEphemeralData(pulsar.getSafeBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls(), + pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(), false); + } + return selfOwnerInfo.getNativeUrl() != null; + } }