Skip to content

Commit

Permalink
Fix broken main branch by providing correct getBitSet implementation
Browse files Browse the repository at this point in the history
### Motivation

Currently the main branch is broken by the concurrent merge of
apache#153 and
apache#151. It's because when
a batched message id is constructed from deserialization, there is no
`getBitSet` implementation of the internal acker.

### Modifications

Add a `bool` parameter to `MessageIdImpl::getBitSet` to indicate whether
the message ID is batched. The logic is similar with

https://github.com/apache/pulsar/blob/299bd70fdfa023768e94a8ee4347d39337b6cbd4/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java#L325-L327

and

https://github.com/apache/pulsar/blob/299bd70fdfa023768e94a8ee4347d39337b6cbd4/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java#L345-L347

Add a `testMessageIdFromBuild` to test the acknowledgment for a message
ID without an acker could succeed for a consumer that enables batch
index ACK.

### TODO

In future, apache/pulsar#19031 might be migrated
into the C++ client to fix the consumer that disables batch index ACK.
  • Loading branch information
BewareMyPower committed Jan 17, 2023
1 parent 06eab69 commit f860566
Show file tree
Hide file tree
Showing 7 changed files with 94 additions and 9 deletions.
3 changes: 2 additions & 1 deletion lib/AckGroupingTracker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ DECLARE_LOG_OBJECT();

inline void sendAck(ClientConnectionPtr cnx, uint64_t consumerId, const MessageId& msgId,
CommandAck_AckType ackType) {
const auto& bitSet = Commands::getMessageIdImpl(msgId)->getBitSet();
const auto& bitSet =
Commands::getMessageIdImpl(msgId)->getBitSet(ackType == CommandAck_AckType_Individual);
auto cmd = Commands::newAck(consumerId, msgId.ledgerId(), msgId.entryId(), bitSet, ackType, -1);
cnx->sendCommand(cmd);
LOG_DEBUG("ACK request is sent for message - [" << msgId.ledgerId() << ", " << msgId.entryId() << "]");
Expand Down
8 changes: 6 additions & 2 deletions lib/BatchMessageAcker.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,16 @@ class BatchMessageAcker {
// by deserializing from raw bytes.
virtual bool ackIndividual(int32_t) { return false; }
virtual bool ackCumulative(int32_t) { return false; }
virtual const BitSet& getBitSet() noexcept {
static BitSet emptyBitSet;
return emptyBitSet;
}

bool shouldAckPreviousMessageId() noexcept {
bool expectedValue = false;
return prevBatchCumulativelyAcked_.compare_exchange_strong(expectedValue, true);
}

const BitSet& getBitSet() const noexcept { return bitSet_; }

private:
// When a batched message is acknowledged cumulatively, the previous message id will be acknowledged
// without batch index ACK enabled. However, it should be acknowledged only once. Use this flag to
Expand Down Expand Up @@ -80,6 +82,8 @@ class BatchMessageAckerImpl : public BatchMessageAcker {
return bitSet_.isEmpty();
}

const BitSet& getBitSet() const noexcept { return bitSet_; }

private:
BitSet bitSet_;
mutable std::mutex mutex_;
Expand Down
16 changes: 15 additions & 1 deletion lib/BatchedMessageIdImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,21 @@ class BatchedMessageIdImpl : public MessageIdImpl {

bool shouldAckPreviousMessageId() const { return acker_->shouldAckPreviousMessageId(); }

const BitSet& getBitSet() const noexcept override { return acker_->getBitSet(); }
const BitSet& getBitSet(bool individual) const noexcept override {
const auto& bitSet = acker_->getBitSet();
if (bitSet.isEmpty()) {
thread_local BitSet threadLocalBitSet;
threadLocalBitSet = BitSet{batchSize_};
threadLocalBitSet.set(0, batchSize_);
if (individual) {
threadLocalBitSet.clear(batchIndex_);
} else {
threadLocalBitSet.clear(0, batchIndex_ + 1);
}
return threadLocalBitSet;
}
return bitSet;
}

MessageId getPreviousMessageId() {
return MessageIdBuilder().ledgerId(ledgerId_).entryId(entryId_ - 1).partition(partition_).build();
Expand Down
2 changes: 1 addition & 1 deletion lib/Commands.cc
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ SharedBuffer Commands::newMultiMessageAck(uint64_t consumerId, const std::set<Me
auto newMsgId = ack->add_message_id();
newMsgId->set_ledgerid(msgId.ledgerId());
newMsgId->set_entryid(msgId.entryId());
for (auto x : getMessageIdImpl(msgId)->getBitSet()) {
for (auto x : getMessageIdImpl(msgId)->getBitSet(true)) {
newMsgId->add_ack_set(x);
}
}
Expand Down
2 changes: 1 addition & 1 deletion lib/MessageIdImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class MessageIdImpl {
const std::string& getTopicName() { return *topicName_; }
void setTopicName(const std::string& topicName) { topicName_ = &topicName; }

virtual const BitSet& getBitSet() const noexcept {
virtual const BitSet& getBitSet(bool individual) const noexcept {
static const BitSet emptyBitSet;
return emptyBitSet;
}
Expand Down
52 changes: 52 additions & 0 deletions tests/AcknowledgeTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
#include <gtest/gtest.h>
#include <pulsar/Client.h>
#include <pulsar/MessageIdBuilder.h>

#include <chrono>
#include <set>
Expand Down Expand Up @@ -302,4 +303,55 @@ TEST_F(AcknowledgeTest, testMixedCumulativeAck) {
ASSERT_EQ(ResultTimeout, consumer.getConsumer().receive(msg, 1000));
}

TEST_F(AcknowledgeTest, testMessageIdFromBuild) {
Client client(lookupUrl);
const std::string topic = "test-message-id-from-build-" + unique_str();

ConsumerWrapper consumer0;
consumer0.initialize(client, topic, "sub-0", false);

ConsumerWrapper consumer1;
consumer1.initialize(client, topic, "sub-1", true /* batch index ACK enabled */);

Producer producer;
auto producerConf =
ProducerConfiguration().setBatchingMaxMessages(100).setBatchingMaxPublishDelayMs(3600 * 1000);
ASSERT_EQ(ResultOk, client.createProducer(topic, producerConf, producer));

constexpr int numMessages = 5;
for (int i = 0; i < numMessages; i++) {
producer.sendAsync(MessageBuilder().setContent("msg").build(), nullptr);
}
producer.flush();

std::vector<MessageId> msgIds;
consumer0.receiveAtMost(numMessages);
for (auto&& msgId : consumer0.messageIdList()) {
msgIds.emplace_back(MessageIdBuilder()
.ledgerId(msgId.ledgerId())
.entryId(msgId.entryId())
.batchIndex(msgId.batchIndex())
.batchSize(msgId.batchSize())
.build());
}

consumer0.acknowledgeMessageIdAndRestart(msgIds[0], AckType::INDIVIDUAL);
consumer1.acknowledgeMessageIdAndRestart(msgIds[0], AckType::INDIVIDUAL);

Message msg;
ASSERT_EQ(ResultOk, consumer0.getConsumer().receive(msg, 1000));
EXPECT_EQ(msg.getMessageId().batchIndex(), 0);
ASSERT_EQ(ResultOk, consumer1.getConsumer().receive(msg, 1000));
EXPECT_EQ(msg.getMessageId().batchIndex(), 1);

consumer0.acknowledgeMessageIdAndRestart(msgIds[3], AckType::CUMULATIVE);
consumer1.acknowledgeMessageIdAndRestart(msgIds[3], AckType::CUMULATIVE);

ASSERT_EQ(ResultOk, consumer0.getConsumer().receive(msg, 1000));
EXPECT_EQ(msg.getMessageId().batchIndex(), 0);
ASSERT_EQ(ResultOk, consumer1.getConsumer().receive(msg, 1000));
EXPECT_EQ(msg.getMessageId().batchIndex(), 3 + 1);
client.close();
}

INSTANTIATE_TEST_SUITE_P(BasicEndToEndTest, AcknowledgeTest, testing::Values(100, 0));
20 changes: 17 additions & 3 deletions tests/ConsumerWrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class ConsumerWrapper {
// Enable the stats for cumulative ack
conf_.setUnAckedMessagesTimeoutMs(10000);
conf_.setBatchIndexAckEnabled(enableBatchIndexAck);
conf_.setAckGroupingTimeMs(0); // send ACK immediately
ASSERT_EQ(ResultOk, client_->subscribe(topic_, subscription_, conf_, consumer_));
}

Expand Down Expand Up @@ -95,9 +96,16 @@ class ConsumerWrapper {
// the acknowledgment by restarting the consumer.
void acknowledgeAndRestart(const std::vector<size_t>& indexes, AckType ackType) {
acknowledge(indexes, ackType);
messageIdList_.clear();
consumer_.close();
ASSERT_EQ(ResultOk, client_->subscribe(topic_, subscription_, conf_, consumer_));
restart();
}

void acknowledgeMessageIdAndRestart(MessageId msgId, AckType ackType) {
if (ackType == AckType::CUMULATIVE) {
consumer_.acknowledgeCumulative(msgId);
} else {
consumer_.acknowledge(msgId);
}
restart();
}

Consumer& getConsumer() noexcept { return consumer_; }
Expand All @@ -109,6 +117,12 @@ class ConsumerWrapper {
ConsumerConfiguration conf_;
Consumer consumer_;
std::vector<MessageId> messageIdList_;

void restart() {
messageIdList_.clear();
consumer_.close();
ASSERT_EQ(ResultOk, client_->subscribe(topic_, subscription_, conf_, consumer_));
}
};

} // namespace pulsar

0 comments on commit f860566

Please sign in to comment.