Skip to content

Commit 4ab4463

Browse files
[improve][broker] PIP-192: Add metrics for unload operation (#19749)
PIP: #16691 ### Motivation Raising a PR to implement #16691. We need to support metrics for unload/transfer operations in Load Manager Extension. ### Modifications In this PR: * Change the `findBundlesForUnloading` method return type from `UnloadDecision` to `Set<UnloadDecision>`. * The `UnloadDecision` no longer contains all unload objects. Each unload object has its own reason. * Add units test to verify the unload counter.
1 parent e0098ee commit 4ab4463

File tree

12 files changed

+440
-369
lines changed

12 files changed

+440
-369
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java

+13-5
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020

2121
import static org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.Role.Follower;
2222
import static org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.Role.Leader;
23+
import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Label.Success;
24+
import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Admin;
2325
import com.google.common.annotations.VisibleForTesting;
2426
import java.io.IOException;
2527
import java.util.ArrayList;
@@ -204,8 +206,8 @@ public void start() throws PulsarServerException {
204206
});
205207
this.serviceUnitStateChannel = new ServiceUnitStateChannelImpl(pulsar);
206208
this.brokerRegistry.start();
207-
this.unloadManager = new UnloadManager();
208209
this.splitManager = new SplitManager(splitCounter);
210+
this.unloadManager = new UnloadManager(unloadCounter);
209211
this.serviceUnitStateChannel.listen(unloadManager);
210212
this.serviceUnitStateChannel.listen(splitManager);
211213
this.leaderElectionService.start();
@@ -265,7 +267,8 @@ public void start() throws PulsarServerException {
265267
interval, TimeUnit.MILLISECONDS);
266268

267269
this.unloadScheduler = new UnloadScheduler(
268-
pulsar.getLoadManagerExecutor(), unloadManager, context, serviceUnitStateChannel);
270+
pulsar, pulsar.getLoadManagerExecutor(), unloadManager,
271+
context, serviceUnitStateChannel, antiAffinityGroupPolicyHelper, unloadCounter, unloadMetrics);
269272
this.unloadScheduler.start();
270273
this.splitScheduler = new SplitScheduler(
271274
pulsar, serviceUnitStateChannel, splitManager, splitCounter, splitMetrics, context);
@@ -401,16 +404,21 @@ public CompletableFuture<Void> unloadNamespaceBundleAsync(ServiceUnitId bundle,
401404
log.warn(msg);
402405
throw new IllegalArgumentException(msg);
403406
}
404-
return unloadAsync(new Unload(sourceBroker, bundle.toString(), destinationBroker),
407+
Unload unload = new Unload(sourceBroker, bundle.toString(), destinationBroker);
408+
UnloadDecision unloadDecision =
409+
new UnloadDecision(unload, Success, Admin);
410+
return unloadAsync(unloadDecision,
405411
conf.getNamespaceBundleUnloadingTimeoutMs(), TimeUnit.MILLISECONDS);
406412
});
407413
}
408414

409-
private CompletableFuture<Void> unloadAsync(Unload unload,
415+
private CompletableFuture<Void> unloadAsync(UnloadDecision unloadDecision,
410416
long timeout,
411417
TimeUnit timeoutUnit) {
418+
Unload unload = unloadDecision.getUnload();
412419
CompletableFuture<Void> future = serviceUnitStateChannel.publishUnloadEventAsync(unload);
413-
return unloadManager.waitAsync(future, unload.serviceUnit(), timeout, timeoutUnit);
420+
return unloadManager.waitAsync(future, unload.serviceUnit(), unloadDecision, timeout, timeoutUnit)
421+
.thenRun(() -> unloadCounter.updateUnloadBrokerCount(1));
414422
}
415423

416424
@Override

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java

+17-8
Original file line numberDiff line numberDiff line change
@@ -18,23 +18,29 @@
1818
*/
1919
package org.apache.pulsar.broker.loadbalance.extensions.manager;
2020

21+
import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Label.Failure;
22+
import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Unknown;
2123
import java.util.Map;
2224
import java.util.concurrent.CompletableFuture;
2325
import java.util.concurrent.ConcurrentHashMap;
2426
import java.util.concurrent.TimeUnit;
2527
import lombok.extern.slf4j.Slf4j;
2628
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState;
2729
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData;
30+
import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadCounter;
31+
import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision;
2832

2933
/**
3034
* Unload manager.
3135
*/
3236
@Slf4j
3337
public class UnloadManager implements StateChangeListener {
3438

39+
private final UnloadCounter counter;
3540
private final Map<String, CompletableFuture<Void>> inFlightUnloadRequest;
3641

37-
public UnloadManager() {
42+
public UnloadManager(UnloadCounter counter) {
43+
this.counter = counter;
3844
this.inFlightUnloadRequest = new ConcurrentHashMap<>();
3945
}
4046

@@ -43,14 +49,8 @@ private void complete(String serviceUnit, Throwable ex) {
4349
if (!future.isDone()) {
4450
if (ex != null) {
4551
future.completeExceptionally(ex);
46-
if (log.isDebugEnabled()) {
47-
log.debug("Complete exceptionally unload bundle: {}", serviceUnit, ex);
48-
}
4952
} else {
5053
future.complete(null);
51-
if (log.isDebugEnabled()) {
52-
log.debug("Complete unload bundle: {}", serviceUnit);
53-
}
5454
}
5555
}
5656
return null;
@@ -59,6 +59,7 @@ private void complete(String serviceUnit, Throwable ex) {
5959

6060
public CompletableFuture<Void> waitAsync(CompletableFuture<Void> eventPubFuture,
6161
String bundle,
62+
UnloadDecision decision,
6263
long timeout,
6364
TimeUnit timeoutUnit) {
6465

@@ -74,7 +75,15 @@ public CompletableFuture<Void> waitAsync(CompletableFuture<Void> eventPubFuture,
7475
}
7576
});
7677
return future;
77-
}));
78+
})).whenComplete((__, ex) -> {
79+
if (ex != null) {
80+
counter.update(Failure, Unknown);
81+
log.warn("Failed to unload bundle: {}", bundle, ex);
82+
return;
83+
}
84+
log.info("Complete unload bundle: {}", bundle);
85+
counter.update(decision);
86+
});
7887
}
7988

