Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][common]Fix presto sql avro decode error when publish non-batched msgs #17093

Merged
merged 2 commits into from
Aug 18, 2022

Conversation

AnonHxy
Copy link
Contributor

@AnonHxy AnonHxy commented Aug 14, 2022

Motivation

Finally I find the bug, which makes org.apache.pulsar.tests.integration.presto.TestBasicPresto#testForSchema fail, But I still don't know why other PRs can run it success

  • This PR will fix Flaky-test: org.apache.pulsar.tests.integration.presto.TestBasicPresto.testForSchema #16975 OR [Pulsar SQL] Query key-value schema data error #9704
  • The root cause is that:
    • The keyByteBuf we get from currentMessage is not correct (line545 in code block 1), because we didn't decode the key with Base64 when the message is non-batched published. Explain in details:

      • As we know SingleMessageMetadata is only set when publish batched messages. So we will use RawMessageImpl#singleMessageMetadata to decode the key when messages is batched, or use RawMessageImpl#msgMetadata if messages in non-batched, As line174 in code block 2.
      • The problem is that RawMessageImpl#singleMessageMetadata will never be null (line38 code block 3)
      • If we try to decode non-batched message, the line175 in code block 2 will execute and return false.
      • Then line166 in code block 2 will execute and decode the key without Base64

      code block 1: [PulsarRecordCursor.java]:

      int valueLength = dataPayload.readInt();
      valueByteBuf = dataPayload.readSlice(valueLength);
      } else {
      keyByteBuf = this.currentMessage.getKeyBytes().get();
      valueByteBuf = this.currentMessage.getData();
      }

      code block 2 [step into currentMessage.getKeyBytes() in RawMessageImpl.java]:

      public Optional<ByteBuf> getKeyBytes() {
      if (getKey().isPresent()) {
      if (hasBase64EncodedKey()) {
      return Optional.of(Unpooled.wrappedBuffer(Base64.getDecoder().decode(getKey().get())));
      } else {
      return Optional.of(Unpooled.wrappedBuffer(getKey().get().getBytes(StandardCharsets.UTF_8)));
      }
      }
      return Optional.empty();
      }
      @Override
      public boolean hasBase64EncodedKey() {
      if (singleMessageMetadata != null) {
      return singleMessageMetadata.isPartitionKeyB64Encoded();
      }
      return msgMetadata.getMetadata().isPartitionKeyB64Encoded();
      }

      code block 3 [RawMessageImpl.java]

      public class RawMessageImpl implements RawMessage {
      private final RawMessageIdImpl messageId = new RawMessageIdImpl();
      private ReferenceCountedMessageMetadata msgMetadata;
      private final SingleMessageMetadata singleMessageMetadata = new SingleMessageMetadata();
      private ByteBuf payload;

Modifications

  • Add boolean setSingleMessageMetadata in org.apache.pulsar.common.api.raw.RawMessageImpl in order to check if
    singleMessageMetadata is set or not

Verifying this change

  • Make sure that the change passes the CI checks.
  • org.apache.pulsar.tests.integration.presto.TestBasicPresto#testForSchema has covered this change
  • Add UT org.apache.pulsar.common.api.raw.RawMessageImplTest#testNonBatchedMessage
  • Add UT org.apache.pulsar.common.api.raw.RawMessageImplTest#testBatchedMessage

Documentation

Check the box below or label this PR directly.

Need to update docs?

  • doc-required
    (Your PR needs to update docs and you will update later)

  • doc-not-needed
    (Please explain why)

  • doc
    (Your PR contains doc changes)

  • doc-complete
    (Docs have been already added)

@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Aug 14, 2022
@AnonHxy
Copy link
Contributor Author

AnonHxy commented Aug 14, 2022

/pulsarbot run-failure-checks

@AnonHxy
Copy link
Contributor Author

AnonHxy commented Aug 15, 2022

@codelipenghui @gaoran10 PTAL~

Copy link
Member

@tisonkun tisonkun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps we can avoid introducing a new boolean flag associated:

diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMes
sageImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/Raw
MessageImpl.java
index 9f1d493247f..bc2177a25ed 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImp
l.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImp
l.java
@@ -36,7 +36,7 @@ public class RawMessageImpl implements RawMessage {
     private final RawMessageIdImpl messageId = new RawMessageIdImpl();
 
     private ReferenceCountedMessageMetadata msgMetadata;
-    private final SingleMessageMetadata singleMessageMetadata = new SingleMessa
geMetadata();
+    private volatile SingleMessageMetadata singleMessageMetadata;
     private ByteBuf payload;
 
     private static final Recycler<RawMessageImpl> RECYCLER = new Recycler<RawMessageImpl>() {
@@ -56,7 +56,7 @@ public class RawMessageImpl implements RawMessage {
     public void release() {
         msgMetadata.release();
         msgMetadata = null;
-        singleMessageMetadata.clear();
+        singleMessageMetadata = null;
 
         payload.release();
         handle.recycle(this);
@@ -71,7 +71,7 @@ public class RawMessageImpl implements RawMessage {
         msg.msgMetadata.retain();
 
         if (singleMessageMetadata != null) {
-            msg.singleMessageMetadata.copyFrom(singleMessageMetadata);
+            msg.singleMessageMetadata = singleMessageMetadata;
         }
         msg.messageId.ledgerId = ledgerId;
         msg.messageId.entryId = entryId;

... while I'm unsure whether we can hold the reference, or we should deep copy. It seems we can barely hold the ref from my first glance, though.

@AnonHxy
Copy link
Contributor Author

AnonHxy commented Aug 15, 2022

It seems we can barely hold the ref from my first glance, though

We didn't know weather RawMessageImpl#singleMessageMetadata is set value or has been cleared, through SingleMessageMetadata's properties

There is a field SingleMessageMetadata#_bitField0, which value of 0 means SingleMessageMetadata didn't set any value. But it is private accessible, we can not use this flag @tisonkun

@tisonkun
Copy link
Member

@AnonHxy please take a closer look at the patch, it sets singleMessageMetadata to null when release. Then you don't have to modify other code in the file and we can use singleMessageMetadata == null checker.

@AnonHxy
Copy link
Contributor Author

AnonHxy commented Aug 15, 2022

Now I got that @tisonkun . But I think we have to use deep copy here. The reason is that:

  • There is only one SingleMessageMetadata instance in the thread, which get from threadlocal(Line163). And the
    processor maybe process in another thread(Line173).

  • If we didn't deep copy the SingleMessageMetadata, the SingleMessageMetadata could be modified while
    processor processes the RawMessageImpl.

Although we can create a new SingleMessageMetadata every time when RawMessageImpl#get and deep copy to it, which will save the boolean flag, I think that it is not a good way, because this way will create a lot of SingleMessageMetadata instance.

private static void receiveIndividualMessagesFromBatch(ReferenceCountedMessageMetadata msgMetadata,
ByteBuf uncompressedPayload, long ledgerId, long entryId, MessageProcessor processor) {
int batchSize = msgMetadata.getMetadata().getNumMessagesInBatch();
try {
for (int i = 0; i < batchSize; ++i) {
SingleMessageMetadata singleMessageMetadata = LOCAL_SINGLE_MESSAGE_METADATA.get();
ByteBuf singleMessagePayload = Commands.deSerializeSingleMessageInBatch(uncompressedPayload,
singleMessageMetadata, i, batchSize);
if (singleMessageMetadata.isCompactedOut()) {
// message has been compacted out, so don't send to the user
singleMessagePayload.release();
continue;
}
processor.process(RawMessageImpl.get(msgMetadata, singleMessageMetadata, singleMessagePayload,
ledgerId, entryId, i));
}

Copy link
Member

@tisonkun tisonkun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your explanation @AnonHxy!

LGTM.

Still, I don't like the way introduce an associated boolean flag, so you may think of deep copy based on my suggested patch - that is, set to null when release, create a new SingleMessageMetadata and copyForm the parameter when get. This way may have short coming on object reuse so it's not a net win. I'm OK with either approach.

@codelipenghui codelipenghui added this to the 2.12.0 milestone Aug 15, 2022
@codelipenghui codelipenghui added type/bug The PR fixed a bug or issue reported a bug area/sql Pulsar SQL related features release/2.10.2 release/2.9.4 release/2.8.5 release/2.11.1 labels Aug 15, 2022
Copy link
Contributor

@codelipenghui codelipenghui left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The fix looks good, could you please help add test to avoid the regression?

@AnonHxy
Copy link
Contributor Author

AnonHxy commented Aug 15, 2022

The fix looks good, could you please help add test to avoid the regression?

OK, have add UT, PTAL @codelipenghui

@AnonHxy
Copy link
Contributor Author

AnonHxy commented Aug 15, 2022

/pulsarbot run-failure-checks

1 similar comment
@AnonHxy
Copy link
Contributor Author

AnonHxy commented Aug 16, 2022

/pulsarbot run-failure-checks

@AnonHxy
Copy link
Contributor Author

AnonHxy commented Aug 16, 2022

@Jason918 @Technoboy- Could you also please help take a look

@codelipenghui codelipenghui merged commit 4b98b23 into apache:master Aug 18, 2022
@Technoboy- Technoboy- changed the title [fix][test][sql]Fix presto sql avro decode error when publish non-batched msgs [fix][sql]Fix presto sql avro decode error when publish non-batched msgs Aug 19, 2022
@Technoboy- Technoboy- changed the title [fix][sql]Fix presto sql avro decode error when publish non-batched msgs [fix][common]Fix presto sql avro decode error when publish non-batched msgs Aug 19, 2022
@gaoran10
Copy link
Contributor

More context

Before this change we can use this singleMessageMetadata != null to judge the message is batched or not, it seems that after this change, the test should be failed. It's strange.

@AnonHxy
Copy link
Contributor Author

AnonHxy commented Aug 22, 2022

The modification in pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java as show below changed the behavior.

Before this modification, the singleMessageMetadata could be null if the method param is null. But after this modification, the singleMessageMetadata will never be null, and it uses copyFrom to set value. @gaoran10

image

Jason918 pushed a commit that referenced this pull request Sep 4, 2022
@mattisonchao mattisonchao added the cherry-picked/branch-2.9 Archived: 2.9 is end of life label Sep 13, 2022
zymap pushed a commit that referenced this pull request Sep 15, 2022
@zymap zymap added the cherry-picked/branch-2.8 Archived: 2.8 is end of life label Sep 15, 2022
nicoloboschi pushed a commit to datastax/pulsar that referenced this pull request Sep 16, 2022
…sgs (apache#17093)

(cherry picked from commit 4b98b23)
(cherry picked from commit 102735f)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area/sql Pulsar SQL related features cherry-picked/branch-2.8 Archived: 2.8 is end of life cherry-picked/branch-2.9 Archived: 2.9 is end of life cherry-picked/branch-2.10 cherry-picked/branch-2.11 doc-not-needed Your PR changes do not impact docs release/2.8.5 release/2.9.4 release/2.10.2 type/bug The PR fixed a bug or issue reported a bug
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Flaky-test: org.apache.pulsar.tests.integration.presto.TestBasicPresto.testForSchema
8 participants