Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] PIP-192 updated metrics and cleanup broker selector #19945

Merged
merged 11 commits into from
Apr 5, 2023
Original file line number Diff line number Diff line change
@@ -392,8 +392,8 @@ static void createNamespaceIfAbsent(PulsarResources resources, NamespaceName nam
}
}

static void createNamespaceIfAbsent(PulsarResources resources, NamespaceName namespaceName,
String cluster) throws IOException {
public static void createNamespaceIfAbsent(PulsarResources resources, NamespaceName namespaceName,
String cluster) throws IOException {
createNamespaceIfAbsent(resources, namespaceName, cluster, DEFAULT_BUNDLE_NUMBER);
}

Original file line number Diff line number Diff line change
@@ -159,8 +159,6 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
private final UnloadCounter unloadCounter = new UnloadCounter();
private final SplitCounter splitCounter = new SplitCounter();

// record load metrics
private final AtomicReference<List<Metrics>> brokerLoadMetrics = new AtomicReference<>();
// record unload metrics
private final AtomicReference<List<Metrics>> unloadMetrics = new AtomicReference();
// record split metrics
@@ -332,7 +330,6 @@ public CompletableFuture<Optional<BrokerLookupData>> assign(Optional<ServiceUnit
return serviceUnitStateChannel.publishAssignEventAsync(bundle, brokerOpt.get())
.thenApply(Optional::of);
} else {
assignCounter.incrementEmpty();
throw new IllegalStateException(
"Failed to select the new owner broker for bundle: " + bundle);
}
@@ -362,11 +359,17 @@ public CompletableFuture<Optional<BrokerLookupData>> assign(Optional<ServiceUnit
return CompletableFuture.completedFuture(brokerLookupData);
}));
});
future.whenComplete((r, t) -> lookupRequests.remove(bundle));
future.whenComplete((r, t) -> {
if (t != null) {
assignCounter.incrementFailure();
}
lookupRequests.remove(bundle);
}
);
return future;
}

