Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit dbdb748

Browse files
committedDec 16, 2022
Shared chunk message id across OpSendMsg
1 parent 2a20892 commit dbdb748

File tree

2 files changed

+10
-10
lines changed

2 files changed

+10
-10
lines changed
 

‎lib/OpSendMsg.h

+4-3
Original file line numberDiff line numberDiff line change
@@ -47,16 +47,17 @@ struct OpSendMsg {
4747

4848
OpSendMsg(const proto::MessageMetadata& metadata, const SharedBuffer& payload,
4949
const SendCallback& sendCallback, uint64_t producerId, uint64_t sequenceId, int sendTimeoutMs,
50-
uint32_t messagesCount, uint64_t messagesSize)
51-
: metadata_(metadata), // the copy happens here because OpSendMsg of chunks are constructed with the
50+
uint32_t messagesCount, uint64_t messagesSize, ChunkMessageIdImplPtr chunkedMessageId = nullptr)
51+
: metadata_(metadata), // the copy happens here because OpSendMsg of chunks are constructed with
5252
// a shared metadata object
5353
payload_(payload),
5454
sendCallback_(sendCallback),
5555
producerId_(producerId),
5656
sequenceId_(sequenceId),
5757
timeout_(TimeUtils::now() + milliseconds(sendTimeoutMs)),
5858
messagesCount_(messagesCount),
59-
messagesSize_(messagesSize) {}
59+
messagesSize_(messagesSize),
60+
chunkedMessageId_(chunkedMessageId) {}
6061

6162
void complete(Result result, const MessageId& messageId) const {
6263
if (sendCallback_) {

‎lib/ProducerImpl.cc

+6-7
Original file line numberDiff line numberDiff line change
@@ -562,6 +562,8 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, const SendCallba
562562
msgMetadata.set_total_chunk_msg_size(compressedSize);
563563
}
564564

565+
auto chunkMessageId = totalChunks > 1 ? std::make_shared<ChunkMessageIdImpl>() : nullptr;
566+
565567
int beginIndex = 0;
566568
for (int chunkId = 0; chunkId < totalChunks; chunkId++) {
567569
if (sendChunks) {
@@ -578,7 +580,7 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, const SendCallba
578580
}
579581
OpSendMsg op{msgMetadata, encryptedPayload, (chunkId == totalChunks - 1) ? callback : nullptr,
580582
producerId_, sequenceId, conf_.getSendTimeout(),
581-
1, uncompressedSize};
583+
1, uncompressedSize, chunkMessageId};
582584

583585
if (!chunkingEnabled_) {
584586
const uint32_t msgMetadataSize = op.metadata_.ByteSize();
@@ -873,14 +875,11 @@ bool ProducerImpl::ackReceived(uint64_t sequenceId, MessageId& rawMessageId) {
873875
// Message was persisted correctly
874876
LOG_DEBUG(getName() << "Received ack for msg " << sequenceId);
875877

876-
auto totalChunks = op.metadata_.num_chunks_from_msg();
877-
if (totalChunks > 1) {
878-
if (!op.chunkedMessageId_) {
879-
op.chunkedMessageId_ = std::make_shared<ChunkMessageIdImpl>();
880-
}
878+
if (op.chunkedMessageId_) {
879+
// Handling the chunk message id.
881880
if (op.metadata_.chunk_id() == 0) {
882881
op.chunkedMessageId_->setFirstChunkMessageId(messageId);
883-
} else if (op.metadata_.chunk_id() == totalChunks - 1) {
882+
} else if (op.metadata_.chunk_id() == op.metadata_.num_chunks_from_msg() - 1) {
884883
op.chunkedMessageId_->setLastChunkMessageId(messageId);
885884
messageId = ChunkMessageIdImpl::buildMessageId(op.chunkedMessageId_);
886885
}

0 commit comments

Comments
 (0)
Please sign in to comment.