Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix] Fix consumer doesn't acknowledge all chunk message Ids #321

Merged
merged 6 commits into from
Oct 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 29 additions & 4 deletions lib/AckGroupingTracker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@

#include <atomic>
#include <limits>
#include <set>

#include "BitSet.h"
#include "ChunkMessageIdImpl.h"
#include "ClientConnection.h"
#include "Commands.h"
#include "LogUtils.h"
Expand All @@ -42,6 +44,17 @@ void AckGroupingTracker::doImmediateAck(const MessageId& msgId, ResultCallback c
}
return;
}
if (ackType == CommandAck_AckType_Individual) {
// If it's individual ack, we need to acknowledge all message IDs in a chunked message Id
// If it's cumulative ack, we only need to ack the last message ID of a chunked message.
// ChunkedMessageId return last chunk message ID by default, so we don't need to handle it.
if (auto chunkMessageId =
std::dynamic_pointer_cast<ChunkMessageIdImpl>(Commands::getMessageIdImpl(msgId))) {
auto msgIdList = chunkMessageId->getChunkedMessageIds();
doImmediateAck(std::set<MessageId>(msgIdList.begin(), msgIdList.end()), callback);
return;
}
}
const auto& ackSet = Commands::getMessageIdImpl(msgId)->getBitSet();
if (waitResponse_) {
const auto requestId = requestIdSupplier_();
Expand Down Expand Up @@ -84,29 +97,41 @@ void AckGroupingTracker::doImmediateAck(const std::set<MessageId>& msgIds, Resul
return;
}

std::set<MessageId> ackMsgIds;

for (const auto& msgId : msgIds) {
if (auto chunkMessageId =
std::dynamic_pointer_cast<ChunkMessageIdImpl>(Commands::getMessageIdImpl(msgId))) {
auto msgIdList = chunkMessageId->getChunkedMessageIds();
ackMsgIds.insert(msgIdList.begin(), msgIdList.end());
} else {
ackMsgIds.insert(msgId);
}
}

if (Commands::peerSupportsMultiMessageAcknowledgement(cnx->getServerProtocolVersion())) {
if (waitResponse_) {
const auto requestId = requestIdSupplier_();
cnx->sendRequestWithId(Commands::newMultiMessageAck(consumerId_, msgIds, requestId), requestId)
cnx->sendRequestWithId(Commands::newMultiMessageAck(consumerId_, ackMsgIds, requestId), requestId)
.addListener([callback](Result result, const ResponseData&) {
if (callback) {
callback(result);
}
});
} else {
cnx->sendCommand(Commands::newMultiMessageAck(consumerId_, msgIds));
cnx->sendCommand(Commands::newMultiMessageAck(consumerId_, ackMsgIds));
if (callback) {
callback(ResultOk);
}
}
} else {
auto count = std::make_shared<std::atomic<size_t>>(msgIds.size());
auto count = std::make_shared<std::atomic<size_t>>(ackMsgIds.size());
auto wrappedCallback = [callback, count](Result result) {
if (--*count == 0 && callback) {
callback(result);
}
};
for (auto&& msgId : msgIds) {
for (auto&& msgId : ackMsgIds) {
doImmediateAck(msgId, wrappedCallback, CommandAck_AckType_Individual);
}
}
Expand Down
18 changes: 8 additions & 10 deletions lib/ChunkMessageIdImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,19 @@ class ChunkMessageIdImpl;
typedef std::shared_ptr<ChunkMessageIdImpl> ChunkMessageIdImplPtr;
class ChunkMessageIdImpl : public MessageIdImpl, public std::enable_shared_from_this<ChunkMessageIdImpl> {
public:
ChunkMessageIdImpl() : firstChunkMsgId_(std::make_shared<MessageIdImpl>()) {}

void setFirstChunkMessageId(const MessageId& msgId) { *firstChunkMsgId_ = *msgId.impl_; }

void setLastChunkMessageId(const MessageId& msgId) {
this->ledgerId_ = msgId.ledgerId();
this->entryId_ = msgId.entryId();
this->partition_ = msgId.partition();
explicit ChunkMessageIdImpl(std::vector<MessageId>&& chunkedMessageIds)
: chunkedMessageIds_(std::move(chunkedMessageIds)) {
auto lastChunkMsgId = chunkedMessageIds_.back();
this->ledgerId_ = lastChunkMsgId.ledgerId();
this->entryId_ = lastChunkMsgId.entryId();
this->partition_ = lastChunkMsgId.partition();
}

std::shared_ptr<const MessageIdImpl> getFirstChunkMessageId() const { return firstChunkMsgId_; }
const std::vector<MessageId>& getChunkedMessageIds() const noexcept { return chunkedMessageIds_; }

MessageId build() { return MessageId{std::dynamic_pointer_cast<MessageIdImpl>(shared_from_this())}; }

private:
std::shared_ptr<MessageIdImpl> firstChunkMsgId_;
std::vector<MessageId> chunkedMessageIds_;
};
} // namespace pulsar
6 changes: 3 additions & 3 deletions lib/Commands.cc
Original file line number Diff line number Diff line change
Expand Up @@ -583,9 +583,9 @@ SharedBuffer Commands::newSeek(uint64_t consumerId, uint64_t requestId, const Me

auto chunkMsgId = std::dynamic_pointer_cast<ChunkMessageIdImpl>(messageId.impl_);
if (chunkMsgId) {
auto firstId = chunkMsgId->getFirstChunkMessageId();
messageIdData.set_ledgerid(firstId->ledgerId_);
messageIdData.set_entryid(firstId->entryId_);
const auto& firstId = chunkMsgId->getChunkedMessageIds().front();
messageIdData.set_ledgerid(firstId.ledgerId());
messageIdData.set_entryid(firstId.entryId());
} else {
messageIdData.set_ledgerid(messageId.ledgerId());
messageIdData.set_entryid(messageId.entryId());
Expand Down
8 changes: 4 additions & 4 deletions lib/ConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -478,10 +478,7 @@ boost::optional<SharedBuffer> ConsumerImpl::processMessageChunk(const SharedBuff
return boost::none;
}

ChunkMessageIdImplPtr chunkMsgId = std::make_shared<ChunkMessageIdImpl>();
chunkMsgId->setFirstChunkMessageId(chunkedMsgCtx.getChunkedMessageIds().front());
chunkMsgId->setLastChunkMessageId(chunkedMsgCtx.getChunkedMessageIds().back());
messageId = chunkMsgId->build();
messageId = std::make_shared<ChunkMessageIdImpl>(chunkedMsgCtx.moveChunkedMessageIds())->build();

LOG_DEBUG("Chunked message completed chunkId: " << chunkId << ", ChunkedMessageCtx: " << chunkedMsgCtx
<< ", sequenceId: " << metadata.sequence_id());
Expand Down Expand Up @@ -1173,6 +1170,9 @@ std::pair<MessageId, bool> ConsumerImpl::prepareIndividualAck(const MessageId& m
(batchSize > 0) ? batchSize : 1);
unAckedMessageTrackerPtr_->remove(messageId);
possibleSendToDeadLetterTopicMessages_.remove(messageId);
if (std::dynamic_pointer_cast<ChunkMessageIdImpl>(messageIdImpl)) {
return std::make_pair(messageId, true);
}
return std::make_pair(discardBatch(messageId), true);
} else if (config_.isBatchIndexAckEnabled()) {
return std::make_pair(messageId, true);
Expand Down
4 changes: 2 additions & 2 deletions lib/ConsumerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,8 @@ class ConsumerImpl : public ConsumerImplBase {

const std::vector<MessageId>& getChunkedMessageIds() const noexcept { return chunkedMessageIds_; }

std::vector<MessageId> moveChunkedMessageIds() noexcept { return std::move(chunkedMessageIds_); }

long getReceivedTimeMs() const noexcept { return receivedTimeMs_; }

friend std::ostream& operator<<(std::ostream& os, const ChunkedMessageCtx& ctx) {
Expand All @@ -292,8 +294,6 @@ class ConsumerImpl : public ConsumerImplBase {
// concurrently on the topic) then it guards against broken chunked message which was not fully published
const bool autoAckOldestChunkedMessageOnQueueFull_;

// The key is UUID, value is the associated ChunkedMessageCtx of the chunked message.
std::unordered_map<std::string, ChunkedMessageCtx> chunkedMessagesMap_;
// This list contains all the keys of `chunkedMessagesMap_`, each key is an UUID that identifies a pending
// chunked message. Once the number of pending chunked messages exceeds the limit, the oldest UUIDs and
// the associated ChunkedMessageCtx will be removed.
Expand Down
19 changes: 9 additions & 10 deletions lib/MessageId.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,11 @@ void MessageId::serialize(std::string& result) const {
auto chunkMsgId = std::dynamic_pointer_cast<ChunkMessageIdImpl>(impl_);
if (chunkMsgId) {
proto::MessageIdData& firstChunkIdData = *idData.mutable_first_chunk_message_id();
auto firstChunkId = chunkMsgId->getFirstChunkMessageId();
firstChunkIdData.set_ledgerid(firstChunkId->ledgerId_);
firstChunkIdData.set_entryid(firstChunkId->entryId_);
const auto& firstChunkId = chunkMsgId->getChunkedMessageIds().front();
firstChunkIdData.set_ledgerid(firstChunkId.ledgerId());
firstChunkIdData.set_entryid(firstChunkId.entryId());
if (chunkMsgId->partition_ != -1) {
firstChunkIdData.set_partition(firstChunkId->partition_);
firstChunkIdData.set_partition(firstChunkId.partition());
}
}

Expand All @@ -99,9 +99,8 @@ MessageId MessageId::deserialize(const std::string& serializedMessageId) {
MessageId msgId = MessageIdBuilder::from(idData).build();

if (idData.has_first_chunk_message_id()) {
ChunkMessageIdImplPtr chunkMsgId = std::make_shared<ChunkMessageIdImpl>();
chunkMsgId->setFirstChunkMessageId(MessageIdBuilder::from(idData.first_chunk_message_id()).build());
chunkMsgId->setLastChunkMessageId(msgId);
ChunkMessageIdImplPtr chunkMsgId = std::make_shared<ChunkMessageIdImpl>(
std::vector<MessageId>({MessageIdBuilder::from(idData.first_chunk_message_id()).build(), msgId}));
return chunkMsgId->build();
}

Expand All @@ -121,9 +120,9 @@ int32_t MessageId::batchSize() const { return impl_->batchSize_; }
PULSAR_PUBLIC std::ostream& operator<<(std::ostream& s, const pulsar::MessageId& messageId) {
auto chunkMsgId = std::dynamic_pointer_cast<ChunkMessageIdImpl>(messageId.impl_);
if (chunkMsgId) {
auto firstId = chunkMsgId->getFirstChunkMessageId();
s << '(' << firstId->ledgerId_ << ',' << firstId->entryId_ << ',' << firstId->partition_ << ','
<< firstId->batchIndex_ << ");";
const auto& firstId = chunkMsgId->getChunkedMessageIds().front();
s << '(' << firstId.ledgerId() << ',' << firstId.entryId() << ',' << firstId.partition() << ','
<< firstId.batchIndex() << ");";
}
s << '(' << messageId.impl_->ledgerId_ << ',' << messageId.impl_->entryId_ << ','
<< messageId.impl_->partition_ << ',' << messageId.impl_->batchIndex_ << ')';
Expand Down
8 changes: 5 additions & 3 deletions lib/OpSendMsg.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ struct SendArguments {
SendArguments& operator=(const SendArguments&) = delete;
};

typedef std::shared_ptr<std::vector<MessageId>> ChunkMessageIdListPtr;

struct OpSendMsg {
const Result result;
const int32_t chunkId;
Expand All @@ -54,7 +56,7 @@ struct OpSendMsg {
const boost::posix_time::ptime timeout;
const SendCallback sendCallback;
std::vector<std::function<void(Result)>> trackerCallbacks;
ChunkMessageIdImplPtr chunkedMessageId;
ChunkMessageIdListPtr chunkMessageIdList;
// Use shared_ptr here because producer might resend the message with the same arguments
const std::shared_ptr<SendArguments> sendArgs;

Expand Down Expand Up @@ -89,7 +91,7 @@ struct OpSendMsg {
sendArgs(nullptr) {}

OpSendMsg(const proto::MessageMetadata& metadata, uint32_t messagesCount, uint64_t messagesSize,
int sendTimeoutMs, SendCallback&& callback, ChunkMessageIdImplPtr chunkedMessageId,
int sendTimeoutMs, SendCallback&& callback, ChunkMessageIdListPtr chunkMessageIdList,
uint64_t producerId, SharedBuffer payload)
: result(ResultOk),
chunkId(metadata.chunk_id()),
Expand All @@ -98,7 +100,7 @@ struct OpSendMsg {
messagesSize(messagesSize),
timeout(TimeUtils::now() + boost::posix_time::milliseconds(sendTimeoutMs)),
sendCallback(std::move(callback)),
chunkedMessageId(chunkedMessageId),
chunkMessageIdList(std::move(chunkMessageIdList)),
sendArgs(new SendArguments(producerId, metadata.sequence_id(), metadata, payload)) {}
};

Expand Down
19 changes: 9 additions & 10 deletions lib/ProducerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -571,14 +571,14 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, SendCallback&& c
}
} else {
const bool sendChunks = (totalChunks > 1);
ChunkMessageIdListPtr chunkMessageIdList;
if (sendChunks) {
msgMetadata.set_uuid(producerName_ + "-" + std::to_string(sequenceId));
msgMetadata.set_num_chunks_from_msg(totalChunks);
msgMetadata.set_total_chunk_msg_size(compressedSize);
chunkMessageIdList = std::make_shared<std::vector<MessageId>>();
}

auto chunkMessageId = totalChunks > 1 ? std::make_shared<ChunkMessageIdImpl>() : nullptr;

int beginIndex = 0;
for (int chunkId = 0; chunkId < totalChunks; chunkId++) {
if (sendChunks) {
Expand All @@ -595,7 +595,7 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, SendCallback&& c
}

auto op = OpSendMsg::create(msgMetadata, 1, uncompressedSize, conf_.getSendTimeout(),
(chunkId == totalChunks - 1) ? callback : nullptr, chunkMessageId,
(chunkId == totalChunks - 1) ? callback : nullptr, chunkMessageIdList,
producerId_, encryptedPayload);

if (!chunkingEnabled_) {
Expand Down Expand Up @@ -886,7 +886,7 @@ bool ProducerImpl::ackReceived(uint64_t sequenceId, MessageId& rawMessageId) {
return true;
}

const auto& op = *pendingMessagesQueue_.front();
auto& op = *pendingMessagesQueue_.front();
if (op.result != ResultOk) {
LOG_ERROR("Unexpected OpSendMsg whose result is " << op.result << " for " << sequenceId << " and "
<< rawMessageId);
Expand All @@ -910,13 +910,12 @@ bool ProducerImpl::ackReceived(uint64_t sequenceId, MessageId& rawMessageId) {
// Message was persisted correctly
LOG_DEBUG(getName() << "Received ack for msg " << sequenceId);

if (op.chunkedMessageId) {
if (op.chunkMessageIdList) {
// Handling the chunk message id.
if (op.chunkId == 0) {
op.chunkedMessageId->setFirstChunkMessageId(messageId);
} else if (op.chunkId == op.numChunks - 1) {
op.chunkedMessageId->setLastChunkMessageId(messageId);
messageId = op.chunkedMessageId->build();
op.chunkMessageIdList->push_back(messageId);
if (op.chunkId == op.numChunks - 1) {
auto chunkedMessageId = std::make_shared<ChunkMessageIdImpl>(std::move(*op.chunkMessageIdList));
messageId = chunkedMessageId->build();
}
}

Expand Down
47 changes: 31 additions & 16 deletions tests/MessageChunkingTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@ class MessageChunkingTest : public ::testing::TestWithParam<CompressionType> {
}

void createConsumer(const std::string& topic, Consumer& consumer) {
ASSERT_EQ(ResultOk, client_.subscribe(topic, "my-sub", consumer));
ConsumerConfiguration conf;
conf.setBrokerConsumerStatsCacheTimeInMs(1000);
ASSERT_EQ(ResultOk, client_.subscribe(topic, "my-sub", conf, consumer));
}

void createConsumer(const std::string& topic, Consumer& consumer, ConsumerConfiguration& conf) {
Expand Down Expand Up @@ -118,9 +120,6 @@ TEST_P(MessageChunkingTest, testEndToEnd) {
for (int i = 0; i < numMessages; i++) {
MessageId messageId;
ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent(largeMessage).build(), messageId));
auto chunkMsgId =
std::dynamic_pointer_cast<ChunkMessageIdImpl>(PulsarFriend::getMessageIdImpl(messageId));
ASSERT_TRUE(chunkMsgId);
LOG_INFO("Send " << i << " to " << messageId);
sendMessageIds.emplace_back(messageId);
}
Expand All @@ -134,19 +133,35 @@ TEST_P(MessageChunkingTest, testEndToEnd) {
ASSERT_EQ(msg.getMessageId().batchIndex(), -1);
ASSERT_EQ(msg.getMessageId().batchSize(), 0);
auto messageId = msg.getMessageId();
auto chunkMsgId =
std::dynamic_pointer_cast<ChunkMessageIdImpl>(PulsarFriend::getMessageIdImpl(messageId));
ASSERT_TRUE(chunkMsgId);
receivedMessageIds.emplace_back(messageId);
consumer.acknowledge(messageId);
}
ASSERT_EQ(receivedMessageIds, sendMessageIds);
ASSERT_EQ(receivedMessageIds.front().ledgerId(), receivedMessageIds.front().ledgerId());
for (int i = 0; i < sendMessageIds.size(); ++i) {
auto sendChunkMsgId =
std::dynamic_pointer_cast<ChunkMessageIdImpl>(PulsarFriend::getMessageIdImpl(sendMessageIds[i]));
ASSERT_TRUE(sendChunkMsgId);
auto receiveChunkMsgId = std::dynamic_pointer_cast<ChunkMessageIdImpl>(
PulsarFriend::getMessageIdImpl(receivedMessageIds[i]));
ASSERT_TRUE(receiveChunkMsgId);
ASSERT_EQ(sendChunkMsgId->getChunkedMessageIds(), receiveChunkMsgId->getChunkedMessageIds());
}
ASSERT_GT(receivedMessageIds.back().entryId(), numMessages);

// Verify the cache has been cleared
auto& chunkedMessageCache = PulsarFriend::getChunkedMessageCache(consumer);
ASSERT_EQ(chunkedMessageCache.size(), 0);

BrokerConsumerStats consumerStats;
waitUntil(
std::chrono::seconds(10),
[&] {
return consumer.getBrokerConsumerStats(consumerStats) == ResultOk &&
consumerStats.getMsgBacklog() == 0;
},
1000);
ASSERT_EQ(consumerStats.getMsgBacklog(), 0);

producer.close();
consumer.close();
}
Expand Down Expand Up @@ -317,9 +332,9 @@ TEST_P(MessageChunkingTest, testSeekChunkMessages) {
TEST(ChunkMessageIdTest, testSetChunkMessageId) {
MessageId msgId;
{
ChunkMessageIdImplPtr chunkMsgId = std::make_shared<ChunkMessageIdImpl>();
chunkMsgId->setFirstChunkMessageId(MessageIdBuilder().ledgerId(1).entryId(2).partition(3).build());
chunkMsgId->setLastChunkMessageId(MessageIdBuilder().ledgerId(4).entryId(5).partition(6).build());
ChunkMessageIdImplPtr chunkMsgId = std::make_shared<ChunkMessageIdImpl>(
std::vector<MessageId>({MessageIdBuilder().ledgerId(1).entryId(2).partition(3).build(),
MessageIdBuilder().ledgerId(4).entryId(5).partition(6).build()}));
msgId = chunkMsgId->build();
// Test the destructor of the underlying message id should also work for the generated messageId.
}
Expand All @@ -332,13 +347,13 @@ TEST(ChunkMessageIdTest, testSetChunkMessageId) {
ASSERT_EQ(deserializedMsgId.entryId(), 5);
ASSERT_EQ(deserializedMsgId.partition(), 6);

auto chunkMsgId =
const auto& chunkMsgId =
std::dynamic_pointer_cast<ChunkMessageIdImpl>(PulsarFriend::getMessageIdImpl(deserializedMsgId));
ASSERT_TRUE(chunkMsgId);
auto firstChunkMsgId = chunkMsgId->getFirstChunkMessageId();
ASSERT_EQ(firstChunkMsgId->ledgerId_, 1);
ASSERT_EQ(firstChunkMsgId->entryId_, 2);
ASSERT_EQ(firstChunkMsgId->partition_, 3);
auto firstChunkMsgId = chunkMsgId->getChunkedMessageIds().front();
ASSERT_EQ(firstChunkMsgId.ledgerId(), 1);
ASSERT_EQ(firstChunkMsgId.entryId(), 2);
ASSERT_EQ(firstChunkMsgId.partition(), 3);
}

// The CI env is Ubuntu 16.04, the gtest-dev version is 1.8.0 that doesn't have INSTANTIATE_TEST_SUITE_P
Expand Down