8089
@Override

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/UnloadCounter.java

+53-19
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Label.Failure;
2222
import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Label.Skip;
2323
import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Label.Success;
24+
import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Admin;
2425
import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Balanced;
2526
import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.CoolDown;
2627
import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.NoBrokers;
@@ -30,11 +31,13 @@
3031
import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Overloaded;
3132
import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Underloaded;
3233
import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Unknown;
34+
import com.google.common.annotations.VisibleForTesting;
3335
import java.util.ArrayList;
3436
import java.util.HashMap;
3537
import java.util.List;
3638
import java.util.Map;
37-
import org.apache.commons.lang3.mutable.MutableLong;
39+
import java.util.concurrent.atomic.AtomicLong;
40+
import lombok.Getter;
3841
import org.apache.pulsar.common.stats.Metrics;
3942

4043
/**
@@ -45,36 +48,63 @@ public class UnloadCounter {
4548
long unloadBrokerCount = 0;
4649
long unloadBundleCount = 0;
4750

48-
final Map<UnloadDecision.Label, Map<UnloadDecision.Reason, MutableLong>> breakdownCounters;
51+
@Getter
52+
@VisibleForTesting
53+
final Map<UnloadDecision.Label, Map<UnloadDecision.Reason, AtomicLong>> breakdownCounters;
4954

55+
@Getter
56+
@VisibleForTesting
5057
double loadAvg;
58+
@Getter
59+
@VisibleForTesting
5160
double loadStd;
5261

62+
private volatile long updatedAt = 0;
63+
5364
public UnloadCounter() {
5465
breakdownCounters = Map.of(
5566
Success, Map.of(
56-
Overloaded, new MutableLong(),
57-
Underloaded, new MutableLong()),
67+
Overloaded, new AtomicLong(),
68+
Underloaded, new AtomicLong(),
69+
Admin, new AtomicLong()),
5870
Skip, Map.of(
59-
Balanced, new MutableLong(),
60-
NoBundles, new MutableLong(),
61-
CoolDown, new MutableLong(),
62-
OutDatedData, new MutableLong(),
63-
NoLoadData, new MutableLong(),
64-
NoBrokers, new MutableLong(),
65-
Unknown, new MutableLong()),
71+
Balanced, new AtomicLong(),
72+
NoBundles, new AtomicLong(),
73+
CoolDown, new AtomicLong(),
74+
OutDatedData, new AtomicLong(),
75+
NoLoadData, new AtomicLong(),
76+
NoBrokers, new AtomicLong(),
77+
Unknown, new AtomicLong()),
6678
Failure, Map.of(
67-
Unknown, new MutableLong())
79+
Unknown, new AtomicLong())
6880
);
6981
}
7082

7183
public void update(UnloadDecision decision) {
72-
var unloads = decision.getUnloads();
73-
unloadBrokerCount += unloads.keySet().size();
74-
unloadBundleCount += unloads.values().size();
75-
breakdownCounters.get(decision.getLabel()).get(decision.getReason()).increment();
76-
loadAvg = decision.loadAvg;
77-
loadStd = decision.loadStd;
84+
if (decision.getLabel() == Success) {
85+
unloadBundleCount++;
86+
}
87+
breakdownCounters.get(decision.getLabel()).get(decision.getReason()).incrementAndGet();
88+
updatedAt = System.currentTimeMillis();
89+
}
90+
91+
public void update(UnloadDecision.Label label, UnloadDecision.Reason reason) {
92+
if (label == Success) {
93+
unloadBundleCount++;
94+
}
95+
breakdownCounters.get(label).get(reason).incrementAndGet();
96+
updatedAt = System.currentTimeMillis();
97+
}
98+
99+
public void updateLoadData(double loadAvg, double loadStd) {
100+
this.loadAvg = loadAvg;
101+
this.loadStd = loadStd;
102+
updatedAt = System.currentTimeMillis();
103+
}
104+
105+
public void updateUnloadBrokerCount(int unloadBrokerCount) {
106+
this.unloadBrokerCount += unloadBrokerCount;
107+
updatedAt = System.currentTimeMillis();
78108
}
79109

80110
public List<Metrics> toMetrics(String advertisedBrokerAddress) {
@@ -125,4 +155,8 @@ public List<Metrics> toMetrics(String advertisedBrokerAddress) {
125155

126156
return metrics;
127157
}
128-
}
158+
159+
public long updatedAt() {
160+
return updatedAt;
161+
}
162+
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/UnloadDecision.java

+11-51
Original file line numberDiff line numberDiff line change
@@ -18,29 +18,21 @@
1818
*/
1919
package org.apache.pulsar.broker.loadbalance.extensions.models;
2020

