Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix] DLQ to handle bytes key properly #23172

Merged
merged 1 commit into from
Aug 15, 2024
Merged

[fix] DLQ to handle bytes key properly #23172

merged 1 commit into from
Aug 15, 2024

Conversation

dlg99
Copy link
Contributor

@dlg99 dlg99 commented Aug 14, 2024

Motivation

DLQ producer implicitly converts byte[] key into a bas64-encoded string representing that byte array.

Modifications

Handling setting of keys that are hasBase64EncodedKey() differently.

Verifying this change

  • Make sure that the change passes the CI checks.

This change added a unit test.

Does this pull request potentially affect one of the following parts:

NO

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository: dlg99#19

@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Aug 14, 2024
Copy link
Member

@lhotari lhotari left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Good catch!

@lhotari
Copy link
Member

lhotari commented Aug 15, 2024

A message could also have a separate orderingKey. It is used as the secondary key in Key_Shared ordering:

protected byte[] peekMessageKey(Message<?> msg) {
byte[] key = NONE_KEY;
if (msg.hasKey()) {
key = msg.getKeyBytes();
}
if (msg.hasOrderingKey()) {
key = msg.getOrderingKey();
}
return key;
}

The handling for orderingKey is missing completely. I filed a separate issue #23173

@lhotari
Copy link
Member

lhotari commented Aug 15, 2024

These code snippets helped understand this PR:
key and keyBytes in TypedMessageBuilderImpl:

@Override
public TypedMessageBuilder<T> key(String key) {
getKeyValueSchema().ifPresent(keyValueSchema -> checkArgument(
keyValueSchema.getKeyValueEncodingType() != KeyValueEncodingType.SEPARATED,
"This method is not allowed to set keys when in encoding type is SEPARATED"));
if (key == null) {
msgMetadata.setNullPartitionKey(true);
return this;
}
msgMetadata.setPartitionKey(key);
msgMetadata.setPartitionKeyB64Encoded(false);
return this;
}
@Override
public TypedMessageBuilder<T> keyBytes(byte[] key) {
getKeyValueSchema().ifPresent(keyValueSchema -> checkArgument(
keyValueSchema.getKeyValueEncodingType() != KeyValueEncodingType.SEPARATED,
"This method is not allowed to set keys when in encoding type is SEPARATED"));
if (key == null) {
msgMetadata.setNullPartitionKey(true);
return this;
}
msgMetadata.setPartitionKey(Base64.getEncoder().encodeToString(key));
msgMetadata.setPartitionKeyB64Encoded(true);
return this;
}

getKey and getKeyBytes in MessageImpl:
@Override
public String getKey() {
if (msgMetadata.hasPartitionKey()) {
return msgMetadata.getPartitionKey();
} else {
return null;
}
}
@Override
public boolean hasBase64EncodedKey() {
return msgMetadata.isPartitionKeyB64Encoded();
}
@Override
public byte[] getKeyBytes() {
if (!msgMetadata.hasPartitionKey() || msgMetadata.isNullPartitionKey()) {
return null;
} else if (hasBase64EncodedKey()) {
return Base64.getDecoder().decode(getKey());
} else {
return getKey().getBytes(UTF_8);
}
}

@lhotari
Copy link
Member

lhotari commented Aug 15, 2024

DLQ producer implicitly converts byte[] key into a bas64-encoded string representing that byte array.

@dlg99 I guess the key bytes itself doesn't change, but the encoding changes?
What type of problems has this caused?

@codecov-commenter
Copy link

codecov-commenter commented Aug 15, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 74.53%. Comparing base (bbc6224) to head (e167ec0).
Report is 530 commits behind head on master.

Additional details and impacted files

Impacted file tree graph

@@             Coverage Diff              @@
##             master   #23172      +/-   ##
============================================
+ Coverage     73.57%   74.53%   +0.96%     
+ Complexity    32624     2755   -29869     
============================================
  Files          1877     1920      +43     
  Lines        139502   144468    +4966     
  Branches      15299    15804     +505     
============================================
+ Hits         102638   107679    +5041     
+ Misses        28908    28531     -377     
- Partials       7956     8258     +302     
Flag Coverage Δ
inttests 27.87% <44.44%> (+3.29%) ⬆️
systests 24.78% <27.77%> (+0.46%) ⬆️
unittests 73.86% <100.00%> (+1.02%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

Files Coverage Δ
...va/org/apache/pulsar/client/impl/ConsumerImpl.java 79.27% <100.00%> (+1.70%) ⬆️

... and 504 files with indirect coverage changes

@dlg99
Copy link
Contributor Author

dlg99 commented Aug 15, 2024

DLQ producer implicitly converts byte[] key into a bas64-encoded string representing that byte array.

@dlg99 I guess the key bytes itself doesn't change, but the encoding changes? What type of problems has this caused?

Correct, the hasBase64EncodedKey state is lost thus the key is not decoded into bytes.
It caused problems with KeyValue<AVRO, AVRO, SEPARATED> data sent to DLQ - data from the DLQ cannot be read (avro exception, wrong format) with expected schema; it can be read only using KeyValue<BYTES, AVRO, SEPARATED> and manually decoding and parsing the key. This happens because it goes through getKeyBytes() which relies on hasBase64EncodedKey() status to decide whether to to do the base64 decoding

public byte[] getKeyBytes() {
if (!msgMetadata.hasPartitionKey() || msgMetadata.isNullPartitionKey()) {
return null;
} else if (hasBase64EncodedKey()) {
return Base64.getDecoder().decode(getKey());
} else {
return getKey().getBytes(UTF_8);
}
}

@dlg99 dlg99 merged commit 46c25ac into apache:master Aug 15, 2024
62 checks passed
@lhotari
Copy link
Member

lhotari commented Aug 15, 2024

Correct, the hasBase64EncodedKey state is lost thus the key is not decoded into bytes. It caused problems with KeyValue<AVRO, AVRO, SEPARATED> data sent to DLQ - data from the DLQ cannot be read (avro exception, wrong format) with expected schema; it can be read only using KeyValue<BYTES, AVRO, SEPARATED> and manually decoding and parsing the key. This happens because it goes through getKeyBytes() which relies on hasBase64EncodedKey() status to decide whether to to do the base64 decoding

Thanks for sharing the context @dlg99

lhotari pushed a commit that referenced this pull request Aug 15, 2024
lhotari pushed a commit that referenced this pull request Aug 15, 2024
lhotari pushed a commit that referenced this pull request Aug 15, 2024
lhotari pushed a commit that referenced this pull request Aug 15, 2024
dlg99 added a commit to datastax/pulsar that referenced this pull request Aug 15, 2024
nikhil-ctds pushed a commit to datastax/pulsar that referenced this pull request Aug 16, 2024
(cherry picked from commit 46c25ac)
(cherry picked from commit 79cae0a)
nikhil-ctds pushed a commit to datastax/pulsar that referenced this pull request Aug 16, 2024
(cherry picked from commit 46c25ac)
(cherry picked from commit 79cae0a)
srinath-ctds pushed a commit to datastax/pulsar that referenced this pull request Aug 20, 2024
(cherry picked from commit 46c25ac)
(cherry picked from commit 79cae0a)
grssam pushed a commit to grssam/pulsar that referenced this pull request Sep 4, 2024
@lhotari lhotari added this to the 4.0.0 milestone Oct 14, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants