Skip to content

Commit a12f554

Browse files
authored
[improve][broker] PIP-192: Added VersionId in ServiceUnitStateData (#19620)
1 parent 145e985 commit a12f554

File tree

7 files changed: +251 additions, -143 deletions

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java

+41-18
Original file line number | Diff line number | Diff line change
@@ -103,7 +103,7 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
103103
NamespaceName.SYSTEM_NAMESPACE,
104104
"loadbalancer-service-unit-state").toString();
105105
private static final long MAX_IN_FLIGHT_STATE_WAITING_TIME_IN_MILLIS = 30 * 1000; // 30sec
106-
106+
public static final long VERSION_ID_INIT = 1; // initial versionId
107107
private static final long OWNERSHIP_MONITOR_DELAY_TIME_IN_SECS = 60;
108108
public static final long MAX_CLEAN_UP_DELAY_TIME_IN_SECS = 3 * 60; // 3 mins
109109
private static final long MIN_CLEAN_UP_DELAY_TIME_IN_SECS = 0; // 0 secs to clean immediately
@@ -451,11 +451,25 @@ public CompletableFuture<Optional<String>> getOwnerAsync(String serviceUnit) {
451451
}
452452
}
453453

454+
private long getNextVersionId(String serviceUnit) {
455+
var data = tableview.get(serviceUnit);
456+
return getNextVersionId(data);
457+
}
458+
459+
private long getNextVersionId(ServiceUnitStateData data) {
460+
return data == null ? VERSION_ID_INIT : data.versionId() + 1;
461+
}
462+
454463
public CompletableFuture<String> publishAssignEventAsync(String serviceUnit, String broker) {
464+
if (!validateChannelState(Started, true)) {
465+
return CompletableFuture.failedFuture(
466+
new IllegalStateException("Invalid channel state:" + channelState.name()));
467+
}
455468
EventType eventType = Assign;
456469
eventCounters.get(eventType).getTotal().incrementAndGet();
457470
CompletableFuture<String> getOwnerRequest = deferGetOwnerRequest(serviceUnit);
458-
pubAsync(serviceUnit, new ServiceUnitStateData(Assigning, broker))
471+
472+
pubAsync(serviceUnit, new ServiceUnitStateData(Assigning, broker, getNextVersionId(serviceUnit)))
459473
.whenComplete((__, ex) -> {
460474
if (ex != null) {
461475
getOwnerRequests.remove(serviceUnit, getOwnerRequest);
@@ -469,16 +483,20 @@ public CompletableFuture<String> publishAssignEventAsync(String serviceUnit, Str
469483
}
470484

471485
public CompletableFuture<Void> publishUnloadEventAsync(Unload unload) {
486+
if (!validateChannelState(Started, true)) {
487+
return CompletableFuture.failedFuture(
488+
new IllegalStateException("Invalid channel state:" + channelState.name()));
489+
}
472490
EventType eventType = Unload;
473491
eventCounters.get(eventType).getTotal().incrementAndGet();
474492
String serviceUnit = unload.serviceUnit();
475493
CompletableFuture<MessageId> future;
476494
if (isTransferCommand(unload)) {
477495
future = pubAsync(serviceUnit, new ServiceUnitStateData(
478-
Assigning, unload.destBroker().get(), unload.sourceBroker()));
496+
Assigning, unload.destBroker().get(), unload.sourceBroker(), getNextVersionId(serviceUnit)));
479497
} else {
480498
future = pubAsync(serviceUnit, new ServiceUnitStateData(
481-
Releasing, unload.sourceBroker()));
499+
Releasing, unload.sourceBroker(), getNextVersionId(serviceUnit)));
482500
}
483501

484502
return future.whenComplete((__, ex) -> {
@@ -489,10 +507,15 @@ public CompletableFuture<Void> publishUnloadEventAsync(Unload unload) {
489507
}
490508

491509
public CompletableFuture<Void> publishSplitEventAsync(Split split) {
510+
if (!validateChannelState(Started, true)) {
511+
return CompletableFuture.failedFuture(
512+
new IllegalStateException("Invalid channel state:" + channelState.name()));
513+
}
492514
EventType eventType = Split;
493515
eventCounters.get(eventType).getTotal().incrementAndGet();
494516
String serviceUnit = split.serviceUnit();
495-
ServiceUnitStateData next = new ServiceUnitStateData(Splitting, split.sourceBroker());
517+
ServiceUnitStateData next =
518+
new ServiceUnitStateData(Splitting, split.sourceBroker(), getNextVersionId(serviceUnit));
496519
return pubAsync(serviceUnit, next).whenComplete((__, ex) -> {
497520
if (ex != null) {
498521
eventCounters.get(eventType).getFailure().incrementAndGet();
@@ -599,7 +622,8 @@ private void handleOwnEvent(String serviceUnit, ServiceUnitStateData data) {
599622
private void handleAssignEvent(String serviceUnit, ServiceUnitStateData data) {
600623
if (isTargetBroker(data.broker())) {
601624
ServiceUnitStateData next = new ServiceUnitStateData(
602-
isTransferCommand(data) ? Releasing : Owned, data.broker(), data.sourceBroker());
625+
isTransferCommand(data) ? Releasing : Owned, data.broker(), data.sourceBroker(),
626+
getNextVersionId(data));
603627
pubAsync(serviceUnit, next)
604628
.whenComplete((__, e) -> log(e, serviceUnit, data, next));
605629
}
@@ -608,15 +632,16 @@ private void handleAssignEvent(String serviceUnit, ServiceUnitStateData data) {
608632
private void handleReleaseEvent(String serviceUnit, ServiceUnitStateData data) {
609633
if (isTransferCommand(data)) {
610634
if (isTargetBroker(data.sourceBroker())) {
611-
ServiceUnitStateData next = new ServiceUnitStateData(Owned, data.broker(), data.sourceBroker());
635+
ServiceUnitStateData next =
636+
new ServiceUnitStateData(Owned, data.broker(), data.sourceBroker(), getNextVersionId(data));
612637
// TODO: when close, pass message to clients to connect to the new broker
613638
closeServiceUnit(serviceUnit)
614639
.thenCompose(__ -> pubAsync(serviceUnit, next))
615640
.whenComplete((__, e) -> log(e, serviceUnit, data, next));
616641
}
617642
} else {
618643
if (isTargetBroker(data.broker())) {
619-
ServiceUnitStateData next = new ServiceUnitStateData(Free, data.broker());
644+
ServiceUnitStateData next = new ServiceUnitStateData(Free, data.broker(), getNextVersionId(data));
620645
closeServiceUnit(serviceUnit)
621646
.thenCompose(__ -> pubAsync(serviceUnit, next))
622647
.whenComplete((__, e) -> log(e, serviceUnit, data, next));
@@ -660,10 +685,6 @@ private void handleInitEvent(String serviceUnit) {
660685
}
661686

662687
private CompletableFuture<MessageId> pubAsync(String serviceUnit, ServiceUnitStateData data) {
663-
if (!validateChannelState(Started, true)) {
664-
return CompletableFuture.failedFuture(
665-
new IllegalStateException("Invalid channel state:" + channelState.name()));
666-
}
667688
CompletableFuture<MessageId> future = new CompletableFuture<>();
668689
producer.newMessage()
669690
.key(serviceUnit)
@@ -774,7 +795,7 @@ protected void splitServiceUnitOnceAndRetry(NamespaceService namespaceService,
774795
updateFuture.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException(msg));
775796
return;
776797
}
777-
ServiceUnitStateData next = new ServiceUnitStateData(Owned, data.broker());
798+
ServiceUnitStateData next = new ServiceUnitStateData(Owned, data.broker(), VERSION_ID_INIT);
778799
NamespaceBundles targetNsBundle = splitBundlesPair.getLeft();
779800
List<NamespaceBundle> splitBundles = Collections.unmodifiableList(splitBundlesPair.getRight());
780801
List<NamespaceBundle> successPublishedBundles =
@@ -812,7 +833,8 @@ protected void splitServiceUnitOnceAndRetry(NamespaceService namespaceService,
812833

813834
updateFuture.thenAccept(r -> {
814835
// Delete the old bundle
815-
pubAsync(serviceUnit, new ServiceUnitStateData(Deleted, data.broker())).thenRun(() -> {
836+
pubAsync(serviceUnit, new ServiceUnitStateData(Deleted, data.broker(), getNextVersionId(data)))
837+
.thenRun(() -> {
816838
// Update bundled_topic cache for load-report-generation
817839
pulsar.getBrokerService().refreshTopicToStatsMaps(bundle);
818840
// TODO: Update the load data immediately if needed.
@@ -938,7 +960,7 @@ private void overrideOwnership(String serviceUnit, ServiceUnitStateData orphanDa
938960

939961
Optional<String> selectedBroker = brokerSelector.select(availableBrokers, null, getContext());
940962
if (selectedBroker.isPresent()) {
941-
var override = new ServiceUnitStateData(Owned, selectedBroker.get(), true);
963+
var override = new ServiceUnitStateData(Owned, selectedBroker.get(), true, getNextVersionId(orphanData));
942964
log.info("Overriding ownership serviceUnit:{} from orphanData:{} to overrideData:{}",
943965
serviceUnit, orphanData, override);
944966
pubAsync(serviceUnit, override).whenComplete((__, e) -> {
@@ -1007,19 +1029,20 @@ private void doCleanup(String broker) throws ExecutionException, InterruptedExce
10071029
private Optional<ServiceUnitStateData> getOverrideStateData(String serviceUnit, ServiceUnitStateData orphanData,
10081030
Set<String> availableBrokers,
10091031
LoadManagerContext context) {
1032+
long nextVersionId = getNextVersionId(orphanData);
10101033
if (isTransferCommand(orphanData)) {
10111034
// rollback to the src
1012-
return Optional.of(new ServiceUnitStateData(Owned, orphanData.sourceBroker(), true));
1035+
return Optional.of(new ServiceUnitStateData(Owned, orphanData.sourceBroker(), true, nextVersionId));
10131036
} else if (orphanData.state() == Assigning) { // assign
10141037
// roll-forward to another broker
10151038
Optional<String> selectedBroker = brokerSelector.select(availableBrokers, null, context);
10161039
if (selectedBroker.isEmpty()) {
10171040
return Optional.empty();
10181041
}
1019-
return Optional.of(new ServiceUnitStateData(Owned, selectedBroker.get(), true));
1042+
return Optional.of(new ServiceUnitStateData(Owned, selectedBroker.get(), true, nextVersionId));
10201043
} else if (orphanData.state() == Splitting || orphanData.state() == Releasing) {
10211044
// rollback to the target broker for split and unload
1022-
return Optional.of(new ServiceUnitStateData(Owned, orphanData.broker(), true));
1045+
return Optional.of(new ServiceUnitStateData(Owned, orphanData.broker(), true, nextVersionId));
10231046
} else {
10241047
var msg = String.format("Failed to get the overrideStateData from serviceUnit=%s, orphanData=%s",
10251048
serviceUnit, orphanData);

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateCompactionStrategy.java

+8-3
Original file line number | Diff line number | Diff line change
@@ -50,14 +50,19 @@ public void checkBrokers(boolean check) {
5050
public boolean shouldKeepLeft(ServiceUnitStateData from, ServiceUnitStateData to) {
5151
if (to == null) {
5252
return false;
53-
} else if (to.force()) {
54-
return false;
5553
}
5654

55+
// Skip the compaction case where from = null and to.versionId > 1
56+
if (from != null && from.versionId() + 1 != to.versionId()) {
57+
return true;
58+
}
59+
60+
if (to.force()) {
61+
return false;
62+
}
5763

5864
ServiceUnitState prevState = state(from);
5965
ServiceUnitState state = state(to);
60-
6166
if (!ServiceUnitState.isValidTransition(prevState, state)) {
6267
return true;
6368
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateData.java

+7-8
Original file line number | Diff line number | Diff line change
@@ -18,7 +18,6 @@
1818
*/
1919
package org.apache.pulsar.broker.loadbalance.extensions.channel;
2020

21-
2221
import java.util.Objects;
2322
import org.apache.commons.lang3.StringUtils;
2423

@@ -28,7 +27,7 @@
2827
*/
2928

3029
public record ServiceUnitStateData(
31-
ServiceUnitState state, String broker, String sourceBroker, boolean force, long timestamp) {
30+
ServiceUnitState state, String broker, String sourceBroker, boolean force, long timestamp, long versionId) {
3231

3332
public ServiceUnitStateData {
3433
Objects.requireNonNull(state);
@@ -37,16 +36,16 @@ public record ServiceUnitStateData(
3736
}
3837
}
3938

40-
public ServiceUnitStateData(ServiceUnitState state, String broker, String sourceBroker) {
41-
this(state, broker, sourceBroker, false, System.currentTimeMillis());
39+
public ServiceUnitStateData(ServiceUnitState state, String broker, String sourceBroker, long versionId) {
40+
this(state, broker, sourceBroker, false, System.currentTimeMillis(), versionId);
4241
}
4342

44-
public ServiceUnitStateData(ServiceUnitState state, String broker) {
45-
this(state, broker, null, false, System.currentTimeMillis());
43+
public ServiceUnitStateData(ServiceUnitState state, String broker, long versionId) {
44+
this(state, broker, null, false, System.currentTimeMillis(), versionId);
4645
}
4746

48-
public ServiceUnitStateData(ServiceUnitState state, String broker, boolean force) {
49-
this(state, broker, null, force, System.currentTimeMillis());
47+
public ServiceUnitStateData(ServiceUnitState state, String broker, boolean force, long versionId) {
48+
this(state, broker, null, force, System.currentTimeMillis(), versionId);
5049
}
5150

5251
public static ServiceUnitState state(ServiceUnitStateData data) {

pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java

+6-6
Original file line number | Diff line number | Diff line change
@@ -924,26 +924,26 @@ public void conflictAndCompactionTest() throws ExecutionException, InterruptedEx
924924
@Test(priority = 11)
925925
public void ownerLookupCountTests() throws IllegalAccessException {
926926

927-
overrideTableView(channel1, bundle, new ServiceUnitStateData(Assigning, "b1"));
927+
overrideTableView(channel1, bundle, new ServiceUnitStateData(Assigning, "b1", 1));
928928
channel1.getOwnerAsync(bundle);
929929
channel1.getOwnerAsync(bundle);
930930

931-
overrideTableView(channel1, bundle, new ServiceUnitStateData(Owned, "b1"));
931+
overrideTableView(channel1, bundle, new ServiceUnitStateData(Owned, "b1", 1));
932932
channel1.getOwnerAsync(bundle);
933933
channel1.getOwnerAsync(bundle);
934934
channel1.getOwnerAsync(bundle);
935935

936-
overrideTableView(channel1, bundle, new ServiceUnitStateData(Releasing, "b1"));
936+
overrideTableView(channel1, bundle, new ServiceUnitStateData(Releasing, "b1", 1));
937937
channel1.getOwnerAsync(bundle);
938938
channel1.getOwnerAsync(bundle);
939939

940-
overrideTableView(channel1, bundle, new ServiceUnitStateData(Splitting, "b1"));
940+
overrideTableView(channel1, bundle, new ServiceUnitStateData(Splitting, "b1", 1));
941941
channel1.getOwnerAsync(bundle);
942942

943-
overrideTableView(channel1, bundle, new ServiceUnitStateData(Free, "b1"));
943+
overrideTableView(channel1, bundle, new ServiceUnitStateData(Free, "b1", 1));
944944
channel1.getOwnerAsync(bundle);
945945

946-
overrideTableView(channel1, bundle, new ServiceUnitStateData(Deleted, "b1"));
946+
overrideTableView(channel1, bundle, new ServiceUnitStateData(Deleted, "b1", 1));
947947
channel1.getOwnerAsync(bundle);
948948
channel1.getOwnerAsync(bundle);
949949

0 commit comments

Comments
 (0)