From 110af17b8127c5535970f4a9e06b82446b65afe9 Mon Sep 17 00:00:00 2001 From: Demogorgon314 Date: Thu, 11 Apr 2024 23:15:41 +0800 Subject: [PATCH 1/5] Check the broker is available for sla monitor bundle --- .../extensions/ExtensibleLoadManagerImpl.java | 46 +++++++++++-------- .../ExtensibleLoadManagerImplTest.java | 15 ++++++ 2 files changed, 43 insertions(+), 18 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java index 3210578d8290a..bf7109283ba5d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java @@ -496,30 +496,39 @@ public CompletableFuture> assign(Optional { + if (candidateBrokerId != null) { + return CompletableFuture.completedFuture(Optional.of(candidateBrokerId)); + } + return getOrSelectOwnerAsync(serviceUnit, bundle).thenApply(Optional::ofNullable); + }); } return getBrokerLookupData(owner, bundle); }); } - private String getHeartbeatOrSLAMonitorBrokerId(ServiceUnitId serviceUnit) { + private CompletableFuture getHeartbeatOrSLAMonitorBrokerId(ServiceUnitId serviceUnit) { // Check if this is Heartbeat or SLAMonitor namespace String candidateBroker = NamespaceService.checkHeartbeatNamespace(serviceUnit); - if (candidateBroker == null) { - candidateBroker = NamespaceService.checkHeartbeatNamespaceV2(serviceUnit); + if (candidateBroker != null) { + return CompletableFuture.completedFuture(candidateBroker); } - if (candidateBroker == null) { - candidateBroker = NamespaceService.getSLAMonitorBrokerName(serviceUnit); + candidateBroker = NamespaceService.checkHeartbeatNamespaceV2(serviceUnit); + if (candidateBroker != null) { + return CompletableFuture.completedFuture(candidateBroker); } + candidateBroker = NamespaceService.getSLAMonitorBrokerName(serviceUnit); if (candidateBroker != null) { - return candidateBroker.substring(candidateBroker.lastIndexOf('/') + 1); + // Check if the broker is available + final String finalCandidateBroker = candidateBroker; + return brokerRegistry.getAvailableBrokersAsync().thenApply(brokers -> { + if (brokers.contains(finalCandidateBroker)) { + return finalCandidateBroker; + } + return null; + }); } - return candidateBroker; + return CompletableFuture.completedFuture(null); } private CompletableFuture getOrSelectOwnerAsync(ServiceUnitId serviceUnit, @@ -666,11 +675,12 @@ public CompletableFuture> getOwnershipAsync(Optional { + if (candidateBroker != null) { + return CompletableFuture.completedFuture(Optional.of(candidateBroker)); + } + return serviceUnitStateChannel.getOwnerAsync(bundle); + }); } public CompletableFuture> getOwnershipWithLookupDataAsync(ServiceUnitId bundleUnit) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java index aee57f9d26093..65ac0df6750c2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java @@ -50,6 +50,7 @@ import static org.mockito.Mockito.verify; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; @@ -998,6 +999,20 @@ public void testDeployAndRollbackLoadManager() throws Exception { pulsar.getBrokerId(), pulsar.getBrokerServiceUrl()); } } + // Check if the broker is available + var wrapper = (ExtensibleLoadManagerWrapper) pulsar4.getLoadManager().get(); + var loadManager4 = spy((ExtensibleLoadManagerImpl) + FieldUtils.readField(wrapper, "loadManager", true)); + loadManager4.getBrokerRegistry().unregister(); + + NamespaceName slaMonitorNamespace = + getSLAMonitorNamespace(pulsar4.getBrokerId(), pulsar.getConfiguration()); + String slaMonitorTopic = slaMonitorNamespace.getPersistentTopicName("test"); + String result = pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic); + assertNotNull(result); + assertNotEquals(result, pulsar4.getBrokerServiceUrl()); + + loadManager4.getBrokerRegistry().register(); } } } From 7ba78cb8a18100809a2959923211fd2defc6bfb9 Mon Sep 17 00:00:00 2001 From: Demogorgon314 Date: Fri, 12 Apr 2024 09:30:53 +0800 Subject: [PATCH 2/5] Use lookup async and check broker re-register --- .../extensions/ExtensibleLoadManagerImpl.java | 11 +++++++---- .../channel/ServiceUnitStateChannelImpl.java | 1 + .../extensions/ExtensibleLoadManagerImplTest.java | 4 ++++ 3 files changed, 12 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java index bf7109283ba5d..b916aef7d42d1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java @@ -521,11 +521,14 @@ private CompletableFuture getHeartbeatOrSLAMonitorBrokerId(ServiceUnitId if (candidateBroker != null) { // Check if the broker is available final String finalCandidateBroker = candidateBroker; - return brokerRegistry.getAvailableBrokersAsync().thenApply(brokers -> { - if (brokers.contains(finalCandidateBroker)) { - return finalCandidateBroker; + return brokerRegistry.lookupAsync(candidateBroker).thenApply(brokerLookupData -> { + if (brokerLookupData.isEmpty()) { + if (debug(conf, log)) { + log.info("The SLA Monitor broker {} is not available.", finalCandidateBroker); + } + return null; } - return null; + return finalCandidateBroker; }); } return CompletableFuture.completedFuture(null); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index 71ddb3acb28b7..c1a600afd5540 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -1224,6 +1224,7 @@ private void handleBrokerCreationEvent(String broker) { broker, cleanupJobs.size()); } } + // TODO: Unload the current SLA Monitor bundle own by other broker. } private void handleBrokerDeletionEvent(String broker) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java index 65ac0df6750c2..7ecbfffc09b06 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java @@ -1013,6 +1013,10 @@ public void testDeployAndRollbackLoadManager() throws Exception { assertNotEquals(result, pulsar4.getBrokerServiceUrl()); loadManager4.getBrokerRegistry().register(); + + result = pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic); + assertNotNull(result); + assertEquals(result, pulsar4.getBrokerServiceUrl()); } } } From d778185367fc8b45e5880ada572707bd587ad0a8 Mon Sep 17 00:00:00 2001 From: Demogorgon314 Date: Fri, 12 Apr 2024 10:03:54 +0800 Subject: [PATCH 3/5] Add more test --- .../channel/ServiceUnitStateChannelImpl.java | 1 - .../ExtensibleLoadManagerImplTest.java | 25 +++++++++++++++++++ 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index c1a600afd5540..71ddb3acb28b7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -1224,7 +1224,6 @@ private void handleBrokerCreationEvent(String broker) { broker, cleanupJobs.size()); } } - // TODO: Unload the current SLA Monitor bundle own by other broker. } private void handleBrokerDeletionEvent(String broker) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java index 7ecbfffc09b06..f745759f04dd8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java @@ -114,6 +114,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; @@ -130,6 +131,7 @@ import org.apache.pulsar.common.policies.data.BrokerAssignment; import org.apache.pulsar.common.policies.data.BundlesData; import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus; +import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; import org.apache.pulsar.common.stats.Metrics; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage; @@ -1010,13 +1012,36 @@ public void testDeployAndRollbackLoadManager() throws Exception { String slaMonitorTopic = slaMonitorNamespace.getPersistentTopicName("test"); String result = pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic); assertNotNull(result); + log.info("{} Namespace is re-owned by {}", slaMonitorTopic, result); assertNotEquals(result, pulsar4.getBrokerServiceUrl()); + Producer producer = pulsar.getClient().newProducer(Schema.STRING).topic(slaMonitorTopic).create(); + producer.send("t1"); + + // Test re-register broker and check the lookup result loadManager4.getBrokerRegistry().register(); result = pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic); assertNotNull(result); + log.info("{} Namespace is re-owned by {}", slaMonitorTopic, result); assertEquals(result, pulsar4.getBrokerServiceUrl()); + + producer.send("t2"); + Producer producer1 = pulsar.getClient().newProducer(Schema.STRING).topic(slaMonitorTopic).create(); + producer1.send("t3"); + + producer.close(); + producer1.close(); + @Cleanup + Consumer consumer = pulsar.getClient().newConsumer(Schema.STRING) + .topic(slaMonitorTopic) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscriptionName("test") + .subscribe(); + // receive message t1 t2 t3 + assertEquals(consumer.receive().getValue(), "t1"); + assertEquals(consumer.receive().getValue(), "t2"); + assertEquals(consumer.receive().getValue(), "t3"); } } } From fcd8c57aa6463382b3ae0408fb48a1a0f8900bc8 Mon Sep 17 00:00:00 2001 From: Demogorgon314 Date: Fri, 12 Apr 2024 11:52:33 +0800 Subject: [PATCH 4/5] Fix code style --- .../loadbalance/extensions/ExtensibleLoadManagerImplTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java index f745759f04dd8..79ad248c7d1f2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java @@ -131,7 +131,6 @@ import org.apache.pulsar.common.policies.data.BrokerAssignment; import org.apache.pulsar.common.policies.data.BundlesData; import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus; -import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; import org.apache.pulsar.common.stats.Metrics; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage; From 9d8533870f7130eeda2da7482b0c331f00fca67a Mon Sep 17 00:00:00 2001 From: Demogorgon314 Date: Sat, 13 Apr 2024 13:31:28 +0800 Subject: [PATCH 5/5] Refactor getHeartbeatOrSLAMonitorBrokerId --- .../extensions/ExtensibleLoadManagerImpl.java | 26 +--------- .../broker/namespace/NamespaceService.java | 47 ++++++++++++++----- 2 files changed, 38 insertions(+), 35 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java index b916aef7d42d1..aa40200281c78 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java @@ -508,30 +508,8 @@ public CompletableFuture> assign(Optional getHeartbeatOrSLAMonitorBrokerId(ServiceUnitId serviceUnit) { - // Check if this is Heartbeat or SLAMonitor namespace - String candidateBroker = NamespaceService.checkHeartbeatNamespace(serviceUnit); - if (candidateBroker != null) { - return CompletableFuture.completedFuture(candidateBroker); - } - candidateBroker = NamespaceService.checkHeartbeatNamespaceV2(serviceUnit); - if (candidateBroker != null) { - return CompletableFuture.completedFuture(candidateBroker); - } - candidateBroker = NamespaceService.getSLAMonitorBrokerName(serviceUnit); - if (candidateBroker != null) { - // Check if the broker is available - final String finalCandidateBroker = candidateBroker; - return brokerRegistry.lookupAsync(candidateBroker).thenApply(brokerLookupData -> { - if (brokerLookupData.isEmpty()) { - if (debug(conf, log)) { - log.info("The SLA Monitor broker {} is not available.", finalCandidateBroker); - } - return null; - } - return finalCandidateBroker; - }); - } - return CompletableFuture.completedFuture(null); + return pulsar.getNamespaceService().getHeartbeatOrSLAMonitorBrokerId(serviceUnit, + cb -> brokerRegistry.lookupAsync(cb).thenApply(Optional::isPresent)); } private CompletableFuture getOrSelectOwnerAsync(ServiceUnitId serviceUnit, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index dec8b098dddac..a38e3c3c6bb86 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -46,6 +46,7 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -536,6 +537,38 @@ private CompletableFuture> findBrokerServiceUrl( }); } + /** + * Check if this is Heartbeat or SLAMonitor namespace and return the broker id. + * + * @param serviceUnit the service unit + * @param isBrokerActive the function to check if the broker is active + * @return the broker id + */ + public CompletableFuture getHeartbeatOrSLAMonitorBrokerId( + ServiceUnitId serviceUnit, Function> isBrokerActive) { + String candidateBroker = NamespaceService.checkHeartbeatNamespace(serviceUnit); + if (candidateBroker != null) { + return CompletableFuture.completedFuture(candidateBroker); + } + candidateBroker = NamespaceService.checkHeartbeatNamespaceV2(serviceUnit); + if (candidateBroker != null) { + return CompletableFuture.completedFuture(candidateBroker); + } + candidateBroker = NamespaceService.getSLAMonitorBrokerName(serviceUnit); + if (candidateBroker != null) { + // Check if the broker is available + final String finalCandidateBroker = candidateBroker; + return isBrokerActive.apply(candidateBroker).thenApply(isActive -> { + if (isActive) { + return finalCandidateBroker; + } else { + return null; + } + }); + } + return CompletableFuture.completedFuture(null); + } + private void searchForCandidateBroker(NamespaceBundle bundle, CompletableFuture> lookupFuture, LookupOptions options) { @@ -563,17 +596,9 @@ private void searchForCandidateBroker(NamespaceBundle bundle, try { // check if this is Heartbeat or SLAMonitor namespace - candidateBroker = checkHeartbeatNamespace(bundle); - if (candidateBroker == null) { - candidateBroker = checkHeartbeatNamespaceV2(bundle); - } - if (candidateBroker == null) { - String broker = getSLAMonitorBrokerName(bundle); - // checking if the broker is up and running - if (broker != null && isBrokerActive(broker)) { - candidateBroker = broker; - } - } + candidateBroker = getHeartbeatOrSLAMonitorBrokerId(bundle, cb -> + CompletableFuture.completedFuture(isBrokerActive(cb))) + .get(config.getMetadataStoreOperationTimeoutSeconds(), SECONDS); if (candidateBroker == null) { Optional currentLeader = pulsar.getLeaderElectionService().getCurrentLeader();