Skip to content

Commit a88e96e

Browse files
authored
[Branch-2.10][Cherry-pick] tidy update subscriptions dispatcher rate-limiter (apache#16778)
1 parent 6aaff4e commit a88e96e

File tree

11 files changed

+354
-134
lines changed

11 files changed

+354
-134
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java

+58
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
import org.apache.pulsar.common.policies.data.PublishRate;
6666
import org.apache.pulsar.common.policies.data.RetentionPolicies;
6767
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
68+
import org.apache.pulsar.common.policies.data.SubscribeRate;
6869
import org.apache.pulsar.common.policies.data.TopicPolicies;
6970
import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
7071
import org.apache.pulsar.common.protocol.schema.SchemaData;
@@ -157,6 +158,10 @@ public AbstractTopic(String topic, BrokerService brokerService) {
157158
this.preciseTopicPublishRateLimitingEnable = config.isPreciseTopicPublishRateLimiterEnable();
158159
}
159160

161+
public SubscribeRate getSubscribeRate() {
162+
return this.topicPolicies.getSubscribeRate().get();
163+
}
164+
160165
public DispatchRateImpl getSubscriptionDispatchRate() {
161166
return this.topicPolicies.getSubscriptionDispatchRate().get();
162167
}
@@ -169,6 +174,10 @@ public DispatchRateImpl getReplicatorDispatchRate() {
169174
return this.topicPolicies.getReplicatorDispatchRate().get();
170175
}
171176

177+
public DispatchRateImpl getDispatchRate() {
178+
return this.topicPolicies.getDispatchRate().get();
179+
}
180+
172181
private SchemaCompatibilityStrategy formatSchemaCompatibilityStrategy(SchemaCompatibilityStrategy strategy) {
173182
return strategy == SchemaCompatibilityStrategy.UNDEFINED ? null : strategy;
174183
}
@@ -204,8 +213,10 @@ protected void updateTopicPolicy(TopicPolicies data) {
204213
topicPolicies.getDelayedDeliveryEnabled().updateTopicValue(data.getDelayedDeliveryEnabled());
205214
topicPolicies.getReplicatorDispatchRate().updateTopicValue(normalize(data.getReplicatorDispatchRate()));
206215
topicPolicies.getDelayedDeliveryTickTimeMillis().updateTopicValue(data.getDelayedDeliveryTickTimeMillis());
216+
topicPolicies.getSubscribeRate().updateTopicValue(SubscribeRate.normalize(data.getSubscribeRate()));
207217
topicPolicies.getSubscriptionDispatchRate().updateTopicValue(normalize(data.getSubscriptionDispatchRate()));
208218
topicPolicies.getCompactionThreshold().updateTopicValue(data.getCompactionThreshold());
219+
topicPolicies.getDispatchRate().updateTopicValue(normalize(data.getDispatchRate()));
209220
}
210221

211222
protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) {
@@ -247,9 +258,24 @@ protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) {
247258
Arrays.stream(BacklogQuota.BacklogQuotaType.values()).forEach(
248259
type -> this.topicPolicies.getBackLogQuotaMap().get(type)
249260
.updateNamespaceValue(MapUtils.getObject(namespacePolicies.backlog_quota_map, type)));
261+
updateNamespaceSubscribeRate(namespacePolicies, brokerService.getPulsar().getConfig().getClusterName());
250262
updateNamespaceSubscriptionDispatchRate(namespacePolicies,
251263
brokerService.getPulsar().getConfig().getClusterName());
252264
updateSchemaCompatibilityStrategyNamespaceValue(namespacePolicies);
265+
updateNamespaceDispatchRate(namespacePolicies, brokerService.getPulsar().getConfig().getClusterName());
266+
}
267+
268+
private void updateNamespaceDispatchRate(Policies namespacePolicies, String cluster) {
269+
DispatchRateImpl dispatchRate = namespacePolicies.topicDispatchRate.get(cluster);
270+
if (dispatchRate == null) {
271+
dispatchRate = namespacePolicies.clusterDispatchRate.get(cluster);
272+
}
273+
topicPolicies.getDispatchRate().updateNamespaceValue(normalize(dispatchRate));
274+
}
275+
276+
private void updateNamespaceSubscribeRate(Policies namespacePolicies, String cluster) {
277+
topicPolicies.getSubscribeRate()
278+
.updateNamespaceValue(SubscribeRate.normalize(namespacePolicies.clusterSubscribeRate.get(cluster)));
253279
}
254280

