diff --git a/components/br-stream/src/endpoint.rs b/components/br-stream/src/endpoint.rs index 7946c0ff6ca..d4da3de713e 100644 --- a/components/br-stream/src/endpoint.rs +++ b/components/br-stream/src/endpoint.rs @@ -294,7 +294,8 @@ where .register_task(task.clone(), ranges.clone()) .await { - err.report(format!("failed to register task {}", task.info.name)) + err.report(format!("failed to register task {}", task.info.name)); + return; } for (start_key, end_key) in ranges { let init = init.clone(); diff --git a/components/br-stream/src/observer.rs b/components/br-stream/src/observer.rs index ec034b2ceae..16cb32ad155 100644 --- a/components/br-stream/src/observer.rs +++ b/components/br-stream/src/observer.rs @@ -3,7 +3,7 @@ use std::sync::{Arc, RwLock}; use crate::errors::Error; use crate::try_send; -use crate::utils::SegmentTree; +use crate::utils::SegmentSet; use dashmap::DashMap; use engine_traits::KvEngine; use kvproto::metapb::Region; @@ -24,7 +24,7 @@ pub struct BackupStreamObserver { scheduler: Scheduler, // Note: maybe wrap those fields to methods? pub subs: SubscriptionTracer, - pub ranges: Arc>>>, + pub ranges: Arc>>>, } impl BackupStreamObserver { diff --git a/components/br-stream/src/router.rs b/components/br-stream/src/router.rs index 939fc49d333..18657c459b0 100644 --- a/components/br-stream/src/router.rs +++ b/components/br-stream/src/router.rs @@ -1,6 +1,6 @@ // Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0. use std::{ - collections::{BTreeMap, HashMap}, + collections::HashMap, io, path::{Path, PathBuf}, result, @@ -18,7 +18,7 @@ use crate::{ errors::Error, metadata::StreamTask, metrics::SKIP_KV_COUNTER, - utils::{self, SlotMap}, + utils::{self, SegmentMap, SlotMap}, }; use super::errors::Result; @@ -231,7 +231,7 @@ pub struct RouterInner { /// It uses the `start_key` of range as the key. /// Given there isn't overlapping, we can simply use binary search to find /// which range a point belongs to. - ranges: SyncRwLock>, + ranges: SyncRwLock, String>>, /// The temporary files associated to some task. tasks: Mutex>>, /// The temporary directory for all tasks. @@ -256,7 +256,7 @@ impl std::fmt::Debug for RouterInner { impl RouterInner { pub fn new(prefix: PathBuf, scheduler: Scheduler, temp_file_size_limit: u64) -> Self { RouterInner { - ranges: SyncRwLock::new(BTreeMap::default()), + ranges: SyncRwLock::new(SegmentMap::default()), tasks: Mutex::new(HashMap::default()), prefix, scheduler, @@ -276,18 +276,13 @@ impl RouterInner { let mut w = self.ranges.write().unwrap(); for range in ranges { - let key_range = KeyRange(range.0); - let task_range = TaskRange { - end: range.1, - task_name: task_name.to_string(), - }; debug!( "backup stream register observe range"; "task_name" => task_name, - "start_key" => &log_wrappers::Value::key(&key_range.0), - "end_key" => &log_wrappers::Value::key(&task_range.end), + "start_key" => utils::redact(&range.0), + "end_key" => utils::redact(&range.1), ); - w.insert(key_range, task_range); + w.insert(range, task_name.to_owned()); } } @@ -299,38 +294,24 @@ impl RouterInner { ) -> Result<()> { let task_name = task.info.take_name(); - // register ragnes - self.register_ranges(&task_name, ranges); - // register task info let prefix_path = self.prefix.join(&task_name); let stream_task = StreamTaskInfo::new(prefix_path, task).await?; - - let _ = self - .tasks + self.tasks .lock() .await - .insert(task_name, Arc::new(stream_task)); + .insert(task_name.clone(), Arc::new(stream_task)); + + // register ragnes + self.register_ranges(&task_name, ranges); + Ok(()) } /// get the task name by a key. pub fn get_task_by_key(&self, key: &[u8]) -> Option { - // TODO avoid key.to_vec() let r = self.ranges.read().unwrap(); - let k = &KeyRange(key.to_vec()); - r.range(..k) - .next_back() - .filter(|r| key <= &r.1.end[..] && key >= &r.0.0[..]) - .map_or_else( - || { - r.range(k..) - .next() - .filter(|r| key <= &r.1.end[..] && key >= &r.0.0[..]) - .map(|r| r.1.task_name.clone()) - }, - |r| Some(r.1.task_name.clone()), - ) + r.get_value_by_point(key).cloned() } pub async fn get_task_info(&self, task_name: &str) -> Result> { diff --git a/components/br-stream/src/utils.rs b/components/br-stream/src/utils.rs index 3b0c1433b56..89d16d13e90 100644 --- a/components/br-stream/src/utils.rs +++ b/components/br-stream/src/utils.rs @@ -140,48 +140,90 @@ impl<'a, T: ?Sized> RangeBounds for RangeToInclusiveRef<'a, T> { } } +#[derive(Default, Debug, Clone)] +pub struct SegmentMap(BTreeMap>); + +#[derive(Clone, Debug)] +pub struct SegmentValue { + range_end: R, + item: T, +} + /// A container for holding ranges without overlapping. /// supports fast(`O(log(n))`) query of overlapping and points in segments. /// /// Maybe replace it with extended binary search tree or the real segment tree? /// So it can contains overlapping segments. -#[derive(Default)] -pub struct SegmentTree(BTreeMap); +pub type SegmentSet = SegmentMap; -impl SegmentTree { - /// Try to add a element into the segment tree. +impl SegmentMap { + /// Try to add a element into the segment tree, with default value. + /// (This is useful when using the segment tree as a `Set`, i.e. `SegmentMap`) /// /// - If no overlapping, insert the range into the tree and returns `true`. /// - If overlapping detected, do nothing and return `false`. - pub fn add(&mut self, (start, end): (T, T)) -> bool { + pub fn add(&mut self, (start, end): (K, K)) -> bool { + self.insert((start, end), V::default()) + } +} + +impl SegmentMap { + /// Like `add`, but insert a value associated to the key. + pub fn insert(&mut self, (start, end): (K, K), value: V) -> bool { if self.is_overlapping((&start, &end)) { return false; } - self.0.insert(start, end); + self.0.insert( + start, + SegmentValue { + range_end: end, + item: value, + }, + ); true } - pub fn get_interval_by_point(&self, point: &R) -> Option<(&T, &T)> + /// Find a segment with its associated value by the point. + pub fn get_by_point(&self, point: &R) -> Option<(&K, &K, &V)> where - T: Borrow, + K: Borrow, R: Ord + ?Sized, { self.0 .range(RangeToInclusiveRef(point)) .next_back() - .filter(|(_, end)| >::borrow(end) > point) + .filter(|(_, end)| >::borrow(&end.range_end) > point) + .map(|(k, v)| (k, &v.range_end, &v.item)) + } + + /// Like `get_by_point`, but omit the segment. + pub fn get_value_by_point(&self, point: &R) -> Option<&V> + where + K: Borrow, + R: Ord + ?Sized, + { + self.get_by_point(point).map(|(_, _, v)| v) + } + + /// Like `get_by_point`, but omit the segment. + pub fn get_interval_by_point(&self, point: &R) -> Option<(&K, &K)> + where + K: Borrow, + R: Ord + ?Sized, + { + self.get_by_point(point).map(|(k, v, _)| (k, v)) } + /// Check whether the range is overlapping with any range in the segment tree. pub fn is_overlapping(&self, range: (&R, &R)) -> bool where - T: Borrow, + K: Borrow, R: Ord + ?Sized, { self.get_interval_by_point(range.0).is_some() || self .get_interval_by_point(range.1) - .map(|rng| >::borrow(rng.0) != range.1) - .unwrap_or(false) + .map_or(false, |rng| >::borrow(rng.0) != range.1) } } @@ -232,15 +274,20 @@ macro_rules! try_send { #[cfg(test)] mod test { - use super::SegmentTree; + use crate::utils::SegmentMap; #[test] fn test_segment_tree() { - let mut tree = SegmentTree::default(); + let mut tree = SegmentMap::default(); assert!(tree.add((1, 4))); assert!(tree.add((4, 8))); assert!(tree.add((42, 46))); assert!(!tree.add((3, 8))); + assert!(tree.insert((47, 88), "hello".to_owned())); + assert_eq!( + tree.get_value_by_point(&49).map(String::as_str), + Some("hello") + ); assert_eq!(tree.get_interval_by_point(&3), Some((&1, &4))); assert_eq!(tree.get_interval_by_point(&7), Some((&4, &8))); assert_eq!(tree.get_interval_by_point(&90), None);