Skip to content

Commit

Permalink
[fix][broker] Check the broker is available for the SLA monitor bundl…
Browse files Browse the repository at this point in the history
…e when the ExtensibleLoadManager is enabled (apache#22485)

(cherry picked from commit d0b9d47)
(cherry picked from commit c1a8596)
  • Loading branch information
Demogorgon314 authored and heesung-sn committed Jun 26, 2024
1 parent 90494cf commit 825a0bd
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -461,30 +461,20 @@ public CompletableFuture<Optional<BrokerLookupData>> assign(Optional<ServiceUnit
if (topic.isPresent() && isInternalTopic(topic.get().toString())) {
owner = serviceUnitStateChannel.getChannelOwnerAsync();
} else {
String candidateBrokerId = getHeartbeatOrSLAMonitorBrokerId(serviceUnit);
if (candidateBrokerId != null) {
owner = CompletableFuture.completedFuture(Optional.of(candidateBrokerId));
} else {
owner = getOrSelectOwnerAsync(serviceUnit, bundle).thenApply(Optional::ofNullable);
}
owner = getHeartbeatOrSLAMonitorBrokerId(serviceUnit).thenCompose(candidateBrokerId -> {
if (candidateBrokerId != null) {
return CompletableFuture.completedFuture(Optional.of(candidateBrokerId));
}
return getOrSelectOwnerAsync(serviceUnit, bundle).thenApply(Optional::ofNullable);
});
}
return getBrokerLookupData(owner, bundle);
});
}

private String getHeartbeatOrSLAMonitorBrokerId(ServiceUnitId serviceUnit) {
// Check if this is Heartbeat or SLAMonitor namespace
String candidateBroker = NamespaceService.checkHeartbeatNamespace(serviceUnit);
if (candidateBroker == null) {
candidateBroker = NamespaceService.checkHeartbeatNamespaceV2(serviceUnit);
}
if (candidateBroker == null) {
candidateBroker = NamespaceService.getSLAMonitorBrokerName(serviceUnit);
}
if (candidateBroker != null) {
return candidateBroker.substring(candidateBroker.lastIndexOf('/') + 1);
}
return candidateBroker;
private CompletableFuture<String> getHeartbeatOrSLAMonitorBrokerId(ServiceUnitId serviceUnit) {
return pulsar.getNamespaceService().getHeartbeatOrSLAMonitorBrokerId(serviceUnit,
cb -> brokerRegistry.lookupAsync(cb).thenApply(Optional::isPresent));
}

private CompletableFuture<String> getOrSelectOwnerAsync(ServiceUnitId serviceUnit,
Expand Down Expand Up @@ -631,11 +621,12 @@ public CompletableFuture<Optional<String>> getOwnershipAsync(Optional<ServiceUni
if (topic.isPresent() && isInternalTopic(topic.get().toString())) {
return serviceUnitStateChannel.getChannelOwnerAsync();
}
String candidateBroker = getHeartbeatOrSLAMonitorBrokerId(serviceUnit);
if (candidateBroker != null) {
return CompletableFuture.completedFuture(Optional.of(candidateBroker));
}
return serviceUnitStateChannel.getOwnerAsync(bundle);
return getHeartbeatOrSLAMonitorBrokerId(serviceUnit).thenCompose(candidateBroker -> {
if (candidateBroker != null) {
return CompletableFuture.completedFuture(Optional.of(candidateBroker));
}
return serviceUnitStateChannel.getOwnerAsync(bundle);
});
}

public CompletableFuture<Optional<BrokerLookupData>> getOwnershipWithLookupDataAsync(ServiceUnitId bundleUnit) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -499,6 +500,38 @@ private CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(
});
}

