Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[pulsar-broker] Support configuration to rate-limit dispatching on batch message #12294

Merged
merged 1 commit into from
Oct 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,10 @@ dispatchThrottlingRatePerTopicInMsg=0
# default message-byte dispatch-throttling
dispatchThrottlingRatePerTopicInByte=0

# Apply dispatch rate limiting on batch message instead individual
# messages with in batch message. (Default is disabled)
dispatchThrottlingOnBatchMessageEnabled=false

# Default number of message dispatching throttling-limit for a subscription.
# Using a value of 0, is disabling default message dispatch-throttling.
dispatchThrottlingRatePerSubscriptionInMsg=0
Expand Down
4 changes: 4 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,10 @@ dispatchThrottlingRatePerTopicInMsg=0
# default message-byte dispatch-throttling
dispatchThrottlingRatePerTopicInByte=0

# Apply dispatch rate limiting on batch message instead individual
# messages with in batch message. (Default is disabled)
dispatchThrottlingOnBatchMessageEnabled=false

# Dispatch rate-limiting relative to publish rate.
# (Enabling flag will make broker to dynamically update dispatch-rate relatively to publish-rate:
# throttle-dispatch-rate = (publish-rate + configured dispatch-rate).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -687,6 +687,12 @@ public class ServiceConfiguration implements PulsarConfiguration {
doc = "Default number of message-bytes dispatching throttling-limit for every topic. \n\n"
+ "Using a value of 0, is disabling default message-byte dispatch-throttling")
private long dispatchThrottlingRatePerTopicInByte = 0;
@FieldContext(
dynamic = true,
category = CATEGORY_POLICIES,
doc = "Apply dispatch rate limiting on batch message instead individual "
+ "messages with in batch message. (Default is disabled)")
private boolean dispatchThrottlingOnBatchMessageEnabled = false;

