Skip to content

Commit

Permalink
Apply comments
Browse files Browse the repository at this point in the history
  • Loading branch information
codelipenghui committed Feb 11, 2020
1 parent 7ce1f44 commit 83c7cb3
Show file tree
Hide file tree
Showing 7 changed files with 201 additions and 78 deletions.
4 changes: 4 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,10 @@ replicatedSubscriptionsSnapshotMaxCachedPerSubscription=10
# Use -1 to disable the memory limitation. Default is 1/5 of direct memory.
maxMessagePublishBufferSizeInMB=

# Interval between checks to see if message publish buffer size is exceed the max message publish buffer size
# Use 0 or negative number to disable the max publish buffer limiting.
messagePublishBufferCheckIntervalInMills=100

### --- Authentication --- ###
# Role names that are treated as "proxy roles". If the broker sees a request with
#role as proxyRoles - it will demand to see a valid original principal.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,12 @@ public class ServiceConfiguration implements PulsarConfiguration {
private int maxMessagePublishBufferSizeInMB = Math.max(64,
(int) (PlatformDependent.maxDirectMemory() / 5 / (1024 * 1024)));

@FieldContext(
category = CATEGORY_SERVER,
doc = "Interval between checks to see if message publish buffer size is exceed the max message publish buffer size"
)
private int messagePublishBufferCheckIntervalInMills = 100;

/**** --- Messaging Protocols --- ****/

