Skip to content

Commit 95c4e96

Browse files
committed
[improve][broker] PIP-192 Added namespace unload scheduler
1 parent fd3ce8b commit 95c4e96

File tree

3 files changed

+338
-1
lines changed

3 files changed

+338
-1
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java

+8-1
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@
3939
import org.apache.pulsar.broker.loadbalance.extensions.data.TopBundlesLoadData;
4040
import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerFilter;
4141
import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerVersionFilter;
42+
import org.apache.pulsar.broker.loadbalance.extensions.scheduler.LoadManagerScheduler;
43+
import org.apache.pulsar.broker.loadbalance.extensions.scheduler.NamespaceUnloadScheduler;
4244
import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore;
4345
import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStoreException;
4446
import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStoreFactory;
@@ -75,6 +77,8 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
7577
private LoadDataStore<BrokerLoadData> brokerLoadDataStore;
7678
private LoadDataStore<TopBundlesLoadData> topBundlesLoadDataStore;
7779

80+
private LoadManagerScheduler namespaceUnloadScheduler;
81+
7882
@Getter
7983
private LoadManagerContext context;
8084

@@ -131,7 +135,10 @@ public void start() throws PulsarServerException {
131135
.topBundleLoadDataStore(topBundlesLoadDataStore).build();
132136
// TODO: Start load data reporter.
133137

134-
// TODO: Start unload scheduler and bundle split scheduler
138+
// TODO: Start bundle split scheduler.
139+
this.namespaceUnloadScheduler = new NamespaceUnloadScheduler(
140+
pulsar.getLoadManagerExecutor(), context, serviceUnitStateChannel);
141+
this.namespaceUnloadScheduler.start();
135142
this.started = true;
136143
}
137144

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.loadbalance.extensions.scheduler;
20+
21+
import com.google.common.annotations.VisibleForTesting;
22+
import java.util.ArrayList;
23+
import java.util.List;
24+
import java.util.Map;
25+
import java.util.concurrent.CompletableFuture;
26+
import java.util.concurrent.ConcurrentHashMap;
27+
import java.util.concurrent.ScheduledExecutorService;
28+
import java.util.concurrent.ScheduledFuture;
29+
import java.util.concurrent.TimeUnit;
30+
import lombok.extern.slf4j.Slf4j;
31+
import org.apache.pulsar.broker.ServiceConfiguration;
32+
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
33+
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel;
34+
import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision;
35+
import org.apache.pulsar.common.util.FutureUtil;
36+
import org.apache.pulsar.common.util.Reflections;
37+
38+
@Slf4j
39+
public class NamespaceUnloadScheduler implements LoadManagerScheduler {
40+
41+
private final NamespaceUnloadStrategy namespaceUnloadStrategy;
42+
43+
private final ScheduledExecutorService loadManagerExecutor;
44+
45+
private final LoadManagerContext context;
46+
47+
private final ServiceUnitStateChannel channel;
48+
49+
private final ServiceConfiguration conf;
50+
51+
private volatile ScheduledFuture<?> loadSheddingTask;
52+
53+
private final Map<String, Long> recentlyUnloadedBundles;
54+
55+
private final Map<String, Long> recentlyUnloadedBrokers;
56+
57+
private volatile CompletableFuture<Void> currentRunningFuture = null;
58+
59+
public NamespaceUnloadScheduler(ScheduledExecutorService loadManagerExecutor,
60+
LoadManagerContext context,
61+
ServiceUnitStateChannel channel) {
62+
this(loadManagerExecutor, context, channel, createNamespaceUnloadStrategy(context.brokerConfiguration()));
63+
}
64+
65+
@VisibleForTesting
66+
protected NamespaceUnloadScheduler(ScheduledExecutorService loadManagerExecutor,
67+
LoadManagerContext context,
68+
ServiceUnitStateChannel channel,
69+
NamespaceUnloadStrategy strategy) {
70+
this.namespaceUnloadStrategy = strategy;
71+
this.recentlyUnloadedBundles = new ConcurrentHashMap<>();
72+
this.recentlyUnloadedBrokers = new ConcurrentHashMap<>();
73+
this.loadManagerExecutor = loadManagerExecutor;
74+
this.context = context;
75+
this.conf = context.brokerConfiguration();
76+
this.channel = channel;
77+
}
78+
79+
@Override
80+
public synchronized void execute() {
81+
boolean debugMode = conf.isLoadBalancerDebugModeEnabled() || log.isDebugEnabled();
82+
if (debugMode) {
83+
log.info("Load balancer enabled: {}, Shedding enabled: {}.",
84+
conf.isLoadBalancerEnabled(), conf.isLoadBalancerSheddingEnabled());
85+
}
86+
if (currentRunningFuture != null && !currentRunningFuture.isDone()
87+
|| !(conf.isLoadBalancerEnabled() && conf.isLoadBalancerSheddingEnabled())) {
88+
return;
89+
}
90+
this.currentRunningFuture = channel.isChannelOwnerAsync().thenCompose(isChannelOwner -> {
91+
if (!isChannelOwner) {
92+
return CompletableFuture.completedFuture(null);
93+
}
94+
return context.brokerRegistry().getAvailableBrokersAsync().thenCompose(availableBrokers -> {
95+
if (debugMode) {
96+
log.info("Available brokers: {}", availableBrokers);
97+
}
98+
if (availableBrokers.size() <= 1) {
99+
log.info("Only 1 broker available: no load shedding will be performed");
100+
return CompletableFuture.completedFuture(null);
101+
}
102+
final UnloadDecision unloadDecision = namespaceUnloadStrategy
103+
.findBundlesForUnloading(context, recentlyUnloadedBundles, recentlyUnloadedBrokers);
104+
if (debugMode) {
105+
log.info("[{}] Unload decision result: {}",
106+
namespaceUnloadStrategy.getClass().getSimpleName(), unloadDecision.toString());
107+
}
108+
if (unloadDecision.getUnloads().isEmpty()) {
109+
return CompletableFuture.completedFuture(null);
110+
}
111+
List<CompletableFuture<Void>> futures = new ArrayList<>();
112+
unloadDecision.getUnloads().forEach((broker, unload) -> {
113+
log.info("[{}] Unloading bundle: {}", namespaceUnloadStrategy.getClass().getSimpleName(), unload);
114+
futures.add(channel.publishUnloadEventAsync(unload).thenAccept(__ -> {
115+
recentlyUnloadedBundles.put(unload.serviceUnit(), System.currentTimeMillis());
116+
}));
117+
});
118+
return FutureUtil.waitForAll(futures).exceptionally(ex -> {
119+
log.error("[{}] Namespace unload has exception.", namespaceUnloadStrategy, ex);
120+
return null;
121+
});
122+
});
123+
});
124+
}
125+
126+
@Override
127+
public void start() {
128+
long loadSheddingInterval = TimeUnit.MINUTES
129+
.toMillis(conf.getLoadBalancerSheddingIntervalMinutes());
130+
this.loadSheddingTask = loadManagerExecutor.scheduleAtFixedRate(
131+
this::execute, loadSheddingInterval, loadSheddingInterval, TimeUnit.MILLISECONDS);
132+
}
133+
134+
@Override
135+
public void close() {
136+
if (this.loadSheddingTask != null) {
137+
this.loadSheddingTask.cancel(false);
138+
}
139+
this.recentlyUnloadedBundles.clear();
140+
this.recentlyUnloadedBrokers.clear();
141+
}
142+
143+
private static NamespaceUnloadStrategy createNamespaceUnloadStrategy(ServiceConfiguration conf) {
144+
try {
145+
return Reflections.createInstance(conf.getLoadBalancerLoadSheddingStrategy(), NamespaceUnloadStrategy.class,
146+
Thread.currentThread().getContextClassLoader());
147+
} catch (Exception e) {
148+
log.error("Error when trying to create namespace unload strategy: {}",
149+
conf.getLoadBalancerLoadPlacementStrategy(), e);
150+
}
151+
log.error("create namespace unload strategy failed. using TransferShedder instead.");
152+
return new TransferShedder();
153+
}
154+
155+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.loadbalance.extensions.scheduler;
20+
21+
import static org.mockito.ArgumentMatchers.any;
22+
import static org.mockito.ArgumentMatchers.eq;
23+
import static org.mockito.Mockito.doAnswer;
24+
import static org.mockito.Mockito.doReturn;
25+
import static org.mockito.Mockito.mock;
26+
import static org.mockito.Mockito.times;
27+
import static org.mockito.Mockito.verify;
28+
29+
import com.google.common.collect.Lists;
30+
import org.apache.pulsar.broker.ServiceConfiguration;
31+
import org.apache.pulsar.broker.loadbalance.extensions.BrokerRegistry;
32+
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
33+
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel;
34+
import org.apache.pulsar.broker.loadbalance.extensions.models.Unload;
35+
import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision;
36+
import org.apache.pulsar.client.util.ExecutorProvider;
37+
import org.testng.annotations.AfterMethod;
38+
import org.testng.annotations.BeforeMethod;
39+
import org.testng.annotations.Test;
40+
import java.util.concurrent.CompletableFuture;
41+
import java.util.concurrent.CountDownLatch;
42+
import java.util.concurrent.ExecutorService;
43+
import java.util.concurrent.Executors;
44+
import java.util.concurrent.ScheduledExecutorService;
45+
import java.util.concurrent.TimeUnit;
46+
47+
@Test(groups = "broker")
48+
public class NamespaceUnloadSchedulerTest {
49+
50+
private ScheduledExecutorService loadManagerExecutor;
51+
52+
public LoadManagerContext setupContext(){
53+
var ctx = getContext();
54+
ctx.brokerConfiguration().setLoadBalancerDebugModeEnabled(true);
55+
return ctx;
56+
}
57+
58+
@BeforeMethod
59+
public void setUp() {
60+
this.loadManagerExecutor = Executors
61+
.newSingleThreadScheduledExecutor(new ExecutorProvider.ExtendedThreadFactory("pulsar-load-manager"));
62+
}
63+
64+
@AfterMethod
65+
public void tearDown() {
66+
this.loadManagerExecutor.shutdown();
67+
}
68+
69+
@Test
70+
public void testExecuteSuccess() {
71+
LoadManagerContext context = setupContext();
72+
BrokerRegistry registry = context.brokerRegistry();
73+
ServiceUnitStateChannel channel = mock(ServiceUnitStateChannel.class);
74+
NamespaceUnloadStrategy unloadStrategy = mock(NamespaceUnloadStrategy.class);
75+
doReturn(CompletableFuture.completedFuture(true)).when(channel).isChannelOwnerAsync();
76+
doReturn(CompletableFuture.completedFuture(Lists.newArrayList("broker-1", "broker-2")))
77+
.when(registry).getAvailableBrokersAsync();
78+
doReturn(CompletableFuture.completedFuture(null)).when(channel).publishUnloadEventAsync(any());
79+
UnloadDecision decision = new UnloadDecision();
80+
Unload unload = new Unload("broker-1", "bundle-1");
81+
decision.getUnloads().put("broker-1", unload);
82+
doReturn(decision).when(unloadStrategy).findBundlesForUnloading(any(), any(), any());
83+
84+
NamespaceUnloadScheduler scheduler =
85+
new NamespaceUnloadScheduler(loadManagerExecutor, context, channel, unloadStrategy);
86+
87+
scheduler.execute();
88+
89+
verify(channel, times(1)).publishUnloadEventAsync(eq(unload));
90+
91+
// Test empty unload.
92+
UnloadDecision emptyUnload = new UnloadDecision();
93+
doReturn(emptyUnload).when(unloadStrategy).findBundlesForUnloading(any(), any(), any());
94+
95+
scheduler.execute();
96+
97+
verify(channel, times(1)).publishUnloadEventAsync(eq(unload));
98+
}
99+
100+
@Test
101+
public void testExecuteMoreThenOnceWhenFirstNotDone() throws InterruptedException {
102+
LoadManagerContext context = setupContext();
103+
BrokerRegistry registry = context.brokerRegistry();
104+
ServiceUnitStateChannel channel = mock(ServiceUnitStateChannel.class);
105+
NamespaceUnloadStrategy unloadStrategy = mock(NamespaceUnloadStrategy.class);
106+
doReturn(CompletableFuture.completedFuture(true)).when(channel).isChannelOwnerAsync();
107+
doAnswer(__ -> CompletableFuture.supplyAsync(() -> {
108+
try {
109+
// Delay 5 seconds to finish.
110+
TimeUnit.SECONDS.sleep(5);
111+
} catch (InterruptedException e) {
112+
throw new RuntimeException(e);
113+
}
114+
return Lists.newArrayList("broker-1", "broker-2");
115+
}, Executors.newFixedThreadPool(1))).when(registry).getAvailableBrokersAsync();
116+
NamespaceUnloadScheduler scheduler =
117+
new NamespaceUnloadScheduler(loadManagerExecutor, context, channel, unloadStrategy);
118+
119+
ExecutorService executorService = Executors.newFixedThreadPool(10);
120+
CountDownLatch latch = new CountDownLatch(10);
121+
for (int i = 0; i < 10; i++) {
122+
executorService.execute(() -> {
123+
scheduler.execute();
124+
latch.countDown();
125+
});
126+
}
127+
latch.await();
128+
129+
verify(registry, times(1)).getAvailableBrokersAsync();
130+
}
131+
132+
@Test
133+
public void testDisableLoadBalancer() {
134+
LoadManagerContext context = setupContext();
135+
context.brokerConfiguration().setLoadBalancerEnabled(false);
136+
ServiceUnitStateChannel channel = mock(ServiceUnitStateChannel.class);
137+
NamespaceUnloadStrategy unloadStrategy = mock(NamespaceUnloadStrategy.class);
138+
NamespaceUnloadScheduler scheduler =
139+
new NamespaceUnloadScheduler(loadManagerExecutor, context, channel, unloadStrategy);
140+
141+
scheduler.execute();
142+
143+
verify(channel, times(0)).isChannelOwnerAsync();
144+
145+
context.brokerConfiguration().setLoadBalancerEnabled(true);
146+
context.brokerConfiguration().setLoadBalancerSheddingEnabled(false);
147+
scheduler.execute();
148+
149+
verify(channel, times(0)).isChannelOwnerAsync();
150+
}
151+
152+
@Test
153+
public void testNotChannelOwner() {
154+
LoadManagerContext context = setupContext();
155+
context.brokerConfiguration().setLoadBalancerEnabled(false);
156+
ServiceUnitStateChannel channel = mock(ServiceUnitStateChannel.class);
157+
NamespaceUnloadStrategy unloadStrategy = mock(NamespaceUnloadStrategy.class);
158+
NamespaceUnloadScheduler scheduler =
159+
new NamespaceUnloadScheduler(loadManagerExecutor, context, channel, unloadStrategy);
160+
doReturn(CompletableFuture.completedFuture(false)).when(channel).isChannelOwnerAsync();
161+
162+
scheduler.execute();
163+
164+
verify(context.brokerRegistry(), times(0)).getAvailableBrokersAsync();
165+
}
166+
167+
public LoadManagerContext getContext(){
168+
var ctx = mock(LoadManagerContext.class);
169+
var registry = mock(BrokerRegistry.class);
170+
var conf = new ServiceConfiguration();
171+
doReturn(conf).when(ctx).brokerConfiguration();
172+
doReturn(registry).when(ctx).brokerRegistry();
173+
return ctx;
174+
}
175+
}

0 commit comments

Comments
 (0)