Skip to content

Commit

Permalink
refactor: improve the partition compute (#1151)
Browse files Browse the repository at this point in the history
## Rationale
The previous implementation for computing the partition key is coarse,
which leads to poor performance.

## Detailed Changes
Refactor the implementation for computing the partition key so that most
memory allocations are avoided.

## Test Plan
Existing unit tests.
  • Loading branch information
ShiKaiWi authored Aug 15, 2023
1 parent 1104866 commit b59e07e
Show file tree
Hide file tree
Showing 7 changed files with 202 additions and 71 deletions.
5 changes: 2 additions & 3 deletions common_types/src/datum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ impl Datum {
Datum::Timestamp(v) => v.as_i64() as u64,
Datum::Double(v) => *v as u64,
Datum::Float(v) => *v as u64,
Datum::Varbinary(v) => hash64(v),
Datum::Varbinary(v) => hash64(&v[..]),
Datum::String(v) => hash64(v.as_bytes()),
Datum::UInt64(v) => *v,
Datum::UInt32(v) => *v as u64,
Expand Down Expand Up @@ -937,7 +937,6 @@ impl Datum {
}
}

#[cfg(test)]
pub fn as_view(&self) -> DatumView {
match self {
Datum::Null => DatumView::Null,
Expand Down Expand Up @@ -1056,7 +1055,7 @@ impl Serialize for Datum {
/// A view to a datum.
///
/// Holds a copy of integer-like datums and a reference to string-like datums.
#[derive(Debug, PartialEq, PartialOrd)]
#[derive(Clone, Debug, PartialEq, PartialOrd)]
pub enum DatumView<'a> {
Null,
Timestamp(Timestamp),
Expand Down
10 changes: 5 additions & 5 deletions components/hash_ext/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
/// - Memory: aHash
/// - Disk: SeaHash
/// https://github.com/CeresDB/hash-benchmark-rs
use std::hash::BuildHasher;
use std::{hash::BuildHasher, io::Read};

pub use ahash;
use byteorder::{ByteOrder, LittleEndian};
Expand All @@ -34,9 +34,9 @@ impl BuildHasher for SeaHasherBuilder {
}
}

pub fn hash64(mut bytes: &[u8]) -> u64 {
pub fn hash64<R: Read>(mut source: R) -> u64 {
let mut out = [0; 16];
murmur3_x64_128(&mut bytes, 0, &mut out);
murmur3_x64_128(&mut source, 0, &mut out);
// in most cases we run on little endian target
LittleEndian::read_u64(&out[0..8])
}
Expand All @@ -53,13 +53,13 @@ mod test {

#[test]
fn test_murmur_hash() {
assert_eq!(hash64(&[]), 0);
assert_eq!(hash64(&(vec![])[..]), 0);

for (key, code) in [
(b"cse_engine_hash_mod_test_bytes1", 6401327391689448380),
(b"cse_engine_hash_mod_test_bytes2", 10824100215277000151),
] {
assert_eq!(code, hash64(key));
assert_eq!(code, hash64(key.as_slice()));
}
}

Expand Down
2 changes: 1 addition & 1 deletion interpreters/src/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ impl<'a> TsidBuilder<'a> {
}

fn finish(self) -> u64 {
hash64(self.hash_bytes)
hash64(&self.hash_bytes[..])
}
}

Expand Down
2 changes: 1 addition & 1 deletion query_frontend/src/promql/udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ impl UUIDBuilder {
}

fn finish(self) -> u64 {
hash64(&self.buf)
hash64(&self.buf[..])
}
}

Expand Down
3 changes: 3 additions & 0 deletions table_engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,6 @@ smallvec = { workspace = true }
snafu = { workspace = true }
tokio = { workspace = true }
trace_metric = { workspace = true }

[dev-dependencies]
common_types = { workspace = true, features = ["test"] }
15 changes: 6 additions & 9 deletions table_engine/src/partition/rule/df_adapter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ impl DfPartitionRuleAdapter {

#[cfg(test)]
mod tests {
use bytes_ext::BytesMut;
use common_types::{
column_schema,
datum::{Datum, DatumKind},
Expand Down Expand Up @@ -121,9 +120,8 @@ mod tests {
Datum::String(StringBytes::from("test")),
Datum::UInt64(42),
];
let partition_key_refs = partition_keys.iter().collect::<Vec<_>>();
let mut buf = BytesMut::new();
let expected = compute_partition(&partition_key_refs, partition_num, &mut buf);
let partition_key_refs = partition_keys.iter().map(Datum::as_view);
let expected = compute_partition(partition_key_refs, partition_num);

assert_eq!(partitions[0], expected);

Expand Down Expand Up @@ -239,12 +237,11 @@ mod tests {

// Expected
let partition_keys_1 = test_datums[0].clone();
let partition_key_refs_1 = partition_keys_1.iter().collect::<Vec<_>>();
let partition_key_refs_1 = partition_keys_1.iter().map(Datum::as_view);
let partition_keys_2 = test_datums[1].clone();
let partition_key_refs_2 = partition_keys_2.iter().collect::<Vec<_>>();
let mut buf = BytesMut::new();
let expected_1 = compute_partition(&partition_key_refs_1, partition_num, &mut buf);
let expected_2 = compute_partition(&partition_key_refs_2, partition_num, &mut buf);
let partition_key_refs_2 = partition_keys_2.iter().map(Datum::as_view);
let expected_1 = compute_partition(partition_key_refs_1, partition_num);
let expected_2 = compute_partition(partition_key_refs_2, partition_num);
let expecteds = vec![expected_1, expected_2];

assert_eq!(partitions, expecteds);
Expand Down
Loading

0 comments on commit b59e07e

Please sign in to comment.