Skip to content

Commit

Permalink
Fix the issue that end key of a partial rawkv-restore range is inclus…
Browse files Browse the repository at this point in the history
…ive (tikv#7196) (tikv#9583)


cherry-pick tikv#7196 to release-4.0

---

<!--
Thank you for contributing to TiKV!

If you haven't already, please read TiKV's [CONTRIBUTING](https://github.com/tikv/tikv/blob/master/CONTRIBUTING.md) document.

If you're unsure about anything, just ask; somebody should be along to answer within a day or two.
-->

###  What have you changed?

Partially fixes tikv#7163 , but it still doesn't work for restoring plain transactional kv (without TiDB).

This PR adds is_raw_kv field to DownloadRequest to distinguish rawkv restore requests, and adds end_key_exclusive field to SstMeta so that TiKV knows whether the endkey need to be included.l

###  What is the type of the changes?

- Bugfix (a change which fixes an issue)

###  How is the PR tested?

- Unit test
- Manual test (add detailed scripts or steps below)

###  Does this PR affect documentation (docs) or should it be mentioned in the release notes?

(I'm not sure if it's necessary)
* Fix the issue that when restoring partial of backed up raw kv data, TiKV may write out-of-range keys.

###  Does this PR affect `tidb-ansible`?

###  Refer to a related PR or issue link (optional)

- [x] kvproto pingcap/kvproto#581
- [ ] Corresponding PR of BR

### Release note

* No release note
  • Loading branch information
MyonKeminta authored and gengliqi committed Feb 19, 2021
1 parent c680780 commit 3eaa05b
Show file tree
Hide file tree
Showing 2 changed files with 221 additions and 31 deletions.
250 changes: 220 additions & 30 deletions components/sst_importer/src/sst_importer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,26 +266,30 @@ impl SSTImporter {

let range_start = meta.get_range().get_start();
let range_end = meta.get_range().get_end();
let range_start_bound = key_to_bound(range_start);
let range_end_bound = if meta.get_end_key_exclusive() {
key_to_exclusive_bound(range_end)
} else {
key_to_bound(range_end)
};

let range_start = keys::rewrite::rewrite_prefix_of_start_bound(
new_prefix,
old_prefix,
key_to_bound(range_start),
)
.map_err(|_| {
Error::WrongKeyPrefix("SST start range", range_start.to_vec(), new_prefix.to_vec())
})?;
let range_end = keys::rewrite::rewrite_prefix_of_end_bound(
new_prefix,
old_prefix,
key_to_bound(range_end),
)
.map_err(|_| {
Error::WrongKeyPrefix("SST end range", range_end.to_vec(), new_prefix.to_vec())
})?;
let range_start =
keys::rewrite::rewrite_prefix_of_start_bound(new_prefix, old_prefix, range_start_bound)
.map_err(|_| {
Error::WrongKeyPrefix(
"SST start range",
range_start.to_vec(),
new_prefix.to_vec(),
)
})?;
let range_end =
keys::rewrite::rewrite_prefix_of_end_bound(new_prefix, old_prefix, range_end_bound)
.map_err(|_| {
Error::WrongKeyPrefix("SST end range", range_end.to_vec(), new_prefix.to_vec())
})?;

let start_rename_rewrite = Instant::now();
// read and first and last keys from the SST, determine if we could
// read the first and last keys from the SST, determine if we could
// simply move the entire SST instead of iterating and generate a new one.
let mut iter = sst_reader.iter();
let direct_retval = (|| -> Result<Option<_>> {
Expand Down Expand Up @@ -889,19 +893,27 @@ fn key_to_bound(key: &[u8]) -> Bound<&[u8]> {
}
}

fn is_before_start_bound(value: &[u8], bound: &Bound<Vec<u8>>) -> bool {
fn key_to_exclusive_bound(key: &[u8]) -> Bound<&[u8]> {
if key.is_empty() {
Bound::Unbounded
} else {
Bound::Excluded(key)
}
}

fn is_before_start_bound<K: AsRef<[u8]>>(value: &[u8], bound: &Bound<K>) -> bool {
match bound {
Bound::Unbounded => false,
Bound::Included(b) => *value < **b,
Bound::Excluded(b) => *value <= **b,
Bound::Included(b) => *value < *b.as_ref(),
Bound::Excluded(b) => *value <= *b.as_ref(),
}
}

fn is_after_end_bound(value: &[u8], bound: &Bound<Vec<u8>>) -> bool {
fn is_after_end_bound<K: AsRef<[u8]>>(value: &[u8], bound: &Bound<K>) -> bool {
match bound {
Bound::Unbounded => false,
Bound::Included(b) => *value > **b,
Bound::Excluded(b) => *value >= **b,
Bound::Included(b) => *value > *b.as_ref(),
Bound::Excluded(b) => *value >= *b.as_ref(),
}
}

Expand All @@ -922,7 +934,7 @@ mod tests {
};
use tempfile::Builder;
use test_sst_importer::{
new_sst_reader, new_sst_writer, new_test_engine, PROP_TEST_MARKER_CF_NAME,
new_sst_reader, new_sst_writer, new_test_engine, RocksSstWriter, PROP_TEST_MARKER_CF_NAME,
};
use txn_types::{Value, WriteType};

Expand Down Expand Up @@ -1105,15 +1117,16 @@ mod tests {
assert_eq!(meta, new_meta);
}

fn create_sample_external_sst_file() -> Result<(tempfile::TempDir, StorageBackend, SstMeta)> {
fn create_external_sst_file_with_write_fn<F>(
write_fn: F,
) -> Result<(tempfile::TempDir, StorageBackend, SstMeta)>
where
F: FnOnce(&mut RocksSstWriter) -> Result<()>,
{
let ext_sst_dir = tempfile::tempdir()?;
let mut sst_writer =
new_sst_writer(ext_sst_dir.path().join("sample.sst").to_str().unwrap());
sst_writer.put(b"zt123_r01", b"abc")?;
sst_writer.put(b"zt123_r04", b"xyz")?;
sst_writer.put(b"zt123_r07", b"pqrst")?;
// sst_writer.delete(b"t123_r10")?; // FIXME: can't handle DELETE ops yet.
sst_writer.put(b"zt123_r13", b"www")?;
write_fn(&mut sst_writer)?;
let sst_info = sst_writer.finish()?;

// make up the SST meta for downloading.
Expand All @@ -1130,6 +1143,38 @@ mod tests {
Ok((ext_sst_dir, backend, meta))
}

fn create_sample_external_sst_file() -> Result<(tempfile::TempDir, StorageBackend, SstMeta)> {
create_external_sst_file_with_write_fn(|writer| {
writer.put(b"zt123_r01", b"abc")?;
writer.put(b"zt123_r04", b"xyz")?;
writer.put(b"zt123_r07", b"pqrst")?;
// writer.delete(b"t123_r10")?; // FIXME: can't handle DELETE ops yet.
writer.put(b"zt123_r13", b"www")?;
Ok(())
})
}

fn create_sample_external_rawkv_sst_file(
start_key: &[u8],
end_key: &[u8],
end_key_exclusive: bool,
) -> Result<(tempfile::TempDir, StorageBackend, SstMeta)> {
let (dir, backend, mut meta) = create_external_sst_file_with_write_fn(|writer| {
writer.put(b"za", b"v1")?;
writer.put(b"zb", b"v2")?;
writer.put(b"zb\x00", b"v3")?;
writer.put(b"zc", b"v4")?;
writer.put(b"zc\x00", b"v5")?;
writer.put(b"zc\x00\x00", b"v6")?;
writer.put(b"zd", b"v7")?;
Ok(())
})?;
meta.mut_range().set_start(start_key.to_vec());
meta.mut_range().set_end(end_key.to_vec());
meta.set_end_key_exclusive(end_key_exclusive);
Ok((dir, backend, meta))
}

fn get_encoded_key(key: &[u8], ts: u64) -> Vec<u8> {
keys::data_key(
txn_types::Key::from_raw(key)
Expand Down Expand Up @@ -1704,6 +1749,7 @@ mod tests {
_ => panic!("unexpected download result: {:?}", result),
}
}

#[test]
fn test_write_sst() {
let mut meta = SstMeta::default();
Expand Down Expand Up @@ -1747,4 +1793,148 @@ mod tests {
let metas = w.finish().unwrap();
assert_eq!(metas.len(), 2);
}

#[test]
fn test_download_rawkv_sst() {
// creates a sample SST file.
let (_ext_sst_dir, backend, meta) =
create_sample_external_rawkv_sst_file(b"0", b"z", false).unwrap();

// performs the download.
let importer_dir = tempfile::tempdir().unwrap();
let importer = SSTImporter::new(&importer_dir, None).unwrap();
let sst_writer = create_sst_writer_with_db(&importer, &meta).unwrap();

let range = importer
.download::<TestEngine>(
&meta,
&backend,
"sample.sst",
&RewriteRule::default(),
Limiter::new(INFINITY),
sst_writer,
)
.unwrap()
.unwrap();

assert_eq!(range.get_start(), b"a");
assert_eq!(range.get_end(), b"d");

// verifies that the file is saved to the correct place.
let sst_file_path = importer.dir.join(&meta).unwrap().save;
let sst_file_metadata = sst_file_path.metadata().unwrap();
assert!(sst_file_metadata.is_file());
assert_eq!(sst_file_metadata.len(), meta.get_length());

// verifies the SST content is correct.
let sst_reader = new_sst_reader(sst_file_path.to_str().unwrap());
sst_reader.verify_checksum().unwrap();
let mut iter = sst_reader.iter();
iter.seek(SeekKey::Start).unwrap();
assert_eq!(
collect(iter),
vec![
(b"za".to_vec(), b"v1".to_vec()),
(b"zb".to_vec(), b"v2".to_vec()),
(b"zb\x00".to_vec(), b"v3".to_vec()),
(b"zc".to_vec(), b"v4".to_vec()),
(b"zc\x00".to_vec(), b"v5".to_vec()),
(b"zc\x00\x00".to_vec(), b"v6".to_vec()),
(b"zd".to_vec(), b"v7".to_vec()),
]
);
}

#[test]
fn test_download_rawkv_sst_partial() {
// creates a sample SST file.
let (_ext_sst_dir, backend, meta) =
create_sample_external_rawkv_sst_file(b"b", b"c\x00", false).unwrap();

// performs the download.
let importer_dir = tempfile::tempdir().unwrap();
let importer = SSTImporter::new(&importer_dir, None).unwrap();
let sst_writer = create_sst_writer_with_db(&importer, &meta).unwrap();

let range = importer
.download::<TestEngine>(
&meta,
&backend,
"sample.sst",
&RewriteRule::default(),
Limiter::new(INFINITY),
sst_writer,
)
.unwrap()
.unwrap();

assert_eq!(range.get_start(), b"b");
assert_eq!(range.get_end(), b"c\x00");

// verifies that the file is saved to the correct place.
let sst_file_path = importer.dir.join(&meta).unwrap().save;
let sst_file_metadata = sst_file_path.metadata().unwrap();
assert!(sst_file_metadata.is_file());

// verifies the SST content is correct.
let sst_reader = new_sst_reader(sst_file_path.to_str().unwrap());
sst_reader.verify_checksum().unwrap();
let mut iter = sst_reader.iter();
iter.seek(SeekKey::Start).unwrap();
assert_eq!(
collect(iter),
vec![
(b"zb".to_vec(), b"v2".to_vec()),
(b"zb\x00".to_vec(), b"v3".to_vec()),
(b"zc".to_vec(), b"v4".to_vec()),
(b"zc\x00".to_vec(), b"v5".to_vec()),
]
);
}

#[test]
fn test_download_rawkv_sst_partial_exclusive_end_key() {
// creates a sample SST file.
let (_ext_sst_dir, backend, meta) =
create_sample_external_rawkv_sst_file(b"b", b"c\x00", true).unwrap();

// performs the download.
let importer_dir = tempfile::tempdir().unwrap();
let importer = SSTImporter::new(&importer_dir, None).unwrap();
let sst_writer = create_sst_writer_with_db(&importer, &meta).unwrap();

let range = importer
.download::<TestEngine>(
&meta,
&backend,
"sample.sst",
&RewriteRule::default(),
Limiter::new(INFINITY),
sst_writer,
)
.unwrap()
.unwrap();

assert_eq!(range.get_start(), b"b");
assert_eq!(range.get_end(), b"c");

// verifies that the file is saved to the correct place.
let sst_file_path = importer.dir.join(&meta).unwrap().save;
let sst_file_metadata = sst_file_path.metadata().unwrap();
assert!(sst_file_metadata.is_file());

// verifies the SST content is correct.
let sst_reader = new_sst_reader(sst_file_path.to_str().unwrap());
sst_reader.verify_checksum().unwrap();
let mut iter = sst_reader.iter();
iter.seek(SeekKey::Start).unwrap();
assert_eq!(
collect(iter),
vec![
(b"zb".to_vec(), b"v2".to_vec()),
(b"zb\x00".to_vec(), b"v3".to_vec()),
(b"zc".to_vec(), b"v4".to_vec()),
]
);
}
}
2 changes: 1 addition & 1 deletion components/test_sst_importer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::path::Path;

use engine_rocks::RocksEngine;
use engine_rocks::RocksSstReader;
use engine_rocks::RocksSstWriter;
pub use engine_rocks::RocksSstWriter;
use engine_rocks::RocksSstWriterBuilder;
use engine_traits::KvEngine;
use engine_traits::SstReader;
Expand Down

0 comments on commit 3eaa05b

Please sign in to comment.