diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index bee65617fa2f4..0470563e9f801 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -111,8 +111,6 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener UNACKED_MESSAGES_UPDATER = AtomicIntegerFieldUpdater.newUpdater(Consumer.class, "unackedMessages"); private volatile int unackedMessages = 0; @@ -132,7 +132,7 @@ public class Consumer { public Consumer(Subscription subscription, SubType subType, String topicName, long consumerId, int priorityLevel, String consumerName, - int maxUnackedMessages, TransportCnx cnx, String appId, + boolean isDurable, TransportCnx cnx, String appId, Map metadata, boolean readCompacted, InitialPosition subscriptionInitialPosition, KeySharedMeta keySharedMeta, MessageId startMessageId) { @@ -144,7 +144,7 @@ public Consumer(Subscription subscription, SubType subType, String topicName, lo this.priorityLevel = priorityLevel; this.readCompacted = readCompacted; this.consumerName = consumerName; - this.maxUnackedMessages = maxUnackedMessages; + this.isDurable = isDurable; this.subscriptionInitialPosition = subscriptionInitialPosition; this.keySharedMeta = keySharedMeta; this.cnx = cnx; @@ -290,8 +290,8 @@ public Future sendMessages(final List entries, EntryBatchSizes batc private void incrementUnackedMessages(int ackedMessages) { if (Subscription.isIndividualAckMode(subType) - && addAndGetUnAckedMsgs(this, ackedMessages) >= maxUnackedMessages - && maxUnackedMessages > 0) { + && addAndGetUnAckedMsgs(this, ackedMessages) >= getMaxUnackedMessages() + && getMaxUnackedMessages() > 0) { blockedConsumerOnUnackedMsgs = true; } } @@ -624,7 +624,7 @@ public void flowPermits(int additionalNumberOfMessages) { checkArgument(additionalNumberOfMessages > 0); // block shared consumer when unacked-messages reaches limit - if (shouldBlockConsumerOnUnackMsgs() && unackedMessages >= maxUnackedMessages) { + if (shouldBlockConsumerOnUnackMsgs() && unackedMessages >= getMaxUnackedMessages()) { blockedConsumerOnUnackedMsgs = true; } int oldPermits; @@ -684,12 +684,12 @@ public void reachedEndOfTopic() { /** * Checks if consumer-blocking on unAckedMessages is allowed for below conditions:
* a. consumer must have Shared-subscription
- * b. {@link this#maxUnackedMessages} value > 0 + * b. {@link this#getMaxUnackedMessages()} value > 0 * * @return */ private boolean shouldBlockConsumerOnUnackMsgs() { - return Subscription.isIndividualAckMode(subType) && maxUnackedMessages > 0; + return Subscription.isIndividualAckMode(subType) && getMaxUnackedMessages() > 0; } public void updateRates() { @@ -821,7 +821,7 @@ private void removePendingAcks(PositionImpl position) { // unblock consumer-throttling when limit check is disabled or receives half of maxUnackedMessages => // consumer can start again consuming messages int unAckedMsgs = UNACKED_MESSAGES_UPDATER.get(ackOwnedConsumer); - if ((((unAckedMsgs <= maxUnackedMessages / 2) && ackOwnedConsumer.blockedConsumerOnUnackedMsgs) + if ((((unAckedMsgs <= getMaxUnackedMessages() / 2) && ackOwnedConsumer.blockedConsumerOnUnackedMsgs) && ackOwnedConsumer.shouldBlockConsumerOnUnackMsgs()) || !shouldBlockConsumerOnUnackMsgs()) { ackOwnedConsumer.blockedConsumerOnUnackedMsgs = false; @@ -937,12 +937,14 @@ public void setReadPositionWhenJoining(PositionImpl readPositionWhenJoining) { } public int getMaxUnackedMessages() { - return maxUnackedMessages; + //Unacked messages check is disabled for non-durable subscriptions. + if (isDurable && subscription != null) { + return subscription.getTopic().getHierarchyTopicPolicies().getMaxUnackedMessagesOnConsumer().get(); + } else { + return 0; + } } - public void setMaxUnackedMessages(int maxUnackedMessages) { - this.maxUnackedMessages = maxUnackedMessages; - } public TransportCnx cnx() { return cnx; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java index 621b48cfe057c..472bf4efdb83f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java @@ -34,6 +34,7 @@ import org.apache.pulsar.common.api.proto.KeySharedMeta; import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType; +import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies; import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl; @@ -328,4 +329,10 @@ default boolean isSystemTopic() { */ BrokerService getBrokerService(); + /** + * Get HierarchyTopicPolicies. + * @return + */ + HierarchyTopicPolicies getHierarchyTopicPolicies(); + } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index e238df344c44e..fd7ad954ba40d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -303,8 +303,9 @@ private CompletableFuture internalSubscribe(final TransportCnx cnx, St NonPersistentSubscription subscription = subscriptions.computeIfAbsent(subscriptionName, name -> new NonPersistentSubscription(this, subscriptionName, isDurable)); - Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, consumerName, 0, - cnx, cnx.getAuthRole(), metadata, readCompacted, initialPosition, keySharedMeta, MessageId.latest); + Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, consumerName, + false, cnx, cnx.getAuthRole(), metadata, readCompacted, initialPosition, keySharedMeta, + MessageId.latest); addConsumerToSubscription(subscription, consumer).thenRun(() -> { if (!cnx.isActive()) { try { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 1f9192f4cc46f..bb81d589b3605 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -311,7 +311,6 @@ public CompletableFuture initialize() { if (!optPolicies.isPresent()) { isEncryptionRequired = false; updateUnackedMessagesAppliedOnSubscription(null); - updateUnackedMessagesExceededOnConsumer(null); return; } @@ -327,13 +326,11 @@ public CompletableFuture initialize() { schemaValidationEnforced = policies.schema_validation_enforced; updateUnackedMessagesAppliedOnSubscription(policies); - updateUnackedMessagesExceededOnConsumer(policies); }).exceptionally(ex -> { log.warn("[{}] Error getting policies {} and isEncryptionRequired will be set to false", topic, ex.getMessage()); isEncryptionRequired = false; updateUnackedMessagesAppliedOnSubscription(null); - updateUnackedMessagesExceededOnConsumer(null); return null; })); } @@ -755,13 +752,9 @@ private CompletableFuture internalSubscribe(final TransportCnx cnx, St : getNonDurableSubscription(subscriptionName, startMessageId, initialPosition, startMessageRollbackDurationSec, readCompacted); - int maxUnackedMessages = isDurable - ? getMaxUnackedMessagesOnConsumer() - : 0; - CompletableFuture future = subscriptionFuture.thenCompose(subscription -> { Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, - consumerName, maxUnackedMessages, cnx, cnx.getAuthRole(), metadata, + consumerName, isDurable, cnx, cnx.getAuthRole(), metadata, readCompacted, initialPosition, keySharedMeta, startMessageId); return addConsumerToSubscription(subscription, consumer).thenCompose(v -> { checkBackloggedCursors(); @@ -846,24 +839,6 @@ public void updateUnackedMessagesAppliedOnSubscription(Policies policies) { ); } - private void updateUnackedMessagesExceededOnConsumer(Policies data) { - maxUnackedMessagesOnConsumerAppilied = getTopicPolicies() - .map(TopicPolicies::getMaxUnackedMessagesOnConsumer) - .orElseGet(() -> data != null && data.max_unacked_messages_per_consumer != null - ? data.max_unacked_messages_per_consumer - : brokerService.pulsar().getConfiguration().getMaxUnackedMessagesPerConsumer()); - getSubscriptions().forEach((name, sub) -> { - if (sub != null) { - sub.getConsumers().forEach(consumer -> { - if (consumer.getMaxUnackedMessages() != maxUnackedMessagesOnConsumerAppilied) { - consumer.setMaxUnackedMessages(maxUnackedMessagesOnConsumerAppilied); - } - }); - } - }); - - } - private CompletableFuture getDurableSubscription(String subscriptionName, InitialPosition initialPosition, long startMessageRollbackDurationSec, boolean replicated, Map subscriptionProperties) { @@ -2414,7 +2389,6 @@ public CompletableFuture onPoliciesUpdate(Policies data) { schemaValidationEnforced = data.schema_validation_enforced; updateUnackedMessagesAppliedOnSubscription(data); - updateUnackedMessagesExceededOnConsumer(data); //If the topic-level policy already exists, the namespace-level policy cannot override the topic-level policy. Optional topicPolicies = getTopicPolicies(); @@ -3034,10 +3008,6 @@ public long getDelayedDeliveryTickTimeMillis() { return topicPolicies.getDelayedDeliveryTickTimeMillis().get(); } - public int getMaxUnackedMessagesOnConsumer() { - return maxUnackedMessagesOnConsumerAppilied; - } - public boolean isDelayedDeliveryEnabled() { return topicPolicies.getDelayedDeliveryEnabled().get(); } @@ -3092,7 +3062,6 @@ public void onUpdate(TopicPolicies policies) { } replicators.forEach((name, replicator) -> replicator.getRateLimiter() .ifPresent(DispatchRateLimiter::updateDispatchRate)); - updateUnackedMessagesExceededOnConsumer(namespacePolicies.orElse(null)); checkDeduplicationStatus(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java index 0efd2b748c118..d76505a69808d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java @@ -290,7 +290,7 @@ public void testConsumerGroupChangesWithOldNewConsumers() throws Exception { // 2. Add old consumer Consumer consumer1 = new Consumer(sub, SubType.Exclusive, topic.getName(), 1 /* consumer id */, 0, - "Cons1"/* consumer name */, 50000, serverCnxWithOldVersion, "myrole-1", Collections.emptyMap(), false, InitialPosition.Latest, null, MessageId.latest); + "Cons1"/* consumer name */, true, serverCnxWithOldVersion, "myrole-1", Collections.emptyMap(), false, InitialPosition.Latest, null, MessageId.latest); pdfc.addConsumer(consumer1); List consumers = pdfc.getConsumers(); assertSame(consumers.get(0).consumerName(), consumer1.consumerName()); @@ -301,7 +301,7 @@ public void testConsumerGroupChangesWithOldNewConsumers() throws Exception { // 3. Add new consumer Consumer consumer2 = new Consumer(sub, SubType.Exclusive, topic.getName(), 2 /* consumer id */, 0, - "Cons2"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), false, InitialPosition.Latest, null, MessageId.latest); + "Cons2"/* consumer name */, true, serverCnx, "myrole-1", Collections.emptyMap(), false, InitialPosition.Latest, null, MessageId.latest); pdfc.addConsumer(consumer2); consumers = pdfc.getConsumers(); assertSame(consumers.get(0).consumerName(), consumer1.consumerName()); @@ -329,7 +329,7 @@ public void testAddRemoveConsumer() throws Exception { // 2. Add consumer Consumer consumer1 = spy(new Consumer(sub, SubType.Exclusive, topic.getName(), 1 /* consumer id */, 0, - "Cons1"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), + "Cons1"/* consumer name */, true, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, MessageId.latest)); pdfc.addConsumer(consumer1); List consumers = pdfc.getConsumers(); @@ -354,7 +354,7 @@ public void testAddRemoveConsumer() throws Exception { // 5. Add another consumer which does not change active consumer Consumer consumer2 = spy(new Consumer(sub, SubType.Exclusive, topic.getName(), 2 /* consumer id */, 0, "Cons2"/* consumer name */, - 50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, MessageId.latest)); + true, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, MessageId.latest)); pdfc.addConsumer(consumer2); consumers = pdfc.getConsumers(); assertSame(pdfc.getActiveConsumer().consumerName(), consumer1.consumerName()); @@ -367,7 +367,7 @@ public void testAddRemoveConsumer() throws Exception { // 6. Add a consumer which changes active consumer Consumer consumer0 = spy(new Consumer(sub, SubType.Exclusive, topic.getName(), 0 /* consumer id */, 0, - "Cons0"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), + "Cons0"/* consumer name */, true, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, MessageId.latest)); pdfc.addConsumer(consumer0); consumers = pdfc.getConsumers(); @@ -450,7 +450,7 @@ public void testAddRemoveConsumerNonPartitionedTopic() throws Exception { // 2. Add a consumer Consumer consumer1 = spy(new Consumer(sub, SubType.Failover, topic.getName(), 1 /* consumer id */, 1, - "Cons1"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), + "Cons1"/* consumer name */, true, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, MessageId.latest)); pdfc.addConsumer(consumer1); List consumers = pdfc.getConsumers(); @@ -459,7 +459,7 @@ public void testAddRemoveConsumerNonPartitionedTopic() throws Exception { // 3. Add a consumer with same priority level and consumer name is smaller in lexicographic order. Consumer consumer2 = spy(new Consumer(sub, SubType.Failover, topic.getName(), 2 /* consumer id */, 1, - "Cons2"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), + "Cons2"/* consumer name */, true, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, MessageId.latest)); pdfc.addConsumer(consumer2); @@ -473,7 +473,7 @@ public void testAddRemoveConsumerNonPartitionedTopic() throws Exception { // 5. Add another consumer which has higher priority level Consumer consumer3 = spy(new Consumer(sub, SubType.Failover, topic.getName(), 3 /* consumer id */, 0, "Cons3"/* consumer name */, - 50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, MessageId.latest)); + true, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, MessageId.latest)); pdfc.addConsumer(consumer3); consumers = pdfc.getConsumers(); assertEquals(3, consumers.size()); @@ -663,7 +663,7 @@ private Consumer getNextConsumer(PersistentDispatcherMultipleConsumers dispatche private Consumer createConsumer(PersistentTopic topic, int priority, int permit, boolean blocked, int id) throws Exception { PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock, false); Consumer consumer = - new Consumer(sub, SubType.Shared, "test-topic", id, priority, ""+id, 5000, + new Consumer(sub, SubType.Shared, "test-topic", id, priority, ""+id, true, serverCnx, "appId", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, MessageId.latest); try { consumer.flowPermits(permit); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index e1b316cd821fc..5bb94882d45f6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -175,6 +175,7 @@ public void setup() throws Exception { ServiceConfiguration svcConfig = spy(ServiceConfiguration.class); svcConfig.setAdvertisedAddress("localhost"); svcConfig.setBrokerShutdownTimeoutMs(0L); + svcConfig.setMaxUnackedMessagesPerConsumer(50000); pulsar = spyWithClassAndConstructorArgs(PulsarService.class, svcConfig); doReturn(svcConfig).when(pulsar).getConfiguration(); doReturn(mock(Compactor.class)).when(pulsar).getCompactor(); @@ -747,7 +748,7 @@ public void testChangeSubscriptionType() throws Exception { PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService); PersistentSubscription sub = new PersistentSubscription(topic, "change-sub-type", cursorMock, false); - Consumer consumer = new Consumer(sub, SubType.Exclusive, topic.getName(), 1, 0, "Cons1", 50000, serverCnx, + Consumer consumer = new Consumer(sub, SubType.Exclusive, topic.getName(), 1, 0, "Cons1", true, serverCnx, "myrole-1", Collections.emptyMap(), false, InitialPosition.Latest, new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT), MessageId.latest); sub.addConsumer(consumer); @@ -758,7 +759,7 @@ public void testChangeSubscriptionType() throws Exception { SubType.Exclusive)) { Dispatcher previousDispatcher = sub.getDispatcher(); - consumer = new Consumer(sub, subType, topic.getName(), 1, 0, "Cons1", 50000, serverCnx, "myrole-1", + consumer = new Consumer(sub, subType, topic.getName(), 1, 0, "Cons1", true, serverCnx, "myrole-1", Collections.emptyMap(), false, InitialPosition.Latest, new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT), MessageId.latest); sub.addConsumer(consumer); @@ -783,7 +784,7 @@ public void testAddRemoveConsumer() throws Exception { // 1. simple add consumer Consumer consumer = new Consumer(sub, SubType.Exclusive, topic.getName(), 1 /* consumer id */, 0, "Cons1"/* consumer name */, - 50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, MessageId.latest); + true, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, MessageId.latest); sub.addConsumer(consumer); assertTrue(sub.getDispatcher().isConsumerConnected()); @@ -815,7 +816,7 @@ public void testAddRemoveConsumerDurableCursor() throws Exception { PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService); PersistentSubscription sub = new PersistentSubscription(topic, "non-durable-sub", cursorMock, false); - Consumer consumer = new Consumer(sub, SubType.Exclusive, topic.getName(), 1, 0, "Cons1", 50000, serverCnx, + Consumer consumer = new Consumer(sub, SubType.Exclusive, topic.getName(), 1, 0, "Cons1", true, serverCnx, "myrole-1", Collections.emptyMap(), false, InitialPosition.Latest, null, MessageId.latest); sub.addConsumer(consumer); @@ -853,14 +854,14 @@ private void testMaxConsumersShared() throws Exception { // 1. add consumer1 Consumer consumer = new Consumer(sub, SubType.Shared, topic.getName(), 1 /* consumer id */, 0, - "Cons1"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), + "Cons1"/* consumer name */, true, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, MessageId.latest); addConsumerToSubscription.invoke(topic, sub, consumer); assertEquals(sub.getConsumers().size(), 1); // 2. add consumer2 Consumer consumer2 = new Consumer(sub, SubType.Shared, topic.getName(), 2 /* consumer id */, 0, - "Cons2"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), + "Cons2"/* consumer name */, true, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, MessageId.latest); addConsumerToSubscription.invoke(topic, sub, consumer2); assertEquals(sub.getConsumers().size(), 2); @@ -868,7 +869,7 @@ private void testMaxConsumersShared() throws Exception { // 3. add consumer3 but reach maxConsumersPerSubscription try { Consumer consumer3 = new Consumer(sub, SubType.Shared, topic.getName(), 3 /* consumer id */, 0, - "Cons3"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), + "Cons3"/* consumer name */, true, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, MessageId.latest); ((CompletableFuture) addConsumerToSubscription.invoke(topic, sub, consumer3)).get(); fail("should have failed"); @@ -881,7 +882,7 @@ private void testMaxConsumersShared() throws Exception { // 4. add consumer4 to sub2 Consumer consumer4 = new Consumer(sub2, SubType.Shared, topic.getName(), 4 /* consumer id */, 0, - "Cons4"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), + "Cons4"/* consumer name */, true, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, MessageId.latest); addConsumerToSubscription.invoke(topic, sub2, consumer4); assertEquals(sub2.getConsumers().size(), 1); @@ -892,7 +893,7 @@ private void testMaxConsumersShared() throws Exception { // 5. add consumer5 to sub2 but reach maxConsumersPerTopic try { Consumer consumer5 = new Consumer(sub2, SubType.Shared, topic.getName(), 5 /* consumer id */, 0, - "Cons5"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), + "Cons5"/* consumer name */, true, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, MessageId.latest); ((CompletableFuture) addConsumerToSubscription.invoke(topic, sub2, consumer5)).get(); fail("should have failed"); @@ -957,14 +958,14 @@ private void testMaxConsumersFailover() throws Exception { // 1. add consumer1 Consumer consumer = new Consumer(sub, SubType.Failover, topic.getName(), 1 /* consumer id */, 0, - "Cons1"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), + "Cons1"/* consumer name */, true, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, MessageId.latest); addConsumerToSubscription.invoke(topic, sub, consumer); assertEquals(sub.getConsumers().size(), 1); // 2. add consumer2 Consumer consumer2 = new Consumer(sub, SubType.Failover, topic.getName(), 2 /* consumer id */, 0, - "Cons2"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), + "Cons2"/* consumer name */, true, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, MessageId.latest); addConsumerToSubscription.invoke(topic, sub, consumer2); assertEquals(sub.getConsumers().size(), 2); @@ -972,7 +973,7 @@ private void testMaxConsumersFailover() throws Exception { // 3. add consumer3 but reach maxConsumersPerSubscription try { Consumer consumer3 = new Consumer(sub, SubType.Failover, topic.getName(), 3 /* consumer id */, 0, - "Cons3"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), + "Cons3"/* consumer name */, true, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, MessageId.latest); ((CompletableFuture) addConsumerToSubscription.invoke(topic, sub, consumer3)).get(); fail("should have failed"); @@ -985,7 +986,7 @@ private void testMaxConsumersFailover() throws Exception { // 4. add consumer4 to sub2 Consumer consumer4 = new Consumer(sub2, SubType.Failover, topic.getName(), 4 /* consumer id */, 0, - "Cons4"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), + "Cons4"/* consumer name */, true, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, MessageId.latest); addConsumerToSubscription.invoke(topic, sub2, consumer4); assertEquals(sub2.getConsumers().size(), 1); @@ -996,7 +997,7 @@ private void testMaxConsumersFailover() throws Exception { // 5. add consumer5 to sub2 but reach maxConsumersPerTopic try { Consumer consumer5 = new Consumer(sub2, SubType.Failover, topic.getName(), 5 /* consumer id */, 0, - "Cons5"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), + "Cons5"/* consumer name */, true, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, MessageId.latest); ((CompletableFuture) addConsumerToSubscription.invoke(topic, sub2, consumer5)).get(); fail("should have failed"); @@ -1050,7 +1051,7 @@ private Consumer getMockedConsumerWithSpecificAddress(Topic topic, Subscription doReturn(address.getHostAddress()).when(cnx).clientSourceAddress(); doReturn(new PulsarCommandSenderImpl(null, cnx)).when(cnx).getCommandSender(); - return new Consumer(sub, SubType.Shared, topic.getName(), consumerId, 0, consumerNameBase + consumerId, 50000, + return new Consumer(sub, SubType.Shared, topic.getName(), consumerId, 0, consumerNameBase + consumerId, true, cnx, role, Collections.emptyMap(), false, InitialPosition.Latest, null, MessageId.latest); } @@ -1159,7 +1160,7 @@ public void testUbsubscribeRaceConditions() throws Exception { PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService); PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock, false); Consumer consumer1 = new Consumer(sub, SubType.Exclusive, topic.getName(), 1 /* consumer id */, 0, "Cons1"/* consumer name */, - 50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, MessageId.latest); + true, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, MessageId.latest); sub.addConsumer(consumer1); doAnswer(new Answer() { @@ -1182,7 +1183,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable { try { Thread.sleep(10); /* delay to ensure that the ubsubscribe gets executed first */ sub.addConsumer(new Consumer(sub, SubType.Exclusive, topic.getName(), 2 /* consumer id */, - 0, "Cons2"/* consumer name */, 50000, serverCnx, + 0, "Cons2"/* consumer name */, true, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, MessageId.latest)).get(); fail(); } catch (Exception e) { @@ -1961,21 +1962,21 @@ public void testBacklogCursor() throws Exception { ManagedCursor cursor1 = ledger.openCursor("c1"); PersistentSubscription sub1 = new PersistentSubscription(topic, "sub-1", cursor1, false); Consumer consumer1 = new Consumer(sub1, SubType.Exclusive, topic.getName(), 1 /* consumer id */, 0, "Cons1"/* consumer name */, - 50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, MessageId.latest); + true, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, MessageId.latest); topic.getSubscriptions().put(Codec.decode(cursor1.getName()), sub1); sub1.addConsumer(consumer1); // Open cursor2, add it into activeCursor-container and add it into subscription consumer list ManagedCursor cursor2 = ledger.openCursor("c2"); PersistentSubscription sub2 = new PersistentSubscription(topic, "sub-2", cursor2, false); Consumer consumer2 = new Consumer(sub2, SubType.Exclusive, topic.getName(), 2 /* consumer id */, 0, "Cons2"/* consumer name */, - 50000, serverCnx, "myrole-2", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, MessageId.latest); + true, serverCnx, "myrole-2", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, MessageId.latest); topic.getSubscriptions().put(Codec.decode(cursor2.getName()), sub2); sub2.addConsumer(consumer2); // Open cursor3, add it into activeCursor-container and do not add it into subscription consumer list ManagedCursor cursor3 = ledger.openCursor("c3"); PersistentSubscription sub3 = new PersistentSubscription(topic, "sub-3", cursor3, false); Consumer consumer3 = new Consumer(sub2, SubType.Exclusive, topic.getName(), 3 /* consumer id */, 0, "Cons2"/* consumer name */, - 50000, serverCnx, "myrole-3", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, MessageId.latest); + true, serverCnx, "myrole-3", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, MessageId.latest); topic.getSubscriptions().put(Codec.decode(cursor3.getName()), sub3); // Case1: cursors are active as haven't started deactivateBacklogCursor scan @@ -2085,7 +2086,7 @@ public void testCheckInactiveSubscriptions() throws Exception { addConsumerToSubscription.setAccessible(true); Consumer consumer = new Consumer(nonDeletableSubscription1, SubType.Shared, topic.getName(), 1, 0, "consumer1", - 50000, serverCnx, "app1", Collections.emptyMap(), false, InitialPosition.Latest, null, MessageId.latest); + true, serverCnx, "app1", Collections.emptyMap(), false, InitialPosition.Latest, null, MessageId.latest); addConsumerToSubscription.invoke(topic, nonDeletableSubscription1, consumer); NamespaceResources nsr = pulsar.getPulsarResources().getNamespaceResources(); @@ -2205,7 +2206,7 @@ public void testKeySharedMetadataExposedToStats() throws Exception { PersistentSubscription sub2 = new PersistentSubscription(topic, "key-shared-stats2", cursorMock, false); PersistentSubscription sub3 = new PersistentSubscription(topic, "key-shared-stats3", cursorMock, false); - Consumer consumer1 = new Consumer(sub1, SubType.Key_Shared, topic.getName(), 1, 0, "Cons1", 50000, serverCnx, + Consumer consumer1 = new Consumer(sub1, SubType.Key_Shared, topic.getName(), 1, 0, "Cons1", true, serverCnx, "myrole-1", Collections.emptyMap(), false, InitialPosition.Latest, new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT).setAllowOutOfOrderDelivery(false), MessageId.latest); @@ -2216,7 +2217,7 @@ public void testKeySharedMetadataExposedToStats() throws Exception { assertEquals(stats1.keySharedMode, "AUTO_SPLIT"); assertFalse(stats1.allowOutOfOrderDelivery); - Consumer consumer2 = new Consumer(sub2, SubType.Key_Shared, topic.getName(), 2, 0, "Cons2", 50000, serverCnx, + Consumer consumer2 = new Consumer(sub2, SubType.Key_Shared, topic.getName(), 2, 0, "Cons2", true, serverCnx, "myrole-1", Collections.emptyMap(), false, InitialPosition.Latest, new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT).setAllowOutOfOrderDelivery(true), MessageId.latest); @@ -2230,7 +2231,7 @@ public void testKeySharedMetadataExposedToStats() throws Exception { KeySharedMeta ksm = new KeySharedMeta().setKeySharedMode(KeySharedMode.STICKY) .setAllowOutOfOrderDelivery(false); ksm.addHashRange().setStart(0).setEnd(65535); - Consumer consumer3 = new Consumer(sub3, SubType.Key_Shared, topic.getName(), 3, 0, "Cons3", 50000, serverCnx, + Consumer consumer3 = new Consumer(sub3, SubType.Key_Shared, topic.getName(), 3, 0, "Cons3", true, serverCnx, "myrole-1", Collections.emptyMap(), false, InitialPosition.Latest, ksm, MessageId.latest); sub3.addConsumer(consumer3); consumer3.close(); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java index 74a72c125aee1..2b6bcc8acdecf 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java @@ -39,6 +39,7 @@ public class HierarchyTopicPolicies { final PolicyHierarchyValue inactiveTopicPolicies; final PolicyHierarchyValue> subscriptionTypesEnabled; final PolicyHierarchyValue maxSubscriptionsPerTopic; + final PolicyHierarchyValue maxUnackedMessagesOnConsumer; final PolicyHierarchyValue maxProducersPerTopic; final Map> backLogQuotaMap; final PolicyHierarchyValue topicMaxMessageSize; @@ -58,6 +59,7 @@ public HierarchyTopicPolicies() { inactiveTopicPolicies = new PolicyHierarchyValue<>(); subscriptionTypesEnabled = new PolicyHierarchyValue<>(); maxSubscriptionsPerTopic = new PolicyHierarchyValue<>(); + maxUnackedMessagesOnConsumer = new PolicyHierarchyValue<>(); maxProducersPerTopic = new PolicyHierarchyValue<>(); maxConsumerPerTopic = new PolicyHierarchyValue<>(); maxConsumersPerSubscription = new PolicyHierarchyValue<>();