diff --git a/components/sst_importer/src/sst_importer.rs b/components/sst_importer/src/sst_importer.rs index 6d2e1faa1fb6..42faa696227f 100644 --- a/components/sst_importer/src/sst_importer.rs +++ b/components/sst_importer/src/sst_importer.rs @@ -266,26 +266,30 @@ impl SSTImporter { let range_start = meta.get_range().get_start(); let range_end = meta.get_range().get_end(); + let range_start_bound = key_to_bound(range_start); + let range_end_bound = if meta.get_end_key_exclusive() { + key_to_exclusive_bound(range_end) + } else { + key_to_bound(range_end) + }; - let range_start = keys::rewrite::rewrite_prefix_of_start_bound( - new_prefix, - old_prefix, - key_to_bound(range_start), - ) - .map_err(|_| { - Error::WrongKeyPrefix("SST start range", range_start.to_vec(), new_prefix.to_vec()) - })?; - let range_end = keys::rewrite::rewrite_prefix_of_end_bound( - new_prefix, - old_prefix, - key_to_bound(range_end), - ) - .map_err(|_| { - Error::WrongKeyPrefix("SST end range", range_end.to_vec(), new_prefix.to_vec()) - })?; + let range_start = + keys::rewrite::rewrite_prefix_of_start_bound(new_prefix, old_prefix, range_start_bound) + .map_err(|_| { + Error::WrongKeyPrefix( + "SST start range", + range_start.to_vec(), + new_prefix.to_vec(), + ) + })?; + let range_end = + keys::rewrite::rewrite_prefix_of_end_bound(new_prefix, old_prefix, range_end_bound) + .map_err(|_| { + Error::WrongKeyPrefix("SST end range", range_end.to_vec(), new_prefix.to_vec()) + })?; let start_rename_rewrite = Instant::now(); - // read and first and last keys from the SST, determine if we could + // read the first and last keys from the SST, determine if we could // simply move the entire SST instead of iterating and generate a new one. let mut iter = sst_reader.iter(); let direct_retval = (|| -> Result> { @@ -889,19 +893,27 @@ fn key_to_bound(key: &[u8]) -> Bound<&[u8]> { } } -fn is_before_start_bound(value: &[u8], bound: &Bound>) -> bool { +fn key_to_exclusive_bound(key: &[u8]) -> Bound<&[u8]> { + if key.is_empty() { + Bound::Unbounded + } else { + Bound::Excluded(key) + } +} + +fn is_before_start_bound>(value: &[u8], bound: &Bound) -> bool { match bound { Bound::Unbounded => false, - Bound::Included(b) => *value < **b, - Bound::Excluded(b) => *value <= **b, + Bound::Included(b) => *value < *b.as_ref(), + Bound::Excluded(b) => *value <= *b.as_ref(), } } -fn is_after_end_bound(value: &[u8], bound: &Bound>) -> bool { +fn is_after_end_bound>(value: &[u8], bound: &Bound) -> bool { match bound { Bound::Unbounded => false, - Bound::Included(b) => *value > **b, - Bound::Excluded(b) => *value >= **b, + Bound::Included(b) => *value > *b.as_ref(), + Bound::Excluded(b) => *value >= *b.as_ref(), } } @@ -922,7 +934,7 @@ mod tests { }; use tempfile::Builder; use test_sst_importer::{ - new_sst_reader, new_sst_writer, new_test_engine, PROP_TEST_MARKER_CF_NAME, + new_sst_reader, new_sst_writer, new_test_engine, RocksSstWriter, PROP_TEST_MARKER_CF_NAME, }; use txn_types::{Value, WriteType}; @@ -1105,15 +1117,16 @@ mod tests { assert_eq!(meta, new_meta); } - fn create_sample_external_sst_file() -> Result<(tempfile::TempDir, StorageBackend, SstMeta)> { + fn create_external_sst_file_with_write_fn( + write_fn: F, + ) -> Result<(tempfile::TempDir, StorageBackend, SstMeta)> + where + F: FnOnce(&mut RocksSstWriter) -> Result<()>, + { let ext_sst_dir = tempfile::tempdir()?; let mut sst_writer = new_sst_writer(ext_sst_dir.path().join("sample.sst").to_str().unwrap()); - sst_writer.put(b"zt123_r01", b"abc")?; - sst_writer.put(b"zt123_r04", b"xyz")?; - sst_writer.put(b"zt123_r07", b"pqrst")?; - // sst_writer.delete(b"t123_r10")?; // FIXME: can't handle DELETE ops yet. - sst_writer.put(b"zt123_r13", b"www")?; + write_fn(&mut sst_writer)?; let sst_info = sst_writer.finish()?; // make up the SST meta for downloading. @@ -1130,6 +1143,38 @@ mod tests { Ok((ext_sst_dir, backend, meta)) } + fn create_sample_external_sst_file() -> Result<(tempfile::TempDir, StorageBackend, SstMeta)> { + create_external_sst_file_with_write_fn(|writer| { + writer.put(b"zt123_r01", b"abc")?; + writer.put(b"zt123_r04", b"xyz")?; + writer.put(b"zt123_r07", b"pqrst")?; + // writer.delete(b"t123_r10")?; // FIXME: can't handle DELETE ops yet. + writer.put(b"zt123_r13", b"www")?; + Ok(()) + }) + } + + fn create_sample_external_rawkv_sst_file( + start_key: &[u8], + end_key: &[u8], + end_key_exclusive: bool, + ) -> Result<(tempfile::TempDir, StorageBackend, SstMeta)> { + let (dir, backend, mut meta) = create_external_sst_file_with_write_fn(|writer| { + writer.put(b"za", b"v1")?; + writer.put(b"zb", b"v2")?; + writer.put(b"zb\x00", b"v3")?; + writer.put(b"zc", b"v4")?; + writer.put(b"zc\x00", b"v5")?; + writer.put(b"zc\x00\x00", b"v6")?; + writer.put(b"zd", b"v7")?; + Ok(()) + })?; + meta.mut_range().set_start(start_key.to_vec()); + meta.mut_range().set_end(end_key.to_vec()); + meta.set_end_key_exclusive(end_key_exclusive); + Ok((dir, backend, meta)) + } + fn get_encoded_key(key: &[u8], ts: u64) -> Vec { keys::data_key( txn_types::Key::from_raw(key) @@ -1704,6 +1749,7 @@ mod tests { _ => panic!("unexpected download result: {:?}", result), } } + #[test] fn test_write_sst() { let mut meta = SstMeta::default(); @@ -1747,4 +1793,148 @@ mod tests { let metas = w.finish().unwrap(); assert_eq!(metas.len(), 2); } + + #[test] + fn test_download_rawkv_sst() { + // creates a sample SST file. + let (_ext_sst_dir, backend, meta) = + create_sample_external_rawkv_sst_file(b"0", b"z", false).unwrap(); + + // performs the download. + let importer_dir = tempfile::tempdir().unwrap(); + let importer = SSTImporter::new(&importer_dir, None).unwrap(); + let sst_writer = create_sst_writer_with_db(&importer, &meta).unwrap(); + + let range = importer + .download::( + &meta, + &backend, + "sample.sst", + &RewriteRule::default(), + Limiter::new(INFINITY), + sst_writer, + ) + .unwrap() + .unwrap(); + + assert_eq!(range.get_start(), b"a"); + assert_eq!(range.get_end(), b"d"); + + // verifies that the file is saved to the correct place. + let sst_file_path = importer.dir.join(&meta).unwrap().save; + let sst_file_metadata = sst_file_path.metadata().unwrap(); + assert!(sst_file_metadata.is_file()); + assert_eq!(sst_file_metadata.len(), meta.get_length()); + + // verifies the SST content is correct. + let sst_reader = new_sst_reader(sst_file_path.to_str().unwrap()); + sst_reader.verify_checksum().unwrap(); + let mut iter = sst_reader.iter(); + iter.seek(SeekKey::Start).unwrap(); + assert_eq!( + collect(iter), + vec![ + (b"za".to_vec(), b"v1".to_vec()), + (b"zb".to_vec(), b"v2".to_vec()), + (b"zb\x00".to_vec(), b"v3".to_vec()), + (b"zc".to_vec(), b"v4".to_vec()), + (b"zc\x00".to_vec(), b"v5".to_vec()), + (b"zc\x00\x00".to_vec(), b"v6".to_vec()), + (b"zd".to_vec(), b"v7".to_vec()), + ] + ); + } + + #[test] + fn test_download_rawkv_sst_partial() { + // creates a sample SST file. + let (_ext_sst_dir, backend, meta) = + create_sample_external_rawkv_sst_file(b"b", b"c\x00", false).unwrap(); + + // performs the download. + let importer_dir = tempfile::tempdir().unwrap(); + let importer = SSTImporter::new(&importer_dir, None).unwrap(); + let sst_writer = create_sst_writer_with_db(&importer, &meta).unwrap(); + + let range = importer + .download::( + &meta, + &backend, + "sample.sst", + &RewriteRule::default(), + Limiter::new(INFINITY), + sst_writer, + ) + .unwrap() + .unwrap(); + + assert_eq!(range.get_start(), b"b"); + assert_eq!(range.get_end(), b"c\x00"); + + // verifies that the file is saved to the correct place. + let sst_file_path = importer.dir.join(&meta).unwrap().save; + let sst_file_metadata = sst_file_path.metadata().unwrap(); + assert!(sst_file_metadata.is_file()); + + // verifies the SST content is correct. + let sst_reader = new_sst_reader(sst_file_path.to_str().unwrap()); + sst_reader.verify_checksum().unwrap(); + let mut iter = sst_reader.iter(); + iter.seek(SeekKey::Start).unwrap(); + assert_eq!( + collect(iter), + vec![ + (b"zb".to_vec(), b"v2".to_vec()), + (b"zb\x00".to_vec(), b"v3".to_vec()), + (b"zc".to_vec(), b"v4".to_vec()), + (b"zc\x00".to_vec(), b"v5".to_vec()), + ] + ); + } + + #[test] + fn test_download_rawkv_sst_partial_exclusive_end_key() { + // creates a sample SST file. + let (_ext_sst_dir, backend, meta) = + create_sample_external_rawkv_sst_file(b"b", b"c\x00", true).unwrap(); + + // performs the download. + let importer_dir = tempfile::tempdir().unwrap(); + let importer = SSTImporter::new(&importer_dir, None).unwrap(); + let sst_writer = create_sst_writer_with_db(&importer, &meta).unwrap(); + + let range = importer + .download::( + &meta, + &backend, + "sample.sst", + &RewriteRule::default(), + Limiter::new(INFINITY), + sst_writer, + ) + .unwrap() + .unwrap(); + + assert_eq!(range.get_start(), b"b"); + assert_eq!(range.get_end(), b"c"); + + // verifies that the file is saved to the correct place. + let sst_file_path = importer.dir.join(&meta).unwrap().save; + let sst_file_metadata = sst_file_path.metadata().unwrap(); + assert!(sst_file_metadata.is_file()); + + // verifies the SST content is correct. + let sst_reader = new_sst_reader(sst_file_path.to_str().unwrap()); + sst_reader.verify_checksum().unwrap(); + let mut iter = sst_reader.iter(); + iter.seek(SeekKey::Start).unwrap(); + assert_eq!( + collect(iter), + vec![ + (b"zb".to_vec(), b"v2".to_vec()), + (b"zb\x00".to_vec(), b"v3".to_vec()), + (b"zc".to_vec(), b"v4".to_vec()), + ] + ); + } } diff --git a/components/test_sst_importer/src/lib.rs b/components/test_sst_importer/src/lib.rs index 37271b8dad3f..39ed8d55cb52 100644 --- a/components/test_sst_importer/src/lib.rs +++ b/components/test_sst_importer/src/lib.rs @@ -6,7 +6,7 @@ use std::path::Path; use engine_rocks::RocksEngine; use engine_rocks::RocksSstReader; -use engine_rocks::RocksSstWriter; +pub use engine_rocks::RocksSstWriter; use engine_rocks::RocksSstWriterBuilder; use engine_traits::KvEngine; use engine_traits::SstReader;