diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index c3ea39874c045..1e02c82a36fe3 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -89,6 +89,7 @@ import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.common.util.DateFormatter; import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -150,6 +151,8 @@ public class ProducerImpl extends ProducerBase implements TimerTask, Conne private Optional topicEpoch = Optional.empty(); private final List previousExceptions = new CopyOnWriteArrayList(); + private ConcurrentOpenHashMap chunkMessageIds; + @SuppressWarnings("rawtypes") private static final AtomicLongFieldUpdater msgIdGeneratorUpdater = AtomicLongFieldUpdater .newUpdater(ProducerImpl.class, "msgIdGenerator"); @@ -170,6 +173,7 @@ public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration } else { this.semaphore = Optional.empty(); } + this.chunkMessageIds = new ConcurrentOpenHashMap<>(); this.compressor = CompressionCodecProvider.getCompressionCodec(conf.getCompressionType()); @@ -1000,6 +1004,19 @@ void ackReceived(ClientCnx cnx, long sequenceId, long highestSequenceId, long le OpSendMsg finalOp = op; LAST_SEQ_ID_PUBLISHED_UPDATER.getAndUpdate(this, last -> Math.max(last, getHighestSequenceId(finalOp))); op.setMessageId(ledgerId, entryId, partitionIndex); + if (op.totalChunks > 1) { + if (op.chunkId == 0) { + chunkMessageIds.put(op.msg.getMessageBuilder().getUuid(), + new MessageIdImpl(ledgerId, entryId, partitionIndex)); + } else if (op.chunkId == op.totalChunks - 1) { + MessageIdImpl firstChunkMsgId = chunkMessageIds.get(op.msg.getMessageBuilder().getUuid()); + if (firstChunkMsgId != null) { + op.setMessageId(firstChunkMsgId.ledgerId, firstChunkMsgId.entryId, firstChunkMsgId.partitionIndex); + } + chunkMessageIds.remove(op.msg.getMessageBuilder().getUuid()); + } + } + // if message is chunked then call callback only on last chunk if (op.totalChunks <= 1 || (op.chunkId == op.totalChunks - 1)) { try {