Skip to content

Commit

Permalink
[broker] Optimize TopicPolicies#maxUnackedMessagesOnConsumer with Hie…
Browse files Browse the repository at this point in the history
…rarchyTopicPolicies (apache#13618)
  • Loading branch information
Jason918 authored and nicklixinyang committed Apr 20, 2022
1 parent da41a16 commit 7d4fa12
Show file tree
Hide file tree
Showing 8 changed files with 68 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,6 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
// schema validation enforced flag
protected volatile boolean schemaValidationEnforced = false;

protected volatile int maxUnackedMessagesOnConsumerAppilied = 0;

protected volatile PublishRateLimiter topicPublishRateLimiter;

protected volatile ResourceGroupPublishLimiter resourceGroupPublishLimiter;
Expand Down Expand Up @@ -164,6 +162,7 @@ protected void updateTopicPolicy(TopicPolicies data) {
}
topicPolicies.getRetentionPolicies().updateTopicValue(data.getRetentionPolicies());
topicPolicies.getMaxSubscriptionsPerTopic().updateTopicValue(data.getMaxSubscriptionsPerTopic());
topicPolicies.getMaxUnackedMessagesOnConsumer().updateTopicValue(data.getMaxUnackedMessagesOnConsumer());
topicPolicies.getMaxProducersPerTopic().updateTopicValue(data.getMaxProducerPerTopic());
topicPolicies.getMaxConsumerPerTopic().updateTopicValue(data.getMaxConsumerPerTopic());
topicPolicies.getMaxConsumersPerSubscription().updateTopicValue(data.getMaxConsumersPerSubscription());
Expand Down Expand Up @@ -195,6 +194,8 @@ protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) {
topicPolicies.getCompactionThreshold().updateNamespaceValue(namespacePolicies.compaction_threshold);
topicPolicies.getReplicationClusters().updateNamespaceValue(
Lists.newArrayList(CollectionUtils.emptyIfNull(namespacePolicies.replication_clusters)));
topicPolicies.getMaxUnackedMessagesOnConsumer()
.updateNamespaceValue(namespacePolicies.max_unacked_messages_per_consumer);
topicPolicies.getMessageTTLInSeconds().updateNamespaceValue(namespacePolicies.message_ttl_in_seconds);
topicPolicies.getMaxSubscriptionsPerTopic().updateNamespaceValue(namespacePolicies.max_subscriptions_per_topic);
topicPolicies.getMaxProducersPerTopic().updateNamespaceValue(namespacePolicies.max_producers_per_topic);
Expand Down Expand Up @@ -235,6 +236,8 @@ private void updateTopicPolicyByBrokerConfig() {
config.getDefaultRetentionTimeInMinutes(), config.getDefaultRetentionSizeInMB()));
topicPolicies.getDeduplicationSnapshotIntervalSeconds().updateBrokerValue(
config.getBrokerDeduplicationSnapshotIntervalSeconds());
topicPolicies.getMaxUnackedMessagesOnConsumer()
.updateBrokerValue(config.getMaxUnackedMessagesPerConsumer());
//init backlogQuota
topicPolicies.getBackLogQuotaMap()
.get(BacklogQuota.BacklogQuotaType.destination_storage)
Expand Down Expand Up @@ -1069,6 +1072,7 @@ protected void updatePublishDispatcher(PublishRate publishRate) {
}
}

