Skip to content

Commit ed773f4

Browse files
committed
[improve][broker] Gracefully shut down load balancer extension
1 parent 96367e1 commit ed773f4

File tree

7 files changed

+139
-24
lines changed

7 files changed

+139
-24
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java

+17-5
Original file line numberDiff line numberDiff line change
@@ -382,6 +382,17 @@ public void closeMetadataServiceSession() throws Exception {
382382
localMetadataStore.close();
383383
}
384384

385+
private void closeLeaderElectionService() throws Exception {
386+
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
387+
ExtensibleLoadManagerImpl.get(loadManager.get()).getLeaderElectionService().close();
388+
} else {
389+
if (this.leaderElectionService != null) {
390+
this.leaderElectionService.close();
391+
this.leaderElectionService = null;
392+
}
393+
}
394+
}
395+
385396
@Override
386397
public void close() throws PulsarServerException {
387398
try {
@@ -502,10 +513,7 @@ public CompletableFuture<Void> closeAsync() {
502513
this.bkClientFactory = null;
503514
}
504515

505-
if (this.leaderElectionService != null) {
506-
this.leaderElectionService.close();
507-
this.leaderElectionService = null;
508-
}
516+
closeLeaderElectionService();
509517

510518
if (adminClient != null) {
511519
adminClient.close();
@@ -1316,7 +1324,11 @@ public boolean isRunning() {
13161324
* @return a reference of the current <code>LeaderElectionService</code> instance.
13171325
*/
13181326
public LeaderElectionService getLeaderElectionService() {
1319-
return this.leaderElectionService;
1327+
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
1328+
return ExtensibleLoadManagerImpl.get(loadManager.get()).getLeaderElectionService();
1329+
} else {
1330+
return this.leaderElectionService;
1331+
}
13201332
}
13211333

13221334
/**

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java

+16
Original file line numberDiff line numberDiff line change
@@ -380,12 +380,22 @@ public CompletableFuture<Optional<BrokerLookupData>> assign(Optional<ServiceUnit
380380
}
381381

382382
public CompletableFuture<Optional<String>> selectAsync(ServiceUnitId bundle) {
383+
return selectAsync(bundle, Optional.empty());
384+
}
385+
386+
public CompletableFuture<Optional<String>> selectAsync(ServiceUnitId bundle,
387+
Optional<Set<String>> excludeBrokerSet) {
383388
BrokerRegistry brokerRegistry = getBrokerRegistry();
384389
return brokerRegistry.getAvailableBrokerLookupDataAsync()
385390
.thenCompose(availableBrokers -> {
386391
LoadManagerContext context = this.getContext();
387392

388393
Map<String, BrokerLookupData> availableBrokerCandidates = new HashMap<>(availableBrokers);
394+
if (excludeBrokerSet.isPresent()) {
395+
for (String exclude : excludeBrokerSet.get()) {
396+
availableBrokerCandidates.remove(exclude);
397+
}
398+
}
389399

390400
// Filter out brokers that do not meet the rules.
391401
List<BrokerFilter> filterPipeline = getBrokerFilterPipeline();
@@ -685,4 +695,10 @@ private void monitor() {
685695
log.error("Failed to get the channel ownership.", e);
686696
}
687697
}
698+
699+
public void disableBroker() throws Exception {
700+
serviceUnitStateChannel.cleanOwnerships();
701+
leaderElectionService.close();
702+
brokerRegistry.unregister();
703+
}
688704
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ public CompletableFuture<Boolean> checkOwnershipAsync(Optional<ServiceUnitId> to
7474

7575
@Override
7676
public void disableBroker() throws Exception {
77-
this.loadManager.getBrokerRegistry().unregister();
77+
this.loadManager.disableBroker();
7878
}
7979

8080
@Override

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java

+5
Original file line numberDiff line numberDiff line change
@@ -206,4 +206,9 @@ public interface ServiceUnitStateChannel extends Closeable {
206206
* Cancels the ownership monitor.
207207
*/
208208
void cancelOwnershipMonitor();
209+
210+
/**
211+
* Cleans the service unit ownerships from the current broker's channel.
212+
*/
213+
void cleanOwnerships();
209214
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java

+50-13
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,10 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
110110

111111
public static final CompressionType MSG_COMPRESSION_TYPE = CompressionType.ZSTD;
112112
private static final long MAX_IN_FLIGHT_STATE_WAITING_TIME_IN_MILLIS = 30 * 1000; // 30sec
113+
114+
private static final int OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS = 5000;
115+
private static final int OWNERSHIP_CLEAN_UP_WAIT_RETRY_DELAY_IN_MILLIS = 100;
116+
private static final int OWNERSHIP_CLEAN_UP_CONVERGENCE_DELAY_IN_MILLIS = 3000;
113117
public static final long VERSION_ID_INIT = 1; // initial versionId
114118
private static final long OWNERSHIP_MONITOR_DELAY_TIME_IN_SECS = 60;
115119
public static final long MAX_CLEAN_UP_DELAY_TIME_IN_SECS = 3 * 60; // 3 mins
@@ -694,6 +698,8 @@ private void handleOwnEvent(String serviceUnit, ServiceUnitStateData data) {
694698
if (isTargetBroker(data.dstBroker())) {
695699
log(null, serviceUnit, data, null);
696700
lastOwnEventHandledAt = System.currentTimeMillis();
701+
} else if (data.force() && isTargetBroker(data.sourceBroker())) {
702+
closeServiceUnit(serviceUnit);
697703
}
698704
}
699705

@@ -1114,13 +1120,13 @@ private ServiceUnitStateData getOverrideInactiveBrokerStateData(ServiceUnitState
11141120
Map.copyOf(orphanData.splitServiceUnitToDestBroker()),
11151121
true, getNextVersionId(orphanData));
11161122
} else {
1117-
return new ServiceUnitStateData(Owned, selectedBroker, true, getNextVersionId(orphanData));
1123+
return new ServiceUnitStateData(Owned, selectedBroker, orphanData.dstBroker(),
1124+
true, getNextVersionId(orphanData));
11181125
}
11191126
}
11201127

1121-
private void overrideOwnership(String serviceUnit, ServiceUnitStateData orphanData) {
1122-
1123-
Optional<String> selectedBroker = selectBroker(serviceUnit);
1128+
private void overrideOwnership(String serviceUnit, ServiceUnitStateData orphanData, String inactiveBroker) {
1129+
Optional<String> selectedBroker = selectBroker(serviceUnit, inactiveBroker);
11241130
if (selectedBroker.isPresent()) {
11251131
var override = getOverrideInactiveBrokerStateData(orphanData, selectedBroker.get());
11261132
log.info("Overriding ownership serviceUnit:{} from orphanData:{} to overrideData:{}",
@@ -1140,8 +1146,37 @@ private void overrideOwnership(String serviceUnit, ServiceUnitStateData orphanDa
11401146
}
11411147
}
11421148

1149+
public void cleanOwnerships() {
1150+
doCleanup(lookupServiceAddress);
1151+
long started = System.currentTimeMillis();
1152+
while (System.currentTimeMillis() - started < OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS) {
1153+
boolean cleaned = true;
1154+
for (var data : tableview.values()) {
1155+
if (data.state() == Owned && data.dstBroker().equals(lookupServiceAddress)) {
1156+
cleaned = false;
1157+
break;
1158+
}
1159+
}
1160+
if (cleaned) {
1161+
try {
1162+
MILLISECONDS.sleep(OWNERSHIP_CLEAN_UP_CONVERGENCE_DELAY_IN_MILLIS);
1163+
} catch (InterruptedException e) {
1164+
log.warn("Interrupted while gracefully waiting for the cleanup convergence.");
1165+
}
1166+
break;
1167+
} else {
1168+
try {
1169+
MILLISECONDS.sleep(OWNERSHIP_CLEAN_UP_WAIT_RETRY_DELAY_IN_MILLIS);
1170+
} catch (InterruptedException e) {
1171+
log.warn("Interrupted while delaying the next service unit clean-up. Cleaning broker:{}",
1172+
lookupServiceAddress);
1173+
}
1174+
}
1175+
}
1176+
}
1177+
11431178

1144-
private void doCleanup(String broker) {
1179+
private synchronized void doCleanup(String broker) {
11451180
long startTime = System.nanoTime();
11461181
log.info("Started ownership cleanup for the inactive broker:{}", broker);
11471182
int orphanServiceUnitCleanupCnt = 0;
@@ -1153,13 +1188,13 @@ private void doCleanup(String broker) {
11531188
var state = state(stateData);
11541189
if (StringUtils.equals(broker, stateData.dstBroker())) {
11551190
if (isActiveState(state)) {
1156-
overrideOwnership(serviceUnit, stateData);
1191+
overrideOwnership(serviceUnit, stateData, broker);
11571192
orphanServiceUnitCleanupCnt++;
11581193
}
11591194

11601195
} else if (StringUtils.equals(broker, stateData.sourceBroker())) {
11611196
if (isInFlightState(state)) {
1162-
overrideOwnership(serviceUnit, stateData);
1197+
overrideOwnership(serviceUnit, stateData, broker);
11631198
orphanServiceUnitCleanupCnt++;
11641199
}
11651200
}
@@ -1194,18 +1229,20 @@ private void doCleanup(String broker) {
11941229

11951230
}
11961231

1197-
private Optional<String> selectBroker(String serviceUnit) {
1232+
private Optional<String> selectBroker(String serviceUnit, String inactiveBroker) {
11981233
try {
1199-
return loadManager.selectAsync(getNamespaceBundle(serviceUnit))
1234+
return loadManager.selectAsync(getNamespaceBundle(serviceUnit), Optional.of(Set.of(inactiveBroker)))
12001235
.get(inFlightStateWaitingTimeInMillis, MILLISECONDS);
12011236
} catch (Throwable e) {
12021237
log.error("Failed to select a broker for serviceUnit:{}", serviceUnit);
12031238
}
12041239
return Optional.empty();
12051240
}
12061241

1207-
private Optional<ServiceUnitStateData> getRollForwardStateData(String serviceUnit, long nextVersionId) {
1208-
Optional<String> selectedBroker = selectBroker(serviceUnit);
1242+
private Optional<ServiceUnitStateData> getRollForwardStateData(String serviceUnit,
1243+
String inactiveBroker,
1244+
long nextVersionId) {
1245+
Optional<String> selectedBroker = selectBroker(serviceUnit, inactiveBroker);
12091246
if (selectedBroker.isEmpty()) {
12101247
return Optional.empty();
12111248
}
@@ -1220,7 +1257,7 @@ private Optional<ServiceUnitStateData> getOverrideInFlightStateData(
12201257
var state = orphanData.state();
12211258
switch (state) {
12221259
case Assigning: {
1223-
return getRollForwardStateData(serviceUnit, nextVersionId);
1260+
return getRollForwardStateData(serviceUnit, orphanData.dstBroker(), nextVersionId);
12241261
}
12251262
case Splitting: {
12261263
return Optional.of(new ServiceUnitStateData(Splitting,
@@ -1233,7 +1270,7 @@ private Optional<ServiceUnitStateData> getOverrideInFlightStateData(
12331270
// rollback to the src
12341271
return Optional.of(new ServiceUnitStateData(Owned, orphanData.sourceBroker(), true, nextVersionId));
12351272
} else {
1236-
return getRollForwardStateData(serviceUnit, nextVersionId);
1273+
return getRollForwardStateData(serviceUnit, orphanData.sourceBroker(), nextVersionId);
12371274
}
12381275
}
12391276
default: {

pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java

+45
Original file line numberDiff line numberDiff line change
@@ -880,6 +880,51 @@ SplitDecision.Reason.Unknown, new AtomicLong(6))
880880
assertEquals(actual, expected);
881881
}
882882

883+
@Test
884+
public void testDisableBroker() throws Exception {
885+
// Test rollback to modular load manager.
886+
ServiceConfiguration defaultConf = getDefaultConf();
887+
defaultConf.setAllowAutoTopicCreation(true);
888+
defaultConf.setForceDeleteNamespaceAllowed(true);
889+
defaultConf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
890+
defaultConf.setLoadBalancerSheddingEnabled(false);
891+
try (var additionalPulsarTestContext = createAdditionalPulsarTestContext(defaultConf)) {
892+
var pulsar3 = additionalPulsarTestContext.getPulsarService();
893+
ExtensibleLoadManagerImpl ternaryLoadManager = spy((ExtensibleLoadManagerImpl)
894+
FieldUtils.readField(pulsar3.getLoadManager().get(), "loadManager", true));
895+
String topic = "persistent://public/default/test";
896+
897+
String lookupResult1 = pulsar3.getAdminClient().lookups().lookupTopic(topic);
898+
TopicName topicName = TopicName.get("test");
899+
NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get();
900+
if (!pulsar3.getBrokerServiceUrl().equals(lookupResult1)) {
901+
admin.namespaces().unloadNamespaceBundle(topicName.getNamespace(), bundle.getBundleRange(),
902+
pulsar3.getLookupServiceAddress());
903+
lookupResult1 = pulsar2.getAdminClient().lookups().lookupTopic(topic);
904+
}
905+
String lookupResult2 = pulsar1.getAdminClient().lookups().lookupTopic(topic);
906+
String lookupResult3 = pulsar2.getAdminClient().lookups().lookupTopic(topic);
907+
908+
assertEquals(lookupResult1, pulsar3.getBrokerServiceUrl());
909+
assertEquals(lookupResult1, lookupResult2);
910+
assertEquals(lookupResult1, lookupResult3);
911+
912+
913+
assertFalse(primaryLoadManager.checkOwnershipAsync(Optional.empty(), bundle).get());
914+
assertFalse(secondaryLoadManager.checkOwnershipAsync(Optional.empty(), bundle).get());
915+
assertTrue(ternaryLoadManager.checkOwnershipAsync(Optional.empty(), bundle).get());
916+
917+
ternaryLoadManager.disableBroker();
918+
919+
assertFalse(ternaryLoadManager.checkOwnershipAsync(Optional.empty(), bundle).get());
920+
if (primaryLoadManager.checkOwnershipAsync(Optional.empty(), bundle).get()) {
921+
assertFalse(secondaryLoadManager.checkOwnershipAsync(Optional.empty(), bundle).get());
922+
} else {
923+
assertTrue(secondaryLoadManager.checkOwnershipAsync(Optional.empty(), bundle).get());
924+
}
925+
}
926+
}
927+
883928
private static abstract class MockBrokerFilter implements BrokerFilter {
884929

885930
@Override

pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -515,7 +515,7 @@ public void transferTestWhenDestBrokerFails()
515515

516516
// recovered, check the monitor update state : Assigned -> Owned
517517
doReturn(CompletableFuture.completedFuture(Optional.of(lookupServiceAddress1)))
518-
.when(loadManager).selectAsync(any());
518+
.when(loadManager).selectAsync(any(), any());
519519
FieldUtils.writeDeclaredField(channel2, "producer", producer, true);
520520
FieldUtils.writeDeclaredField(channel1,
521521
"inFlightStateWaitingTimeInMillis", 1 , true);
@@ -735,7 +735,7 @@ public void handleBrokerDeletionEventTest()
735735
var owner1 = channel1.getOwnerAsync(bundle1);
736736
var owner2 = channel2.getOwnerAsync(bundle2);
737737
doReturn(CompletableFuture.completedFuture(Optional.of(lookupServiceAddress2)))
738-
.when(loadManager).selectAsync(any());
738+
.when(loadManager).selectAsync(any(), any());
739739
assertTrue(owner1.get().isEmpty());
740740
assertTrue(owner2.get().isEmpty());
741741

@@ -1101,7 +1101,7 @@ public void assignTestWhenDestBrokerProducerFails()
11011101
FieldUtils.writeDeclaredField(channel2,
11021102
"inFlightStateWaitingTimeInMillis", 3 * 1000, true);
11031103
doReturn(CompletableFuture.completedFuture(Optional.of(lookupServiceAddress2)))
1104-
.when(loadManager).selectAsync(any());
1104+
.when(loadManager).selectAsync(any(), any());
11051105
channel1.publishAssignEventAsync(bundle, lookupServiceAddress2);
11061106
// channel1 is broken. the assign won't be complete.
11071107
waitUntilState(channel1, bundle);
@@ -1440,7 +1440,7 @@ public void testOverrideInactiveBrokerStateData()
14401440

14411441
// test stable metadata state
14421442
doReturn(CompletableFuture.completedFuture(Optional.of(lookupServiceAddress2)))
1443-
.when(loadManager).selectAsync(any());
1443+
.when(loadManager).selectAsync(any(), any());
14441444
leaderChannel.handleMetadataSessionEvent(SessionReestablished);
14451445
followerChannel.handleMetadataSessionEvent(SessionReestablished);
14461446
FieldUtils.writeDeclaredField(leaderChannel, "lastMetadataSessionEventTimestamp",
@@ -1505,7 +1505,7 @@ public void testOverrideOrphanStateData()
15051505

15061506
// test stable metadata state
15071507
doReturn(CompletableFuture.completedFuture(Optional.of(lookupServiceAddress2)))
1508-
.when(loadManager).selectAsync(any());
1508+
.when(loadManager).selectAsync(any(), any());
15091509
FieldUtils.writeDeclaredField(leaderChannel, "inFlightStateWaitingTimeInMillis",
15101510
-1, true);
15111511
FieldUtils.writeDeclaredField(followerChannel, "inFlightStateWaitingTimeInMillis",

0 commit comments

Comments
 (0)