Introduce BatchMessageContainerBase::processAndClear() and use it from ProducerImpl.

processAndClear() centralizes the "drain the batch container" logic:
  * empty container: invoke the flush callback (if any) with ResultOk;
  * one batch:       createOpSendMsg() and hand the result to the callback;
  * several batches: createOpSendMsgs() and hand each result to the callback;
and it always clear()s the container afterwards.

ProducerImpl::batchMessageAndSend() now delegates to it, and the pending-callbacks
path (which previously only called createOpSendMsg(), i.e. handled a single batch)
goes through the same helper, so multi-batch containers are drained there as well.
A test (testCloseBeforeSend) covers closing a producer with unsent keyed messages.

diff --git a/pulsar-client-cpp/lib/BatchMessageContainerBase.h b/pulsar-client-cpp/lib/BatchMessageContainerBase.h
index 8a32d8e9dca8a..71eef5fab6287 100644
--- a/pulsar-client-cpp/lib/BatchMessageContainerBase.h
+++ b/pulsar-client-cpp/lib/BatchMessageContainerBase.h
@@ -112,6 +112,9 @@ class BatchMessageContainerBase : public boost::noncopyable {
     bool hasEnoughSpace(const Message& msg) const noexcept;
     bool isEmpty() const noexcept;
 
+    void processAndClear(std::function<void(Result, const OpSendMsg&)> opSendMsgCallback,
+                         FlushCallback flushCallback);
+
    protected:
     // references to ProducerImpl's fields
     const std::string& topicName_;
@@ -157,6 +160,29 @@ inline void BatchMessageContainerBase::resetStats() {
     sizeInBytes_ = 0;
 }
 
+inline void BatchMessageContainerBase::processAndClear(
+    std::function<void(Result, const OpSendMsg&)> opSendMsgCallback, FlushCallback flushCallback) {
+    if (isEmpty()) {
+        if (flushCallback) {
+            flushCallback(ResultOk);
+        }
+    } else {
+        const auto numBatches = getNumBatches();
+        if (numBatches == 1) {
+            OpSendMsg opSendMsg;
+            Result result = createOpSendMsg(opSendMsg, flushCallback);
+            opSendMsgCallback(result, opSendMsg);
+        } else if (numBatches > 1) {
+            std::vector<OpSendMsg> opSendMsgs;
+            std::vector<Result> results = createOpSendMsgs(opSendMsgs, flushCallback);
+            for (size_t i = 0; i < results.size(); i++) {
+                opSendMsgCallback(results[i], opSendMsgs[i]);
+            }
+        }  // else numBatches is 0, do nothing
+    }
+    clear();
+}
+
 inline std::ostream& operator<<(std::ostream& os, const BatchMessageContainerBase& container) {
     container.serialize(os);
     return os;
diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc b/pulsar-client-cpp/lib/ProducerImpl.cc
index bb95584d875a9..a539889ac000d 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.cc
+++ b/pulsar-client-cpp/lib/ProducerImpl.cc
@@ -269,13 +269,14 @@ std::shared_ptr ProducerImpl::getPendingCallback
     }
 
     if (batchMessageContainer_) {
-        OpSendMsg opSendMsg;
-        if (batchMessageContainer_->createOpSendMsg(opSendMsg) == ResultOk) {
-            callbacks->opSendMsgs.emplace_back(opSendMsg);
-        }
-
-        releaseSemaphoreForSendOp(opSendMsg);
-        batchMessageContainer_->clear();
+        batchMessageContainer_->processAndClear(
+            [this, &callbacks](Result result, const OpSendMsg& opSendMsg) {
+                if (result == ResultOk) {
+                    callbacks->opSendMsgs.emplace_back(opSendMsg);
+                }
+                releaseSemaphoreForSendOp(opSendMsg);
+            },
+            nullptr);
     }
 
     pendingMessagesQueue_.clear();
@@ -570,15 +571,8 @@ PendingFailures ProducerImpl::batchMessageAndSend(const FlushCallback& flushCall
     LOG_DEBUG("batchMessageAndSend " << *batchMessageContainer_);
     batchTimer_->cancel();
 
-    if (PULSAR_UNLIKELY(batchMessageContainer_->isEmpty())) {
-        if (flushCallback) {
-            flushCallback(ResultOk);
-        }
-    } else {
-        const size_t numBatches = batchMessageContainer_->getNumBatches();
-        if (numBatches == 1) {
-            OpSendMsg opSendMsg;
-            Result result = batchMessageContainer_->createOpSendMsg(opSendMsg, flushCallback);
+    batchMessageContainer_->processAndClear(
+        [this, &failures](Result result, const OpSendMsg& opSendMsg) {
             if (result == ResultOk) {
                 sendMessage(opSendMsg);
             } else {
@@ -588,27 +582,8 @@ PendingFailures ProducerImpl::batchMessageAndSend(const FlushCallback& flushCall
                 releaseSemaphoreForSendOp(opSendMsg);
                 failures.add([opSendMsg, result] { opSendMsg.complete(result, {}); });
             }
-        } else if (numBatches > 1) {
-            std::vector<OpSendMsg> opSendMsgs;
-            std::vector<Result> results = batchMessageContainer_->createOpSendMsgs(opSendMsgs, flushCallback);
-            for (size_t i = 0; i < results.size(); i++) {
-                if (results[i] == ResultOk) {
-                    sendMessage(opSendMsgs[i]);
-                } else {
-                    // A spot has been reserved for this batch, but the batch failed to be pushed to the
-                    // queue, so we need to release the spot manually
-                    LOG_ERROR("batchMessageAndSend | Failed to createOpSendMsgs[" << i
-                                                                                  << "]: " << results[i]);
-                    releaseSemaphoreForSendOp(opSendMsgs[i]);
-                    const auto& opSendMsg = opSendMsgs[i];
-                    const auto result = results[i];
-                    failures.add([opSendMsg, result] { opSendMsg.complete(result, {}); });
-                }
-            }
-        }  // else numBatches is 0, do nothing
-    }
-
-    batchMessageContainer_->clear();
+        },
+        flushCallback);
     return failures;
 }
 
diff --git a/pulsar-client-cpp/tests/KeyBasedBatchingTest.cc b/pulsar-client-cpp/tests/KeyBasedBatchingTest.cc
index 3bec21ac3d738..fcb558a7dadb9 100644
--- a/pulsar-client-cpp/tests/KeyBasedBatchingTest.cc
+++ b/pulsar-client-cpp/tests/KeyBasedBatchingTest.cc
@@ -41,7 +41,6 @@ class KeyBasedBatchingTest : public ::testing::Test {
 
     void TearDown() override { client_.close(); }
 
-    void setTopicName(const std::string& topicName) { topicName_ = topicName; }
     void initTopicName(const std::string& testName) {
         topicName_ = "KeyBasedBatchingTest-" + testName + "-" + std::to_string(time(nullptr));
     }
@@ -179,3 +178,34 @@ TEST_F(KeyBasedBatchingTest, testSingleBatch) {
     ASSERT_EQ(ResultTimeout, consumer_.receive(msg, 3000));
     ASSERT_EQ(numMessageSent.load(), numMessages);
 }
+
+TEST_F(KeyBasedBatchingTest, testCloseBeforeSend) {
+    initTopicName("CloseBeforeSend");
+    // Any asynchronous send won't be completed unless `close()` or `flush()` is triggered
+    initProducer(createDefaultProducerConfig().setBatchingMaxMessages(static_cast<unsigned int>(-1)));
+
+    std::mutex mtx;
+    std::vector<Result> results;
+    auto saveResult = [&mtx, &results](Result result) {
+        std::lock_guard<std::mutex> lock(mtx);
+        results.emplace_back(result);
+    };
+    auto sendAsync = [saveResult, this](const std::string& key, const std::string& value) {
+        producer_.sendAsync(MessageBuilder().setOrderingKey(key).setContent(value).build(),
+                            [saveResult](Result result, const MessageId& id) { saveResult(result); });
+    };
+
+    constexpr int numKeys = 10;
+    for (int i = 0; i < numKeys; i++) {
+        sendAsync("key-" + std::to_string(i), "value");
+    }
+
+    ASSERT_EQ(ResultOk, producer_.close());
+
+    // After close() completed, all callbacks should have failed with ResultAlreadyClosed
+    std::lock_guard<std::mutex> lock(mtx);
+    ASSERT_EQ(results.size(), numKeys);
+    for (int i = 0; i < numKeys; i++) {
+        ASSERT_EQ(results[i], ResultAlreadyClosed) << " results[" << i << "] is " << results[i];
+    }
+}