From f860566c05634dd097d8941ea38a2a939fcfd26e Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 17 Jan 2023 19:42:38 +0800 Subject: [PATCH] Fix broken main branch by providing correct getBitSet implementation ### Motivation Currently the main branch is broken by the concurrent merge of https://github.com/apache/pulsar-client-cpp/pull/153 and https://github.com/apache/pulsar-client-cpp/pull/151. It's because when a batched message id is constructed from deserialization, there is no `getBitSet` implementation of the internal acker. ### Modifications Add a `bool` parameter to `MessageIdImpl::getBitSet` to indicate whether the message ID is batched. The logic is similar with https://github.com/apache/pulsar/blob/299bd70fdfa023768e94a8ee4347d39337b6cbd4/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java#L325-L327 and https://github.com/apache/pulsar/blob/299bd70fdfa023768e94a8ee4347d39337b6cbd4/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java#L345-L347 Add a `testMessageIdFromBuild` to test the acknowledgment for a message ID without an acker could succeed for a consumer that enables batch index ACK. ### TODO In future, https://github.com/apache/pulsar/pull/19031 might be migrated into the C++ client to fix the consumer that disables batch index ACK. --- lib/AckGroupingTracker.cc | 3 ++- lib/BatchMessageAcker.h | 8 ++++-- lib/BatchedMessageIdImpl.h | 16 +++++++++++- lib/Commands.cc | 2 +- lib/MessageIdImpl.h | 2 +- tests/AcknowledgeTest.cc | 52 ++++++++++++++++++++++++++++++++++++++ tests/ConsumerWrapper.h | 20 ++++++++++++--- 7 files changed, 94 insertions(+), 9 deletions(-) diff --git a/lib/AckGroupingTracker.cc b/lib/AckGroupingTracker.cc index 064d4eac..ae919fb6 100644 --- a/lib/AckGroupingTracker.cc +++ b/lib/AckGroupingTracker.cc @@ -31,7 +31,8 @@ DECLARE_LOG_OBJECT(); inline void sendAck(ClientConnectionPtr cnx, uint64_t consumerId, const MessageId& msgId, CommandAck_AckType ackType) { - const auto& bitSet = Commands::getMessageIdImpl(msgId)->getBitSet(); + const auto& bitSet = + Commands::getMessageIdImpl(msgId)->getBitSet(ackType == CommandAck_AckType_Individual); auto cmd = Commands::newAck(consumerId, msgId.ledgerId(), msgId.entryId(), bitSet, ackType, -1); cnx->sendCommand(cmd); LOG_DEBUG("ACK request is sent for message - [" << msgId.ledgerId() << ", " << msgId.entryId() << "]"); diff --git a/lib/BatchMessageAcker.h b/lib/BatchMessageAcker.h index 7aa8c1fa..a7538ebb 100644 --- a/lib/BatchMessageAcker.h +++ b/lib/BatchMessageAcker.h @@ -37,14 +37,16 @@ class BatchMessageAcker { // by deserializing from raw bytes. virtual bool ackIndividual(int32_t) { return false; } virtual bool ackCumulative(int32_t) { return false; } + virtual const BitSet& getBitSet() noexcept { + static BitSet emptyBitSet; + return emptyBitSet; + } bool shouldAckPreviousMessageId() noexcept { bool expectedValue = false; return prevBatchCumulativelyAcked_.compare_exchange_strong(expectedValue, true); } - const BitSet& getBitSet() const noexcept { return bitSet_; } - private: // When a batched message is acknowledged cumulatively, the previous message id will be acknowledged // without batch index ACK enabled. However, it should be acknowledged only once. Use this flag to @@ -80,6 +82,8 @@ class BatchMessageAckerImpl : public BatchMessageAcker { return bitSet_.isEmpty(); } + const BitSet& getBitSet() const noexcept { return bitSet_; } + private: BitSet bitSet_; mutable std::mutex mutex_; diff --git a/lib/BatchedMessageIdImpl.h b/lib/BatchedMessageIdImpl.h index f3bf4bc2..751de53c 100644 --- a/lib/BatchedMessageIdImpl.h +++ b/lib/BatchedMessageIdImpl.h @@ -41,7 +41,21 @@ class BatchedMessageIdImpl : public MessageIdImpl { bool shouldAckPreviousMessageId() const { return acker_->shouldAckPreviousMessageId(); } - const BitSet& getBitSet() const noexcept override { return acker_->getBitSet(); } + const BitSet& getBitSet(bool individual) const noexcept override { + const auto& bitSet = acker_->getBitSet(); + if (bitSet.isEmpty()) { + thread_local BitSet threadLocalBitSet; + threadLocalBitSet = BitSet{batchSize_}; + threadLocalBitSet.set(0, batchSize_); + if (individual) { + threadLocalBitSet.clear(batchIndex_); + } else { + threadLocalBitSet.clear(0, batchIndex_ + 1); + } + return threadLocalBitSet; + } + return bitSet; + } MessageId getPreviousMessageId() { return MessageIdBuilder().ledgerId(ledgerId_).entryId(entryId_ - 1).partition(partition_).build(); diff --git a/lib/Commands.cc b/lib/Commands.cc index 8d8b5f3f..070931fe 100644 --- a/lib/Commands.cc +++ b/lib/Commands.cc @@ -450,7 +450,7 @@ SharedBuffer Commands::newMultiMessageAck(uint64_t consumerId, const std::setadd_message_id(); newMsgId->set_ledgerid(msgId.ledgerId()); newMsgId->set_entryid(msgId.entryId()); - for (auto x : getMessageIdImpl(msgId)->getBitSet()) { + for (auto x : getMessageIdImpl(msgId)->getBitSet(true)) { newMsgId->add_ack_set(x); } } diff --git a/lib/MessageIdImpl.h b/lib/MessageIdImpl.h index 0aa96648..1a6c7ea3 100644 --- a/lib/MessageIdImpl.h +++ b/lib/MessageIdImpl.h @@ -46,7 +46,7 @@ class MessageIdImpl { const std::string& getTopicName() { return *topicName_; } void setTopicName(const std::string& topicName) { topicName_ = &topicName; } - virtual const BitSet& getBitSet() const noexcept { + virtual const BitSet& getBitSet(bool individual) const noexcept { static const BitSet emptyBitSet; return emptyBitSet; } diff --git a/tests/AcknowledgeTest.cc b/tests/AcknowledgeTest.cc index e30c7b1b..f0ed7cfc 100644 --- a/tests/AcknowledgeTest.cc +++ b/tests/AcknowledgeTest.cc @@ -18,6 +18,7 @@ */ #include #include +#include #include #include @@ -302,4 +303,55 @@ TEST_F(AcknowledgeTest, testMixedCumulativeAck) { ASSERT_EQ(ResultTimeout, consumer.getConsumer().receive(msg, 1000)); } +TEST_F(AcknowledgeTest, testMessageIdFromBuild) { + Client client(lookupUrl); + const std::string topic = "test-message-id-from-build-" + unique_str(); + + ConsumerWrapper consumer0; + consumer0.initialize(client, topic, "sub-0", false); + + ConsumerWrapper consumer1; + consumer1.initialize(client, topic, "sub-1", true /* batch index ACK enabled */); + + Producer producer; + auto producerConf = + ProducerConfiguration().setBatchingMaxMessages(100).setBatchingMaxPublishDelayMs(3600 * 1000); + ASSERT_EQ(ResultOk, client.createProducer(topic, producerConf, producer)); + + constexpr int numMessages = 5; + for (int i = 0; i < numMessages; i++) { + producer.sendAsync(MessageBuilder().setContent("msg").build(), nullptr); + } + producer.flush(); + + std::vector msgIds; + consumer0.receiveAtMost(numMessages); + for (auto&& msgId : consumer0.messageIdList()) { + msgIds.emplace_back(MessageIdBuilder() + .ledgerId(msgId.ledgerId()) + .entryId(msgId.entryId()) + .batchIndex(msgId.batchIndex()) + .batchSize(msgId.batchSize()) + .build()); + } + + consumer0.acknowledgeMessageIdAndRestart(msgIds[0], AckType::INDIVIDUAL); + consumer1.acknowledgeMessageIdAndRestart(msgIds[0], AckType::INDIVIDUAL); + + Message msg; + ASSERT_EQ(ResultOk, consumer0.getConsumer().receive(msg, 1000)); + EXPECT_EQ(msg.getMessageId().batchIndex(), 0); + ASSERT_EQ(ResultOk, consumer1.getConsumer().receive(msg, 1000)); + EXPECT_EQ(msg.getMessageId().batchIndex(), 1); + + consumer0.acknowledgeMessageIdAndRestart(msgIds[3], AckType::CUMULATIVE); + consumer1.acknowledgeMessageIdAndRestart(msgIds[3], AckType::CUMULATIVE); + + ASSERT_EQ(ResultOk, consumer0.getConsumer().receive(msg, 1000)); + EXPECT_EQ(msg.getMessageId().batchIndex(), 0); + ASSERT_EQ(ResultOk, consumer1.getConsumer().receive(msg, 1000)); + EXPECT_EQ(msg.getMessageId().batchIndex(), 3 + 1); + client.close(); +} + INSTANTIATE_TEST_SUITE_P(BasicEndToEndTest, AcknowledgeTest, testing::Values(100, 0)); diff --git a/tests/ConsumerWrapper.h b/tests/ConsumerWrapper.h index 09fc4981..75b475c7 100644 --- a/tests/ConsumerWrapper.h +++ b/tests/ConsumerWrapper.h @@ -54,6 +54,7 @@ class ConsumerWrapper { // Enable the stats for cumulative ack conf_.setUnAckedMessagesTimeoutMs(10000); conf_.setBatchIndexAckEnabled(enableBatchIndexAck); + conf_.setAckGroupingTimeMs(0); // send ACK immediately ASSERT_EQ(ResultOk, client_->subscribe(topic_, subscription_, conf_, consumer_)); } @@ -95,9 +96,16 @@ class ConsumerWrapper { // the acknowledgment by restarting the consumer. void acknowledgeAndRestart(const std::vector& indexes, AckType ackType) { acknowledge(indexes, ackType); - messageIdList_.clear(); - consumer_.close(); - ASSERT_EQ(ResultOk, client_->subscribe(topic_, subscription_, conf_, consumer_)); + restart(); + } + + void acknowledgeMessageIdAndRestart(MessageId msgId, AckType ackType) { + if (ackType == AckType::CUMULATIVE) { + consumer_.acknowledgeCumulative(msgId); + } else { + consumer_.acknowledge(msgId); + } + restart(); } Consumer& getConsumer() noexcept { return consumer_; } @@ -109,6 +117,12 @@ class ConsumerWrapper { ConsumerConfiguration conf_; Consumer consumer_; std::vector messageIdList_; + + void restart() { + messageIdList_.clear(); + consumer_.close(); + ASSERT_EQ(ResultOk, client_->subscribe(topic_, subscription_, conf_, consumer_)); + } }; } // namespace pulsar