@FieldContext(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,25 +275,36 @@ public void incrementPublishCount(int numOfMessages, long msgSizeInBytes) {
public void resetTopicPublishCountAndEnableReadIfRequired() {
// broker rate not exceeded. and completed topic limiter reset.
if (!getBrokerPublishRateLimiter().isPublishRateExceeded() && topicPublishRateLimiter.resetPublishCount()) {
enableProducerRead();
enableProducerReadForPublishRateLimiting();
}
}

@Override
public void resetBrokerPublishCountAndEnableReadIfRequired(boolean doneBrokerReset) {
// topic rate not exceeded, and completed broker limiter reset.
if (!topicPublishRateLimiter.isPublishRateExceeded() && doneBrokerReset) {
enableProducerRead();
enableProducerReadForPublishRateLimiting();
}
}

/**
* it sets cnx auto-readable if producer's cnx is disabled due to publish-throttling
*/
@Override
public void enableProducerRead() {
public void enableProducerReadForPublishRateLimiting() {
if (producers != null) {
producers.values().forEach(producer -> {
producer.getCnx().cancelPublishRateLimiting();
producer.getCnx().enableCnxAutoRead();
});
}
}

public void enableProducerReadForPublishBufferLimiting() {
if (producers != null) {
producers.values().forEach(producer -> producer.getCnx().enableCnxAutoRead());
producers.values().forEach(producer -> {
producer.getCnx().cancelPublishBufferLimiting();
producer.getCnx().enableCnxAutoRead();
});
}
}

Expand Down Expand Up @@ -388,7 +399,7 @@ private void updatePublishDispatcher(Policies policies) {
} else {
log.info("Disabling publish throttling for {}", this.topic);
this.topicPublishRateLimiter = PublishRateLimiter.DISABLED_RATE_LIMITER;
enableProducerRead();
enableProducerReadForPublishRateLimiting();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
private final ScheduledExecutorService inactivityMonitor;
private final ScheduledExecutorService messageExpiryMonitor;
private final ScheduledExecutorService compactionMonitor;
private final ScheduledExecutorService messagePublishBufferMonitor;
private ScheduledExecutorService topicPublishRateLimiterMonitor;
private ScheduledExecutorService brokerPublishRateLimiterMonitor;
protected volatile PublishRateLimiter brokerPublishRateLimiter = PublishRateLimiter.DISABLED_RATE_LIMITER;
Expand Down Expand Up @@ -220,7 +221,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies

private final long maxMessagePublishBufferSize;
private final long resumeProducerReadMessagePublishBufferSize;
private final AtomicLong currentMessagePublishBufferSize;
private volatile long currentMessagePublishBufferSize;
private volatile boolean isMessagePublishBufferThreshold;
@VisibleForTesting
int messagePublishBufferThrottleTimes;
Expand All @@ -232,7 +233,7 @@ public BrokerService(PulsarService pulsar) throws Exception {
this.maxMessagePublishBufferSize = pulsar.getConfiguration().getMaxMessagePublishBufferSizeInMB() > 0 ?
pulsar.getConfiguration().getMaxMessagePublishBufferSizeInMB() * 1024 * 1024 : -1;
this.resumeProducerReadMessagePublishBufferSize = this.maxMessagePublishBufferSize / 2;
this.currentMessagePublishBufferSize = new AtomicLong(0);
this.currentMessagePublishBufferSize = 0;
this.managedLedgerFactory = pulsar.getManagedLedgerFactory();
this.topics = new ConcurrentOpenHashMap<>();
this.replicationClients = new ConcurrentOpenHashMap<>();
Expand Down Expand Up @@ -272,6 +273,8 @@ public BrokerService(PulsarService pulsar) throws Exception {
.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-msg-expiry-monitor"));
this.compactionMonitor =
Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-compaction-monitor"));
this.messagePublishBufferMonitor =
Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-publish-buffer-monitor"));

this.backlogQuotaManager = new BacklogQuotaManager(pulsar);
this.backlogQuotaChecker = Executors
Expand Down Expand Up @@ -401,6 +404,7 @@ public void start() throws Exception {
this.startInactivityMonitor();
this.startMessageExpiryMonitor();
this.startCompactionMonitor();
this.startMessagePublishBufferMonitor();
this.startBacklogQuotaChecker();
this.updateBrokerPublisherThrottlingMaxRate();
// register listener to capture zk-latency
Expand Down Expand Up @@ -453,6 +457,14 @@ protected void startCompactionMonitor() {
}
}

protected void startMessagePublishBufferMonitor() {
int interval = pulsar().getConfiguration().getMessagePublishBufferCheckIntervalInMills();
if (interval > 0 && maxMessagePublishBufferSize > 0) {
messagePublishBufferMonitor.scheduleAtFixedRate(safeRun(this::checkMessagePublishBuffer),
interval, interval, TimeUnit.MILLISECONDS);
}
}

protected void startBacklogQuotaChecker() {
if (pulsar().getConfiguration().isBacklogQuotaCheckEnabled()) {
final int interval = pulsar().getConfiguration().getBacklogQuotaCheckIntervalInSeconds();
Expand Down Expand Up @@ -2026,48 +2038,35 @@ public Optional<Integer> getListenPortTls() {
}
}

private void enableTopicsAutoRead() {
topics.values().forEach(future -> {
if (future.isDone() && !future.isCompletedExceptionally()) {
try {
future.get().ifPresent(Topic::enableProducerRead);
} catch (InterruptedException | ExecutionException e) {
// no-op
}
}
});
}

@VisibleForTesting
boolean increasePublishBufferSizeAndCheckStopRead(int msgSize) {
if (maxMessagePublishBufferSize < 0) {
return false;
}
if (currentMessagePublishBufferSize.addAndGet(msgSize) >= maxMessagePublishBufferSize &&
!isMessagePublishBufferThreshold) {
private void checkMessagePublishBuffer() {
currentMessagePublishBufferSize = 0;
foreachProducer(producer -> currentMessagePublishBufferSize += producer.getCnx().getMessagePublishBufferSize());
if (currentMessagePublishBufferSize >= maxMessagePublishBufferSize
&& !isMessagePublishBufferThreshold) {
isMessagePublishBufferThreshold = true;
messagePublishBufferThrottleTimes++;
}
return isMessagePublishBufferThreshold;
}

@VisibleForTesting
boolean decreasePublishBufferSizeAndCheckResumeRead(int msgSize) {
if (maxMessagePublishBufferSize < 0) {
return false;
}
if (currentMessagePublishBufferSize.addAndGet(-msgSize) < resumeProducerReadMessagePublishBufferSize &&
isMessagePublishBufferThreshold) {
if (currentMessagePublishBufferSize < resumeProducerReadMessagePublishBufferSize
&& isMessagePublishBufferThreshold) {
isMessagePublishBufferThreshold = false;
messagePublishBufferResumeTimes++;
enableTopicsAutoRead();
return true;
forEachTopic(topic -> ((AbstractTopic) topic).enableProducerReadForPublishBufferLimiting());
}
return false;
}

private void foreachProducer(Consumer<Producer> consumer) {
topics.forEach((n, t) -> {
Optional<Topic> topic = extractTopic(t);
topic.ifPresent(value -> value.getProducers().values().forEach(consumer));
});
}

public boolean isMessagePublishBufferThreshold() {
return isMessagePublishBufferThreshold;
}

@VisibleForTesting
AtomicLong getCurrentMessagePublishBufferSize() {
long getCurrentMessagePublishBufferSize() {
return currentMessagePublishBufferSize;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static org.apache.pulsar.common.protocol.Commands.newLookupErrorResponse;
import static org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion.v5;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;

import io.netty.buffer.ByteBuf;
Expand Down Expand Up @@ -146,6 +147,9 @@ public class ServerCnx extends PulsarHandler {

// Flag to manage throttling-rate by atomically enable/disable read-channel.
private volatile boolean autoReadDisabledRateLimiting = false;
// Flag to manage throttling-publish-buffer by atomically enable/disable read-channel.
private volatile boolean autoReadDisabledPublishBufferLimiting = false;
private volatile long messagePublishBufferSize = 0;

enum State {
Start, Connected, Failed, Connecting
Expand Down Expand Up @@ -1578,17 +1582,20 @@ public boolean isWritable() {
}

private void startSendOperation(Producer producer, int msgSize) {
messagePublishBufferSize += msgSize;
boolean isPublishRateExceeded = producer.getTopic().isPublishRateExceeded();
if (++pendingSendRequest == MaxPendingSendRequests | isPublishRateExceeded | service.increasePublishBufferSizeAndCheckStopRead(msgSize)) {
if (++pendingSendRequest == MaxPendingSendRequests | isPublishRateExceeded | getBrokerService().isMessagePublishBufferThreshold()) {
// When the quota of pending send requests is reached, stop reading from socket to cause backpressure on
// client connection, possibly shared between multiple producers
ctx.channel().config().setAutoRead(false);
autoReadDisabledRateLimiting = isPublishRateExceeded;
autoReadDisabledPublishBufferLimiting = true;
}
}

void completedSendOperation(boolean isNonPersistentTopic, int msgSize) {
if (--pendingSendRequest == ResumeReadsThreshold | service.decreasePublishBufferSizeAndCheckResumeRead(msgSize)) {
messagePublishBufferSize -= msgSize;
if (--pendingSendRequest == ResumeReadsThreshold) {
// Resume reading from socket
ctx.channel().config().setAutoRead(true);
// triggers channel read if autoRead couldn't trigger it
Expand All @@ -1603,14 +1610,27 @@ void enableCnxAutoRead() {
// we can add check (&& pendingSendRequest < MaxPendingSendRequests) here but then it requires
// pendingSendRequest to be volatile and it can be expensive while writing. also this will be called on if
// throttling is enable on the topic. so, avoid pendingSendRequest check will be fine.
if (!ctx.channel().config().isAutoRead() && autoReadDisabledRateLimiting) {
if (!ctx.channel().config().isAutoRead() && !autoReadDisabledRateLimiting && !autoReadDisabledPublishBufferLimiting) {
// Resume reading from socket if pending-request is not reached to threshold
ctx.channel().config().setAutoRead(true);
// triggers channel read
ctx.read();
}
}

@VisibleForTesting
void cancelPublishRateLimiting() {
if (autoReadDisabledRateLimiting) {
autoReadDisabledRateLimiting = false;
}
}

@VisibleForTesting
void cancelPublishBufferLimiting() {
if (autoReadDisabledPublishBufferLimiting) {
autoReadDisabledPublishBufferLimiting = false;
}
}

private <T> ServerError getErrorCode(CompletableFuture<T> future) {
ServerError error = ServerError.UnknownError;
Expand Down Expand Up @@ -1695,4 +1715,18 @@ public boolean isBatchMessageCompatibleVersion() {
public String getClientVersion() {
return clientVersion;
}

public long getMessagePublishBufferSize() {
return this.messagePublishBufferSize;
}

@VisibleForTesting
void setMessagePublishBufferSize(long bufferSize) {
this.messagePublishBufferSize = bufferSize;
}

@VisibleForTesting
void setAutoReadDisabledRateLimiting(boolean isLimiting) {
this.autoReadDisabledRateLimiting = isLimiting;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,4 @@ void updateRates(NamespaceStats nsStats, NamespaceBundleStats currentBundleStats
default Optional<DispatchRateLimiter> getDispatchRateLimiter() {
return Optional.empty();
}

void enableProducerRead();
}
Loading

0 comments on commit 83c7cb3

Please sign in to comment.