Skip to content

Commit f28f985

Browse files
authored
[improve][client-c++]Support include message header size when check maxMessageSize (apache#17289)
### Motivation See: apache#17188 ### Modifications Support include message header size when check maxMessageSize for cpp client
1 parent 63d4cf2 commit f28f985

File tree

3 files changed

+110
-26
lines changed

3 files changed

+110
-26
lines changed

pulsar-client-cpp/lib/ProducerImpl.cc

+46-24
Original file line numberDiff line numberDiff line change
@@ -417,37 +417,18 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, const SendCallba
417417
callback(result, {});
418418
};
419419

420+
auto& msgMetadata = msg.impl_->metadata;
420421
const bool compressed = !canAddToBatch(msg);
421422
const auto payload =
422423
compressed ? applyCompression(uncompressedPayload, conf_.getCompressionType()) : uncompressedPayload;
423424
const auto compressedSize = static_cast<uint32_t>(payload.readableBytes());
424425
const auto maxMessageSize = static_cast<uint32_t>(ClientConnection::getMaxMessageSize());
425426

426-
if (compressed && compressedSize > ClientConnection::getMaxMessageSize() && !chunkingEnabled_) {
427-
LOG_WARN(getName() << " - compressed Message payload size " << payload.readableBytes()
428-
<< " cannot exceed " << ClientConnection::getMaxMessageSize()
429-
<< " bytes unless chunking is enabled");
430-
handleFailedResult(ResultMessageTooBig);
431-
return;
432-
}
433-
434-
auto& msgMetadata = msg.impl_->metadata;
435427
if (!msgMetadata.has_replicated_from() && msgMetadata.has_producer_name()) {
436428
handleFailedResult(ResultInvalidMessage);
437429
return;
438430
}
439431

440-
const int totalChunks =
441-
canAddToBatch(msg) ? 1 : getNumOfChunks(compressedSize, ClientConnection::getMaxMessageSize());
442-
// Each chunk should be sent individually, so try to acquire extra permits for chunks.
443-
for (int i = 0; i < (totalChunks - 1); i++) {
444-
const auto result = canEnqueueRequest(0); // size is 0 because the memory has already reserved
445-
if (result != ResultOk) {
446-
handleFailedResult(result);
447-
return;
448-
}
449-
}
450-
451432
Lock lock(mutex_);
452433
uint64_t sequenceId;
453434
if (!msgMetadata.has_sequence_id()) {
@@ -457,6 +438,31 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, const SendCallba
457438
}
458439
setMessageMetadata(msg, sequenceId, uncompressedSize);
459440

441+
auto payloadChunkSize = maxMessageSize;
442+
int totalChunks;
443+
if (!compressed || !chunkingEnabled_) {
444+
totalChunks = 1;
445+
} else {
446+
const auto metadataSize = static_cast<uint32_t>(msgMetadata.ByteSizeLong());
447+
if (metadataSize >= maxMessageSize) {
448+
LOG_WARN(getName() << " - metadata size " << metadataSize << " cannot exceed " << maxMessageSize
449+
<< " bytes");
450+
handleFailedResult(ResultMessageTooBig);
451+
return;
452+
}
453+
payloadChunkSize = maxMessageSize - metadataSize;
454+
totalChunks = getNumOfChunks(compressedSize, payloadChunkSize);
455+
}
456+
457+
// Each chunk should be sent individually, so try to acquire extra permits for chunks.
458+
for (int i = 0; i < (totalChunks - 1); i++) {
459+
const auto result = canEnqueueRequest(0); // size is 0 because the memory has already reserved
460+
if (result != ResultOk) {
461+
handleFailedResult(result);
462+
return;
463+
}
464+
}
465+
460466
if (canAddToBatch(msg)) {
461467
// Batching is enabled and the message is not delayed
462468
if (!batchMessageContainer_->hasEnoughSpace(msg)) {
@@ -508,7 +514,7 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, const SendCallba
508514
if (sendChunks) {
509515
msgMetadata.set_chunk_id(chunkId);
510516
}
511-
const uint32_t endIndex = std::min(compressedSize, beginIndex + maxMessageSize);
517+
const uint32_t endIndex = std::min(compressedSize, beginIndex + payloadChunkSize);
512518
auto chunkedPayload = payload.slice(beginIndex, endIndex - beginIndex);
513519
beginIndex = endIndex;
514520

@@ -517,10 +523,26 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, const SendCallba
517523
handleFailedResult(ResultCryptoError);
518524
return;
519525
}
526+
OpSendMsg op{msgMetadata, encryptedPayload, (chunkId == totalChunks - 1) ? callback : nullptr,
527+
producerId_, sequenceId, conf_.getSendTimeout(),
528+
1, uncompressedSize};
529+
530+
if (!chunkingEnabled_) {
531+
const uint32_t msgMetadataSize = op.metadata_.ByteSize();
532+
const uint32_t payloadSize = op.payload_.readableBytes();
533+
const uint32_t msgHeadersAndPayloadSize = msgMetadataSize + payloadSize;
534+
if (msgHeadersAndPayloadSize > maxMessageSize) {
535+
lock.unlock();
536+
releaseSemaphoreForSendOp(op);
537+
LOG_WARN(getName()
538+
<< " - compressed Message size " << msgHeadersAndPayloadSize << " cannot exceed "
539+
<< maxMessageSize << " bytes unless chunking is enabled");
540+
handleFailedResult(ResultMessageTooBig);
541+
return;
542+
}
543+
}
520544

521-
sendMessage(OpSendMsg{msgMetadata, encryptedPayload,
522-
(chunkId == totalChunks - 1) ? callback : nullptr, producerId_, sequenceId,
523-
conf_.getSendTimeout(), 1, uncompressedSize});
545+
sendMessage(op);
524546
}
525547
}
526548
}

