@@ -58,12 +58,14 @@ public class DefaultNamespaceBundleSplitStrategyImpl implements NamespaceBundleS
58
58
private final Set <SplitDecision > decisionCache ;
59
59
private final Map <String , Integer > namespaceBundleCount ;
60
60
private final Map <String , Integer > splitConditionHitCounts ;
61
+ private final Map <String , String > splittingBundles ;
61
62
private final SplitCounter counter ;
62
63
63
64
public DefaultNamespaceBundleSplitStrategyImpl (SplitCounter counter ) {
64
65
decisionCache = new HashSet <>();
65
66
namespaceBundleCount = new HashMap <>();
66
67
splitConditionHitCounts = new HashMap <>();
68
+ splittingBundles = new HashMap <>();
67
69
this .counter = counter ;
68
70
69
71
}
@@ -72,6 +74,7 @@ public DefaultNamespaceBundleSplitStrategyImpl(SplitCounter counter) {
72
74
public Set <SplitDecision > findBundlesToSplit (LoadManagerContext context , PulsarService pulsar ) {
73
75
decisionCache .clear ();
74
76
namespaceBundleCount .clear ();
77
+ splittingBundles .clear ();
75
78
final ServiceConfiguration conf = pulsar .getConfiguration ();
76
79
int maxBundleCount = conf .getLoadBalancerNamespaceMaximumBundles ();
77
80
long maxBundleTopics = conf .getLoadBalancerNamespaceBundleMaxTopics ();
@@ -83,6 +86,15 @@ public Set<SplitDecision> findBundlesToSplit(LoadManagerContext context, PulsarS
83
86
boolean debug = log .isDebugEnabled () || conf .isLoadBalancerDebugModeEnabled ();
84
87
var channel = ServiceUnitStateChannelImpl .get (pulsar );
85
88
89
+ for (var etr : channel .getOwnershipEntrySet ()) {
90
+ var eData = etr .getValue ();
91
+ if (eData .state () == ServiceUnitState .Splitting ) {
92
+ String bundle = etr .getKey ();
93
+ final String bundleRange = LoadManagerShared .getBundleRangeFromBundleName (bundle );
94
+ splittingBundles .put (bundle , bundleRange );
95
+ }
96
+ }
97
+
86
98
Map <String , NamespaceBundleStats > bundleStatsMap = pulsar .getBrokerService ().getBundleStats ();
87
99
NamespaceBundleFactory namespaceBundleFactory =
88
100
pulsar .getNamespaceService ().getNamespaceBundleFactory ();
@@ -185,15 +197,15 @@ public Set<SplitDecision> findBundlesToSplit(LoadManagerContext context, PulsarS
185
197
186
198
var ranges = bundleRange .split ("_" );
187
199
var foundSplittingBundle = false ;
188
- for (var etr : channel . getOwnershipEntrySet ()) {
189
- var eBundle = etr .getKey ();
190
- var eData = etr . getValue ();
191
- if ( eData . state () == ServiceUnitState . Splitting && eBundle . startsWith ( namespace )) {
192
- final String eRange = LoadManagerShared . getBundleRangeFromBundleName ( eBundle );
193
- if ( eRange . startsWith ( ranges [ 0 ]) || eRange .endsWith (ranges [1 ])) {
200
+ for (var etr : splittingBundles . entrySet ()) {
201
+ var splittingBundle = etr .getKey ();
202
+ if ( splittingBundle . startsWith ( namespace )) {
203
+ var splittingBundleRange = etr . getValue ();
204
+ if ( splittingBundleRange . startsWith ( ranges [ 0 ])
205
+ || splittingBundleRange .endsWith (ranges [1 ])) {
194
206
if (debug ) {
195
207
log .info (String .format (CANNOT_SPLIT_BUNDLE_MSG
196
- + " (parent) bundle:%s is in Splitting state." , bundle , eBundle ));
208
+ + " (parent) bundle:%s is in Splitting state." , bundle , splittingBundle ));
197
209
}
198
210
foundSplittingBundle = true ;
199
211
break ;
0 commit comments