diff --git a/Cargo.lock b/Cargo.lock index 60cfb4ea1c1..a9563a157c6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2332,7 +2332,7 @@ dependencies = [ [[package]] name = "librocksdb_sys" version = "0.1.0" -source = "git+https://github.com/tikv/rust-rocksdb.git?branch=tikv-5.0#651a5c0d17662a0d95c76c18f2bb46036f6c501d" +source = "git+https://github.com/tikv/rust-rocksdb.git?branch=tikv-5.0#53ff7e7a8fff03058ce03b9d520227cc4aff90b8" dependencies = [ "bindgen", "bzip2-sys", @@ -2351,7 +2351,7 @@ dependencies = [ [[package]] name = "libtitan_sys" version = "0.0.1" -source = "git+https://github.com/tikv/rust-rocksdb.git?branch=tikv-5.0#651a5c0d17662a0d95c76c18f2bb46036f6c501d" +source = "git+https://github.com/tikv/rust-rocksdb.git?branch=tikv-5.0#53ff7e7a8fff03058ce03b9d520227cc4aff90b8" dependencies = [ "bzip2-sys", "cc", @@ -4068,7 +4068,7 @@ checksum = "cabe4fa914dec5870285fa7f71f602645da47c486e68486d2b4ceb4a343e90ac" [[package]] name = "rocksdb" version = "0.3.0" -source = "git+https://github.com/tikv/rust-rocksdb.git?branch=tikv-5.0#651a5c0d17662a0d95c76c18f2bb46036f6c501d" +source = "git+https://github.com/tikv/rust-rocksdb.git?branch=tikv-5.0#53ff7e7a8fff03058ce03b9d520227cc4aff90b8" dependencies = [ "libc 0.2.86", "librocksdb_sys", diff --git a/components/engine_rocks/src/options.rs b/components/engine_rocks/src/options.rs index 2fdc62742cd..3f783325ef1 100644 --- a/components/engine_rocks/src/options.rs +++ b/components/engine_rocks/src/options.rs @@ -17,6 +17,7 @@ impl From for RocksReadOptions { fn from(opts: engine_traits::ReadOptions) -> Self { let mut r = RawReadOptions::default(); r.fill_cache(opts.fill_cache()); + r.set_read_tier(opts.read_tier() as i32); RocksReadOptions(r) } } diff --git a/components/engine_traits/src/options.rs b/components/engine_traits/src/options.rs index d8d2c864ec6..6ca72bb477b 100644 --- a/components/engine_traits/src/options.rs +++ b/components/engine_traits/src/options.rs @@ -2,9 +2,18 @@ use std::ops::Bound; use tikv_util::keybuilder::KeyBuilder; +#[repr(i32)] +#[derive(Clone, Copy)] +pub enum ReadTier { + ReadAllTier = 0, + BlockCacheTier = 1, + PersistedTier = 2, +} + #[derive(Clone)] pub struct ReadOptions { fill_cache: bool, + read_tier: ReadTier, } impl ReadOptions { @@ -21,11 +30,24 @@ impl ReadOptions { pub fn set_fill_cache(&mut self, v: bool) { self.fill_cache = v; } + + #[inline] + pub fn read_tier(&self) -> ReadTier { + self.read_tier + } + + #[inline] + pub fn set_read_tier(&mut self, v: ReadTier) { + self.read_tier = v; + } } impl Default for ReadOptions { fn default() -> ReadOptions { - ReadOptions { fill_cache: true } + ReadOptions { + fill_cache: true, + read_tier: ReadTier::ReadAllTier, // all tier + } } } diff --git a/components/raftstore/src/store/fsm/peer.rs b/components/raftstore/src/store/fsm/peer.rs index f3e7bea1822..6ffc227633a 100644 --- a/components/raftstore/src/store/fsm/peer.rs +++ b/components/raftstore/src/store/fsm/peer.rs @@ -11,7 +11,8 @@ use batch_system::{BasicMailbox, Fsm}; use collections::HashMap; use engine_traits::CF_RAFT; use engine_traits::{ - Engines, KvEngine, RaftEngine, RaftLogBatch, SSTMetaInfo, WriteBatch, WriteBatchExt, + Engines, KvEngine, RaftEngine, RaftLogBatch, ReadOptions, ReadTier, SSTMetaInfo, WriteBatch, + WriteBatchExt, }; use error_code::ErrorCodeExt; use fail::fail_point; @@ -131,6 +132,9 @@ where max_inflight_msgs: usize, trace: PeerMemoryTrace, + + // read applied idx from sst for GC + check_truncated_idx_for_gc: bool, } pub struct BatchRaftCmdRequestBuilder @@ -231,6 +235,7 @@ where ), max_inflight_msgs: cfg.raft_max_inflight_msgs, trace: PeerMemoryTrace::default(), + check_truncated_idx_for_gc: cfg.disable_kv_wal, }), )) } @@ -276,6 +281,7 @@ where ), max_inflight_msgs: cfg.raft_max_inflight_msgs, trace: PeerMemoryTrace::default(), + check_truncated_idx_for_gc: cfg.disable_kv_wal, }), )) } @@ -2212,18 +2218,50 @@ where } } + fn get_flushed_truncated_idx(&mut self, region_id: u64) -> u64 { + let state_key = keys::apply_state_key(region_id); + let mut opts = ReadOptions::new(); + opts.set_read_tier(ReadTier::PersistedTier); + let value = self + .ctx + .engines + .kv + .get_value_cf_opt(&opts, CF_RAFT, &state_key) + .unwrap(); + if value.is_none() { + return 0; + } + let mut m = RaftApplyState::default(); + m.merge_from_bytes(&value.unwrap()).unwrap(); + cmp::min(m.get_truncated_state().get_index(), m.applied_index) + } + fn on_ready_compact_log(&mut self, first_index: u64, state: RaftTruncatedState) { let total_cnt = self.fsm.peer.last_applying_idx - first_index; + + let mut index = state.get_index(); + if self.fsm.check_truncated_idx_for_gc { + let last_truncated_idx = + self.get_flushed_truncated_idx(self.fsm.peer.get_store().get_region_id()); + if last_truncated_idx != 0 && last_truncated_idx - 1 < index { + debug!("on_ready_compact_log update index."; + "region" => self.fsm.peer.get_store().get_region_id(), + "original" => index, + "new value" => last_truncated_idx-1); + index = last_truncated_idx - 1; + } + } // the size of current CompactLog command can be ignored. let remain_cnt = self.fsm.peer.last_applying_idx - state.get_index() - 1; self.fsm.peer.raft_log_size_hint = self.fsm.peer.raft_log_size_hint * remain_cnt / total_cnt; - let compact_to = state.get_index() + 1; + let compact_to = index + 1; let task = RaftlogGcTask::gc( self.fsm.peer.get_store().get_region_id(), self.fsm.peer.last_compacted_idx, compact_to, ); + let start_idx = self.fsm.peer.last_compacted_idx; self.fsm.peer.last_compacted_idx = compact_to; self.fsm.peer.mut_store().compact_to(compact_to); if let Err(e) = self.ctx.raftlog_gc_scheduler.schedule(task) { @@ -2233,6 +2271,14 @@ where "peer_id" => self.fsm.peer_id(), "err" => %e, ); + } else { + debug!( + "successfully to schedule compact task"; + "region_id" => self.fsm.region_id(), + "peer_id" => self.fsm.peer_id(), + "start_idx" => start_idx, + "end_idx" => compact_to, + ); } } diff --git a/components/raftstore/src/store/mod.rs b/components/raftstore/src/store/mod.rs index ed78726117f..573ca9fd7db 100644 --- a/components/raftstore/src/store/mod.rs +++ b/components/raftstore/src/store/mod.rs @@ -49,9 +49,9 @@ pub use self::region_snapshot::{RegionIterator, RegionSnapshot}; pub use self::replication_mode::{GlobalReplicationState, StoreGroup}; pub use self::snap::{ check_abort, copy_snapshot, - snap_io::{apply_sst_cf_file, build_sst_cf_file}, + snap_io::{apply_sst_cf_file, build_sst_cf_file_list}, ApplyOptions, Error as SnapError, SnapEntry, SnapKey, SnapManager, SnapManagerBuilder, - Snapshot, SnapshotStatistics, + Snapshot, SnapshotStatistics,CfFile, }; pub use self::transport::{CasualRouter, ProposalRouter, StoreRouter, Transport}; pub use self::util::{RegionReadProgress, RegionReadProgressRegistry}; diff --git a/components/raftstore/src/store/snap.rs b/components/raftstore/src/store/snap.rs index 71799ffd600..2bd70d70dcb 100644 --- a/components/raftstore/src/store/snap.rs +++ b/components/raftstore/src/store/snap.rs @@ -213,12 +213,22 @@ fn gen_snapshot_meta(cf_files: &[CfFile]) -> RaftStoreResult { cf_file.cf )); } - - let mut cf_file_meta = SnapshotCfFile::new(); - cf_file_meta.set_cf(cf_file.cf.to_owned()); - cf_file_meta.set_size(cf_file.size); - cf_file_meta.set_checksum(cf_file.checksum); - meta.push(cf_file_meta); + let size_vec = &cf_file.size; + if size_vec.len() != 0 { + for (i, size) in size_vec.iter().enumerate() { + let mut cf_file_meta = SnapshotCfFile::new(); + cf_file_meta.set_cf(cf_file.cf.to_string()); + cf_file_meta.set_size(*size); + cf_file_meta.set_checksum(cf_file.checksum[i]); + meta.push(cf_file_meta); + } + } else { + let mut cf_file_meta = SnapshotCfFile::new(); + cf_file_meta.set_cf(cf_file.cf.to_string()); + cf_file_meta.set_size(0); + cf_file_meta.set_checksum(0); + meta.push(cf_file_meta); + } } let mut snapshot_meta = SnapshotMeta::default(); snapshot_meta.set_cf_files(meta.into()); @@ -291,13 +301,91 @@ struct CfFileForRecving { pub struct CfFile { pub cf: CfName, pub path: PathBuf, - pub tmp_path: PathBuf, - pub clone_path: PathBuf, - file_for_sending: Option>, - file_for_recving: Option, + pub file_prefix: String, + pub file_suffix: String, + file_for_sending: Vec>, + file_for_recving: Vec, + file_names: Vec, pub kv_count: u64, - pub size: u64, - pub checksum: u32, + pub size: Vec, + pub checksum: Vec, +} + +impl CfFile { + pub fn new(cf: CfName, path: PathBuf, file_prefix: String, file_suffix: String) -> Self { + CfFile { + cf, + path, + file_prefix, + file_suffix, + ..Default::default() + } + } + pub fn tmp_file_paths(&self) -> Vec { + self.file_names + .iter() + .map(|file_name| { + self.path + .join(format!("{}{}", file_name, TMP_FILE_SUFFIX)) + .to_str() + .unwrap() + .to_string() + }) + .collect::>() + } + + pub fn clone_file_paths(&self) -> Vec { + self.file_names + .iter() + .map(|file_name| { + self.path + .join(format!("{}{}", file_name, CLONE_FILE_SUFFIX)) + .to_str() + .unwrap() + .to_string() + }) + .collect::>() + } + + pub fn file_paths(&self) -> Vec { + self.file_names + .iter() + .map(|file_name| self.path.join(file_name).to_str().unwrap().to_string()) + .collect::>() + } + + pub fn add_file(&mut self, idx: usize) -> String { + self.add_file_with_size_checksum(idx, 0, 0) + } + + pub fn add_file_with_size_checksum(&mut self, idx: usize, size: u64, checksum: u32) -> String { + assert!(self.size.len() >= idx); + let file_name = self.gen_file_name(idx); + if self.size.len() > idx { // Any logic similar to test_snap_corruption_on_size_or_checksum will trigger this branch + self.size[idx] = size; + self.checksum[idx] = checksum; + self.file_names[idx] = file_name.clone(); + } else { + self.size.push(size); + self.checksum.push(checksum); + self.file_names.push(file_name.clone()); + } + self.path.join(file_name).to_str().unwrap().to_string() + } + + pub fn gen_file_name(&self, file_count: usize) -> String { + format!( + "{}_{:04}{}", + self.file_prefix, file_count, self.file_suffix + ) + } + + pub fn gen_tmp_file_name(&self, file_count: usize) -> String { + format!( + "{}_{:04}{}{}", + self.file_prefix, file_count, self.file_suffix, TMP_FILE_SUFFIX + ) + } } #[derive(Default)] @@ -316,6 +404,7 @@ pub struct Snapshot { dir_path: PathBuf, cf_files: Vec, cf_index: usize, + cf_file_index: usize, meta_file: MetaFile, hold_tmp_files: bool, @@ -351,15 +440,12 @@ impl Snapshot { let mut cf_files = Vec::with_capacity(SNAPSHOT_CFS.len()); for cf in SNAPSHOT_CFS { - let filename = format!("{}_{}{}", prefix, cf, SST_FILE_SUFFIX); - let path = dir_path.join(&filename); - let tmp_path = dir_path.join(format!("{}{}", filename, TMP_FILE_SUFFIX)); - let clone_path = dir_path.join(format!("{}{}", filename, CLONE_FILE_SUFFIX)); + let file_prefix = format!("{}_{}", prefix, cf); let cf_file = CfFile { cf, - path, - tmp_path, - clone_path, + path: dir_path.clone(), + file_prefix, + file_suffix: SST_FILE_SUFFIX.to_string(), ..Default::default() }; cf_files.push(cf_file); @@ -380,6 +466,7 @@ impl Snapshot { dir_path, cf_files, cf_index: 0, + cf_file_index: 0, meta_file, hold_tmp_files: false, mgr: mgr.clone(), @@ -437,9 +524,15 @@ impl Snapshot { } for cf_file in &mut s.cf_files { // initialize cf file size and reader - if cf_file.size > 0 { - let file = File::open(&cf_file.path)?; - cf_file.file_for_sending = Some(Box::new(file) as Box); + let file_paths = cf_file.file_paths(); + for (i, file_path) in file_paths.iter().enumerate() { + if cf_file.size[i] > 0 { + let path = Path::new(file_path); + let file = File::open(&path)?; + cf_file + .file_for_sending + .push(Box::new(file) as Box); + } } } Ok(s) @@ -465,35 +558,43 @@ impl Snapshot { s.hold_tmp_files = true; for cf_file in &mut s.cf_files { - if cf_file.size == 0 { + if cf_file.size.is_empty() { continue; } - let f = OpenOptions::new() - .write(true) - .create_new(true) - .open(&cf_file.tmp_path)?; - cf_file.file_for_recving = Some(CfFileForRecving { - file: f, - encrypter: None, - written_size: 0, - write_digest: crc32fast::Hasher::new(), - }); - - if let Some(mgr) = &s.mgr.encryption_key_manager { - let path = cf_file.path.to_str().unwrap(); - let enc_info = mgr.new_file(path)?; - let mthd = encryption_method_from_db_encryption_method(enc_info.method); - if mthd != EncryptionMethod::Plaintext { - let file_for_recving = cf_file.file_for_recving.as_mut().unwrap(); - file_for_recving.encrypter = Some( - create_aes_ctr_crypter( - mthd, - &enc_info.key, - Mode::Encrypt, - Iv::from_slice(&enc_info.iv)?, - ) - .map_err(|e| RaftStoreError::Snapshot(box_err!(e)))?, - ); + cf_file.file_for_recving = vec![]; + let tmp_file_paths = cf_file.tmp_file_paths(); + let file_paths = cf_file.file_paths(); + for (idx, _) in tmp_file_paths.iter().enumerate() { + if cf_file.size[idx] == 0 { + continue; + } + let file_path = Path::new(&tmp_file_paths[idx]); + let f = OpenOptions::new() + .write(true) + .create_new(true) + .open(&file_path)?; + cf_file.file_for_recving.push(CfFileForRecving { + file: f, + encrypter: None, + written_size: 0, + write_digest: crc32fast::Hasher::new(), + }); + + if let Some(mgr) = &s.mgr.encryption_key_manager { + let enc_info = mgr.new_file(&file_paths[idx])?; + let mthd = encryption_method_from_db_encryption_method(enc_info.method); + if mthd != EncryptionMethod::Plaintext { + let file_for_recving = cf_file.file_for_recving.last_mut().unwrap(); + file_for_recving.encrypter = Some( + create_aes_ctr_crypter( + mthd, + &enc_info.key, + Mode::Encrypt, + Iv::from_slice(&enc_info.iv)?, + ) + .map_err(|e| RaftStoreError::Snapshot(box_err!(e)))?, + ); + } } } } @@ -533,29 +634,64 @@ impl Snapshot { } fn set_snapshot_meta(&mut self, snapshot_meta: SnapshotMeta) -> RaftStoreResult<()> { - if snapshot_meta.get_cf_files().len() != self.cf_files.len() { + let mut cf_file_count_from_meta: Vec = vec![]; + let mut file_count = 0; + let mut current_cf = ""; + info!("set_snapshot_meta total cf files count: {}", snapshot_meta.get_cf_files().len()); + for cf_file in snapshot_meta.get_cf_files() { + if current_cf.is_empty() { + current_cf = cf_file.get_cf(); + file_count = 1; + continue; + } + + if current_cf != cf_file.get_cf() { + cf_file_count_from_meta.push(file_count); + current_cf = cf_file.get_cf(); + file_count = 1; + } else { + file_count += 1; + } + } + cf_file_count_from_meta.push(file_count); + + if cf_file_count_from_meta.len() != self.cf_files.len() { return Err(box_err!( "invalid cf number of snapshot meta, expect {}, got {}", SNAPSHOT_CFS.len(), - snapshot_meta.get_cf_files().len() + cf_file_count_from_meta.len() )); } - for (i, cf_file) in self.cf_files.iter_mut().enumerate() { - let meta = snapshot_meta.get_cf_files().get(i).unwrap(); - if meta.get_cf() != cf_file.cf { - return Err(box_err!( - "invalid {} cf in snapshot meta, expect {}, got {}", - i, - cf_file.cf, - meta.get_cf() - )); - } - cf_file.size = meta.get_size(); - cf_file.checksum = meta.get_checksum(); - if file_exists(&cf_file.path) { - let mgr = self.mgr.encryption_key_manager.as_ref(); - let (_, size) = calc_checksum_and_size(&cf_file.path, mgr)?; - check_file_size(size, cf_file.size, &cf_file.path)?; + let mut file_idx = 0; + let mut cf_idx = 0; + for meta in snapshot_meta.get_cf_files() { + if cf_idx < cf_file_count_from_meta.len() && file_idx < cf_file_count_from_meta[cf_idx] { + if meta.get_cf() != self.cf_files[cf_idx].cf { + return Err(box_err!( + "invalid {} cf in snapshot meta, expect {}, got {}", + cf_idx, + self.cf_files[cf_idx].cf, + meta.get_cf() + )); + } + if meta.get_size() != 0 { + let file_path = self.cf_files[cf_idx].add_file_with_size_checksum(file_idx, meta.get_size(), meta.get_checksum()); + if file_exists(&file_path) { + let mgr = self.mgr.encryption_key_manager.as_ref(); + let file_path = Path::new(&file_path); + let (_, size) = calc_checksum_and_size(&file_path, mgr)?; + check_file_size( + size, + *(self.cf_files[cf_idx].size.last().unwrap()), + &file_path, + )?; + } + } + file_idx += 1; + if file_idx >= cf_file_count_from_meta[cf_idx] { + cf_idx += 1; + file_idx = 0; + } } } self.meta_file.meta = snapshot_meta; @@ -589,29 +725,34 @@ impl Snapshot { fn validate(&self, engine: &impl KvEngine, for_send: bool) -> RaftStoreResult<()> { for cf_file in &self.cf_files { - if cf_file.size == 0 { - // Skip empty file. The checksum of this cf file should be 0 and - // this is checked when loading the snapshot meta. - continue; - } + let file_paths = cf_file.file_paths(); + let clone_file_paths = cf_file.clone_file_paths(); + for (i, file_path) in file_paths.iter().enumerate() { + if cf_file.size[i] == 0 { + // Skip empty file. The checksum of this cf file should be 0 and + // this is checked when loading the snapshot meta. + continue; + } - if !plain_file_used(cf_file.cf) { - // Reset global seq number. - engine.reset_global_seq(&cf_file.cf, &cf_file.path)?; - } - check_file_size_and_checksum( - &cf_file.path, - cf_file.size, - cf_file.checksum, - self.mgr.encryption_key_manager.as_ref(), - )?; - - if !for_send && !plain_file_used(cf_file.cf) { - sst_importer::prepare_sst_for_ingestion( - &cf_file.path, - &cf_file.clone_path, - self.mgr.encryption_key_manager.as_deref(), + let file_path = Path::new(file_path); + if !plain_file_used(cf_file.cf) { + // Reset global seq number. + engine.reset_global_seq(&cf_file.cf, &file_path)?; + } + check_file_size_and_checksum( + &file_path, + cf_file.size[i], + cf_file.checksum[i], + self.mgr.encryption_key_manager.as_ref(), )?; + + if !for_send && !plain_file_used(cf_file.cf) { + sst_importer::prepare_sst_for_ingestion( + &file_path, + &Path::new(&clone_file_paths[i]), + self.mgr.encryption_key_manager.as_deref(), + )?; + } } } Ok(()) @@ -691,20 +832,18 @@ impl Snapshot { for (cf_enum, cf) in SNAPSHOT_CFS_ENUM_PAIR { self.switch_to_cf_file(cf)?; let cf_file = &mut self.cf_files[self.cf_index]; - let path = cf_file.tmp_path.to_str().unwrap(); let cf_stat = if plain_file_used(cf_file.cf) { let key_mgr = self.mgr.encryption_key_manager.as_ref(); - snap_io::build_plain_cf_file::( - path, key_mgr, kv_snap, cf_file.cf, &begin_key, &end_key, - )? + snap_io::build_plain_cf_file::(cf_file, key_mgr, kv_snap, &begin_key, &end_key)? } else { - snap_io::build_sst_cf_file::( - path, + let raw_size_per_file = self.mgr.max_per_file_size; + snap_io::build_sst_cf_file_list::( + cf_file, engine, kv_snap, - cf_file.cf, &begin_key, &end_key, + raw_size_per_file, &self.mgr.limiter, )? }; @@ -714,10 +853,14 @@ impl Snapshot { // contain some metadata so their sizes will never be 0. self.mgr.rename_tmp_cf_file_for_send(cf_file)?; } else { - delete_file_if_exist(&cf_file.tmp_path).unwrap(); + for tmp_file_path in cf_file.tmp_file_paths() { + let tmp_file_path = Path::new(&tmp_file_path); + delete_file_if_exist(tmp_file_path)?; + } if let Some(ref mgr) = self.mgr.encryption_key_manager { - let src = cf_file.tmp_path.to_str().unwrap(); - mgr.delete_file(src)?; + for tmp_file_path in cf_file.tmp_file_paths() { + mgr.delete_file(&tmp_file_path)?; + } } } @@ -752,18 +895,28 @@ impl Snapshot { ); for cf_file in &self.cf_files { // Delete cloned files. - delete_file_if_exist(&cf_file.clone_path).unwrap(); + let clone_file_paths = cf_file.clone_file_paths(); + for clone_file_path in clone_file_paths { + delete_file_if_exist(&clone_file_path).unwrap(); + } // Delete temp files. if self.hold_tmp_files { - delete_file_if_exist(&cf_file.tmp_path).unwrap(); + let tmp_file_paths = cf_file.tmp_file_paths(); + for tmp_file_path in tmp_file_paths { + delete_file_if_exist(&tmp_file_path).unwrap(); + } } // Delete cf files. - delete_file_if_exist(&cf_file.path).unwrap(); + let file_paths = cf_file.file_paths(); + for file_path in &file_paths { + delete_file_if_exist(&file_path).unwrap(); + } if let Some(ref mgr) = self.mgr.encryption_key_manager { - let path = cf_file.path.to_str().unwrap(); - mgr.delete_file(path).unwrap(); + for file_path in &file_paths { + mgr.delete_file(&file_path).unwrap(); + } } } delete_file_if_exist(&self.meta_file.path).unwrap(); @@ -822,13 +975,13 @@ impl Snapshot { let region = options.region; let key_mgr = self.mgr.encryption_key_manager.as_ref(); for cf_file in &mut self.cf_files { - if cf_file.size == 0 { + if cf_file.size.len() == 0 { // Skip empty cf file. continue; } let cf = cf_file.cf; if plain_file_used(cf_file.cf) { - let path = cf_file.path.to_str().unwrap(); + let path = &cf_file.file_paths()[0]; let batch_size = options.write_batch_size; let cb = |kv: &[(Vec, Vec)]| { coprocessor_host.post_apply_plain_kvs_from_snapshot(®ion, cf, kv) @@ -844,8 +997,13 @@ impl Snapshot { )?; } else { let _timer = INGEST_SST_DURATION_SECONDS.start_coarse_timer(); - let path = cf_file.clone_path.to_str().unwrap(); - snap_io::apply_sst_cf_file(path, &options.db, cf)?; + let path = cf_file.path.to_str().unwrap(); // path is not used at all + let clone_file_paths = cf_file.clone_file_paths(); + let clone_files = clone_file_paths + .iter() + .map(|s| s.as_str()) + .collect::>(); + snap_io::apply_sst_cf_file(clone_files.as_slice(), &options.db, cf)?; coprocessor_host.post_apply_sst_from_snapshot(®ion, cf, path); } } @@ -856,11 +1014,14 @@ impl Snapshot { &self.display_path } - pub fn exists(&self) -> bool { - self.cf_files - .iter() - .all(|cf_file| cf_file.size == 0 || file_exists(&cf_file.path)) - && file_exists(&self.meta_file.path) + pub fn exists(&self) -> bool { + self.cf_files.iter().all(|cf_file| { + cf_file.size.len() == 0 + || (cf_file + .file_paths() + .iter() + .all(|file_path| file_exists(&Path::new(file_path)))) + }) && file_exists(&self.meta_file.path) } pub fn meta(&self) -> io::Result { @@ -868,7 +1029,9 @@ impl Snapshot { } pub fn total_size(&self) -> io::Result { - Ok(self.cf_files.iter().fold(0, |acc, x| acc + x.size)) + Ok(self.cf_files.iter().fold(0, |acc, x| { + acc + x.size.iter().fold(0, |acc2, x2| acc2 + x2) + })) } pub fn save(&mut self) -> io::Result<()> { @@ -877,47 +1040,54 @@ impl Snapshot { "snapshot" => %self.path(), ); for cf_file in &mut self.cf_files { - if cf_file.size == 0 { + if cf_file.size.len() == 0 { // Skip empty cf file. continue; } // Check each cf file has been fully written, and the checksum matches. - let mut file_for_recving = cf_file.file_for_recving.take().unwrap(); - file_for_recving.file.flush()?; - file_for_recving.file.sync_all()?; - - if file_for_recving.written_size != cf_file.size { - return Err(io::Error::new( - ErrorKind::Other, - format!( - "snapshot file {} for cf {} size mismatches, \ - real size {}, expected size {}", - cf_file.path.display(), - cf_file.cf, - file_for_recving.written_size, - cf_file.size - ), - )); - } + let mut i = 0; + for mut file_for_recving in cf_file.file_for_recving.drain(..) { + file_for_recving.file.flush()?; + file_for_recving.file.sync_all()?; + + if file_for_recving.written_size != cf_file.size[i] { + return Err(io::Error::new( + ErrorKind::Other, + format!( + "snapshot file {} for cf {} size mismatches, \ + real size {}, expected size {}", + cf_file.path.display(), + cf_file.cf, + file_for_recving.written_size, + cf_file.size[i] + ), + )); + } - let checksum = file_for_recving.write_digest.finalize(); - if checksum != cf_file.checksum { - return Err(io::Error::new( - ErrorKind::Other, - format!( - "snapshot file {} for cf {} checksum \ - mismatches, real checksum {}, expected \ - checksum {}", - cf_file.path.display(), - cf_file.cf, - checksum, - cf_file.checksum - ), - )); + let checksum = file_for_recving.write_digest.finalize(); + if checksum != cf_file.checksum[i] { + return Err(io::Error::new( + ErrorKind::Other, + format!( + "snapshot file {} for cf {} checksum \ + mismatches, real checksum {}, expected \ + checksum {}", + cf_file.path.display(), + cf_file.cf, + checksum, + cf_file.checksum[i] + ), + )); + } + i += 1; } - file_system::rename(&cf_file.tmp_path, &cf_file.path)?; + let tmp_paths = cf_file.tmp_file_paths(); + let paths = cf_file.file_paths(); + for (i, tmp_path) in tmp_paths.iter().enumerate() { + file_system::rename(&tmp_path, &paths[i])?; + } } sync_dir(&self.dir_path)?; @@ -950,15 +1120,23 @@ impl Read for Snapshot { } while self.cf_index < self.cf_files.len() { let cf_file = &mut self.cf_files[self.cf_index]; - if cf_file.size == 0 { + if self.cf_file_index >= cf_file.size.len() || cf_file.size[self.cf_file_index] == 0 { self.cf_index += 1; + self.cf_file_index = 0; continue; } - let reader = cf_file.file_for_sending.as_mut().unwrap(); + let reader = cf_file + .file_for_sending + .get_mut(self.cf_file_index) + .unwrap(); match reader.read(buf) { Ok(0) => { // EOF. Switch to next file. - self.cf_index += 1; + self.cf_file_index += 1; + if self.cf_file_index == cf_file.size.len() { + self.cf_index += 1; + self.cf_file_index = 0; + } } Ok(n) => return Ok(n), e => return e, @@ -977,14 +1155,14 @@ impl Write for Snapshot { let (mut next_buf, mut written_bytes) = (buf, 0); while self.cf_index < self.cf_files.len() { let cf_file = &mut self.cf_files[self.cf_index]; - if cf_file.size == 0 { + if cf_file.size.is_empty() { self.cf_index += 1; continue; } - let mut file_for_recving = cf_file.file_for_recving.as_mut().unwrap(); - - let left = (cf_file.size - file_for_recving.written_size) as usize; + assert!(cf_file.size[self.cf_file_index] != 0); + let mut file_for_recving = cf_file.file_for_recving.get_mut(self.cf_file_index).unwrap(); + let left = (cf_file.size.get(self.cf_file_index).unwrap() - file_for_recving.written_size) as usize; assert!(left > 0 && !next_buf.is_empty()); let (write_len, switch, finished) = match next_buf.len().cmp(&left) { CmpOrdering::Greater => (left, true, false), @@ -1023,10 +1201,13 @@ impl Write for Snapshot { } start += acquire; } - if switch { - self.cf_index += 1; - next_buf = &next_buf[write_len..] + next_buf = &next_buf[write_len..]; + self.cf_file_index += 1; + if self.cf_file_index >= cf_file.size.len() { + self.cf_file_index = 0; + self.cf_index += 1; + } } if finished { break; @@ -1037,8 +1218,9 @@ impl Write for Snapshot { fn flush(&mut self) -> io::Result<()> { if let Some(cf_file) = self.cf_files.get_mut(self.cf_index) { - let file_for_recving = cf_file.file_for_recving.as_mut().unwrap(); - file_for_recving.file.flush()?; + for file_for_recving in &mut cf_file.file_for_recving { + file_for_recving.file.flush()?; + } } Ok(()) } @@ -1076,6 +1258,7 @@ struct SnapManagerCore { limiter: Limiter, temp_sst_id: Arc, encryption_key_manager: Option>, + max_per_file_size: u64, } /// `SnapManagerCore` trace all current processing snapshots. @@ -1171,7 +1354,7 @@ impl SnapManager { .filter_map(|s| s.parse().ok()) .collect() }); - if numbers.len() != 3 { + if numbers.len() < 3 { error!( "failed to parse snapkey"; "snap_key" => %name, @@ -1263,12 +1446,14 @@ impl SnapManager { None => return Ok(Box::new(s)), }; for cf_file in &mut s.cf_files { - if cf_file.size == 0 { - continue; + let file_paths = cf_file.file_paths(); + for (i, file_path) in file_paths.iter().enumerate() { + if cf_file.size[i] == 0 { + continue; + } + let reader = snap_io::get_decrypter_reader(file_path, key_manager)?; + cf_file.file_for_sending.push(reader); } - let p = cf_file.path.to_str().unwrap(); - let reader = snap_io::get_decrypter_reader(p, key_manager)?; - cf_file.file_for_sending = Some(reader); } Ok(Box::new(s)) } @@ -1318,6 +1503,10 @@ impl SnapManager { self.max_total_size.store(max_total_size, Ordering::Release); } + pub fn set_max_per_file_size(&mut self, max_per_file_size: u64) { + self.core.max_per_file_size = max_per_file_size; + } + pub fn set_speed_limit(&self, bytes_per_sec: f64) { self.core.limiter.set_speed_limit(bytes_per_sec); } @@ -1457,25 +1646,30 @@ impl SnapManagerCore { } fn rename_tmp_cf_file_for_send(&self, cf_file: &mut CfFile) -> RaftStoreResult<()> { - file_system::rename(&cf_file.tmp_path, &cf_file.path)?; - let mgr = self.encryption_key_manager.as_ref(); - if let Some(mgr) = &mgr { - let src = cf_file.tmp_path.to_str().unwrap(); - let dst = cf_file.path.to_str().unwrap(); - // It's ok that the cf file is moved but machine fails before `mgr.rename_file` - // because without metadata file, saved cf files are nothing. - while let Err(e) = mgr.link_file(src, dst) { - if e.kind() == ErrorKind::AlreadyExists { - mgr.delete_file(dst)?; - continue; + let tmp_file_paths = cf_file.tmp_file_paths(); + let file_paths = cf_file.file_paths(); + for (i, tmp_file_path) in tmp_file_paths.iter().enumerate() { + file_system::rename(&tmp_file_path, &file_paths[i])?; + + let mgr = self.encryption_key_manager.as_ref(); + if let Some(mgr) = &mgr { + let src = &tmp_file_path; + let dst = &file_paths[i]; + // It's ok that the cf file is moved but machine fails before `mgr.rename_file` + // because without metadata file, saved cf files are nothing. + while let Err(e) = mgr.link_file(src, dst) { + if e.kind() == ErrorKind::AlreadyExists { + mgr.delete_file(dst)?; + continue; + } + return Err(e.into()); } - return Err(e.into()); + mgr.delete_file(src)?; } - mgr.delete_file(src)?; + let file = Path::new(&file_paths[i]); + let (checksum, size) = calc_checksum_and_size(&file, mgr)?; + cf_file.add_file_with_size_checksum(i, size, checksum); } - let (checksum, size) = calc_checksum_and_size(&cf_file.path, mgr)?; - cf_file.checksum = checksum; - cf_file.size = size; Ok(()) } } @@ -1484,6 +1678,7 @@ impl SnapManagerCore { pub struct SnapManagerBuilder { max_write_bytes_per_sec: i64, max_total_size: u64, + max_per_file_size: u64, key_manager: Option>, } @@ -1496,6 +1691,10 @@ impl SnapManagerBuilder { self.max_total_size = bytes; self } + pub fn max_per_file_size(mut self, bytes: u64) -> SnapManagerBuilder { + self.max_per_file_size = bytes; + self + } pub fn encryption_key_manager(mut self, m: Option>) -> SnapManagerBuilder { self.key_manager = m; self @@ -1511,6 +1710,11 @@ impl SnapManagerBuilder { } else { u64::MAX }; + let max_per_file_size = if self.max_per_file_size > 0 { + self.max_per_file_size + } else { + u64::MAX + }; SnapManager { core: SnapManagerCore { base: path.into(), @@ -1518,6 +1722,7 @@ impl SnapManagerBuilder { limiter, temp_sst_id: Arc::new(AtomicU64::new(0)), encryption_key_manager: self.key_manager, + max_per_file_size, }, max_total_size: AtomicU64::new(max_total_size), } @@ -1608,6 +1813,29 @@ pub mod tests { Ok(db) } + pub fn open_test_db_with_100keys( + path: &Path, + db_opt: Option, + cf_opts: Option>>, + ) -> Result + where + E: KvEngine + EngineConstructorExt, + { + let p = path.to_str().unwrap(); + let db = E::new_engine(p, db_opt, ALL_CFS, cf_opts).unwrap(); + // write some data into each cf + for (i, cf) in db.cf_names().into_iter().enumerate() { + let mut p = Peer::default(); + p.set_store_id(TEST_STORE_ID); + p.set_id((i + 1) as u64); + for k in 0..100 { + let key = keys::data_key(format!("akey{}", k).as_bytes()); + db.put_msg_cf(cf, &key[..], &p)?; + } + } + Ok(db) + } + pub fn get_test_db_for_regions( path: &TempDir, raft_db_opt: Option, @@ -1702,6 +1930,7 @@ pub mod tests { limiter: Limiter::new(f64::INFINITY), temp_sst_id: Arc::new(AtomicU64::new(0)), encryption_key_manager: None, + max_per_file_size: u64::MAX, } } @@ -1738,35 +1967,39 @@ pub mod tests { for (i, cf) in super::SNAPSHOT_CFS.iter().enumerate() { let f = super::CfFile { cf, - size: 100 * (i + 1) as u64, - checksum: 1000 * (i + 1) as u32, + size: vec![100 * (i + 1) as u64, 100 * (i + 2) as u64], + checksum: vec![1000 * (i + 1) as u32, 1000 * (i + 2) as u32], ..Default::default() }; cf_file.push(f); } let meta = super::gen_snapshot_meta(&cf_file).unwrap(); + let cf_files = meta.get_cf_files(); + assert_eq!(cf_files.len(), super::SNAPSHOT_CFS.len() * 2); // each CF has two snapshot files; for (i, cf_file_meta) in meta.get_cf_files().iter().enumerate() { - if cf_file_meta.get_cf() != cf_file[i].cf { + let cf_file_idx = i / 2; + let size_idx = i % 2; + if cf_file_meta.get_cf() != cf_file[cf_file_idx].cf { panic!( "{}: expect cf {}, got {}", i, - cf_file[i].cf, + cf_file[cf_file_idx].cf, cf_file_meta.get_cf() ); } - if cf_file_meta.get_size() != cf_file[i].size { + if cf_file_meta.get_size() != cf_file[cf_file_idx].size[size_idx] { panic!( "{}: expect cf size {}, got {}", i, - cf_file[i].size, + cf_file[cf_file_idx].size[size_idx], cf_file_meta.get_size() ); } - if cf_file_meta.get_checksum() != cf_file[i].checksum { + if cf_file_meta.get_checksum() != cf_file[cf_file_idx].checksum[size_idx] { panic!( "{}: expect cf checksum {}, got {}", i, - cf_file[i].checksum, + cf_file[cf_file_idx].checksum[size_idx], cf_file_meta.get_checksum() ); } @@ -1787,17 +2020,23 @@ pub mod tests { #[test] fn test_empty_snap_file() { - test_snap_file(open_test_empty_db, None); - test_snap_file(open_test_empty_db, Some(gen_db_options_with_encryption())); + test_snap_file(open_test_empty_db, None, u64::MAX); + test_snap_file(open_test_empty_db, Some(gen_db_options_with_encryption()), u64::MAX); + + test_snap_file(open_test_empty_db, None, 100); + test_snap_file(open_test_empty_db, Some(gen_db_options_with_encryption()), 100); } #[test] fn test_non_empty_snap_file() { - test_snap_file(open_test_db, None); - test_snap_file(open_test_db, Some(gen_db_options_with_encryption())); + test_snap_file(open_test_db, None, u64::MAX); + test_snap_file(open_test_db, Some(gen_db_options_with_encryption()), u64::MAX); + + test_snap_file(open_test_db_with_100keys, None, 100); + test_snap_file(open_test_db_with_100keys, Some(gen_db_options_with_encryption()), 500); } - fn test_snap_file(get_db: DBBuilder, db_opt: Option) { + fn test_snap_file(get_db: DBBuilder, db_opt: Option, max_file_size: u64) { let region_id = 1; let region = gen_test_region(region_id, 1, 1); let src_db_dir = Builder::new() @@ -1814,7 +2053,8 @@ pub mod tests { let key = SnapKey::new(region_id, 1, 1); - let mgr_core = create_manager_core(src_dir.path().to_str().unwrap()); + let mut mgr_core = create_manager_core(src_dir.path().to_str().unwrap()); + mgr_core.max_per_file_size = max_file_size; let mut s1 = Snapshot::new_for_building(src_dir.path(), &key, &mgr_core).unwrap(); @@ -1906,15 +2146,17 @@ pub mod tests { #[test] fn test_empty_snap_validation() { - test_snap_validation(open_test_empty_db); + test_snap_validation(open_test_empty_db, u64::MAX); + test_snap_validation(open_test_empty_db, 100); } #[test] fn test_non_empty_snap_validation() { - test_snap_validation(open_test_db); + test_snap_validation(open_test_db, u64::MAX); + test_snap_validation(open_test_db_with_100keys, 500); } - fn test_snap_validation(get_db: DBBuilder) { + fn test_snap_validation(get_db: DBBuilder, max_file_size: u64) { let region_id = 1; let region = gen_test_region(region_id, 1, 1); let db_dir = Builder::new() @@ -1929,7 +2171,8 @@ pub mod tests { .tempdir() .unwrap(); let key = SnapKey::new(region_id, 1, 1); - let mgr_core = create_manager_core(dir.path().to_str().unwrap()); + let mut mgr_core = create_manager_core(dir.path().to_str().unwrap()); + mgr_core.max_per_file_size = max_file_size; let mut s1 = Snapshot::new_for_building(dir.path(), &key, &mgr_core).unwrap(); assert!(!s1.exists()); @@ -2177,7 +2420,7 @@ pub mod tests { .prefix("test-snapshot-corruption-meta-db") .tempdir() .unwrap(); - let db: KvTestEngine = open_test_db(&db_dir.path(), None, None).unwrap(); + let db: KvTestEngine = open_test_db_with_100keys(db_dir.path(), None, None).unwrap(); let snapshot = db.snapshot(); let dir = Builder::new() @@ -2185,7 +2428,8 @@ pub mod tests { .tempdir() .unwrap(); let key = SnapKey::new(region_id, 1, 1); - let mgr_core = create_manager_core(dir.path().to_str().unwrap()); + let mut mgr_core = create_manager_core(dir.path().to_str().unwrap()); + mgr_core.max_per_file_size = 500; let mut s1 = Snapshot::new_for_building(dir.path(), &key, &mgr_core).unwrap(); assert!(!s1.exists()); diff --git a/components/raftstore/src/store/snap/io.rs b/components/raftstore/src/store/snap/io.rs index 880bb51817a..2c20dd0f36b 100644 --- a/components/raftstore/src/store/snap/io.rs +++ b/components/raftstore/src/store/snap/io.rs @@ -1,9 +1,11 @@ // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. +use std::cell::RefCell; use std::fs::{File, OpenOptions}; use std::io::{self, BufReader, Read, Write}; use std::sync::Arc; use std::{fs, usize}; +use super::{CfFile, Error, IO_LIMITER_CHUNK_SIZE}; use encryption::{ encryption_method_from_db_encryption_method, DataKeyManager, DecrypterReader, EncrypterWriter, Iv, @@ -15,9 +17,7 @@ use engine_traits::{ use kvproto::encryptionpb::EncryptionMethod; use tikv_util::codec::bytes::{BytesEncoder, CompactBytesFromFileDecoder}; use tikv_util::time::Limiter; -use tikv_util::{box_try, debug}; - -use super::{Error, IO_LIMITER_CHUNK_SIZE}; +use tikv_util::{box_try, debug, info}; /// Used to check a procedure is stale or not. pub trait StaleDetector { @@ -34,16 +34,20 @@ pub struct BuildStatistics { /// If there are no key-value pairs fetched, no files will be created at `path`, /// otherwise the file will be created and synchronized. pub fn build_plain_cf_file( - path: &str, + cf_file: &mut CfFile, key_mgr: Option<&Arc>, snap: &E::Snapshot, - cf: &str, start_key: &[u8], end_key: &[u8], ) -> Result where E: KvEngine, { + let cf = cf_file.cf; + let path = cf_file + .path + .join(cf_file.gen_tmp_file_name(0)); + let path = path.to_str().unwrap(); let mut file = Some(box_try!( OpenOptions::new().write(true).create_new(true).open(path) )); @@ -81,6 +85,7 @@ where })); if stats.key_count > 0 { + cf_file.add_file(0); box_try!(BytesEncoder::encode_compact_bytes(&mut writer, b"")); let file = if !should_encrypt { file.unwrap() @@ -99,23 +104,55 @@ where /// Build a snapshot file for the given column family in sst format. /// If there are no key-value pairs fetched, no files will be created at `path`, /// otherwise the file will be created and synchronized. -pub fn build_sst_cf_file( - path: &str, +pub fn build_sst_cf_file_list( + cf_file: &mut CfFile, engine: &E, snap: &E::Snapshot, - cf: CfName, start_key: &[u8], end_key: &[u8], + raw_size_per_file: u64, io_limiter: &Limiter, ) -> Result where E: KvEngine, { - let mut sst_writer = create_sst_file_writer::(engine, cf, path)?; + let cf = cf_file.cf; let mut stats = BuildStatistics::default(); let mut remained_quota = 0; + let mut file_count:usize = 0; + let mut path = cf_file + .path + .join(cf_file.gen_tmp_file_name(file_count)) + .to_str() + .unwrap() + .to_string(); + let sst_writer = RefCell::new(create_sst_file_writer::(engine, cf, &path)?); + let mut file_length: usize = 0; box_try!(snap.scan_cf(cf, start_key, end_key, false, |key, value| { let entry_len = key.len() + value.len(); + if file_length + entry_len > raw_size_per_file as usize { + cf_file.add_file(file_count); // add previous file + file_length = 0; + file_count += 1; + path = cf_file + .path + .join(cf_file.gen_tmp_file_name(file_count)) + .to_str() + .unwrap() + .to_string(); + let result = create_sst_file_writer::(engine, cf, &path); + match result { + Ok(new_sst_writer) => { + let old_writer = sst_writer.replace(new_sst_writer); + box_try!(old_writer.finish()); + box_try!(File::open(&path).and_then(|f| f.sync_all())); + } + Err(e) => { + let io_error = io::Error::new(io::ErrorKind::Other, e); + return Err(io_error.into()); + } + } + } while entry_len > remained_quota { // It's possible to acquire more than necessary, but let it be. io_limiter.blocking_consume(IO_LIMITER_CHUNK_SIZE); @@ -125,18 +162,24 @@ where stats.key_count += 1; stats.total_size += entry_len; - if let Err(e) = sst_writer.put(key, value) { + if let Err(e) = sst_writer.borrow_mut().put(key, value) { let io_error = io::Error::new(io::ErrorKind::Other, e); return Err(io_error.into()); } + file_length += entry_len; Ok(true) })); if stats.key_count > 0 { - box_try!(sst_writer.finish()); + cf_file.add_file(file_count); + box_try!(sst_writer.into_inner().finish()); box_try!(File::open(path).and_then(|f| f.sync_all())); + info!( + "build_sst_cf_file_list builds {} files for cf {}. Total keys {} ", file_count+1, cf, stats.key_count + ); } else { box_try!(fs::remove_file(path)); } + Ok(stats) } @@ -198,13 +241,16 @@ where } } -pub fn apply_sst_cf_file(path: &str, db: &E, cf: &str) -> Result<(), Error> +pub fn apply_sst_cf_file(files: &[&str], db: &E, cf: &str) -> Result<(), Error> where E: KvEngine, { + if files.len() > 1 { + info!("apply_sst_cf_file starts on cf {}. All files {:?}", cf, files); + } let mut ingest_opt = ::IngestExternalFileOptions::new(); ingest_opt.move_files(true); - box_try!(db.ingest_external_file_cf(cf, &ingest_opt, &[path])); + box_try!(db.ingest_external_file_cf(cf, &ingest_opt, files)); Ok(()) } @@ -247,9 +293,10 @@ mod tests { use super::*; use crate::store::snap::tests::*; - use crate::store::snap::SNAPSHOT_CFS; + use crate::store::snap::{SNAPSHOT_CFS, SST_FILE_SUFFIX}; use engine_test::kv::KvTestEngine; use engine_traits::CF_DEFAULT; + use std::path::PathBuf; use tempfile::Builder; use tikv_util::time::Limiter; @@ -278,27 +325,33 @@ mod tests { let snap = db.snapshot(); for cf in SNAPSHOT_CFS { let snap_cf_dir = Builder::new().prefix("test-snap-cf").tempdir().unwrap(); - let plain_file_path = snap_cf_dir.path().join("plain"); + let mut cf_file = CfFile { + cf, + path: PathBuf::from(snap_cf_dir.path().to_str().unwrap()), + file_prefix: "test_plain_sst".to_string(), + file_suffix: SST_FILE_SUFFIX.to_string(), + ..Default::default() + }; let stats = build_plain_cf_file::( - &plain_file_path.to_str().unwrap(), + &mut cf_file, None, &snap, - cf, &keys::data_key(b"a"), &keys::data_end_key(b"z"), ) .unwrap(); if stats.key_count == 0 { - assert_eq!( - fs::metadata(&plain_file_path).unwrap_err().kind(), - io::ErrorKind::NotFound - ); + assert_eq!(cf_file.file_paths().len(), 0); + assert_eq!(cf_file.clone_file_paths().len(), 0); + assert_eq!(cf_file.tmp_file_paths().len(), 0); + assert_eq!(cf_file.size.len(), 0); continue; } let detector = TestStaleDetector {}; + let tmp_file_path = &cf_file.tmp_file_paths()[0]; apply_plain_cf_file( - &plain_file_path.to_str().unwrap(), + tmp_file_path, None, &detector, &db1, @@ -340,40 +393,59 @@ mod tests { #[test] fn test_cf_build_and_apply_sst_files() { - let db_creaters = &[open_test_empty_db, open_test_db]; + let db_creaters = &[open_test_empty_db, open_test_db_with_100keys]; + let max_file_sizes = &[u64::MAX, 100]; let limiter = Limiter::new(INFINITY); - for db_creater in db_creaters { - for db_opt in vec![None, Some(gen_db_options_with_encryption())] { - let dir = Builder::new().prefix("test-snap-cf-db").tempdir().unwrap(); - let db = db_creater(&dir.path(), db_opt.clone(), None).unwrap(); - let snap_cf_dir = Builder::new().prefix("test-snap-cf").tempdir().unwrap(); - let sst_file_path = snap_cf_dir.path().join("sst"); - let stats = build_sst_cf_file::( - &sst_file_path.to_str().unwrap(), - &db, - &db.snapshot(), - CF_DEFAULT, - b"a", - b"z", - &limiter, - ) - .unwrap(); - if stats.key_count == 0 { - assert_eq!( - fs::metadata(&sst_file_path).unwrap_err().kind(), - io::ErrorKind::NotFound - ); - continue; - } - - let dir1 = Builder::new() - .prefix("test-snap-cf-db-apply") - .tempdir() + for max_file_size in max_file_sizes { + for db_creater in db_creaters { + for db_opt in vec![None, Some(gen_db_options_with_encryption())] { + let dir = Builder::new().prefix("test-snap-cf-db").tempdir().unwrap(); + let db = db_creater(dir.path(), db_opt.clone(), None).unwrap(); + let snap_cf_dir = Builder::new().prefix("test-snap-cf").tempdir().unwrap(); + let mut cf_file = CfFile { + cf: CF_DEFAULT, + path: PathBuf::from(snap_cf_dir.path().to_str().unwrap()), + file_prefix: "test_sst".to_string(), + file_suffix: SST_FILE_SUFFIX.to_string(), + ..Default::default() + }; + let stats = build_sst_cf_file_list::( + &mut cf_file, + &db, + &db.snapshot(), + b"a", + b"z", + *max_file_size, + &limiter, + ) .unwrap(); - let db1: KvTestEngine = open_test_empty_db(&dir1.path(), db_opt, None).unwrap(); - apply_sst_cf_file(&sst_file_path.to_str().unwrap(), &db1, CF_DEFAULT).unwrap(); - assert_eq_db(&db, &db1); + if stats.key_count == 0 { + assert_eq!(cf_file.file_paths().len(), 0); + assert_eq!(cf_file.clone_file_paths().len(), 0); + assert_eq!(cf_file.tmp_file_paths().len(), 0); + assert_eq!(cf_file.size.len(), 0); + continue; + } else { + assert!(cf_file.file_paths().len() > 1); + assert!(cf_file.clone_file_paths().len() > 1); + assert!(cf_file.tmp_file_paths().len() > 1); + assert!(cf_file.size.len() > 1); + } + + let dir1 = Builder::new() + .prefix("test-snap-cf-db-apply") + .tempdir() + .unwrap(); + let db1: KvTestEngine = open_test_empty_db(dir1.path(), db_opt, None).unwrap(); + let tmp_file_paths = cf_file.tmp_file_paths(); + let tmp_file_paths = tmp_file_paths + .iter() + .map(|s| s.as_str()) + .collect::>(); + apply_sst_cf_file(&tmp_file_paths, &db1, CF_DEFAULT).unwrap(); + assert_eq_db(&db, &db1); + } } } } diff --git a/components/server/src/server.rs b/components/server/src/server.rs index 4ad590969e8..84133d93339 100644 --- a/components/server/src/server.rs +++ b/components/server/src/server.rs @@ -623,6 +623,7 @@ impl TiKVServer { .max_write_bytes_per_sec(bps) .max_total_size(self.config.server.snap_max_total_size.0) .encryption_key_manager(self.encryption_key_manager.clone()) + .max_per_file_size(self.config.server.max_snapshot_file_raw_size.0) .build(snap_path); // Create coprocessor endpoint. diff --git a/components/test_raftstore/src/node.rs b/components/test_raftstore/src/node.rs index 51c5368430a..b4cf67647fa 100644 --- a/components/test_raftstore/src/node.rs +++ b/components/test_raftstore/src/node.rs @@ -227,6 +227,7 @@ impl Simulator for NodeCluster { .max_write_bytes_per_sec(cfg.server.snap_max_write_bytes_per_sec.0 as i64) .max_total_size(cfg.server.snap_max_total_size.0) .encryption_key_manager(key_manager) + .max_per_file_size(cfg.server.max_snapshot_file_raw_size.0) .build(tmp.path().to_str().unwrap()); (snap_mgr, Some(tmp)) } else { diff --git a/components/test_raftstore/src/server.rs b/components/test_raftstore/src/server.rs index e122a92a8cc..4992027ae30 100644 --- a/components/test_raftstore/src/server.rs +++ b/components/test_raftstore/src/server.rs @@ -339,6 +339,7 @@ impl Simulator for ServerCluster { .max_write_bytes_per_sec(cfg.server.snap_max_write_bytes_per_sec.0 as i64) .max_total_size(cfg.server.snap_max_total_size.0) .encryption_key_manager(key_manager) + .max_per_file_size(cfg.server.max_snapshot_file_raw_size.0) .build(tmp_str); self.snap_mgrs.insert(node_id, snap_mgr.clone()); let server_cfg = Arc::new(VersionTrack::new(cfg.server.clone())); diff --git a/src/server/config.rs b/src/server/config.rs index 29432a5dc69..b497b168f5a 100644 --- a/src/server/config.rs +++ b/src/server/config.rs @@ -132,6 +132,7 @@ pub struct Config { pub end_point_max_concurrency: usize, pub snap_max_write_bytes_per_sec: ReadableSize, pub snap_max_total_size: ReadableSize, + pub max_snapshot_file_raw_size: ReadableSize, #[config(skip)] pub stats_concurrency: usize, #[config(skip)] @@ -220,6 +221,7 @@ impl Default for Config { end_point_max_concurrency: cmp::max(cpu_num as usize, MIN_ENDPOINT_MAX_CONCURRENCY), snap_max_write_bytes_per_sec: ReadableSize(DEFAULT_SNAP_MAX_BYTES_PER_SEC), snap_max_total_size: ReadableSize(0), + max_snapshot_file_raw_size: ReadableSize(0), stats_concurrency: 1, // 300 means gRPC threads are under heavy load if their total CPU usage // is greater than 300%. diff --git a/src/server/snap.rs b/src/server/snap.rs index 5e7e8c3287a..43175849370 100644 --- a/src/server/snap.rs +++ b/src/server/snap.rs @@ -369,10 +369,16 @@ where } else { u64::MAX }; + let max_per_file_size = if incoming.max_snapshot_file_raw_size.0 > 0 { + incoming.max_snapshot_file_raw_size.0 + } else { + u64::MAX + }; self.snap_mgr.set_speed_limit(limit); self.snap_mgr.set_max_total_snap_size(max_total_size); - info!("refresh snapshot manager config"; - "speed_limit"=> limit, + self.snap_mgr.set_max_per_file_size(max_per_file_size); + info!("refresh snapshot manager config"; + "speed_limit"=> limit, "max_total_snap_size"=> max_total_size); self.cfg = incoming.clone(); } diff --git a/tests/integrations/config/mod.rs b/tests/integrations/config/mod.rs index 3f95a2f8953..52430d2046c 100644 --- a/tests/integrations/config/mod.rs +++ b/tests/integrations/config/mod.rs @@ -97,6 +97,7 @@ fn test_serde_custom_tikv_config() { end_point_max_concurrency: 10, snap_max_write_bytes_per_sec: ReadableSize::mb(10), snap_max_total_size: ReadableSize::gb(10), + max_snapshot_file_raw_size: ReadableSize::gb(10), stats_concurrency: 10, heavy_load_threshold: 1000, heavy_load_wait_duration: ReadableDuration::millis(2), diff --git a/tests/integrations/raftstore/test_snap.rs b/tests/integrations/raftstore/test_snap.rs index 7ef6c12517b..d6f0d6c7f94 100644 --- a/tests/integrations/raftstore/test_snap.rs +++ b/tests/integrations/raftstore/test_snap.rs @@ -21,10 +21,11 @@ use test_raftstore::*; use tikv::server::snap::send_snap; use tikv_util::{config::*, time::Instant, HandyRwLock}; -fn test_huge_snapshot(cluster: &mut Cluster) { +fn test_huge_snapshot(cluster: &mut Cluster, max_snapshot_file_size: u64) { cluster.cfg.raft_store.raft_log_gc_count_limit = 1000; cluster.cfg.raft_store.raft_log_gc_tick_interval = ReadableDuration::millis(10); cluster.cfg.raft_store.snap_apply_batch_size = ReadableSize(500); + cluster.cfg.server.max_snapshot_file_raw_size = ReadableSize(max_snapshot_file_size); let pd_client = Arc::clone(&cluster.pd_client); // Disable default max peer count check. pd_client.disable_default_operator(); @@ -92,7 +93,14 @@ fn test_node_huge_snapshot() { fn test_server_huge_snapshot() { let count = 5; let mut cluster = new_server_cluster(0, count); - test_huge_snapshot(&mut cluster); + test_huge_snapshot(&mut cluster, u64::MAX); +} + +#[test] +fn test_server_huge_snapshot_multi_files() { + let count = 5; + let mut cluster = new_server_cluster(0, count); + test_huge_snapshot(&mut cluster, 1024*1024); } #[test] diff --git a/tests/integrations/storage/test_titan.rs b/tests/integrations/storage/test_titan.rs index 23dfd62cb08..610075c2ae9 100644 --- a/tests/integrations/storage/test_titan.rs +++ b/tests/integrations/storage/test_titan.rs @@ -1,7 +1,7 @@ // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. use std::f64::INFINITY; -use std::path::Path; +use std::path::{Path, PathBuf}; use std::sync::Arc; use std::thread; use std::time::Duration; @@ -18,7 +18,7 @@ use engine_traits::{ use keys::data_key; use kvproto::metapb::{Peer, Region}; use raftstore::store::RegionSnapshot; -use raftstore::store::{apply_sst_cf_file, build_sst_cf_file}; +use raftstore::store::{apply_sst_cf_file, build_sst_cf_file_list, CfFile}; use tempfile::Builder; use test_raftstore::*; use tikv::config::TiKvConfig; @@ -361,26 +361,26 @@ fn test_delete_files_in_range_for_titan() { assert_eq!(value, 1); // Generate a snapshot - let default_sst_file_path = path.path().join("default.sst"); - let write_sst_file_path = path.path().join("write.sst"); let limiter = Limiter::new(INFINITY); - build_sst_cf_file::( - &default_sst_file_path.to_str().unwrap(), + let mut cf_file = CfFile::new(CF_DEFAULT, PathBuf::from(path.path().to_str().unwrap()), "default".to_string(), ".sst".to_string()); + build_sst_cf_file_list::( + &mut cf_file, &engines.kv, &engines.kv.snapshot(), - CF_DEFAULT, b"", b"{", + u64::MAX, &limiter, ) .unwrap(); - build_sst_cf_file::( - &write_sst_file_path.to_str().unwrap(), + let mut cf_file_write = CfFile::new(CF_WRITE, PathBuf::from(path.path().to_str().unwrap()), "write".to_string(), ".sst".to_string()); + build_sst_cf_file_list::( + &mut cf_file_write, &engines.kv, &engines.kv.snapshot(), - CF_WRITE, b"", b"{", + u64::MAX, &limiter, ) .unwrap(); @@ -391,14 +391,18 @@ fn test_delete_files_in_range_for_titan() { .tempdir() .unwrap(); let engines1 = new_temp_engine(&dir1); + let tmp_file_paths = cf_file.tmp_file_paths(); + let tmp_file_paths = tmp_file_paths.iter().map(|s| s.as_str()).collect::>(); apply_sst_cf_file( - &default_sst_file_path.to_str().unwrap(), + &tmp_file_paths, &engines1.kv, CF_DEFAULT, ) .unwrap(); + let tmp_file_paths = cf_file_write.tmp_file_paths(); + let tmp_file_paths = tmp_file_paths.iter().map(|s| s.as_str()).collect::>(); apply_sst_cf_file( - &write_sst_file_path.to_str().unwrap(), + &tmp_file_paths, &engines1.kv, CF_WRITE, )