Skip to content

Commit

Permalink
[fix][broker] getOwnedServiceUnits NPE (#20625)
Browse files Browse the repository at this point in the history
  • Loading branch information
Demogorgon314 authored Jun 21, 2023
1 parent ffc84c9 commit 7bb3531
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {

private SplitManager splitManager;

private boolean started = false;
private volatile boolean started = false;

private final AssignCounter assignCounter = new AssignCounter();
@Getter
Expand All @@ -191,6 +191,10 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
* Get all the bundles that are owned by this broker.
*/
public Set<NamespaceBundle> getOwnedServiceUnits() {
if (!started) {
log.warn("Failed to get owned service units, load manager is not started.");
return Collections.emptySet();
}
Set<Map.Entry<String, ServiceUnitStateData>> entrySet = serviceUnitStateChannel.getOwnershipEntrySet();
String brokerId = brokerRegistry.getBrokerId();
return entrySet.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1115,6 +1115,14 @@ private void assertOwnedServiceUnits(
assertEquals(status.broker_assignment, BrokerAssignment.shared);
}

@Test(timeOut = 30 * 1000)
public void testGetOwnedServiceUnitsWhenLoadManagerNotStart() {
ExtensibleLoadManagerImpl loadManager = new ExtensibleLoadManagerImpl();
Set<NamespaceBundle> ownedServiceUnits = loadManager.getOwnedServiceUnits();
assertNotNull(ownedServiceUnits);
assertTrue(ownedServiceUnits.isEmpty());
}

@Test(timeOut = 30 * 1000)
public void testTryAcquiringOwnership()
throws PulsarAdminException, ExecutionException, InterruptedException {
Expand Down

0 comments on commit 7bb3531

Please sign in to comment.