From b59e07e51c4dff5dd28b049c256abfde3cd0e5fc Mon Sep 17 00:00:00 2001 From: WEI Xikai Date: Tue, 15 Aug 2023 13:58:27 +0800 Subject: [PATCH] refactor: improve the partition compute (#1151) ## Rationale The previous implementation of computing the partition key is coarse, leading to poor performance. ## Detailed Changes Refactor the implementation of computing the partition key, so that most of the memory allocation is avoided. ## Test Plan Existing unit tests. --- common_types/src/datum.rs | 5 +- components/hash_ext/src/lib.rs | 10 +- interpreters/src/insert.rs | 2 +- query_frontend/src/promql/udf.rs | 2 +- table_engine/Cargo.toml | 3 + .../src/partition/rule/df_adapter/mod.rs | 15 +- table_engine/src/partition/rule/key.rs | 236 ++++++++++++++---- 7 files changed, 202 insertions(+), 71 deletions(-) diff --git a/common_types/src/datum.rs b/common_types/src/datum.rs index 04fa2b683b..f716357d54 100644 --- a/common_types/src/datum.rs +++ b/common_types/src/datum.rs @@ -498,7 +498,7 @@ impl Datum { Datum::Timestamp(v) => v.as_i64() as u64, Datum::Double(v) => *v as u64, Datum::Float(v) => *v as u64, - Datum::Varbinary(v) => hash64(v), + Datum::Varbinary(v) => hash64(&v[..]), Datum::String(v) => hash64(v.as_bytes()), Datum::UInt64(v) => *v, Datum::UInt32(v) => *v as u64, @@ -937,7 +937,6 @@ impl Datum { } } - #[cfg(test)] pub fn as_view(&self) -> DatumView { match self { Datum::Null => DatumView::Null, @@ -1056,7 +1055,7 @@ impl Serialize for Datum { /// A view to a datum. /// /// Holds copy of integer like datum and reference of string like datum. 
-#[derive(Debug, PartialEq, PartialOrd)] +#[derive(Clone, Debug, PartialEq, PartialOrd)] pub enum DatumView<'a> { Null, Timestamp(Timestamp), diff --git a/components/hash_ext/src/lib.rs b/components/hash_ext/src/lib.rs index b8f99e124e..fd835bc794 100644 --- a/components/hash_ext/src/lib.rs +++ b/components/hash_ext/src/lib.rs @@ -16,7 +16,7 @@ /// - Memory: aHash /// - Disk: SeaHash /// https://github.com/CeresDB/hash-benchmark-rs -use std::hash::BuildHasher; +use std::{hash::BuildHasher, io::Read}; pub use ahash; use byteorder::{ByteOrder, LittleEndian}; @@ -34,9 +34,9 @@ impl BuildHasher for SeaHasherBuilder { } } -pub fn hash64(mut bytes: &[u8]) -> u64 { +pub fn hash64(mut source: R) -> u64 { let mut out = [0; 16]; - murmur3_x64_128(&mut bytes, 0, &mut out); + murmur3_x64_128(&mut source, 0, &mut out); // in most cases we run on little endian target LittleEndian::read_u64(&out[0..8]) } @@ -53,13 +53,13 @@ mod test { #[test] fn test_murmur_hash() { - assert_eq!(hash64(&[]), 0); + assert_eq!(hash64(&(vec![])[..]), 0); for (key, code) in [ (b"cse_engine_hash_mod_test_bytes1", 6401327391689448380), (b"cse_engine_hash_mod_test_bytes2", 10824100215277000151), ] { - assert_eq!(code, hash64(key)); + assert_eq!(code, hash64(key.as_slice())); } } diff --git a/interpreters/src/insert.rs b/interpreters/src/insert.rs index 8c094f41c8..26d3746a5d 100644 --- a/interpreters/src/insert.rs +++ b/interpreters/src/insert.rs @@ -204,7 +204,7 @@ impl<'a> TsidBuilder<'a> { } fn finish(self) -> u64 { - hash64(self.hash_bytes) + hash64(&self.hash_bytes[..]) } } diff --git a/query_frontend/src/promql/udf.rs b/query_frontend/src/promql/udf.rs index 8fa798191e..27d7366bef 100644 --- a/query_frontend/src/promql/udf.rs +++ b/query_frontend/src/promql/udf.rs @@ -152,7 +152,7 @@ impl UUIDBuilder { } fn finish(self) -> u64 { - hash64(&self.buf) + hash64(&self.buf[..]) } } diff --git a/table_engine/Cargo.toml b/table_engine/Cargo.toml index 0d38d72483..27027f9b49 100644 --- 
a/table_engine/Cargo.toml +++ b/table_engine/Cargo.toml @@ -48,3 +48,6 @@ smallvec = { workspace = true } snafu = { workspace = true } tokio = { workspace = true } trace_metric = { workspace = true } + +[dev-dependencies] +common_types = { workspace = true, features = ["test"] } diff --git a/table_engine/src/partition/rule/df_adapter/mod.rs b/table_engine/src/partition/rule/df_adapter/mod.rs index 85df128f58..6869b08eb0 100644 --- a/table_engine/src/partition/rule/df_adapter/mod.rs +++ b/table_engine/src/partition/rule/df_adapter/mod.rs @@ -74,7 +74,6 @@ impl DfPartitionRuleAdapter { #[cfg(test)] mod tests { - use bytes_ext::BytesMut; use common_types::{ column_schema, datum::{Datum, DatumKind}, @@ -121,9 +120,8 @@ mod tests { Datum::String(StringBytes::from("test")), Datum::UInt64(42), ]; - let partition_key_refs = partition_keys.iter().collect::>(); - let mut buf = BytesMut::new(); - let expected = compute_partition(&partition_key_refs, partition_num, &mut buf); + let partition_key_refs = partition_keys.iter().map(Datum::as_view); + let expected = compute_partition(partition_key_refs, partition_num); assert_eq!(partitions[0], expected); @@ -239,12 +237,11 @@ mod tests { // Expected let partition_keys_1 = test_datums[0].clone(); - let partition_key_refs_1 = partition_keys_1.iter().collect::>(); + let partition_key_refs_1 = partition_keys_1.iter().map(Datum::as_view); let partition_keys_2 = test_datums[1].clone(); - let partition_key_refs_2 = partition_keys_2.iter().collect::>(); - let mut buf = BytesMut::new(); - let expected_1 = compute_partition(&partition_key_refs_1, partition_num, &mut buf); - let expected_2 = compute_partition(&partition_key_refs_2, partition_num, &mut buf); + let partition_key_refs_2 = partition_keys_2.iter().map(Datum::as_view); + let expected_1 = compute_partition(partition_key_refs_1, partition_num); + let expected_2 = compute_partition(partition_key_refs_2, partition_num); let expecteds = vec![expected_1, expected_2]; 
assert_eq!(partitions, expecteds); diff --git a/table_engine/src/partition/rule/key.rs b/table_engine/src/partition/rule/key.rs index cc5b59f309..554b0ddf71 100644 --- a/table_engine/src/partition/rule/key.rs +++ b/table_engine/src/partition/rule/key.rs @@ -16,9 +16,8 @@ use std::collections::{HashMap, HashSet}; -use bytes_ext::{BufMut, BytesMut}; use common_types::{ - datum::Datum, + datum::{Datum, DatumView}, row::{Row, RowGroup}, }; use hash_ext::hash64; @@ -109,37 +108,32 @@ impl KeyRule { Ok(groups) } - fn compute_partition_for_inserted_row( - &self, - row: &Row, - target_column_idxs: &[usize], - buf: &mut BytesMut, - ) -> usize { + fn compute_partition_for_inserted_row(&self, row: &Row, target_column_idxs: &[usize]) -> usize { let partition_keys = target_column_idxs .iter() - .map(|col_idx| &row[*col_idx]) - .collect_vec(); - compute_partition(&partition_keys, self.partition_num, buf) + .map(|col_idx| row[*col_idx].as_view()); + compute_partition(partition_keys, self.partition_num) } fn compute_partition_for_keys_group( &self, group: &[usize], filters: &[PartitionFilter], - buf: &mut BytesMut, ) -> Result> { - buf.clear(); - let mut partitions = HashSet::new(); let expanded_group = expand_partition_keys_group(group, filters)?; for partition_keys in expanded_group { - let partition_key_refs = partition_keys.iter().collect_vec(); - let partition = compute_partition(&partition_key_refs, self.partition_num, buf); + let partition = compute_partition(partition_keys.into_iter(), self.partition_num); partitions.insert(partition); } Ok(partitions) } + + #[inline] + fn all_partitions(&self) -> Vec { + (0..self.partition_num).collect_vec() + } } impl PartitionRule for KeyRule { @@ -167,20 +161,17 @@ impl PartitionRule for KeyRule { })?; // Compute partitions. 
- let mut buf = BytesMut::new(); let partitions = row_group .iter() - .map(|row| self.compute_partition_for_inserted_row(row, &typed_idxs, &mut buf)) + .map(|row| self.compute_partition_for_inserted_row(row, &typed_idxs)) .collect(); Ok(partitions) } fn locate_partitions_for_read(&self, filters: &[PartitionFilter]) -> Result> { - let all_partitions = (0..self.partition_num).collect(); - // Filters are empty. if filters.is_empty() { - return Ok(all_partitions); + return Ok(self.all_partitions()); } // Group the filters by their columns. @@ -192,20 +183,18 @@ impl PartitionRule for KeyRule { }) .unwrap_or_default(); if candidate_partition_keys_groups.is_empty() { - return Ok(all_partitions); + return Ok(self.all_partitions()); } - let mut buf = BytesMut::new(); let (first_group, rest_groups) = candidate_partition_keys_groups.split_first().unwrap(); - let mut target_partitions = - self.compute_partition_for_keys_group(first_group, filters, &mut buf)?; + let mut target_partitions = self.compute_partition_for_keys_group(first_group, filters)?; for group in rest_groups { // Same as above, if found invalid, return all partitions. - let partitions = match self.compute_partition_for_keys_group(group, filters, &mut buf) { + let partitions = match self.compute_partition_for_keys_group(group, filters) { Ok(partitions) => partitions, Err(e) => { error!("KeyRule locate partition for read, err:{}", e); - return Ok(all_partitions); + return Ok(self.all_partitions()); } }; @@ -219,18 +208,18 @@ impl PartitionRule for KeyRule { } } -fn expand_partition_keys_group( +fn expand_partition_keys_group<'a>( group: &[usize], - filters: &[PartitionFilter], -) -> Result>> { + filters: &'a [PartitionFilter], +) -> Result>>> { let mut datum_by_columns = Vec::with_capacity(group.len()); for filter_idx in group { let filter = &filters[*filter_idx]; let datums = match &filter.condition { // Only `Eq` is supported now. // TODO: to support `In`'s extracting. 
- PartitionCondition::Eq(datum) => vec![datum.clone()], - PartitionCondition::In(datums) => datums.clone(), + PartitionCondition::Eq(datum) => vec![datum.as_view()], + PartitionCondition::In(datums) => datums.iter().map(Datum::as_view).collect_vec(), _ => { return Internal { msg: format!("invalid partition filter found, filter:{filter:?},"), @@ -242,36 +231,176 @@ fn expand_partition_keys_group( datum_by_columns.push(datums); } - let expanded_group = datum_by_columns + Ok(datum_by_columns .into_iter() .map(|filters| filters.into_iter()) - .multi_cartesian_product() - .collect_vec(); - Ok(expanded_group) + .multi_cartesian_product()) +} + +/// The adapter to implement [`std::io::Read`] over the partition keys, which is +/// used for computing hash. +struct PartitionKeysReadAdapter<'a, T> { + key_views: T, + /// The current key for reading bytes. + /// + /// It can be `None` if the whole current key is exhausted. + curr_key: Option>, + /// The offset in the serialized bytes from `curr_key`. + /// + /// This field has no meaning when the `curr_key` is `None`. + curr_key_offset: usize, +} + +impl<'a, T> PartitionKeysReadAdapter<'a, T> { + fn new(key_views: T) -> Self { + Self { + key_views, + curr_key: None, + curr_key_offset: 0, + } + } +} + +impl<'a, T> PartitionKeysReadAdapter<'a, T> +where + T: Iterator>, +{ + /// Read the serialized bytes from `curr_key` to fill the `buf` as much + /// as possible. + fn read_once(&mut self, buf: &mut [u8]) -> std::io::Result { + // Fetch the next key. + if self.curr_key.is_none() { + self.curr_key = self.key_views.next(); + self.curr_key_offset = 0; + } + + // The `key_views` has been exhausted. 
+ if self.curr_key.is_none() { + return Ok(0); + } + + let datum_view = self.curr_key.as_ref().unwrap(); + let offset = self.curr_key_offset; + let mut n_bytes = 0; + let mut key_exhausted = false; + datum_view.do_with_bytes(|source: &[u8]| { + debug_assert!(!source.is_empty()); + debug_assert!(offset < source.len()); + + let end = (offset + buf.len()).min(source.len()); + let read_slice = &source[offset..end]; + buf[..read_slice.len()].copy_from_slice(read_slice); + + // Update the offset to the end. + self.curr_key_offset = end; + // Current key is exhausted. + key_exhausted = end == source.len(); + // Record the number of bytes that has been read. + n_bytes = read_slice.len(); + }); + + // Clear the current key if it is exhausted. + if key_exhausted { + self.curr_key = None; + } + + Ok(n_bytes) + } +} + +impl<'a, T> std::io::Read for PartitionKeysReadAdapter<'a, T> +where + T: Iterator>, +{ + /// This implementation will fill the whole buf as much as possible, and the + /// only scenario where the buf is not fully filled is that the serialized + /// bytes from the `self.key_views` are exhausted. + /// + /// NOTE: The best way is to fill the `buf` key after key rather than fill + /// the `buf` as much as possible. And an example can illustrate the reason + /// for this way, saying there are two `key_views`s, ['a', 'bc'], and + /// ['ab', 'c'], in the way to fill the buf as much as possible, the two + /// `key_views`s will output the same hash result, while the best way + /// can generate different results. However, here the best way is not + /// chosen for compatibility. + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + let mut total_n_bytes = 0; + loop { + if total_n_bytes == buf.len() { + break; + } + debug_assert!(total_n_bytes < buf.len()); + + let next_buf = &mut buf[total_n_bytes..]; + let n_bytes = self.read_once(next_buf)?; + if n_bytes == 0 { + // No more data can be pulled. 
+ break; + } + total_n_bytes += n_bytes; + } + + Ok(total_n_bytes) + } } // Compute partition -pub(crate) fn compute_partition( - partition_keys: &[&Datum], +pub(crate) fn compute_partition<'a>( + partition_keys: impl Iterator>, partition_num: usize, - buf: &mut BytesMut, ) -> usize { - buf.clear(); - partition_keys - .iter() - .for_each(|datum| buf.put_slice(&datum.to_bytes())); - - (hash64(buf) % (partition_num as u64)) as usize + let reader = PartitionKeysReadAdapter::new(partition_keys); + (hash64(reader) as usize) % partition_num } #[cfg(test)] mod tests { - use std::collections::BTreeSet; + use std::{collections::BTreeSet, io::Read}; - use common_types::{datum::DatumKind, string::StringBytes}; + use bytes_ext::{BufMut, BytesMut}; + use common_types::{ + datum::{Datum, DatumKind}, + string::StringBytes, + }; + use hash_ext::hash64; use super::*; + /// This test ensures the output of `read` method of + /// [PartitionKeysReadAdapter] to be the same as the provided `buf`'s length + /// as much as possible. + #[test] + fn test_partition_keys_read_adapter() { + let datums = vec![ + Datum::Int32(1), + Datum::String(StringBytes::copy_from_str( + "test_partition_keys_read_adapter_key0", + )), + Datum::String(StringBytes::copy_from_str( + "test_partition_keys_read_adapter_key1", + )), + Datum::Null, + ]; + let datum_views = datums.iter().map(Datum::as_view); + let mut adapter = PartitionKeysReadAdapter::new(datum_views); + let mut buf = vec![0; 16]; + let mut total_bytes = 0; + loop { + let n = adapter.read(&mut buf).unwrap(); + total_bytes += n; + if n < buf.len() { + // The reader has been exhausted. Let's check it. 
+ assert_eq!(adapter.read(&mut buf).unwrap(), 0); + break; + } + + assert_eq!(n, buf.len()); + } + + let expect_total_bytes: usize = datums.iter().map(|v| v.size()).sum(); + assert_eq!(total_bytes, expect_total_bytes); + } + #[test] fn test_compute_partition_for_inserted_row() { let partition_num = 16; @@ -288,19 +417,19 @@ mod tests { Datum::Null, ]; let row = Row::from_datums(datums.clone()); - let defined_idxs = vec![1_usize, 2, 3, 4]; + let defined_idxs = vec![1, 2, 3, 4]; // Actual - let mut buf = BytesMut::new(); - let actual = key_rule.compute_partition_for_inserted_row(&row, &defined_idxs, &mut buf); + let actual = key_rule.compute_partition_for_inserted_row(&row, &defined_idxs); // Expected + let mut buf = BytesMut::new(); buf.clear(); buf.put_slice(&datums[1].to_bytes()); buf.put_slice(&datums[2].to_bytes()); buf.put_slice(&datums[3].to_bytes()); buf.put_slice(&datums[4].to_bytes()); - let expected = (hash64(&buf) % (partition_num as u64)) as usize; + let expected = (hash64(&buf[..]) % (partition_num as u64)) as usize; assert_eq!(actual, expected); } @@ -399,7 +528,10 @@ mod tests { let group = vec![0, 1, 2]; // Expanded group - let expanded_group = expand_partition_keys_group(&group, &filters).unwrap(); + let expanded_group = expand_partition_keys_group(&group, &filters) + .unwrap() + .map(|v| v.iter().map(|view| view.to_datum()).collect_vec()) + .collect_vec(); let expected = vec![ vec![Datum::UInt32(1), Datum::UInt32(2), Datum::UInt32(3)], vec![Datum::UInt32(1), Datum::UInt32(22), Datum::UInt32(3)],