From b97a25b2965a8da74fa091c963d002893fd3bc35 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Wed, 22 Sep 2021 16:44:25 +0800 Subject: [PATCH 1/2] Return the message-id of the last chunk when sending chunked messages. --- .../apache/pulsar/client/impl/ProducerImpl.java | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index c3ea39874c045..13d7e298a5bea 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -52,6 +52,7 @@ import java.util.Queue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; @@ -150,6 +151,8 @@ public class ProducerImpl extends ProducerBase implements TimerTask, Conne private Optional topicEpoch = Optional.empty(); private final List previousExceptions = new CopyOnWriteArrayList(); + private ConcurrentHashMap chunkMessageIds; + @SuppressWarnings("rawtypes") private static final AtomicLongFieldUpdater msgIdGeneratorUpdater = AtomicLongFieldUpdater .newUpdater(ProducerImpl.class, "msgIdGenerator"); @@ -170,6 +173,7 @@ public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration } else { this.semaphore = Optional.empty(); } + this.chunkMessageIds = new ConcurrentHashMap<>(); this.compressor = CompressionCodecProvider.getCompressionCodec(conf.getCompressionType()); @@ -1000,6 +1004,18 @@ void ackReceived(ClientCnx cnx, long sequenceId, long highestSequenceId, long le OpSendMsg finalOp = op; LAST_SEQ_ID_PUBLISHED_UPDATER.getAndUpdate(this, last -> Math.max(last, getHighestSequenceId(finalOp))); op.setMessageId(ledgerId, entryId, partitionIndex); + if (op.totalChunks > 1) { + if (op.chunkId == 0) { + chunkMessageIds.put(op.msg.getMessageBuilder().getUuid(), + new MessageIdImpl(ledgerId, entryId, partitionIndex)); + } else if (op.chunkId == op.totalChunks - 1) { + MessageIdImpl firstChunkMsgId = chunkMessageIds.get(op.msg.getMessageBuilder().getUuid()); + if (firstChunkMsgId != null) { + op.setMessageId(firstChunkMsgId.ledgerId, firstChunkMsgId.entryId, firstChunkMsgId.partitionIndex); + } + } + } + // if message is chunked then call callback only on last chunk if (op.totalChunks <= 1 || (op.chunkId == op.totalChunks - 1)) { try { From a87fb8f8d117bfd253262d12694b67962cf42f41 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Fri, 24 Sep 2021 18:37:25 +0800 Subject: [PATCH 2/2] Use ConcurrentOpenHashMap. Signed-off-by: Zike Yang --- .../java/org/apache/pulsar/client/impl/ProducerImpl.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index 13d7e298a5bea..1e02c82a36fe3 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -52,7 +52,6 @@ import java.util.Queue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; @@ -90,6 +89,7 @@ import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.common.util.DateFormatter; import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -151,7 +151,7 @@ public class ProducerImpl extends ProducerBase implements TimerTask, Conne private Optional topicEpoch = Optional.empty(); private final List previousExceptions = new CopyOnWriteArrayList(); - private ConcurrentHashMap chunkMessageIds; + private ConcurrentOpenHashMap chunkMessageIds; @SuppressWarnings("rawtypes") private static final AtomicLongFieldUpdater msgIdGeneratorUpdater = AtomicLongFieldUpdater @@ -173,7 +173,7 @@ public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration } else { this.semaphore = Optional.empty(); } - this.chunkMessageIds = new ConcurrentHashMap<>(); + this.chunkMessageIds = new ConcurrentOpenHashMap<>(); this.compressor = CompressionCodecProvider.getCompressionCodec(conf.getCompressionType()); @@ -1013,6 +1013,7 @@ void ackReceived(ClientCnx cnx, long sequenceId, long highestSequenceId, long le if (firstChunkMsgId != null) { op.setMessageId(firstChunkMsgId.ledgerId, firstChunkMsgId.entryId, firstChunkMsgId.partitionIndex); } + chunkMessageIds.remove(op.msg.getMessageBuilder().getUuid()); } }