Skip to content

Commit 67205f7

Browse files
committed
Support seek for chunk messages
1 parent dbdb748 commit 67205f7

File tree

5 files changed

+79
-20
lines changed

5 files changed

+79
-20
lines changed

lib/Commands.cc

+12-2
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828

2929
#include "BatchMessageAcker.h"
3030
#include "BatchedMessageIdImpl.h"
31+
#include "ChunkMessageIdImpl.h"
3132
#include "LogUtils.h"
3233
#include "MessageImpl.h"
3334
#include "PulsarApi.pb.h"
@@ -512,8 +513,17 @@ SharedBuffer Commands::newSeek(uint64_t consumerId, uint64_t requestId, const Me
512513
commandSeek->set_request_id(requestId);
513514

514515
MessageIdData& messageIdData = *commandSeek->mutable_message_id();
515-
messageIdData.set_ledgerid(messageId.ledgerId());
516-
messageIdData.set_entryid(messageId.entryId());
516+
517+
auto chunkMsgId = std::dynamic_pointer_cast<ChunkMessageIdImpl>(messageId.impl_);
518+
if (chunkMsgId) {
519+
auto firstId = chunkMsgId->getFirstChunkMessageId();
520+
messageIdData.set_ledgerid(firstId->ledgerId_);
521+
messageIdData.set_entryid(firstId->entryId_);
522+
} else {
523+
messageIdData.set_ledgerid(messageId.ledgerId());
524+
messageIdData.set_entryid(messageId.entryId());
525+
}
526+
517527
return writeMessageWithSize(cmd);
518528
}
519529

lib/ConsumerImpl.h

+1-2
Original file line numberDiff line numberDiff line change
@@ -308,8 +308,7 @@ class ConsumerImpl : public ConsumerImplBase {
308308
boost::optional<SharedBuffer> processMessageChunk(const SharedBuffer& payload,
309309
const proto::MessageMetadata& metadata,
310310
const proto::MessageIdData& messageIdData,
311-
const ClientConnectionPtr& cnx,
312-
MessageId& messageId);
311+
const ClientConnectionPtr& cnx, MessageId& messageId);
313312

314313
friend class PulsarFriend;
315314

lib/MessageId.cc

+10-11
Original file line numberDiff line numberDiff line change
@@ -70,15 +70,14 @@ void MessageId::serialize(std::string& result) const {
7070
}
7171

