From 83c7cb35bfa59beec97e269afc7c22b43bc9be85 Mon Sep 17 00:00:00 2001 From: Penghui Li Date: Tue, 11 Feb 2020 23:32:25 +0800 Subject: [PATCH] Apply comments --- conf/broker.conf | 4 + .../pulsar/broker/ServiceConfiguration.java | 6 + .../pulsar/broker/service/AbstractTopic.java | 23 ++- .../pulsar/broker/service/BrokerService.java | 69 +++++---- .../pulsar/broker/service/ServerCnx.java | 40 +++++- .../apache/pulsar/broker/service/Topic.java | 2 - .../MessagePublishBufferThrottleTest.java | 135 +++++++++++++----- 7 files changed, 201 insertions(+), 78 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index 9c41153ec5fc8f..0d0da38fb9fa61 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -332,6 +332,10 @@ replicatedSubscriptionsSnapshotMaxCachedPerSubscription=10 # Use -1 to disable the memory limitation. Default is 1/5 of direct memory. maxMessagePublishBufferSizeInMB= +# Interval between checks to see if message publish buffer size is exceed the max message publish buffer size +# Use 0 or negative number to disable the max publish buffer limiting. +messagePublishBufferCheckIntervalInMills=100 + ### --- Authentication --- ### # Role names that are treated as "proxy roles". If the broker sees a request with #role as proxyRoles - it will demand to see a valid original principal. diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index cb0f112fee28c6..fbd5f5bf565b07 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -614,6 +614,12 @@ public class ServiceConfiguration implements PulsarConfiguration { private int maxMessagePublishBufferSizeInMB = Math.max(64, (int) (PlatformDependent.maxDirectMemory() / 5 / (1024 * 1024))); + @FieldContext( + category = CATEGORY_SERVER, + doc = "Interval between checks to see if message publish buffer size is exceed the max message publish buffer size" + ) + private int messagePublishBufferCheckIntervalInMills = 100; + /**** --- Messaging Protocols --- ****/ @FieldContext( diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index bcb7041f36a7f3..2cf6ad43b78f37 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -275,7 +275,7 @@ public void incrementPublishCount(int numOfMessages, long msgSizeInBytes) { public void resetTopicPublishCountAndEnableReadIfRequired() { // broker rate not exceeded. and completed topic limiter reset. if (!getBrokerPublishRateLimiter().isPublishRateExceeded() && topicPublishRateLimiter.resetPublishCount()) { - enableProducerRead(); + enableProducerReadForPublishRateLimiting(); } } @@ -283,17 +283,28 @@ public void resetTopicPublishCountAndEnableReadIfRequired() { public void resetBrokerPublishCountAndEnableReadIfRequired(boolean doneBrokerReset) { // topic rate not exceeded, and completed broker limiter reset. if (!topicPublishRateLimiter.isPublishRateExceeded() && doneBrokerReset) { - enableProducerRead(); + enableProducerReadForPublishRateLimiting(); } } /** * it sets cnx auto-readable if producer's cnx is disabled due to publish-throttling */ - @Override - public void enableProducerRead() { + public void enableProducerReadForPublishRateLimiting() { + if (producers != null) { + producers.values().forEach(producer -> { + producer.getCnx().cancelPublishRateLimiting(); + producer.getCnx().enableCnxAutoRead(); + }); + } + } + + public void enableProducerReadForPublishBufferLimiting() { if (producers != null) { - producers.values().forEach(producer -> producer.getCnx().enableCnxAutoRead()); + producers.values().forEach(producer -> { + producer.getCnx().cancelPublishBufferLimiting(); + producer.getCnx().enableCnxAutoRead(); + }); } } @@ -388,7 +399,7 @@ private void updatePublishDispatcher(Policies policies) { } else { log.info("Disabling publish throttling for {}", this.topic); this.topicPublishRateLimiter = PublishRateLimiter.DISABLED_RATE_LIMITER; - enableProducerRead(); + enableProducerReadForPublishRateLimiting(); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index ed5b5305d17522..d1f933baf3549e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -187,6 +187,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener 0 ? pulsar.getConfiguration().getMaxMessagePublishBufferSizeInMB() * 1024 * 1024 : -1; this.resumeProducerReadMessagePublishBufferSize = this.maxMessagePublishBufferSize / 2; - this.currentMessagePublishBufferSize = new AtomicLong(0); + this.currentMessagePublishBufferSize = 0; this.managedLedgerFactory = pulsar.getManagedLedgerFactory(); this.topics = new ConcurrentOpenHashMap<>(); this.replicationClients = new ConcurrentOpenHashMap<>(); @@ -272,6 +273,8 @@ public BrokerService(PulsarService pulsar) throws Exception { .newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-msg-expiry-monitor")); this.compactionMonitor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-compaction-monitor")); + this.messagePublishBufferMonitor = + Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-publish-buffer-monitor")); this.backlogQuotaManager = new BacklogQuotaManager(pulsar); this.backlogQuotaChecker = Executors @@ -401,6 +404,7 @@ public void start() throws Exception { this.startInactivityMonitor(); this.startMessageExpiryMonitor(); this.startCompactionMonitor(); + this.startMessagePublishBufferMonitor(); this.startBacklogQuotaChecker(); this.updateBrokerPublisherThrottlingMaxRate(); // register listener to capture zk-latency @@ -453,6 +457,14 @@ protected void startCompactionMonitor() { } } + protected void startMessagePublishBufferMonitor() { + int interval = pulsar().getConfiguration().getMessagePublishBufferCheckIntervalInMills(); + if (interval > 0 && maxMessagePublishBufferSize > 0) { + messagePublishBufferMonitor.scheduleAtFixedRate(safeRun(this::checkMessagePublishBuffer), + interval, interval, TimeUnit.MILLISECONDS); + } + } + protected void startBacklogQuotaChecker() { if (pulsar().getConfiguration().isBacklogQuotaCheckEnabled()) { final int interval = pulsar().getConfiguration().getBacklogQuotaCheckIntervalInSeconds(); @@ -2026,48 +2038,35 @@ public Optional getListenPortTls() { } } - private void enableTopicsAutoRead() { - topics.values().forEach(future -> { - if (future.isDone() && !future.isCompletedExceptionally()) { - try { - future.get().ifPresent(Topic::enableProducerRead); - } catch (InterruptedException | ExecutionException e) { - // no-op - } - } - }); - } - - @VisibleForTesting - boolean increasePublishBufferSizeAndCheckStopRead(int msgSize) { - if (maxMessagePublishBufferSize < 0) { - return false; - } - if (currentMessagePublishBufferSize.addAndGet(msgSize) >= maxMessagePublishBufferSize && - !isMessagePublishBufferThreshold) { + private void checkMessagePublishBuffer() { + currentMessagePublishBufferSize = 0; + foreachProducer(producer -> currentMessagePublishBufferSize += producer.getCnx().getMessagePublishBufferSize()); + if (currentMessagePublishBufferSize >= maxMessagePublishBufferSize + && !isMessagePublishBufferThreshold) { isMessagePublishBufferThreshold = true; messagePublishBufferThrottleTimes++; } - return isMessagePublishBufferThreshold; - } - - @VisibleForTesting - boolean decreasePublishBufferSizeAndCheckResumeRead(int msgSize) { - if (maxMessagePublishBufferSize < 0) { - return false; - } - if (currentMessagePublishBufferSize.addAndGet(-msgSize) < resumeProducerReadMessagePublishBufferSize && - isMessagePublishBufferThreshold) { + if (currentMessagePublishBufferSize < resumeProducerReadMessagePublishBufferSize + && isMessagePublishBufferThreshold) { isMessagePublishBufferThreshold = false; messagePublishBufferResumeTimes++; - enableTopicsAutoRead(); - return true; + forEachTopic(topic -> ((AbstractTopic) topic).enableProducerReadForPublishBufferLimiting()); } - return false; + } + + private void foreachProducer(Consumer consumer) { + topics.forEach((n, t) -> { + Optional topic = extractTopic(t); + topic.ifPresent(value -> value.getProducers().values().forEach(consumer)); + }); + } + + public boolean isMessagePublishBufferThreshold() { + return isMessagePublishBufferThreshold; } @VisibleForTesting - AtomicLong getCurrentMessagePublishBufferSize() { + long getCurrentMessagePublishBufferSize() { return currentMessagePublishBufferSize; } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 437a13e08bc151..59275657049727 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -25,6 +25,7 @@ import static org.apache.pulsar.common.protocol.Commands.newLookupErrorResponse; import static org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion.v5; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; import io.netty.buffer.ByteBuf; @@ -146,6 +147,9 @@ public class ServerCnx extends PulsarHandler { // Flag to manage throttling-rate by atomically enable/disable read-channel. private volatile boolean autoReadDisabledRateLimiting = false; + // Flag to manage throttling-publish-buffer by atomically enable/disable read-channel. + private volatile boolean autoReadDisabledPublishBufferLimiting = false; + private volatile long messagePublishBufferSize = 0; enum State { Start, Connected, Failed, Connecting @@ -1578,17 +1582,20 @@ public boolean isWritable() { } private void startSendOperation(Producer producer, int msgSize) { + messagePublishBufferSize += msgSize; boolean isPublishRateExceeded = producer.getTopic().isPublishRateExceeded(); - if (++pendingSendRequest == MaxPendingSendRequests | isPublishRateExceeded | service.increasePublishBufferSizeAndCheckStopRead(msgSize)) { + if (++pendingSendRequest == MaxPendingSendRequests | isPublishRateExceeded | getBrokerService().isMessagePublishBufferThreshold()) { // When the quota of pending send requests is reached, stop reading from socket to cause backpressure on // client connection, possibly shared between multiple producers ctx.channel().config().setAutoRead(false); autoReadDisabledRateLimiting = isPublishRateExceeded; + autoReadDisabledPublishBufferLimiting = true; } } void completedSendOperation(boolean isNonPersistentTopic, int msgSize) { - if (--pendingSendRequest == ResumeReadsThreshold | service.decreasePublishBufferSizeAndCheckResumeRead(msgSize)) { + messagePublishBufferSize -= msgSize; + if (--pendingSendRequest == ResumeReadsThreshold) { // Resume reading from socket ctx.channel().config().setAutoRead(true); // triggers channel read if autoRead couldn't trigger it @@ -1603,14 +1610,27 @@ void enableCnxAutoRead() { // we can add check (&& pendingSendRequest < MaxPendingSendRequests) here but then it requires // pendingSendRequest to be volatile and it can be expensive while writing. also this will be called on if // throttling is enable on the topic. so, avoid pendingSendRequest check will be fine. - if (!ctx.channel().config().isAutoRead() && autoReadDisabledRateLimiting) { + if (!ctx.channel().config().isAutoRead() && !autoReadDisabledRateLimiting && !autoReadDisabledPublishBufferLimiting) { // Resume reading from socket if pending-request is not reached to threshold ctx.channel().config().setAutoRead(true); // triggers channel read ctx.read(); + } + } + + @VisibleForTesting + void cancelPublishRateLimiting() { + if (autoReadDisabledRateLimiting) { autoReadDisabledRateLimiting = false; } } + + @VisibleForTesting + void cancelPublishBufferLimiting() { + if (autoReadDisabledPublishBufferLimiting) { + autoReadDisabledPublishBufferLimiting = false; + } + } private ServerError getErrorCode(CompletableFuture future) { ServerError error = ServerError.UnknownError; @@ -1695,4 +1715,18 @@ public boolean isBatchMessageCompatibleVersion() { public String getClientVersion() { return clientVersion; } + + public long getMessagePublishBufferSize() { + return this.messagePublishBufferSize; + } + + @VisibleForTesting + void setMessagePublishBufferSize(long bufferSize) { + this.messagePublishBufferSize = bufferSize; + } + + @VisibleForTesting + void setAutoReadDisabledRateLimiting(boolean isLimiting) { + this.autoReadDisabledRateLimiting = isLimiting; + } } \ No newline at end of file diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java index 3f78f1c94b18e4..26af1c1c5c8bf1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java @@ -201,6 +201,4 @@ void updateRates(NamespaceStats nsStats, NamespaceBundleStats currentBundleStats default Optional getDispatchRateLimiter() { return Optional.empty(); } - - void enableProducerRead(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessagePublishBufferThrottleTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessagePublishBufferThrottleTest.java index 3307063791753a..0c181de787a8bd 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessagePublishBufferThrottleTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessagePublishBufferThrottleTest.java @@ -27,6 +27,8 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /** */ @@ -45,53 +47,122 @@ protected void cleanup() throws Exception { @Test public void testMessagePublishBufferThrottleDisabled() throws Exception { conf.setMaxMessagePublishBufferSizeInMB(-1); + conf.setMessagePublishBufferCheckIntervalInMills(10); super.baseSetup(); - Assert.assertFalse(pulsar.getBrokerService().increasePublishBufferSizeAndCheckStopRead(1)); - Assert.assertFalse(pulsar.getBrokerService().decreasePublishBufferSizeAndCheckResumeRead(1)); + final String topic = "persistent://prop/ns-abc/testMessagePublishBufferThrottleDisabled"; + Producer producer = pulsarClient.newProducer() + .topic(topic) + .producerName("producer-name") + .create(); + Topic topicRef = pulsar.getBrokerService().getTopicReference(topic).get(); + Assert.assertNotNull(topicRef); + ((AbstractTopic)topicRef).producers.get("producer-name").getCnx().setMessagePublishBufferSize(Long.MAX_VALUE / 2); + Thread.sleep(20); + Assert.assertFalse(pulsar.getBrokerService().isMessagePublishBufferThreshold()); + List> futures = new ArrayList<>(); + // Make sure the producer can publish succeed. + for (int i = 0; i < 10; i++) { + futures.add(producer.sendAsync(new byte[1024 * 1024])); + } + FutureUtil.waitForAll(futures).get(); + for (CompletableFuture future : futures) { + Assert.assertNotNull(future.get()); + } + Thread.sleep(4); + Assert.assertEquals(pulsar.getBrokerService().getCurrentMessagePublishBufferSize(), 0L); + super.internalCleanup(); } @Test - public void testMessagePublishBufferThrottle() throws Exception { + public void testMessagePublishBufferThrottleEnable() throws Exception { conf.setMaxMessagePublishBufferSizeInMB(1); + conf.setMessagePublishBufferCheckIntervalInMills(2); super.baseSetup(); - Assert.assertFalse(pulsar.getBrokerService().increasePublishBufferSizeAndCheckStopRead(512 * 1024)); - Assert.assertTrue(pulsar.getBrokerService().increasePublishBufferSizeAndCheckStopRead(512 * 1024)); - Assert.assertTrue(pulsar.getBrokerService().increasePublishBufferSizeAndCheckStopRead( 1024)); - Assert.assertFalse(pulsar.getBrokerService().decreasePublishBufferSizeAndCheckResumeRead(1024)); - Assert.assertFalse(pulsar.getBrokerService().decreasePublishBufferSizeAndCheckResumeRead(512 * 1024)); - Assert.assertTrue(pulsar.getBrokerService().decreasePublishBufferSizeAndCheckResumeRead(1024)); - Assert.assertFalse(pulsar.getBrokerService().decreasePublishBufferSizeAndCheckResumeRead(1024)); - Assert.assertTrue(pulsar.getBrokerService().increasePublishBufferSizeAndCheckStopRead(514 * 1024)); + Thread.sleep(4); + Assert.assertFalse(pulsar.getBrokerService().isMessagePublishBufferThreshold()); + final String topic = "persistent://prop/ns-abc/testMessagePublishBufferThrottleEnable"; + Producer producer = pulsarClient.newProducer() + .topic(topic) + .producerName("producer-name") + .create(); + Topic topicRef = pulsar.getBrokerService().getTopicReference(topic).get(); + Assert.assertNotNull(topicRef); + ((AbstractTopic)topicRef).producers.get("producer-name").getCnx().setMessagePublishBufferSize(Long.MAX_VALUE / 2); + Thread.sleep(4); + Assert.assertTrue(pulsar.getBrokerService().isMessagePublishBufferThreshold()); + // The first message can publish success, but the second message should be blocked + producer.sendAsync(new byte[1024]).get(1, TimeUnit.SECONDS); + MessageId messageId = null; + try { + messageId = producer.sendAsync(new byte[1024]).get(1, TimeUnit.SECONDS); + Assert.fail("should failed, because producer blocked by publish buffer limiting"); + } catch (TimeoutException e) { + // No-op + } + Assert.assertNull(messageId); + + ((AbstractTopic)topicRef).producers.get("producer-name").getCnx().setMessagePublishBufferSize(0L); + Thread.sleep(4); + + List> futures = new ArrayList<>(); + // Make sure the producer can publish succeed. + for (int i = 0; i < 10; i++) { + futures.add(producer.sendAsync(new byte[1024 * 1024])); + } + FutureUtil.waitForAll(futures).get(); + for (CompletableFuture future : futures) { + Assert.assertNotNull(future.get()); + } + Thread.sleep(4); + Assert.assertEquals(pulsar.getBrokerService().getCurrentMessagePublishBufferSize(), 0L); + super.internalCleanup(); } @Test - public void testCurrentPublishBufferShouldBeZeroWhenComplete() throws Exception { + public void testBlockByPublishRateLimiting() throws Exception { conf.setMaxMessagePublishBufferSizeInMB(1); + conf.setMessagePublishBufferCheckIntervalInMills(2); super.baseSetup(); + Thread.sleep(4); + Assert.assertFalse(pulsar.getBrokerService().isMessagePublishBufferThreshold()); + final String topic = "persistent://prop/ns-abc/testMessagePublishBufferThrottleEnable"; + Producer producer = pulsarClient.newProducer() + .topic(topic) + .producerName("producer-name") + .create(); + Topic topicRef = pulsar.getBrokerService().getTopicReference(topic).get(); + Assert.assertNotNull(topicRef); + ((AbstractTopic)topicRef).producers.get("producer-name").getCnx().setMessagePublishBufferSize(Long.MAX_VALUE / 2); + producer.sendAsync(new byte[1024]).get(1, TimeUnit.SECONDS); - List> futures = new ArrayList<>(); - final int messages = 200; - final int producers = 10; - - List> producerList = new ArrayList<>(); - for (int i = 0; i < producers; i++) { - Producer producer = pulsarClient.newProducer() - .topic("persistent://prop/ns-abc/testCurrentPublishBufferShouldBeZeroWhenComplete") - .enableBatching(false) - .create(); - producerList.add(producer); + Thread.sleep(4); + ((AbstractTopic)topicRef).producers.get("producer-name").getCnx().setAutoReadDisabledRateLimiting(true); + ((AbstractTopic)topicRef).producers.get("producer-name").getCnx().setMessagePublishBufferSize(0); + Thread.sleep(4); + Assert.assertFalse(pulsar.getBrokerService().isMessagePublishBufferThreshold()); + MessageId messageId = null; + try { + messageId = producer.sendAsync(new byte[1024]).get(1, TimeUnit.SECONDS); + Assert.fail("should failed, because producer blocked by publish buffer limiting"); + } catch (TimeoutException e) { + // No-op } + Assert.assertNull(messageId); - for (Producer producer : producerList) { - for (int j = 0; j < messages; j++) { - futures.add(producer.sendAsync(new byte[1024])); - } - } + ((AbstractTopic)topicRef).producers.get("producer-name").getCnx().setAutoReadDisabledRateLimiting(false); + ((AbstractTopic)topicRef).producers.get("producer-name").getCnx().enableCnxAutoRead(); + List> futures = new ArrayList<>(); + // Make sure the producer can publish succeed. + for (int i = 0; i < 10; i++) { + futures.add(producer.sendAsync(new byte[1024 * 1024])); + } FutureUtil.waitForAll(futures).get(); - Assert.assertEquals(futures.size(), messages * producers); - Assert.assertEquals(pulsar.getBrokerService().getCurrentMessagePublishBufferSize().get(), 0); - Assert.assertTrue(pulsar.getBrokerService().messagePublishBufferThrottleTimes > 0 - && pulsar.getBrokerService().messagePublishBufferThrottleTimes == pulsar.getBrokerService().messagePublishBufferResumeTimes); + for (CompletableFuture future : futures) { + Assert.assertNotNull(future.get()); + } + Thread.sleep(4); + Assert.assertEquals(pulsar.getBrokerService().getCurrentMessagePublishBufferSize(), 0L); + super.internalCleanup(); } }