From 51cac6b6668134231d86d736d25ef461b1865d93 Mon Sep 17 00:00:00 2001 From: Jiwei Guo Date: Wed, 23 Nov 2022 14:23:57 +0800 Subject: [PATCH] [improve][broker] System topic writer/reader connection not counted. (#18369) --- .../pulsar/broker/service/AbstractTopic.java | 24 ++++++++++--- .../service/persistent/PersistentTopic.java | 3 ++ .../pulsar/broker/service/ReplicatorTest.java | 33 +++++++++++++++++ .../systopic/PartitionedSystemTopicTest.java | 36 ++++++++++++++++++- 4 files changed, 90 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index ebd1ed2c79ded..d0451d5a5feb8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -420,14 +420,21 @@ private PublishRate publishRateInBroker(ServiceConfiguration config) { return new PublishRate(config.getMaxPublishRatePerTopicInMessages(), config.getMaxPublishRatePerTopicInBytes()); } - protected boolean isProducersExceeded() { + protected boolean isProducersExceeded(Producer producer) { + if (isSystemTopic() || producer.isRemote()) { + return false; + } Integer maxProducers = topicPolicies.getMaxProducersPerTopic().get(); - if (maxProducers > 0 && maxProducers <= producers.size()) { + if (maxProducers != null && maxProducers > 0 && maxProducers <= getUserCreatedProducersSize()) { return true; } return false; } + private long getUserCreatedProducersSize() { + return producers.values().stream().filter(p -> !p.isRemote()).count(); + } + protected void registerTopicPolicyListener() { if (brokerService.pulsar().getConfig().isSystemTopicEnabled() && brokerService.pulsar().getConfig().isTopicLevelPoliciesEnabled()) { @@ -469,14 +476,21 @@ public int getNumberOfSameAddressProducers(final String clientAddress) { } protected boolean isConsumersExceededOnTopic() { - int maxConsumersPerTopic = topicPolicies.getMaxConsumerPerTopic().get(); - if (maxConsumersPerTopic > 0 && maxConsumersPerTopic <= getNumberOfConsumers()) { + if (isSystemTopic()) { + return false; + } + Integer maxConsumersPerTopic = topicPolicies.getMaxConsumerPerTopic().get(); + if (maxConsumersPerTopic != null && maxConsumersPerTopic > 0 + && maxConsumersPerTopic <= getNumberOfConsumers()) { return true; } return false; } protected boolean isSameAddressConsumersExceededOnTopic(Consumer consumer) { + if (isSystemTopic()) { + return false; + } final int maxSameAddressConsumers = brokerService.pulsar().getConfiguration() .getMaxSameAddressConsumersPerTopic(); @@ -891,7 +905,7 @@ protected void checkTopicFenced() throws BrokerServiceException { } protected void internalAddProducer(Producer producer) throws BrokerServiceException { - if (isProducersExceeded()) { + if (isProducersExceeded(producer)) { log.warn("[{}] Attempting to add producer to topic which reached max producers limit", topic); throw new BrokerServiceException.ProducerBusyException("Topic reached max producers limit"); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index cd2324e2fe542..5f12bbf588637 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -3125,6 +3125,9 @@ public MessageDeduplication getMessageDeduplication() { } private boolean checkMaxSubscriptionsPerTopicExceed(String subscriptionName) { + if (isSystemTopic()) { + return false; + } //Existing subscriptions are not affected if (StringUtils.isNotEmpty(subscriptionName) && getSubscription(subscriptionName) != null) { return false; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java index e736aa86d7b3b..aeb35f4d39acd 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java @@ -76,6 +76,7 @@ import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.RawMessage; import org.apache.pulsar.client.api.RawReader; import org.apache.pulsar.client.api.Schema; @@ -1536,4 +1537,36 @@ public void testWhenUpdateReplicationCluster() throws Exception { assertTrue(topic.getReplicators().isEmpty()); }); } + + @Test + public void testReplicatorProducerNotExceed() throws Exception { + log.info("--- testReplicatorProducerNotExceed ---"); + String namespace1 = "pulsar/ns11"; + admin1.namespaces().createNamespace(namespace1); + admin1.namespaces().setNamespaceReplicationClusters(namespace1, Sets.newHashSet("r1", "r2")); + final TopicName dest1 = TopicName.get( + BrokerTestUtil.newUniqueName("persistent://" + namespace1 + "/testReplicatorProducerNotExceed1")); + String namespace2 = "pulsar/ns22"; + admin2.namespaces().createNamespace(namespace2); + admin2.namespaces().setNamespaceReplicationClusters(namespace2, Sets.newHashSet("r1", "r2")); + final TopicName dest2 = TopicName.get( + BrokerTestUtil.newUniqueName("persistent://" + namespace1 + "/testReplicatorProducerNotExceed2")); + admin1.topics().createPartitionedTopic(dest1.toString(), 1); + admin1.topicPolicies().setMaxProducers(dest1.toString(), 1); + admin2.topics().createPartitionedTopic(dest2.toString(), 1); + admin2.topicPolicies().setMaxProducers(dest2.toString(), 1); + @Cleanup + MessageProducer producer1 = new MessageProducer(url1, dest1); + log.info("--- Starting producer1 --- " + url1); + + producer1.produce(1); + + @Cleanup + MessageProducer producer2 = new MessageProducer(url2, dest2); + log.info("--- Starting producer2 --- " + url2); + + producer2.produce(1); + + Assert.assertThrows(PulsarClientException.ProducerBusyException.class, () -> new MessageProducer(url2, dest2)); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java index cf45d614f0fb4..89834d300f665 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java @@ -37,6 +37,7 @@ import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.events.EventsTopicNames; +import org.apache.pulsar.common.events.PulsarEvent; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.TenantInfo; @@ -179,7 +180,7 @@ public void testHealthCheckTopicNotOffload() throws Exception { } @Test - private void testSetBacklogCausedCreatingProducerFailure() throws Exception { + public void testSetBacklogCausedCreatingProducerFailure() throws Exception { final String ns = "prop/ns-test"; final String topic = ns + "/topic-1"; @@ -234,4 +235,37 @@ private void testSetBacklogCausedCreatingProducerFailure() throws Exception { Assert.fail("failed to create producer"); } } + + @Test + public void testSystemTopicNotCheckExceed() throws Exception { + final String ns = "prop/ns-test"; + final String topic = ns + "/topic-1"; + + admin.namespaces().createNamespace(ns, 2); + admin.topics().createPartitionedTopic(String.format("persistent://%s", topic), 1); + + admin.namespaces().setMaxConsumersPerTopic(ns, 1); + admin.topicPolicies().setMaxConsumers(topic, 1); + NamespaceEventsSystemTopicFactory systemTopicFactory = new NamespaceEventsSystemTopicFactory(pulsarClient); + TopicPoliciesSystemTopicClient systemTopicClientForNamespace = systemTopicFactory + .createTopicPoliciesSystemTopicClient(NamespaceName.get(ns)); + SystemTopicClient.Reader reader1 = systemTopicClientForNamespace.newReader(); + SystemTopicClient.Reader reader2 = systemTopicClientForNamespace.newReader(); + + admin.topicPolicies().setMaxProducers(topic, 1); + + CompletableFuture> writer1 = systemTopicClientForNamespace.newWriterAsync(); + CompletableFuture> writer2 = systemTopicClientForNamespace.newWriterAsync(); + CompletableFuture f1 = admin.topicPolicies().setCompactionThresholdAsync(topic, 1L); + + FutureUtil.waitForAll(List.of(writer1, writer2, f1)).join(); + Assert.assertTrue(reader1.hasMoreEvents()); + Assert.assertNotNull(reader1.readNext()); + Assert.assertTrue(reader2.hasMoreEvents()); + Assert.assertNotNull(reader2.readNext()); + reader1.close(); + reader2.close(); + writer1.get().close(); + writer2.get().close(); + } }