diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java index 9f1d493247fa1..1424af0c359e4 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java @@ -37,6 +37,7 @@ public class RawMessageImpl implements RawMessage { private ReferenceCountedMessageMetadata msgMetadata; private final SingleMessageMetadata singleMessageMetadata = new SingleMessageMetadata(); + private volatile boolean setSingleMessageMetadata; private ByteBuf payload; private static final Recycler RECYCLER = new Recycler() { @@ -57,6 +58,7 @@ public void release() { msgMetadata.release(); msgMetadata = null; singleMessageMetadata.clear(); + setSingleMessageMetadata = false; payload.release(); handle.recycle(this); @@ -72,6 +74,7 @@ public static RawMessage get(ReferenceCountedMessageMetadata msgMetadata, if (singleMessageMetadata != null) { msg.singleMessageMetadata.copyFrom(singleMessageMetadata); + msg.setSingleMessageMetadata = true; } msg.messageId.ledgerId = ledgerId; msg.messageId.entryId = entryId; @@ -90,7 +93,7 @@ public RawMessage updatePayloadForChunkedMessage(ByteBuf chunkedTotalPayload) { @Override public Map getProperties() { - if (singleMessageMetadata != null && singleMessageMetadata.getPropertiesCount() > 0) { + if (setSingleMessageMetadata && singleMessageMetadata.getPropertiesCount() > 0) { return singleMessageMetadata.getPropertiesList().stream() .collect(Collectors.toMap(KeyValue::getKey, KeyValue::getValue, (oldValue, newValue) -> newValue)); @@ -119,7 +122,7 @@ public long getPublishTime() { @Override public long getEventTime() { - if (singleMessageMetadata != null && singleMessageMetadata.hasEventTime()) { + if (setSingleMessageMetadata && singleMessageMetadata.hasEventTime()) { return singleMessageMetadata.getEventTime(); } else if (msgMetadata.getMetadata().hasEventTime()) { return msgMetadata.getMetadata().getEventTime(); @@ -140,7 +143,7 @@ public String getProducerName() { @Override public Optional getKey() { - if (singleMessageMetadata != null && singleMessageMetadata.hasPartitionKey()) { + if (setSingleMessageMetadata && singleMessageMetadata.hasPartitionKey()) { return Optional.of(singleMessageMetadata.getPartitionKey()); } else if (msgMetadata.getMetadata().hasPartitionKey()){ return Optional.of(msgMetadata.getMetadata().getPartitionKey()); @@ -171,7 +174,7 @@ public Optional getKeyBytes() { @Override public boolean hasBase64EncodedKey() { - if (singleMessageMetadata != null) { + if (setSingleMessageMetadata) { return singleMessageMetadata.isPartitionKeyB64Encoded(); } return msgMetadata.getMetadata().isPartitionKeyB64Encoded(); diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/api/raw/RawMessageImplTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/api/raw/RawMessageImplTest.java index 99a5f02a2e27c..e7270e4546faa 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/api/raw/RawMessageImplTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/api/raw/RawMessageImplTest.java @@ -18,15 +18,20 @@ */ package org.apache.pulsar.common.api.raw; +import static java.util.Collections.singletonList; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; +import com.google.common.collect.ImmutableMap; import io.netty.buffer.ByteBuf; +import java.util.Map; +import org.apache.pulsar.common.api.proto.KeyValue; +import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.api.proto.SingleMessageMetadata; -import org.mockito.Mockito; import org.testng.annotations.Test; -import java.util.Map; - -import static org.testng.Assert.assertEquals; - public class RawMessageImplTest { private static final String HARD_CODE_KEY = "__pfn_input_topic__"; @@ -38,7 +43,7 @@ public class RawMessageImplTest { @Test public void testGetProperties() { ReferenceCountedMessageMetadata refCntMsgMetadata = - ReferenceCountedMessageMetadata.get(Mockito.mock(ByteBuf.class)); + ReferenceCountedMessageMetadata.get(mock(ByteBuf.class)); SingleMessageMetadata singleMessageMetadata = new SingleMessageMetadata(); singleMessageMetadata.addProperty().setKey(HARD_CODE_KEY).setValue(KEY_VALUE_FIRST); singleMessageMetadata.addProperty().setKey(HARD_CODE_KEY).setValue(KEY_VALUE_SECOND); @@ -50,4 +55,42 @@ public void testGetProperties() { assertEquals(KEY_VALUE_SECOND, properties.get(HARD_CODE_KEY)); assertEquals(HARD_CODE_KEY_ID_VALUE, properties.get(HARD_CODE_KEY_ID)); } + + @Test + public void testNonBatchedMessage() { + MessageMetadata messageMetadata = new MessageMetadata(); + messageMetadata.setPartitionKeyB64Encoded(true); + messageMetadata.addAllProperties(singletonList(new KeyValue().setKey("key1").setValue("value1"))); + messageMetadata.setEventTime(100L); + + ReferenceCountedMessageMetadata refCntMsgMetadata = mock(ReferenceCountedMessageMetadata.class); + when(refCntMsgMetadata.getMetadata()).thenReturn(messageMetadata); + + // Non-batched message's singleMessageMetadata is null + RawMessage msg = RawMessageImpl.get(refCntMsgMetadata, null, null, 0, 0, 0); + assertTrue(msg.hasBase64EncodedKey()); + assertEquals(msg.getProperties(), ImmutableMap.of("key1", "value1")); + assertEquals(msg.getEventTime(), 100L); + } + + @Test + public void testBatchedMessage() { + MessageMetadata messageMetadata = new MessageMetadata(); + messageMetadata.setPartitionKeyB64Encoded(true); + messageMetadata.addAllProperties(singletonList(new KeyValue().setKey("key1").setValue("value1"))); + messageMetadata.setEventTime(100L); + + ReferenceCountedMessageMetadata refCntMsgMetadata = mock(ReferenceCountedMessageMetadata.class); + when(refCntMsgMetadata.getMetadata()).thenReturn(messageMetadata); + + SingleMessageMetadata singleMessageMetadata = new SingleMessageMetadata(); + singleMessageMetadata.setPartitionKeyB64Encoded(false); + singleMessageMetadata.addAllProperties(singletonList(new KeyValue().setKey("key2").setValue("value2"))); + singleMessageMetadata.setEventTime(200L); + + RawMessage msg = RawMessageImpl.get(refCntMsgMetadata, singleMessageMetadata, null, 0, 0, 0); + assertFalse(msg.hasBase64EncodedKey()); + assertEquals(msg.getProperties(), ImmutableMap.of("key2", "value2")); + assertEquals(msg.getEventTime(), 200L); + } } \ No newline at end of file