diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MaxMessageSizeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MaxMessageSizeTest.java index d51fe0ad26f73..278e8b82c03e4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MaxMessageSizeTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MaxMessageSizeTest.java @@ -19,11 +19,10 @@ package org.apache.pulsar.broker.service; import com.google.common.collect.Sets; - import java.util.Optional; import java.util.concurrent.TimeUnit; - import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; @@ -32,6 +31,7 @@ import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.impl.MessageImpl; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; @@ -41,6 +41,7 @@ import org.testng.annotations.Test; @Test(groups = "broker") +@Slf4j public class MaxMessageSizeTest { PulsarService pulsar; @@ -55,7 +56,7 @@ void setup() { try { bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0); ServerConfiguration conf = new ServerConfiguration(); - conf.setNettyMaxFrameSizeBytes(10 * 1024 * 1024); + conf.setNettyMaxFrameSizeBytes(10 * 1024 * 1024 + 10 * 1024); bkEnsemble.startStandalone(conf, false); configuration = new ServiceConfiguration(); @@ -78,7 +79,8 @@ void setup() { admin = PulsarAdmin.builder().serviceHttpUrl(url).build(); admin.clusters().createCluster("max_message_test", ClusterData.builder().serviceUrl(url).build()); admin.tenants() - .createTenant("test", new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet("max_message_test"))); + .createTenant("test", + new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet("max_message_test"))); admin.namespaces().createNamespace("test/message", Sets.newHashSet("max_message_test")); } catch (Exception e) { e.printStackTrace(); @@ -101,8 +103,8 @@ public void testMaxMessageSetting() throws PulsarClientException { @Cleanup PulsarClient client = PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build(); String topicName = "persistent://test/message/topic1"; - Producer producer = client.newProducer().topic(topicName).sendTimeout(60, TimeUnit.SECONDS).create(); - Consumer consumer = client.newConsumer().topic(topicName).subscriptionName("test1").subscribe(); + Producer producer = client.newProducer().topic(topicName).sendTimeout(60, TimeUnit.SECONDS).create(); + Consumer consumer = client.newConsumer().topic(topicName).subscriptionName("test1").subscribe(); // less than 5MB message @@ -139,6 +141,14 @@ public void testMaxMessageSetting() throws PulsarClientException { byte[] consumerNewNormalMsg = consumer.receive().getData(); Assert.assertEquals(newNormalMsg, consumerNewNormalMsg); + // 2MB metadata and 8 MB payload + try { + producer.newMessage().keyBytes(new byte[2 * 1024 * 1024]).value(newNormalMsg).send(); + Assert.fail("Shouldn't send out this message"); + } catch (PulsarClientException e) { + //no-op + } + // equals 10MB message byte[] newLimitMsg = new byte[10 * 1024 * 1024]; try { @@ -151,6 +161,87 @@ public void testMaxMessageSetting() throws PulsarClientException { consumer.unsubscribe(); consumer.close(); producer.close(); + } + + @Test + public void testNonBatchingMaxMessageSize() throws Exception { + @Cleanup + PulsarClient client = PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build(); + String topicName = "persistent://test/message/testNonBatchingMaxMessageSize"; + @Cleanup + Producer producer = client.newProducer() + .topic(topicName) + .enableBatching(false) + .sendTimeout(30, TimeUnit.SECONDS).create(); + @Cleanup + Consumer consumer = client.newConsumer().topic(topicName).subscriptionName("test1").subscribe(); + + byte[] data = new byte[8 * 1024 * 1024]; + try { + producer.newMessage().value(data).send(); + } catch (PulsarClientException e) { + Assert.fail("Shouldn't have exception at here", e); + } + Assert.assertEquals(consumer.receive().getData(), data); + + // 1MB metadata and 8 MB payload + try { + producer.newMessage().property("P", new String(new byte[1024 * 1024])).value(data).send(); + } catch (PulsarClientException e) { + Assert.fail("Shouldn't have exception at here", e); + } + Assert.assertEquals(consumer.receive().getData(), data); + + // 2MB metadata and 8 MB payload, should fail. + try { + producer.newMessage().property("P", new String(new byte[2 * 1024 * 1024])).value(data).send(); + Assert.fail("Shouldn't send out this message"); + } catch (PulsarClientException e) { + //no-op + } + } + @Test + public void testChunkingMaxMessageSize() throws Exception { + @Cleanup + PulsarClient client = PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build(); + String topicName = "persistent://test/message/testChunkingMaxMessageSize"; + @Cleanup + Producer producer = client.newProducer() + .topic(topicName) + .enableBatching(false) + .enableChunking(true) + .sendTimeout(30, TimeUnit.SECONDS).create(); + @Cleanup + Consumer consumer = client.newConsumer().topic(topicName).subscriptionName("test1").subscribe(); + + // 12 MB metadata, should fail + try { + producer.newMessage().orderingKey(new byte[12 * 1024 * 1024]).send(); + Assert.fail("Shouldn't send out this message"); + } catch (PulsarClientException e) { + //no-op + } + + // 12 MB payload, there should be 2 chunks + byte[] data = new byte[12 * 1024 * 1024]; + try { + producer.newMessage().value(data).send(); + } catch (PulsarClientException e) { + Assert.fail("Shouldn't have exception at here", e); + } + MessageImpl msg = (MessageImpl) consumer.receive(); + Assert.assertEquals(msg.getData(), data); + Assert.assertEquals(msg.getMessageBuilder().getNumChunksFromMsg(), 2); + + // 5MB metadata and 12 MB payload, there should be 3 chunks + try { + producer.newMessage().property("P", new String(new byte[5 * 1024 * 1024])).value(data).send(); + } catch (PulsarClientException e) { + Assert.fail("Shouldn't have exception at here", e); + } + msg = (MessageImpl) consumer.receive(); + Assert.assertEquals(msg.getData(), data); + Assert.assertEquals(msg.getMessageBuilder().getNumChunksFromMsg(), 3); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java index 3504e263d515f..726dc1a2cb2f9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java @@ -108,7 +108,7 @@ public void testInvalidConfig() throws Exception { public void testLargeMessage(boolean ackReceiptEnabled) throws Exception { log.info("-- Starting {} test --", methodName); - this.conf.setMaxMessageSize(5); + this.conf.setMaxMessageSize(50); final int totalMessages = 5; final String topicName = "persistent://my-property/my-ns/my-topic1"; @@ -125,7 +125,7 @@ public void testLargeMessage(boolean ackReceiptEnabled) throws Exception { List publishedMessages = Lists.newArrayList(); for (int i = 0; i < totalMessages; i++) { - String message = createMessagePayload(i * 10); + String message = createMessagePayload(i * 100); publishedMessages.add(message); producer.send(message.getBytes()); } @@ -171,7 +171,7 @@ public void testLargeMessage(boolean ackReceiptEnabled) throws Exception { @Test public void testChunkingWithOrderingKey() throws Exception { - this.conf.setMaxMessageSize(5); + this.conf.setMaxMessageSize(100); final String topicName = "persistent://my-property/my-ns/testChunkingWithOrderingKey"; @@ -183,8 +183,8 @@ public void testChunkingWithOrderingKey() throws Exception { Producer producer = pulsarClient.newProducer().topic(topicName).enableChunking(true) .enableBatching(false).create(); - byte[] data = RandomUtils.nextBytes(20); - byte[] ok = RandomUtils.nextBytes(10); + byte[] data = RandomUtils.nextBytes(200); + byte[] ok = RandomUtils.nextBytes(50); producer.newMessage().value(data).orderingKey(ok).send(); Message msg = consumer.receive(); @@ -196,7 +196,7 @@ public void testChunkingWithOrderingKey() throws Exception { public void testLargeMessageAckTimeOut(boolean ackReceiptEnabled) throws Exception { log.info("-- Starting {} test --", methodName); - this.conf.setMaxMessageSize(5); + this.conf.setMaxMessageSize(50); final int totalMessages = 5; final String topicName = "persistent://my-property/my-ns/my-topic1"; @@ -213,7 +213,7 @@ public void testLargeMessageAckTimeOut(boolean ackReceiptEnabled) throws Excepti List publishedMessages = Lists.newArrayList(); for (int i = 0; i < totalMessages; i++) { - String message = createMessagePayload(i * 10); + String message = createMessagePayload(i * 100); publishedMessages.add(message); producer.send(message.getBytes()); } @@ -263,7 +263,7 @@ public void testLargeMessageAckTimeOut(boolean ackReceiptEnabled) throws Excepti @Test public void testPublishWithFailure() throws Exception { log.info("-- Starting {} test --", methodName); - this.conf.setMaxMessageSize(5); + this.conf.setMaxMessageSize(50); final String topicName = "persistent://my-property/my-ns/my-topic1"; ProducerBuilder producerBuilder = pulsarClient.newProducer().topic(topicName); @@ -274,7 +274,7 @@ public void testPublishWithFailure() throws Exception { stopBroker(); try { - producer.send(createMessagePayload(100).getBytes()); + producer.send(createMessagePayload(1000).getBytes()); fail("should have failed with timeout exception"); } catch (PulsarClientException.TimeoutException e) { // Ok @@ -403,7 +403,7 @@ public void testExpireIncompleteChunkMessage() throws Exception{ public void testChunksEnqueueFailed() throws Exception { final String topicName = "persistent://my-property/my-ns/test-chunks-enqueue-failed"; log.info("-- Starting {} test --", methodName); - this.conf.setMaxMessageSize(5); + this.conf.setMaxMessageSize(50); final MemoryLimitController controller = ((PulsarClientImpl) pulsarClient).getMemoryLimitController(); assertEquals(controller.currentUsage(), 0); @@ -423,7 +423,7 @@ public void testChunksEnqueueFailed() throws Exception { assertEquals(semaphore.availablePermits(), maxPendingMessages); producer.send(createMessagePayload(1).getBytes()); try { - producer.send(createMessagePayload(100).getBytes(StandardCharsets.UTF_8)); + producer.send(createMessagePayload(1000).getBytes(StandardCharsets.UTF_8)); fail("It should fail with ProducerQueueIsFullError"); } catch (PulsarClientException e) { assertTrue(e instanceof PulsarClientException.ProducerQueueIsFullError); @@ -440,7 +440,7 @@ protected void customizeNewPulsarClientBuilder(ClientBuilder clientBuilder) { @Test public void testSeekChunkMessages() throws PulsarClientException { log.info("-- Starting {} test --", methodName); - this.conf.setMaxMessageSize(5); + this.conf.setMaxMessageSize(50); final int totalMessages = 5; final String topicName = "persistent://my-property/my-ns/test-seek-chunk"; @@ -463,7 +463,7 @@ public void testSeekChunkMessages() throws PulsarClientException { .subscribe(); for (int i = 0; i < totalMessages; i++) { - String message = createMessagePayload(10); + String message = createMessagePayload(100); producer.send(message.getBytes()); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index 573e17a88f8f5..67f6011eb6315 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -76,6 +76,7 @@ import org.apache.pulsar.client.impl.crypto.MessageCryptoBc; import org.apache.pulsar.client.impl.schema.JSONSchema; import org.apache.pulsar.client.impl.transaction.TransactionImpl; +import org.apache.pulsar.client.util.MathUtils; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.api.proto.ProtocolVersion; @@ -470,9 +471,30 @@ public void sendAsync(Message message, SendCallback callback) { } // send in chunks - int totalChunks = canAddToBatch(msg) ? 1 - : Math.max(1, compressedPayload.readableBytes()) / ClientCnx.getMaxMessageSize() - + (Math.max(1, compressedPayload.readableBytes()) % ClientCnx.getMaxMessageSize() == 0 ? 0 : 1); + int totalChunks; + int payloadChunkSize; + if (canAddToBatch(msg) || !conf.isChunkingEnabled()) { + totalChunks = 1; + payloadChunkSize = ClientCnx.getMaxMessageSize(); + } else { + // Reserve current metadata size for chunk size to avoid message size overflow. + // NOTE: this is not strictly bounded, as metadata will be updated after chunking. + // So there is a small chance that the final message size is larger than ClientCnx.getMaxMessageSize(). + // But it won't cause produce failure as broker have 10 KB padding space for these cases. + payloadChunkSize = ClientCnx.getMaxMessageSize() - msgMetadata.getSerializedSize(); + if (payloadChunkSize <= 0) { + PulsarClientException.InvalidMessageException invalidMessageException = + new PulsarClientException.InvalidMessageException( + format("The producer %s of the topic %s sends a message with %d bytes metadata that " + + "exceeds %d bytes", producerName, topic, + msgMetadata.getSerializedSize(), ClientCnx.getMaxMessageSize())); + completeCallbackAndReleaseSemaphore(uncompressedSize, callback, invalidMessageException); + compressedPayload.release(); + return; + } + totalChunks = MathUtils.ceilDiv(Math.max(1, compressedPayload.readableBytes()), payloadChunkSize); + } + // chunked message also sent individually so, try to acquire send-permits for (int i = 0; i < (totalChunks - 1); i++) { if (!canEnqueueRequest(callback, message.getSequenceId(), 0 /* The memory was already reserved */)) { @@ -512,9 +534,9 @@ public void sendAsync(Message message, SendCallback callback) { } } serializeAndSendMessage(msg, payload, sequenceId, uuid, chunkId, totalChunks, - readStartIndex, ClientCnx.getMaxMessageSize(), compressedPayload, compressed, + readStartIndex, payloadChunkSize, compressedPayload, compressed, compressedPayload.readableBytes(), uncompressedSize, callback, chunkedMessageCtx); - readStartIndex = ((chunkId + 1) * ClientCnx.getMaxMessageSize()); + readStartIndex = ((chunkId + 1) * payloadChunkSize); } } } catch (PulsarClientException e) { @@ -1401,6 +1423,19 @@ void setMessageId(ChunkMessageIdImpl chunkMessageId) { } } + public int getMessageHeaderAndPayloadSize() { + if (cmd == null) { + return 0; + } + ByteBuf cmdHeader = cmd.getFirst(); + cmdHeader.markReaderIndex(); + int totalSize = cmdHeader.readInt(); + int cmdSize = cmdHeader.readInt(); + int msgHeadersAndPayloadSize = totalSize - cmdSize - 4; + cmdHeader.resetReaderIndex(); + return msgHeadersAndPayloadSize; + } + private OpSendMsg(Handle recyclerHandle) { this.recyclerHandle = recyclerHandle; } @@ -1989,6 +2024,9 @@ protected void processOpSendMsg(OpSendMsg op) { if (op.msg != null && isBatchMessagingEnabled()) { batchMessageAndSend(); } + if (isMessageSizeExceeded(op)) { + return; + } pendingMessages.add(op); if (op.msg != null) { LAST_SEQ_ID_PUSHED_UPDATER.getAndUpdate(this, @@ -2056,6 +2094,9 @@ private void recoverProcessOpSendMsgFrom(ClientCnx cnx, MessageImpl from, long e if (op.cmd == null) { checkState(op.rePopulate != null); op.rePopulate.run(); + if (isMessageSizeExceeded(op)) { + continue; + } } if (stripChecksum) { stripChecksum(op); @@ -2081,6 +2122,24 @@ private void recoverProcessOpSendMsgFrom(ClientCnx cnx, MessageImpl from, long e } } + /** + * Check if final message size for non-batch and non-chunked messages is larger than max message size. + */ + public boolean isMessageSizeExceeded(OpSendMsg op) { + if (op.msg != null && op.totalChunks <= 1) { + int messageSize = op.getMessageHeaderAndPayloadSize(); + if (messageSize > ClientCnx.getMaxMessageSize()) { + releaseSemaphoreForSendOp(op); + op.sendComplete(new PulsarClientException.InvalidMessageException( + format("The producer %s of the topic %s sends a message with %d bytes that exceeds %d bytes", + producerName, topic, messageSize, ClientCnx.getMaxMessageSize()), + op.sequenceId)); + return true; + } + } + return false; + } + public long getDelayInMillis() { OpSendMsg firstMsg = pendingMessages.peek(); if (firstMsg != null) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/util/MathUtils.java b/pulsar-client/src/main/java/org/apache/pulsar/client/util/MathUtils.java index 6763fec9586d4..11297ec18cfb0 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/util/MathUtils.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/util/MathUtils.java @@ -38,4 +38,15 @@ public static int signSafeMod(long dividend, int divisor) { return mod; } + + /** + * Ceil version of Math.floorDiv(). + * @param x the dividend + * @param y the divisor + * @return the smallest value that is larger than or equal to the algebraic quotient. + * + */ + public static int ceilDiv(int x, int y) { + return -Math.floorDiv(-x, y); + } } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/util/MathUtilsTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/util/MathUtilsTest.java new file mode 100644 index 0000000000000..f9eb6bebc62c2 --- /dev/null +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/util/MathUtilsTest.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.client.util; + + +import org.testng.Assert; +import org.testng.annotations.Test; + +public class MathUtilsTest { + + @Test + public void testCeilDiv() { + Assert.assertEquals(MathUtils.ceilDiv(0, 1024), 0); + Assert.assertEquals(MathUtils.ceilDiv(1, 1024), 1); + Assert.assertEquals(MathUtils.ceilDiv(1023, 1024), 1); + Assert.assertEquals(MathUtils.ceilDiv(1024, 1024), 1); + Assert.assertEquals(MathUtils.ceilDiv(1025, 1024), 2); + + Assert.assertEquals(MathUtils.ceilDiv(0, Integer.MAX_VALUE), 0); + Assert.assertEquals(MathUtils.ceilDiv(1, Integer.MAX_VALUE), 1); + Assert.assertEquals(MathUtils.ceilDiv(Integer.MAX_VALUE - 1, Integer.MAX_VALUE), 1); + Assert.assertEquals(MathUtils.ceilDiv(Integer.MAX_VALUE, Integer.MAX_VALUE), 1); + } +} \ No newline at end of file