@FieldContext(
dynamic = true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,12 @@ public abstract class AbstractBaseDispatcher implements Dispatcher {
protected final Subscription subscription;

protected final ServiceConfiguration serviceConfig;
protected final boolean dispatchThrottlingOnBatchMessageEnabled;

protected AbstractBaseDispatcher(Subscription subscription, ServiceConfiguration serviceConfig) {
this.subscription = subscription;
this.serviceConfig = serviceConfig;
this.dispatchThrottlingOnBatchMessageEnabled = serviceConfig.isDispatchThrottlingOnBatchMessageEnabled();
}

/**
Expand Down Expand Up @@ -97,24 +99,26 @@ protected int updateEntryWrapperWithMetadata(EntryWrapper[] entryWrappers, List<
* @param sendMessageInfo
* an object where the total size in messages and bytes will be returned back to the caller
*/
public void filterEntriesForConsumer(List<Entry> entries, EntryBatchSizes batchSizes,
public int filterEntriesForConsumer(List<Entry> entries, EntryBatchSizes batchSizes,
SendMessageInfo sendMessageInfo, EntryBatchIndexesAcks indexesAcks,
ManagedCursor cursor, boolean isReplayRead) {
filterEntriesForConsumer(Optional.empty(), 0, entries, batchSizes, sendMessageInfo, indexesAcks, cursor,
return filterEntriesForConsumer(Optional.empty(), 0, entries, batchSizes, sendMessageInfo, indexesAcks, cursor,
isReplayRead);
}

public void filterEntriesForConsumer(Optional<EntryWrapper[]> entryWrapper, int entryWrapperOffset,
public int filterEntriesForConsumer(Optional<EntryWrapper[]> entryWrapper, int entryWrapperOffset,
List<Entry> entries, EntryBatchSizes batchSizes, SendMessageInfo sendMessageInfo,
EntryBatchIndexesAcks indexesAcks, ManagedCursor cursor, boolean isReplayRead) {
int totalMessages = 0;
long totalBytes = 0;
int totalChunkedMessages = 0;
int totalEntries = 0;
for (int i = 0, entriesSize = entries.size(); i < entriesSize; i++) {
Entry entry = entries.get(i);
if (entry == null) {
continue;
}
totalEntries++;
ByteBuf metadataAndPayload = entry.getDataBuffer();
int entryWrapperIndex = i + entryWrapperOffset;
MessageMetadata msgMetadata = entryWrapper.isPresent() && entryWrapper.get()[entryWrapperIndex] != null
Expand Down Expand Up @@ -182,6 +186,7 @@ && trackDelayedDelivery(entry.getLedgerId(), entry.getEntryId(), msgMetadata)) {
sendMessageInfo.setTotalMessages(totalMessages);
sendMessageInfo.setTotalBytes(totalBytes);
sendMessageInfo.setTotalChunkedMessages(totalChunkedMessages);
return totalEntries;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,7 @@ protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
int start = 0;
long totalMessagesSent = 0;
long totalBytesSent = 0;
long totalEntries = 0;
int avgBatchSizePerMsg = remainingMessages > 0 ? Math.max(remainingMessages / entries.size(), 1) : 1;

int firstAvailableConsumerPermits, currentTotalAvailablePermits;
Expand Down Expand Up @@ -541,8 +542,9 @@ protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {

EntryBatchSizes batchSizes = EntryBatchSizes.get(entriesForThisConsumer.size());
EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get(entriesForThisConsumer.size());
filterEntriesForConsumer(Optional.ofNullable(entryWrappers), start, entriesForThisConsumer,
batchSizes, sendMessageInfo, batchIndexesAcks, cursor, readType == ReadType.Replay);
totalEntries += filterEntriesForConsumer(Optional.ofNullable(entryWrappers), start,
entriesForThisConsumer, batchSizes, sendMessageInfo, batchIndexesAcks, cursor,
readType == ReadType.Replay);

c.sendMessages(entriesForThisConsumer, batchSizes, batchIndexesAcks, sendMessageInfo.getTotalMessages(),
sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(), redeliveryTracker);
Expand Down Expand Up @@ -571,13 +573,14 @@ protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
}

// acquire message-dispatch permits for already delivered messages
long permits = dispatchThrottlingOnBatchMessageEnabled ? totalEntries : totalMessagesSent;
if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) {
if (topic.getDispatchRateLimiter().isPresent()) {
topic.getDispatchRateLimiter().get().tryDispatchPermit(totalMessagesSent, totalBytesSent);
topic.getDispatchRateLimiter().get().tryDispatchPermit(permits, totalBytesSent);
}

if (dispatchRateLimiter.isPresent()) {
dispatchRateLimiter.get().tryDispatchPermit(totalMessagesSent, totalBytesSent);
dispatchRateLimiter.get().tryDispatchPermit(permits, totalBytesSent);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,15 +213,16 @@ protected void dispatchEntriesToConsumer(Consumer currentConsumer, List<Entry> e
redeliveryTracker)
.addListener(future -> {
if (future.isSuccess()) {
int permits = dispatchThrottlingOnBatchMessageEnabled ? entries.size()
: sendMessageInfo.getTotalMessages();
// acquire message-dispatch permits for already delivered messages
if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) {
if (topic.getDispatchRateLimiter().isPresent()) {
topic.getDispatchRateLimiter().get().tryDispatchPermit(sendMessageInfo.getTotalMessages(),
topic.getDispatchRateLimiter().get().tryDispatchPermit(permits,
sendMessageInfo.getTotalBytes());
}

dispatchRateLimiter.ifPresent(rateLimiter ->
rateLimiter.tryDispatchPermit(sendMessageInfo.getTotalMessages(),
rateLimiter.tryDispatchPermit(permits,
sendMessageInfo.getTotalBytes()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ protected Map<Consumer, List<Entry>> initialValue() throws Exception {
protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
long totalMessagesSent = 0;
long totalBytesSent = 0;
long totalEntries = 0;
int entriesCount = entries.size();

// Trigger read more messages
Expand Down Expand Up @@ -229,8 +230,8 @@ protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
EntryBatchSizes batchSizes = EntryBatchSizes.get(messagesForC);
EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get(messagesForC);
filterEntriesForConsumer(entriesWithSameKey, batchSizes, sendMessageInfo, batchIndexesAcks, cursor,
readType == ReadType.Replay);
totalEntries += filterEntriesForConsumer(entriesWithSameKey, batchSizes, sendMessageInfo,
batchIndexesAcks, cursor, readType == ReadType.Replay);

consumer.sendMessages(entriesWithSameKey, batchSizes, batchIndexesAcks,
sendMessageInfo.getTotalMessages(),
Expand All @@ -252,12 +253,13 @@ protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {

// acquire message-dispatch permits for already delivered messages
if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) {
long permits = dispatchThrottlingOnBatchMessageEnabled ? totalEntries : totalMessagesSent;
if (topic.getDispatchRateLimiter().isPresent()) {
topic.getDispatchRateLimiter().get().tryDispatchPermit(totalMessagesSent, totalBytesSent);
topic.getDispatchRateLimiter().get().tryDispatchPermit(permits, totalBytesSent);
}

if (dispatchRateLimiter.isPresent()) {
dispatchRateLimiter.get().tryDispatchPermit(totalMessagesSent, totalBytesSent);
dispatchRateLimiter.get().tryDispatchPermit(permits, totalBytesSent);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,84 @@ public void testRateLimitingMultipleConsumers() throws Exception {
log.info("-- Exiting {} test --", methodName);
}

@Test
public void testRateLimitingWithBatchMsgEnabled() throws Exception {
log.info("-- Starting {} test --", methodName);

conf.setDispatchThrottlingOnBatchMessageEnabled(true);

final String namespace = "my-property/throttling_ns";
final String topicName = "persistent://" + namespace + "/throttlingMultipleConsumers";

final int messageRate = 5;
DispatchRate dispatchRate = DispatchRate.builder().dispatchThrottlingRateInMsg(messageRate)
.dispatchThrottlingRateInByte(-1).ratePeriodInSecond(360).build();
admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
admin.namespaces().setDispatchRate(namespace, dispatchRate);

final int messagesPerBatch = 100;
final int numProducedMessages = messageRate * messagesPerBatch;
// create producer and topic
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).enableBatching(true)
.batchingMaxPublishDelay(1, TimeUnit.SECONDS).batchingMaxMessages(messagesPerBatch).create();
PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
boolean isMessageRateUpdate = false;
int retry = 5;
for (int i = 0; i < retry; i++) {
if (topic.getDispatchRateLimiter().get().getDispatchRateOnMsg() > 0) {
isMessageRateUpdate = true;
break;
} else {
if (i != retry - 1) {
Thread.sleep(100);
}
}
}
Assert.assertTrue(isMessageRateUpdate);
Assert.assertEquals(admin.namespaces().getDispatchRate(namespace), dispatchRate);

final AtomicInteger totalReceived = new AtomicInteger(0);

ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer().topic(topicName)
.subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared)
.messageListener((c1, msg) -> {
Assert.assertNotNull(msg, "Message cannot be null");
String receivedMessage = new String(msg.getData());
log.debug("Received message [{}] in the listener", receivedMessage);
totalReceived.incrementAndGet();
});
Consumer<byte[]> consumer1 = consumerBuilder.subscribe();
Consumer<byte[]> consumer2 = consumerBuilder.subscribe();
Consumer<byte[]> consumer3 = consumerBuilder.subscribe();
Consumer<byte[]> consumer4 = consumerBuilder.subscribe();
Consumer<byte[]> consumer5 = consumerBuilder.subscribe();

// deactive cursors
deactiveCursors((ManagedLedgerImpl) topic.getManagedLedger());

// Asynchronously produce messages
CountDownLatch latch = new CountDownLatch(numProducedMessages);
for (int i = 0; i < numProducedMessages; i++) {
final String message = "my-message-" + i;
producer.sendAsync(message.getBytes()).thenAccept(__ -> latch.countDown());
}

latch.await();

Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> totalReceived.get() == numProducedMessages);

// consumer should not have received all published message due to message-rate throttling
Assert.assertEquals(totalReceived.get(), numProducedMessages);

consumer1.close();
consumer2.close();
consumer3.close();
consumer4.close();
consumer5.close();
producer.close();
log.info("-- Exiting {} test --", methodName);
}

@Test(dataProvider = "subscriptions", timeOut = 5000)
public void testClusterRateLimitingConfiguration(SubscriptionType subscription) throws Exception {
log.info("-- Starting {} test --", methodName);
Expand Down
2 changes: 2 additions & 0 deletions site2/docs/reference-configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,8 @@ You can set the log level and configuration in the [log4j2.yaml](https://github
|subscribeRatePeriodPerConsumerInSecond|Rate period for {subscribeThrottlingRatePerConsumer}. By default, it is 30s.|30|
| dispatchThrottlingRatePerTopicInMsg | Default messages (per second) dispatch throttling-limit for every topic. When the value is set to 0, default message dispatch throttling-limit is disabled. |0 |
| dispatchThrottlingRatePerTopicInByte | Default byte (per second) dispatch throttling-limit for every topic. When the value is set to 0, default byte dispatch throttling-limit is disabled. | 0|
| dispatchThrottlingOnBatchMessageEnabled |Apply dispatch rate limiting on batch message instead individual messages with in batch message. (Default is disabled). | false|

| dispatchThrottlingRateRelativeToPublishRate | Enable dispatch rate-limiting relative to publish rate. | false |
|dispatchThrottlingRatePerSubscriptionInMsg|The defaulted number of message dispatching throttling-limit for a subscription. The value of 0 disables message dispatch-throttling.|0|
|dispatchThrottlingRatePerSubscriptionInByte|The default number of message-bytes dispatching throttling-limit for a subscription. The value of 0 disables message-byte dispatch-throttling.|0|
Expand Down