Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added SegmentMap to replace the BTreeMap #16

Merged
merged 4 commits into from
Feb 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion components/br-stream/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,8 @@ where
.register_task(task.clone(), ranges.clone())
.await
{
err.report(format!("failed to register task {}", task.info.name))
err.report(format!("failed to register task {}", task.info.name));
return;
}
for (start_key, end_key) in ranges {
let init = init.clone();
Expand Down
4 changes: 2 additions & 2 deletions components/br-stream/src/observer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::sync::{Arc, RwLock};

use crate::errors::Error;
use crate::try_send;
use crate::utils::SegmentTree;
use crate::utils::SegmentSet;
use dashmap::DashMap;
use engine_traits::KvEngine;
use kvproto::metapb::Region;
Expand All @@ -24,7 +24,7 @@ pub struct BackupStreamObserver {
scheduler: Scheduler<Task>,
// Note: maybe wrap those fields to methods?
pub subs: SubscriptionTracer,
pub ranges: Arc<RwLock<SegmentTree<Vec<u8>>>>,
pub ranges: Arc<RwLock<SegmentSet<Vec<u8>>>>,
}

impl BackupStreamObserver {
Expand Down
47 changes: 14 additions & 33 deletions components/br-stream/src/router.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0.
use std::{
collections::{BTreeMap, HashMap},
collections::HashMap,
io,
path::{Path, PathBuf},
result,
Expand All @@ -18,7 +18,7 @@ use crate::{
errors::Error,
metadata::StreamTask,
metrics::SKIP_KV_COUNTER,
utils::{self, SlotMap},
utils::{self, SegmentMap, SlotMap},
};

use super::errors::Result;
Expand Down Expand Up @@ -231,7 +231,7 @@ pub struct RouterInner {
/// It uses the `start_key` of range as the key.
/// Given there isn't overlapping, we can simply use binary search to find
/// which range a point belongs to.
ranges: SyncRwLock<BTreeMap<KeyRange, TaskRange>>,
ranges: SyncRwLock<SegmentMap<Vec<u8>, String>>,
/// The temporary files associated to some task.
tasks: Mutex<HashMap<String, Arc<StreamTaskInfo>>>,
/// The temporary directory for all tasks.
Expand All @@ -256,7 +256,7 @@ impl std::fmt::Debug for RouterInner {
impl RouterInner {
pub fn new(prefix: PathBuf, scheduler: Scheduler<Task>, temp_file_size_limit: u64) -> Self {
RouterInner {
ranges: SyncRwLock::new(BTreeMap::default()),
ranges: SyncRwLock::new(SegmentMap::default()),
tasks: Mutex::new(HashMap::default()),
prefix,
scheduler,
Expand All @@ -276,18 +276,13 @@ impl RouterInner {

let mut w = self.ranges.write().unwrap();
for range in ranges {
let key_range = KeyRange(range.0);
let task_range = TaskRange {
end: range.1,
task_name: task_name.to_string(),
};
debug!(
"backup stream register observe range";
"task_name" => task_name,
"start_key" => &log_wrappers::Value::key(&key_range.0),
"end_key" => &log_wrappers::Value::key(&task_range.end),
"start_key" => utils::redact(&range.0),
"end_key" => utils::redact(&range.1),
);
w.insert(key_range, task_range);
w.insert(range, task_name.to_owned());
}
}

Expand All @@ -299,38 +294,24 @@ impl RouterInner {
) -> Result<()> {
let task_name = task.info.take_name();

// register ragnes
self.register_ranges(&task_name, ranges);

// register task info
let prefix_path = self.prefix.join(&task_name);
let stream_task = StreamTaskInfo::new(prefix_path, task).await?;

let _ = self
.tasks
self.tasks
.lock()
.await
.insert(task_name, Arc::new(stream_task));
.insert(task_name.clone(), Arc::new(stream_task));

// register ragnes
self.register_ranges(&task_name, ranges);

Ok(())
}

/// get the task name by a key.
pub fn get_task_by_key(&self, key: &[u8]) -> Option<String> {
// TODO avoid key.to_vec()
let r = self.ranges.read().unwrap();
let k = &KeyRange(key.to_vec());
r.range(..k)
.next_back()
.filter(|r| key <= &r.1.end[..] && key >= &r.0.0[..])
.map_or_else(
|| {
r.range(k..)
.next()
.filter(|r| key <= &r.1.end[..] && key >= &r.0.0[..])
.map(|r| r.1.task_name.clone())
},
|r| Some(r.1.task_name.clone()),
)
r.get_value_by_point(key).cloned()
}

pub async fn get_task_info(&self, task_name: &str) -> Result<Arc<StreamTaskInfo>> {
Expand Down
75 changes: 61 additions & 14 deletions components/br-stream/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,48 +140,90 @@ impl<'a, T: ?Sized> RangeBounds<T> for RangeToInclusiveRef<'a, T> {
}
}

#[derive(Default, Debug, Clone)]
pub struct SegmentMap<K: Ord, V>(BTreeMap<K, SegmentValue<K, V>>);

#[derive(Clone, Debug)]
pub struct SegmentValue<R, T> {
range_end: R,
item: T,
}

/// A container for holding ranges without overlapping.
/// supports fast(`O(log(n))`) query of overlapping and points in segments.
///
/// Maybe replace it with extended binary search tree or the real segment tree?
/// So it can contains overlapping segments.
#[derive(Default)]
pub struct SegmentTree<T: Ord>(BTreeMap<T, T>);
pub type SegmentSet<T> = SegmentMap<T, ()>;

impl<T: Ord + std::fmt::Debug> SegmentTree<T> {
/// Try to add a element into the segment tree.
impl<K: Ord, V: Default> SegmentMap<K, V> {
/// Try to add a element into the segment tree, with default value.
/// (This is useful when using the segment tree as a `Set`, i.e. `SegmentMap<T, ()>`)
///
/// - If no overlapping, insert the range into the tree and returns `true`.
/// - If overlapping detected, do nothing and return `false`.
pub fn add(&mut self, (start, end): (T, T)) -> bool {
pub fn add(&mut self, (start, end): (K, K)) -> bool {
self.insert((start, end), V::default())
}
}

impl<K: Ord, V> SegmentMap<K, V> {
/// Like `add`, but insert a value associated to the key.
pub fn insert(&mut self, (start, end): (K, K), value: V) -> bool {
if self.is_overlapping((&start, &end)) {
return false;
}
self.0.insert(start, end);
self.0.insert(
start,
SegmentValue {
range_end: end,
item: value,
},
);
true
}

pub fn get_interval_by_point<R>(&self, point: &R) -> Option<(&T, &T)>
/// Find a segment with its associated value by the point.
pub fn get_by_point<R>(&self, point: &R) -> Option<(&K, &K, &V)>
where
T: Borrow<R>,
K: Borrow<R>,
R: Ord + ?Sized,
{
self.0
.range(RangeToInclusiveRef(point))
.next_back()
.filter(|(_, end)| <T as Borrow<R>>::borrow(end) > point)
.filter(|(_, end)| <K as Borrow<R>>::borrow(&end.range_end) > point)
.map(|(k, v)| (k, &v.range_end, &v.item))
}

/// Like `get_by_point`, but omit the segment.
pub fn get_value_by_point<R>(&self, point: &R) -> Option<&V>
where
K: Borrow<R>,
R: Ord + ?Sized,
{
self.get_by_point(point).map(|(_, _, v)| v)
}

/// Like `get_by_point`, but omit the segment.
pub fn get_interval_by_point<R>(&self, point: &R) -> Option<(&K, &K)>
where
K: Borrow<R>,
R: Ord + ?Sized,
{
self.get_by_point(point).map(|(k, v, _)| (k, v))
}

/// Check whether the range is overlapping with any range in the segment tree.
pub fn is_overlapping<R>(&self, range: (&R, &R)) -> bool
where
T: Borrow<R>,
K: Borrow<R>,
R: Ord + ?Sized,
{
self.get_interval_by_point(range.0).is_some()
|| self
.get_interval_by_point(range.1)
.map(|rng| <T as Borrow<R>>::borrow(rng.0) != range.1)
.unwrap_or(false)
.map_or(false, |rng| <K as Borrow<R>>::borrow(rng.0) != range.1)
}
}

Expand Down Expand Up @@ -232,15 +274,20 @@ macro_rules! try_send {

#[cfg(test)]
mod test {
use super::SegmentTree;
use crate::utils::SegmentMap;

#[test]
fn test_segment_tree() {
let mut tree = SegmentTree::default();
let mut tree = SegmentMap::default();
assert!(tree.add((1, 4)));
assert!(tree.add((4, 8)));
assert!(tree.add((42, 46)));
assert!(!tree.add((3, 8)));
assert!(tree.insert((47, 88), "hello".to_owned()));
assert_eq!(
tree.get_value_by_point(&49).map(String::as_str),
Some("hello")
);
assert_eq!(tree.get_interval_by_point(&3), Some((&1, &4)));
assert_eq!(tree.get_interval_by_point(&7), Some((&4, &8)));
assert_eq!(tree.get_interval_by_point(&90), None);
Expand Down