[fix] Fix MessageId serialization when it's a batched message (#153)
* [fix] Fix MessageId serialization when it's a batched message

### Motivation

The serialization and deserialization of `MessageId` have been broken since
#132:
1. The batch size is not serialized.
2. A `BatchedMessageIdImpl` can never be deserialized.

As a result, no `MessageId` object created from deserialization has a
batch size, which might make
`ReaderTest.testReaderOnSpecificMessageWithBatches` fail when the CMake
build type is `Debug`. Worse, a `MessageId` created from deserialization
is always treated as a `MessageIdImpl`, so acknowledging it behaves
incorrectly.

### Modifications

Serialize the batch size if it is valid (non-zero). On deserialization,
create a `BatchedMessageIdImpl` when both the batch index and the batch
size are valid for a batched message, i.e. the batch index is
non-negative and the batch size is positive.
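
The rule above can be sketched in isolation. This is a minimal illustration, not the client's code: `IdData`, `IdKind`, `shouldSerializeBatchSize` and `kindAfterDeserialization` are hypothetical names, and the defaults (`-1` for an unset batch index, `0` for an unset batch size) are assumptions taken from the checks in this change.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical, simplified stand-in for the fields of a message ID.
struct IdData {
    int64_t ledgerId = -1;
    int64_t entryId = -1;
    int32_t batchIndex = -1;  // assumed default: not part of a batch
    int32_t batchSize = 0;    // assumed default: no batch size recorded
};

enum class IdKind { Plain, Batched };

// The batch size is written only when it is non-zero.
inline bool shouldSerializeBatchSize(const IdData& data) { return data.batchSize != 0; }

// Only an ID with a valid batch index (>= 0) and a valid batch size (> 0)
// is rebuilt as a batched message ID; everything else stays a plain ID.
inline IdKind kindAfterDeserialization(const IdData& data) {
    return (data.batchIndex >= 0 && data.batchSize > 0) ? IdKind::Batched : IdKind::Plain;
}
```

An ID with only a batch index set therefore stays plain after a round trip, which matches the first half of the updated `testSerialization` test.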

A `MessageId` created from deserialization cannot share a
`BatchMessageAcker` with other `MessageId` objects. In this case, create
a fake `BatchMessageAcker` whose `ackIndividual` and `ackCumulative`
methods both return false. With this acker, acknowledgment of the whole
batch always fails, but the consumer falls back to batch index ACK if
batch index ACK is enabled.
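
The fake-acker idea can be sketched as follows. This is a simplified illustration under assumptions: `Acker`, `TrackingAcker` and `createAcker` are hypothetical names, and a `std::vector<bool>` stands in for the client's `BitSet`. The base class acts as the fake acker, and the factory returns it whenever no real batch size is known.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <mutex>
#include <vector>

// Base class doubles as the fake acker: it never reports the batch as
// fully acknowledged, so a caller has to use another acknowledgment path.
class Acker {
   public:
    virtual ~Acker() = default;
    virtual bool ackIndividual(int32_t) { return false; }
    virtual bool ackCumulative(int32_t) { return false; }
};

// Real acker: tracks which batch indexes are still pending.
class TrackingAcker : public Acker {
   public:
    explicit TrackingAcker(int32_t batchSize)
        : pending_(static_cast<std::size_t>(batchSize), true), remaining_(batchSize) {}

    // Returns true once every message in the batch has been acknowledged.
    bool ackIndividual(int32_t index) override {
        std::lock_guard<std::mutex> lock(mutex_);
        clear(index);
        return remaining_ == 0;
    }

    // Cumulative acknowledgment covers the closed range [0, index].
    bool ackCumulative(int32_t index) override {
        std::lock_guard<std::mutex> lock(mutex_);
        for (int32_t i = 0; i <= index; ++i) clear(i);
        return remaining_ == 0;
    }

   private:
    void clear(int32_t i) {
        if (i >= 0 && i < static_cast<int32_t>(pending_.size()) && pending_[i]) {
            pending_[i] = false;
            --remaining_;
        }
    }

    std::vector<bool> pending_;
    int32_t remaining_;
    std::mutex mutex_;
};

// A non-positive batch size means the ID came from raw bytes: use the fake.
inline std::shared_ptr<Acker> createAcker(int32_t batchSize) {
    if (batchSize > 0) return std::make_shared<TrackingAcker>(batchSize);
    return std::make_shared<Acker>();
}
```

Keeping the fake behavior in the base class means callers hold a single pointer type and need no null checks; the cost is that the base class is instantiable, so the virtual destructor matters for safe polymorphic use.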

Add `-DCMAKE_BUILD_TYPE=Debug` to the CMake step of the CI workflow so
that assertions are enabled in tests.

* Add virtual destructor
BewareMyPower authored Dec 28, 2022
1 parent 5d5ed2a commit d03ff20
Showing 7 changed files with 67 additions and 21 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci-pr-validation.yaml
@@ -57,7 +57,7 @@ jobs:
           sudo curl -o /gtest-parallel https://raw.githubusercontent.com/google/gtest-parallel/master/gtest_parallel.py
       - name: CMake
-        run: cmake . -DBUILD_PERF_TOOLS=ON
+        run: cmake . -DCMAKE_BUILD_TYPE=Debug -DBUILD_PERF_TOOLS=ON

       - name: Check formatting
         run: make check-format
41 changes: 28 additions & 13 deletions lib/BatchMessageAcker.h
@@ -31,40 +31,55 @@ class BatchMessageAcker;
 using BatchMessageAckerPtr = std::shared_ptr<BatchMessageAcker>;

 class BatchMessageAcker {
    public:
+    virtual ~BatchMessageAcker() {}
+    // Return false for these methods so that batch index ACK is used as a fallback if the acker is
+    // created by deserializing from raw bytes.
+    virtual bool ackIndividual(int32_t) { return false; }
+    virtual bool ackCumulative(int32_t) { return false; }
+
+    bool shouldAckPreviousMessageId() noexcept {
+        bool expectedValue = false;
+        return prevBatchCumulativelyAcked_.compare_exchange_strong(expectedValue, true);
+    }
+
+   private:
+    // When a batched message is acknowledged cumulatively, the previous message id will be acknowledged
+    // without batch index ACK enabled. However, it should be acknowledged only once. Use this flag to
+    // determine whether to acknowledge the previous message id.
+    std::atomic_bool prevBatchCumulativelyAcked_{false};
+};
+
+class BatchMessageAckerImpl : public BatchMessageAcker {
+   public:
     using Lock = std::lock_guard<std::mutex>;

     static BatchMessageAckerPtr create(int32_t batchSize) {
-        return std::make_shared<BatchMessageAcker>(batchSize);
+        if (batchSize > 0) {
+            return std::make_shared<BatchMessageAckerImpl>(batchSize);
+        } else {
+            return std::make_shared<BatchMessageAcker>();
+        }
     }

-    BatchMessageAcker(int32_t batchSize) : bitSet_(batchSize) { bitSet_.set(0, batchSize); }
+    BatchMessageAckerImpl(int32_t batchSize) : bitSet_(batchSize) { bitSet_.set(0, batchSize); }

-    bool ackIndividual(int32_t batchIndex) {
+    bool ackIndividual(int32_t batchIndex) override {
         Lock lock{mutex_};
         bitSet_.clear(batchIndex);
         return bitSet_.isEmpty();
     }

-    bool ackCumulative(int32_t batchIndex) {
+    bool ackCumulative(int32_t batchIndex) override {
         Lock lock{mutex_};
         // The range of cumulative acknowledgment is closed while BitSet::clear accepts a left-closed
         // right-open range.
         bitSet_.clear(0, batchIndex + 1);
         return bitSet_.isEmpty();
     }

-    bool shouldAckPreviousMessageId() noexcept {
-        bool expectedValue = false;
-        return prevBatchCumulativelyAcked_.compare_exchange_strong(expectedValue, true);
-    }
-
    private:
     BitSet bitSet_;
-    // When a batched message is acknowledged cumulatively, the previous message id will be acknowledged
-    // without batch index ACK enabled. However, it should be acknowledged only once. Use this flag to
-    // determine whether to acknowledge the previous message id.
-    std::atomic_bool prevBatchCumulativelyAcked_{false};
     mutable std::mutex mutex_;
 };

2 changes: 1 addition & 1 deletion lib/ConsumerImpl.cc
@@ -636,7 +636,7 @@ uint32_t ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnection

     int skippedMessages = 0;

-    auto acker = BatchMessageAcker::create(batchSize);
+    auto acker = BatchMessageAckerImpl::create(batchSize);
     for (int i = 0; i < batchSize; i++) {
         // This is a cheap copy since message contains only one shared pointer (impl_)
         Message msg = Commands::deSerializeSingleMessageInBatch(batchedMessage, i, batchSize, acker);
2 changes: 1 addition & 1 deletion lib/MessageBatch.cc
@@ -47,7 +47,7 @@ MessageBatch& MessageBatch::parseFrom(const SharedBuffer& payload, uint32_t batc
     impl_->metadata.set_num_messages_in_batch(batchSize);
     batch_.clear();

-    auto acker = BatchMessageAcker::create(batchSize);
+    auto acker = BatchMessageAckerImpl::create(batchSize);
     for (int i = 0; i < batchSize; ++i) {
         batch_.push_back(Commands::deSerializeSingleMessageInBatch(batchMessage_, i, batchSize, acker));
     }
4 changes: 4 additions & 0 deletions lib/MessageId.cc
@@ -69,6 +69,10 @@ void MessageId::serialize(std::string& result) const {
         idData.set_batch_index(impl_->batchIndex_);
     }

+    if (impl_->batchSize_ != 0) {
+        idData.set_batch_size(impl_->batchSize_);
+    }
+
     auto chunkMsgId = std::dynamic_pointer_cast<ChunkMessageIdImpl>(impl_);
     if (chunkMsgId) {
         proto::MessageIdData& firstChunkIdData = *idData.mutable_first_chunk_message_id();
9 changes: 6 additions & 3 deletions lib/MessageIdBuilder.cc
@@ -16,9 +16,9 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-#include <assert.h>
 #include <pulsar/MessageIdBuilder.h>

+#include "BatchedMessageIdImpl.h"
 #include "MessageIdImpl.h"
 #include "PulsarApi.pb.h"

@@ -42,8 +42,11 @@ MessageIdBuilder MessageIdBuilder::from(const proto::MessageIdData& messageIdDat
 }

 MessageId MessageIdBuilder::build() const {
-    assert(impl_->batchIndex_ < 0 || (impl_->batchSize_ > impl_->batchIndex_));
-    return MessageId{impl_};
+    if (impl_->batchIndex_ >= 0 && impl_->batchSize_ > 0) {
+        return MessageId{std::make_shared<BatchedMessageIdImpl>(*impl_, BatchMessageAckerImpl::create(0))};
+    } else {
+        return MessageId{impl_};
+    }
 }

 MessageIdBuilder& MessageIdBuilder::ledgerId(int64_t ledgerId) {
28 changes: 26 additions & 2 deletions tests/MessageIdTest.cc
@@ -22,19 +22,43 @@
 #include <string>

 #include "PulsarFriend.h"
+#include "lib/BatchedMessageIdImpl.h"
+#include "lib/Commands.h"
 #include "lib/MessageIdUtil.h"

 using namespace pulsar;

 TEST(MessageIdTest, testSerialization) {
-    auto msgId = MessageIdBuilder().ledgerId(1L).entryId(2L).batchIndex(3L).build();
+    auto msgId = MessageIdBuilder().ledgerId(1L).entryId(2L).partition(10).batchIndex(3).build();

     std::string serialized;
     msgId.serialize(serialized);

     MessageId deserialized = MessageId::deserialize(serialized);
+    ASSERT_FALSE(std::dynamic_pointer_cast<BatchedMessageIdImpl>(Commands::getMessageIdImpl(deserialized)));
+    ASSERT_EQ(deserialized.ledgerId(), 1L);
+    ASSERT_EQ(deserialized.entryId(), 2L);
+    ASSERT_EQ(deserialized.partition(), 10);
+    ASSERT_EQ(deserialized.batchIndex(), 3);
+    ASSERT_EQ(deserialized.batchSize(), 0);

-    ASSERT_EQ(msgId, deserialized);
+    // Only a MessageId whose batch index and batch size are both valid can be deserialized as a batched
+    // message id.
+    msgId = MessageIdBuilder().ledgerId(3L).entryId(1L).batchIndex(0).batchSize(1).build();
+    msgId.serialize(serialized);
+    deserialized = MessageId::deserialize(serialized);
+    auto batchedMessageId =
+        std::dynamic_pointer_cast<BatchedMessageIdImpl>(Commands::getMessageIdImpl(deserialized));
+    ASSERT_TRUE(batchedMessageId);
+    // The BatchMessageAcker object created from deserialization is a fake implementation whose
+    // acknowledge methods all return false.
+    ASSERT_FALSE(batchedMessageId->ackIndividual(0));
+    ASSERT_FALSE(batchedMessageId->ackCumulative(0));
+    ASSERT_EQ(deserialized.ledgerId(), 3L);
+    ASSERT_EQ(deserialized.entryId(), 1L);
+    ASSERT_EQ(deserialized.partition(), -1);
+    ASSERT_EQ(deserialized.batchIndex(), 0);
+    ASSERT_EQ(deserialized.batchSize(), 1);
 }

 TEST(MessageIdTest, testCompareLedgerAndEntryId) {
