-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
PIP-3 : Introduce message-dispatch rate limiting #634
Conversation
6100afd
to
1c1c031
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall approach looks good to me
conf/broker.conf
Outdated
@@ -106,6 +106,14 @@ maxUnackedMessagesPerBroker=0 | |||
# limit/2 messages | |||
maxUnackedMessagesPerSubscriptionOnBrokerBlocked=0.16 | |||
|
|||
# Default number of message dispatching throttling-limit for every topic. Using a value of 0, is disabling default | |||
# message dispatch-throttling | |||
dispatchRatePerTopicInMsg=0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we include "throttling" in the var name? Like dispatchThrottlingRatePerTopicInMsg
@@ -245,6 +245,19 @@ protected Policies getNamespacePolicies(String property, String cluster, String | |||
} | |||
} | |||
|
|||
protected LocalPolicies getNamespaceLocalPolicies(String property, String cluster, String namespace) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't we configuring the throttling policies on the global zk?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At least from an API perspective, it should be very clear what is the scope of the configuration
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't we configuring the throttling policies on the global zk?
Actually, every cluster has different traffic for the namespace and we may want to control enabling or throttling-limit per cluster level. Therefore, kept it in local-policies and not in global-zk.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's fine, but it can be quite confusing for a user given that other methods are changing the settings globally.
I think it could either be explicitely set for a particular cluster, or for all the clusters. In order to clarify the scope. Eg:
{
"rateLimit" : {
"dispatchRatePerTopicInMsg" : 1000.0,
"dispatchRatePerTopicInBytes" : 1000000.0,
"clusters" : {
"small-cluster" : {
"dispatchRatePerTopicInMsg" : 10.0,
"dispatchRatePerTopicInBytes" : 1000.0,
},
"large-cluster" : {
"dispatchRatePerTopicInMsg" : 1000000.0,
"dispatchRatePerTopicInBytes" : 1000000000.0,
}
}
}
}
The advantage is that you can also see all the limit in a single place.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok. But I think we introduced local-policies to keep cluster-level policy information such as BundleData
and it keeps global-policies clean.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Though BundleData is different. It is not configuration. It is written back, by the brokers themselves, into the local ZK because we want each cluster to be able to split the bundles independently.
For rate limiting, we are talking about proper configuration.
@@ -810,6 +811,42 @@ public void splitNamespaceBundle(@PathParam("property") String property, @PathPa | |||
} | |||
} | |||
|
|||
@PUT |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this POST
rather than PUT
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
have updated with POST
but, REST standard suggests PUT
for resource-update and POST
for create.
multiLayerTopicsMap.forEach((namespace, bundle) -> { | ||
bundle.forEach((name, topics) -> { | ||
topics.forEach((topicName, topic) -> { | ||
if (topic instanceof PersistentTopic) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can't we just iterate on the flat topics map?
@@ -575,5 +578,31 @@ private void clearUnAckedMsgs(Consumer consumer) { | |||
subscription.addUnAckedMessages(-unaAckedMsgs); | |||
} | |||
|
|||
public static class SendMessageInfo { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How is this information used?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At dispatcher, we want to know how many messages/bytes we actually sent to manage the permits. Therefore, at here in consumer, we exactly know how many msgs/bytes is sent and return it to dispatcher. Earlier, consumer.sendMessages()
was returning Pair<ChannelPromise, Integer>
and we wanted to add one more variable so, created SendMessageInfo
entity to combine all information.
} catch (Exception e) { | ||
log.warn("Failed to get message-rate for {}", this.topic, e); | ||
} | ||
DispatchRate dispatchRate = brokerService.pulsar().getConfiguration().getDispatchRatePerTopicInMsg() > 0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ideally, if both limits are set, both of them should be applied.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, that could just be done inside DispatchRate
class
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, that could just be done inside DispatchRate class
Actually, using admin-api, we always define DispatchRateType=msgRate/byteRate
and throttling-value in DispatchRate
class.
However, for dynamic configuration we can't store complex-type so, created two variables dispatchRatePerTopicInMsg
and dispatchRatePerTopicInByte
, therefore, we need this logic when we have value set for both configuration.
Ideally, if both limits are set, both of them should be applied.
This will require 2 RateLimiter
objects for every topic which we use across all the subscriptions. Do you think considering both limits would be useful?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would require 2 rate limiters only if both limits are set. Otherwise one of them, or both, will be null.
} | ||
|
||
@Override | ||
public int hashCode() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unless hashCode()
is used in a very critical place, just use Objects.hashCode(var1, var2)
} | ||
|
||
@Override | ||
public boolean equals(Object obj) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return Objects.equals(a, obj.a) && Objects.equals(b, obj.b)
} | ||
|
||
@Override | ||
public String toString() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return Objects.toString(this);
😄
* acquire 10 permits at any time with in that 1 second. | ||
* | ||
*/ | ||
public class RateLimiter { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How is this class different from Guava rate limiter?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have started with Guava-RateLimiter
but ended up with custom one due to 2 main reasons:
(1) achieve per-second rate-limiting and (2) custom is faster compare to guava. I have added both Explanation here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, I found out the gist with the explanation and it makes sense. Can you include the full explanation here as well? Also, would it make sense to have the 1sec
period to be configurable as well? Eg: only rate limit if it's exceeding the rate for 10 seconds instead of 1.
dbd93c9
to
bda64ac
Compare
read messages based on available-message-rate-permits add admin command add message-rate + byte-rate options for dispatch-rate limiting policies on global-zk + combined msg/byte rate + reset rateLimiter-timeUnit
@merlimat I addressed all the comments. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good. Just few final comments
|
||
} | ||
|
||
public synchronized void shutdown() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you implement AutoCloseable
and change this into close()
for consistency
conf/broker.conf
Outdated
@@ -106,6 +106,14 @@ maxUnackedMessagesPerBroker=0 | |||
# limit/2 messages | |||
maxUnackedMessagesPerSubscriptionOnBrokerBlocked=0.16 | |||
|
|||
# Default number of message dispatching throttling-limit for every topic. Using a value of 0, is disabling default |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should say the "messages per second" rate
conf/broker.conf
Outdated
# message dispatch-throttling | ||
dispatchThrottlingRatePerTopicInMsg=0 | ||
|
||
# Default number of message-bytes dispatching throttling-limit for every topic. Using a value of 0, is disabling |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here, make it clear the meaning is bytes/sec
// throttle only if: (1) cursor is not active bcz active-cursor reads message from cache rather from | ||
// bookkeeper (2) if topic has reached message-rate threshold: then schedule the read after | ||
// MESSAGE_RATE_BACKOFF_MS | ||
if (!cursor.isActive()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Eventually, we could later add a config option to also rate limit the delivery from cache (that might help for topics with many subscriptions)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Eventually, we could later add a config option to also rate limit the delivery from cache (that might help for topics with many subscriptions)
yes, we also need this configuration to bound actual number of dispatch-messages, it is also helpful to throttle n/w bandwidth consumption by each client. I will create a separate PR to introduce this config.
dispatchRate); | ||
validateSuperUserAccess(); | ||
|
||
if (!cluster.equals(Namespaces.GLOBAL_CLUSTER)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is not necessary to check the cluster here, change will be stored in global-zk and the right cluster will pick it up.
CompletableFuture<Optional<LocalPolicies>> future = new CompletableFuture<>(); | ||
if (path == null || !path.startsWith(LOCAL_POLICIES_ROOT)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this still needed since we're not using local policies anymore?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
actually, this method returns CompletableFuture<>
so, we should fail the future rather throwing runtime exception.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
Motivation
As discussed in PIP, adding message-rate limiting to throttle message-dispatching.
Modifications
Result
Broker can throttle message-dispatching at topic level.