Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] PIP-192: Added VersionId in ServiceUnitStateData #19620

Merged
merged 2 commits into from
Mar 1, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -103,7 +103,7 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
NamespaceName.SYSTEM_NAMESPACE,
"loadbalancer-service-unit-state").toString();
private static final long MAX_IN_FLIGHT_STATE_WAITING_TIME_IN_MILLIS = 30 * 1000; // 30sec

public static final long VERSION_ID_INIT = 1; // initial versionId
private static final long OWNERSHIP_MONITOR_DELAY_TIME_IN_SECS = 60;
public static final long MAX_CLEAN_UP_DELAY_TIME_IN_SECS = 3 * 60; // 3 mins
private static final long MIN_CLEAN_UP_DELAY_TIME_IN_SECS = 0; // 0 secs to clean immediately
@@ -451,11 +451,25 @@ public CompletableFuture<Optional<String>> getOwnerAsync(String serviceUnit) {
}
}

private long getNextVersionId(String serviceUnit) {
var data = tableview.get(serviceUnit);
return getNextVersionId(data);
}

private long getNextVersionId(ServiceUnitStateData data) {
return data == null ? VERSION_ID_INIT : data.versionId() + 1;
}

public CompletableFuture<String> publishAssignEventAsync(String serviceUnit, String broker) {
if (!validateChannelState(Started, true)) {
return CompletableFuture.failedFuture(
new IllegalStateException("Invalid channel state:" + channelState.name()));
}
EventType eventType = Assign;
eventCounters.get(eventType).getTotal().incrementAndGet();
CompletableFuture<String> getOwnerRequest = deferGetOwnerRequest(serviceUnit);
pubAsync(serviceUnit, new ServiceUnitStateData(Assigning, broker))

pubAsync(serviceUnit, new ServiceUnitStateData(Assigning, broker, getNextVersionId(serviceUnit)))
.whenComplete((__, ex) -> {
if (ex != null) {
getOwnerRequests.remove(serviceUnit, getOwnerRequest);
@@ -469,16 +483,20 @@ public CompletableFuture<String> publishAssignEventAsync(String serviceUnit, Str
}

public CompletableFuture<Void> publishUnloadEventAsync(Unload unload) {
if (!validateChannelState(Started, true)) {
return CompletableFuture.failedFuture(
new IllegalStateException("Invalid channel state:" + channelState.name()));
}
EventType eventType = Unload;
eventCounters.get(eventType).getTotal().incrementAndGet();
String serviceUnit = unload.serviceUnit();
CompletableFuture<MessageId> future;
if (isTransferCommand(unload)) {
future = pubAsync(serviceUnit, new ServiceUnitStateData(
Assigning, unload.destBroker().get(), unload.sourceBroker()));
Assigning, unload.destBroker().get(), unload.sourceBroker(), getNextVersionId(serviceUnit)));
} else {
future = pubAsync(serviceUnit, new ServiceUnitStateData(
Releasing, unload.sourceBroker()));
Releasing, unload.sourceBroker(), getNextVersionId(serviceUnit)));
}

return future.whenComplete((__, ex) -> {
@@ -489,10 +507,15 @@ public CompletableFuture<Void> publishUnloadEventAsync(Unload unload) {
}

public CompletableFuture<Void> publishSplitEventAsync(Split split) {
if (!validateChannelState(Started, true)) {
return CompletableFuture.failedFuture(
new IllegalStateException("Invalid channel state:" + channelState.name()));
}
EventType eventType = Split;
eventCounters.get(eventType).getTotal().incrementAndGet();
String serviceUnit = split.serviceUnit();
ServiceUnitStateData next = new ServiceUnitStateData(Splitting, split.sourceBroker());
ServiceUnitStateData next =
new ServiceUnitStateData(Splitting, split.sourceBroker(), getNextVersionId(serviceUnit));
return pubAsync(serviceUnit, next).whenComplete((__, ex) -> {
if (ex != null) {
eventCounters.get(eventType).getFailure().incrementAndGet();
@@ -599,7 +622,8 @@ private void handleOwnEvent(String serviceUnit, ServiceUnitStateData data) {
private void handleAssignEvent(String serviceUnit, ServiceUnitStateData data) {
if (isTargetBroker(data.broker())) {
ServiceUnitStateData next = new ServiceUnitStateData(
isTransferCommand(data) ? Releasing : Owned, data.broker(), data.sourceBroker());
isTransferCommand(data) ? Releasing : Owned, data.broker(), data.sourceBroker(),
getNextVersionId(data));
pubAsync(serviceUnit, next)
.whenComplete((__, e) -> log(e, serviceUnit, data, next));
}
@@ -608,15 +632,16 @@ private void handleAssignEvent(String serviceUnit, ServiceUnitStateData data) {
private void handleReleaseEvent(String serviceUnit, ServiceUnitStateData data) {
if (isTransferCommand(data)) {
if (isTargetBroker(data.sourceBroker())) {
ServiceUnitStateData next = new ServiceUnitStateData(Owned, data.broker(), data.sourceBroker());
ServiceUnitStateData next =
new ServiceUnitStateData(Owned, data.broker(), data.sourceBroker(), getNextVersionId(data));
// TODO: when close, pass message to clients to connect to the new broker
closeServiceUnit(serviceUnit)
.thenCompose(__ -> pubAsync(serviceUnit, next))
.whenComplete((__, e) -> log(e, serviceUnit, data, next));
}
} else {
if (isTargetBroker(data.broker())) {
ServiceUnitStateData next = new ServiceUnitStateData(Free, data.broker());
ServiceUnitStateData next = new ServiceUnitStateData(Free, data.broker(), getNextVersionId(data));
closeServiceUnit(serviceUnit)
.thenCompose(__ -> pubAsync(serviceUnit, next))
.whenComplete((__, e) -> log(e, serviceUnit, data, next));
@@ -660,10 +685,6 @@ private void handleInitEvent(String serviceUnit) {
}

private CompletableFuture<MessageId> pubAsync(String serviceUnit, ServiceUnitStateData data) {
if (!validateChannelState(Started, true)) {
return CompletableFuture.failedFuture(
new IllegalStateException("Invalid channel state:" + channelState.name()));
}
CompletableFuture<MessageId> future = new CompletableFuture<>();
producer.newMessage()
.key(serviceUnit)
@@ -774,7 +795,7 @@ protected void splitServiceUnitOnceAndRetry(NamespaceService namespaceService,
updateFuture.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException(msg));
return;
}
ServiceUnitStateData next = new ServiceUnitStateData(Owned, data.broker());
ServiceUnitStateData next = new ServiceUnitStateData(Owned, data.broker(), VERSION_ID_INIT);
NamespaceBundles targetNsBundle = splitBundlesPair.getLeft();
List<NamespaceBundle> splitBundles = Collections.unmodifiableList(splitBundlesPair.getRight());
List<NamespaceBundle> successPublishedBundles =
@@ -812,7 +833,8 @@ protected void splitServiceUnitOnceAndRetry(NamespaceService namespaceService,

updateFuture.thenAccept(r -> {
// Delete the old bundle
pubAsync(serviceUnit, new ServiceUnitStateData(Deleted, data.broker())).thenRun(() -> {
pubAsync(serviceUnit, new ServiceUnitStateData(Deleted, data.broker(), getNextVersionId(data)))
.thenRun(() -> {
// Update bundled_topic cache for load-report-generation
pulsar.getBrokerService().refreshTopicToStatsMaps(bundle);
// TODO: Update the load data immediately if needed.
@@ -938,7 +960,7 @@ private void overrideOwnership(String serviceUnit, ServiceUnitStateData orphanDa

Optional<String> selectedBroker = brokerSelector.select(availableBrokers, null, getContext());
if (selectedBroker.isPresent()) {
var override = new ServiceUnitStateData(Owned, selectedBroker.get(), true);
var override = new ServiceUnitStateData(Owned, selectedBroker.get(), true, getNextVersionId(orphanData));
log.info("Overriding ownership serviceUnit:{} from orphanData:{} to overrideData:{}",
serviceUnit, orphanData, override);
pubAsync(serviceUnit, override).whenComplete((__, e) -> {
@@ -1007,19 +1029,20 @@ private void doCleanup(String broker) throws ExecutionException, InterruptedExce
private Optional<ServiceUnitStateData> getOverrideStateData(String serviceUnit, ServiceUnitStateData orphanData,
Set<String> availableBrokers,
LoadManagerContext context) {
long nextVersionId = getNextVersionId(orphanData);
if (isTransferCommand(orphanData)) {
// rollback to the src
return Optional.of(new ServiceUnitStateData(Owned, orphanData.sourceBroker(), true));
return Optional.of(new ServiceUnitStateData(Owned, orphanData.sourceBroker(), true, nextVersionId));
} else if (orphanData.state() == Assigning) { // assign
// roll-forward to another broker
Optional<String> selectedBroker = brokerSelector.select(availableBrokers, null, context);
if (selectedBroker.isEmpty()) {
return Optional.empty();
}
return Optional.of(new ServiceUnitStateData(Owned, selectedBroker.get(), true));
return Optional.of(new ServiceUnitStateData(Owned, selectedBroker.get(), true, nextVersionId));
} else if (orphanData.state() == Splitting || orphanData.state() == Releasing) {
// rollback to the target broker for split and unload
return Optional.of(new ServiceUnitStateData(Owned, orphanData.broker(), true));
return Optional.of(new ServiceUnitStateData(Owned, orphanData.broker(), true, nextVersionId));
} else {
var msg = String.format("Failed to get the overrideStateData from serviceUnit=%s, orphanData=%s",
serviceUnit, orphanData);
Original file line number Diff line number Diff line change
@@ -50,14 +50,19 @@ public void checkBrokers(boolean check) {
public boolean shouldKeepLeft(ServiceUnitStateData from, ServiceUnitStateData to) {
if (to == null) {
return false;
} else if (to.force()) {
return false;
}

// Skip the compaction case where from = null and to.versionId > 1
if (from != null && from.versionId() + 1 != to.versionId()) {
return true;
}
Comment on lines +56 to +58
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to filter out the invalidate data when reading from the topic? Otherwise, we might get an inconsistent data view with the compacted data.

Copy link
Contributor Author

@heesung-sn heesung-sn Feb 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, when reading msgs from the table views or strategic compaction, we do apply this same strategy to filter out invalid states.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, got it. The table view also apply this change.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes.


if (to.force()) {
return false;
}

ServiceUnitState prevState = state(from);
ServiceUnitState state = state(to);

if (!ServiceUnitState.isValidTransition(prevState, state)) {
return true;
}
Original file line number Diff line number Diff line change
@@ -18,7 +18,6 @@
*/
package org.apache.pulsar.broker.loadbalance.extensions.channel;


import java.util.Objects;
import org.apache.commons.lang3.StringUtils;

@@ -28,7 +27,7 @@
*/

public record ServiceUnitStateData(
ServiceUnitState state, String broker, String sourceBroker, boolean force, long timestamp) {
ServiceUnitState state, String broker, String sourceBroker, boolean force, long timestamp, long versionId) {

public ServiceUnitStateData {
Objects.requireNonNull(state);
@@ -37,16 +36,16 @@ public record ServiceUnitStateData(
}
}

public ServiceUnitStateData(ServiceUnitState state, String broker, String sourceBroker) {
this(state, broker, sourceBroker, false, System.currentTimeMillis());
public ServiceUnitStateData(ServiceUnitState state, String broker, String sourceBroker, long versionId) {
this(state, broker, sourceBroker, false, System.currentTimeMillis(), versionId);
}

public ServiceUnitStateData(ServiceUnitState state, String broker) {
this(state, broker, null, false, System.currentTimeMillis());
public ServiceUnitStateData(ServiceUnitState state, String broker, long versionId) {
this(state, broker, null, false, System.currentTimeMillis(), versionId);
}

public ServiceUnitStateData(ServiceUnitState state, String broker, boolean force) {
this(state, broker, null, force, System.currentTimeMillis());
public ServiceUnitStateData(ServiceUnitState state, String broker, boolean force, long versionId) {
this(state, broker, null, force, System.currentTimeMillis(), versionId);
}

public static ServiceUnitState state(ServiceUnitStateData data) {
Original file line number Diff line number Diff line change
@@ -924,26 +924,26 @@ public void conflictAndCompactionTest() throws ExecutionException, InterruptedEx
@Test(priority = 11)
public void ownerLookupCountTests() throws IllegalAccessException {

overrideTableView(channel1, bundle, new ServiceUnitStateData(Assigning, "b1"));
overrideTableView(channel1, bundle, new ServiceUnitStateData(Assigning, "b1", 1));
channel1.getOwnerAsync(bundle);
channel1.getOwnerAsync(bundle);

overrideTableView(channel1, bundle, new ServiceUnitStateData(Owned, "b1"));
overrideTableView(channel1, bundle, new ServiceUnitStateData(Owned, "b1", 1));
channel1.getOwnerAsync(bundle);
channel1.getOwnerAsync(bundle);
channel1.getOwnerAsync(bundle);

overrideTableView(channel1, bundle, new ServiceUnitStateData(Releasing, "b1"));
overrideTableView(channel1, bundle, new ServiceUnitStateData(Releasing, "b1", 1));
channel1.getOwnerAsync(bundle);
channel1.getOwnerAsync(bundle);

overrideTableView(channel1, bundle, new ServiceUnitStateData(Splitting, "b1"));
overrideTableView(channel1, bundle, new ServiceUnitStateData(Splitting, "b1", 1));
channel1.getOwnerAsync(bundle);

overrideTableView(channel1, bundle, new ServiceUnitStateData(Free, "b1"));
overrideTableView(channel1, bundle, new ServiceUnitStateData(Free, "b1", 1));
channel1.getOwnerAsync(bundle);

overrideTableView(channel1, bundle, new ServiceUnitStateData(Deleted, "b1"));
overrideTableView(channel1, bundle, new ServiceUnitStateData(Deleted, "b1", 1));
channel1.getOwnerAsync(bundle);
channel1.getOwnerAsync(bundle);

Loading