@@ -58,12 +58,14 @@ public class DefaultNamespaceBundleSplitStrategyImpl implements NamespaceBundleS
58
58
private final Set <SplitDecision > decisionCache ;
59
59
private final Map <String , Integer > namespaceBundleCount ;
60
60
private final Map <String , Integer > splitConditionHitCounts ;
61
+ private final Set <String > splittingBundles ;
61
62
private final SplitCounter counter ;
62
63
63
64
public DefaultNamespaceBundleSplitStrategyImpl (SplitCounter counter ) {
64
65
decisionCache = new HashSet <>();
65
66
namespaceBundleCount = new HashMap <>();
66
67
splitConditionHitCounts = new HashMap <>();
68
+ splittingBundles = new HashSet <>();
67
69
this .counter = counter ;
68
70
69
71
}
@@ -72,6 +74,7 @@ public DefaultNamespaceBundleSplitStrategyImpl(SplitCounter counter) {
72
74
public Set <SplitDecision > findBundlesToSplit (LoadManagerContext context , PulsarService pulsar ) {
73
75
decisionCache .clear ();
74
76
namespaceBundleCount .clear ();
77
+ splittingBundles .clear ();
75
78
final ServiceConfiguration conf = pulsar .getConfiguration ();
76
79
int maxBundleCount = conf .getLoadBalancerNamespaceMaximumBundles ();
77
80
long maxBundleTopics = conf .getLoadBalancerNamespaceBundleMaxTopics ();
@@ -83,6 +86,13 @@ public Set<SplitDecision> findBundlesToSplit(LoadManagerContext context, PulsarS
83
86
boolean debug = log .isDebugEnabled () || conf .isLoadBalancerDebugModeEnabled ();
84
87
var channel = ServiceUnitStateChannelImpl .get (pulsar );
85
88
89
+ for (var etr : channel .getOwnershipEntrySet ()) {
90
+ var eData = etr .getValue ();
91
+ if (eData .state () == ServiceUnitState .Splitting ) {
92
+ splittingBundles .add (etr .getKey ());
93
+ }
94
+ }
95
+
86
96
Map <String , NamespaceBundleStats > bundleStatsMap = pulsar .getBrokerService ().getBundleStats ();
87
97
NamespaceBundleFactory namespaceBundleFactory =
88
98
pulsar .getNamespaceService ().getNamespaceBundleFactory ();
@@ -185,15 +195,14 @@ public Set<SplitDecision> findBundlesToSplit(LoadManagerContext context, PulsarS
185
195
186
196
var ranges = bundleRange .split ("_" );
187
197
var foundSplittingBundle = false ;
188
- for (var etr : channel .getOwnershipEntrySet ()) {
189
- var eBundle = etr .getKey ();
190
- var eData = etr .getValue ();
191
- if (eData .state () == ServiceUnitState .Splitting && eBundle .startsWith (namespace )) {
192
- final String eRange = LoadManagerShared .getBundleRangeFromBundleName (eBundle );
193
- if (eRange .startsWith (ranges [0 ]) || eRange .endsWith (ranges [1 ])) {
198
+ for (var splittingBundle : splittingBundles ) {
199
+ if (splittingBundle .startsWith (namespace )) {
200
+ final String splittingBundleRange = LoadManagerShared .getBundleRangeFromBundleName (splittingBundle );
201
+ if (splittingBundleRange .startsWith (ranges [0 ])
202
+ || splittingBundleRange .endsWith (ranges [1 ])) {
194
203
if (debug ) {
195
204
log .info (String .format (CANNOT_SPLIT_BUNDLE_MSG
196
- + " (parent) bundle:%s is in Splitting state." , bundle , eBundle ));
205
+ + " (parent) bundle:%s is in Splitting state." , bundle , splittingBundle ));
197
206
}
198
207
foundSplittingBundle = true ;
199
208
break ;
0 commit comments