Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 77afe26

Browse files
BewareMyPowermerlimat
authored andcommittedSep 30, 2020
[C++] Support key based batching (apache#7996)
### Motivation Support key based batching for C++ client. This is a catch up work on apache#4435. In addition, currently the implementation of `BatchMessageContainer` is coupling to `ProducerImpl` tightly. The batch message container registers a timer to producer's executor and the timeout callback is also producer's method. Even its `add` method could call `sendMessage` to send batch to producer's pending queue. These should be producer's work. ### Modifications - Add a `MessageAndCallbackBatch` to store a `MessageImpl` of serialized single messages and a callback list. - Add a `BatchMessageContainerBase` to provide interface methods and methods like update/clear message number/bytes, create `OpSendMsg`. - Let `ProducerImpl` manage the batch timer and determine whether to create `OpSendMsg` from `BatchMessageContainerBase` and send. - Make `BatchMessageContainer` inherit `BatchMessageContainerBase`, it only manages a `MessageAndCallbackBatch`. - Add a `BatchMessageKeyBasedContainer` that inherits `BatchMessageContainerBase`, it manages a map of message key and `MessageAndCallbackBatch`. - Add a producer config to change batching type. - Add some units tests for key based batching and a unit test for key shared subscription. ### Verifying this change This change added tests and can be verified as follows: - Current tests affected by default batching type, like `BatchMessageTest.*` and `BasicEndToEndTest.*` - Newly added tests: `KeyBasedBatchingTest.*` and `KeySharedConsumerTest.testKeyBasedBatching` * Change ClientConnection::getMaxMessageSize() to static method * Refactor the BatchMessageContainer * Add batching type to producer config * Fix testBigMessageSizeBatching error * Add key based batching container * Make batching type config work * Add unit tests for key based batching and key shared subscription * Fix flush test error possibly caused by timeout overflow * Make BatchMessageKeyBasedContainer work for a single batch
1 parent cf42f41 commit 77afe26

26 files changed

+1182
-381
lines changed
 

‎pulsar-client-cpp/include/pulsar/Message.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ class PULSAR_PUBLIC Message {
171171
friend class ConsumerImpl;
172172
friend class ProducerImpl;
173173
friend class Commands;
174-
friend class BatchMessageContainer;
174+
friend class BatchMessageContainerBase;
175175
friend class BatchAcknowledgementTracker;
176176
friend class PulsarWrapper;
177177
friend class MessageBatch;

‎pulsar-client-cpp/include/pulsar/ProducerConfiguration.h

+31-1
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,30 @@ class PULSAR_PUBLIC ProducerConfiguration {
5555
BoostHash,
5656
JavaStringHash
5757
};
58+
enum BatchingType
59+
{
60+
/**
61+
* Default batching.
62+
*
63+
* <p>incoming single messages:
64+
* (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)
65+
*
66+
* <p>batched into single batch message:
67+
* [(k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)]
68+
*/
69+
DefaultBatching,
70+
71+
/**
72+
* Key based batching.
73+
*
74+
* <p>incoming single messages:
75+
* (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)
76+
*
77+
* <p>batched into single batch message:
78+
* [(k1, v1), (k1, v2), (k1, v3)], [(k2, v1), (k2, v2), (k2, v3)], [(k3, v1), (k3, v2), (k3, v3)]
79+
*/
80+
KeyBasedBatching
81+
};
5882

5983
ProducerConfiguration();
6084
~ProducerConfiguration();
@@ -152,13 +176,19 @@ class PULSAR_PUBLIC ProducerConfiguration {
152176
ProducerConfiguration& setBatchingMaxPublishDelayMs(const unsigned long& batchingMaxPublishDelayMs);
153177
const unsigned long& getBatchingMaxPublishDelayMs() const;
154178

179+
/**
180+
* @see BatchingType
181+
*/
182+
ProducerConfiguration& setBatchingType(BatchingType batchingType);
183+
BatchingType getBatchingType() const;
184+
155185
const CryptoKeyReaderPtr getCryptoKeyReader() const;
156186
ProducerConfiguration& setCryptoKeyReader(CryptoKeyReaderPtr cryptoKeyReader);
157187

158188
ProducerCryptoFailureAction getCryptoFailureAction() const;
159189
ProducerConfiguration& setCryptoFailureAction(ProducerCryptoFailureAction action);
160190

161-
std::set<std::string>& getEncryptionKeys();
191+
const std::set<std::string>& getEncryptionKeys() const;
162192
bool isEncryptionEnabled() const;
163193
ProducerConfiguration& addEncryptionKey(std::string key);
164194

‎pulsar-client-cpp/lib/BatchMessageContainer.cc

+42-159
Original file line numberDiff line numberDiff line change
@@ -17,180 +17,63 @@
1717
* under the License.
1818
*/
1919
#include "BatchMessageContainer.h"
20-
#include <memory>
21-
#include <functional>
20+
#include "ClientConnection.h"
21+
#include "Commands.h"
22+
#include "LogUtils.h"
23+
#include "MessageImpl.h"
24+
#include "ProducerImpl.h"
25+
#include "TimeUtils.h"
26+
#include <stdexcept>
2227

23-
namespace pulsar {
24-
25-
static ObjectPool<MessageImpl, 1000> messagePool;
26-
static ObjectPool<BatchMessageContainer::MessageContainerList, 1000> messageContainerListPool;
2728
DECLARE_LOG_OBJECT()
2829

29-
BatchMessageContainer::BatchMessageContainer(ProducerImpl& producer)
30-
: maxAllowedNumMessagesInBatch_(producer.conf_.getBatchingMaxMessages()),
31-
maxAllowedMessageBatchSizeInBytes_(producer.conf_.getBatchingMaxAllowedSizeInBytes()),
32-
topicName_(producer.topic_),
33-
producerName_(producer.producerName_),
34-
compressionType_(producer.conf_.getCompressionType()),
35-
producer_(producer),
36-
impl_(messagePool.create()),
37-
timer_(producer.executor_->createDeadlineTimer()),
38-
batchSizeInBytes_(0),
39-
messagesContainerListPtr_(messageContainerListPool.create()),
40-
averageBatchSize_(0),
41-
numberOfBatchesSent_(0) {
42-
messagesContainerListPtr_->reserve(1000);
43-
LOG_INFO(*this << " BatchMessageContainer constructed");
44-
}
45-
46-
bool BatchMessageContainer::add(const Message& msg, SendCallback sendCallback, bool disableCheck) {
47-
// disableCheck is needed to avoid recursion in case the batchSizeInKB < IndividualMessageSizeInKB
48-
LOG_DEBUG(*this << " Called add function for [message = " << msg << "] [disableCheck = " << disableCheck
49-
<< "]");
50-
if (!(disableCheck || hasSpaceInBatch(msg))) {
51-
LOG_DEBUG(*this << " Batch is full");
52-
bool hasMessages = !messagesContainerListPtr_->empty();
53-
bool pushedToPendingQueue = sendMessage(NULL);
54-
bool result = add(msg, sendCallback, true);
55-
if (hasMessages && !pushedToPendingQueue) {
56-
// The msg failed to be pushed to the producer's queue, so the reserved spot before won't be
57-
// released and we must return false to tell the producer to release the spot.
58-
// Exceptionally, `hasSpaceInBatch` returns false just because `msg` is too big before compressed,
59-
// while there're no messages before. In this case, the spots have already been released so we
60-
// can't return false simply.
61-
return false;
62-
}
63-
return result;
64-
}
65-
if (messagesContainerListPtr_->empty()) {
66-
// First message to be added
67-
startTimer();
68-
Commands::initBatchMessageMetadata(msg, impl_->metadata);
69-
// TODO - add this to Commands.cc
70-
impl_->metadata.set_producer_name(producerName_);
71-
}
72-
batchSizeInBytes_ += msg.impl_->payload.readableBytes();
73-
74-
LOG_DEBUG(*this << " Before serialization payload size in bytes = " << impl_->payload.readableBytes());
75-
Commands::serializeSingleMessageInBatchWithPayload(msg, impl_->payload,
76-
maxAllowedMessageBatchSizeInBytes_);
77-
LOG_DEBUG(*this << " After serialization payload size in bytes = " << impl_->payload.readableBytes());
30+
namespace pulsar {
7831

79-
messagesContainerListPtr_->emplace_back(msg, sendCallback);
32+
BatchMessageContainer::BatchMessageContainer(const ProducerImpl& producer)
33+
: BatchMessageContainerBase(producer) {}
8034

81-
LOG_DEBUG(*this << " Number of messages in Batch = " << messagesContainerListPtr_->size());
82-
LOG_DEBUG(*this << " Batch Payload Size In Bytes = " << batchSizeInBytes_);
83-
bool hasOnlyOneMessage = (messagesContainerListPtr_->size() == 1);
84-
if (isFull()) {
85-
LOG_DEBUG(*this << " Batch is full.");
86-
// If there're more than one messages in the batch, even if it was pushed to the queue successfully,
87-
// we also returns false to release one spot, because there're two spots to be released. One is
88-
// reserved when the first message arrived, another is reserved when the current message arrived.
89-
bool pushedToPendingQueue = sendMessage(NULL);
90-
return hasOnlyOneMessage && pushedToPendingQueue;
91-
}
92-
// A batch of messages only need one spot, so returns false when more messages were added to the batch,
93-
// then outer ProducerImpl::sendAsync() will release unnecessary reserved spots
94-
return hasOnlyOneMessage;
35+
BatchMessageContainer::~BatchMessageContainer() {
36+
LOG_DEBUG(*this << " destructed");
37+
LOG_INFO("[numberOfBatchesSent = " << numberOfBatchesSent_
38+
<< "] [averageBatchSize_ = " << averageBatchSize_ << "]");
9539
}
9640

97-
void BatchMessageContainer::startTimer() {
98-
const unsigned long& publishDelayInMs = producer_.conf_.getBatchingMaxPublishDelayMs();
99-
LOG_DEBUG(*this << " Timer started with expiry after " << publishDelayInMs);
100-
timer_->expires_from_now(boost::posix_time::milliseconds(publishDelayInMs));
101-
timer_->async_wait(
102-
std::bind(&pulsar::ProducerImpl::batchMessageTimeoutHandler, &producer_, std::placeholders::_1));
41+
bool BatchMessageContainer::add(const Message& msg, const SendCallback& callback) {
42+
LOG_DEBUG("Before add: " << *this << " [message = " << msg << "]");
43+
batch_.add(msg, callback);
44+
updateStats(msg);
45+
LOG_DEBUG("After add: " << *this);
46+
return isFull();
10347
}
10448

105-
bool BatchMessageContainer::sendMessage(FlushCallback flushCallback) {
106-
// Call this function after acquiring the ProducerImpl lock
107-
LOG_DEBUG(*this << "Sending the batch message container");
108-
if (isEmpty()) {
109-
LOG_DEBUG(*this << " Batch is empty - returning.");
110-
if (flushCallback) {
111-
flushCallback(ResultOk);
112-
}
113-
return false;
114-
}
115-
impl_->metadata.set_num_messages_in_batch(messagesContainerListPtr_->size());
116-
compressPayLoad();
117-
118-
SharedBuffer encryptedPayload;
119-
if (!producer_.encryptMessage(impl_->metadata, impl_->payload, encryptedPayload)) {
120-
batchMessageCallBack(ResultCryptoError, MessageId{}, messagesContainerListPtr_, nullptr);
121-
clear();
122-
return false;
123-
}
124-
impl_->payload = encryptedPayload;
125-
126-
if (impl_->payload.readableBytes() > producer_.keepMaxMessageSize_) {
127-
// At this point the compressed batch is above the overall MaxMessageSize. There
128-
// can only 1 single message in the batch at this point.
129-
batchMessageCallBack(ResultMessageTooBig, MessageId{}, messagesContainerListPtr_, nullptr);
130-
clear();
131-
return false;
132-
}
133-
134-
Message msg;
135-
msg.impl_ = impl_;
136-
137-
// bind keeps a copy of the parameters
138-
SendCallback callback = std::bind(&BatchMessageContainer::batchMessageCallBack, std::placeholders::_1,
139-
std::placeholders::_2, messagesContainerListPtr_, flushCallback);
140-
141-
producer_.sendMessage(msg, callback);
142-
clear();
143-
return true;
49+
void BatchMessageContainer::clear() {
50+
averageBatchSize_ =
51+
(batch_.size() + averageBatchSize_ * numberOfBatchesSent_) / (numberOfBatchesSent_ + 1);
52+
numberOfBatchesSent_++;
53+
batch_.clear();
54+
resetStats();
55+
LOG_DEBUG(*this << " clear() called");
14456
}
14557

146-
void BatchMessageContainer::compressPayLoad() {
147-
if (compressionType_ != CompressionNone) {
148-
impl_->metadata.set_compression(CompressionCodecProvider::convertType(compressionType_));
149-
impl_->metadata.set_uncompressed_size(impl_->payload.readableBytes());
150-
}
151-
impl_->payload = CompressionCodecProvider::getCodec(compressionType_).encode(impl_->payload);
58+
Result BatchMessageContainer::createOpSendMsg(OpSendMsg& opSendMsg,
59+
const FlushCallback& flushCallback) const {
60+
return createOpSendMsgHelper(opSendMsg, flushCallback, batch_);
15261
}
15362

154-
SharedBuffer BatchMessageContainer::getBatchedPayload() { return impl_->payload; }
155-
156-
void BatchMessageContainer::clear() {
157-
LOG_DEBUG(*this << " BatchMessageContainer::clear() called");
158-
timer_->cancel();
159-
averageBatchSize_ = (messagesContainerListPtr_->size() + (averageBatchSize_ * numberOfBatchesSent_)) /
160-
(numberOfBatchesSent_ + 1);
161-
numberOfBatchesSent_++;
162-
messagesContainerListPtr_ = messageContainerListPool.create();
163-
// Try to optimize this
164-
messagesContainerListPtr_->reserve(10000);
165-
impl_ = messagePool.create();
166-
batchSizeInBytes_ = 0;
63+
std::vector<Result> BatchMessageContainer::createOpSendMsgs(std::vector<OpSendMsg>& opSendMsgs,
64+
const FlushCallback& flushCallback) const {
65+
throw std::runtime_error("createOpSendMsgs is not supported for BatchMessageContainer");
16766
}
16867

169-
void BatchMessageContainer::batchMessageCallBack(Result r, const MessageId& messageId,
170-
MessageContainerListPtr messagesContainerListPtr,
171-
FlushCallback flushCallback) {
172-
if (!messagesContainerListPtr) {
173-
if (flushCallback) {
174-
flushCallback(ResultOk);
175-
}
176-
return;
177-
}
178-
LOG_DEBUG("BatchMessageContainer::batchMessageCallBack called with [Result = "
179-
<< r << "] [numOfMessages = " << messagesContainerListPtr->size() << "]");
180-
size_t batch_size = messagesContainerListPtr->size();
181-
for (size_t i = 0; i < batch_size; i++) {
182-
MessageId messageIdInBatch(messageId.partition(), messageId.ledgerId(), messageId.entryId(), i);
183-
messagesContainerListPtr->operator[](i).callBack(r, messageIdInBatch);
184-
}
185-
if (flushCallback) {
186-
flushCallback(ResultOk);
187-
}
68+
void BatchMessageContainer::serialize(std::ostream& os) const {
69+
os << "{ BatchMessageContainer [size = " << numMessages_ //
70+
<< "] [bytes = " << sizeInBytes_ //
71+
<< "] [maxSize = " << getMaxNumMessages() //
72+
<< "] [maxBytes = " << getMaxSizeInBytes() //
73+
<< "] [topicName = " << topicName_ //
74+
<< "] [numberOfBatchesSent_ = " << numberOfBatchesSent_ //
75+
<< "] [averageBatchSize_ = " << averageBatchSize_ //
76+
<< "] }";
18877
}
18978

190-
BatchMessageContainer::~BatchMessageContainer() {
191-
timer_->cancel();
192-
LOG_DEBUG(*this << " BatchMessageContainer Object destructed");
193-
LOG_INFO("[numberOfBatchesSent = " << numberOfBatchesSent_
194-
<< "] [averageBatchSize = " << averageBatchSize_ << "]");
195-
}
19679
} // namespace pulsar

0 commit comments

Comments
 (0)
Please sign in to comment.