Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] Check the broker is available for the SLA monitor bundle when the ExtensibleLoadManager is enabled #22485

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -496,30 +496,20 @@ public CompletableFuture<Optional<BrokerLookupData>> assign(Optional<ServiceUnit
if (topic.isPresent() && isInternalTopic(topic.get().toString())) {
owner = serviceUnitStateChannel.getChannelOwnerAsync();
} else {
String candidateBrokerId = getHeartbeatOrSLAMonitorBrokerId(serviceUnit);
if (candidateBrokerId != null) {
owner = CompletableFuture.completedFuture(Optional.of(candidateBrokerId));
} else {
owner = getOrSelectOwnerAsync(serviceUnit, bundle).thenApply(Optional::ofNullable);
}
owner = getHeartbeatOrSLAMonitorBrokerId(serviceUnit).thenCompose(candidateBrokerId -> {
if (candidateBrokerId != null) {
return CompletableFuture.completedFuture(Optional.of(candidateBrokerId));
}
return getOrSelectOwnerAsync(serviceUnit, bundle).thenApply(Optional::ofNullable);
});
}
return getBrokerLookupData(owner, bundle);
});
}

private String getHeartbeatOrSLAMonitorBrokerId(ServiceUnitId serviceUnit) {
// Check if this is Heartbeat or SLAMonitor namespace
String candidateBroker = NamespaceService.checkHeartbeatNamespace(serviceUnit);
if (candidateBroker == null) {
candidateBroker = NamespaceService.checkHeartbeatNamespaceV2(serviceUnit);
}
if (candidateBroker == null) {
candidateBroker = NamespaceService.getSLAMonitorBrokerName(serviceUnit);
}
if (candidateBroker != null) {
return candidateBroker.substring(candidateBroker.lastIndexOf('/') + 1);
}
return candidateBroker;
private CompletableFuture<String> getHeartbeatOrSLAMonitorBrokerId(ServiceUnitId serviceUnit) {
BewareMyPower marked this conversation as resolved.
Show resolved Hide resolved
return pulsar.getNamespaceService().getHeartbeatOrSLAMonitorBrokerId(serviceUnit,
cb -> brokerRegistry.lookupAsync(cb).thenApply(Optional::isPresent));
}

private CompletableFuture<String> getOrSelectOwnerAsync(ServiceUnitId serviceUnit,
Expand Down Expand Up @@ -666,11 +656,12 @@ public CompletableFuture<Optional<String>> getOwnershipAsync(Optional<ServiceUni
if (topic.isPresent() && isInternalTopic(topic.get().toString())) {
return serviceUnitStateChannel.getChannelOwnerAsync();
}
String candidateBroker = getHeartbeatOrSLAMonitorBrokerId(serviceUnit);
if (candidateBroker != null) {
return CompletableFuture.completedFuture(Optional.of(candidateBroker));
}
return serviceUnitStateChannel.getOwnerAsync(bundle);
return getHeartbeatOrSLAMonitorBrokerId(serviceUnit).thenCompose(candidateBroker -> {
if (candidateBroker != null) {
return CompletableFuture.completedFuture(Optional.of(candidateBroker));
}
return serviceUnitStateChannel.getOwnerAsync(bundle);
});
}

public CompletableFuture<Optional<BrokerLookupData>> getOwnershipWithLookupDataAsync(ServiceUnitId bundleUnit) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -536,6 +537,38 @@ private CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(
});
}