pulsar-client-cpp/tests/BasicEndToEndTest.cc

+2-2
Original file line numberDiff line numberDiff line change
@@ -593,8 +593,8 @@ TEST(BasicEndToEndTest, testMessageTooBig) {
593593
result = producer.send(msg);
594594
ASSERT_EQ(ResultMessageTooBig, result);
595595

596-
// Anything up to MaxMessageSize should be allowed
597-
size = ClientConnection::getMaxMessageSize();
596+
// Anything up to MaxMessageSize - MetadataSize should be allowed
597+
size = ClientConnection::getMaxMessageSize() - 32; /*the default message metadata size for string schema*/
598598
msg = MessageBuilder().setAllocatedContent(content, size).build();
599599
result = producer.send(msg);
600600
ASSERT_EQ(ResultOk, result);

pulsar-client-cpp/tests/ProducerTest.cc

+62
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@ using namespace pulsar;
3434
static const std::string serviceUrl = "pulsar://localhost:6650";
3535
static const std::string adminUrl = "http://localhost:8080/";
3636

37+
// See the `maxMessageSize` config in test-conf/standalone-ssl.conf
38+
static constexpr size_t maxMessageSize = 1024000;
39+
3740
TEST(ProducerTest, producerNotInitialized) {
3841
Producer producer;
3942

@@ -211,6 +214,63 @@ TEST(ProducerTest, testBacklogQuotasExceeded) {
211214
client.close();
212215
}
213216

217+
class ProducerTest : public ::testing::TestWithParam<bool> {};
218+
219+
TEST_P(ProducerTest, testMaxMessageSize) {
220+
Client client(serviceUrl);
221+
222+
const std::string topic = "ProducerTest-NoBatchMaxMessageSize-" + std::to_string(time(nullptr));
223+
224+
Consumer consumer;
225+
ASSERT_EQ(ResultOk, client.subscribe(topic, "sub", consumer));
226+
227+
Producer producer;
228+
ProducerConfiguration conf;
229+
conf.setBatchingEnabled(GetParam());
230+
ASSERT_EQ(ResultOk, client.createProducer(topic, conf, producer));
231+
232+
std::string msg = std::string(maxMessageSize / 2, 'a');
233+
ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent(msg).build()));
234+
Message message;
235+
ASSERT_EQ(ResultOk, consumer.receive(message));
236+
ASSERT_EQ(msg, message.getDataAsString());
237+
238+
std::string orderKey = std::string(maxMessageSize, 'a');
239+
ASSERT_EQ(ResultMessageTooBig, producer.send(MessageBuilder().setOrderingKey(orderKey).build()));
240+
241+
ASSERT_EQ(ResultMessageTooBig,
242+
producer.send(MessageBuilder().setContent(std::string(maxMessageSize, 'b')).build()));
243+
244+
client.close();
245+
}
246+
247+
TEST_P(ProducerTest, testChunkingMaxMessageSize) {
248+
Client client(serviceUrl);
249+
250+
const std::string topic = "ProducerTest-ChunkingMaxMessageSize-" + std::to_string(time(nullptr));
251+
252+
Consumer consumer;
253+
ASSERT_EQ(ResultOk, client.subscribe(topic, "sub", consumer));
254+
255+
Producer producer;
256+
ProducerConfiguration conf;
257+
conf.setBatchingEnabled(false);
258+
conf.setChunkingEnabled(true);
259+
ASSERT_EQ(ResultOk, client.createProducer(topic, conf, producer));
260+
261+
std::string orderKey = std::string(maxMessageSize, 'a');
262+
ASSERT_EQ(ResultMessageTooBig, producer.send(MessageBuilder().setOrderingKey(orderKey).build()));
263+
264+
std::string msg = std::string(2 * maxMessageSize + 10, 'b');
265+
Message message;
266+
ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent(msg).build()));
267+
ASSERT_EQ(ResultOk, consumer.receive(message));
268+
ASSERT_EQ(msg, message.getDataAsString());
269+
ASSERT_LE(1L, message.getMessageId().entryId());
270+
271+
client.close();
272+
}
273+
214274
TEST(ProducerTest, testExclusiveProducer) {
215275
Client client(serviceUrl);
216276

@@ -234,3 +294,5 @@ TEST(ProducerTest, testExclusiveProducer) {
234294
producerConfiguration3.setProducerName("p-name-3");
235295
ASSERT_EQ(ResultProducerBusy, client.createProducer(topicName, producerConfiguration3, producer3));
236296
}
297+
298+
INSTANTIATE_TEST_CASE_P(Pulsar, ProducerTest, ::testing::Values(true, false));

0 commit comments

Comments
 (0)