From 2c308a0f2d09f345ab627144ab10136488428f70 Mon Sep 17 00:00:00 2001 From: Jiwe Guo Date: Mon, 7 Nov 2022 20:55:42 +0800 Subject: [PATCH 1/8] System topic writer/reader connection not counted. --- .../pulsar/broker/service/AbstractTopic.java | 3 + .../service/persistent/PersistentTopic.java | 3 + .../systopic/PartitionedSystemTopicTest.java | 62 +++++++++++++++++++ 3 files changed, 68 insertions(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 85f23426290c9..81319a7d504ac 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -439,6 +439,9 @@ private PublishRate publishRateInBroker(ServiceConfiguration config) { } protected boolean isProducersExceeded() { + if (isSystemTopic()) { + return false; + } Integer maxProducers = topicPolicies.getMaxProducersPerTopic().get(); if (maxProducers > 0 && maxProducers <= producers.size()) { return true; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 418da33aea0b3..4709edf631a6d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -3314,6 +3314,9 @@ public MessageDeduplication getMessageDeduplication() { } private boolean checkMaxSubscriptionsPerTopicExceed(String subscriptionName) { + if (isSystemTopic()) { + return false; + } //Existing subscriptions are not affected if (StringUtils.isNotEmpty(subscriptionName) && getSubscription(subscriptionName) != null) { return false; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java index 9ab32d5ffa750..de1a9746fbc38 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java @@ -25,7 +25,10 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + import lombok.Cleanup; import org.apache.bookkeeper.mledger.LedgerOffloader; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; @@ -52,6 +55,8 @@ import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.naming.TopicVersion; import org.apache.pulsar.common.policies.data.BacklogQuota; +import org.apache.pulsar.common.policies.data.DispatchRate; +import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.util.FutureUtil; @@ -260,4 +265,61 @@ private void testSetBacklogCausedCreatingProducerFailure() throws Exception { Assert.fail("failed to create producer"); } } + + @Test + private void testSystemTopicNotCheckExceed() throws Exception { + final String ns = "prop/ns-test"; + final String topic = ns + "/topic-1"; + + admin.namespaces().createNamespace(ns, 2); + admin.topics().createPartitionedTopic(String.format("persistent://%s", topic), 1); + + admin.topicPolicies().setMaxConsumers(topic, 1); + NamespaceEventsSystemTopicFactory systemTopicFactory = new NamespaceEventsSystemTopicFactory(pulsarClient); + TopicPoliciesSystemTopicClient systemTopicClientForNamespace = systemTopicFactory + .createTopicPoliciesSystemTopicClient(NamespaceName.get(ns)); + SystemTopicClient.Reader reader1 = systemTopicClientForNamespace.newReader(); + SystemTopicClient.Reader reader2 = systemTopicClientForNamespace.newReader(); + + admin.topicPolicies().setMaxProducers(topic, 1); + AtomicInteger successCount = new AtomicInteger(0); + CountDownLatch latch = new CountDownLatch(3); + new Thread(() -> { + try { + admin.topicPolicies().setCompactionThreshold(topic, 1L); + successCount.incrementAndGet(); + } catch (PulsarAdminException ignore) { + } finally { + latch.countDown(); + } + }).start(); + new Thread(() -> { + try { + admin.topicPolicies().setSchemaCompatibilityStrategy(topic, + SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE); + successCount.incrementAndGet(); + } catch (PulsarAdminException ignore) { + } finally { + latch.countDown(); + } + }).start(); + new Thread(() -> { + try { + admin.topicPolicies().setDispatchRate(topic, DispatchRate.builder() + .ratePeriodInSecond(1).build()); + successCount.incrementAndGet(); + } catch (PulsarAdminException ignore) { + } finally { + latch.countDown(); + } + }).start(); + latch.await(); + Assert.assertEquals(3, successCount.get()); + Assert.assertTrue(reader1.hasMoreEvents()); + Assert.assertNotNull(reader1.readNext()); + Assert.assertTrue(reader2.hasMoreEvents()); + Assert.assertNotNull(reader2.readNext()); + reader1.close(); + reader2.close(); + } } From 421fe1dffdb0111bed46015b5397eb6571c89eda Mon Sep 17 00:00:00 2001 From: Jiwe Guo Date: Tue, 8 Nov 2022 19:46:05 +0800 Subject: [PATCH 2/8] update test. --- .../systopic/PartitionedSystemTopicTest.java | 47 ++----------------- 1 file changed, 4 insertions(+), 43 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java index de1a9746fbc38..92558b9a1402c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java @@ -25,9 +25,7 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import lombok.Cleanup; import org.apache.bookkeeper.mledger.LedgerOffloader; @@ -55,8 +53,6 @@ import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.naming.TopicVersion; import org.apache.pulsar.common.policies.data.BacklogQuota; -import org.apache.pulsar.common.policies.data.DispatchRate; -import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.util.FutureUtil; @@ -274,52 +270,17 @@ private void testSystemTopicNotCheckExceed() throws Exception { admin.namespaces().createNamespace(ns, 2); admin.topics().createPartitionedTopic(String.format("persistent://%s", topic), 1); - admin.topicPolicies().setMaxConsumers(topic, 1); + admin.topicPolicies().setMaxConsumers(topic, 0); NamespaceEventsSystemTopicFactory systemTopicFactory = new NamespaceEventsSystemTopicFactory(pulsarClient); TopicPoliciesSystemTopicClient systemTopicClientForNamespace = systemTopicFactory .createTopicPoliciesSystemTopicClient(NamespaceName.get(ns)); SystemTopicClient.Reader reader1 = systemTopicClientForNamespace.newReader(); - SystemTopicClient.Reader reader2 = systemTopicClientForNamespace.newReader(); - admin.topicPolicies().setMaxProducers(topic, 1); - AtomicInteger successCount = new AtomicInteger(0); - CountDownLatch latch = new CountDownLatch(3); - new Thread(() -> { - try { - admin.topicPolicies().setCompactionThreshold(topic, 1L); - successCount.incrementAndGet(); - } catch (PulsarAdminException ignore) { - } finally { - latch.countDown(); - } - }).start(); - new Thread(() -> { - try { - admin.topicPolicies().setSchemaCompatibilityStrategy(topic, - SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE); - successCount.incrementAndGet(); - } catch (PulsarAdminException ignore) { - } finally { - latch.countDown(); - } - }).start(); - new Thread(() -> { - try { - admin.topicPolicies().setDispatchRate(topic, DispatchRate.builder() - .ratePeriodInSecond(1).build()); - successCount.incrementAndGet(); - } catch (PulsarAdminException ignore) { - } finally { - latch.countDown(); - } - }).start(); - latch.await(); - Assert.assertEquals(3, successCount.get()); + admin.topicPolicies().setMaxProducers(topic, 0); + admin.topicPolicies().setCompactionThreshold(topic, 1L); + Assert.assertTrue(reader1.hasMoreEvents()); Assert.assertNotNull(reader1.readNext()); - Assert.assertTrue(reader2.hasMoreEvents()); - Assert.assertNotNull(reader2.readNext()); reader1.close(); - reader2.close(); } } From 26d3bd317e462cbe7f688e8d0e6b359c32899764 Mon Sep 17 00:00:00 2001 From: Jiwe Guo Date: Tue, 8 Nov 2022 21:15:47 +0800 Subject: [PATCH 3/8] apply comment. --- .../pulsar/broker/service/AbstractTopic.java | 12 +++++--- .../pulsar/broker/service/ReplicatorTest.java | 30 +++++++++++++++++++ .../systopic/PartitionedSystemTopicTest.java | 13 ++++++-- 3 files changed, 48 insertions(+), 7 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 81319a7d504ac..4b4c11a0130ca 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -438,17 +438,21 @@ private PublishRate publishRateInBroker(ServiceConfiguration config) { return new PublishRate(config.getMaxPublishRatePerTopicInMessages(), config.getMaxPublishRatePerTopicInBytes()); } - protected boolean isProducersExceeded() { - if (isSystemTopic()) { + protected boolean isProducersExceeded(Producer producer) { + if (isSystemTopic() || producer.isRemote()) { return false; } Integer maxProducers = topicPolicies.getMaxProducersPerTopic().get(); - if (maxProducers > 0 && maxProducers <= producers.size()) { + if (maxProducers != null && maxProducers > 0 && maxProducers <= getUserCreatedProducerSize()) { return true; } return false; } + private long getUserCreatedProducerSize() { + return producers.values().stream().filter(p -> !p.isRemote() || !p.getTopic().isSystemTopic()).count(); + } + protected void registerTopicPolicyListener() { if (brokerService.pulsar().getConfig().isSystemTopicEnabled() && brokerService.pulsar().getConfig().isTopicLevelPoliciesEnabled()) { @@ -963,7 +967,7 @@ protected void checkTopicFenced() throws BrokerServiceException { } protected void internalAddProducer(Producer producer) throws BrokerServiceException { - if (isProducersExceeded()) { + if (isProducersExceeded(producer)) { log.warn("[{}] Attempting to add producer to topic which reached max producers limit", topic); throw new BrokerServiceException.ProducerBusyException("Topic reached max producers limit"); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java index b8f8abc9a6265..e433f734aa9a5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java @@ -1505,4 +1505,34 @@ public void testWhenUpdateReplicationCluster() throws Exception { assertTrue(topic.getReplicators().isEmpty()); }); } + + @Test + public void testReplicatorProducerNotExceed() throws Exception { + log.info("--- testReplicatorProducerNotExceed ---"); + String namespace1 = "pulsar/ns11"; + admin1.namespaces().createNamespace(namespace1); + admin1.namespaces().setNamespaceReplicationClusters(namespace1, Sets.newHashSet("r1", "r2")); + final TopicName dest1 = TopicName.get( + BrokerTestUtil.newUniqueName("persistent://" + namespace1 + "/testReplicatorProducerNotExceed1")); + String namespace2 = "pulsar/ns22"; + admin2.namespaces().createNamespace(namespace2); + admin2.namespaces().setNamespaceReplicationClusters(namespace2, Sets.newHashSet("r1", "r2")); + final TopicName dest2 = TopicName.get( + BrokerTestUtil.newUniqueName("persistent://" + namespace1 + "/testReplicatorProducerNotExceed2")); + admin1.topics().createPartitionedTopic(dest1.toString(), 1); + admin1.topicPolicies().setMaxProducers(dest1.toString(), 1); + admin2.topics().createPartitionedTopic(dest2.toString(), 1); + admin2.topicPolicies().setMaxProducers(dest2.toString(), 1); + @Cleanup + MessageProducer producer1 = new MessageProducer(url1, dest1); + log.info("--- Starting producer1 --- " + url1); + + producer1.produce(1); + + @Cleanup + MessageProducer producer2 = new MessageProducer(url2, dest2); + log.info("--- Starting producer2 --- " + url2); + + producer2.produce(1); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java index 92558b9a1402c..88501d711d88c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java @@ -270,17 +270,24 @@ private void testSystemTopicNotCheckExceed() throws Exception { admin.namespaces().createNamespace(ns, 2); admin.topics().createPartitionedTopic(String.format("persistent://%s", topic), 1); - admin.topicPolicies().setMaxConsumers(topic, 0); + admin.topicPolicies().setMaxConsumers(topic, 1); NamespaceEventsSystemTopicFactory systemTopicFactory = new NamespaceEventsSystemTopicFactory(pulsarClient); TopicPoliciesSystemTopicClient systemTopicClientForNamespace = systemTopicFactory .createTopicPoliciesSystemTopicClient(NamespaceName.get(ns)); SystemTopicClient.Reader reader1 = systemTopicClientForNamespace.newReader(); + SystemTopicClient.Reader reader2 = systemTopicClientForNamespace.newReader(); - admin.topicPolicies().setMaxProducers(topic, 0); - admin.topicPolicies().setCompactionThreshold(topic, 1L); + admin.topicPolicies().setMaxProducers(topic, 1); + CompletableFuture f1 = admin.topicPolicies().setCompactionThresholdAsync(topic, 1L); + CompletableFuture f2 = admin.topicPolicies().setCompactionThresholdAsync(topic, 2L); + CompletableFuture f3 = admin.topicPolicies().setCompactionThresholdAsync(topic, 3L); + FutureUtil.waitForAll(List.of(f1, f2, f3)).join(); Assert.assertTrue(reader1.hasMoreEvents()); Assert.assertNotNull(reader1.readNext()); + Assert.assertTrue(reader2.hasMoreEvents()); + Assert.assertNotNull(reader2.readNext()); reader1.close(); + reader2.close(); } } From 997b050864b481584440d9df6334bdf49dd27073 Mon Sep 17 00:00:00 2001 From: Jiwe Guo Date: Wed, 16 Nov 2022 14:46:49 +0800 Subject: [PATCH 4/8] fix test. --- .../java/org/apache/pulsar/broker/service/AbstractTopic.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 4b4c11a0130ca..eaad01012ceb7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -450,7 +450,7 @@ protected boolean isProducersExceeded(Producer producer) { } private long getUserCreatedProducerSize() { - return producers.values().stream().filter(p -> !p.isRemote() || !p.getTopic().isSystemTopic()).count(); + return producers.values().stream().filter(p -> !(p.isRemote() || p.getTopic().isSystemTopic())).count(); } protected void registerTopicPolicyListener() { From ce0b2eb21c8b2377011114e3f239d12cabf269f5 Mon Sep 17 00:00:00 2001 From: Jiwe Guo Date: Tue, 22 Nov 2022 10:18:21 +0800 Subject: [PATCH 5/8] apply comment. --- .../apache/pulsar/broker/service/ReplicatorTest.java | 3 +++ .../broker/systopic/PartitionedSystemTopicTest.java | 10 +++++++--- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java index e433f734aa9a5..7f31ce39c9620 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java @@ -75,6 +75,7 @@ import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.RawMessage; import org.apache.pulsar.client.api.RawReader; import org.apache.pulsar.client.api.Schema; @@ -1534,5 +1535,7 @@ public void testReplicatorProducerNotExceed() throws Exception { log.info("--- Starting producer2 --- " + url2); producer2.produce(1); + + Assert.assertThrows(PulsarClientException.ProducerBusyException.class, () -> new MessageProducer(url2, dest2)); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java index 88501d711d88c..342c017c403b3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java @@ -48,6 +48,7 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.common.events.PulsarEvent; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.naming.TopicName; @@ -278,16 +279,19 @@ private void testSystemTopicNotCheckExceed() throws Exception { SystemTopicClient.Reader reader2 = systemTopicClientForNamespace.newReader(); admin.topicPolicies().setMaxProducers(topic, 1); + + CompletableFuture> writer1 = systemTopicClientForNamespace.newWriterAsync(); + CompletableFuture> writer2 = systemTopicClientForNamespace.newWriterAsync(); CompletableFuture f1 = admin.topicPolicies().setCompactionThresholdAsync(topic, 1L); - CompletableFuture f2 = admin.topicPolicies().setCompactionThresholdAsync(topic, 2L); - CompletableFuture f3 = admin.topicPolicies().setCompactionThresholdAsync(topic, 3L); - FutureUtil.waitForAll(List.of(f1, f2, f3)).join(); + FutureUtil.waitForAll(List.of(writer1, writer2, f1)).join(); Assert.assertTrue(reader1.hasMoreEvents()); Assert.assertNotNull(reader1.readNext()); Assert.assertTrue(reader2.hasMoreEvents()); Assert.assertNotNull(reader2.readNext()); reader1.close(); reader2.close(); + writer1.get().close(); + writer2.get().close(); } } From ef991c8c50a15494c3b8998399cf3625d3c3657c Mon Sep 17 00:00:00 2001 From: Jiwe Guo Date: Tue, 22 Nov 2022 10:20:04 +0800 Subject: [PATCH 6/8] apply comment. --- .../pulsar/broker/systopic/PartitionedSystemTopicTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java index 342c017c403b3..feb205fd39498 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java @@ -204,7 +204,7 @@ public void testHeartbeatTopicNotAllowedToSendEvent() throws Exception { } @Test - private void testSetBacklogCausedCreatingProducerFailure() throws Exception { + public void testSetBacklogCausedCreatingProducerFailure() throws Exception { final String ns = "prop/ns-test"; final String topic = ns + "/topic-1"; @@ -264,7 +264,7 @@ private void testSetBacklogCausedCreatingProducerFailure() throws Exception { } @Test - private void testSystemTopicNotCheckExceed() throws Exception { + public void testSystemTopicNotCheckExceed() throws Exception { final String ns = "prop/ns-test"; final String topic = ns + "/topic-1"; From a9de0043135a040e6e865cb703cb4fdf161e4bd9 Mon Sep 17 00:00:00 2001 From: Jiwe Guo Date: Wed, 23 Nov 2022 12:02:20 +0800 Subject: [PATCH 7/8] apply comment. --- .../pulsar/broker/service/AbstractTopic.java | 17 ++++++++++++----- .../systopic/PartitionedSystemTopicTest.java | 1 + 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index eaad01012ceb7..d8d524689a1c9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -443,14 +443,14 @@ protected boolean isProducersExceeded(Producer producer) { return false; } Integer maxProducers = topicPolicies.getMaxProducersPerTopic().get(); - if (maxProducers != null && maxProducers > 0 && maxProducers <= getUserCreatedProducerSize()) { + if (maxProducers != null && maxProducers > 0 && maxProducers <= getUserCreatedProducersSize()) { return true; } return false; } - private long getUserCreatedProducerSize() { - return producers.values().stream().filter(p -> !(p.isRemote() || p.getTopic().isSystemTopic())).count(); + private long getUserCreatedProducersSize() { + return producers.values().stream().filter(p -> !p.isRemote()).count(); } protected void registerTopicPolicyListener() { @@ -494,14 +494,21 @@ public int getNumberOfSameAddressProducers(final String clientAddress) { } protected boolean isConsumersExceededOnTopic() { - int maxConsumersPerTopic = topicPolicies.getMaxConsumerPerTopic().get(); - if (maxConsumersPerTopic > 0 && maxConsumersPerTopic <= getNumberOfConsumers()) { + if (isSystemTopic()) { + return false; + } + Integer maxConsumersPerTopic = topicPolicies.getMaxConsumerPerTopic().get(); + if (maxConsumersPerTopic != null && maxConsumersPerTopic > 0 + && maxConsumersPerTopic <= getNumberOfConsumers()) { return true; } return false; } protected boolean isSameAddressConsumersExceededOnTopic(Consumer consumer) { + if (isSystemTopic()) { + return false; + } final int maxSameAddressConsumers = brokerService.pulsar().getConfiguration() .getMaxSameAddressConsumersPerTopic(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java index feb205fd39498..7552f47392f2e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java @@ -271,6 +271,7 @@ public void testSystemTopicNotCheckExceed() throws Exception { admin.namespaces().createNamespace(ns, 2); admin.topics().createPartitionedTopic(String.format("persistent://%s", topic), 1); + admin.namespaces().setMaxConsumersPerTopic(ns, 1); admin.topicPolicies().setMaxConsumers(topic, 1); NamespaceEventsSystemTopicFactory systemTopicFactory = new NamespaceEventsSystemTopicFactory(pulsarClient); TopicPoliciesSystemTopicClient systemTopicClientForNamespace = systemTopicFactory From 53710c1e355c7393c8c54f1e2fc0f7bee4d47c29 Mon Sep 17 00:00:00 2001 From: Jiwe Guo Date: Wed, 23 Nov 2022 12:04:19 +0800 Subject: [PATCH 8/8] remove blank. --- .../pulsar/broker/systopic/PartitionedSystemTopicTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java index 7552f47392f2e..e79197bb1b686 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java @@ -26,7 +26,6 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; - import lombok.Cleanup; import org.apache.bookkeeper.mledger.LedgerOffloader; import org.apache.bookkeeper.mledger.ManagedLedgerConfig;