Skip to content

Commit

Permalink
[C++] Fix send callback might not be invoked in key based batching (#…
Browse files Browse the repository at this point in the history
…14898)

* [C++] Fix send callback might not be invoked in key based batching

### Motivation

When C++ client enables key based batching, there is a chance that the
send callback is not invoked. See
https://github.com/apache/pulsar/blob/32df93f693bfdf42953bd728a12ecdea1796dcc8/pulsar-client-cpp/lib/ProducerImpl.cc#L272-L275

If a batch container has multiple batches, only one batch could be
processed during `closeAsync`. Even worse, the semaphores of other
batches won't be released.

### Modifications

- Add a `clearPendingBatches` method to clear all pending batches and
  process them. Then call this method in `closeAsync` and
  `getPendingCallbacksWhenFailed`.
- Add a test `testCloseBeforeSend` to verify when a producer has
  multiple pending batches, all callbacks can be invoked in
  `closeAsync`.

* Add processAndClear() to batch message container

(cherry picked from commit f3295ff)
  • Loading branch information
BewareMyPower authored and codelipenghui committed Apr 19, 2022
1 parent be6e06e commit a74f242
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 38 deletions.
26 changes: 26 additions & 0 deletions pulsar-client-cpp/lib/BatchMessageContainerBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,9 @@ class BatchMessageContainerBase : public boost::noncopyable {
bool hasEnoughSpace(const Message& msg) const noexcept;
bool isEmpty() const noexcept;

void processAndClear(std::function<void(Result, const OpSendMsg&)> opSendMsgCallback,
FlushCallback flushCallback);

protected:
// references to ProducerImpl's fields
const std::string& topicName_;
Expand Down Expand Up @@ -157,6 +160,29 @@ inline void BatchMessageContainerBase::resetStats() {
sizeInBytes_ = 0;
}

inline void BatchMessageContainerBase::processAndClear(
std::function<void(Result, const OpSendMsg&)> opSendMsgCallback, FlushCallback flushCallback) {
if (isEmpty()) {
if (flushCallback) {
flushCallback(ResultOk);
}
} else {
const auto numBatches = getNumBatches();
if (numBatches == 1) {
OpSendMsg opSendMsg;
Result result = createOpSendMsg(opSendMsg, flushCallback);
opSendMsgCallback(result, opSendMsg);
} else if (numBatches > 1) {
std::vector<OpSendMsg> opSendMsgs;
std::vector<Result> results = createOpSendMsgs(opSendMsgs, flushCallback);
for (size_t i = 0; i < results.size(); i++) {
opSendMsgCallback(results[i], opSendMsgs[i]);
}
} // else numBatches is 0, do nothing
}
clear();
}

inline std::ostream& operator<<(std::ostream& os, const BatchMessageContainerBase& container) {
container.serialize(os);
return os;
Expand Down
49 changes: 12 additions & 37 deletions pulsar-client-cpp/lib/ProducerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -269,13 +269,14 @@ std::shared_ptr<ProducerImpl::PendingCallbacks> ProducerImpl::getPendingCallback
}

if (batchMessageContainer_) {
OpSendMsg opSendMsg;
if (batchMessageContainer_->createOpSendMsg(opSendMsg) == ResultOk) {
callbacks->opSendMsgs.emplace_back(opSendMsg);
}

releaseSemaphoreForSendOp(opSendMsg);
batchMessageContainer_->clear();
batchMessageContainer_->processAndClear(
[this, &callbacks](Result result, const OpSendMsg& opSendMsg) {
if (result == ResultOk) {
callbacks->opSendMsgs.emplace_back(opSendMsg);
}
releaseSemaphoreForSendOp(opSendMsg);
},
nullptr);
}
pendingMessagesQueue_.clear();

Expand Down Expand Up @@ -570,15 +571,8 @@ PendingFailures ProducerImpl::batchMessageAndSend(const FlushCallback& flushCall
LOG_DEBUG("batchMessageAndSend " << *batchMessageContainer_);
batchTimer_->cancel();

if (PULSAR_UNLIKELY(batchMessageContainer_->isEmpty())) {
if (flushCallback) {
flushCallback(ResultOk);
}
} else {
const size_t numBatches = batchMessageContainer_->getNumBatches();
if (numBatches == 1) {
OpSendMsg opSendMsg;
Result result = batchMessageContainer_->createOpSendMsg(opSendMsg, flushCallback);
batchMessageContainer_->processAndClear(
[this, &failures](Result result, const OpSendMsg& opSendMsg) {
if (result == ResultOk) {
sendMessage(opSendMsg);
} else {
Expand All @@ -588,27 +582,8 @@ PendingFailures ProducerImpl::batchMessageAndSend(const FlushCallback& flushCall
releaseSemaphoreForSendOp(opSendMsg);
failures.add([opSendMsg, result] { opSendMsg.complete(result, {}); });
}
} else if (numBatches > 1) {
std::vector<OpSendMsg> opSendMsgs;
std::vector<Result> results = batchMessageContainer_->createOpSendMsgs(opSendMsgs, flushCallback);
for (size_t i = 0; i < results.size(); i++) {
if (results[i] == ResultOk) {
sendMessage(opSendMsgs[i]);
} else {
// A spot has been reserved for this batch, but the batch failed to be pushed to the
// queue, so we need to release the spot manually
LOG_ERROR("batchMessageAndSend | Failed to createOpSendMsgs[" << i
<< "]: " << results[i]);
releaseSemaphoreForSendOp(opSendMsgs[i]);
const auto& opSendMsg = opSendMsgs[i];
const auto result = results[i];
failures.add([opSendMsg, result] { opSendMsg.complete(result, {}); });
}
}
} // else numBatches is 0, do nothing
}

batchMessageContainer_->clear();
},
flushCallback);
return failures;
}

Expand Down
32 changes: 31 additions & 1 deletion pulsar-client-cpp/tests/KeyBasedBatchingTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ class KeyBasedBatchingTest : public ::testing::Test {

void TearDown() override { client_.close(); }

void setTopicName(const std::string& topicName) { topicName_ = topicName; }
void initTopicName(const std::string& testName) {
topicName_ = "KeyBasedBatchingTest-" + testName + "-" + std::to_string(time(nullptr));
}
Expand Down Expand Up @@ -179,3 +178,34 @@ TEST_F(KeyBasedBatchingTest, testSingleBatch) {
ASSERT_EQ(ResultTimeout, consumer_.receive(msg, 3000));
ASSERT_EQ(numMessageSent.load(), numMessages);
}

TEST_F(KeyBasedBatchingTest, testCloseBeforeSend) {
initTopicName("CloseBeforeSend");
// Any asynchronous send won't be completed unless `close()` or `flush()` is triggered
initProducer(createDefaultProducerConfig().setBatchingMaxMessages(static_cast<unsigned>(-1)));

std::mutex mtx;
std::vector<Result> results;
auto saveResult = [&mtx, &results](Result result) {
std::lock_guard<std::mutex> lock(mtx);
results.emplace_back(result);
};
auto sendAsync = [saveResult, this](const std::string& key, const std::string& value) {
producer_.sendAsync(MessageBuilder().setOrderingKey(key).setContent(value).build(),
[saveResult](Result result, const MessageId& id) { saveResult(result); });
};

constexpr int numKeys = 10;
for (int i = 0; i < numKeys; i++) {
sendAsync("key-" + std::to_string(i), "value");
}

ASSERT_EQ(ResultOk, producer_.close());

// After close() completed, all callbacks should have failed with ResultAlreadyClosed
std::lock_guard<std::mutex> lock(mtx);
ASSERT_EQ(results.size(), numKeys);
for (int i = 0; i < numKeys; i++) {
ASSERT_EQ(results[i], ResultAlreadyClosed) << " results[" << i << "] is " << results[i];
}
}

0 comments on commit a74f242

Please sign in to comment.