@Override
public HierarchyTopicPolicies getHierarchyTopicPolicies() {
return topicPolicies;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public class Consumer {

private final ConsumerStatsImpl stats;

private volatile int maxUnackedMessages;
private final boolean isDurable;
private static final AtomicIntegerFieldUpdater<Consumer> UNACKED_MESSAGES_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(Consumer.class, "unackedMessages");
private volatile int unackedMessages = 0;
Expand Down Expand Up @@ -132,7 +132,7 @@ public class Consumer {

public Consumer(Subscription subscription, SubType subType, String topicName, long consumerId,
int priorityLevel, String consumerName,
int maxUnackedMessages, TransportCnx cnx, String appId,
boolean isDurable, TransportCnx cnx, String appId,
Map<String, String> metadata, boolean readCompacted, InitialPosition subscriptionInitialPosition,
KeySharedMeta keySharedMeta, MessageId startMessageId) {

Expand All @@ -144,7 +144,7 @@ public Consumer(Subscription subscription, SubType subType, String topicName, lo
this.priorityLevel = priorityLevel;
this.readCompacted = readCompacted;
this.consumerName = consumerName;
this.maxUnackedMessages = maxUnackedMessages;
this.isDurable = isDurable;
this.subscriptionInitialPosition = subscriptionInitialPosition;
this.keySharedMeta = keySharedMeta;
this.cnx = cnx;
Expand Down Expand Up @@ -290,8 +290,8 @@ public Future<Void> sendMessages(final List<Entry> entries, EntryBatchSizes batc

private void incrementUnackedMessages(int ackedMessages) {
if (Subscription.isIndividualAckMode(subType)
&& addAndGetUnAckedMsgs(this, ackedMessages) >= maxUnackedMessages
&& maxUnackedMessages > 0) {
&& addAndGetUnAckedMsgs(this, ackedMessages) >= getMaxUnackedMessages()
&& getMaxUnackedMessages() > 0) {
blockedConsumerOnUnackedMsgs = true;
}
}
Expand Down Expand Up @@ -624,7 +624,7 @@ public void flowPermits(int additionalNumberOfMessages) {
checkArgument(additionalNumberOfMessages > 0);

// block shared consumer when unacked-messages reaches limit
if (shouldBlockConsumerOnUnackMsgs() && unackedMessages >= maxUnackedMessages) {
if (shouldBlockConsumerOnUnackMsgs() && unackedMessages >= getMaxUnackedMessages()) {
blockedConsumerOnUnackedMsgs = true;
}
int oldPermits;
Expand Down Expand Up @@ -684,12 +684,12 @@ public void reachedEndOfTopic() {
/**
* Checks if consumer-blocking on unAckedMessages is allowed for below conditions:<br/>
* a. consumer must have Shared-subscription<br/>
* b. {@link this#maxUnackedMessages} value > 0
* b. {@link this#getMaxUnackedMessages()} value > 0
*
* @return
*/
private boolean shouldBlockConsumerOnUnackMsgs() {
return Subscription.isIndividualAckMode(subType) && maxUnackedMessages > 0;
return Subscription.isIndividualAckMode(subType) && getMaxUnackedMessages() > 0;
}

public void updateRates() {
Expand Down Expand Up @@ -821,7 +821,7 @@ private void removePendingAcks(PositionImpl position) {
// unblock consumer-throttling when limit check is disabled or receives half of maxUnackedMessages =>
// consumer can start again consuming messages
int unAckedMsgs = UNACKED_MESSAGES_UPDATER.get(ackOwnedConsumer);
if ((((unAckedMsgs <= maxUnackedMessages / 2) && ackOwnedConsumer.blockedConsumerOnUnackedMsgs)
if ((((unAckedMsgs <= getMaxUnackedMessages() / 2) && ackOwnedConsumer.blockedConsumerOnUnackedMsgs)
&& ackOwnedConsumer.shouldBlockConsumerOnUnackMsgs())
|| !shouldBlockConsumerOnUnackMsgs()) {
ackOwnedConsumer.blockedConsumerOnUnackedMsgs = false;
Expand Down Expand Up @@ -937,12 +937,14 @@ public void setReadPositionWhenJoining(PositionImpl readPositionWhenJoining) {
}

public int getMaxUnackedMessages() {
return maxUnackedMessages;
//Unacked messages check is disabled for non-durable subscriptions.
if (isDurable && subscription != null) {
return subscription.getTopic().getHierarchyTopicPolicies().getMaxUnackedMessagesOnConsumer().get();
} else {
return 0;
}
}

public void setMaxUnackedMessages(int maxUnackedMessages) {
this.maxUnackedMessages = maxUnackedMessages;
}

public TransportCnx cnx() {
return cnx;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
Expand Down Expand Up @@ -328,4 +329,10 @@ default boolean isSystemTopic() {
*/
BrokerService getBrokerService();

/**
* Get HierarchyTopicPolicies.
* @return
*/
HierarchyTopicPolicies getHierarchyTopicPolicies();

}
Original file line number Diff line number Diff line change
Expand Up @@ -303,8 +303,9 @@ private CompletableFuture<Consumer> internalSubscribe(final TransportCnx cnx, St

NonPersistentSubscription subscription = subscriptions.computeIfAbsent(subscriptionName,
name -> new NonPersistentSubscription(this, subscriptionName, isDurable));
Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, consumerName, 0,
cnx, cnx.getAuthRole(), metadata, readCompacted, initialPosition, keySharedMeta, MessageId.latest);
Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, consumerName,
false, cnx, cnx.getAuthRole(), metadata, readCompacted, initialPosition, keySharedMeta,
MessageId.latest);
addConsumerToSubscription(subscription, consumer).thenRun(() -> {
if (!cnx.isActive()) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,6 @@ public CompletableFuture<Void> initialize() {
if (!optPolicies.isPresent()) {
isEncryptionRequired = false;
updateUnackedMessagesAppliedOnSubscription(null);
updateUnackedMessagesExceededOnConsumer(null);
return;
}

Expand All @@ -327,13 +326,11 @@ public CompletableFuture<Void> initialize() {
schemaValidationEnforced = policies.schema_validation_enforced;

updateUnackedMessagesAppliedOnSubscription(policies);
updateUnackedMessagesExceededOnConsumer(policies);
}).exceptionally(ex -> {
log.warn("[{}] Error getting policies {} and isEncryptionRequired will be set to false",
topic, ex.getMessage());
isEncryptionRequired = false;
updateUnackedMessagesAppliedOnSubscription(null);
updateUnackedMessagesExceededOnConsumer(null);
return null;
}));
}
Expand Down Expand Up @@ -755,13 +752,9 @@ private CompletableFuture<Consumer> internalSubscribe(final TransportCnx cnx, St
: getNonDurableSubscription(subscriptionName, startMessageId, initialPosition,
startMessageRollbackDurationSec, readCompacted);

int maxUnackedMessages = isDurable
? getMaxUnackedMessagesOnConsumer()
: 0;

CompletableFuture<Consumer> future = subscriptionFuture.thenCompose(subscription -> {
Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel,
consumerName, maxUnackedMessages, cnx, cnx.getAuthRole(), metadata,
consumerName, isDurable, cnx, cnx.getAuthRole(), metadata,
readCompacted, initialPosition, keySharedMeta, startMessageId);
return addConsumerToSubscription(subscription, consumer).thenCompose(v -> {
checkBackloggedCursors();
Expand Down Expand Up @@ -846,24 +839,6 @@ public void updateUnackedMessagesAppliedOnSubscription(Policies policies) {
);
}

private void updateUnackedMessagesExceededOnConsumer(Policies data) {
maxUnackedMessagesOnConsumerAppilied = getTopicPolicies()
.map(TopicPolicies::getMaxUnackedMessagesOnConsumer)
.orElseGet(() -> data != null && data.max_unacked_messages_per_consumer != null
? data.max_unacked_messages_per_consumer
: brokerService.pulsar().getConfiguration().getMaxUnackedMessagesPerConsumer());
getSubscriptions().forEach((name, sub) -> {
if (sub != null) {
sub.getConsumers().forEach(consumer -> {
if (consumer.getMaxUnackedMessages() != maxUnackedMessagesOnConsumerAppilied) {
consumer.setMaxUnackedMessages(maxUnackedMessagesOnConsumerAppilied);
}
});
}
});

}

private CompletableFuture<Subscription> getDurableSubscription(String subscriptionName,
InitialPosition initialPosition, long startMessageRollbackDurationSec, boolean replicated,
Map<String, String> subscriptionProperties) {
Expand Down Expand Up @@ -2414,7 +2389,6 @@ public CompletableFuture<Void> onPoliciesUpdate(Policies data) {

schemaValidationEnforced = data.schema_validation_enforced;
updateUnackedMessagesAppliedOnSubscription(data);
updateUnackedMessagesExceededOnConsumer(data);

//If the topic-level policy already exists, the namespace-level policy cannot override the topic-level policy.
Optional<TopicPolicies> topicPolicies = getTopicPolicies();
Expand Down Expand Up @@ -3034,10 +3008,6 @@ public long getDelayedDeliveryTickTimeMillis() {
return topicPolicies.getDelayedDeliveryTickTimeMillis().get();
}

public int getMaxUnackedMessagesOnConsumer() {
return maxUnackedMessagesOnConsumerAppilied;
}

public boolean isDelayedDeliveryEnabled() {
return topicPolicies.getDelayedDeliveryEnabled().get();
}
Expand Down Expand Up @@ -3092,7 +3062,6 @@ public void onUpdate(TopicPolicies policies) {
}
replicators.forEach((name, replicator) -> replicator.getRateLimiter()
.ifPresent(DispatchRateLimiter::updateDispatchRate));
updateUnackedMessagesExceededOnConsumer(namespacePolicies.orElse(null));

checkDeduplicationStatus();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ public void testConsumerGroupChangesWithOldNewConsumers() throws Exception {

// 2. Add old consumer
Consumer consumer1 = new Consumer(sub, SubType.Exclusive, topic.getName(), 1 /* consumer id */, 0,
"Cons1"/* consumer name */, 50000, serverCnxWithOldVersion, "myrole-1", Collections.emptyMap(), false, InitialPosition.Latest, null, MessageId.latest);
"Cons1"/* consumer name */, true, serverCnxWithOldVersion, "myrole-1", Collections.emptyMap(), false, InitialPosition.Latest, null, MessageId.latest);
pdfc.addConsumer(consumer1);
List<Consumer> consumers = pdfc.getConsumers();
assertSame(consumers.get(0).consumerName(), consumer1.consumerName());
Expand All @@ -301,7 +301,7 @@ public void testConsumerGroupChangesWithOldNewConsumers() throws Exception {

// 3. Add new consumer
Consumer consumer2 = new Consumer(sub, SubType.Exclusive, topic.getName(), 2 /* consumer id */, 0,
"Cons2"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), false, InitialPosition.Latest, null, MessageId.latest);
"Cons2"/* consumer name */, true, serverCnx, "myrole-1", Collections.emptyMap(), false, InitialPosition.Latest, null, MessageId.latest);
pdfc.addConsumer(consumer2);
consumers = pdfc.getConsumers();
assertSame(consumers.get(0).consumerName(), consumer1.consumerName());
Expand Down Expand Up @@ -329,7 +329,7 @@ public void testAddRemoveConsumer() throws Exception {

// 2. Add consumer
Consumer consumer1 = spy(new Consumer(sub, SubType.Exclusive, topic.getName(), 1 /* consumer id */, 0,
"Cons1"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(),
"Cons1"/* consumer name */, true, serverCnx, "myrole-1", Collections.emptyMap(),
false /* read compacted */, InitialPosition.Latest, null, MessageId.latest));
pdfc.addConsumer(consumer1);
List<Consumer> consumers = pdfc.getConsumers();
Expand All @@ -354,7 +354,7 @@ public void testAddRemoveConsumer() throws Exception {

// 5. Add another consumer which does not change active consumer
Consumer consumer2 = spy(new Consumer(sub, SubType.Exclusive, topic.getName(), 2 /* consumer id */, 0, "Cons2"/* consumer name */,
50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, MessageId.latest));
true, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, MessageId.latest));
pdfc.addConsumer(consumer2);
consumers = pdfc.getConsumers();
assertSame(pdfc.getActiveConsumer().consumerName(), consumer1.consumerName());
Expand All @@ -367,7 +367,7 @@ public void testAddRemoveConsumer() throws Exception {

// 6. Add a consumer which changes active consumer
Consumer consumer0 = spy(new Consumer(sub, SubType.Exclusive, topic.getName(), 0 /* consumer id */, 0,
"Cons0"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(),
"Cons0"/* consumer name */, true, serverCnx, "myrole-1", Collections.emptyMap(),
false /* read compacted */, InitialPosition.Latest, null, MessageId.latest));
pdfc.addConsumer(consumer0);
consumers = pdfc.getConsumers();
Expand Down Expand Up @@ -450,7 +450,7 @@ public void testAddRemoveConsumerNonPartitionedTopic() throws Exception {

// 2. Add a consumer
Consumer consumer1 = spy(new Consumer(sub, SubType.Failover, topic.getName(), 1 /* consumer id */, 1,
"Cons1"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(),
"Cons1"/* consumer name */, true, serverCnx, "myrole-1", Collections.emptyMap(),
false /* read compacted */, InitialPosition.Latest, null, MessageId.latest));
pdfc.addConsumer(consumer1);
List<Consumer> consumers = pdfc.getConsumers();
Expand All @@ -459,7 +459,7 @@ public void testAddRemoveConsumerNonPartitionedTopic() throws Exception {

// 3. Add a consumer with same priority level and consumer name is smaller in lexicographic order.
Consumer consumer2 = spy(new Consumer(sub, SubType.Failover, topic.getName(), 2 /* consumer id */, 1,
"Cons2"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(),
"Cons2"/* consumer name */, true, serverCnx, "myrole-1", Collections.emptyMap(),
false /* read compacted */, InitialPosition.Latest, null, MessageId.latest));
pdfc.addConsumer(consumer2);

Expand All @@ -473,7 +473,7 @@ public void testAddRemoveConsumerNonPartitionedTopic() throws Exception {

// 5. Add another consumer which has higher priority level
Consumer consumer3 = spy(new Consumer(sub, SubType.Failover, topic.getName(), 3 /* consumer id */, 0, "Cons3"/* consumer name */,
50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, MessageId.latest));
true, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, MessageId.latest));
pdfc.addConsumer(consumer3);
consumers = pdfc.getConsumers();
assertEquals(3, consumers.size());
Expand Down Expand Up @@ -663,7 +663,7 @@ private Consumer getNextConsumer(PersistentDispatcherMultipleConsumers dispatche
private Consumer createConsumer(PersistentTopic topic, int priority, int permit, boolean blocked, int id) throws Exception {
PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock, false);
Consumer consumer =
new Consumer(sub, SubType.Shared, "test-topic", id, priority, ""+id, 5000,
new Consumer(sub, SubType.Shared, "test-topic", id, priority, ""+id, true,
serverCnx, "appId", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, MessageId.latest);
try {
consumer.flowPermits(permit);
Expand Down
Loading

0 comments on commit 7d4fa12

Please sign in to comment.