Skip to content

Commit

Permalink
[fix][sql]Fix presto sql avro decode error when publish non-batched m…
Browse files Browse the repository at this point in the history
…sgs (#17093)
  • Loading branch information
AnonHxy authored Aug 18, 2022
1 parent 84968e8 commit 4b98b23
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class RawMessageImpl implements RawMessage {

private ReferenceCountedMessageMetadata msgMetadata;
private final SingleMessageMetadata singleMessageMetadata = new SingleMessageMetadata();
private volatile boolean setSingleMessageMetadata;
private ByteBuf payload;

private static final Recycler<RawMessageImpl> RECYCLER = new Recycler<RawMessageImpl>() {
Expand All @@ -57,6 +58,7 @@ public void release() {
msgMetadata.release();
msgMetadata = null;
singleMessageMetadata.clear();
setSingleMessageMetadata = false;

payload.release();
handle.recycle(this);
Expand All @@ -72,6 +74,7 @@ public static RawMessage get(ReferenceCountedMessageMetadata msgMetadata,

if (singleMessageMetadata != null) {
msg.singleMessageMetadata.copyFrom(singleMessageMetadata);
msg.setSingleMessageMetadata = true;
}
msg.messageId.ledgerId = ledgerId;
msg.messageId.entryId = entryId;
Expand All @@ -90,7 +93,7 @@ public RawMessage updatePayloadForChunkedMessage(ByteBuf chunkedTotalPayload) {

@Override
public Map<String, String> getProperties() {
if (singleMessageMetadata != null && singleMessageMetadata.getPropertiesCount() > 0) {
if (setSingleMessageMetadata && singleMessageMetadata.getPropertiesCount() > 0) {
return singleMessageMetadata.getPropertiesList().stream()
.collect(Collectors.toMap(KeyValue::getKey, KeyValue::getValue,
(oldValue, newValue) -> newValue));
Expand Down Expand Up @@ -119,7 +122,7 @@ public long getPublishTime() {

@Override
public long getEventTime() {
if (singleMessageMetadata != null && singleMessageMetadata.hasEventTime()) {
if (setSingleMessageMetadata && singleMessageMetadata.hasEventTime()) {
return singleMessageMetadata.getEventTime();
} else if (msgMetadata.getMetadata().hasEventTime()) {
return msgMetadata.getMetadata().getEventTime();
Expand All @@ -140,7 +143,7 @@ public String getProducerName() {

@Override
public Optional<String> getKey() {
if (singleMessageMetadata != null && singleMessageMetadata.hasPartitionKey()) {
if (setSingleMessageMetadata && singleMessageMetadata.hasPartitionKey()) {
return Optional.of(singleMessageMetadata.getPartitionKey());
} else if (msgMetadata.getMetadata().hasPartitionKey()){
return Optional.of(msgMetadata.getMetadata().getPartitionKey());
Expand Down Expand Up @@ -171,7 +174,7 @@ public Optional<ByteBuf> getKeyBytes() {

@Override
public boolean hasBase64EncodedKey() {
if (singleMessageMetadata != null) {
if (setSingleMessageMetadata) {
return singleMessageMetadata.isPartitionKeyB64Encoded();
}
return msgMetadata.getMetadata().isPartitionKeyB64Encoded();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,20 @@
*/
package org.apache.pulsar.common.api.raw;

import static java.util.Collections.singletonList;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import com.google.common.collect.ImmutableMap;
import io.netty.buffer.ByteBuf;
import java.util.Map;
import org.apache.pulsar.common.api.proto.KeyValue;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.SingleMessageMetadata;
import org.mockito.Mockito;
import org.testng.annotations.Test;

import java.util.Map;

import static org.testng.Assert.assertEquals;

public class RawMessageImplTest {

private static final String HARD_CODE_KEY = "__pfn_input_topic__";
Expand All @@ -38,7 +43,7 @@ public class RawMessageImplTest {
@Test
public void testGetProperties() {
ReferenceCountedMessageMetadata refCntMsgMetadata =
ReferenceCountedMessageMetadata.get(Mockito.mock(ByteBuf.class));
ReferenceCountedMessageMetadata.get(mock(ByteBuf.class));
SingleMessageMetadata singleMessageMetadata = new SingleMessageMetadata();
singleMessageMetadata.addProperty().setKey(HARD_CODE_KEY).setValue(KEY_VALUE_FIRST);
singleMessageMetadata.addProperty().setKey(HARD_CODE_KEY).setValue(KEY_VALUE_SECOND);
Expand All @@ -50,4 +55,42 @@ public void testGetProperties() {
assertEquals(KEY_VALUE_SECOND, properties.get(HARD_CODE_KEY));
assertEquals(HARD_CODE_KEY_ID_VALUE, properties.get(HARD_CODE_KEY_ID));
}

@Test
public void testNonBatchedMessage() {
MessageMetadata messageMetadata = new MessageMetadata();
messageMetadata.setPartitionKeyB64Encoded(true);
messageMetadata.addAllProperties(singletonList(new KeyValue().setKey("key1").setValue("value1")));
messageMetadata.setEventTime(100L);

ReferenceCountedMessageMetadata refCntMsgMetadata = mock(ReferenceCountedMessageMetadata.class);
when(refCntMsgMetadata.getMetadata()).thenReturn(messageMetadata);

// Non-batched message's singleMessageMetadata is null
RawMessage msg = RawMessageImpl.get(refCntMsgMetadata, null, null, 0, 0, 0);
assertTrue(msg.hasBase64EncodedKey());
assertEquals(msg.getProperties(), ImmutableMap.of("key1", "value1"));
assertEquals(msg.getEventTime(), 100L);
}

@Test
public void testBatchedMessage() {
MessageMetadata messageMetadata = new MessageMetadata();
messageMetadata.setPartitionKeyB64Encoded(true);
messageMetadata.addAllProperties(singletonList(new KeyValue().setKey("key1").setValue("value1")));
messageMetadata.setEventTime(100L);

ReferenceCountedMessageMetadata refCntMsgMetadata = mock(ReferenceCountedMessageMetadata.class);
when(refCntMsgMetadata.getMetadata()).thenReturn(messageMetadata);

SingleMessageMetadata singleMessageMetadata = new SingleMessageMetadata();
singleMessageMetadata.setPartitionKeyB64Encoded(false);
singleMessageMetadata.addAllProperties(singletonList(new KeyValue().setKey("key2").setValue("value2")));
singleMessageMetadata.setEventTime(200L);

RawMessage msg = RawMessageImpl.get(refCntMsgMetadata, singleMessageMetadata, null, 0, 0, 0);
assertFalse(msg.hasBase64EncodedKey());
assertEquals(msg.getProperties(), ImmutableMap.of("key2", "value2"));
assertEquals(msg.getEventTime(), 200L);
}
}

0 comments on commit 4b98b23

Please sign in to comment.