From aba50f2a276412bc43a4652b9ce303d384f71966 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Tue, 18 Jul 2023 11:22:41 +0800 Subject: [PATCH] [fix] [cli] the variable producerName of BatchMsgContainer is null (#20819) Motivation: If the producer name is generated by the Broker, the producer will update the variable `producerName` after connecting, but not update the same variable of the batch message container. Modifications: fix bug --- .../apache/pulsar/broker/service/ServerCnx.java | 5 +++-- .../impl/AbstractBatchMessageContainer.java | 2 -- .../client/impl/BatchMessageContainerImpl.java | 16 ++++++++++++---- .../impl/BatchMessageKeyBasedContainer.java | 4 ++-- 4 files changed, 17 insertions(+), 10 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index bf55dda10be52..8dfeb54fd6a32 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -1744,11 +1744,12 @@ private void printSendCommandDebug(CommandSend send, ByteBuf headersAndPayload) headersAndPayload.resetReaderIndex(); if (log.isDebugEnabled()) { log.debug("[{}] Received send message request. producer: {}:{} {}:{} size: {}," - + " partition key is: {}, ordering key is {}", + + " partition key is: {}, ordering key is {}, uncompressedSize is {}", remoteAddress, send.getProducerId(), send.getSequenceId(), msgMetadata.getProducerName(), msgMetadata.getSequenceId(), headersAndPayload.readableBytes(), msgMetadata.hasPartitionKey() ? msgMetadata.getPartitionKey() : null, - msgMetadata.hasOrderingKey() ? msgMetadata.getOrderingKey() : null); + msgMetadata.hasOrderingKey() ? msgMetadata.getOrderingKey() : null, + msgMetadata.getUncompressedSize()); } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java index 1827142cdfa2b..e81365d3886cc 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java @@ -35,7 +35,6 @@ public abstract class AbstractBatchMessageContainer implements BatchMessageConta protected CompressionType compressionType; protected CompressionCodec compressor; protected String topicName; - protected String producerName; protected ProducerImpl producer; protected int maxNumMessagesInBatch; @@ -108,7 +107,6 @@ public ProducerImpl.OpSendMsg createOpSendMsg() throws IOException { public void setProducer(ProducerImpl producer) { this.producer = producer; this.topicName = producer.getTopic(); - this.producerName = producer.getProducerName(); this.compressionType = CompressionCodecProvider .convertToWireProtocol(producer.getConfiguration().getCompressionType()); this.compressor = CompressionCodecProvider.getCompressionCodec(compressionType); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java index fdbf1f15c296a..9be7210a38742 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java @@ -89,8 +89,8 @@ public BatchMessageContainerImpl(ProducerImpl producer) { public boolean add(MessageImpl msg, SendCallback callback) { if (log.isDebugEnabled()) { - log.debug("[{}] [{}] add message to batch, num messages in batch so far {}", topicName, producerName, - numMessagesInBatch); + log.debug("[{}] [{}] add message to batch, num messages in batch so far {}", topicName, + producer.getProducerName(), numMessagesInBatch); } if (++numMessagesInBatch == 1) { @@ -234,8 +234,8 @@ public void discard(Exception ex) { batchedMessageMetadataAndPayload = null; } } catch (Throwable t) { - log.warn("[{}] [{}] Got exception while completing the callback for msg {}:", topicName, producerName, - lowestSequenceId, t); + log.warn("[{}] [{}] Got exception while completing the callback for msg {}:", topicName, + producer.getProducerName(), lowestSequenceId, t); } clear(); } @@ -304,6 +304,14 @@ public OpSendMsg createOpSendMsg() throws IOException { ByteBufPair cmd = producer.sendMessage(producer.producerId, messageMetadata.getSequenceId(), messageMetadata.getHighestSequenceId(), numMessagesInBatch, messageMetadata, encryptedPayload); + if (log.isDebugEnabled()) { + log.debug("[{}] [{}] Build batch msg seq:{}, highest-seq:{}, numMessagesInBatch: {}, uncompressedSize: {}," + + " payloadSize: {}", topicName, producer.getProducerName(), + messageMetadata.getSequenceId(), messageMetadata.getNumMessagesInBatch(), + messageMetadata.getHighestSequenceId(), + messageMetadata.getUncompressedSize(), encryptedPayload.readableBytes()); + } + OpSendMsg op = OpSendMsg.create(messages, cmd, messageMetadata.getSequenceId(), messageMetadata.getHighestSequenceId(), firstCallback, batchAllocatedSizeBytes); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java index 45d683e72b0a3..e2ce9e2d0bd70 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java @@ -43,8 +43,8 @@ class BatchMessageKeyBasedContainer extends AbstractBatchMessageContainer { @Override public boolean add(MessageImpl msg, SendCallback callback) { if (log.isDebugEnabled()) { - log.debug("[{}] [{}] add message to batch, num messages in batch so far is {}", topicName, producerName, - numMessagesInBatch); + log.debug("[{}] [{}] add message to batch, num messages in batch so far is {}", topicName, + producer.getProducerName(), numMessagesInBatch); } String key = getKey(msg); final BatchMessageContainerImpl batchMessageContainer = batches.computeIfAbsent(key,