Skip to content

Commit

Permalink
fix gc keys doesn't work
Browse files Browse the repository at this point in the history
Signed-off-by: Connor1996 <zbk602423539@gmail.com>
  • Loading branch information
Connor1996 committed Nov 4, 2021
1 parent 7b2e71a commit a0825b1
Show file tree
Hide file tree
Showing 7 changed files with 185 additions and 42 deletions.
2 changes: 1 addition & 1 deletion components/raftstore/src/coprocessor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub use self::dispatcher::{
};
pub use self::error::{Error, Result};
pub use self::region_info_accessor::{
Callback as RegionInfoCallback, RegionCollector, RegionInfo, RegionInfoAccessor,
Callback as RegionInfoCallback, RangeKey, RegionCollector, RegionInfo, RegionInfoAccessor,
RegionInfoProvider, SeekRegionCallback,
};
pub use self::split_check::{
Expand Down
137 changes: 118 additions & 19 deletions components/raftstore/src/coprocessor/region_info_accessor.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
// Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0.

use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::collections::Bound::{Excluded, Included, Unbounded};
use std::fmt::{Display, Formatter, Result as FmtResult};
Expand All @@ -13,7 +14,6 @@ use super::{
};
use collections::HashMap;
use engine_traits::KvEngine;
use keys::{data_end_key, data_key};
use kvproto::metapb::Region;
use raft::StateRole;
use tikv_util::worker::{Builder as WorkerBuilder, Runnable, RunnableWithTimer, Scheduler, Worker};
Expand Down Expand Up @@ -67,7 +67,80 @@ impl RegionInfo {
}

type RegionsMap = HashMap<u64, RegionInfo>;
type RegionRangesMap = BTreeMap<Vec<u8>, u64>;
type RegionRangesMap = BTreeMap<RangeKey, u64>;

// RangeKey is a wrapper used to unify the comparsion between region start key
// and region end key. Region end key is special as empty stands for the infinite,
// so we need to take special care for cases where the end key is empty.
#[derive(Clone, Debug)]
pub struct RangeKey {
key: Vec<u8>,
is_end_key: bool,
}

impl RangeKey {
pub fn from_start_key(key: Vec<u8>) -> Self {
Self {
key: key.to_vec(),
is_end_key: false,
}
}

pub fn from_end_key(key: Vec<u8>) -> Self {
Self {
key: key.to_vec(),
is_end_key: true,
}
}
}

impl PartialEq for RangeKey {
fn eq(&self, other: &Self) -> bool {
if self.key.is_empty() && other.key.is_empty() {
self.is_end_key == other.is_end_key
} else {
self.key.eq(&other.key)
}
}
}

impl Eq for RangeKey {}

impl Ord for RangeKey {
fn cmp(&self, other: &Self) -> Ordering {
if self.is_end_key && other.is_end_key {
if self.key.is_empty() && other.key.is_empty() {
Ordering::Equal
} else if self.key.is_empty() {
Ordering::Greater
} else if other.key.is_empty() {
Ordering::Less
} else {
self.key.cmp(&other.key)
}
} else if self.is_end_key && !other.is_end_key {
if self.key.is_empty() {
Ordering::Greater
} else {
self.key.cmp(&other.key)
}
} else if !self.is_end_key && other.is_end_key {
if other.key.is_empty() {
Ordering::Less
} else {
self.key.cmp(&other.key)
}
} else {
self.key.cmp(&other.key)
}
}
}

impl PartialOrd for RangeKey {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}

pub type Callback<T> = Box<dyn FnOnce(T) + Send>;
pub type SeekRegionCallback = Box<dyn FnOnce(&mut dyn Iterator<Item = &RegionInfo>) + Send>;
Expand Down Expand Up @@ -186,7 +259,7 @@ impl RegionCollector {
}

pub fn create_region(&mut self, region: Region, role: StateRole) {
let end_key = data_end_key(region.get_end_key());
let end_key = RangeKey::from_end_key(region.get_end_key().to_vec());
let region_id = region.get_id();

// Create new region
Expand All @@ -212,13 +285,13 @@ impl RegionCollector {
if old_region.get_end_key() != region.get_end_key() {
// The region's end_key has changed.
// Remove the old entry in `self.region_ranges`.
let old_end_key = data_end_key(old_region.get_end_key());
let old_end_key = RangeKey::from_end_key(old_region.get_end_key().to_vec());

let old_id = self.region_ranges.remove(&old_end_key).unwrap();
assert_eq!(old_id, region.get_id());

// Insert new entry to `region_ranges`.
let end_key = data_end_key(region.get_end_key());
let end_key = RangeKey::from_end_key(region.get_end_key().to_vec());
assert!(
self.region_ranges
.insert(end_key, region.get_id())
Expand Down Expand Up @@ -262,7 +335,7 @@ impl RegionCollector {
let removed_region = removed_region_info.region;
assert_eq!(removed_region.get_id(), region.get_id());

let end_key = data_end_key(removed_region.get_end_key());
let end_key = RangeKey::from_end_key(removed_region.get_end_key().to_vec());

let removed_id = self.region_ranges.remove(&end_key).unwrap();
assert_eq!(removed_id, region.get_id());
Expand Down Expand Up @@ -323,10 +396,10 @@ impl RegionCollector {

let mut stale_regions_in_range = vec![];

for (key, id) in self
.region_ranges
.range((Excluded(data_key(region.get_start_key())), Unbounded))
{
for (key, id) in self.region_ranges.range((
Excluded(RangeKey::from_start_key(region.get_start_key().to_vec())),
Unbounded,
)) {
if *id == region.get_id() {
continue;
}
Expand Down Expand Up @@ -363,10 +436,9 @@ impl RegionCollector {
}

pub fn handle_seek_region(&self, from_key: Vec<u8>, callback: SeekRegionCallback) {
let from_key = data_key(&from_key);
let mut iter = self
.region_ranges
.range((Excluded(from_key), Unbounded))
.range((Excluded(RangeKey::from_start_key(from_key)), Unbounded))
.map(|(_, region_id)| &self.regions[region_id]);
callback(&mut iter)
}
Expand All @@ -381,12 +453,16 @@ impl RegionCollector {
end_key: Vec<u8>,
callback: Callback<Vec<Region>>,
) {
let end_key = RangeKey::from_end_key(end_key);
let mut regions = vec![];
for (_, region_id) in self
.region_ranges
.range((Included(start_key), Included(end_key)))
.range((Excluded(RangeKey::from_start_key(start_key)), Unbounded))
{
let region_info = &self.regions[region_id];
if RangeKey::from_start_key(region_info.region.get_start_key().to_vec()) >= end_key {
break;
}
regions.push(region_info.region.clone());
}
callback(regions);
Expand Down Expand Up @@ -654,7 +730,7 @@ mod tests {
fn check_collection(c: &RegionCollector, regions: &[(Region, StateRole)]) {
let region_ranges: Vec<_> = regions
.iter()
.map(|(r, _)| (data_end_key(r.get_end_key()), r.get_id()))
.map(|(r, _)| (RangeKey::from_end_key(r.get_end_key().to_vec()), r.get_id()))
.collect();

let mut is_regions_equal = c.regions.len() == regions.len();
Expand Down Expand Up @@ -719,7 +795,7 @@ mod tests {

assert_eq!(&c.regions[&region.get_id()].region, region);
assert_eq!(
c.region_ranges[&data_end_key(region.get_end_key())],
c.region_ranges[&RangeKey::from_end_key(region.get_end_key().to_vec())],
region.get_id()
);
}
Expand All @@ -738,11 +814,12 @@ mod tests {
if let Some(r) = c.regions.get(&region.get_id()) {
assert_eq!(r.region, *region);
assert_eq!(
c.region_ranges[&data_end_key(region.get_end_key())],
c.region_ranges[&RangeKey::from_end_key(region.get_end_key().to_vec())],
region.get_id()
);
} else {
let another_region_id = c.region_ranges[&data_end_key(region.get_end_key())];
let another_region_id =
c.region_ranges[&RangeKey::from_end_key(region.get_end_key().to_vec())];
let version = c.regions[&another_region_id]
.region
.get_region_epoch()
Expand All @@ -755,7 +832,7 @@ mod tests {
if old_end_key.as_slice() != region.get_end_key() {
assert!(
c.region_ranges
.get(&data_end_key(&old_end_key))
.get(&RangeKey::from_end_key(old_end_key))
.map_or(true, |id| *id != region.get_id())
);
}
Expand All @@ -774,7 +851,7 @@ mod tests {
if let Some(end_key) = end_key {
assert!(
c.region_ranges
.get(&data_end_key(&end_key))
.get(&RangeKey::from_end_key(end_key))
.map_or(true, |r| *r != id)
);
}
Expand All @@ -791,6 +868,28 @@ mod tests {
}
}

#[test]
fn test_range_key() {
let a = RangeKey::from_start_key(b"".to_vec());
let b = RangeKey::from_start_key(b"".to_vec());
let c = RangeKey::from_end_key(b"a".to_vec());
let d = RangeKey::from_start_key(b"a".to_vec());
let e = RangeKey::from_start_key(b"d".to_vec());
let f = RangeKey::from_end_key(b"f".to_vec());
let g = RangeKey::from_end_key(b"u".to_vec());
let h = RangeKey::from_end_key(b"".to_vec());

assert!(a == b);
assert!(a < c);
assert!(a != h);
assert!(c == d);
assert!(d < e);
assert!(e < f);
assert!(f < g);
assert!(g < h);
assert!(h > g);
}

#[test]
fn test_ignore_invalid_version() {
let mut c = RegionCollector::new();
Expand Down
10 changes: 5 additions & 5 deletions components/raftstore/src/store/compaction_guard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use engine_traits::{
CfName, SstPartitioner, SstPartitionerContext, SstPartitionerFactory, SstPartitionerRequest,
SstPartitionerResult, CF_DEFAULT, CF_LOCK, CF_RAFT, CF_WRITE,
};
use keys::data_end_key;
use keys::{data_end_key, origin_key};
use lazy_static::lazy_static;
use tikv_util::warn;

Expand Down Expand Up @@ -91,10 +91,10 @@ pub struct CompactionGuardGenerator<P: RegionInfoProvider> {

impl<P: RegionInfoProvider> CompactionGuardGenerator<P> {
fn initialize(&mut self) {
self.use_guard = match self
.provider
.get_regions_in_range(&self.smallest_key, &self.largest_key)
{
self.use_guard = match self.provider.get_regions_in_range(
origin_key(&self.smallest_key),
origin_key(&self.largest_key),
) {
Ok(regions) => {
// The regions returned from region_info_provider should have been sorted,
// but we sort it again just in case.
Expand Down
13 changes: 6 additions & 7 deletions src/server/gc_worker/gc_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,11 +316,10 @@ where
fn next(&mut self) -> Option<Key> {
loop {
let region = self.regions.peek()?;
let key = self.keys.peek()?;
let data_key = keys::data_key(key.as_encoded());
if data_key.as_slice() < region.get_start_key() {
let key = self.keys.peek()?.as_encoded().as_slice();
if key < region.get_start_key() {
self.keys.next();
} else if data_key.as_slice() < region.get_end_key() {
} else if region.get_end_key().is_empty() || key < region.get_end_key() {
return self.keys.next();
} else {
self.regions.next();
Expand Down Expand Up @@ -1415,11 +1414,11 @@ mod tests {

let mut r1 = Region::default();
r1.set_start_key(b"".to_vec());
r1.set_end_key(format!("zk{:02}", 10).into_bytes());
r1.set_end_key(format!("k{:02}", 10).into_bytes());

let mut r2 = Region::default();
r2.set_start_key(format!("zk{:02}", 20).into_bytes());
r2.set_end_key(format!("zk{:02}", 30).into_bytes());
r2.set_start_key(format!("k{:02}", 20).into_bytes());
r2.set_end_key(format!("k{:02}", 30).into_bytes());
r2.mut_peers().push(Peer::default());
r2.mut_peers()[0].set_store_id(1);

Expand Down
8 changes: 5 additions & 3 deletions tests/integrations/raftstore/test_region_info_accessor.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
// Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0.

use keys::data_end_key;
use kvproto::metapb::Region;
use raft::StateRole;
use raftstore::coprocessor::{RegionInfo, RegionInfoAccessor};
use raftstore::coprocessor::{RangeKey, RegionInfo, RegionInfoAccessor};
use raftstore::store::util::{find_peer, new_peer};
use std::sync::mpsc::channel;
use std::sync::Arc;
Expand All @@ -20,7 +19,10 @@ fn dump(c: &RegionInfoAccessor) -> Vec<(Region, StateRole)> {
let mut res = Vec::new();
for (end_key, id) in region_ranges {
let RegionInfo { ref region, role } = regions[&id];
assert_eq!(end_key, data_end_key(region.get_end_key()));
assert_eq!(
end_key,
RangeKey::from_end_key(region.get_end_key().to_vec())
);
assert_eq!(id, region.get_id());
res.push((region.clone(), role));
}
Expand Down
2 changes: 1 addition & 1 deletion tests/integrations/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@

mod test_raft_storage;
mod test_raftkv;
mod test_seek_region;
mod test_region_info_accessor;
mod test_storage;
mod test_titan;
Loading

0 comments on commit a0825b1

Please sign in to comment.