255281
private void updateNamespaceSubscriptionDispatchRate(Policies namespacePolicies, String cluster) {
@@ -335,9 +361,26 @@ private void updateTopicPolicyByBrokerConfig() {
335361
if (isSystemTopic()) {
336362
schemaCompatibilityStrategy = config.getSystemTopicSchemaCompatibilityStrategy();
337363
}
364+
topicPolicies.getSubscribeRate().updateBrokerValue(subscribeRateInBroker(config));
338365
topicPolicies.getSubscriptionDispatchRate().updateBrokerValue(subscriptionDispatchRateInBroker(config));
339366
topicPolicies.getSchemaCompatibilityStrategy()
340367
.updateBrokerValue(formatSchemaCompatibilityStrategy(schemaCompatibilityStrategy));
368+
topicPolicies.getDispatchRate().updateBrokerValue(dispatchRateInBroker(config));
369+
}
370+
371+
private DispatchRateImpl dispatchRateInBroker(ServiceConfiguration config) {
372+
return DispatchRateImpl.builder()
373+
.dispatchThrottlingRateInMsg(config.getDispatchThrottlingRatePerTopicInMsg())
374+
.dispatchThrottlingRateInByte(config.getDispatchThrottlingRatePerTopicInByte())
375+
.ratePeriodInSecond(1)
376+
.build();
377+
}
378+
379+
private SubscribeRate subscribeRateInBroker(ServiceConfiguration config) {
380+
return new SubscribeRate(
381+
config.getSubscribeThrottlingRatePerConsumer(),
382+
config.getSubscribeRatePeriodPerConsumerInSecond()
383+
);
341384
}
342385

343386
private DispatchRateImpl subscriptionDispatchRateInBroker(ServiceConfiguration config) {
@@ -1173,4 +1216,19 @@ public void updateBrokerReplicatorDispatchRate() {
11731216
topicPolicies.getReplicatorDispatchRate().updateBrokerValue(
11741217
replicatorDispatchRateInBroker(brokerService.pulsar().getConfiguration()));
11751218
}
1219+
1220+
public void updateBrokerDispatchRate() {
1221+
topicPolicies.getDispatchRate().updateBrokerValue(
1222+
dispatchRateInBroker(brokerService.pulsar().getConfiguration()));
1223+
}
1224+
1225+
public void updateBrokerPublishRate() {
1226+
topicPolicies.getPublishRate().updateBrokerValue(
1227+
publishRateInBroker(brokerService.pulsar().getConfiguration()));
1228+
}
1229+
1230+
public void updateBrokerSubscribeRate() {
1231+
topicPolicies.getSubscribeRate().updateBrokerValue(
1232+
subscribeRateInBroker(brokerService.pulsar().getConfiguration()));
1233+
}
11761234
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java

+38
Original file line numberDiff line numberDiff line change
@@ -2216,6 +2216,21 @@ private void updateConfigurationAndRegisterListeners() {
22162216
(publisherThrottlingTickTimeMillis) -> {
22172217
setupBrokerPublishRateLimiterMonitor();
22182218
});
2219+
2220+
// add listener to update topic publish-rate dynamic config
2221+
registerConfigurationListener("maxPublishRatePerTopicInMessages",
2222+
maxPublishRatePerTopicInMessages -> updateMaxPublishRatePerTopicInMessages()
2223+
);
2224+
registerConfigurationListener("maxPublishRatePerTopicInBytes",
2225+
maxPublishRatePerTopicInMessages -> updateMaxPublishRatePerTopicInMessages()
2226+
);
2227+
2228+
// add listener to update subscribe-rate dynamic config
2229+
registerConfigurationListener("subscribeThrottlingRatePerConsumer",
2230+
subscribeThrottlingRatePerConsumer -> updateSubscribeRate());
2231+
registerConfigurationListener("subscribeRatePeriodPerConsumerInSecond",
2232+
subscribeRatePeriodPerConsumerInSecond -> updateSubscribeRate());
2233+
22192234
// add listener to notify broker publish-rate dynamic config
22202235
registerConfigurationListener("brokerPublisherThrottlingMaxMessageRate",
22212236
(brokerPublisherThrottlingMaxMessageRate) ->
@@ -2252,6 +2267,26 @@ private void updateBrokerDispatchThrottlingMaxRate() {
22522267
}
22532268
}
22542269

2270+
private void updateMaxPublishRatePerTopicInMessages() {
2271+
this.pulsar().getExecutor().submit(() ->
2272+
forEachTopic(topic -> {
2273+
if (topic instanceof AbstractTopic) {
2274+
((AbstractTopic) topic).updateBrokerPublishRate();
2275+
((AbstractTopic) topic).updatePublishDispatcher();
2276+
}
2277+
}));
2278+
}
2279+
2280+
private void updateSubscribeRate() {
2281+
this.pulsar().getExecutor().submit(() ->
2282+
forEachTopic(topic -> {
2283+
if (topic instanceof PersistentTopic) {
2284+
((PersistentTopic) topic).updateBrokerSubscribeRate();
2285+
((PersistentTopic) topic).updateSubscribeRateLimiter();
2286+
}
2287+
}));
2288+
}
2289+
22552290
private void updateBrokerPublisherThrottlingMaxRate() {
22562291
int currentMaxMessageRate = pulsar.getConfiguration().getBrokerPublisherThrottlingMaxMessageRate();
22572292
long currentMaxByteRate = pulsar.getConfiguration().getBrokerPublisherThrottlingMaxByteRate();
@@ -2284,6 +2319,9 @@ private void updateTopicMessageDispatchRate() {
22842319
this.pulsar().getExecutor().execute(() -> {
22852320
// update message-rate for each topic
22862321
forEachTopic(topic -> {
2322+
if (topic instanceof AbstractTopic) {
2323+
((AbstractTopic) topic).updateBrokerDispatchRate();
2324+
}
22872325
if (topic.getDispatchRateLimiter().isPresent()) {
22882326
topic.getDispatchRateLimiter().get().updateDispatchRate();
22892327
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java

+5
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.concurrent.TimeUnit;
2626
import org.apache.bookkeeper.mledger.Position;
2727
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
28+
import org.apache.pulsar.broker.service.persistent.SubscribeRateLimiter;
2829
import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
2930
import org.apache.pulsar.broker.stats.NamespaceStats;
3031
import org.apache.pulsar.client.api.MessageId;
@@ -285,6 +286,10 @@ default Optional<DispatchRateLimiter> getDispatchRateLimiter() {
285286
return Optional.empty();
286287
}
287288

289+
default Optional<SubscribeRateLimiter> getSubscribeRateLimiter() {
290+
return Optional.empty();
291+
}
292+
288293
default Optional<DispatchRateLimiter> getBrokerDispatchRateLimiter() {
289294
return Optional.empty();
290295
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java

+3
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,9 @@ private DispatchRate createDispatchRate() {
170170
*/
171171
public void updateDispatchRate() {
172172
switch (type) {
173+
case TOPIC:
174+
updateDispatchRate(topic.getDispatchRate());
175+
return;
173176
case SUBSCRIPTION:
174177
updateDispatchRate(topic.getSubscriptionDispatchRate());
175178
return;

0 commit comments

Comments
 (0)