Skip to content

Commit

Permalink
[fix][broker] Immediately tombstone Deleted and Free state bundles (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
heesung-sn authored May 21, 2024
1 parent a66ff17 commit 949260f
Show file tree
Hide file tree
Showing 9 changed files with 113 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -667,7 +667,8 @@ public CompletableFuture<Optional<BrokerLookupData>> getOwnershipWithLookupDataA
}

public CompletableFuture<Void> unloadNamespaceBundleAsync(ServiceUnitId bundle,
Optional<String> destinationBroker) {
Optional<String> destinationBroker,
boolean force) {
if (NamespaceService.isSLAOrHeartbeatNamespace(bundle.getNamespaceObject().toString())) {
log.info("Skip unloading namespace bundle: {}.", bundle);
return CompletableFuture.completedFuture(null);
Expand All @@ -686,7 +687,7 @@ public CompletableFuture<Void> unloadNamespaceBundleAsync(ServiceUnitId bundle,
log.warn(msg);
throw new IllegalArgumentException(msg);
}
Unload unload = new Unload(sourceBroker, bundle.toString(), destinationBroker, true);
Unload unload = new Unload(sourceBroker, bundle.toString(), destinationBroker, force);
UnloadDecision unloadDecision =
new UnloadDecision(unload, UnloadDecision.Label.Success, UnloadDecision.Reason.Admin);
return unloadAsync(unloadDecision,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -846,6 +846,7 @@ private void handleReleaseEvent(String serviceUnit, ServiceUnitStateData data) {
Free, null, data.sourceBroker(), getNextVersionId(data));
unloadFuture = closeServiceUnit(serviceUnit, true);
}
// If the optimized bundle unload is disabled, disconnect the clients at time of RELEASE.
stateChangeListeners.notifyOnCompletion(unloadFuture
.thenCompose(__ -> pubAsync(serviceUnit, next)), serviceUnit, data)
.whenComplete((__, e) -> log(e, serviceUnit, data, next));
Expand All @@ -866,9 +867,12 @@ private void handleFreeEvent(String serviceUnit, ServiceUnitStateData data) {
}

if (isTargetBroker(data.sourceBroker())) {
stateChangeListeners.notifyOnCompletion(
data.force() ? closeServiceUnit(serviceUnit, true)
: CompletableFuture.completedFuture(0), serviceUnit, data)
// If data.force(), try closeServiceUnit and tombstone the bundle.
CompletableFuture<Void> future =
(data.force() ? closeServiceUnit(serviceUnit, true)
.thenCompose(__ -> tombstoneAsync(serviceUnit))
: CompletableFuture.completedFuture(0)).thenApply(__ -> null);
stateChangeListeners.notifyOnCompletion(future, serviceUnit, data)
.whenComplete((__, e) -> log(e, serviceUnit, data, null));
} else {
stateChangeListeners.notify(serviceUnit, data, null);
Expand All @@ -880,9 +884,13 @@ private void handleDeleteEvent(String serviceUnit, ServiceUnitStateData data) {
if (getOwnerRequest != null) {
getOwnerRequest.completeExceptionally(new IllegalStateException(serviceUnit + "has been deleted."));
}
stateChangeListeners.notify(serviceUnit, data, null);

if (isTargetBroker(data.sourceBroker())) {
log(null, serviceUnit, data, null);
stateChangeListeners.notifyOnCompletion(
tombstoneAsync(serviceUnit), serviceUnit, data)
.whenComplete((__, e) -> log(e, serviceUnit, data, null));
} else {
stateChangeListeners.notify(serviceUnit, data, null);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public void handleEvent(String serviceUnit, ServiceUnitStateData data, Throwable
return;
}
switch (state) {
case Deleted, Owned, Init -> this.complete(serviceUnit, t);
case Init -> this.complete(serviceUnit, t);
default -> {
if (log.isDebugEnabled()) {
log.debug("Handling {} for service unit {}", data, serviceUnit);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ public void beforeEvent(String serviceUnit, ServiceUnitStateData data) {
public void handleEvent(String serviceUnit, ServiceUnitStateData data, Throwable t) {
ServiceUnitState state = ServiceUnitStateData.state(data);

if (StringUtils.isBlank(data.sourceBroker()) && (state == Owned || state == Assigning)) {
if ((state == Owned || state == Assigning) && StringUtils.isBlank(data.sourceBroker())) {
if (log.isDebugEnabled()) {
log.debug("Skipping {} for service unit {} from the assignment command.", data, serviceUnit);
}
Expand All @@ -195,7 +195,17 @@ public void handleEvent(String serviceUnit, ServiceUnitStateData data, Throwable
}

switch (state) {
case Free, Owned -> complete(serviceUnit, t);
case Free -> {
if (!data.force()) {
complete(serviceUnit, t);
}
}
case Init -> {
if (data.force()) {
complete(serviceUnit, t);
}
}
case Owned -> complete(serviceUnit, t);
case Releasing -> LatencyMetric.RELEASE.endMeasurement(serviceUnit);
case Assigning -> LatencyMetric.ASSIGN.endMeasurement(serviceUnit);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -837,7 +837,7 @@ public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle bundle,
boolean closeWithoutWaitingClientDisconnect) {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
return ExtensibleLoadManagerImpl.get(loadManager.get())
.unloadNamespaceBundleAsync(bundle, destinationBroker);
.unloadNamespaceBundleAsync(bundle, destinationBroker, false);
}
// unload namespace bundle
OwnedBundle ob = ownershipCache.getOwnedBundle(bundle);
Expand Down Expand Up @@ -1286,7 +1286,8 @@ public CompletableFuture<Void> removeOwnedServiceUnitAsync(NamespaceBundle nsBun
CompletableFuture<Void> future;
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
ExtensibleLoadManagerImpl extensibleLoadManager = ExtensibleLoadManagerImpl.get(loadManager.get());
future = extensibleLoadManager.unloadNamespaceBundleAsync(nsBundle, Optional.empty());
future = extensibleLoadManager.unloadNamespaceBundleAsync(
nsBundle, Optional.empty(), true);
} else {
future = ownershipCache.removeOwnership(nsBundle);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -739,7 +739,8 @@ private void checkOwnershipState(String broker, NamespaceBundle bundle)

@Test(timeOut = 30 * 1000)
public void testSplitBundleAdminAPI() throws Exception {
String namespace = defaultTestNamespace;
final String namespace = "public/testSplitBundleAdminAPI";
admin.namespaces().createNamespace(namespace, 1);
Pair<TopicName, NamespaceBundle> topicAndBundle = getBundleIsNotOwnByChangeEventTopic("test-split");
TopicName topicName = topicAndBundle.getLeft();
admin.topics().createPartitionedTopic(topicName.toString(), 10);
Expand Down Expand Up @@ -793,6 +794,30 @@ public boolean test(NamespaceBundle namespaceBundle) {
} catch (PulsarAdminException ex) {
assertTrue(ex.getMessage().contains("Invalid bundle range"));
}


// delete and retry
Awaitility.await()
.atMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> {
admin.namespaces().deleteNamespace(namespace);
});
admin.namespaces().createNamespace(namespace, 1);
admin.namespaces().splitNamespaceBundle(namespace, firstBundle, true, null);

Awaitility.await()
.atMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> {
BundlesData bundlesData = admin.namespaces().getBundles(namespace);
assertEquals(bundlesData.getNumBundles(), numBundles + 1);
String lowBundle = String.format("0x%08x", bundleRanges.get(0));
String midBundle = String.format("0x%08x", mid);
String highBundle = String.format("0x%08x", bundleRanges.get(1));
assertTrue(bundlesData.getBoundaries().contains(lowBundle));
assertTrue(bundlesData.getBoundaries().contains(midBundle));
assertTrue(bundlesData.getBoundaries().contains(highBundle));
assertEquals(splitCount.get(), 2);
});
}

@Test(timeOut = 30 * 1000)
Expand Down Expand Up @@ -1656,7 +1681,11 @@ public void testTryAcquiringOwnership()
NamespaceEphemeralData namespaceEphemeralData = primaryLoadManager.tryAcquiringOwnership(bundle).get();
assertTrue(Set.of(pulsar1.getBrokerServiceUrl(), pulsar2.getBrokerServiceUrl())
.contains(namespaceEphemeralData.getNativeUrl()));
admin.namespaces().deleteNamespace(namespace);
Awaitility.await()
.atMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> {
admin.namespaces().deleteNamespace(namespace, true);
});
}

@Test(timeOut = 30 * 1000)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -576,11 +576,11 @@ public void splitAndRetryTest() throws Exception {
childBundle1Range, Optional.empty(), childBundle2Range, Optional.empty()));
channel1.publishSplitEventAsync(split);

waitUntilState(channel1, bundle, Deleted);
waitUntilState(channel2, bundle, Deleted);
waitUntilState(channel1, bundle, Init);
waitUntilState(channel2, bundle, Init);

validateHandlerCounters(channel1, 1, 0, 3, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0);
validateHandlerCounters(channel2, 1, 0, 3, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0);
validateHandlerCounters(channel1, 1, 0, 3, 0, 0, 0, 1, 0, 0, 0, 1, 0, 1, 0);
validateHandlerCounters(channel2, 1, 0, 3, 0, 0, 0, 1, 0, 0, 0, 1, 0, 1, 0);
validateEventCounters(channel1, 1, 0, 1, 0, 0, 0);
validateEventCounters(channel2, 0, 0, 0, 0, 0, 0);
// Verify the retry count
Expand Down Expand Up @@ -620,7 +620,7 @@ public void splitAndRetryTest() throws Exception {
var leader = channel1.isChannelOwnerAsync().get() ? channel1 : channel2;
validateMonitorCounters(leader,
0,
1,
0,
0,
0,
0,
Expand Down Expand Up @@ -1236,15 +1236,15 @@ public void splitTestWhenProducerFails()

var leader = channel1.isChannelOwnerAsync().get() ? channel1 : channel2;

waitUntilStateWithMonitor(leader, bundle, Deleted);
waitUntilStateWithMonitor(channel1, bundle, Deleted);
waitUntilStateWithMonitor(channel2, bundle, Deleted);
waitUntilStateWithMonitor(leader, bundle, Init);
waitUntilStateWithMonitor(channel1, bundle, Init);
waitUntilStateWithMonitor(channel2, bundle, Init);

var ownerAddr1 = channel1.getOwnerAsync(bundle);
var ownerAddr2 = channel2.getOwnerAsync(bundle);

assertTrue(ownerAddr1.isCompletedExceptionally());
assertTrue(ownerAddr2.isCompletedExceptionally());
assertTrue(ownerAddr1.get().isEmpty());
assertTrue(ownerAddr2.get().isEmpty());


FieldUtils.writeDeclaredField(channel1,
Expand Down Expand Up @@ -1428,13 +1428,15 @@ public void splitAndRetryFailureTest() throws Exception {
var leader = channel1.isChannelOwnerAsync().get() ? channel1 : channel2;
((ServiceUnitStateChannelImpl) leader)
.monitorOwnerships(List.of(brokerId1, brokerId2));
waitUntilState(leader, bundle3, Deleted);
waitUntilState(channel1, bundle3, Deleted);
waitUntilState(channel2, bundle3, Deleted);

waitUntilState(leader, bundle3, Init);
waitUntilState(channel1, bundle3, Init);
waitUntilState(channel2, bundle3, Init);


validateHandlerCounters(channel1, 1, 0, 3, 0, 0, 0, 2, 1, 0, 0, 0, 0, 1, 0);
validateHandlerCounters(channel2, 1, 0, 3, 0, 0, 0, 2, 0, 0, 0, 0, 0, 1, 0);

validateHandlerCounters(channel1, 1, 0, 3, 0, 0, 0, 2, 1, 0, 0, 1, 0, 1, 0);
validateHandlerCounters(channel2, 1, 0, 3, 0, 0, 0, 2, 0, 0, 0, 1, 0, 1, 0);
validateEventCounters(channel1, 1, 0, 1, 0, 0, 0);
validateEventCounters(channel2, 0, 0, 0, 0, 0, 0);

Expand Down Expand Up @@ -1464,7 +1466,7 @@ public void splitAndRetryFailureTest() throws Exception {

validateMonitorCounters(leader,
0,
1,
0,
1,
0,
0,
Expand Down Expand Up @@ -1542,7 +1544,7 @@ public void testOverrideInactiveBrokerStateData()
waitUntilNewOwner(channel2, ownedBundle, brokerId2);
assertEquals(Optional.empty(), channel2.getOwnerAsync(freeBundle).get());
assertTrue(channel2.getOwnerAsync(deletedBundle).isCompletedExceptionally());
assertTrue(channel2.getOwnerAsync(splittingBundle).isCompletedExceptionally());
assertTrue(channel2.getOwnerAsync(splittingBundle).get().isEmpty());

// clean-up
FieldUtils.writeDeclaredField(leaderChannel, "maxCleanupDelayTimeInSecs", 3 * 60, true);
Expand Down Expand Up @@ -1605,7 +1607,7 @@ public void testOverrideOrphanStateData()
waitUntilNewOwner(channel2, ownedBundle, broker);
assertEquals(Optional.empty(), channel2.getOwnerAsync(freeBundle).get());
assertTrue(channel2.getOwnerAsync(deletedBundle).isCompletedExceptionally());
assertTrue(channel2.getOwnerAsync(splittingBundle).isCompletedExceptionally());
assertTrue(channel2.getOwnerAsync(splittingBundle).get().isEmpty());

// clean-up
FieldUtils.writeDeclaredField(channel1,
Expand Down Expand Up @@ -1663,8 +1665,10 @@ public void testActiveGetOwner() throws Exception {
"inFlightStateWaitingTimeInMillis", 20 * 1000, true);
start = System.currentTimeMillis();
assertTrue(channel1.getOwnerAsync(bundle).get().isEmpty());
assertTrue(System.currentTimeMillis() - start < 20_000);
waitUntilState(channel1, bundle, Init);
waitUntilState(channel2, bundle, Init);

assertTrue(System.currentTimeMillis() - start < 20_000);
// simulate ownership cleanup(brokerId1 selected owner) by the leader channel
overrideTableViews(bundle,
new ServiceUnitStateData(Owned, broker, null, 1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,40 +123,23 @@ public void testSuccess() throws IllegalAccessException, ExecutionException, Int
manager.handleEvent(bundle,
new ServiceUnitStateData(ServiceUnitState.Free, dstBroker, VERSION_ID_INIT), null);
assertEquals(inFlightUnloadRequests.size(), 1);
assertEquals(counter.toMetrics(null).toString(),
counterExpected.toMetrics(null).toString());

manager.handleEvent(bundle,
new ServiceUnitStateData(ServiceUnitState.Deleted, dstBroker, VERSION_ID_INIT), null);
counterExpected.update(SplitDecision.Label.Success, Sessions);
assertEquals(inFlightUnloadRequests.size(), 0);
assertEquals(counter.toMetrics(null).toString(),
counterExpected.toMetrics(null).toString());
assertEquals(inFlightUnloadRequests.size(), 1);

// Success with Init state.
future = manager.waitAsync(CompletableFuture.completedFuture(null),
bundle, decision, 5, TimeUnit.SECONDS);
inFlightUnloadRequests = getinFlightUnloadRequests(manager);
manager.handleEvent(bundle,
new ServiceUnitStateData(ServiceUnitState.Owned, dstBroker, VERSION_ID_INIT), null);
assertEquals(inFlightUnloadRequests.size(), 1);

// Success with Init state.
manager.handleEvent(bundle,
new ServiceUnitStateData(ServiceUnitState.Init, dstBroker, VERSION_ID_INIT), null);
assertEquals(inFlightUnloadRequests.size(), 0);
counterExpected.update(SplitDecision.Label.Success, Sessions);
assertEquals(counter.toMetrics(null).toString(),
counterExpected.toMetrics(null).toString());
future.get();

// Success with Owned state.
future = manager.waitAsync(CompletableFuture.completedFuture(null),
bundle, decision, 5, TimeUnit.SECONDS);
inFlightUnloadRequests = getinFlightUnloadRequests(manager);
assertEquals(inFlightUnloadRequests.size(), 1);
manager.handleEvent(bundle,
new ServiceUnitStateData(ServiceUnitState.Owned, dstBroker, VERSION_ID_INIT), null);
assertEquals(inFlightUnloadRequests.size(), 0);
counterExpected.update(SplitDecision.Label.Success, Sessions);
assertEquals(counter.toMetrics(null).toString(),
counterExpected.toMetrics(null).toString());
future.get();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,15 @@ public void testSuccess() throws IllegalAccessException, ExecutionException, Int
assertEquals(inFlightUnloadRequestMap.size(), 1);

manager.handleEvent(bundle,
new ServiceUnitStateData(ServiceUnitState.Init, null, srcBroker, VERSION_ID_INIT), null);
new ServiceUnitStateData(ServiceUnitState.Free, null, srcBroker, true, VERSION_ID_INIT), null);
assertEquals(inFlightUnloadRequestMap.size(), 1);

// Success with Init state.
manager.handleEvent(bundle,
new ServiceUnitStateData(ServiceUnitState.Free, null, srcBroker, VERSION_ID_INIT), null);
new ServiceUnitStateData(ServiceUnitState.Init, null, srcBroker, false, VERSION_ID_INIT), null);
assertEquals(inFlightUnloadRequestMap.size(), 1);
manager.handleEvent(bundle,
new ServiceUnitStateData(ServiceUnitState.Init, null, srcBroker, true, VERSION_ID_INIT), null);
assertEquals(inFlightUnloadRequestMap.size(), 0);
future.get();
assertEquals(counter.getBreakdownCounters().get(Success).get(Admin).get(), 1);
Expand All @@ -136,17 +140,30 @@ public void testSuccess() throws IllegalAccessException, ExecutionException, Int
bundle, unloadDecision, 5, TimeUnit.SECONDS);
inFlightUnloadRequestMap = getInFlightUnloadRequestMap(manager);
assertEquals(inFlightUnloadRequestMap.size(), 1);

manager.handleEvent(bundle,
new ServiceUnitStateData(ServiceUnitState.Owned, dstBroker, null, VERSION_ID_INIT), null);
assertEquals(inFlightUnloadRequestMap.size(), 1);

manager.handleEvent(bundle,
new ServiceUnitStateData(ServiceUnitState.Owned, dstBroker, srcBroker, VERSION_ID_INIT), null);
assertEquals(inFlightUnloadRequestMap.size(), 0);

future.get();
assertEquals(counter.getBreakdownCounters().get(Success).get(Admin).get(), 2);

// Success with Free state.
future = manager.waitAsync(CompletableFuture.completedFuture(null),
bundle, unloadDecision, 5, TimeUnit.SECONDS);
inFlightUnloadRequestMap = getInFlightUnloadRequestMap(manager);
assertEquals(inFlightUnloadRequestMap.size(), 1);
manager.handleEvent(bundle,
new ServiceUnitStateData(ServiceUnitState.Free, dstBroker, srcBroker, true, VERSION_ID_INIT), null);
assertEquals(inFlightUnloadRequestMap.size(), 1);
manager.handleEvent(bundle,
new ServiceUnitStateData(ServiceUnitState.Free, dstBroker, srcBroker, false, VERSION_ID_INIT), null);
assertEquals(inFlightUnloadRequestMap.size(), 0);
future.get();
assertEquals(counter.getBreakdownCounters().get(Success).get(Admin).get(), 3);


}

@Test
Expand Down

0 comments on commit 949260f

Please sign in to comment.