private CompletableFuture<Optional<String>> selectAsync(ServiceUnitId bundle) {
public CompletableFuture<Optional<String>> selectAsync(ServiceUnitId bundle) {
BrokerRegistry brokerRegistry = getBrokerRegistry();
return brokerRegistry.getAvailableBrokerLookupDataAsync()
.thenCompose(availableBrokers -> {
@@ -627,20 +630,12 @@ void playFollower() {
}
}

void updateBrokerLoadMetrics(BrokerLoadData loadData) {
this.brokerLoadMetrics.set(loadData.toMetrics(pulsar.getAdvertisedAddress()));
}

private void updateUnloadMetrics(UnloadDecision decision) {
unloadCounter.update(decision);
this.unloadMetrics.set(unloadCounter.toMetrics(pulsar.getAdvertisedAddress()));
}

public List<Metrics> getMetrics() {
List<Metrics> metricsCollection = new ArrayList<>();

if (this.brokerLoadMetrics.get() != null) {
metricsCollection.addAll(this.brokerLoadMetrics.get());
if (this.brokerLoadDataReporter != null) {
metricsCollection.addAll(brokerLoadDataReporter.generateLoadData()
.toMetrics(pulsar.getAdvertisedAddress()));
}
if (this.unloadMetrics.get() != null) {
metricsCollection.addAll(this.unloadMetrics.get());

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -18,7 +18,7 @@
*/
package org.apache.pulsar.broker.loadbalance.extensions.models;

import static org.apache.pulsar.broker.loadbalance.extensions.models.AssignCounter.Label.Empty;
import static org.apache.pulsar.broker.loadbalance.extensions.models.AssignCounter.Label.Failure;
import static org.apache.pulsar.broker.loadbalance.extensions.models.AssignCounter.Label.Skip;
import static org.apache.pulsar.broker.loadbalance.extensions.models.AssignCounter.Label.Success;
import java.util.ArrayList;
@@ -35,7 +35,7 @@ public class AssignCounter {

enum Label {
Success,
Empty,
Failure,
Skip,
}

@@ -44,7 +44,7 @@ enum Label {
public AssignCounter() {
breakdownCounters = Map.of(
Success, new AtomicLong(),
Empty, new AtomicLong(),
Failure, new AtomicLong(),
Skip, new AtomicLong()
);
}
@@ -54,8 +54,8 @@ public void incrementSuccess() {
breakdownCounters.get(Success).incrementAndGet();
}

public void incrementEmpty() {
breakdownCounters.get(Empty).incrementAndGet();
public void incrementFailure() {
breakdownCounters.get(Failure).incrementAndGet();
}

public void incrementSkip() {
Original file line number Diff line number Diff line change
@@ -98,9 +98,8 @@ public Set<SplitDecision> findBundlesToSplit(LoadManagerContext context, PulsarS

if (!channel.isOwner(bundle)) {
if (debug) {
log.error(String.format(CANNOT_SPLIT_BUNDLE_MSG
log.warn(String.format(CANNOT_SPLIT_BUNDLE_MSG
+ " This broker is not the owner.", bundle));
counter.update(Failure, Unknown);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need update counter?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. This can happen a lot, and we don't want to add noise in failure count.

}
continue;
}
Original file line number Diff line number Diff line change
@@ -37,6 +37,7 @@
import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Unknown;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
@@ -81,6 +82,7 @@
import org.apache.pulsar.broker.loadbalance.extensions.models.SplitCounter;
import org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision;
import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadCounter;
import org.apache.pulsar.broker.loadbalance.extensions.reporter.BrokerLoadDataReporter;
import org.apache.pulsar.broker.loadbalance.extensions.scheduler.TransferShedder;
import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore;
import org.apache.pulsar.broker.lookup.LookupResult;
@@ -640,8 +642,8 @@ public void testRoleChange()
@Test
public void testGetMetrics() throws Exception {
{
var brokerLoadMetrics = (AtomicReference<List<Metrics>>)
FieldUtils.readDeclaredField(primaryLoadManager, "brokerLoadMetrics", true);
var brokerLoadDataReporter = mock(BrokerLoadDataReporter.class);
FieldUtils.writeDeclaredField(primaryLoadManager, "brokerLoadDataReporter", brokerLoadDataReporter, true);
BrokerLoadData loadData = new BrokerLoadData();
SystemResourceUsage usage = new SystemResourceUsage();
var cpu = new ResourceUsage(1.0, 100.0);
@@ -655,7 +657,7 @@ public void testGetMetrics() throws Exception {
usage.setBandwidthIn(bandwidthIn);
usage.setBandwidthOut(bandwidthOut);
loadData.update(usage, 1, 2, 3, 4, 5, 6, conf);
brokerLoadMetrics.set(loadData.toMetrics(pulsar.getAdvertisedAddress()));
doReturn(loadData).when(brokerLoadDataReporter).generateLoadData();
}
{
var unloadMetrics = (AtomicReference<List<Metrics>>)
@@ -705,16 +707,17 @@ SplitDecision.Reason.Unknown, new AtomicLong(6))
{
AssignCounter assignCounter = new AssignCounter();
assignCounter.incrementSuccess();
assignCounter.incrementEmpty();
assignCounter.incrementEmpty();
assignCounter.incrementFailure();
assignCounter.incrementFailure();
assignCounter.incrementSkip();
assignCounter.incrementSkip();
assignCounter.incrementSkip();
FieldUtils.writeDeclaredField(primaryLoadManager, "assignCounter", assignCounter, true);
}

{

FieldUtils.writeDeclaredField(channel1, "lastOwnedServiceUnitCountAt", System.currentTimeMillis(), true);
FieldUtils.writeDeclaredField(channel1, "totalOwnedServiceUnitCnt", 10, true);
FieldUtils.writeDeclaredField(channel1, "totalInactiveBrokerCleanupCnt", 1, true);
FieldUtils.writeDeclaredField(channel1, "totalServiceUnitTombstoneCleanupCnt", 2, true);
FieldUtils.writeDeclaredField(channel1, "totalOrphanServiceUnitCleanupCnt", 3, true);
@@ -723,21 +726,21 @@ SplitDecision.Reason.Unknown, new AtomicLong(6))
FieldUtils.writeDeclaredField(channel1, "totalInactiveBrokerCleanupIgnoredCnt", 6, true);
FieldUtils.writeDeclaredField(channel1, "totalInactiveBrokerCleanupCancelledCnt", 7, true);

Map<ServiceUnitState, AtomicLong> ownerLookUpCounters = new LinkedHashMap<>();
Map<ServiceUnitState, ServiceUnitStateChannelImpl.Counters> ownerLookUpCounters = new LinkedHashMap<>();
Map<ServiceUnitState, ServiceUnitStateChannelImpl.Counters> handlerCounters = new LinkedHashMap<>();
Map<ServiceUnitStateChannelImpl.EventType, ServiceUnitStateChannelImpl.Counters> eventCounters =
new LinkedHashMap<>();
int i = 1;
int j = 0;
for (var state : ServiceUnitState.values()) {
ownerLookUpCounters.put(state, new AtomicLong(i));
ownerLookUpCounters.put(state,
new ServiceUnitStateChannelImpl.Counters(
new AtomicLong(j + 1), new AtomicLong(j + 2)));
handlerCounters.put(state,
new ServiceUnitStateChannelImpl.Counters(
new AtomicLong(j + 1), new AtomicLong(j + 2)));
i++;
j += 2;
}
i = 0;
int i = 0;
for (var type : ServiceUnitStateChannelImpl.EventType.values()) {
eventCounters.put(type,
new ServiceUnitStateChannelImpl.Counters(
@@ -774,22 +777,31 @@ SplitDecision.Reason.Unknown, new AtomicLong(6))
dimensions=[{broker=localhost, metric=bundlesSplit, reason=Bandwidth, result=Success}], metrics=[{brk_lb_bundles_split_breakdown_total=4}]
dimensions=[{broker=localhost, metric=bundlesSplit, reason=Admin, result=Success}], metrics=[{brk_lb_bundles_split_breakdown_total=5}]
dimensions=[{broker=localhost, metric=bundlesSplit, reason=Unknown, result=Failure}], metrics=[{brk_lb_bundles_split_breakdown_total=6}]
dimensions=[{broker=localhost, metric=assign, result=Empty}], metrics=[{brk_lb_assign_broker_breakdown_total=2}]
dimensions=[{broker=localhost, metric=assign, result=Failure}], metrics=[{brk_lb_assign_broker_breakdown_total=2}]
dimensions=[{broker=localhost, metric=assign, result=Skip}], metrics=[{brk_lb_assign_broker_breakdown_total=3}]
dimensions=[{broker=localhost, metric=assign, result=Success}], metrics=[{brk_lb_assign_broker_breakdown_total=1}]
dimensions=[{broker=localhost, metric=sunitStateChn, state=Init}], metrics=[{brk_sunit_state_chn_owner_lookup_total=1}]
dimensions=[{broker=localhost, metric=sunitStateChn, state=Free}], metrics=[{brk_sunit_state_chn_owner_lookup_total=2}]
dimensions=[{broker=localhost, metric=sunitStateChn, state=Owned}], metrics=[{brk_sunit_state_chn_owner_lookup_total=3}]
dimensions=[{broker=localhost, metric=sunitStateChn, state=Assigning}], metrics=[{brk_sunit_state_chn_owner_lookup_total=4}]
dimensions=[{broker=localhost, metric=sunitStateChn, state=Releasing}], metrics=[{brk_sunit_state_chn_owner_lookup_total=5}]
dimensions=[{broker=localhost, metric=sunitStateChn, state=Splitting}], metrics=[{brk_sunit_state_chn_owner_lookup_total=6}]
dimensions=[{broker=localhost, metric=sunitStateChn, state=Deleted}], metrics=[{brk_sunit_state_chn_owner_lookup_total=7}]
dimensions=[{broker=localhost, metric=sunitStateChn, result=Total, state=Init}], metrics=[{brk_sunit_state_chn_owner_lookup_total=1}]
dimensions=[{broker=localhost, metric=sunitStateChn, result=Failure, state=Init}], metrics=[{brk_sunit_state_chn_owner_lookup_total=2}]
dimensions=[{broker=localhost, metric=sunitStateChn, result=Total, state=Free}], metrics=[{brk_sunit_state_chn_owner_lookup_total=3}]
dimensions=[{broker=localhost, metric=sunitStateChn, result=Failure, state=Free}], metrics=[{brk_sunit_state_chn_owner_lookup_total=4}]
dimensions=[{broker=localhost, metric=sunitStateChn, result=Total, state=Owned}], metrics=[{brk_sunit_state_chn_owner_lookup_total=5}]
dimensions=[{broker=localhost, metric=sunitStateChn, result=Failure, state=Owned}], metrics=[{brk_sunit_state_chn_owner_lookup_total=6}]
dimensions=[{broker=localhost, metric=sunitStateChn, result=Total, state=Assigning}], metrics=[{brk_sunit_state_chn_owner_lookup_total=7}]
dimensions=[{broker=localhost, metric=sunitStateChn, result=Failure, state=Assigning}], metrics=[{brk_sunit_state_chn_owner_lookup_total=8}]
dimensions=[{broker=localhost, metric=sunitStateChn, result=Total, state=Releasing}], metrics=[{brk_sunit_state_chn_owner_lookup_total=9}]
dimensions=[{broker=localhost, metric=sunitStateChn, result=Failure, state=Releasing}], metrics=[{brk_sunit_state_chn_owner_lookup_total=10}]
dimensions=[{broker=localhost, metric=sunitStateChn, result=Total, state=Splitting}], metrics=[{brk_sunit_state_chn_owner_lookup_total=11}]
dimensions=[{broker=localhost, metric=sunitStateChn, result=Failure, state=Splitting}], metrics=[{brk_sunit_state_chn_owner_lookup_total=12}]
dimensions=[{broker=localhost, metric=sunitStateChn, result=Total, state=Deleted}], metrics=[{brk_sunit_state_chn_owner_lookup_total=13}]
dimensions=[{broker=localhost, metric=sunitStateChn, result=Failure, state=Deleted}], metrics=[{brk_sunit_state_chn_owner_lookup_total=14}]
dimensions=[{broker=localhost, event=Assign, metric=sunitStateChn, result=Total}], metrics=[{brk_sunit_state_chn_event_publish_ops_total=1}]
dimensions=[{broker=localhost, event=Assign, metric=sunitStateChn, result=Failure}], metrics=[{brk_sunit_state_chn_event_publish_ops_total=2}]
dimensions=[{broker=localhost, event=Split, metric=sunitStateChn, result=Total}], metrics=[{brk_sunit_state_chn_event_publish_ops_total=3}]
dimensions=[{broker=localhost, event=Split, metric=sunitStateChn, result=Failure}], metrics=[{brk_sunit_state_chn_event_publish_ops_total=4}]
dimensions=[{broker=localhost, event=Unload, metric=sunitStateChn, result=Total}], metrics=[{brk_sunit_state_chn_event_publish_ops_total=5}]
dimensions=[{broker=localhost, event=Unload, metric=sunitStateChn, result=Failure}], metrics=[{brk_sunit_state_chn_event_publish_ops_total=6}]
dimensions=[{broker=localhost, event=Override, metric=sunitStateChn, result=Total}], metrics=[{brk_sunit_state_chn_event_publish_ops_total=7}]
dimensions=[{broker=localhost, event=Override, metric=sunitStateChn, result=Failure}], metrics=[{brk_sunit_state_chn_event_publish_ops_total=8}]
dimensions=[{broker=localhost, event=Init, metric=sunitStateChn, result=Total}], metrics=[{brk_sunit_state_chn_subscribe_ops_total=1}]
dimensions=[{broker=localhost, event=Init, metric=sunitStateChn, result=Failure}], metrics=[{brk_sunit_state_chn_subscribe_ops_total=2}]
dimensions=[{broker=localhost, event=Free, metric=sunitStateChn, result=Total}], metrics=[{brk_sunit_state_chn_subscribe_ops_total=3}]
@@ -808,7 +820,8 @@ SplitDecision.Reason.Unknown, new AtomicLong(6))
dimensions=[{broker=localhost, metric=sunitStateChn, result=Skip}], metrics=[{brk_sunit_state_chn_inactive_broker_cleanup_ops_total=6}]
dimensions=[{broker=localhost, metric=sunitStateChn, result=Cancel}], metrics=[{brk_sunit_state_chn_inactive_broker_cleanup_ops_total=7}]
dimensions=[{broker=localhost, metric=sunitStateChn, result=Schedule}], metrics=[{brk_sunit_state_chn_inactive_broker_cleanup_ops_total=5}]
dimensions=[{broker=localhost, metric=sunitStateChn}], metrics=[{brk_sunit_state_chn_inactive_broker_cleanup_ops_total=1, brk_sunit_state_chn_orphan_su_cleanup_ops_total=3, brk_sunit_state_chn_su_tombstone_cleanup_ops_total=2}]
dimensions=[{broker=localhost, metric=sunitStateChn, result=Success}], metrics=[{brk_sunit_state_chn_inactive_broker_cleanup_ops_total=1}]
dimensions=[{broker=localhost, metric=sunitStateChn}], metrics=[{brk_sunit_state_chn_orphan_su_cleanup_ops_total=3, brk_sunit_state_chn_owned_su_total=10, brk_sunit_state_chn_su_tombstone_cleanup_ops_total=2}]
""".split("\n"));
var actual = primaryLoadManager.getMetrics().stream().map(m -> m.toString()).collect(Collectors.toSet());
assertEquals(actual, expected);
Original file line number Diff line number Diff line change
@@ -75,11 +75,11 @@
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.loadbalance.LeaderElectionService;
import org.apache.pulsar.broker.loadbalance.extensions.BrokerRegistryImpl;
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
import org.apache.pulsar.broker.loadbalance.extensions.models.Split;
import org.apache.pulsar.broker.loadbalance.extensions.models.Unload;
import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore;
import org.apache.pulsar.broker.loadbalance.extensions.strategy.BrokerSelectionStrategy;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.client.api.Producer;
@@ -116,7 +116,7 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {

private BrokerRegistryImpl registry;

private BrokerSelectionStrategy brokerSelector;
private ExtensibleLoadManagerImpl loadManager;

@BeforeClass
@Override
@@ -135,7 +135,7 @@ protected void setup() throws Exception {
loadManagerContext = mock(LoadManagerContext.class);
doReturn(mock(LoadDataStore.class)).when(loadManagerContext).brokerLoadDataStore();
doReturn(mock(LoadDataStore.class)).when(loadManagerContext).topBundleLoadDataStore();
brokerSelector = mock(BrokerSelectionStrategy.class);
loadManager = mock(ExtensibleLoadManagerImpl.class);
additionalPulsarTestContext = createAdditionalPulsarTestContext(getDefaultConf());
pulsar2 = additionalPulsarTestContext.getPulsarService();

@@ -496,7 +496,7 @@ public void transferTestWhenDestBrokerFails()
assertEquals(0, getOwnerRequests2.size());

// recovered, check the monitor update state : Assigned -> Owned
doReturn(Optional.of(lookupServiceAddress1)).when(brokerSelector).select(any(), any(), any());
doReturn(CompletableFuture.completedFuture(Optional.of(lookupServiceAddress1))).when(loadManager).selectAsync(any());
FieldUtils.writeDeclaredField(channel2, "producer", producer, true);
FieldUtils.writeDeclaredField(channel1,
"inFlightStateWaitingTimeInMillis", 1 , true);
@@ -714,7 +714,7 @@ public void handleBrokerDeletionEventTest()

var owner1 = channel1.getOwnerAsync(bundle1);
var owner2 = channel2.getOwnerAsync(bundle2);
doReturn(Optional.of(lookupServiceAddress2)).when(brokerSelector).select(any(), any(), any());
doReturn(CompletableFuture.completedFuture(Optional.of(lookupServiceAddress2))).when(loadManager).selectAsync(any());

assertTrue(owner1.get().isEmpty());
assertTrue(owner2.get().isEmpty());
@@ -1076,7 +1076,7 @@ public void assignTestWhenDestBrokerFails()
"inFlightStateWaitingTimeInMillis", 3 * 1000, true);
FieldUtils.writeDeclaredField(channel2,
"inFlightStateWaitingTimeInMillis", 3 * 1000, true);
doReturn(Optional.of(lookupServiceAddress2)).when(brokerSelector).select(any(), any(), any());
doReturn(CompletableFuture.completedFuture(Optional.of(lookupServiceAddress2))).when(loadManager).selectAsync(any());
channel1.publishAssignEventAsync(bundle, lookupServiceAddress2);
// channel1 is broken. the assign won't be complete.
waitUntilState(channel1, bundle);
@@ -1418,11 +1418,12 @@ private static void cleanOpsCounters(ServiceUnitStateChannel channel)
}

var ownerLookUpCounters =
(Map<ServiceUnitStateChannelImpl.EventType, AtomicLong>)
(Map<ServiceUnitState, ServiceUnitStateChannelImpl.Counters>)
FieldUtils.readDeclaredField(channel, "ownerLookUpCounters", true);

for(var val : ownerLookUpCounters.values()){
val.set(0);
val.getFailure().set(0);
val.getTotal().set(0);
}
}

@@ -1518,20 +1519,20 @@ private static void validateOwnerLookUpCounters(ServiceUnitStateChannel channel,
)
throws IllegalAccessException {
var ownerLookUpCounters =
(Map<ServiceUnitState, AtomicLong>)
(Map<ServiceUnitState, ServiceUnitStateChannelImpl.Counters>)
FieldUtils.readDeclaredField(channel, "ownerLookUpCounters", true);

Awaitility.await()
.pollInterval(200, TimeUnit.MILLISECONDS)
.atMost(10, TimeUnit.SECONDS)
.untilAsserted(() -> { // wait until true
assertEquals(assigned, ownerLookUpCounters.get(Assigning).get());
assertEquals(owned, ownerLookUpCounters.get(Owned).get());
assertEquals(released, ownerLookUpCounters.get(Releasing).get());
assertEquals(splitting, ownerLookUpCounters.get(Splitting).get());
assertEquals(free, ownerLookUpCounters.get(Free).get());
assertEquals(deleted, ownerLookUpCounters.get(Deleted).get());
assertEquals(init, ownerLookUpCounters.get(Init).get());
assertEquals(assigned, ownerLookUpCounters.get(Assigning).getTotal().get());
assertEquals(owned, ownerLookUpCounters.get(Owned).getTotal().get());
assertEquals(released, ownerLookUpCounters.get(Releasing).getTotal().get());
assertEquals(splitting, ownerLookUpCounters.get(Splitting).getTotal().get());
assertEquals(free, ownerLookUpCounters.get(Free).getTotal().get());
assertEquals(deleted, ownerLookUpCounters.get(Deleted).getTotal().get());
assertEquals(init, ownerLookUpCounters.get(Init).getTotal().get());
});
}

@@ -1565,7 +1566,7 @@ ServiceUnitStateChannelImpl createChannel(PulsarService pulsar)

doReturn(loadManagerContext).when(channel).getContext();
doReturn(registry).when(channel).getBrokerRegistry();
doReturn(brokerSelector).when(channel).getBrokerSelector();
doReturn(loadManager).when(channel).getLoadManager();


var leaderElectionService = new LeaderElectionService(
Original file line number Diff line number Diff line change
@@ -169,7 +169,7 @@ public void testNoBundleOwner() {
var actual = strategy.findBundlesToSplit(loadManagerContext, pulsar);
var expected = Set.of();
assertEquals(actual, expected);
verify(counter, times(2)).update(eq(SplitDecision.Label.Failure), eq(Unknown));
verify(counter, times(0)).update(eq(SplitDecision.Label.Failure), eq(Unknown));
}

public void testError() throws Exception {