[PIP-132] Include message header size when checking maxMessageSize for non-batch messages on the client side (#14007)

Master Issue: #13591

### Motivation

See #13591

### Modifications

1. Add a max message size check before sending the request for non-batch, non-chunked messages.
2. Decrease the chunk size by the metadata size for chunked messages (a small sketch of this arithmetic follows below).
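
To make the two changes concrete, here is a minimal standalone sketch of the size arithmetic. It is illustrative only: the class and method names are hypothetical, and the real logic lives in `ProducerImpl` below, working on the serialized `MessageMetadata` size, `ClientCnx.getMaxMessageSize()`, and `MathUtils.ceilDiv`.

```java
/**
 * Illustrative sketch only (hypothetical class, not part of this commit): how the client
 * decides to reject a message or split it into chunks once the metadata size is accounted for.
 */
public class MessageSizeSketch {

    /** Returns the planned chunk count, or throws if the message cannot be sent at all. */
    static int planChunks(int maxMessageSize, int metadataSize, int payloadSize, boolean chunkingEnabled) {
        if (!chunkingEnabled) {
            // Modification 1: non-batch, non-chunked messages are rejected on the client
            // side when metadata + payload would exceed the broker's limit.
            if (metadataSize + payloadSize > maxMessageSize) {
                throw new IllegalArgumentException("message exceeds maxMessageSize");
            }
            return 1;
        }
        // Modification 2: reserve the metadata size so that each chunk's frame
        // (metadata + chunk payload) stays within the broker's limit.
        int payloadChunkSize = maxMessageSize - metadataSize;
        if (payloadChunkSize <= 0) {
            throw new IllegalArgumentException("metadata alone exceeds maxMessageSize");
        }
        // Ceiling division, equivalent to MathUtils.ceilDiv in the patch.
        return (Math.max(1, payloadSize) + payloadChunkSize - 1) / payloadChunkSize;
    }

    public static void main(String[] args) {
        int maxMessageSize = 10 * 1024 * 1024; // assume a 10 MB broker limit, as in MaxMessageSizeTest
        // 12 MB payload, tiny metadata -> 2 chunks
        System.out.println(planChunks(maxMessageSize, 64, 12 * 1024 * 1024, true));
        // 12 MB payload, ~5 MB metadata -> 3 chunks, since each chunk now carries the metadata too
        System.out.println(planChunks(maxMessageSize, 5 * 1024 * 1024, 12 * 1024 * 1024, true));
        // 3 MB metadata + 8 MB payload with chunking disabled -> rejected before sending
        try {
            planChunks(maxMessageSize, 3 * 1024 * 1024, 8 * 1024 * 1024, false);
        } catch (IllegalArgumentException expected) {
            System.out.println("rejected: " + expected.getMessage());
        }
    }
}
```

With a ~10 MB limit, a 12 MB payload fits in 2 chunks, but reserving ~5 MB of metadata per chunk pushes it to 3, which mirrors the assertions added in testChunkingMaxMessageSize below.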
Jason918 authored Mar 8, 2022
1 parent 32c3cd1 commit 791853f
Showing 5 changed files with 226 additions and 24 deletions.
@@ -19,11 +19,10 @@
package org.apache.pulsar.broker.service;

import com.google.common.collect.Sets;

import java.util.Optional;
import java.util.concurrent.TimeUnit;

import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
@@ -32,6 +31,7 @@
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
@@ -41,6 +41,7 @@
import org.testng.annotations.Test;

@Test(groups = "broker")
@Slf4j
public class MaxMessageSizeTest {

PulsarService pulsar;
@@ -55,7 +56,7 @@ void setup() {
try {
bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
ServerConfiguration conf = new ServerConfiguration();
conf.setNettyMaxFrameSizeBytes(10 * 1024 * 1024);
conf.setNettyMaxFrameSizeBytes(10 * 1024 * 1024 + 10 * 1024);
bkEnsemble.startStandalone(conf, false);

configuration = new ServiceConfiguration();
@@ -78,7 +79,8 @@ void setup() {
admin = PulsarAdmin.builder().serviceHttpUrl(url).build();
admin.clusters().createCluster("max_message_test", ClusterData.builder().serviceUrl(url).build());
admin.tenants()
.createTenant("test", new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet("max_message_test")));
.createTenant("test",
new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet("max_message_test")));
admin.namespaces().createNamespace("test/message", Sets.newHashSet("max_message_test"));
} catch (Exception e) {
e.printStackTrace();
@@ -101,8 +103,8 @@ public void testMaxMessageSetting() throws PulsarClientException {
@Cleanup
PulsarClient client = PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build();
String topicName = "persistent://test/message/topic1";
Producer producer = client.newProducer().topic(topicName).sendTimeout(60, TimeUnit.SECONDS).create();
Consumer consumer = client.newConsumer().topic(topicName).subscriptionName("test1").subscribe();
Producer<byte[]> producer = client.newProducer().topic(topicName).sendTimeout(60, TimeUnit.SECONDS).create();
Consumer<byte[]> consumer = client.newConsumer().topic(topicName).subscriptionName("test1").subscribe();

// less than 5MB message

@@ -139,6 +141,14 @@ public void testMaxMessageSetting() throws PulsarClientException {
byte[] consumerNewNormalMsg = consumer.receive().getData();
Assert.assertEquals(newNormalMsg, consumerNewNormalMsg);

// 2MB metadata and 8 MB payload
try {
producer.newMessage().keyBytes(new byte[2 * 1024 * 1024]).value(newNormalMsg).send();
Assert.fail("Shouldn't send out this message");
} catch (PulsarClientException e) {
//no-op
}

// equals 10MB message
byte[] newLimitMsg = new byte[10 * 1024 * 1024];
try {
@@ -151,6 +161,87 @@ public void testMaxMessageSetting() throws PulsarClientException {
consumer.unsubscribe();
consumer.close();
producer.close();
}

@Test
public void testNonBatchingMaxMessageSize() throws Exception {
@Cleanup
PulsarClient client = PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build();
String topicName = "persistent://test/message/testNonBatchingMaxMessageSize";
@Cleanup
Producer<byte[]> producer = client.newProducer()
.topic(topicName)
.enableBatching(false)
.sendTimeout(30, TimeUnit.SECONDS).create();
@Cleanup
Consumer<byte[]> consumer = client.newConsumer().topic(topicName).subscriptionName("test1").subscribe();

byte[] data = new byte[8 * 1024 * 1024];
try {
producer.newMessage().value(data).send();
} catch (PulsarClientException e) {
Assert.fail("Shouldn't have exception at here", e);
}
Assert.assertEquals(consumer.receive().getData(), data);

// 1MB metadata and 8 MB payload
try {
producer.newMessage().property("P", new String(new byte[1024 * 1024])).value(data).send();
} catch (PulsarClientException e) {
Assert.fail("Shouldn't have exception at here", e);
}
Assert.assertEquals(consumer.receive().getData(), data);

// 2MB metadata and 8 MB payload, should fail.
try {
producer.newMessage().property("P", new String(new byte[2 * 1024 * 1024])).value(data).send();
Assert.fail("Shouldn't send out this message");
} catch (PulsarClientException e) {
//no-op
}
}

@Test
public void testChunkingMaxMessageSize() throws Exception {
@Cleanup
PulsarClient client = PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build();
String topicName = "persistent://test/message/testChunkingMaxMessageSize";
@Cleanup
Producer<byte[]> producer = client.newProducer()
.topic(topicName)
.enableBatching(false)
.enableChunking(true)
.sendTimeout(30, TimeUnit.SECONDS).create();
@Cleanup
Consumer<byte[]> consumer = client.newConsumer().topic(topicName).subscriptionName("test1").subscribe();

// 12 MB metadata, should fail
try {
producer.newMessage().orderingKey(new byte[12 * 1024 * 1024]).send();
Assert.fail("Shouldn't send out this message");
} catch (PulsarClientException e) {
//no-op
}

// 12 MB payload, there should be 2 chunks
byte[] data = new byte[12 * 1024 * 1024];
try {
producer.newMessage().value(data).send();
} catch (PulsarClientException e) {
Assert.fail("Shouldn't have exception at here", e);
}
MessageImpl<byte[]> msg = (MessageImpl<byte[]>) consumer.receive();
Assert.assertEquals(msg.getData(), data);
Assert.assertEquals(msg.getMessageBuilder().getNumChunksFromMsg(), 2);

// 5MB metadata and 12 MB payload, there should be 3 chunks
try {
producer.newMessage().property("P", new String(new byte[5 * 1024 * 1024])).value(data).send();
} catch (PulsarClientException e) {
Assert.fail("Shouldn't have exception at here", e);
}
msg = (MessageImpl<byte[]>) consumer.receive();
Assert.assertEquals(msg.getData(), data);
Assert.assertEquals(msg.getMessageBuilder().getNumChunksFromMsg(), 3);
}
}
@@ -108,7 +108,7 @@ public void testInvalidConfig() throws Exception {
public void testLargeMessage(boolean ackReceiptEnabled) throws Exception {

log.info("-- Starting {} test --", methodName);
this.conf.setMaxMessageSize(5);
this.conf.setMaxMessageSize(50);
final int totalMessages = 5;
final String topicName = "persistent://my-property/my-ns/my-topic1";

@@ -125,7 +125,7 @@ public void testLargeMessage(boolean ackReceiptEnabled) throws Exception {

List<String> publishedMessages = Lists.newArrayList();
for (int i = 0; i < totalMessages; i++) {
String message = createMessagePayload(i * 10);
String message = createMessagePayload(i * 100);
publishedMessages.add(message);
producer.send(message.getBytes());
}
@@ -171,7 +171,7 @@ public void testLargeMessage(boolean ackReceiptEnabled) throws Exception {

@Test
public void testChunkingWithOrderingKey() throws Exception {
this.conf.setMaxMessageSize(5);
this.conf.setMaxMessageSize(100);

final String topicName = "persistent://my-property/my-ns/testChunkingWithOrderingKey";

@@ -183,8 +183,8 @@ public void testChunkingWithOrderingKey() throws Exception {
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).enableChunking(true)
.enableBatching(false).create();

byte[] data = RandomUtils.nextBytes(20);
byte[] ok = RandomUtils.nextBytes(10);
byte[] data = RandomUtils.nextBytes(200);
byte[] ok = RandomUtils.nextBytes(50);
producer.newMessage().value(data).orderingKey(ok).send();

Message<byte[]> msg = consumer.receive();
@@ -196,7 +196,7 @@
public void testLargeMessageAckTimeOut(boolean ackReceiptEnabled) throws Exception {

log.info("-- Starting {} test --", methodName);
this.conf.setMaxMessageSize(5);
this.conf.setMaxMessageSize(50);
final int totalMessages = 5;
final String topicName = "persistent://my-property/my-ns/my-topic1";

@@ -213,7 +213,7 @@ public void testLargeMessageAckTimeOut(boolean ackReceiptEnabled) throws Excepti

List<String> publishedMessages = Lists.newArrayList();
for (int i = 0; i < totalMessages; i++) {
String message = createMessagePayload(i * 10);
String message = createMessagePayload(i * 100);
publishedMessages.add(message);
producer.send(message.getBytes());
}
@@ -263,7 +263,7 @@ public void testLargeMessageAckTimeOut(boolean ackReceiptEnabled) throws Excepti
@Test
public void testPublishWithFailure() throws Exception {
log.info("-- Starting {} test --", methodName);
this.conf.setMaxMessageSize(5);
this.conf.setMaxMessageSize(50);
final String topicName = "persistent://my-property/my-ns/my-topic1";

ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer().topic(topicName);
@@ -274,7 +274,7 @@ public void testPublishWithFailure() throws Exception {
stopBroker();

try {
producer.send(createMessagePayload(100).getBytes());
producer.send(createMessagePayload(1000).getBytes());
fail("should have failed with timeout exception");
} catch (PulsarClientException.TimeoutException e) {
// Ok
@@ -403,7 +403,7 @@ public void testExpireIncompleteChunkMessage() throws Exception{
public void testChunksEnqueueFailed() throws Exception {
final String topicName = "persistent://my-property/my-ns/test-chunks-enqueue-failed";
log.info("-- Starting {} test --", methodName);
this.conf.setMaxMessageSize(5);
this.conf.setMaxMessageSize(50);

final MemoryLimitController controller = ((PulsarClientImpl) pulsarClient).getMemoryLimitController();
assertEquals(controller.currentUsage(), 0);
@@ -423,7 +423,7 @@ public void testChunksEnqueueFailed() throws Exception {
assertEquals(semaphore.availablePermits(), maxPendingMessages);
producer.send(createMessagePayload(1).getBytes());
try {
producer.send(createMessagePayload(100).getBytes(StandardCharsets.UTF_8));
producer.send(createMessagePayload(1000).getBytes(StandardCharsets.UTF_8));
fail("It should fail with ProducerQueueIsFullError");
} catch (PulsarClientException e) {
assertTrue(e instanceof PulsarClientException.ProducerQueueIsFullError);
@@ -440,7 +440,7 @@ protected void customizeNewPulsarClientBuilder(ClientBuilder clientBuilder) {
@Test
public void testSeekChunkMessages() throws PulsarClientException {
log.info("-- Starting {} test --", methodName);
this.conf.setMaxMessageSize(5);
this.conf.setMaxMessageSize(50);
final int totalMessages = 5;
final String topicName = "persistent://my-property/my-ns/test-seek-chunk";

@@ -463,7 +463,7 @@ public void testSeekChunkMessages() throws PulsarClientException {
.subscribe();

for (int i = 0; i < totalMessages; i++) {
String message = createMessagePayload(10);
String message = createMessagePayload(100);
producer.send(message.getBytes());
}

@@ -76,6 +76,7 @@
import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.client.util.MathUtils;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.ProtocolVersion;
@@ -470,9 +471,30 @@ public void sendAsync(Message<?> message, SendCallback callback) {
}

// send in chunks
int totalChunks = canAddToBatch(msg) ? 1
: Math.max(1, compressedPayload.readableBytes()) / ClientCnx.getMaxMessageSize()
+ (Math.max(1, compressedPayload.readableBytes()) % ClientCnx.getMaxMessageSize() == 0 ? 0 : 1);
int totalChunks;
int payloadChunkSize;
if (canAddToBatch(msg) || !conf.isChunkingEnabled()) {
totalChunks = 1;
payloadChunkSize = ClientCnx.getMaxMessageSize();
} else {
// Reserve current metadata size for chunk size to avoid message size overflow.
// NOTE: this is not strictly bounded, as metadata will be updated after chunking.
// So there is a small chance that the final message size is larger than ClientCnx.getMaxMessageSize().
// But it won't cause produce failure as broker have 10 KB padding space for these cases.
payloadChunkSize = ClientCnx.getMaxMessageSize() - msgMetadata.getSerializedSize();
if (payloadChunkSize <= 0) {
PulsarClientException.InvalidMessageException invalidMessageException =
new PulsarClientException.InvalidMessageException(
format("The producer %s of the topic %s sends a message with %d bytes metadata that "
+ "exceeds %d bytes", producerName, topic,
msgMetadata.getSerializedSize(), ClientCnx.getMaxMessageSize()));
completeCallbackAndReleaseSemaphore(uncompressedSize, callback, invalidMessageException);
compressedPayload.release();
return;
}
totalChunks = MathUtils.ceilDiv(Math.max(1, compressedPayload.readableBytes()), payloadChunkSize);
}

// chunked message also sent individually so, try to acquire send-permits
for (int i = 0; i < (totalChunks - 1); i++) {
if (!canEnqueueRequest(callback, message.getSequenceId(), 0 /* The memory was already reserved */)) {
@@ -512,9 +534,9 @@ public void sendAsync(Message<?> message, SendCallback callback) {
}
}
serializeAndSendMessage(msg, payload, sequenceId, uuid, chunkId, totalChunks,
readStartIndex, ClientCnx.getMaxMessageSize(), compressedPayload, compressed,
readStartIndex, payloadChunkSize, compressedPayload, compressed,
compressedPayload.readableBytes(), uncompressedSize, callback, chunkedMessageCtx);
readStartIndex = ((chunkId + 1) * ClientCnx.getMaxMessageSize());
readStartIndex = ((chunkId + 1) * payloadChunkSize);
}
}
} catch (PulsarClientException e) {
@@ -1401,6 +1423,19 @@ void setMessageId(ChunkMessageIdImpl chunkMessageId) {
}
}

public int getMessageHeaderAndPayloadSize() {
if (cmd == null) {
return 0;
}
ByteBuf cmdHeader = cmd.getFirst();
cmdHeader.markReaderIndex();
int totalSize = cmdHeader.readInt();
int cmdSize = cmdHeader.readInt();
int msgHeadersAndPayloadSize = totalSize - cmdSize - 4;
cmdHeader.resetReaderIndex();
return msgHeadersAndPayloadSize;
}

private OpSendMsg(Handle<OpSendMsg> recyclerHandle) {
this.recyclerHandle = recyclerHandle;
}
@@ -1989,6 +2024,9 @@ protected void processOpSendMsg(OpSendMsg op) {
if (op.msg != null && isBatchMessagingEnabled()) {
batchMessageAndSend();
}
if (isMessageSizeExceeded(op)) {
return;
}
pendingMessages.add(op);
if (op.msg != null) {
LAST_SEQ_ID_PUSHED_UPDATER.getAndUpdate(this,
@@ -2056,6 +2094,9 @@ private void recoverProcessOpSendMsgFrom(ClientCnx cnx, MessageImpl from, long e
if (op.cmd == null) {
checkState(op.rePopulate != null);
op.rePopulate.run();
if (isMessageSizeExceeded(op)) {
continue;
}
}
if (stripChecksum) {
stripChecksum(op);
@@ -2081,6 +2122,24 @@ private void recoverProcessOpSendMsgFrom(ClientCnx cnx, MessageImpl from, long e
}
}

/**
* Check if final message size for non-batch and non-chunked messages is larger than max message size.
*/
public boolean isMessageSizeExceeded(OpSendMsg op) {
if (op.msg != null && op.totalChunks <= 1) {
int messageSize = op.getMessageHeaderAndPayloadSize();
if (messageSize > ClientCnx.getMaxMessageSize()) {
releaseSemaphoreForSendOp(op);
op.sendComplete(new PulsarClientException.InvalidMessageException(
format("The producer %s of the topic %s sends a message with %d bytes that exceeds %d bytes",
producerName, topic, messageSize, ClientCnx.getMaxMessageSize()),
op.sequenceId));
return true;
}
}
return false;
}

public long getDelayInMillis() {
OpSendMsg firstMsg = pendingMessages.peek();
if (firstMsg != null) {
Expand Down
Loading
