Skip to content

Commit

Permalink
[fix][broker] Handle the case when getOwnedServiceUnits fails grace…
Browse files Browse the repository at this point in the history
…fully (apache#23119)
  • Loading branch information
Demogorgon314 authored and Technoboy- committed Aug 6, 2024
1 parent 2d162ba commit fff41c3
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,6 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
private final String brokerId;
private final Map<String, CompletableFuture<Void>> cleanupJobs;
private final StateChangeListeners stateChangeListeners;
private ExtensibleLoadManagerImpl loadManager;
private BrokerRegistry brokerRegistry;
private LeaderElectionService leaderElectionService;
private TableView<ServiceUnitStateData> tableview;
Expand Down Expand Up @@ -296,7 +295,6 @@ public synchronized void start() throws PulsarServerException {
log.warn("Failed to find the channel leader.");
}
this.channelState = LeaderElectionServiceStarted;
loadManager = getLoadManager();

if (producer != null) {
producer.close();
Expand Down Expand Up @@ -561,6 +559,9 @@ public CompletableFuture<Optional<String>> getOwnerAsync(String serviceUnit) {
}

private Optional<String> getOwner(String serviceUnit) {
if (!validateChannelState(Started, true)) {
throw new IllegalStateException("Invalid channel state:" + channelState.name());
}
ServiceUnitStateData data = tableview.get(serviceUnit);
ServiceUnitState state = state(data);
switch (state) {
Expand Down Expand Up @@ -1715,6 +1716,9 @@ public void listen(StateChangeListener listener) {

@Override
public Set<Map.Entry<String, ServiceUnitStateData>> getOwnershipEntrySet() {
if (!validateChannelState(Started, true)) {
throw new IllegalStateException("Invalid channel state:" + channelState.name());
}
return tableview.entrySet();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1301,7 +1301,11 @@ public void addNamespaceBundleOwnershipListener(NamespaceBundleOwnershipListener
}
}
pulsar.runWhenReadyForIncomingRequests(() -> {
getOwnedServiceUnits().forEach(bundle -> notifyNamespaceBundleOwnershipListener(bundle, listeners));
try {
getOwnedServiceUnits().forEach(bundle -> notifyNamespaceBundleOwnershipListener(bundle, listeners));
} catch (Exception e) {
LOG.error("Failed to notify namespace bundle ownership listener", e);
}
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import static org.junit.Assert.assertThrows;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.expectThrows;
import static org.testng.Assert.fail;
import static org.testng.AssertJUnit.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
Expand Down Expand Up @@ -1759,6 +1760,18 @@ public void testActiveGetOwner() throws Exception {

}

@Test(priority = 20)
public void testGetOwnershipEntrySetBeforeChannelStart() {
var tmpChannel = new ServiceUnitStateChannelImpl(pulsar1);
try {
tmpChannel.getOwnershipEntrySet();
fail();
} catch (Exception e) {
assertTrue(e instanceof IllegalStateException);
assertEquals("Invalid channel state:Constructed", e.getMessage());
}
}


private static ConcurrentHashMap<String, CompletableFuture<Optional<String>>> getOwnerRequests(
ServiceUnitStateChannel channel) throws IllegalAccessException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertTrue;

@Test(groups = "broker")
Expand Down Expand Up @@ -102,6 +104,25 @@ public void unLoad(NamespaceBundle bundle) {
deleteNamespaceWithRetry(namespace, false);
}

@Test
public void testAddNamespaceBundleOwnershipListenerBeforeLBStart() {
NamespaceService namespaceService = spy(new NamespaceService(pulsar));
doThrow(new IllegalStateException("The LM is not initialized"))
.when(namespaceService).getOwnedServiceUnits();
namespaceService.addNamespaceBundleOwnershipListener(new NamespaceBundleOwnershipListener() {
@Override
public void onLoad(NamespaceBundle bundle) {}

@Override
public void unLoad(NamespaceBundle bundle) {}

@Override
public boolean test(NamespaceBundle namespaceBundle) {
return false;
}
});
}

@Test
public void testGetAllPartitions() throws Exception {
final String namespace = "prop/" + UUID.randomUUID().toString();
Expand Down

0 comments on commit fff41c3

Please sign in to comment.