Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][client] the nullValue in msgMetadata should be true by default #22372

Merged
merged 10 commits into from
Aug 3, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -424,4 +424,28 @@ public void testAckNotSent(int numAcked, int batchSize, CommandAck.AckType ackTy
assertTrue(values.isEmpty());
}
}

@Test
public void testRedeliverMessagesWithoutValue() throws Exception {
String topic = "persistent://my-property/my-ns/testRedeliverMessagesWithoutValue";
@Cleanup Consumer<Integer> consumer = pulsarClient.newConsumer(Schema.INT32)
.topic(topic)
.subscriptionName("sub")
.enableRetry(true)
.subscribe();
@Cleanup Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
.topic(topic)
.enableBatching(true)
.create();
for (int i = 0; i < 10; i++) {
producer.newMessage().key("messages without value").send();
}

Message<Integer> message = consumer.receive();
consumer.reconsumeLater(message, 2, TimeUnit.SECONDS);
liangyepianzhou marked this conversation as resolved.
Show resolved Hide resolved
for (int i = 0; i < 9; i++) {
assertNotNull(consumer.receive(5, TimeUnit.SECONDS));
}
assertTrue(consumer.receive(5, TimeUnit.SECONDS).getTopicName().contains("sub-RETRY"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import static org.testng.Assert.fail;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

import java.lang.reflect.Method;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -225,6 +227,9 @@ public void testTamperingMessageIsDetected() throws Exception {
.create();
TypedMessageBuilderImpl<byte[]> msgBuilder = (TypedMessageBuilderImpl<byte[]>) producer.newMessage()
.value("a message".getBytes());
Method method = TypedMessageBuilderImpl.class.getDeclaredMethod("beforeSend");
method.setAccessible(true);
method.invoke(msgBuilder);
MessageMetadata msgMetadata = msgBuilder.getMetadataBuilder()
.setProducerName("test")
.setSequenceId(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1384,7 +1384,7 @@ public void testEmptyPayloadDeletesWhenEncrypted() throws Exception {

Message<byte[]> message4 = consumer.receive();
Assert.assertEquals(message4.getKey(), "key2");
Assert.assertEquals(new String(message4.getData()), "");
codelipenghui marked this conversation as resolved.
Show resolved Hide resolved
assertNull(message4.getData());

Message<byte[]> message5 = consumer.receive();
Assert.assertEquals(message5.getKey(), "key4");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public class TypedMessageBuilderImpl<T> implements TypedMessageBuilder<T> {
private final transient Schema<T> schema;
private transient ByteBuffer content;
private final transient TransactionImpl txn;
private transient T value;

public TypedMessageBuilderImpl(ProducerBase<?> producer, Schema<T> schema) {
this(producer, schema, null);
Expand All @@ -65,6 +66,22 @@ public TypedMessageBuilderImpl(ProducerBase<?> producer,
}

private long beforeSend() {
if (value == null) {
msgMetadata.setNullValue(true);
} else {
getKeyValueSchema().map(keyValueSchema -> {
if (keyValueSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED) {
setSeparateKeyValue(value, keyValueSchema);
return this;
} else {
return null;
}
}).orElseGet(() -> {
content = ByteBuffer.wrap(schema.encode(value));
return this;
});
}

if (txn == null) {
return -1L;
}
Expand Down Expand Up @@ -140,22 +157,8 @@ public TypedMessageBuilder<T> orderingKey(byte[] orderingKey) {

@Override
public TypedMessageBuilder<T> value(T value) {
if (value == null) {
msgMetadata.setNullValue(true);
liangyepianzhou marked this conversation as resolved.
Show resolved Hide resolved
return this;
}

return getKeyValueSchema().map(keyValueSchema -> {
if (keyValueSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED) {
setSeparateKeyValue(value, keyValueSchema);
return this;
} else {
return null;
}
}).orElseGet(() -> {
content = ByteBuffer.wrap(schema.encode(value));
return this;
});
this.value = value;
return this;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.mockito.Mock;
import org.testng.annotations.Test;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.util.Base64;

Expand All @@ -45,7 +47,7 @@ public class TypedMessageBuilderImplTest {
protected ProducerBase producerBase;

@Test
public void testDefaultValue() {
public void testDefaultValue() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
producerBase = mock(ProducerBase.class);

AvroSchema<SchemaTestUtils.Foo> fooSchema = AvroSchema.of(SchemaDefinition.<SchemaTestUtils.Foo>builder().withPojo(SchemaTestUtils.Foo.class).build());
Expand All @@ -63,6 +65,9 @@ public void testDefaultValue() {

// Check kv.encoding.type default, not set value
TypedMessageBuilderImpl<KeyValue> typedMessageBuilder = (TypedMessageBuilderImpl)typedMessageBuilderImpl.value(keyValue);
Method method = TypedMessageBuilderImpl.class.getDeclaredMethod("beforeSend");
method.setAccessible(true);
method.invoke(typedMessageBuilder);
ByteBuffer content = typedMessageBuilder.getContent();
byte[] contentByte = new byte[content.remaining()];
content.get(contentByte);
Expand All @@ -73,7 +78,7 @@ public void testDefaultValue() {
}

@Test
public void testInlineValue() {
public void testInlineValue() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
producerBase = mock(ProducerBase.class);

AvroSchema<SchemaTestUtils.Foo> fooSchema = AvroSchema.of(SchemaDefinition.<SchemaTestUtils.Foo>builder().withPojo(SchemaTestUtils.Foo.class).build());
Expand All @@ -91,6 +96,9 @@ public void testInlineValue() {

// Check kv.encoding.type INLINE
TypedMessageBuilderImpl<KeyValue> typedMessageBuilder = (TypedMessageBuilderImpl)typedMessageBuilderImpl.value(keyValue);
Method method = TypedMessageBuilderImpl.class.getDeclaredMethod("beforeSend");
method.setAccessible(true);
method.invoke(typedMessageBuilder);
ByteBuffer content = typedMessageBuilder.getContent();
byte[] contentByte = new byte[content.remaining()];
content.get(contentByte);
Expand All @@ -101,7 +109,7 @@ public void testInlineValue() {
}

@Test
public void testSeparatedValue() {
public void testSeparatedValue() throws Exception {
producerBase = mock(ProducerBase.class);

AvroSchema<SchemaTestUtils.Foo> fooSchema = AvroSchema.of(SchemaDefinition.<SchemaTestUtils.Foo>builder().withPojo(SchemaTestUtils.Foo.class).build());
Expand All @@ -119,6 +127,9 @@ public void testSeparatedValue() {

// Check kv.encoding.type SEPARATED
TypedMessageBuilderImpl typedMessageBuilder = (TypedMessageBuilderImpl)typedMessageBuilderImpl.value(keyValue);
Method method = TypedMessageBuilderImpl.class.getDeclaredMethod("beforeSend");
method.setAccessible(true);
method.invoke(typedMessageBuilder);
ByteBuffer content = typedMessageBuilder.getContent();
byte[] contentByte = new byte[content.remaining()];
content.get(contentByte);
Expand Down
Loading