Skip to content

Commit

Permalink
[fix][broker] Increment topic stats outbound message counters after m…
Browse files Browse the repository at this point in the history
…essages have been written to the TCP/IP connection (apache#17043)

(cherry picked from commit 2bc933e)
  • Loading branch information
lhotari committed Sep 5, 2022
1 parent 894fde1 commit bb0b94b
Showing 1 changed file with 13 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -305,14 +305,19 @@ public Future<Void> sendMessages(final List<Entry> entries, EntryBatchSizes batc
topicName, subscription, ackedCount, totalMessages, consumerId, avgMessagesPerEntry.get());
}
incrementUnackedMessages(unackedMessages);
msgOut.recordMultipleEvents(totalMessages, totalBytes);
msgOutCounter.add(totalMessages);
bytesOutCounter.add(totalBytes);
chunkedMessageRate.recordMultipleEvents(totalChunkedMessages, 0);


return cnx.getCommandSender().sendMessagesToConsumer(consumerId, topicName, subscription, partitionIdx,
entries, batchSizes, batchIndexesAcks, redeliveryTracker, epoch);
Future<Void> writeAndFlushPromise =
cnx.getCommandSender().sendMessagesToConsumer(consumerId, topicName, subscription, partitionIdx,
entries, batchSizes, batchIndexesAcks, redeliveryTracker, epoch);
writeAndFlushPromise.addListener(status -> {
// only increment counters after the messages have been successfully written to the TCP/IP connection
if (status.isSuccess()) {
msgOut.recordMultipleEvents(totalMessages, totalBytes);
msgOutCounter.add(totalMessages);
bytesOutCounter.add(totalBytes);
chunkedMessageRate.recordMultipleEvents(totalChunkedMessages, 0);
}
});
return writeAndFlushPromise;
}

private void incrementUnackedMessages(int ackedMessages) {
Expand Down

0 comments on commit bb0b94b

Please sign in to comment.