Skip to content

Commit c856344

Browse files
committed
Added SplitManager
1 parent 42c23e6 commit c856344

File tree

7 files changed

+368
-30
lines changed

7 files changed

+368
-30
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerFilter;
4545
import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerMaxTopicCountFilter;
4646
import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerVersionFilter;
47+
import org.apache.pulsar.broker.loadbalance.extensions.manager.SplitManager;
4748
import org.apache.pulsar.broker.loadbalance.extensions.manager.UnloadManager;
4849
import org.apache.pulsar.broker.loadbalance.extensions.models.AssignCounter;
4950
import org.apache.pulsar.broker.loadbalance.extensions.models.SplitCounter;
@@ -115,6 +116,8 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
115116

116117
private UnloadManager unloadManager;
117118

119+
private SplitManager splitManager;
120+
118121
private boolean started = false;
119122

120123
private final AssignCounter assignCounter = new AssignCounter();
@@ -164,7 +167,9 @@ public void start() throws PulsarServerException {
164167
this.serviceUnitStateChannel = new ServiceUnitStateChannelImpl(pulsar);
165168
this.brokerRegistry.start();
166169
this.unloadManager = new UnloadManager();
170+
this.splitManager = new SplitManager(splitCounter);
167171
this.serviceUnitStateChannel.listen(unloadManager);
172+
this.serviceUnitStateChannel.listen(splitManager);
168173
this.serviceUnitStateChannel.start();
169174

170175
try {
@@ -217,7 +222,7 @@ public void start() throws PulsarServerException {
217222
pulsar.getLoadManagerExecutor(), unloadManager, context, serviceUnitStateChannel);
218223
this.unloadScheduler.start();
219224
this.splitScheduler = new SplitScheduler(
220-
pulsar, serviceUnitStateChannel, splitCounter, splitMetrics, context);
225+
pulsar, serviceUnitStateChannel, splitManager, splitCounter, splitMetrics, context);
221226
this.splitScheduler.start();
222227
this.started = true;
223228
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java

+1
Original file line numberDiff line numberDiff line change
@@ -882,6 +882,7 @@ protected void splitServiceUnitOnceAndRetry(NamespaceService namespaceService,
882882
return null;
883883
});
884884
}
885+
885886
public void handleMetadataSessionEvent(SessionEvent e) {
886887
if (e == SessionReestablished || e == SessionLost) {
887888
lastMetadataSessionEvent = e;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.loadbalance.extensions.manager;
20+
21+
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Failure;
22+
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Unknown;
23+
import java.util.Map;
24+
import java.util.concurrent.CompletableFuture;
25+
import java.util.concurrent.ConcurrentHashMap;
26+
import java.util.concurrent.TimeUnit;
27+
import lombok.extern.slf4j.Slf4j;
28+
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState;
29+
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData;
30+
import org.apache.pulsar.broker.loadbalance.extensions.models.SplitCounter;
31+
import org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision;
32+
33+
/**
34+
* Split manager.
35+
*/
36+
@Slf4j
37+
public class SplitManager implements StateChangeListener {
38+
39+
record InFlightSplitRequest(SplitDecision splitDecision, CompletableFuture<Void> future) {
40+
}
41+
42+
private final Map<String, InFlightSplitRequest> inFlightSplitRequests;
43+
44+
private final SplitCounter counter;
45+
46+
public SplitManager(SplitCounter splitCounter) {
47+
this.inFlightSplitRequests = new ConcurrentHashMap<>();
48+
this.counter = splitCounter;
49+
}
50+
51+
private void complete(String serviceUnit, Throwable ex) {
52+
inFlightSplitRequests.computeIfPresent(serviceUnit, (__, inFlightSplitRequest) -> {
53+
var future = inFlightSplitRequest.future;
54+
if (!future.isDone()) {
55+
if (ex != null) {
56+
counter.update(Failure, Unknown);
57+
future.completeExceptionally(ex);
58+
if (log.isDebugEnabled()) {
59+
log.debug("Complete exceptionally split bundle: {}", serviceUnit, ex);
60+
}
61+
} else {
62+
counter.update(inFlightSplitRequest.splitDecision);
63+
future.complete(null);
64+
if (log.isDebugEnabled()) {
65+
log.debug("Complete split bundle: {}", serviceUnit);
66+
}
67+
}
68+
}
69+
return null;
70+
});
71+
}
72+
73+
public CompletableFuture<Void> waitAsync(CompletableFuture<Void> eventPubFuture,
74+
String bundle,
75+
SplitDecision decision,
76+
long timeout,
77+
TimeUnit timeoutUnit) {
78+
79+
return eventPubFuture.thenCompose(__ -> inFlightSplitRequests.computeIfAbsent(bundle, ignore -> {
80+
if (log.isDebugEnabled()) {
81+
log.debug("Handle split bundle: {}, timeout: {} {}", bundle, timeout, timeoutUnit);
82+
}
83+
CompletableFuture<Void> future = new CompletableFuture<>();
84+
future.orTimeout(timeout, timeoutUnit).whenComplete((v, ex) -> {
85+
if (ex != null) {
86+
inFlightSplitRequests.remove(bundle);
87+
log.warn("Failed to wait for split for serviceUnit: {}", bundle, ex);
88+
}
89+
});
90+
return new InFlightSplitRequest(decision, future);
91+
}).future);
92+
}
93+
94+
@Override
95+
public void handleEvent(String serviceUnit, ServiceUnitStateData data, Throwable t) {
96+
ServiceUnitState state = ServiceUnitStateData.state(data);
97+
if (t != null && inFlightSplitRequests.containsKey(serviceUnit)) {
98+
this.complete(serviceUnit, t);
99+
return;
100+
}
101+
switch (state) {
102+
case Deleted, Owned, Init -> this.complete(serviceUnit, t);
103+
default -> {
104+
if (log.isDebugEnabled()) {
105+
log.debug("Handling {} for service unit {}", data, serviceUnit);
106+
}
107+
}
108+
}
109+
}
110+
111+
public void close() {
112+
inFlightSplitRequests.forEach((bundle, inFlightSplitRequest) -> {
113+
if (!inFlightSplitRequest.future.isDone()) {
114+
String msg = String.format("Splitting bundle: %s, but the manager already closed.", bundle);
115+
log.warn(msg);
116+
inFlightSplitRequest.future.completeExceptionally(new IllegalStateException(msg));
117+
}
118+
});
119+
inFlightSplitRequests.clear();
120+
}
121+
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/SplitScheduler.java

+31-16
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.apache.pulsar.broker.ServiceConfiguration;
3535
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
3636
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel;
37+
import org.apache.pulsar.broker.loadbalance.extensions.manager.SplitManager;
3738
import org.apache.pulsar.broker.loadbalance.extensions.models.SplitCounter;
3839
import org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision;
3940
import org.apache.pulsar.broker.loadbalance.extensions.strategy.DefaultNamespaceBundleSplitStrategyImpl;
@@ -61,6 +62,8 @@ public class SplitScheduler implements LoadManagerScheduler {
6162

6263
private final SplitCounter counter;
6364

65+
private final SplitManager splitManager;
66+
6467
private final AtomicReference<List<Metrics>> splitMetrics;
6568

6669
private volatile ScheduledFuture<?> task;
@@ -69,12 +72,14 @@ public class SplitScheduler implements LoadManagerScheduler {
6972

7073
public SplitScheduler(PulsarService pulsar,
7174
ServiceUnitStateChannel serviceUnitStateChannel,
75+
SplitManager splitManager,
7276
SplitCounter counter,
7377
AtomicReference<List<Metrics>> splitMetrics,
7478
LoadManagerContext context,
7579
NamespaceBundleSplitStrategy bundleSplitStrategy) {
7680
this.pulsar = pulsar;
7781
this.loadManagerExecutor = pulsar.getLoadManagerExecutor();
82+
this.splitManager = splitManager;
7883
this.counter = counter;
7984
this.splitMetrics = splitMetrics;
8085
this.context = context;
@@ -85,10 +90,11 @@ public SplitScheduler(PulsarService pulsar,
8590

8691
public SplitScheduler(PulsarService pulsar,
8792
ServiceUnitStateChannel serviceUnitStateChannel,
93+
SplitManager splitManager,
8894
SplitCounter counter,
8995
AtomicReference<List<Metrics>> splitMetrics,
9096
LoadManagerContext context) {
91-
this(pulsar, serviceUnitStateChannel, counter, splitMetrics, context,
97+
this(pulsar, serviceUnitStateChannel, splitManager, counter, splitMetrics, context,
9298
new DefaultNamespaceBundleSplitStrategyImpl(counter));
9399
}
94100

@@ -110,27 +116,36 @@ public void execute() {
110116
synchronized (bundleSplitStrategy) {
111117
final Set<SplitDecision> decisions = bundleSplitStrategy.findBundlesToSplit(context, pulsar);
112118
if (!decisions.isEmpty()) {
119+
120+
// currently following the unloading timeout
121+
var asyncOpTimeoutMs = conf.getNamespaceBundleUnloadingTimeoutMs();
113122
List<CompletableFuture<Void>> futures = new ArrayList<>();
114123
for (SplitDecision decision : decisions) {
115124
if (decision.getLabel() == Success) {
116125
var split = decision.getSplit();
117-
futures.add(serviceUnitStateChannel.publishSplitEventAsync(split)
118-
.whenComplete((__, e) -> {
119-
if (e == null) {
120-
counter.update(decision);
121-
log.info("Published Split Event for {}", split);
122-
} else {
123-
counter.update(Failure, Unknown);
124-
log.error("Failed to publish Split Event for {}", split);
125-
}
126-
}));
126+
futures.add(
127+
splitManager.waitAsync(
128+
serviceUnitStateChannel.publishSplitEventAsync(split)
129+
.whenComplete((__, e) -> {
130+
if (e == null) {
131+
log.info("Published Split Event for {}", split);
132+
} else {
133+
counter.update(Failure, Unknown);
134+
log.error("Failed to publish Split Event for {}", split);
135+
}
136+
}),
137+
split.serviceUnit(),
138+
decision,
139+
asyncOpTimeoutMs, TimeUnit.MILLISECONDS)
140+
);
127141
}
128142
}
129-
FutureUtil.waitForAll(futures).exceptionally(ex -> {
130-
log.error("Failed to wait for split events to persist.", ex);
131-
counter.update(Failure, Unknown);
132-
return null;
133-
});
143+
try {
144+
FutureUtil.waitForAll(futures)
145+
.get(asyncOpTimeoutMs, TimeUnit.MILLISECONDS);
146+
} catch (Throwable e) {
147+
log.error("Failed to wait for split events to persist.", e);
148+
}
134149
} else {
135150
if (debugMode) {
136151
log.info("BundleSplitStrategy returned no bundles to split.");

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/DefaultNamespaceBundleSplitStrategyImpl.java

+1-8
Original file line numberDiff line numberDiff line change
@@ -79,14 +79,7 @@ public Set<SplitDecision> findBundlesToSplit(LoadManagerContext context, PulsarS
7979
pulsar.getNamespaceService().getNamespaceBundleFactory();
8080

8181
// clean bundleHighTrafficFrequency
82-
var bundleHighTrafficIterator =
83-
bundleHighTrafficFrequency.entrySet().iterator();
84-
while (bundleHighTrafficIterator.hasNext()) {
85-
String bundle = bundleHighTrafficIterator.next().getKey();
86-
if (!bundleStatsMap.containsKey(bundle)) {
87-
bundleHighTrafficIterator.remove();
88-
}
89-
}
82+
bundleHighTrafficFrequency.keySet().retainAll(bundleStatsMap.keySet());
9083

9184
for (var entry : bundleStatsMap.entrySet()) {
9285
final String bundle = entry.getKey();

0 commit comments

Comments
 (0)