/**
* Check if this is Heartbeat or SLAMonitor namespace and return the broker id.
*
* @param serviceUnit the service unit
* @param isBrokerActive the function to check if the broker is active
* @return the broker id
*/
public CompletableFuture<String> getHeartbeatOrSLAMonitorBrokerId(
ServiceUnitId serviceUnit, Function<String, CompletableFuture<Boolean>> isBrokerActive) {
String candidateBroker = NamespaceService.checkHeartbeatNamespace(serviceUnit);
if (candidateBroker != null) {
return CompletableFuture.completedFuture(candidateBroker);
}
candidateBroker = NamespaceService.checkHeartbeatNamespaceV2(serviceUnit);
if (candidateBroker != null) {
return CompletableFuture.completedFuture(candidateBroker);
}
candidateBroker = NamespaceService.getSLAMonitorBrokerName(serviceUnit);
if (candidateBroker != null) {
// Check if the broker is available
final String finalCandidateBroker = candidateBroker;
return isBrokerActive.apply(candidateBroker).thenApply(isActive -> {
if (isActive) {
return finalCandidateBroker;
} else {
return null;
}
});
}
return CompletableFuture.completedFuture(null);
}

private void searchForCandidateBroker(NamespaceBundle bundle,
CompletableFuture<Optional<LookupResult>> lookupFuture,
LookupOptions options) {
Expand Down Expand Up @@ -526,17 +559,9 @@ private void searchForCandidateBroker(NamespaceBundle bundle,

try {
// check if this is Heartbeat or SLAMonitor namespace
candidateBroker = checkHeartbeatNamespace(bundle);
if (candidateBroker == null) {
candidateBroker = checkHeartbeatNamespaceV2(bundle);
}
if (candidateBroker == null) {
String broker = getSLAMonitorBrokerName(bundle);
// checking if the broker is up and running
if (broker != null && isBrokerActive(broker)) {
candidateBroker = broker;
}
}
candidateBroker = getHeartbeatOrSLAMonitorBrokerId(bundle, cb ->
CompletableFuture.completedFuture(isBrokerActive(cb)))
.get(config.getMetadataStoreOperationTimeoutSeconds(), SECONDS);

if (candidateBroker == null) {
Optional<LeaderBroker> currentLeader = pulsar.getLeaderElectionService().getCurrentLeader();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
Expand All @@ -71,6 +72,7 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.commons.lang3.tuple.Pair;
Expand Down Expand Up @@ -101,6 +103,10 @@
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.impl.TableViewImpl;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
Expand Down Expand Up @@ -663,6 +669,47 @@ public void testDeployAndRollbackLoadManager() throws Exception {
pulsar.getBrokerId(), pulsar.getBrokerServiceUrl());
}
}
// Check if the broker is available
var wrapper = (ExtensibleLoadManagerWrapper) pulsar4.getLoadManager().get();
var loadManager4 = spy((ExtensibleLoadManagerImpl)
FieldUtils.readField(wrapper, "loadManager", true));
loadManager4.getBrokerRegistry().unregister();

NamespaceName slaMonitorNamespace =
getSLAMonitorNamespace(pulsar4.getBrokerId(), pulsar.getConfiguration());
String slaMonitorTopic = slaMonitorNamespace.getPersistentTopicName("test");
String result = pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic);
assertNotNull(result);
log.info("{} Namespace is re-owned by {}", slaMonitorTopic, result);
assertNotEquals(result, pulsar4.getBrokerServiceUrl());

Producer<String> producer = pulsar.getClient().newProducer(Schema.STRING).topic(slaMonitorTopic).create();
producer.send("t1");

// Test re-register broker and check the lookup result
loadManager4.getBrokerRegistry().register();

result = pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic);
assertNotNull(result);
log.info("{} Namespace is re-owned by {}", slaMonitorTopic, result);
assertEquals(result, pulsar4.getBrokerServiceUrl());

producer.send("t2");
Producer<String> producer1 = pulsar.getClient().newProducer(Schema.STRING).topic(slaMonitorTopic).create();
producer1.send("t3");

producer.close();
producer1.close();
@Cleanup
Consumer<String> consumer = pulsar.getClient().newConsumer(Schema.STRING)
.topic(slaMonitorTopic)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscriptionName("test")
.subscribe();
// receive message t1 t2 t3
assertEquals(consumer.receive().getValue(), "t1");
assertEquals(consumer.receive().getValue(), "t2");
assertEquals(consumer.receive().getValue(), "t3");
}
}
}
Expand Down

0 comments on commit 825a0bd

Please sign in to comment.