Skip to content

Commit fa3ac76

Browse files
authored
[feat] PIP 107: Introduce chunk message ID (#148)
Fixes #79 ### Motivation This is the C++ implementation of apache/pulsar#12402 ### Modifications * Add ChunkMessageIdImpl. * Return a ChunkMessageId when the producer sends a chunked message or when the consumer receives a chunked message. * In consumer.seek, use the first chunk's message ID taken from the chunk message ID. This solves the problem caused by seeking to chunked messages. It is also how this PIP affects the existing business logic.
1 parent c8b98c6 commit fa3ac76

12 files changed

+228
-34
lines changed

include/pulsar/Message.h

+1-2
Original file line numberDiff line numberDiff line change
@@ -188,8 +188,7 @@ class PULSAR_PUBLIC Message {
188188
MessageImplPtr impl_;
189189

190190
Message(MessageImplPtr& impl);
191-
Message(const proto::CommandMessage& msg, proto::MessageMetadata& data, SharedBuffer& payload,
192-
int32_t partition);
191+
Message(const MessageId& messageId, proto::MessageMetadata& metadata, SharedBuffer& payload);
193192
/// Used for Batch Messages
194193
Message(const MessageId& messageId, proto::MessageMetadata& metadata, SharedBuffer& payload,
195194
proto::SingleMessageMetadata& singleMetadata, const std::string& topicName);

include/pulsar/MessageId.h

+1
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ class PULSAR_PUBLIC MessageId {
108108
friend class PulsarFriend;
109109
friend class NegativeAcksTracker;
110110
friend class MessageIdBuilder;
111+
friend class ChunkMessageIdImpl;
111112

112113
friend PULSAR_PUBLIC std::ostream& operator<<(std::ostream& s, const MessageId& messageId);
113114

lib/ChunkMessageIdImpl.h

+48
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
#include <pulsar/MessageId.h>
23+
24+
#include "MessageIdImpl.h"
25+
26+
namespace pulsar {
27+
class ChunkMessageIdImpl;
28+
typedef std::shared_ptr<ChunkMessageIdImpl> ChunkMessageIdImplPtr;
29+
class ChunkMessageIdImpl : public MessageIdImpl, public std::enable_shared_from_this<ChunkMessageIdImpl> {
30+
public:
31+
ChunkMessageIdImpl() : firstChunkMsgId_(std::make_shared<MessageIdImpl>()) {}
32+
33+
void setFirstChunkMessageId(const MessageId& msgId) { *firstChunkMsgId_ = *msgId.impl_; }
34+
35+
void setLastChunkMessageId(const MessageId& msgId) {
36+
this->ledgerId_ = msgId.ledgerId();
37+
this->entryId_ = msgId.entryId();
38+
this->partition_ = msgId.partition();
39+
}
40+
41+
std::shared_ptr<const MessageIdImpl> getFirstChunkMessageId() const { return firstChunkMsgId_; }
42+
43+
MessageId build() { return MessageId{std::dynamic_pointer_cast<MessageIdImpl>(shared_from_this())}; }
44+
45+
private:
46+
std::shared_ptr<MessageIdImpl> firstChunkMsgId_;
47+
};
48+
} // namespace pulsar

lib/Commands.cc

+12-2
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828

2929
#include "BatchMessageAcker.h"
3030
#include "BatchedMessageIdImpl.h"
31+
#include "ChunkMessageIdImpl.h"
3132
#include "LogUtils.h"
3233
#include "MessageImpl.h"
3334
#include "PulsarApi.pb.h"
@@ -512,8 +513,17 @@ SharedBuffer Commands::newSeek(uint64_t consumerId, uint64_t requestId, const Me
512513
commandSeek->set_request_id(requestId);
513514

514515
MessageIdData& messageIdData = *commandSeek->mutable_message_id();
515-
messageIdData.set_ledgerid(messageId.ledgerId());
516-
messageIdData.set_entryid(messageId.entryId());
516+
517+
auto chunkMsgId = std::dynamic_pointer_cast<ChunkMessageIdImpl>(messageId.impl_);
518+
if (chunkMsgId) {
519+
auto firstId = chunkMsgId->getFirstChunkMessageId();
520+
messageIdData.set_ledgerid(firstId->ledgerId_);
521+
messageIdData.set_entryid(firstId->entryId_);
522+
} else {
523+
messageIdData.set_ledgerid(messageId.ledgerId());
524+
messageIdData.set_entryid(messageId.entryId());
525+
}
526+
517527
return writeMessageWithSize(cmd);
518528
}
519529

lib/ConsumerImpl.cc

+13-6
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
#include "AckGroupingTrackerEnabled.h"
2828
#include "BatchMessageAcker.h"
2929
#include "BatchedMessageIdImpl.h"
30+
#include "ChunkMessageIdImpl.h"
3031
#include "ClientConnection.h"
3132
#include "ClientImpl.h"
3233
#include "Commands.h"
@@ -375,9 +376,9 @@ void ConsumerImpl::triggerCheckExpiredChunkedTimer() {
375376

376377
boost::optional<SharedBuffer> ConsumerImpl::processMessageChunk(const SharedBuffer& payload,
377378
const proto::MessageMetadata& metadata,
378-
const MessageId& messageId,
379379
const proto::MessageIdData& messageIdData,
380-
const ClientConnectionPtr& cnx) {
380+
const ClientConnectionPtr& cnx,
381+
MessageId& messageId) {
381382
const auto chunkId = metadata.chunk_id();
382383
const auto uuid = metadata.uuid();
383384
LOG_DEBUG("Process message chunk (chunkId: " << chunkId << ", uuid: " << uuid
@@ -432,6 +433,11 @@ boost::optional<SharedBuffer> ConsumerImpl::processMessageChunk(const SharedBuff
432433
return boost::none;
433434
}
434435

436+
ChunkMessageIdImplPtr chunkMsgId = std::make_shared<ChunkMessageIdImpl>();
437+
chunkMsgId->setFirstChunkMessageId(chunkedMsgCtx.getChunkedMessageIds().front());
438+
chunkMsgId->setLastChunkMessageId(chunkedMsgCtx.getChunkedMessageIds().back());
439+
messageId = chunkMsgId->build();
440+
435441
LOG_DEBUG("Chunked message completed chunkId: " << chunkId << ", ChunkedMessageCtx: " << chunkedMsgCtx
436442
<< ", sequenceId: " << metadata.sequence_id());
437443

@@ -472,19 +478,20 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
472478
}
473479
}
474480

481+
const auto& messageIdData = msg.message_id();
482+
auto messageId = MessageIdBuilder::from(messageIdData).batchIndex(-1).build();
483+
475484
// Only a non-batched messages can be a chunk
476485
if (!metadata.has_num_messages_in_batch() && isChunkedMessage) {
477-
const auto& messageIdData = msg.message_id();
478-
auto messageId = MessageIdBuilder::from(messageIdData).build();
479-
auto optionalPayload = processMessageChunk(payload, metadata, messageId, messageIdData, cnx);
486+
auto optionalPayload = processMessageChunk(payload, metadata, messageIdData, cnx, messageId);
480487
if (optionalPayload) {
481488
payload = optionalPayload.value();
482489
} else {
483490
return;
484491
}
485492
}
486493

487-
Message m(msg, metadata, payload, partitionIndex_);
494+
Message m(messageId, metadata, payload);
488495
m.impl_->cnx_ = cnx.get();
489496
m.impl_->setTopicName(topic_);
490497
m.impl_->setRedeliveryCount(msg.redelivery_count());

lib/ConsumerImpl.h

+2-3
Original file line numberDiff line numberDiff line change
@@ -298,18 +298,17 @@ class ConsumerImpl : public ConsumerImplBase {
298298
*
299299
* @param payload the payload of a chunk
300300
* @param metadata the message metadata
301-
* @param messageId
302301
* @param messageIdData
303302
* @param cnx
303+
* @param messageId
304304
*
305305
* @return the concatenated payload if chunks are concatenated into a completed message payload
306306
* successfully, else Optional::empty()
307307
*/
308308
boost::optional<SharedBuffer> processMessageChunk(const SharedBuffer& payload,
309309
const proto::MessageMetadata& metadata,
310-
const MessageId& messageId,
311310
const proto::MessageIdData& messageIdData,
312-
const ClientConnectionPtr& cnx);
311+
const ClientConnectionPtr& cnx, MessageId& messageId);
313312

314313
friend class PulsarFriend;
315314

lib/Message.cc

+2-3
Original file line numberDiff line numberDiff line change
@@ -69,10 +69,9 @@ Message::Message() : impl_() {}
6969

7070
Message::Message(MessageImplPtr& impl) : impl_(impl) {}
7171

72-
Message::Message(const proto::CommandMessage& msg, proto::MessageMetadata& metadata, SharedBuffer& payload,
73-
int32_t partition)
72+
Message::Message(const MessageId& messageId, proto::MessageMetadata& metadata, SharedBuffer& payload)
7473
: impl_(std::make_shared<MessageImpl>()) {
75-
impl_->messageId = MessageIdBuilder::from(msg.message_id()).batchIndex(-1).build();
74+
impl_->messageId = messageId;
7675
impl_->metadata = metadata;
7776
impl_->payload = payload;
7877
}

lib/MessageId.cc

+28-1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#include <memory>
2626
#include <stdexcept>
2727

28+
#include "ChunkMessageIdImpl.h"
2829
#include "MessageIdImpl.h"
2930
#include "PulsarApi.pb.h"
3031

@@ -68,6 +69,17 @@ void MessageId::serialize(std::string& result) const {
6869
idData.set_batch_index(impl_->batchIndex_);
6970
}
7071

72+
auto chunkMsgId = std::dynamic_pointer_cast<ChunkMessageIdImpl>(impl_);
73+
if (chunkMsgId) {
74+
proto::MessageIdData& firstChunkIdData = *idData.mutable_first_chunk_message_id();
75+
auto firstChunkId = chunkMsgId->getFirstChunkMessageId();
76+
firstChunkIdData.set_ledgerid(firstChunkId->ledgerId_);
77+
firstChunkIdData.set_entryid(firstChunkId->entryId_);
78+
if (chunkMsgId->partition_ != -1) {
79+
firstChunkIdData.set_partition(firstChunkId->partition_);
80+
}
81+
}
82+
7183
idData.SerializeToString(&result);
7284
}
7385

@@ -80,7 +92,16 @@ MessageId MessageId::deserialize(const std::string& serializedMessageId) {
8092
throw std::invalid_argument("Failed to parse serialized message id");
8193
}
8294

83-
return MessageIdBuilder::from(idData).build();
95+
MessageId msgId = MessageIdBuilder::from(idData).build();
96+
97+
if (idData.has_first_chunk_message_id()) {
98+
ChunkMessageIdImplPtr chunkMsgId = std::make_shared<ChunkMessageIdImpl>();
99+
chunkMsgId->setFirstChunkMessageId(MessageIdBuilder::from(idData.first_chunk_message_id()).build());
100+
chunkMsgId->setLastChunkMessageId(msgId);
101+
return chunkMsgId->build();
102+
}
103+
104+
return msgId;
84105
}
85106

86107
int64_t MessageId::ledgerId() const { return impl_->ledgerId_; }
@@ -94,6 +115,12 @@ int32_t MessageId::partition() const { return impl_->partition_; }
94115
int32_t MessageId::batchSize() const { return impl_->batchSize_; }
95116

96117
PULSAR_PUBLIC std::ostream& operator<<(std::ostream& s, const pulsar::MessageId& messageId) {
118+
auto chunkMsgId = std::dynamic_pointer_cast<ChunkMessageIdImpl>(messageId.impl_);
119+
if (chunkMsgId) {
120+
auto firstId = chunkMsgId->getFirstChunkMessageId();
121+
s << '(' << firstId->ledgerId_ << ',' << firstId->entryId_ << ',' << firstId->partition_ << ','
122+
<< firstId->batchIndex_ << ");";
123+
}
97124
s << '(' << messageId.impl_->ledgerId_ << ',' << messageId.impl_->entryId_ << ','
98125
<< messageId.impl_->partition_ << ',' << messageId.impl_->batchIndex_ << ')';
99126
return s;

lib/OpSendMsg.h

+6-3
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
#include <boost/date_time/posix_time/ptime.hpp>
2626

27+
#include "ChunkMessageIdImpl.h"
2728
#include "PulsarApi.pb.h"
2829
#include "SharedBuffer.h"
2930
#include "TimeUtils.h"
@@ -40,21 +41,23 @@ struct OpSendMsg {
4041
uint32_t messagesCount_;
4142
uint64_t messagesSize_;
4243
std::vector<std::function<void(Result)>> trackerCallbacks_;
44+
ChunkMessageIdImplPtr chunkedMessageId_;
4345

4446
OpSendMsg() = default;
4547

4648
OpSendMsg(const proto::MessageMetadata& metadata, const SharedBuffer& payload,
4749
const SendCallback& sendCallback, uint64_t producerId, uint64_t sequenceId, int sendTimeoutMs,
48-
uint32_t messagesCount, uint64_t messagesSize)
49-
: metadata_(metadata), // the copy happens here because OpSendMsg of chunks are constructed with the
50+
uint32_t messagesCount, uint64_t messagesSize, ChunkMessageIdImplPtr chunkedMessageId = nullptr)
51+
: metadata_(metadata), // the copy happens here because OpSendMsg of chunks are constructed with
5052
// a shared metadata object
5153
payload_(payload),
5254
sendCallback_(sendCallback),
5355
producerId_(producerId),
5456
sequenceId_(sequenceId),
5557
timeout_(TimeUtils::now() + milliseconds(sendTimeoutMs)),
5658
messagesCount_(messagesCount),
57-
messagesSize_(messagesSize) {}
59+
messagesSize_(messagesSize),
60+
chunkedMessageId_(chunkedMessageId) {}
5861

5962
void complete(Result result, const MessageId& messageId) const {
6063
if (sendCallback_) {

lib/ProducerImpl.cc

+26-13
Original file line numberDiff line numberDiff line change
@@ -562,6 +562,8 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, const SendCallba
562562
msgMetadata.set_total_chunk_msg_size(compressedSize);
563563
}
564564

565+
auto chunkMessageId = totalChunks > 1 ? std::make_shared<ChunkMessageIdImpl>() : nullptr;
566+
565567
int beginIndex = 0;
566568
for (int chunkId = 0; chunkId < totalChunks; chunkId++) {
567569
if (sendChunks) {
@@ -578,7 +580,7 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, const SendCallba
578580
}
579581
OpSendMsg op{msgMetadata, encryptedPayload, (chunkId == totalChunks - 1) ? callback : nullptr,
580582
producerId_, sequenceId, conf_.getSendTimeout(),
581-
1, uncompressedSize};
583+
1, uncompressedSize, chunkMessageId};
582584

583585
if (!chunkingEnabled_) {
584586
const uint32_t msgMetadataSize = op.metadata_.ByteSize();
@@ -868,22 +870,33 @@ bool ProducerImpl::ackReceived(uint64_t sequenceId, MessageId& rawMessageId) {
868870
<< " -- MessageId - " << messageId << " last-seq: " << expectedSequenceId
869871
<< " producer: " << producerId_);
870872
return true;
871-
} else {
872-
// Message was persisted correctly
873-
LOG_DEBUG(getName() << "Received ack for msg " << sequenceId);
874-
releaseSemaphoreForSendOp(op);
875-
lastSequenceIdPublished_ = sequenceId + op.messagesCount_ - 1;
873+
}
876874

877-
pendingMessagesQueue_.pop_front();
875+
// Message was persisted correctly
876+
LOG_DEBUG(getName() << "Received ack for msg " << sequenceId);
878877

879-
lock.unlock();
880-
try {
881-
op.complete(ResultOk, messageId);
882-
} catch (const std::exception& e) {
883-
LOG_ERROR(getName() << "Exception thrown from callback " << e.what());
878+
if (op.chunkedMessageId_) {
879+
// Handling the chunk message id.
880+
if (op.metadata_.chunk_id() == 0) {
881+
op.chunkedMessageId_->setFirstChunkMessageId(messageId);
882+
} else if (op.metadata_.chunk_id() == op.metadata_.num_chunks_from_msg() - 1) {
883+
op.chunkedMessageId_->setLastChunkMessageId(messageId);
884+
messageId = op.chunkedMessageId_->build();
884885
}
885-
return true;
886886
}
887+
888+
releaseSemaphoreForSendOp(op);
889+
lastSequenceIdPublished_ = sequenceId + op.messagesCount_ - 1;
890+
891+
pendingMessagesQueue_.pop_front();
892+
893+
lock.unlock();
894+
try {
895+
op.complete(ResultOk, messageId);
896+
} catch (const std::exception& e) {
897+
LOG_ERROR(getName() << "Exception thrown from callback " << e.what());
898+
}
899+
return true;
887900
}
888901

889902
bool ProducerImpl::encryptMessage(proto::MessageMetadata& metadata, SharedBuffer& payload,

0 commit comments

Comments
 (0)