diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java index 90114add25084..fcf1a638d5884 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java @@ -424,4 +424,28 @@ public void testAckNotSent(int numAcked, int batchSize, CommandAck.AckType ackTy assertTrue(values.isEmpty()); } } + + @Test + public void testRedeliverMessagesWithoutValue() throws Exception { + String topic = "persistent://my-property/my-ns/testRedeliverMessagesWithoutValue"; + @Cleanup Consumer consumer = pulsarClient.newConsumer(Schema.INT32) + .topic(topic) + .subscriptionName("sub") + .enableRetry(true) + .subscribe(); + @Cleanup Producer producer = pulsarClient.newProducer(Schema.INT32) + .topic(topic) + .enableBatching(true) + .create(); + for (int i = 0; i < 10; i++) { + producer.newMessage().key("messages without value").send(); + } + + Message message = consumer.receive(); + consumer.reconsumeLater(message, 2, TimeUnit.SECONDS); + for (int i = 0; i < 9; i++) { + assertNotNull(consumer.receive(5, TimeUnit.SECONDS)); + } + assertTrue(consumer.receive(5, TimeUnit.SECONDS).getTopicName().contains("sub-RETRY")); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChecksumTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChecksumTest.java index 0b25e3409563a..94e763847506b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChecksumTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChecksumTest.java @@ -24,6 +24,8 @@ import static org.testng.Assert.fail; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; + +import java.lang.reflect.Method; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -225,6 +227,9 @@ public void testTamperingMessageIsDetected() throws Exception { .create(); TypedMessageBuilderImpl msgBuilder = (TypedMessageBuilderImpl) producer.newMessage() .value("a message".getBytes()); + Method method = TypedMessageBuilderImpl.class.getDeclaredMethod("beforeSend"); + method.setAccessible(true); + method.invoke(msgBuilder); MessageMetadata msgMetadata = msgBuilder.getMetadataBuilder() .setProducerName("test") .setSequenceId(1) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java index 081831b0300e0..0cf32859e3dd6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java @@ -1384,7 +1384,7 @@ public void testEmptyPayloadDeletesWhenEncrypted() throws Exception { Message message4 = consumer.receive(); Assert.assertEquals(message4.getKey(), "key2"); - Assert.assertEquals(new String(message4.getData()), ""); + assertNull(message4.getData()); Message message5 = consumer.receive(); Assert.assertEquals(message5.getKey(), "key4"); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java index 026f8a1e69e0b..d90c2e8828364 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java @@ -50,6 +50,7 @@ public class TypedMessageBuilderImpl implements TypedMessageBuilder { private final transient Schema schema; private transient ByteBuffer content; private final transient TransactionImpl txn; + private transient T value; public TypedMessageBuilderImpl(ProducerBase producer, Schema schema) { this(producer, schema, null); @@ -65,6 +66,22 @@ public TypedMessageBuilderImpl(ProducerBase producer, } private long beforeSend() { + if (value == null) { + msgMetadata.setNullValue(true); + } else { + getKeyValueSchema().map(keyValueSchema -> { + if (keyValueSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED) { + setSeparateKeyValue(value, keyValueSchema); + return this; + } else { + return null; + } + }).orElseGet(() -> { + content = ByteBuffer.wrap(schema.encode(value)); + return this; + }); + } + if (txn == null) { return -1L; } @@ -140,22 +157,8 @@ public TypedMessageBuilder orderingKey(byte[] orderingKey) { @Override public TypedMessageBuilder value(T value) { - if (value == null) { - msgMetadata.setNullValue(true); - return this; - } - - return getKeyValueSchema().map(keyValueSchema -> { - if (keyValueSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED) { - setSeparateKeyValue(value, keyValueSchema); - return this; - } else { - return null; - } - }).orElseGet(() -> { - content = ByteBuffer.wrap(schema.encode(value)); - return this; - }); + this.value = value; + return this; } @Override diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TypedMessageBuilderImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TypedMessageBuilderImplTest.java index 94c683e527177..05db4402a1586 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TypedMessageBuilderImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TypedMessageBuilderImplTest.java @@ -27,6 +27,8 @@ import org.mockito.Mock; import org.testng.annotations.Test; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.nio.ByteBuffer; import java.util.Base64; @@ -45,7 +47,7 @@ public class TypedMessageBuilderImplTest { protected ProducerBase producerBase; @Test - public void testDefaultValue() { + public void testDefaultValue() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { producerBase = mock(ProducerBase.class); AvroSchema fooSchema = AvroSchema.of(SchemaDefinition.builder().withPojo(SchemaTestUtils.Foo.class).build()); @@ -63,6 +65,9 @@ public void testDefaultValue() { // Check kv.encoding.type default, not set value TypedMessageBuilderImpl typedMessageBuilder = (TypedMessageBuilderImpl)typedMessageBuilderImpl.value(keyValue); + Method method = TypedMessageBuilderImpl.class.getDeclaredMethod("beforeSend"); + method.setAccessible(true); + method.invoke(typedMessageBuilder); ByteBuffer content = typedMessageBuilder.getContent(); byte[] contentByte = new byte[content.remaining()]; content.get(contentByte); @@ -73,7 +78,7 @@ public void testDefaultValue() { } @Test - public void testInlineValue() { + public void testInlineValue() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { producerBase = mock(ProducerBase.class); AvroSchema fooSchema = AvroSchema.of(SchemaDefinition.builder().withPojo(SchemaTestUtils.Foo.class).build()); @@ -91,6 +96,9 @@ public void testInlineValue() { // Check kv.encoding.type INLINE TypedMessageBuilderImpl typedMessageBuilder = (TypedMessageBuilderImpl)typedMessageBuilderImpl.value(keyValue); + Method method = TypedMessageBuilderImpl.class.getDeclaredMethod("beforeSend"); + method.setAccessible(true); + method.invoke(typedMessageBuilder); ByteBuffer content = typedMessageBuilder.getContent(); byte[] contentByte = new byte[content.remaining()]; content.get(contentByte); @@ -101,7 +109,7 @@ public void testInlineValue() { } @Test - public void testSeparatedValue() { + public void testSeparatedValue() throws Exception { producerBase = mock(ProducerBase.class); AvroSchema fooSchema = AvroSchema.of(SchemaDefinition.builder().withPojo(SchemaTestUtils.Foo.class).build()); @@ -119,6 +127,9 @@ public void testSeparatedValue() { // Check kv.encoding.type SEPARATED TypedMessageBuilderImpl typedMessageBuilder = (TypedMessageBuilderImpl)typedMessageBuilderImpl.value(keyValue); + Method method = TypedMessageBuilderImpl.class.getDeclaredMethod("beforeSend"); + method.setAccessible(true); + method.invoke(typedMessageBuilder); ByteBuffer content = typedMessageBuilder.getContent(); byte[] contentByte = new byte[content.remaining()]; content.get(contentByte);