7272
auto chunkMsgId = std::dynamic_pointer_cast<ChunkMessageIdImpl>(impl_);
73-
if(chunkMsgId) {
74-
auto* firstChunkIdData = new proto::MessageIdData();
73+
if (chunkMsgId) {
74+
proto::MessageIdData& firstChunkIdData = *idData.mutable_first_chunk_message_id();
7575
auto firstChunkId = chunkMsgId->getFirstChunkMessageId();
76-
firstChunkIdData->set_ledgerid(firstChunkId->ledgerId_);
77-
firstChunkIdData->set_entryid(firstChunkId->entryId_);
78-
if(chunkMsgId->partition_!=-1){
79-
firstChunkIdData->set_partition(firstChunkId->partition_);
76+
firstChunkIdData.set_ledgerid(firstChunkId->ledgerId_);
77+
firstChunkIdData.set_entryid(firstChunkId->entryId_);
78+
if (chunkMsgId->partition_ != -1) {
79+
firstChunkIdData.set_partition(firstChunkId->partition_);
8080
}
81-
idData.set_allocated_first_chunk_message_id(firstChunkIdData);
8281
}
8382

8483
idData.SerializeToString(&result);
@@ -95,7 +94,7 @@ MessageId MessageId::deserialize(const std::string& serializedMessageId) {
9594

9695
MessageId msgId = MessageIdBuilder::from(idData).build();
9796

98-
if(idData.has_first_chunk_message_id()) {
97+
if (idData.has_first_chunk_message_id()) {
9998
ChunkMessageIdImplPtr chunkMsgId = std::make_shared<ChunkMessageIdImpl>();
10099
chunkMsgId->setFirstChunkMessageId(MessageIdBuilder::from(idData.first_chunk_message_id()).build());
101100
chunkMsgId->setLastChunkMessageId(msgId);
@@ -117,10 +116,10 @@ int32_t MessageId::batchSize() const { return impl_->batchSize_; }
117116

118117
PULSAR_PUBLIC std::ostream& operator<<(std::ostream& s, const pulsar::MessageId& messageId) {
119118
auto chunkMsgId = std::dynamic_pointer_cast<ChunkMessageIdImpl>(messageId.impl_);
120-
if(chunkMsgId) {
119+
if (chunkMsgId) {
121120
auto firstId = chunkMsgId->getFirstChunkMessageId();
122-
s << '(' << firstId->ledgerId_ << ',' << firstId->entryId_ << ','
123-
<< firstId->partition_ << ',' << firstId->batchIndex_ << ");";
121+
s << '(' << firstId->ledgerId_ << ',' << firstId->entryId_ << ',' << firstId->partition_ << ','
122+
<< firstId->batchIndex_ << ");";
124123
}
125124
s << '(' << messageId.impl_->ledgerId_ << ',' << messageId.impl_->entryId_ << ','
126125
<< messageId.impl_->partition_ << ',' << messageId.impl_->batchIndex_ << ')';

tests/MessageChunkingTest.cc

+55-2
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include <ctime>
2424
#include <random>
2525

26+
#include "ChunkMessageIdImpl.h"
2627
#include "PulsarFriend.h"
2728
#include "WaitUtils.h"
2829
#include "lib/LogUtils.h"
@@ -117,7 +118,8 @@ TEST_P(MessageChunkingTest, testEndToEnd) {
117118
for (int i = 0; i < numMessages; i++) {
118119
MessageId messageId;
119120
ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent(largeMessage).build(), messageId));
120-
auto chunkMsgId = std::dynamic_pointer_cast<ChunkMessageIdImpl>(PulsarFriend::getMessageIdImpl(messageId));
121+
auto chunkMsgId =
122+
std::dynamic_pointer_cast<ChunkMessageIdImpl>(PulsarFriend::getMessageIdImpl(messageId));
121123
ASSERT_TRUE(chunkMsgId);
122124
LOG_INFO("Send " << i << " to " << messageId);
123125
sendMessageIds.emplace_back(messageId);
@@ -132,7 +134,8 @@ TEST_P(MessageChunkingTest, testEndToEnd) {
132134
ASSERT_EQ(msg.getMessageId().batchIndex(), -1);
133135
ASSERT_EQ(msg.getMessageId().batchSize(), 0);
134136
auto messageId = msg.getMessageId();
135-
auto chunkMsgId = std::dynamic_pointer_cast<ChunkMessageIdImpl>(PulsarFriend::getMessageIdImpl(messageId));
137+
auto chunkMsgId =
138+
std::dynamic_pointer_cast<ChunkMessageIdImpl>(PulsarFriend::getMessageIdImpl(messageId));
136139
ASSERT_TRUE(chunkMsgId);
137140
receivedMessageIds.emplace_back(messageId);
138141
}
@@ -261,6 +264,56 @@ TEST_P(MessageChunkingTest, testMaxPendingChunkMessages) {
261264
consumer.close();
262265
}
263266

267+
TEST_P(MessageChunkingTest, testSeekChunkMessages) {
268+
const std::string topic =
269+
"MessageChunkingTest-testSeekChunkMessages-" + toString(GetParam()) + std::to_string(time(nullptr));
270+
271+
constexpr int numMessages = 10;
272+
273+
Consumer consumer1;
274+
ConsumerConfiguration consumer1Conf;
275+
consumer1Conf.setStartMessageIdInclusive(true);
276+
createConsumer(topic, consumer1, consumer1Conf);
277+
278+
Producer producer;
279+
createProducer(topic, producer);
280+
281+
for (int i = 0; i < numMessages; i++) {
282+
MessageId messageId;
283+
ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent(largeMessage).build(), messageId));
284+
LOG_INFO("Send " << i << " to " << messageId);
285+
}
286+
287+
Message msg;
288+
std::vector<MessageId> receivedMessageIds;
289+
for (int i = 0; i < numMessages; i++) {
290+
ASSERT_EQ(ResultOk, consumer1.receive(msg, 3000));
291+
LOG_INFO("Receive " << msg.getLength() << " bytes from " << msg.getMessageId());
292+
receivedMessageIds.emplace_back(msg.getMessageId());
293+
}
294+
295+
consumer1.seek(receivedMessageIds[1]);
296+
for (int i = 1; i < numMessages; i++) {
297+
Message msgAfterSeek;
298+
ASSERT_EQ(ResultOk, consumer1.receive(msgAfterSeek, 3000));
299+
ASSERT_EQ(msgAfterSeek.getMessageId(), receivedMessageIds[i]);
300+
}
301+
302+
consumer1.close();
303+
Consumer consumer2;
304+
createConsumer(topic, consumer2);
305+
306+
consumer2.seek(receivedMessageIds[1]);
307+
for (int i = 2; i < numMessages; i++) {
308+
Message msgAfterSeek;
309+
ASSERT_EQ(ResultOk, consumer2.receive(msgAfterSeek, 3000));
310+
ASSERT_EQ(msgAfterSeek.getMessageId(), receivedMessageIds[i]);
311+
}
312+
313+
consumer2.close();
314+
producer.close();
315+
}
316+
264317
TEST(ChunkMessageIdTest, testSetChunkMessageId) {
265318
MessageId msgId;
266319
{

tests/PulsarFriend.h

+1-3
Original file line numberDiff line numberDiff line change
@@ -180,9 +180,7 @@ class PulsarFriend {
180180

181181
static proto::MessageMetadata& getMessageMetadata(Message& message) { return message.impl_->metadata; }
182182

183-
static std::shared_ptr<MessageIdImpl> getMessageIdImpl(MessageId& msgId) {
184-
return msgId.impl_;
185-
}
183+
static std::shared_ptr<MessageIdImpl> getMessageIdImpl(MessageId& msgId) { return msgId.impl_; }
186184
};
187185
} // namespace pulsar
188186

0 commit comments

Comments
 (0)