diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java index 88215b4ffa601..95d1c739db3cd 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.client.impl; +import java.lang.reflect.Field; +import java.nio.charset.StandardCharsets; import lombok.Cleanup; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.ProducerConsumerBase; @@ -29,13 +31,11 @@ import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; - import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,6 +55,39 @@ public void cleanup() throws Exception { super.internalCleanup(); } + @Test(timeOut = 10_000) + public void testProducerSemaphoreInvalidMessage() throws Exception { + final int pendingQueueSize = 100; + + @Cleanup + ProducerImpl producer = (ProducerImpl) pulsarClient.newProducer() + .topic("testProducerSemaphoreAcquire") + .maxPendingMessages(pendingQueueSize) + .enableBatching(true) + .create(); + + this.stopBroker(); + + Field maxMessageSizeFiled = ClientCnx.class.getDeclaredField("maxMessageSize"); + maxMessageSizeFiled.setAccessible(true); + maxMessageSizeFiled.set(null, 2); + + try { + producer.send("semaphore-test".getBytes(StandardCharsets.UTF_8)); + Assert.fail("can not reach here"); + } catch (PulsarClientException.InvalidMessageException ex) { + Assert.assertEquals(producer.getSemaphore().availablePermits(), pendingQueueSize); + } + + producer.conf.setBatchingEnabled(false); + try { + producer.send("semaphore-test".getBytes(StandardCharsets.UTF_8)); + Assert.fail("can not reach here"); + } catch (PulsarClientException.InvalidMessageException ex) { + Assert.assertEquals(producer.getSemaphore().availablePermits(), pendingQueueSize); + } + } + @Test(timeOut = 30000) public void testProducerSemaphoreAcquireAndRelease() throws PulsarClientException, ExecutionException, InterruptedException { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java index 15bac6d03620c..955eafa6871b9 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java @@ -182,6 +182,7 @@ public boolean isMultiBatches() { public OpSendMsg createOpSendMsg() throws IOException { ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata, getCompressedBatchMetadataAndPayload()); if (encryptedPayload.readableBytes() > ClientCnx.getMaxMessageSize()) { + producer.getSemaphore().release(messages.size()); discard(new PulsarClientException.InvalidMessageException( "Message size is bigger than " + ClientCnx.getMaxMessageSize() + " bytes")); return null;