From 40ec88dd01172f64ade1c0f1430bedae32bef23c Mon Sep 17 00:00:00 2001 From: Xiangying Meng <55571188+liangyepianzhou@users.noreply.github.com> Date: Thu, 28 Mar 2024 02:22:08 +0800 Subject: [PATCH 1/9] [fix][client] the nullValue in msgMetadata should be true by default --- .../client/api/ConsumerRedeliveryTest.java | 20 +++++++++++++++++++ .../client/impl/TypedMessageBuilderImpl.java | 3 ++- 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java index 90114add25084..818823de64c0a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java @@ -424,4 +424,24 @@ public void testAckNotSent(int numAcked, int batchSize, CommandAck.AckType ackTy assertTrue(values.isEmpty()); } } + + @Test + public void testRedeliverMessagesWithoutValue() throws Exception { + String topic = "persistent://my-property/my-ns/testRedeliverMessagesWithoutValue"; + @Cleanup Consumer consumer = pulsarClient.newConsumer(Schema.INT32) + .topic(topic) + .subscriptionName("sub") + .enableRetry(true) + .subscribe(); + @Cleanup Producer producer = pulsarClient.newProducer(Schema.INT32) + .topic(topic) + .enableBatching(true) + .create(); + for (int i = 0; i < 10; i++) { + producer.newMessage().key("messages without value").send(); + } + + Message message = consumer.receive(); + consumer.reconsumeLater(message, 2, TimeUnit.SECONDS); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java index 026f8a1e69e0b..1dd85fc5642b2 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java @@ -62,6 +62,7 @@ public TypedMessageBuilderImpl(ProducerBase producer, this.schema = schema; this.content = EMPTY_CONTENT; this.txn = txn; + this.msgMetadata.setNullValue(true); } private long beforeSend() { @@ -141,9 +142,9 @@ public TypedMessageBuilder orderingKey(byte[] orderingKey) { @Override public TypedMessageBuilder value(T value) { if (value == null) { - msgMetadata.setNullValue(true); return this; } + msgMetadata.setNullValue(false); return getKeyValueSchema().map(keyValueSchema -> { if (keyValueSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED) { From d35dae4398d238e176bf8406a94cd09fdfc5eed7 Mon Sep 17 00:00:00 2001 From: Xiangying Meng <55571188+liangyepianzhou@users.noreply.github.com> Date: Thu, 28 Mar 2024 10:27:49 +0800 Subject: [PATCH 2/9] fix: avoiding the value method be called multiple times --- .../org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java | 1 + 1 file changed, 1 insertion(+) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java index 1dd85fc5642b2..f41da55fae360 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java @@ -142,6 +142,7 @@ public TypedMessageBuilder orderingKey(byte[] orderingKey) { @Override public TypedMessageBuilder value(T value) { if (value == null) { + msgMetadata.setNullValue(true); return this; } msgMetadata.setNullValue(false); From af2a4bafab2e11208850366f566b476d19657a68 Mon Sep 17 00:00:00 2001 From: Xiangying Meng <55571188+liangyepianzhou@users.noreply.github.com> Date: Mon, 1 Apr 2024 22:50:37 +0800 Subject: [PATCH 3/9] test: 1. Setting isNullValue field will improve the entry size and 2. The default value of isNullValue is set to true will make the break changes for some usages. --- .../org/apache/pulsar/broker/admin/PersistentTopicsTest.java | 2 +- .../org/apache/pulsar/broker/stats/PrometheusMetricsTest.java | 2 +- .../test/java/org/apache/pulsar/compaction/CompactionTest.java | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java index 9a292175caa59..690ac0d965373 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java @@ -1095,7 +1095,7 @@ public void testGetBacklogSizeByMessageId() throws Exception { completableFuture.get(); Assert.assertEquals(Optional.ofNullable( admin.topics().getBacklogSizeByMessageId(topicName + "-partition-0", MessageId.earliest)), - Optional.of(320L)); + Optional.of(350L)); } @Test diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java index d3891931496c5..64b1174fba36c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java @@ -1765,7 +1765,7 @@ public void testCompaction() throws Exception { assertEquals(cm.get(0).value, 10); cm = (List) metrics.get("pulsar_compaction_compacted_entries_size"); assertEquals(cm.size(), 1); - assertEquals(cm.get(0).value, 840); + assertEquals(cm.get(0).value, 870); pulsarClient.close(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java index f0010096b1e52..ddfb464de6386 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java @@ -1384,7 +1384,7 @@ public void testEmptyPayloadDeletesWhenEncrypted() throws Exception { Message message4 = consumer.receive(); Assert.assertEquals(message4.getKey(), "key2"); - Assert.assertEquals(new String(message4.getData()), ""); + assertNull(message4.getData()); Message message5 = consumer.receive(); Assert.assertEquals(message5.getKey(), "key4"); From 12f169564a7583203cd4f17dac540ac5b87262ab Mon Sep 17 00:00:00 2001 From: Xiangying Meng <55571188+liangyepianzhou@users.noreply.github.com> Date: Sun, 7 Apr 2024 11:08:26 +0800 Subject: [PATCH 4/9] Revert "test:" This reverts commit af2a4bafab2e11208850366f566b476d19657a68. --- .../org/apache/pulsar/broker/admin/PersistentTopicsTest.java | 2 +- .../org/apache/pulsar/broker/stats/PrometheusMetricsTest.java | 2 +- .../test/java/org/apache/pulsar/compaction/CompactionTest.java | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java index 690ac0d965373..9a292175caa59 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java @@ -1095,7 +1095,7 @@ public void testGetBacklogSizeByMessageId() throws Exception { completableFuture.get(); Assert.assertEquals(Optional.ofNullable( admin.topics().getBacklogSizeByMessageId(topicName + "-partition-0", MessageId.earliest)), - Optional.of(350L)); + Optional.of(320L)); } @Test diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java index 64b1174fba36c..d3891931496c5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java @@ -1765,7 +1765,7 @@ public void testCompaction() throws Exception { assertEquals(cm.get(0).value, 10); cm = (List) metrics.get("pulsar_compaction_compacted_entries_size"); assertEquals(cm.size(), 1); - assertEquals(cm.get(0).value, 870); + assertEquals(cm.get(0).value, 840); pulsarClient.close(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java index ddfb464de6386..f0010096b1e52 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java @@ -1384,7 +1384,7 @@ public void testEmptyPayloadDeletesWhenEncrypted() throws Exception { Message message4 = consumer.receive(); Assert.assertEquals(message4.getKey(), "key2"); - assertNull(message4.getData()); + Assert.assertEquals(new String(message4.getData()), ""); Message message5 = consumer.receive(); Assert.assertEquals(message5.getKey(), "key4"); From cd701ef5b8a075d0264bb767cc89ed236f95e5e9 Mon Sep 17 00:00:00 2001 From: Xiangying Meng <55571188+liangyepianzhou@users.noreply.github.com> Date: Sun, 7 Apr 2024 11:38:23 +0800 Subject: [PATCH 5/9] improve: modify metadata before sent --- .../client/impl/TypedMessageBuilderImpl.java | 37 ++++++++++--------- 1 file changed, 19 insertions(+), 18 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java index f41da55fae360..d90c2e8828364 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java @@ -50,6 +50,7 @@ public class TypedMessageBuilderImpl implements TypedMessageBuilder { private final transient Schema schema; private transient ByteBuffer content; private final transient TransactionImpl txn; + private transient T value; public TypedMessageBuilderImpl(ProducerBase producer, Schema schema) { this(producer, schema, null); @@ -62,10 +63,25 @@ public TypedMessageBuilderImpl(ProducerBase producer, this.schema = schema; this.content = EMPTY_CONTENT; this.txn = txn; - this.msgMetadata.setNullValue(true); } private long beforeSend() { + if (value == null) { + msgMetadata.setNullValue(true); + } else { + getKeyValueSchema().map(keyValueSchema -> { + if (keyValueSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED) { + setSeparateKeyValue(value, keyValueSchema); + return this; + } else { + return null; + } + }).orElseGet(() -> { + content = ByteBuffer.wrap(schema.encode(value)); + return this; + }); + } + if (txn == null) { return -1L; } @@ -141,23 +157,8 @@ public TypedMessageBuilder orderingKey(byte[] orderingKey) { @Override public TypedMessageBuilder value(T value) { - if (value == null) { - msgMetadata.setNullValue(true); - return this; - } - msgMetadata.setNullValue(false); - - return getKeyValueSchema().map(keyValueSchema -> { - if (keyValueSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED) { - setSeparateKeyValue(value, keyValueSchema); - return this; - } else { - return null; - } - }).orElseGet(() -> { - content = ByteBuffer.wrap(schema.encode(value)); - return this; - }); + this.value = value; + return this; } @Override From 0c6e76c8ac15b318608932e91fae164938ca4f03 Mon Sep 17 00:00:00 2001 From: Xiangying Meng <55571188+liangyepianzhou@users.noreply.github.com> Date: Sun, 7 Apr 2024 11:38:42 +0800 Subject: [PATCH 6/9] test: check null instead of "" --- .../test/java/org/apache/pulsar/compaction/CompactionTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java index f0010096b1e52..ddfb464de6386 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java @@ -1384,7 +1384,7 @@ public void testEmptyPayloadDeletesWhenEncrypted() throws Exception { Message message4 = consumer.receive(); Assert.assertEquals(message4.getKey(), "key2"); - Assert.assertEquals(new String(message4.getData()), ""); + assertNull(message4.getData()); Message message5 = consumer.receive(); Assert.assertEquals(message5.getKey(), "key4"); From 3fc9b3325d3b5aa5c1fd423c05accff3446f8c3b Mon Sep 17 00:00:00 2001 From: Xiangying Meng <55571188+liangyepianzhou@users.noreply.github.com> Date: Sun, 7 Apr 2024 12:09:54 +0800 Subject: [PATCH 7/9] test: receive message from the retry topic --- .../org/apache/pulsar/client/api/ConsumerRedeliveryTest.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java index 818823de64c0a..fcf1a638d5884 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java @@ -443,5 +443,9 @@ public void testRedeliverMessagesWithoutValue() throws Exception { Message message = consumer.receive(); consumer.reconsumeLater(message, 2, TimeUnit.SECONDS); + for (int i = 0; i < 9; i++) { + assertNotNull(consumer.receive(5, TimeUnit.SECONDS)); + } + assertTrue(consumer.receive(5, TimeUnit.SECONDS).getTopicName().contains("sub-RETRY")); } } From 8ebcb94189a6cc8e19083792e1132f30c897a381 Mon Sep 17 00:00:00 2001 From: xiangying Date: Fri, 2 Aug 2024 11:49:18 +0800 Subject: [PATCH 8/9] fix test --- .../org/apache/pulsar/client/impl/MessageChecksumTest.java | 5 +++++ .../pulsar/client/impl/TypedMessageBuilderImplTest.java | 7 ++++++- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChecksumTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChecksumTest.java index 0b25e3409563a..94e763847506b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChecksumTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChecksumTest.java @@ -24,6 +24,8 @@ import static org.testng.Assert.fail; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; + +import java.lang.reflect.Method; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -225,6 +227,9 @@ public void testTamperingMessageIsDetected() throws Exception { .create(); TypedMessageBuilderImpl msgBuilder = (TypedMessageBuilderImpl) producer.newMessage() .value("a message".getBytes()); + Method method = TypedMessageBuilderImpl.class.getDeclaredMethod("beforeSend"); + method.setAccessible(true); + method.invoke(msgBuilder); MessageMetadata msgMetadata = msgBuilder.getMetadataBuilder() .setProducerName("test") .setSequenceId(1) diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TypedMessageBuilderImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TypedMessageBuilderImplTest.java index 94c683e527177..f981780bcd25a 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TypedMessageBuilderImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TypedMessageBuilderImplTest.java @@ -27,6 +27,8 @@ import org.mockito.Mock; import org.testng.annotations.Test; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.nio.ByteBuffer; import java.util.Base64; @@ -45,7 +47,7 @@ public class TypedMessageBuilderImplTest { protected ProducerBase producerBase; @Test - public void testDefaultValue() { + public void testDefaultValue() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { producerBase = mock(ProducerBase.class); AvroSchema fooSchema = AvroSchema.of(SchemaDefinition.builder().withPojo(SchemaTestUtils.Foo.class).build()); @@ -63,6 +65,9 @@ public void testDefaultValue() { // Check kv.encoding.type default, not set value TypedMessageBuilderImpl typedMessageBuilder = (TypedMessageBuilderImpl)typedMessageBuilderImpl.value(keyValue); + Method method = TypedMessageBuilderImpl.class.getDeclaredMethod("beforeSend"); + method.setAccessible(true); + method.invoke(typedMessageBuilder); ByteBuffer content = typedMessageBuilder.getContent(); byte[] contentByte = new byte[content.remaining()]; content.get(contentByte); From f8254330b6d51fe159b38bad23729d83aaa1ca2b Mon Sep 17 00:00:00 2001 From: xiangying Date: Sat, 3 Aug 2024 12:44:53 +0800 Subject: [PATCH 9/9] fix test --- .../client/impl/TypedMessageBuilderImplTest.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TypedMessageBuilderImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TypedMessageBuilderImplTest.java index f981780bcd25a..05db4402a1586 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TypedMessageBuilderImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TypedMessageBuilderImplTest.java @@ -78,7 +78,7 @@ public void testDefaultValue() throws NoSuchMethodException, InvocationTargetExc } @Test - public void testInlineValue() { + public void testInlineValue() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { producerBase = mock(ProducerBase.class); AvroSchema fooSchema = AvroSchema.of(SchemaDefinition.builder().withPojo(SchemaTestUtils.Foo.class).build()); @@ -96,6 +96,9 @@ public void testInlineValue() { // Check kv.encoding.type INLINE TypedMessageBuilderImpl typedMessageBuilder = (TypedMessageBuilderImpl)typedMessageBuilderImpl.value(keyValue); + Method method = TypedMessageBuilderImpl.class.getDeclaredMethod("beforeSend"); + method.setAccessible(true); + method.invoke(typedMessageBuilder); ByteBuffer content = typedMessageBuilder.getContent(); byte[] contentByte = new byte[content.remaining()]; content.get(contentByte); @@ -106,7 +109,7 @@ public void testInlineValue() { } @Test - public void testSeparatedValue() { + public void testSeparatedValue() throws Exception { producerBase = mock(ProducerBase.class); AvroSchema fooSchema = AvroSchema.of(SchemaDefinition.builder().withPojo(SchemaTestUtils.Foo.class).build()); @@ -124,6 +127,9 @@ public void testSeparatedValue() { // Check kv.encoding.type SEPARATED TypedMessageBuilderImpl typedMessageBuilder = (TypedMessageBuilderImpl)typedMessageBuilderImpl.value(keyValue); + Method method = TypedMessageBuilderImpl.class.getDeclaredMethod("beforeSend"); + method.setAccessible(true); + method.invoke(typedMessageBuilder); ByteBuffer content = typedMessageBuilder.getContent(); byte[] contentByte = new byte[content.remaining()]; content.get(contentByte);