21-
import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Label.Failure;
2221
import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Label.Skip;
2322
import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Label.Success;
24-
import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Balanced;
25-
import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.NoBundles;
26-
import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.NoLoadData;
27-
import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Overloaded;
28-
import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Underloaded;
29-
import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Unknown;
30-
import com.google.common.collect.ArrayListMultimap;
31-
import com.google.common.collect.Multimap;
23+
import lombok.AllArgsConstructor;
3224
import lombok.Data;
3325

3426
/**
3527
* Defines the information required to unload or transfer a service unit(e.g. bundle).
3628
*/
3729
@Data
30+
@AllArgsConstructor
3831
public class UnloadDecision {
39-
Multimap<String, Unload> unloads;
32+
Unload unload;
4033
Label label;
4134
Reason reason;
42-
Double loadAvg;
43-
Double loadStd;
35+
4436
public enum Label {
4537
Success,
4638
Skip,
@@ -55,62 +47,30 @@ public enum Reason {
5547
OutDatedData,
5648
NoLoadData,
5749
NoBrokers,
50+
Admin,
5851
Unknown
5952
}
6053

6154
public UnloadDecision() {
62-
unloads = ArrayListMultimap.create();
55+
unload = null;
6356
label = null;
6457
reason = null;
65-
loadAvg = null;
66-
loadStd = null;
6758
}
6859

6960
public void clear() {
70-
unloads.clear();
61+
unload = null;
7162
label = null;
7263
reason = null;
73-
loadAvg = null;
74-
loadStd = null;
75-
}
76-
77-
public void skip(int numOfOverloadedBrokers,
78-
int numOfUnderloadedBrokers,
79-
int numOfBrokersWithEmptyLoadData,
80-
int numOfBrokersWithFewBundles) {
81-
label = Skip;
82-
if (numOfOverloadedBrokers == 0 && numOfUnderloadedBrokers == 0) {
83-
reason = Balanced;
84-
} else if (numOfBrokersWithEmptyLoadData > 0) {
85-
reason = NoLoadData;
86-
} else if (numOfBrokersWithFewBundles > 0) {
87-
reason = NoBundles;
88-
} else {
89-
reason = Unknown;
90-
}
9164
}
9265

9366
public void skip(Reason reason) {
9467
label = Skip;
9568
this.reason = reason;
9669
}
9770

98-
public void succeed(
99-
int numOfOverloadedBrokers,
100-
int numOfUnderloadedBrokers) {
101-
102-
label = Success;
103-
if (numOfOverloadedBrokers > numOfUnderloadedBrokers) {
104-
reason = Overloaded;
105-
} else {
106-
reason = Underloaded;
107-
}
108-
}
109-
110-
111-
public void fail() {
112-
label = Failure;
113-
reason = Unknown;
71+
public void succeed(Reason reason) {
72+
this.label = Success;
73+
this.reason = reason;
11474
}
11575

116-
}
76+
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/policies/AntiAffinityGroupPolicyHelper.java

-3
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,6 @@ public boolean canUnload(
5454
String bundle,
5555
String srcBroker,
5656
Optional<String> dstBroker) {
57-
58-
59-
6057
try {
6158
var antiAffinityGroupOptional = LoadManagerShared.getNamespaceAntiAffinityGroup(
6259
pulsar, LoadManagerShared.getNamespaceNameFromBundleName(bundle));

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/NamespaceUnloadStrategy.java

+4-3
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.pulsar.broker.loadbalance.extensions.scheduler;
2020

2121
import java.util.Map;
22+
import java.util.Set;
2223
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
2324
import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision;
2425

@@ -38,8 +39,8 @@ public interface NamespaceUnloadStrategy {
3839
* @param recentlyUnloadedBrokers The recently unloaded brokers.
3940
* @return unloadDecision containing a list of the bundles that should be unloaded.
4041
*/
41-
UnloadDecision findBundlesForUnloading(LoadManagerContext context,
42-
Map<String, Long> recentlyUnloadedBundles,
43-
Map<String, Long> recentlyUnloadedBrokers);
42+
Set<UnloadDecision> findBundlesForUnloading(LoadManagerContext context,
43+
Map<String, Long> recentlyUnloadedBundles,
44+
Map<String, Long> recentlyUnloadedBrokers);
4445

4546
}

0 commit comments

Comments
 (0)