Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(common): fix memcomparable encoding for list #4599

Merged
merged 10 commits into from
Aug 12, 2022

Conversation

wzzzzd
Copy link
Contributor

@wzzzzd wzzzzd commented Aug 11, 2022

I hereby agree to the terms of the Singularity Data, Inc. Contributor License Agreement.

What's changed and what's your intention?

unfortunately, after #4575, the encoding for list is still wrong😇. The length tag before list elements corrupts the ordering. In this pr I use the same encoding pattern for varchar and list and the result becomes correct finally.

(BTW, the deserialization of list seems a bit dirty, maybe it can be written in a prettier way...

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • All checks passed in ./risedev check (or alias, ./risedev c)

Documentation

If your pull request contains user-facing changes, please specify the types of the changes, and create a release note. Otherwise, please feel free to remove this section.

Types of user-facing changes

Please keep the types that apply to your changes, and remove those that do not apply.

  • Installation and deployment
  • Connector (sources & sinks)
  • SQL commands, functions, and operators
  • RisingWave cluster configuration changes
  • Other (please specify in the release note below)

Release note

Please create a release note for your changes. In the release note, focus on the impact on users, and mention the environment or conditions where the impact may occur.

Refer to a related PR or issue link (optional)

@wzzzzd wzzzzd added type/fix Bug fix component/common Common components, such as array, data chunk, expression. labels Aug 11, 2022
@codecov
Copy link

codecov bot commented Aug 11, 2022

Codecov Report

Merging #4599 (7393202) into main (944090f) will increase coverage by 0.05%.
The diff coverage is 68.62%.

@@            Coverage Diff             @@
##             main    #4599      +/-   ##
==========================================
+ Coverage   74.15%   74.20%   +0.05%     
==========================================
  Files         862      860       -2     
  Lines      128207   128188      -19     
==========================================
+ Hits        95070    95124      +54     
+ Misses      33137    33064      -73     
Flag Coverage Δ
rust 74.20% <68.62%> (+0.05%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

Impacted Files Coverage Δ
src/common/src/catalog/physical_table.rs 82.60% <ø> (ø)
src/common/src/hash/key.rs 85.33% <0.00%> (-0.17%) ⬇️
src/common/src/types/mod.rs 65.58% <0.00%> (+0.89%) ⬆️
...c/compute/src/compute_observer/observer_manager.rs 0.00% <0.00%> (ø)
.../src/source/filesystem/s3/source/s3_file_reader.rs 0.00% <0.00%> (ø)
src/connector/src/source/kafka/source/message.rs 0.00% <0.00%> (ø)
src/connector/src/source/kafka/split.rs 48.57% <0.00%> (-1.43%) ⬇️
.../connector/src/source/kinesis/enumerator/client.rs 0.00% <0.00%> (ø)
src/connector/src/source/kinesis/source/message.rs 0.00% <0.00%> (ø)
src/connector/src/source/kinesis/source/reader.rs 0.00% <0.00%> (ø)
... and 95 more

📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more

Comment on lines +454 to +462
ListRef::Indexed { arr, idx } => (arr.offsets[*idx]..arr.offsets[*idx + 1])
.map(|o| arr.value.value_at(o))
.try_for_each(|datum_ref| {
serialize_datum_ref_into(&datum_ref, &mut inner_serializer)
})?,
ListRef::ValueRef { val } => val
.values
.iter()
.try_for_each(|datum| serialize_datum_into(datum, &mut inner_serializer))?,
Copy link
Contributor

@neverchanje neverchanje Aug 12, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can use the values_ref function to iterate the elements. Actually, we can do a slight refactoring to it and make it efficient for iterating:

fn values_ref(&self) -> Vec<DatumRef<'a>> {
  self.iter_values().collect()
}

fn iter_values() -> Iterator<DatumRef<'a>> {
  ...
}

Then you can simplify serialize by using iter_values()

    pub fn serialize(
        &self,
        serializer: &mut memcomparable::Serializer<impl BufMut>,
    ) -> memcomparable::Result<()> {
      self.iter_values_().try_for_each(|d| ...)
    }

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For different variants, the concrete type of the iterator is different, so we may need to introduce auto_enum here which needs to match the variant on every call of next. Not sure whether this refactoring is worthwhile.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From tests we can see that currently we support nested array, e.g. [[1, 2], [2, 3]]. Is memcomparable really suitable for this case?

Copy link
Contributor

@neverchanje neverchanje left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Furthermore, we can find some e2e queries that the prior version will panic or give incorrect answers.

Comment on lines +456 to +458
.try_for_each(|datum_ref| {
serialize_datum_ref_into(&datum_ref, &mut inner_serializer)
})?,
Copy link
Contributor

@xiangjinwu xiangjinwu Aug 12, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might be wrong if we compare the 2 rows (have not tested on this branch yet):
values (array[1, 2], 3), (array[1], 3) order by 1, 2;

The memcomparable encoding for varchar contains a signal byte after each 8-byte chunk to indicate termination, and prevents accidental comparison between a longer value (array[1, 2]) to the next column of shorter value (3 after array[1]).

https://github.com/singularity-data/risingwave/blob/dfefd0e4cfdf0a4507e4ff8d7d286ba563186d42/src/utils/memcomparable/src/ser.rs#L201-L205

Similar test case in varchar:
values ('ab', ''), ('a', 'c') order by 1, 2;

Applying to list type here, it should also have a signal byte after every N elements, with 0 padding for the last chunk. The choice of a proper N can be discussed separately.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My bad that I missed serializer.serialize_bytes(&inner_serializer.into_inner()) at the end ... Just make sure it works as expected for the case above.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tests added

@@ -242,6 +242,25 @@ impl DataType {
List { datatype } => datatype.mem_cmp_eq_value_enc(),
}
}

/// Check if the datatype has fixed length.
pub fn has_fixed_length(&self) -> bool {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is this used?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I used this for deserialization, but we no longer need it now😇

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed

Copy link
Contributor

@liurenjie1024 liurenjie1024 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Comment on lines +351 to +369
// This is a bit dirty, but idk how to correctly deserialize bytes in memcomparable
// format without this...
struct Visitor<'a>(&'a DataType);
impl<'a> serde::de::Visitor<'a> for Visitor<'a> {
type Value = Vec<u8>;

fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(formatter, "a list of {}", self.0)
}

fn visit_byte_buf<E>(self, v: Vec<u8>) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
Ok(v)
}
}
let visitor = Visitor(datatype);
let bytes = deserializer.deserialize_byte_buf(visitor)?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I found this tentative solution:

let bytes: &[u8] = Deserialize::deserialize(deserializer)?;

However it is reporting error:

invalid type: byte array, expected a borrowed byte array

Internally there is a builtin BytesVisitor, which only implements visit_borrowed_bytes but not visit_bytes.

While the comment of visit_bytes says:

It is never correct to implement visit_byte_buf without implementing visit_bytes. Implement neither, both, or just visit_bytes.

Not sure if this is a bug in the serde library ...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, so I choose to define a Visitor myself....

@mergify mergify bot merged commit bb66ea8 into risingwavelabs:main Aug 12, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
component/common Common components, such as array, data chunk, expression. type/fix Bug fix
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants