Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[broker] Optimize TopicPolicies#maxUnackedMessagesOnConsumer with HierarchyTopicPolicies #13618

Merged
merged 3 commits into from
Jan 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,6 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
// schema validation enforced flag
protected volatile boolean schemaValidationEnforced = false;

protected volatile int maxUnackedMessagesOnConsumerAppilied = 0;

protected volatile PublishRateLimiter topicPublishRateLimiter;

protected volatile ResourceGroupPublishLimiter resourceGroupPublishLimiter;
Expand Down Expand Up @@ -164,6 +162,7 @@ protected void updateTopicPolicy(TopicPolicies data) {
}
topicPolicies.getRetentionPolicies().updateTopicValue(data.getRetentionPolicies());
topicPolicies.getMaxSubscriptionsPerTopic().updateTopicValue(data.getMaxSubscriptionsPerTopic());
topicPolicies.getMaxUnackedMessagesOnConsumer().updateTopicValue(data.getMaxUnackedMessagesOnConsumer());
topicPolicies.getMaxProducersPerTopic().updateTopicValue(data.getMaxProducerPerTopic());
topicPolicies.getMaxConsumerPerTopic().updateTopicValue(data.getMaxConsumerPerTopic());
topicPolicies.getMaxConsumersPerSubscription().updateTopicValue(data.getMaxConsumersPerSubscription());
Expand Down Expand Up @@ -195,6 +194,8 @@ protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) {
topicPolicies.getCompactionThreshold().updateNamespaceValue(namespacePolicies.compaction_threshold);
topicPolicies.getReplicationClusters().updateNamespaceValue(
Lists.newArrayList(CollectionUtils.emptyIfNull(namespacePolicies.replication_clusters)));
topicPolicies.getMaxUnackedMessagesOnConsumer()
.updateNamespaceValue(namespacePolicies.max_unacked_messages_per_consumer);
topicPolicies.getMessageTTLInSeconds().updateNamespaceValue(namespacePolicies.message_ttl_in_seconds);
topicPolicies.getMaxSubscriptionsPerTopic().updateNamespaceValue(namespacePolicies.max_subscriptions_per_topic);
topicPolicies.getMaxProducersPerTopic().updateNamespaceValue(namespacePolicies.max_producers_per_topic);
Expand Down Expand Up @@ -235,6 +236,8 @@ private void updateTopicPolicyByBrokerConfig() {
config.getDefaultRetentionTimeInMinutes(), config.getDefaultRetentionSizeInMB()));
topicPolicies.getDeduplicationSnapshotIntervalSeconds().updateBrokerValue(
config.getBrokerDeduplicationSnapshotIntervalSeconds());
topicPolicies.getMaxUnackedMessagesOnConsumer()
.updateBrokerValue(config.getMaxUnackedMessagesPerConsumer());
//init backlogQuota
topicPolicies.getBackLogQuotaMap()
.get(BacklogQuota.BacklogQuotaType.destination_storage)
Expand Down Expand Up @@ -1069,6 +1072,7 @@ protected void updatePublishDispatcher(PublishRate publishRate) {
}
}

@Override
public HierarchyTopicPolicies getHierarchyTopicPolicies() {
return topicPolicies;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public class Consumer {

private final ConsumerStatsImpl stats;

private volatile int maxUnackedMessages;
private final boolean isDurable;
private static final AtomicIntegerFieldUpdater<Consumer> UNACKED_MESSAGES_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(Consumer.class, "unackedMessages");
private volatile int unackedMessages = 0;
Expand Down Expand Up @@ -132,7 +132,7 @@ public class Consumer {

public Consumer(Subscription subscription, SubType subType, String topicName, long consumerId,
int priorityLevel, String consumerName,
int maxUnackedMessages, TransportCnx cnx, String appId,
boolean isDurable, TransportCnx cnx, String appId,
Map<String, String> metadata, boolean readCompacted, InitialPosition subscriptionInitialPosition,
KeySharedMeta keySharedMeta, MessageId startMessageId) {

Expand All @@ -144,7 +144,7 @@ public Consumer(Subscription subscription, SubType subType, String topicName, lo
this.priorityLevel = priorityLevel;
this.readCompacted = readCompacted;
this.consumerName = consumerName;
this.maxUnackedMessages = maxUnackedMessages;
this.isDurable = isDurable;
this.subscriptionInitialPosition = subscriptionInitialPosition;
this.keySharedMeta = keySharedMeta;
this.cnx = cnx;
Expand Down Expand Up @@ -290,8 +290,8 @@ public Future<Void> sendMessages(final List<Entry> entries, EntryBatchSizes batc

private void incrementUnackedMessages(int ackedMessages) {
if (Subscription.isIndividualAckMode(subType)
&& addAndGetUnAckedMsgs(this, ackedMessages) >= maxUnackedMessages
&& maxUnackedMessages > 0) {
&& addAndGetUnAckedMsgs(this, ackedMessages) >= getMaxUnackedMessages()
&& getMaxUnackedMessages() > 0) {
blockedConsumerOnUnackedMsgs = true;
}
}
Expand Down Expand Up @@ -624,7 +624,7 @@ public void flowPermits(int additionalNumberOfMessages) {
checkArgument(additionalNumberOfMessages > 0);

// block shared consumer when unacked-messages reaches limit
if (shouldBlockConsumerOnUnackMsgs() && unackedMessages >= maxUnackedMessages) {
if (shouldBlockConsumerOnUnackMsgs() && unackedMessages >= getMaxUnackedMessages()) {
blockedConsumerOnUnackedMsgs = true;
}
int oldPermits;
Expand Down Expand Up @@ -684,12 +684,12 @@ public void reachedEndOfTopic() {
/**
* Checks if consumer-blocking on unAckedMessages is allowed for below conditions:<br/>
* a. consumer must have Shared-subscription<br/>
* b. {@link this#maxUnackedMessages} value > 0
* b. {@link this#getMaxUnackedMessages()} value > 0
*
* @return
*/
private boolean shouldBlockConsumerOnUnackMsgs() {
return Subscription.isIndividualAckMode(subType) && maxUnackedMessages > 0;
return Subscription.isIndividualAckMode(subType) && getMaxUnackedMessages() > 0;
}

public void updateRates() {
Expand Down Expand Up @@ -821,7 +821,7 @@ private void removePendingAcks(PositionImpl position) {
// unblock consumer-throttling when limit check is disabled or receives half of maxUnackedMessages =>
// consumer can start again consuming messages
int unAckedMsgs = UNACKED_MESSAGES_UPDATER.get(ackOwnedConsumer);
if ((((unAckedMsgs <= maxUnackedMessages / 2) && ackOwnedConsumer.blockedConsumerOnUnackedMsgs)
if ((((unAckedMsgs <= getMaxUnackedMessages() / 2) && ackOwnedConsumer.blockedConsumerOnUnackedMsgs)
&& ackOwnedConsumer.shouldBlockConsumerOnUnackMsgs())
|| !shouldBlockConsumerOnUnackMsgs()) {
ackOwnedConsumer.blockedConsumerOnUnackedMsgs = false;
Expand Down Expand Up @@ -937,12 +937,14 @@ public void setReadPositionWhenJoining(PositionImpl readPositionWhenJoining) {
}

public int getMaxUnackedMessages() {
return maxUnackedMessages;
//Unacked messages check is disabled for non-durable subscriptions.
if (isDurable && subscription != null) {
return subscription.getTopic().getHierarchyTopicPolicies().getMaxUnackedMessagesOnConsumer().get();
} else {
return 0;
}
}

public void setMaxUnackedMessages(int maxUnackedMessages) {
this.maxUnackedMessages = maxUnackedMessages;
}

public TransportCnx cnx() {
return cnx;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
Expand Down Expand Up @@ -328,4 +329,10 @@ default boolean isSystemTopic() {
*/
BrokerService getBrokerService();

/**
* Get HierarchyTopicPolicies.
* @return
*/
HierarchyTopicPolicies getHierarchyTopicPolicies();

}
Original file line number Diff line number Diff line change
Expand Up @@ -303,8 +303,9 @@ private CompletableFuture<Consumer> internalSubscribe(final TransportCnx cnx, St

NonPersistentSubscription subscription = subscriptions.computeIfAbsent(subscriptionName,
name -> new NonPersistentSubscription(this, subscriptionName, isDurable));
Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, consumerName, 0,
cnx, cnx.getAuthRole(), metadata, readCompacted, initialPosition, keySharedMeta, MessageId.latest);
Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, consumerName,
false, cnx, cnx.getAuthRole(), metadata, readCompacted, initialPosition, keySharedMeta,
MessageId.latest);
addConsumerToSubscription(subscription, consumer).thenRun(() -> {
if (!cnx.isActive()) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,6 @@ public CompletableFuture<Void> initialize() {
if (!optPolicies.isPresent()) {
isEncryptionRequired = false;
updateUnackedMessagesAppliedOnSubscription(null);
updateUnackedMessagesExceededOnConsumer(null);
return;
}

Expand All @@ -327,13 +326,11 @@ public CompletableFuture<Void> initialize() {
schemaValidationEnforced = policies.schema_validation_enforced;

updateUnackedMessagesAppliedOnSubscription(policies);
updateUnackedMessagesExceededOnConsumer(policies);
}).exceptionally(ex -> {
log.warn("[{}] Error getting policies {} and isEncryptionRequired will be set to false",
topic, ex.getMessage());
isEncryptionRequired = false;
updateUnackedMessagesAppliedOnSubscription(null);
updateUnackedMessagesExceededOnConsumer(null);
return null;
}));
}
Expand Down Expand Up @@ -755,13 +752,9 @@ private CompletableFuture<Consumer> internalSubscribe(final TransportCnx cnx, St
: getNonDurableSubscription(subscriptionName, startMessageId, initialPosition,
startMessageRollbackDurationSec, readCompacted);

int maxUnackedMessages = isDurable
? getMaxUnackedMessagesOnConsumer()
: 0;

CompletableFuture<Consumer> future = subscriptionFuture.thenCompose(subscription -> {
Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel,
consumerName, maxUnackedMessages, cnx, cnx.getAuthRole(), metadata,
consumerName, isDurable, cnx, cnx.getAuthRole(), metadata,
readCompacted, initialPosition, keySharedMeta, startMessageId);
return addConsumerToSubscription(subscription, consumer).thenCompose(v -> {
checkBackloggedCursors();
Expand Down Expand Up @@ -846,24 +839,6 @@ public void updateUnackedMessagesAppliedOnSubscription(Policies policies) {
);
}

private void updateUnackedMessagesExceededOnConsumer(Policies data) {
maxUnackedMessagesOnConsumerAppilied = getTopicPolicies()
.map(TopicPolicies::getMaxUnackedMessagesOnConsumer)
.orElseGet(() -> data != null && data.max_unacked_messages_per_consumer != null
? data.max_unacked_messages_per_consumer
: brokerService.pulsar().getConfiguration().getMaxUnackedMessagesPerConsumer());
getSubscriptions().forEach((name, sub) -> {
if (sub != null) {
sub.getConsumers().forEach(consumer -> {
if (consumer.getMaxUnackedMessages() != maxUnackedMessagesOnConsumerAppilied) {
consumer.setMaxUnackedMessages(maxUnackedMessagesOnConsumerAppilied);
}
});
}
});

}

private CompletableFuture<Subscription> getDurableSubscription(String subscriptionName,
InitialPosition initialPosition, long startMessageRollbackDurationSec, boolean replicated,
Map<String, String> subscriptionProperties) {
Expand Down Expand Up @@ -2414,7 +2389,6 @@ public CompletableFuture<Void> onPoliciesUpdate(Policies data) {

schemaValidationEnforced = data.schema_validation_enforced;
updateUnackedMessagesAppliedOnSubscription(data);
updateUnackedMessagesExceededOnConsumer(data);

//If the topic-level policy already exists, the namespace-level policy cannot override the topic-level policy.
Optional<TopicPolicies> topicPolicies = getTopicPolicies();
Expand Down Expand Up @@ -3034,10 +3008,6 @@ public long getDelayedDeliveryTickTimeMillis() {
return topicPolicies.getDelayedDeliveryTickTimeMillis().get();
}

public int getMaxUnackedMessagesOnConsumer() {
return maxUnackedMessagesOnConsumerAppilied;
}

public boolean isDelayedDeliveryEnabled() {
return topicPolicies.getDelayedDeliveryEnabled().get();
}
Expand Down Expand Up @@ -3092,7 +3062,6 @@ public void onUpdate(TopicPolicies policies) {
}
replicators.forEach((name, replicator) -> replicator.getRateLimiter()
.ifPresent(DispatchRateLimiter::updateDispatchRate));
updateUnackedMessagesExceededOnConsumer(namespacePolicies.orElse(null));

checkDeduplicationStatus();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ public void testConsumerGroupChangesWithOldNewConsumers() throws Exception {

// 2. Add old consumer
Consumer consumer1 = new Consumer(sub, SubType.Exclusive, topic.getName(), 1 /* consumer id */, 0,
"Cons1"/* consumer name */, 50000, serverCnxWithOldVersion, "myrole-1", Collections.emptyMap(), false, InitialPosition.Latest, null, MessageId.latest);
"Cons1"/* consumer name */, true, serverCnxWithOldVersion, "myrole-1", Collections.emptyMap(), false, InitialPosition.Latest, null, MessageId.latest);
pdfc.addConsumer(consumer1);
List<Consumer> consumers = pdfc.getConsumers();
assertSame(consumers.get(0).consumerName(), consumer1.consumerName());
Expand All @@ -301,7 +301,7 @@ public void testConsumerGroupChangesWithOldNewConsumers() throws Exception {

// 3. Add new consumer
Consumer consumer2 = new Consumer(sub, SubType.Exclusive, topic.getName(), 2 /* consumer id */, 0,
"Cons2"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), false, InitialPosition.Latest, null, MessageId.latest);
"Cons2"/* consumer name */, true, serverCnx, "myrole-1", Collections.emptyMap(), false, InitialPosition.Latest, null, MessageId.latest);
pdfc.addConsumer(consumer2);
consumers = pdfc.getConsumers();
assertSame(consumers.get(0).consumerName(), consumer1.consumerName());
Expand Down Expand Up @@ -329,7 +329,7 @@ public void testAddRemoveConsumer() throws Exception {

// 2. Add consumer
Consumer consumer1 = spy(new Consumer(sub, SubType.Exclusive, topic.getName(), 1 /* consumer id */, 0,
"Cons1"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(),
"Cons1"/* consumer name */, true, serverCnx, "myrole-1", Collections.emptyMap(),
false /* read compacted */, InitialPosition.Latest, null, MessageId.latest));
pdfc.addConsumer(consumer1);
List<Consumer> consumers = pdfc.getConsumers();
Expand All @@ -354,7 +354,7 @@ public void testAddRemoveConsumer() throws Exception {

// 5. Add another consumer which does not change active consumer
Consumer consumer2 = spy(new Consumer(sub, SubType.Exclusive, topic.getName(), 2 /* consumer id */, 0, "Cons2"/* consumer name */,
50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, MessageId.latest));
true, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, MessageId.latest));
pdfc.addConsumer(consumer2);
consumers = pdfc.getConsumers();
assertSame(pdfc.getActiveConsumer().consumerName(), consumer1.consumerName());
Expand All @@ -367,7 +367,7 @@ public void testAddRemoveConsumer() throws Exception {

// 6. Add a consumer which changes active consumer
Consumer consumer0 = spy(new Consumer(sub, SubType.Exclusive, topic.getName(), 0 /* consumer id */, 0,
"Cons0"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(),
"Cons0"/* consumer name */, true, serverCnx, "myrole-1", Collections.emptyMap(),
false /* read compacted */, InitialPosition.Latest, null, MessageId.latest));
pdfc.addConsumer(consumer0);
consumers = pdfc.getConsumers();
Expand Down Expand Up @@ -450,7 +450,7 @@ public void testAddRemoveConsumerNonPartitionedTopic() throws Exception {

// 2. Add a consumer
Consumer consumer1 = spy(new Consumer(sub, SubType.Failover, topic.getName(), 1 /* consumer id */, 1,
"Cons1"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(),
"Cons1"/* consumer name */, true, serverCnx, "myrole-1", Collections.emptyMap(),
false /* read compacted */, InitialPosition.Latest, null, MessageId.latest));
pdfc.addConsumer(consumer1);
List<Consumer> consumers = pdfc.getConsumers();
Expand All @@ -459,7 +459,7 @@ public void testAddRemoveConsumerNonPartitionedTopic() throws Exception {

// 3. Add a consumer with same priority level and consumer name is smaller in lexicographic order.
Consumer consumer2 = spy(new Consumer(sub, SubType.Failover, topic.getName(), 2 /* consumer id */, 1,
"Cons2"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(),
"Cons2"/* consumer name */, true, serverCnx, "myrole-1", Collections.emptyMap(),
false /* read compacted */, InitialPosition.Latest, null, MessageId.latest));
pdfc.addConsumer(consumer2);

Expand All @@ -473,7 +473,7 @@ public void testAddRemoveConsumerNonPartitionedTopic() throws Exception {

// 5. Add another consumer which has higher priority level
Consumer consumer3 = spy(new Consumer(sub, SubType.Failover, topic.getName(), 3 /* consumer id */, 0, "Cons3"/* consumer name */,
50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, MessageId.latest));
true, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, MessageId.latest));
pdfc.addConsumer(consumer3);
consumers = pdfc.getConsumers();
assertEquals(3, consumers.size());
Expand Down Expand Up @@ -663,7 +663,7 @@ private Consumer getNextConsumer(PersistentDispatcherMultipleConsumers dispatche
private Consumer createConsumer(PersistentTopic topic, int priority, int permit, boolean blocked, int id) throws Exception {
PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock, false);
Consumer consumer =
new Consumer(sub, SubType.Shared, "test-topic", id, priority, ""+id, 5000,
new Consumer(sub, SubType.Shared, "test-topic", id, priority, ""+id, true,
serverCnx, "appId", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, MessageId.latest);
try {
consumer.flowPermits(permit);
Expand Down
Loading