Skip to content

Commit a7000c5

Browse files
committed
repeat metadata and cache update
1 parent 72fcb47 commit a7000c5

File tree

2 files changed

+42
-39
lines changed

2 files changed

+42
-39
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java

+41-38
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@
8989
import org.apache.pulsar.common.naming.NamespaceBundle;
9090
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
9191
import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
92+
import org.apache.pulsar.common.naming.NamespaceBundles;
9293
import org.apache.pulsar.common.naming.NamespaceName;
9394
import org.apache.pulsar.common.naming.TopicDomain;
9495
import org.apache.pulsar.common.naming.TopicName;
@@ -852,8 +853,10 @@ protected void splitServiceUnitOnceAndRetry(NamespaceService namespaceService,
852853
long startTime,
853854
CompletableFuture<Void> completionFuture) {
854855
ownChildBundles(childBundles, parentData)
855-
.thenCompose(__ -> updateSplitNamespaceBundlesAsync(
856+
.thenCompose(__ -> getSplitNamespaceBundles(
856857
namespaceService, bundleFactory, algorithm, parentBundle, childBundles, boundaries))
858+
.thenCompose(namespaceBundles -> updateSplitNamespaceBundlesAsync(
859+
namespaceService, bundleFactory, parentBundle, namespaceBundles))
857860
.thenAccept(__ -> // Update bundled_topic cache for load-report-generation
858861
pulsar.getBrokerService().refreshTopicToStatsMaps(parentBundle))
859862
.thenAccept(__ -> pubAsync(parentBundle.toString(), new ServiceUnitStateData(
@@ -913,69 +916,69 @@ private CompletableFuture<Void> ownChildBundles(List<NamespaceBundle> childBundl
913916
}
914917
}
915918

916-
private CompletableFuture<Void> updateSplitNamespaceBundlesAsync(
917-
NamespaceService namespaceService,
918-
NamespaceBundleFactory bundleFactory,
919-
NamespaceBundleSplitAlgorithm algorithm,
920-
NamespaceBundle parentBundle,
921-
List<NamespaceBundle> childBundles,
922-
List<Long> boundaries) {
923-
CompletableFuture<Void> updateSplitNamespaceBundlesFuture = new CompletableFuture<>();
924-
var namespaceName = parentBundle.getNamespaceObject();
919+
private CompletableFuture<NamespaceBundles> getSplitNamespaceBundles(NamespaceService namespaceService,
920+
NamespaceBundleFactory bundleFactory,
921+
NamespaceBundleSplitAlgorithm algorithm,
922+
NamespaceBundle parentBundle,
923+
List<NamespaceBundle> childBundles,
924+
List<Long> boundaries) {
925+
CompletableFuture future = new CompletableFuture();
925926
final var debug = debug();
926927
var targetNsBundle = bundleFactory.getBundles(parentBundle.getNamespaceObject());
927-
boolean updated = false;
928+
boolean found = false;
928929
try {
929930
targetNsBundle.validateBundle(parentBundle);
930931
} catch (IllegalArgumentException e) {
931932
if (debug) {
932933
log.info("Namespace bundles do not contain the parent bundle:{}",
933934
parentBundle);
934935
}
935-
for (var childBundle : childBundles){
936+
for (var childBundle : childBundles) {
936937
try {
937938
targetNsBundle.validateBundle(childBundle);
938939
if (debug) {
939940
log.info("Namespace bundles contain the child bundle:{}",
940941
childBundle);
941942
}
942943
} catch (Exception ex) {
943-
updateSplitNamespaceBundlesFuture.completeExceptionally(
944+
future.completeExceptionally(
944945
new BrokerServiceException.ServiceUnitNotReadyException(
945946
"Namespace bundles do not contain the child bundle:" + childBundle, e));
946-
return updateSplitNamespaceBundlesFuture;
947+
return future;
947948
}
948949
}
949-
updated = true;
950+
found = true;
950951
} catch (Exception e) {
951-
updateSplitNamespaceBundlesFuture.completeExceptionally(
952+
future.completeExceptionally(
952953
new BrokerServiceException.ServiceUnitNotReadyException(
953954
"Failed to validate the parent bundle in the namespace bundles.", e));
954-
return updateSplitNamespaceBundlesFuture;
955+
return future;
955956
}
956-
if (updated) {
957-
updateSplitNamespaceBundlesFuture.complete(null);
957+
if (found) {
958+
future.complete(targetNsBundle);
959+
return future;
958960
} else {
959-
namespaceService.getSplitBoundary(parentBundle, algorithm, boundaries)
960-
.thenApply(splitBundlesPair -> splitBundlesPair.getLeft())
961-
.thenCompose(splitNamespaceBundles ->
962-
namespaceService.updateNamespaceBundles(
963-
namespaceName, splitNamespaceBundles)
964-
.thenCompose(__ -> namespaceService.updateNamespaceBundlesForPolicies(
965-
namespaceName, splitNamespaceBundles)))
966-
.thenAccept(__ -> {
967-
bundleFactory.invalidateBundleCache(parentBundle.getNamespaceObject());
968-
if (debug) {
969-
log.info("Successfully updated split namespace bundles and namespace bundle cache.");
970-
}
971-
updateSplitNamespaceBundlesFuture.complete(null);
972-
})
973-
.exceptionally(ex -> {
974-
updateSplitNamespaceBundlesFuture.completeExceptionally(ex);
975-
return null;
976-
});
961+
return namespaceService.getSplitBoundary(parentBundle, algorithm, boundaries)
962+
.thenApply(splitBundlesPair -> splitBundlesPair.getLeft());
977963
}
978-
return updateSplitNamespaceBundlesFuture;
964+
}
965+
966+
private CompletableFuture<Void> updateSplitNamespaceBundlesAsync(
967+
NamespaceService namespaceService,
968+
NamespaceBundleFactory bundleFactory,
969+
NamespaceBundle parentBundle,
970+
NamespaceBundles splitNamespaceBundles) {
971+
var namespaceName = parentBundle.getNamespaceObject();
972+
return namespaceService.updateNamespaceBundles(
973+
namespaceName, splitNamespaceBundles)
974+
.thenCompose(__ -> namespaceService.updateNamespaceBundlesForPolicies(
975+
namespaceName, splitNamespaceBundles))
976+
.thenAccept(__ -> {
977+
bundleFactory.invalidateBundleCache(parentBundle.getNamespaceObject());
978+
if (debug()) {
979+
log.info("Successfully updated split namespace bundles and namespace bundle cache.");
980+
}
981+
});
979982
}
980983

981984
public void handleMetadataSessionEvent(SessionEvent e) {

pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1316,7 +1316,7 @@ public void splitAndRetryFailureTest() throws Exception {
13161316
doReturn(CompletableFuture.completedFuture(List.of("test-topic-1", "test-topic-2")))
13171317
.when(namespaceService).getOwnedTopicListForNamespaceBundle(any());
13181318
return future;
1319-
}).when(namespaceService).updateNamespaceBundles(any(), any());
1319+
}).when(namespaceService).updateNamespaceBundlesForPolicies(any(), any());
13201320
doReturn(namespaceService).when(pulsar1).getNamespaceService();
13211321
doReturn(CompletableFuture.completedFuture(List.of("test-topic-1", "test-topic-2")))
13221322
.when(namespaceService).getOwnedTopicListForNamespaceBundle(any());

0 commit comments

Comments
 (0)