/**
* Check if this is Heartbeat or SLAMonitor namespace and return the broker id.
*
* @param serviceUnit the service unit
* @param isBrokerActive the function to check if the broker is active
* @return the broker id
*/
public CompletableFuture<String> getHeartbeatOrSLAMonitorBrokerId(
ServiceUnitId serviceUnit, Function<String, CompletableFuture<Boolean>> isBrokerActive) {
String candidateBroker = NamespaceService.checkHeartbeatNamespace(serviceUnit);
if (candidateBroker != null) {
return CompletableFuture.completedFuture(candidateBroker);
}
candidateBroker = NamespaceService.checkHeartbeatNamespaceV2(serviceUnit);
if (candidateBroker != null) {
return CompletableFuture.completedFuture(candidateBroker);
}
candidateBroker = NamespaceService.getSLAMonitorBrokerName(serviceUnit);
if (candidateBroker != null) {
// Check if the broker is available
final String finalCandidateBroker = candidateBroker;
return isBrokerActive.apply(candidateBroker).thenApply(isActive -> {
if (isActive) {
return finalCandidateBroker;
} else {
return null;
}
});
}
return CompletableFuture.completedFuture(null);
}

private void searchForCandidateBroker(NamespaceBundle bundle,
CompletableFuture<Optional<LookupResult>> lookupFuture,
LookupOptions options) {
Expand Down Expand Up @@ -563,17 +596,9 @@ private void searchForCandidateBroker(NamespaceBundle bundle,

try {
// check if this is Heartbeat or SLAMonitor namespace
candidateBroker = checkHeartbeatNamespace(bundle);
if (candidateBroker == null) {
candidateBroker = checkHeartbeatNamespaceV2(bundle);
}
if (candidateBroker == null) {
String broker = getSLAMonitorBrokerName(bundle);
// checking if the broker is up and running
if (broker != null && isBrokerActive(broker)) {
candidateBroker = broker;
}
}
candidateBroker = getHeartbeatOrSLAMonitorBrokerId(bundle, cb ->
CompletableFuture.completedFuture(isBrokerActive(cb)))
.get(config.getMetadataStoreOperationTimeoutSeconds(), SECONDS);

if (candidateBroker == null) {
Optional<LeaderBroker> currentLeader = pulsar.getLeaderElectionService().getCurrentLeader();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
Expand Down Expand Up @@ -113,6 +114,7 @@
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
Expand Down Expand Up @@ -998,6 +1000,47 @@ public void testDeployAndRollbackLoadManager() throws Exception {
pulsar.getBrokerId(), pulsar.getBrokerServiceUrl());
}
}
// Check if the broker is available
var wrapper = (ExtensibleLoadManagerWrapper) pulsar4.getLoadManager().get();
var loadManager4 = spy((ExtensibleLoadManagerImpl)
FieldUtils.readField(wrapper, "loadManager", true));
loadManager4.getBrokerRegistry().unregister();

NamespaceName slaMonitorNamespace =
getSLAMonitorNamespace(pulsar4.getBrokerId(), pulsar.getConfiguration());
String slaMonitorTopic = slaMonitorNamespace.getPersistentTopicName("test");
String result = pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic);
assertNotNull(result);
log.info("{} Namespace is re-owned by {}", slaMonitorTopic, result);
assertNotEquals(result, pulsar4.getBrokerServiceUrl());

Producer<String> producer = pulsar.getClient().newProducer(Schema.STRING).topic(slaMonitorTopic).create();
producer.send("t1");

// Test re-register broker and check the lookup result
loadManager4.getBrokerRegistry().register();

result = pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic);
assertNotNull(result);
log.info("{} Namespace is re-owned by {}", slaMonitorTopic, result);
assertEquals(result, pulsar4.getBrokerServiceUrl());

producer.send("t2");
Producer<String> producer1 = pulsar.getClient().newProducer(Schema.STRING).topic(slaMonitorTopic).create();
producer1.send("t3");

producer.close();
producer1.close();
@Cleanup
Consumer<String> consumer = pulsar.getClient().newConsumer(Schema.STRING)
.topic(slaMonitorTopic)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscriptionName("test")
.subscribe();
// receive message t1 t2 t3
assertEquals(consumer.receive().getValue(), "t1");
assertEquals(consumer.receive().getValue(), "t2");
assertEquals(consumer.receive().getValue(), "t3");
}
}
}
Expand Down
Loading