Skip to content

Commit

Permalink
Fix race condition of OpSendMsgQueue when publishing messages (#14231)
Browse files Browse the repository at this point in the history
* Add synchronized for getPendingQueueSize();

* Use iterator instead.

* Use counter to keep track of messages count

* Changed to int

Co-authored-by: Matteo Merli <mmerli@apache.org>
  • Loading branch information
codelipenghui and merlimat authored Feb 12, 2022
1 parent 7b169be commit 2c9af46
Showing 1 changed file with 16 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.function.Consumer;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Message;
Expand Down Expand Up @@ -1027,7 +1026,7 @@ void ackReceived(ClientCnx cnx, long sequenceId, long highestSequenceId, long le
if (sequenceId > op.sequenceId) {
log.warn("[{}] [{}] Got ack for msg. expecting: {} - {} - got: {} - {} - queue-size: {}",
topic, producerName, op.sequenceId, op.highestSequenceId, sequenceId, highestSequenceId,
pendingMessages.size());
pendingMessages.messagesCount());
// Force connection closing so that messages can be re-transmitted in a new connection
cnx.channel().close();
return;
Expand All @@ -1051,7 +1050,7 @@ void ackReceived(ClientCnx cnx, long sequenceId, long highestSequenceId, long le
log.warn("[{}] [{}] Got ack for batch msg error. expecting: {} - {} - got: {} - {} - queue-size: {}"
+ "",
topic, producerName, op.sequenceId, op.highestSequenceId, sequenceId, highestSequenceId,
pendingMessages.size());
pendingMessages.messagesCount());
// Force connection closing so that messages can be re-transmitted in a new connection
cnx.channel().close();
return;
Expand Down Expand Up @@ -1429,6 +1428,7 @@ protected static class OpSendMsgQueue implements Iterable<OpSendMsg> {
private final Queue<OpSendMsg> delegate = new ArrayDeque<>();
private int forEachDepth = 0;
private List<OpSendMsg> postponedOpSendMgs;
private final AtomicInteger messagesCount = new AtomicInteger(0);

@Override
public void forEach(Consumer<? super OpSendMsg> action) {
Expand All @@ -1449,6 +1449,7 @@ public void forEach(Consumer<? super OpSendMsg> action) {

public boolean add(OpSendMsg o) {
// postpone adding to the queue while forEach iteration is in progress
messagesCount.addAndGet(o.numMessagesInBatch);
if (forEachDepth > 0) {
if (postponedOpSendMgs == null) {
postponedOpSendMgs = new ArrayList<>();
Expand All @@ -1461,18 +1462,22 @@ public boolean add(OpSendMsg o) {

public void clear() {
delegate.clear();
messagesCount.set(0);
}

public void remove() {
delegate.remove();
OpSendMsg op = delegate.remove();
if (op != null) {
messagesCount.addAndGet(-op.numMessagesInBatch);
}
}

public OpSendMsg peek() {
return delegate.peek();
}

public int size() {
return delegate.size();
public int messagesCount() {
return messagesCount.get();
}

@Override
Expand Down Expand Up @@ -1641,7 +1646,7 @@ public void connectionOpened(final ClientCnx cnx) {

if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Pending messages: {}", topic, producerName,
pendingMessages.size());
pendingMessages.messagesCount());
}

PulsarClientException bqe = new PulsarClientException.ProducerBlockedQuotaExceededException(
Expand Down Expand Up @@ -1743,7 +1748,7 @@ private void resendMessages(ClientCnx cnx, long expectedEpoch) {
cnx.channel().close();
return;
}
int messagesToResend = pendingMessages.size();
int messagesToResend = pendingMessages.messagesCount();
if (messagesToResend == 0) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] No pending messages to resend {}", topic, producerName, messagesToResend);
Expand Down Expand Up @@ -1849,7 +1854,7 @@ public void run(Timeout timeout) throws Exception {
// The diff is less than or equal to zero, meaning that the message has been timed out.
// Set the callback to timeout on every message, then clear the pending queue.
log.info("[{}] [{}] Message send timed out. Failing {} messages", topic, producerName,
pendingMessages.size());
pendingMessages.messagesCount());

PulsarClientException te = new PulsarClientException.TimeoutException(
format("The producer %s can not send message to the topic %s within given timeout",
Expand Down Expand Up @@ -2019,7 +2024,7 @@ private void recoverProcessOpSendMsgFrom(ClientCnx cnx, MessageImpl from, long e
// called again once the new connection registers the producer with the broker.
log.info("[{}][{}] Producer epoch mismatch or the current connection is null. Skip re-sending the "
+ " {} pending messages since they will deliver using another connection.", topic, producerName,
pendingMessages.size());
pendingMessages.messagesCount());
return;
}
final boolean stripChecksum = cnx.getRemoteEndpointProtocolVersion() < brokerChecksumSupportedVersion();
Expand Down Expand Up @@ -2091,14 +2096,7 @@ public String getConnectedSince() {
}

public int getPendingQueueSize() {
if (!isBatchMessagingEnabled()) {
return pendingMessages.size();
}
MutableInt size = new MutableInt(0);
pendingMessages.forEach(op -> {
size.add(Math.max(op.numMessagesInBatch, 1));
});
return size.getValue();
return pendingMessages.messagesCount();
}

@Override
Expand Down

0 comments on commit 2c9af46

Please sign in to comment.