Skip to content

Commit

Permalink
support lower_bound for iterator (tikv#2504)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangjinpeng87 authored Dec 6, 2017
1 parent 67f068a commit 3a71a36
Show file tree
Hide file tree
Showing 11 changed files with 159 additions and 60 deletions.
36 changes: 21 additions & 15 deletions src/coprocessor/dag/executor/scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,42 +48,48 @@ impl Scanner {
key_only: bool,
range: KeyRange,
) -> Result<Scanner> {
let (scan_mode, seek_key, upper_bound) = if desc {
(ScanMode::Backward, range.get_end().to_vec(), None)
let (scan_mode, seek_key) = if desc {
(ScanMode::Backward, range.get_end().to_vec())
} else {
(
ScanMode::Forward,
range.get_start().to_vec(),
Some(Key::from_raw(range.get_end()).encoded().to_vec()),
)
(ScanMode::Forward, range.get_start().to_vec())
};
let scanner = Self::range_scanner(store, scan_mode, key_only, &range)?;

Ok(Scanner {
scan_mode: scan_mode,
scan_on: scan_on,
key_only: key_only,
seek_key: seek_key,
scanner: store.scanner(scan_mode, key_only, upper_bound)?,
scanner: scanner,
range: range,
no_more: false,
statistics_cache: Statistics::default(),
})
}

fn range_scanner(
store: &SnapshotStore,
scan_mode: ScanMode,
key_only: bool,
range: &KeyRange,
) -> Result<StoreScanner> {
let lower_bound = Some(Key::from_raw(range.get_start()).encoded().to_vec());
let upper_bound = Some(Key::from_raw(range.get_end()).encoded().to_vec());
store.scanner(scan_mode, key_only, lower_bound, upper_bound)
}

pub fn reset_range(&mut self, range: KeyRange, store: &SnapshotStore) -> Result<()> {
self.range = range;
self.no_more = false;
match self.scan_mode {
ScanMode::Backward => self.seek_key = self.range.get_end().to_vec(),
ScanMode::Forward => {
// For forward scan, we need to re-initialize the scanner.
self.seek_key = self.range.get_start().to_vec();
self.statistics_cache.add(self.scanner.get_statistics());
let upper_bound = Some(Key::from_raw(&self.range.end).encoded().to_vec());
self.scanner = store.scanner(self.scan_mode, self.key_only, upper_bound)?;
}
ScanMode::Forward => self.seek_key = self.range.get_start().to_vec(),
_ => unreachable!(),
};

self.statistics_cache.add(self.scanner.get_statistics());
self.scanner = Self::range_scanner(store, self.scan_mode, self.key_only, &self.range)?;

Ok(())
}

Expand Down
75 changes: 61 additions & 14 deletions src/raftstore/coprocessor/region_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// limitations under the License.

use std::sync::Arc;
use std::cmp;
use rocksdb::{DBIterator, DBVector, SeekKey, TablePropertiesCollection, DB};
use kvproto::metapb::Region;

Expand Down Expand Up @@ -80,7 +81,8 @@ impl RegionSnapshot {
where
F: FnMut(&[u8], &[u8]) -> Result<bool>,
{
let iter_opt = IterOption::new(Some(end_key.to_vec()), fill_cache);
let iter_opt =
IterOption::new(Some(start_key.to_vec()), Some(end_key.to_vec()), fill_cache);
self.scan_impl(self.iter(iter_opt), start_key, f)
}

Expand All @@ -96,7 +98,8 @@ impl RegionSnapshot {
where
F: FnMut(&[u8], &[u8]) -> Result<bool>,
{
let iter_opt = IterOption::new(Some(end_key.to_vec()), fill_cache);
let iter_opt =
IterOption::new(Some(start_key.to_vec()), Some(end_key.to_vec()), fill_cache);
self.scan_impl(self.iter_cf(cf, iter_opt)?, start_key, f)
}

Expand Down Expand Up @@ -156,24 +159,43 @@ pub struct RegionIterator {
end_key: Vec<u8>,
}

fn set_upper_bound(iter_opt: IterOption, region: &Region) -> IterOption {
fn set_lower_bound(iter_opt: &mut IterOption, region: &Region) {
let region_start_key = keys::enc_start_key(region);
let lower_bound = match iter_opt.lower_bound() {
Some(k) if !k.is_empty() => {
let k = keys::data_key(k);
cmp::max(k, region_start_key)
}
_ => region_start_key,
};
iter_opt.set_lower_bound(lower_bound);
}

fn set_upper_bound(iter_opt: &mut IterOption, region: &Region) {
let region_end_key = keys::enc_end_key(region);
let upper_bound = match iter_opt.upper_bound() {
Some(k) if !k.is_empty() => keys::data_key(k),
_ => keys::enc_end_key(region),
Some(k) if !k.is_empty() => {
let k = keys::data_key(k);
cmp::min(k, region_end_key)
}
_ => region_end_key,
};
iter_opt.set_upper_bound(upper_bound)
iter_opt.set_upper_bound(upper_bound);
}

// we use rocksdb's style iterator, doesn't need to impl std iterator.
impl RegionIterator {
pub fn new(snap: &Snapshot, region: Arc<Region>, mut iter_opt: IterOption) -> RegionIterator {
iter_opt = set_upper_bound(iter_opt, &region);
set_lower_bound(&mut iter_opt, &region);
set_upper_bound(&mut iter_opt, &region);
let start_key = iter_opt.lower_bound().unwrap().to_vec();
let end_key = iter_opt.upper_bound().unwrap().to_vec();
let iter = snap.db_iterator(iter_opt);
RegionIterator {
iter: iter,
valid: false,
start_key: keys::enc_start_key(&region),
end_key: keys::enc_end_key(&region),
start_key: start_key,
end_key: end_key,
region: region,
}
}
Expand All @@ -184,13 +206,16 @@ impl RegionIterator {
mut iter_opt: IterOption,
cf: &str,
) -> RegionIterator {
iter_opt = set_upper_bound(iter_opt, &region);
set_lower_bound(&mut iter_opt, &region);
set_upper_bound(&mut iter_opt, &region);
let start_key = iter_opt.lower_bound().unwrap().to_vec();
let end_key = iter_opt.upper_bound().unwrap().to_vec();
let iter = snap.db_iterator_cf(cf, iter_opt).unwrap();
RegionIterator {
iter: iter,
valid: false,
start_key: keys::enc_start_key(&region),
end_key: keys::enc_end_key(&region),
start_key: start_key,
end_key: end_key,
region: region,
}
}
Expand Down Expand Up @@ -416,7 +441,7 @@ mod tests {
];
let upper_bounds: Vec<Option<&[u8]>> = vec![None, Some(b"a7")];
for upper_bound in upper_bounds {
let iter_opt = IterOption::new(upper_bound.map(|v| v.to_vec()), true);
let iter_opt = IterOption::new(None, upper_bound.map(|v| v.to_vec()), true);
let mut iter = snap.iter(iter_opt);
for (seek_key, in_range, seek_exp, prev_exp) in seek_table.clone() {
let check_res =
Expand Down Expand Up @@ -491,7 +516,7 @@ mod tests {
// test iterator with upper bound
let store = new_peer_storage(engine.clone(), raft_engine.clone(), &region);
let snap = RegionSnapshot::new(&store);
let mut iter = snap.iter(IterOption::new(Some(b"a5".to_vec()), true));
let mut iter = snap.iter(IterOption::new(None, Some(b"a5".to_vec()), true));
assert!(iter.seek_to_first());
let mut res = vec![];
loop {
Expand Down Expand Up @@ -592,4 +617,26 @@ mod tests {
expect.reverse();
assert_eq!(res, expect);
}

#[test]
fn test_reverse_iterate_with_lower_bound() {
let path = TempDir::new("test-raftstore").unwrap();
let (engine, raft_engine) = new_temp_engine(&path);
let (store, test_data) = load_default_dataset(engine.clone(), raft_engine.clone());

let snap = RegionSnapshot::new(&store);
let mut iter_opt = IterOption::default();
iter_opt.set_lower_bound(b"a3".to_vec());
let mut iter = snap.iter(iter_opt);
assert!(iter.seek_to_last());
let mut res = vec![];
loop {
res.push((iter.key().to_vec(), iter.value().to_vec()));
if !iter.prev() {
break;
}
}
res.sort();
assert_eq!(res, test_data[1..3].to_vec());
}
}
3 changes: 2 additions & 1 deletion src/raftstore/coprocessor/split_check/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,10 +174,11 @@ fn check_key(status: &mut TableStatus, current_data_key: &[u8]) -> Option<Vec<u8
}

fn last_key_of_region(db: &DB, region: &Region) -> Result<Option<Vec<u8>>> {
let start_key = keys::enc_start_key(region);
let end_key = keys::enc_end_key(region);
let mut last_key = None;

let iter_opt = IterOption::new(Some(end_key), false);
let iter_opt = IterOption::new(Some(start_key), Some(end_key), false);
let mut iter = box_try!(db.new_iterator_cf(CF_WRITE, iter_opt));

// the last key
Expand Down
31 changes: 26 additions & 5 deletions src/raftstore/store/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,15 +181,21 @@ enum SeekMode {
}

pub struct IterOption {
lower_bound: Option<Vec<u8>>,
upper_bound: Option<Vec<u8>>,
prefix_same_as_start: bool,
fill_cache: bool,
seek_mode: SeekMode,
}

impl IterOption {
pub fn new(upper_bound: Option<Vec<u8>>, fill_cache: bool) -> IterOption {
pub fn new(
lower_bound: Option<Vec<u8>>,
upper_bound: Option<Vec<u8>>,
fill_cache: bool,
) -> IterOption {
IterOption {
lower_bound: lower_bound,
upper_bound: upper_bound,
prefix_same_as_start: false,
fill_cache: fill_cache,
Expand All @@ -208,15 +214,24 @@ impl IterOption {
self.seek_mode == SeekMode::TotalOrder
}

#[inline]
pub fn lower_bound(&self) -> Option<&[u8]> {
self.lower_bound.as_ref().map(|v| v.as_slice())
}

#[inline]
pub fn set_lower_bound(&mut self, bound: Vec<u8>) {
self.lower_bound = Some(bound);
}

#[inline]
pub fn upper_bound(&self) -> Option<&[u8]> {
self.upper_bound.as_ref().map(|v| v.as_slice())
}

#[inline]
pub fn set_upper_bound(mut self, bound: Vec<u8>) -> IterOption {
pub fn set_upper_bound(&mut self, bound: Vec<u8>) {
self.upper_bound = Some(bound);
self
}

#[inline]
Expand All @@ -233,6 +248,9 @@ impl IterOption {
} else if self.prefix_same_as_start {
opts.set_prefix_same_as_start(true);
}
if let Some(ref key) = self.lower_bound {
opts.set_iterate_lower_bound(key);
}
if let Some(ref key) = self.upper_bound {
opts.set_iterate_upper_bound(key);
}
Expand All @@ -243,6 +261,7 @@ impl IterOption {
impl Default for IterOption {
fn default() -> IterOption {
IterOption {
lower_bound: None,
upper_bound: None,
prefix_same_as_start: false,
fill_cache: true,
Expand All @@ -261,7 +280,8 @@ pub trait Iterable {
where
F: FnMut(&[u8], &[u8]) -> Result<bool>,
{
let iter_opt = IterOption::new(Some(end_key.to_vec()), fill_cache);
let iter_opt =
IterOption::new(Some(start_key.to_vec()), Some(end_key.to_vec()), fill_cache);
scan_impl(self.new_iterator(iter_opt), start_key, f)
}

Expand All @@ -277,7 +297,8 @@ pub trait Iterable {
where
F: FnMut(&[u8], &[u8]) -> Result<bool>,
{
let iter_opt = IterOption::new(Some(end_key.to_vec()), fill_cache);
let iter_opt =
IterOption::new(Some(start_key.to_vec()), Some(end_key.to_vec()), fill_cache);
scan_impl(self.new_iterator_cf(cf, iter_opt)?, start_key, f)
}

Expand Down
2 changes: 1 addition & 1 deletion src/raftstore/store/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ pub fn delete_all_in_range(db: &DB, start_key: &[u8], end_key: &[u8]) -> Result<

pub fn delete_all_in_range_cf(db: &DB, cf: &str, start_key: &[u8], end_key: &[u8]) -> Result<()> {
let handle = rocksdb_util::get_cf_handle(db, cf)?;
let iter_opt = IterOption::new(Some(end_key.to_vec()), false);
let iter_opt = IterOption::new(Some(start_key.to_vec()), Some(end_key.to_vec()), false);
let mut it = db.new_iterator_cf(cf, iter_opt)?;
let mut wb = WriteBatch::new();
it.seek(start_key.into());
Expand Down
3 changes: 2 additions & 1 deletion src/raftstore/store/worker/split_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ impl<'a> MergedIterator<'a> {
let mut iters = Vec::with_capacity(cfs.len());
let mut heap = BinaryHeap::with_capacity(cfs.len());
for (pos, cf) in cfs.into_iter().enumerate() {
let iter_opt = IterOption::new(Some(end_key.to_vec()), fill_cache);
let iter_opt =
IterOption::new(Some(start_key.to_vec()), Some(end_key.to_vec()), fill_cache);
let mut iter = db.new_iterator_cf(cf, iter_opt)?;
if iter.seek(start_key.into()) {
heap.push(KeyEntry::new(iter.key().to_vec(), pos, iter.value().len()));
Expand Down
2 changes: 1 addition & 1 deletion src/server/debug.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ impl MvccInfoIterator {

let gen_iter = |cf: &str| -> Result<_> {
let to = if to.is_empty() { None } else { Some(to) };
let readopts = IterOption::new(to.map(Vec::from), false).build_read_opts();
let readopts = IterOption::new(None, to.map(Vec::from), false).build_read_opts();
let handle = box_try!(get_cf_handle(db.as_ref(), cf));
let mut iter = DBIterator::new_cf(db.clone(), handle, readopts);
iter.seek(SeekKey::from(from));
Expand Down
Loading

0 comments on commit 3a71a36

Please sign in to comment.