diff --git a/CHANGELOG.md b/CHANGELOG.md
index da5c36a1..72cb9e8a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -7,7 +7,7 @@ Bob versions changelog
 - Added mimalloc allocator for musl target (#688)
 
 #### Changed
-
+- Avoid Pearl Storage clone (#791)
 
 #### Fixed
 
diff --git a/bob-backend/src/pearl/disk_controller.rs b/bob-backend/src/pearl/disk_controller.rs
index 722cdf8b..be332e90 100644
--- a/bob-backend/src/pearl/disk_controller.rs
+++ b/bob-backend/src/pearl/disk_controller.rs
@@ -595,16 +595,10 @@ impl DiskController {
             let holders = group.holders();
             let holders = holders.read().await;
             for holder in holders.iter() {
-                let storage = holder.storage().read().await;
-                let storage = storage.storage().clone();
-                let id = holder.get_id();
+                let holder = holder.clone();
                 futures.push(async move {
-                    match storage.close().await {
-                        Ok(_) => debug!("holder {} closed", id),
-                        Err(e) => {
-                            error!("error closing holder{}: {} (disk: {:?})", id, e, self.disk)
-                        }
-                    }
+                    holder.close_storage().await;
+                    debug!("holder {} closed", holder.get_id());
                 });
             }
         }
diff --git a/bob-backend/src/pearl/group.rs b/bob-backend/src/pearl/group.rs
index dcfeb1c4..ad84350c 100644
--- a/bob-backend/src/pearl/group.rs
+++ b/bob-backend/src/pearl/group.rs
@@ -91,7 +91,9 @@ impl Group {
 
     pub async fn remount(&self, pp: impl Hooks) -> AnyResult<()> {
         let _reinit_lock = self.reinit_lock.write().await;
-        self.holders.write().await.clear();
+        let cleared = self.holders.write().await.clear_and_get_values();
+        close_holders(cleared.iter()).await; // Close old holders
+        std::mem::drop(cleared); // This is to guarantee, that all resources will be released before `run_under_reinit_lock` is called
         self.run_under_reinit_lock(pp).await
     }
 
@@ -223,9 +225,11 @@ impl Group {
             // holder but instead try to restart the whole disk
             if !e.is_possible_disk_disconnection() && !e.is_duplicate() && !e.is_not_ready() {
                 error!("pearl holder will restart: {:?}", e);
-                holder.try_reinit().await?;
-                holder.prepare_storage().await?;
-                debug!("backend pearl group put common storage prepared");
+                if let Err(err) = holder.try_reinit().await {
+                    warn!("Pearl backend holder reinit ended with error: {:?}", err);
+                } else {
+                    debug!("Pearl backend holder reinited");
+                }
             }
             Err(e)
         } else {
@@ -288,9 +292,11 @@ impl Group {
         let result = holder.read(key).await;
         if let Err(e) = &result {
             if !e.is_key_not_found() && !e.is_not_ready() {
-                holder.try_reinit().await?;
-                holder.prepare_storage().await?;
-                debug!("backend pearl group get common storage prepared");
+                if let Err(err) = holder.try_reinit().await {
+                    warn!("Pearl backend holder reinit ended with error: {:?}", err);
+                } else {
+                    debug!("Pearl backend holder reinited");
+                }
             }
         }
         result
@@ -400,9 +406,11 @@ impl Group {
             // holder but instead try to restart the whole disk
             if !e.is_possible_disk_disconnection() && !e.is_duplicate() && !e.is_not_ready() {
                 error!("pearl holder will restart: {:?}", e);
-                holder.try_reinit().await?;
-                holder.prepare_storage().await?;
-                debug!("backend::pearl::group::delete_common storage prepared");
+                if let Err(err) = holder.try_reinit().await {
+                    warn!("Pearl backend holder reinit ended with error: {:?}", err);
+                } else {
+                    debug!("Pearl backend holder reinited");
+                }
             }
             Err(e)
         } else {
diff --git a/bob-backend/src/pearl/holder.rs b/bob-backend/src/pearl/holder.rs
index 307d1050..26920a4d 100644
--- a/bob-backend/src/pearl/holder.rs
+++ b/bob-backend/src/pearl/holder.rs
@@ -22,14 +22,21 @@ const SMALL_RECORDS_COUNT_MUL: u64 = 10;
 /// Struct hold pearl and add put/get/restart api
 #[derive(Clone, Debug)]
 pub struct Holder {
+    storage: Arc<RwLock<PearlSync>>,
+    inner: Arc<HolderInner>
+}
+
+/// Inner Holder data moved into HolderInner to reduce cloning overhead
+#[derive(Debug)]
+struct HolderInner {
     start_timestamp: u64,
     end_timestamp: u64,
     vdisk: VDiskId,
     disk_path: PathBuf,
     config: PearlConfig,
-    storage: Arc<RwLock<PearlSync>>,
-    last_modification: Arc<AtomicU64>,
     dump_sem: Arc<Semaphore>,
+    last_modification: AtomicU64,
+    init_protection: Semaphore
 }
 
 impl Holder {
@@ -42,83 +49,106 @@ impl Holder {
         dump_sem: Arc<Semaphore>,
     ) -> Self {
         Self {
-            start_timestamp,
-            end_timestamp,
-            vdisk,
-            disk_path,
-            config,
             storage: Arc::new(RwLock::new(PearlSync::default())),
-            last_modification: Arc::new(AtomicU64::new(0)),
-            dump_sem,
+            inner: Arc::new(HolderInner {
+                start_timestamp,
+                end_timestamp,
+                vdisk,
+                disk_path,
+                config,
+                dump_sem,
+                last_modification: AtomicU64::new(0),
+                init_protection: Semaphore::new(1)
+            })
         }
     }
 
     pub fn start_timestamp(&self) -> u64 {
-        self.start_timestamp
+        self.inner.start_timestamp
     }
 
     pub fn end_timestamp(&self) -> u64 {
-        self.end_timestamp
+        self.inner.end_timestamp
     }
 
     pub fn get_id(&self) -> String {
-        self.disk_path
+        self.inner.disk_path
             .file_name()
             .and_then(std::ffi::OsStr::to_str)
             .unwrap_or("unparsable string")
             .to_owned()
     }
 
-    pub fn storage(&self) -> &RwLock<PearlSync> {
-        &self.storage
-    }
-
     pub fn cloned_storage(&self) -> Arc<RwLock<PearlSync>> {
         self.storage.clone()
     }
 
     pub async fn blobs_count(&self) -> usize {
         let storage = self.storage.read().await;
-        storage.blobs_count().await
+        if let Some(storage) = storage.get() {
+            storage.blobs_count().await
+        } else {
+            0
+        }
     }
 
     pub async fn corrupted_blobs_count(&self) -> usize {
         let storage = self.storage.read().await;
-        storage.corrupted_blobs_count()
+        if let Some(storage) = storage.get() {
+            storage.corrupted_blobs_count()
+        } else {
+            0
+        }
     }
 
     pub async fn active_index_memory(&self) -> usize {
         let storage = self.storage.read().await;
-        storage.active_index_memory().await
+        if let Some(storage) = storage.get() {
+            storage.active_index_memory().await
+        } else {
+            0
+        }
     }
 
     pub async fn index_memory(&self) -> usize {
         let storage = self.storage.read().await;
-        storage.index_memory().await
+        if let Some(storage) = storage.get() {
+            storage.index_memory().await
+        } else {
+            0
+        }
     }
 
     pub async fn has_excess_resources(&self) -> bool {
         let storage = self.storage.read().await;
-        storage.inactive_index_memory().await > 0
+        if let Some(storage) = storage.get() {
+            storage.inactive_index_memory().await > 0
+        } else {
+            false
+        }
     }
 
     pub async fn records_count(&self) -> usize {
         let storage = self.storage.read().await;
-        storage.records_count().await
+        if let Some(storage) = storage.get() {
+            storage.records_count().await
+        } else {
+            0
+        }
     }
 
     pub fn gets_into_interval(&self, timestamp: u64) -> bool {
-        self.start_timestamp <= timestamp && timestamp < self.end_timestamp
+        self.inner.start_timestamp <= timestamp && timestamp < self.inner.end_timestamp
     }
 
     pub fn is_outdated(&self) -> bool {
         let ts = Self::get_current_ts();
-        ts > self.end_timestamp
+        ts > self.inner.end_timestamp
     }
 
     pub fn is_older_than(&self, secs: u64) -> bool {
         let ts = Self::get_current_ts();
-        (ts - secs) > self.end_timestamp
+        (ts - secs) > self.inner.end_timestamp
     }
 
     pub async fn no_modifications_recently(&self) -> bool {
@@ -128,34 +158,40 @@ impl Holder {
     }
 
     pub fn last_modification(&self) -> u64 {
-        self.last_modification.load(Ordering::Acquire)
+        self.inner.last_modification.load(Ordering::Acquire)
     }
 
     fn update_last_modification(&self) {
-        self.last_modification
+        self.inner.last_modification
             .store(Self::get_current_ts(), Ordering::Release);
     }
 
     pub async fn has_active_blob(&self) -> bool {
-        self.storage().read().await.has_active_blob().await
+        let storage = self.storage.read().await;
+        if let Some(storage) = storage.get() {
+            storage.has_active_blob().await
+        } else {
+            false
+        }
     }
 
     pub async fn active_blob_is_empty(&self) -> Option<bool> {
-        self.storage()
-            .read()
-            .await
-            .active_blob_records_count()
-            .await
-            .map(|c| c == 0)
+        let storage = self.storage.read().await;
+        if let Some(storage) = storage.get() {
+            storage.records_count_in_active_blob().await.map(|c| c == 0)
+        } else {
+            None
+        }
     }
 
     pub async fn active_blob_is_small(&self) -> Option<bool> {
-        self.storage()
-            .read()
-            .await
-            .active_blob_records_count()
-            .await
-            .map(|c| c as u64 * SMALL_RECORDS_COUNT_MUL < self.config.max_data_in_blob())
+        let storage = self.storage.read().await;
+        if let Some(storage) = storage.get() {
+            storage.records_count_in_active_blob().await
+                .map(|c| c as u64 * SMALL_RECORDS_COUNT_MUL < self.inner.config.max_data_in_blob())
+        } else {
+            None
+        }
     }
 
     fn get_current_ts() -> u64 {
@@ -174,39 +210,39 @@ impl Holder {
        // at every moment, but the whole operation will be performed faster (but remember about
        // disk_sem and other things, which may slow down this concurrent dump)
        let storage = self.storage.write().await;
-        storage.storage().close_active_blob_in_background().await;
-        warn!("Active blob of {} closed", self.get_id());
+        if let Some(storage) = storage.get() {
+            storage.close_active_blob_in_background().await;
+            warn!("Active blob of {} closed", self.get_id());
+        }
     }
 
     pub async fn free_excess_resources(&self) -> usize {
         let storage = self.storage.read().await;
-        storage.storage().free_excess_resources().await
+        if let Some(storage) = storage.get() {
+            storage.free_excess_resources().await
+        } else {
+            0
+        }
     }
 
     pub async fn filter_memory_allocated(&self) -> usize {
-        self.storage.read().await.filter_memory_allocated().await
-    }
-
-    pub async fn update(&self, storage: Storage<Key>) {
-        let mut st = self.storage.write().await;
-        st.set(storage.clone());
-        st.ready(); // current pearl disk is ready
-        debug!(
-            "update Pearl id: {}, mark as ready, state: {:?}",
-            self.vdisk, st
-        );
+        let storage = self.storage.read().await;
+        if let Some(storage) = storage.get() {
+            storage.filter_memory_allocated().await
+        } else {
+            0
+        }
     }
 
     pub async fn write(&self, key: BobKey, data: &BobData) -> BackendResult<()> {
         let state = self.storage.read().await;
-        if state.is_ready() {
-            let storage = state.get();
+        if let Some(storage) = state.get() {
             self.update_last_modification();
-            trace!("Vdisk: {}, write key: {}", self.vdisk, key);
+            trace!("Vdisk: {}, write key: {}", self.inner.vdisk, key);
             Self::write_disk(storage, Key::from(key), data).await
         } else {
-            trace!("Vdisk: {} isn't ready for writing: {:?}", self.vdisk, state);
+            trace!("Vdisk: {} isn't ready for writing: {:?}", self.inner.vdisk, state);
             Err(Error::vdisk_is_not_ready())
         }
     }
@@ -219,7 +255,7 @@ impl Holder {
 
     // @TODO remove redundant return result
     #[allow(clippy::cast_possible_truncation)]
-    async fn write_disk(storage: PearlStorage, key: Key, data: &BobData) -> BackendResult<()> {
+    async fn write_disk(storage: &PearlStorage, key: Key, data: &BobData) -> BackendResult<()> {
         counter!(PEARL_PUT_COUNTER, 1);
         let data_size = Self::calc_data_size(&data);
         let timer = Instant::now();
@@ -254,9 +290,8 @@ impl Holder {
     #[allow(clippy::cast_possible_truncation)]
     pub async fn read(&self, key: BobKey) -> Result<BobData, Error> {
         let state = self.storage.read().await;
-        if state.is_ready() {
-            let storage = state.get();
-            trace!("Vdisk: {}, read key: {}", self.vdisk, key);
+        if let Some(storage) = state.get() {
+            trace!("Vdisk: {}, read key: {}", self.inner.vdisk, key);
             counter!(PEARL_GET_COUNTER, 1);
             let timer = Instant::now();
             let res = storage
@@ -281,41 +316,17 @@ impl Holder {
             counter!(PEARL_GET_TIMER, timer.elapsed().as_nanos() as u64);
             res
         } else {
-            trace!("Vdisk: {} isn't ready for reading: {:?}", self.vdisk, state);
+            trace!("Vdisk: {} isn't ready for reading: {:?}", self.inner.vdisk, state);
             Err(Error::vdisk_is_not_ready())
         }
     }
 
-    pub async fn try_reinit(&self) -> BackendResult<()> {
-        let mut state = self.storage.write().await;
-        if state.is_reinit() {
-            trace!(
-                "Vdisk: {} reinitializing now, state: {:?}",
-                self.vdisk,
-                state
-            );
-            Err(Error::vdisk_is_not_ready())
-        } else {
-            state.init();
-            trace!("Vdisk: {} set as reinit, state: {:?}", self.vdisk, state);
-            let storage = state.get();
-            trace!("Vdisk: {} close old Pearl", self.vdisk);
-            let result = storage.close().await;
-            if let Err(e) = result {
-                error!("can't close pearl storage: {:?}", e);
-                // we can't do anything
-            }
-            Ok(())
-        }
-    }
-
     pub async fn exist(&self, key: BobKey) -> Result<bool, Error> {
         let state = self.storage.read().await;
-        if state.is_ready() {
-            trace!("Vdisk: {}, check key: {}", self.vdisk, key);
+        if let Some(storage) = state.get() {
+            trace!("Vdisk: {}, check key: {}", self.inner.vdisk, key);
             counter!(PEARL_EXIST_COUNTER, 1);
             let pearl_key = Key::from(key);
-            let storage = state.get();
             let timer = Instant::now();
             let res = storage
                 .contains(pearl_key)
@@ -328,18 +339,59 @@ impl Holder {
             counter!(PEARL_EXIST_TIMER, timer.elapsed().as_nanos() as u64);
             res
         } else {
-            trace!("Vdisk: {} not ready for reading: {:?}", self.vdisk, state);
+            trace!("Vdisk: {} not ready for reading: {:?}", self.inner.vdisk, state);
             Err(Error::vdisk_is_not_ready())
         }
     }
 
+    pub async fn try_reinit(&self) -> BackendResult<()> {
+        let _init_protection = self.inner.init_protection.try_acquire().map_err(|_| Error::holder_temporary_unavailable())?;
+
+        let old_storage = {
+            let mut state = self.storage.write().await;
+            state.reset()
+        };
+
+        if let Some(old_storage) = old_storage {
+            trace!("Vdisk: {} close old Pearl due to reinit", self.inner.vdisk);
+            if let Err(e) = old_storage.close().await {
+                error!("can't close pearl storage: {:?}", e);
+                // Continue anyway
+            }
+        }
+
+        match self.create_and_prepare_storage().await {
+            Ok(storage) => {
+                let mut state = self.storage.write().await;
+                state.set_ready(storage).expect("Storage setting successful");
+                debug!("update Pearl id: {}, mark as ready, state: ready", self.inner.vdisk);
+                Ok(())
+            }
+            Err(e) => Err(e),
+        }
+    }
+
     pub async fn prepare_storage(&self) -> Result<(), Error> {
+        let _init_protection = self.inner.init_protection.acquire().await.expect("init_protection semaphore acquire error");
+
+        match self.create_and_prepare_storage().await {
+            Ok(storage) => {
+                let mut st = self.storage.write().await;
+                st.set_ready(storage).expect("Storage setting successful");
+                debug!("update Pearl id: {}, mark as ready, state: ready", self.inner.vdisk);
+                Ok(())
+            }
+            Err(e) => Err(e),
+        }
+    }
+
+    async fn create_and_prepare_storage(&self) -> Result<Storage<Key>, Error> {
         debug!("backend pearl holder prepare storage");
-        self.config
+        self.inner.config
             .try_multiple_times_async(
                 || self.init_holder(),
                 "can't initialize holder",
-                self.config.fail_retry_timeout(),
+                self.inner.config.fail_retry_timeout(),
             )
             .await
             .map_err(|e| {
@@ -365,72 +417,67 @@ impl Holder {
             })
     }
 
-    async fn init_holder(&self) -> AnyResult<()> {
-        let f = || Utils::check_or_create_directory(&self.disk_path);
-        self.config
+    async fn init_holder(&self) -> AnyResult<Storage<Key>> {
+        let f = || Utils::check_or_create_directory(&self.inner.disk_path);
+        self.inner.config
             .try_multiple_times_async(
                 f,
-                &format!("cannot check path: {:?}", self.disk_path),
-                self.config.fail_retry_timeout(),
+                &format!("cannot check path: {:?}", self.inner.disk_path),
+                self.inner.config.fail_retry_timeout(),
             )
             .await?;
 
-        self.config
+        self.inner.config
             .try_multiple_times_async(
-                || Utils::drop_pearl_lock_file(&self.disk_path),
-                &format!("cannot delete lock file: {:?}", self.disk_path),
-                self.config.fail_retry_timeout(),
+                || Utils::drop_pearl_lock_file(&self.inner.disk_path),
+                &format!("cannot delete lock file: {:?}", self.inner.disk_path),
+                self.inner.config.fail_retry_timeout(),
             )
             .await?;
 
-        let storage = self
-            .config
+        let mut storage = self
+            .inner.config
             .try_multiple_times(
                 || self.init_pearl_by_path(),
-                &format!("can't init pearl by path: {:?}", self.disk_path),
-                self.config.fail_retry_timeout(),
+                &format!("can't init pearl by path: {:?}", self.inner.disk_path),
+                self.inner.config.fail_retry_timeout(),
             )
             .await
             .with_context(|| "backend pearl holder init storage failed")?;
-        self.init_pearl(storage).await?;
-        debug!("backend pearl holder init holder ready #{}", self.vdisk);
-        Ok(())
+        self.init_pearl(&mut storage).await?;
+        debug!("backend pearl holder init holder ready #{}", self.inner.vdisk);
+        Ok(storage)
     }
 
-    async fn init_pearl(&self, mut storage: Storage<Key>) -> Result<(), Error> {
+    async fn init_pearl(&self, storage: &mut Storage<Key>) -> Result<(), Error> {
         let ts = get_current_timestamp();
         let res = if self.gets_into_interval(ts) {
             storage.init().await
         } else {
             storage.init_lazy().await
         };
-        match res {
-            Ok(_) => {
-                self.update(storage).await;
-                Ok(())
-            }
-            Err(e) => Err(Error::storage(format!("pearl error: {:?}", e))),
-        }
+
+        res.map_err(|err| Error::storage(format!("pearl error: {:?}", err)))
     }
 
     pub async fn drop_directory(&self) -> BackendResult<()> {
-        Utils::drop_directory(&self.disk_path).await
+        Utils::drop_directory(&self.inner.disk_path).await
     }
 
     fn init_pearl_by_path(&self) -> AnyResult<PearlStorage> {
-        let mut builder = Builder::new().work_dir(&self.disk_path);
+        let mut builder = Builder::new().work_dir(&self.inner.disk_path);
 
-        if self.config.allow_duplicates() {
+        if self.inner.config.allow_duplicates() {
             builder = builder.allow_duplicates();
         }
 
         // @TODO add default values to be inserted on deserialisation step
-        let prefix = self.config.blob_file_name_prefix();
-        let max_data = self.config.max_data_in_blob();
-        let max_blob_size = self.config.max_blob_size();
+        let prefix = self.inner.config.blob_file_name_prefix();
+        let max_data = self.inner.config.max_data_in_blob();
+        let max_blob_size = self.inner.config.max_blob_size();
         let mut filter_config = BloomConfig::default();
-        let validate_data_during_index_regen = self.config.validate_data_checksum_during_index_regen();
-        if let Some(count) = self.config.max_buf_bits_count() {
+        let validate_data_during_index_regen = self.inner.config.validate_data_checksum_during_index_regen();
+        if let Some(count) = self.inner.config.max_buf_bits_count() {
             filter_config.max_buf_bits_count = count;
             debug!("bloom filter max buffer bits count set to: {}", count);
         }
@@ -440,8 +487,8 @@ impl Holder {
             .max_blob_size(max_blob_size)
             .set_filter_config(filter_config)
             .set_validate_data_during_index_regen(validate_data_during_index_regen)
-            .set_dump_sem(self.dump_sem.clone());
-        let builder = if self.config.is_aio_enabled() {
+            .set_dump_sem(self.inner.dump_sem.clone());
+        let builder = if self.inner.config.is_aio_enabled() {
             match rio::new() {
                 Ok(ioring) => {
                     warn!("bob will start with AIO - async fs io api");
@@ -450,7 +497,7 @@ impl Holder {
                 Err(e) => {
                     warn!("bob will start with standard sync fs io api");
                     warn!("can't start with AIO, cause: {}", e);
-                    self.config.set_aio(false);
+                    self.inner.config.set_aio(false);
                     builder
                 }
             }
@@ -461,14 +508,13 @@ impl Holder {
         };
         builder
             .build()
-            .with_context(|| format!("cannot build pearl by path: {:?}", &self.disk_path))
+            .with_context(|| format!("cannot build pearl by path: {:?}", &self.inner.disk_path))
     }
 
     pub async fn delete(&self, key: BobKey, _meta: &BobMeta, force_delete: bool) -> Result<u64, Error> {
         let state = self.storage.read().await;
-        if state.is_ready() {
-            let storage = state.get();
-            trace!("Vdisk: {}, delete key: {}", self.vdisk, key);
+        if let Some(storage) = state.get() {
+            trace!("Vdisk: {}, delete key: {}", self.inner.vdisk, key);
             counter!(PEARL_DELETE_COUNTER, 1);
             let timer = Instant::now();
             // TODO: use meta
@@ -484,26 +530,30 @@ impl Holder {
             counter!(PEARL_DELETE_TIMER, timer.elapsed().as_nanos() as u64);
             res
         } else {
-            trace!("Vdisk: {} isn't ready for reading: {:?}", self.vdisk, state);
+            trace!("Vdisk: {} isn't ready for reading: {:?}", self.inner.vdisk, state);
             Err(Error::vdisk_is_not_ready())
         }
     }
 
     pub async fn close_storage(&self) {
-        let lck = self.storage();
-        let pearl_sync = lck.write().await;
-        let storage = pearl_sync.storage().clone();
-        if let Err(e) = storage.fsyncdata().await {
-            warn!("pearl fsync error: {:?}", e);
-        }
-        if let Err(e) = storage.close().await {
-            warn!("pearl close error: {:?}", e);
+        let mut pearl_sync = self.storage.write().await;
+        if let Some(storage) = pearl_sync.reset() {
+            if let Err(e) = storage.fsyncdata().await {
+                warn!("pearl fsync error (path: '{}'): {:?}", self.inner.disk_path.display(), e);
+            }
+            if let Err(e) = storage.close().await {
+                warn!("pearl close error (path: '{}'): {:?}", self.inner.disk_path.display(), e);
+            }
         }
     }
 
     pub async fn disk_used(&self) -> u64 {
-        let storage = self.storage.read().await;
-        storage.storage().disk_used().await
+        let storage_guard = self.storage.read().await;
+        if let Some(storage) = storage_guard.get() {
+            storage.disk_used().await
+        } else {
+            0
+        }
     }
 }
 
@@ -511,8 +561,8 @@ impl Holder {
 impl BloomProvider<Key> for Holder {
     type Filter = <PearlStorage as BloomProvider<Key>>::Filter;
     async fn check_filter(&self, item: &Key) -> FilterResult {
-        let storage = self.storage().read().await;
-        if let Some(storage) = &storage.storage {
+        let storage = self.storage.read().await;
+        if let Some(storage) = storage.get() {
             return BloomProvider::check_filter(storage, item).await;
         }
         FilterResult::NeedAdditionalCheck
@@ -523,8 +573,8 @@ impl BloomProvider<Key> for Holder {
     }
 
     async fn offload_buffer(&mut self, needed_memory: usize, level: usize) -> usize {
-        let mut storage = self.storage().write().await;
-        if let Some(storage) = &mut storage.storage {
+        let mut storage = self.storage.write().await;
+        if let Some(storage) = storage.get_mut() {
             storage.offload_buffer(needed_memory, level).await
         } else {
             0
@@ -532,8 +582,8 @@ impl BloomProvider<Key> for Holder {
         }
     }
 
     async fn get_filter(&self) -> Option<Self::Filter> {
-        let storage = self.storage().read().await;
-        if let Some(storage) = &storage.storage {
+        let storage = self.storage.read().await;
+        if let Some(storage) = storage.get() {
             storage.get_filter().await
         } else {
             None
@@ -545,8 +595,8 @@ impl BloomProvider<Key> for Holder {
         }
     }
 
     async fn filter_memory_allocated(&self) -> usize {
-        let storage = self.storage().read().await;
-        if let Some(storage) = &storage.storage {
+        let storage = self.storage.read().await;
+        if let Some(storage) = storage.get() {
             storage.filter_memory_allocated().await
         } else {
             0
@@ -554,106 +604,51 @@ impl BloomProvider<Key> for Holder {
         }
     }
 }
 
-#[derive(Clone, PartialEq, Debug)]
+#[derive(Debug)]
 pub enum PearlState {
-    // pearl is started and working
-    Normal,
-    // pearl restarting
+    // pearl starting
     Initializing,
+    // pearl is started and working
+    Running(PearlStorage),
 }
 
-#[derive(Clone, Debug)]
+#[derive(Debug)]
 pub struct PearlSync {
-    storage: Option<PearlStorage>,
     state: PearlState,
-    start_time_test: u8,
 }
 
 impl PearlSync {
-    pub fn storage(&self) -> &PearlStorage {
-        self.storage.as_ref().expect("pearl storage")
-    }
-
-    pub async fn records_count(&self) -> usize {
-        self.storage().records_count().await
-    }
-
-    pub async fn active_index_memory(&self) -> usize {
-        self.storage().active_index_memory().await
-    }
-
-    pub async fn inactive_index_memory(&self) -> usize {
-        self.storage().inactive_index_memory().await
-    }
-
-    pub async fn index_memory(&self) -> usize {
-        self.storage().index_memory().await
-    }
-
-    pub async fn active_blob_records_count(&self) -> Option<usize> {
-        self.storage().records_count_in_active_blob().await
-    }
-
-    pub async fn blobs_count(&self) -> usize {
-        self.storage().blobs_count().await
-    }
-
-    pub fn corrupted_blobs_count(&self) -> usize {
-        self.storage().corrupted_blobs_count()
-    }
-
-    pub async fn has_active_blob(&self) -> bool {
-        self.storage().has_active_blob().await
-    }
-
-    #[inline]
-    pub fn ready(&mut self) {
-        self.set_state(PearlState::Normal);
+    pub fn get(&self) -> Option<&PearlStorage> {
+        match &self.state {
+            PearlState::Initializing => None,
+            PearlState::Running(storage) => Some(storage)
+        }
     }
 
-    #[inline]
-    pub fn init(&mut self) {
-        self.set_state(PearlState::Initializing);
+    pub fn get_mut(&mut self) -> Option<&mut PearlStorage> {
+        match &mut self.state {
+            PearlState::Initializing => None,
+            PearlState::Running(storage) => Some(storage)
+        }
     }
 
     #[inline]
     pub fn is_ready(&self) -> bool {
-        self.state == PearlState::Normal
-    }
-
-    #[inline]
-    pub fn is_reinit(&self) -> bool {
-        self.state == PearlState::Initializing
+        matches!(&self.state, PearlState::Running(_))
    }
 
-    #[inline]
-    pub fn set_state(&mut self, state: PearlState) {
-        self.state = state;
-    }
-
-    #[inline]
-    pub fn set(&mut self, storage: PearlStorage) {
-        self.storage = Some(storage);
-        self.start_time_test += 1;
-    }
-
-    #[inline]
-    pub fn get(&self) -> PearlStorage {
-        self.storage.clone().expect("cloned storage")
-    }
-
-    pub async fn filter_memory_allocated(&self) -> usize {
-        if let Some(storage) = &self.storage {
-            storage.filter_memory_allocated().await
-        } else {
-            0
+    pub fn set_ready(&mut self, storage: PearlStorage) -> Result<(), Error> {
+        if self.is_ready() {
+            return Err(Error::failed("Pearl storage already initialized. Please, close previous before"));
        }
+        self.state = PearlState::Running(storage);
+        Ok(())
     }
 
-    pub async fn offload_buffer(&mut self, needed_memory: usize, level: usize) -> usize {
-        if let Some(storage) = &mut self.storage {
-            storage.offload_buffer(needed_memory, level).await
-        } else {
-            0
+    pub fn reset(&mut self) -> Option<PearlStorage> {
+        let prev_state = std::mem::replace(&mut self.state, PearlState::Initializing);
+        match prev_state {
+            PearlState::Initializing => None,
+            PearlState::Running(storage) => Some(storage)
         }
     }
 }
 
@@ -661,9 +656,7 @@ impl PearlSync {
 impl Default for PearlSync {
     fn default() -> Self {
         Self {
-            storage: None,
             state: PearlState::Initializing,
-            start_time_test: 0,
         }
     }
 }
diff --git a/bob-backend/src/pearl/hooks.rs b/bob-backend/src/pearl/hooks.rs
index c3f127e5..fc47e82a 100644
--- a/bob-backend/src/pearl/hooks.rs
+++ b/bob-backend/src/pearl/hooks.rs
@@ -24,15 +24,21 @@ impl From<&Holder> for SimpleHolder {
 
 impl SimpleHolder {
     pub(crate) async fn filter_memory_allocated(&self) -> usize {
-        self.storage.read().await.filter_memory_allocated().await
+        let storage = self.storage.read().await;
+        if let Some(storage) = storage.get() {
+            storage.filter_memory_allocated().await
+        } else {
+            0
+        }
     }
 
     pub(crate) async fn offload_filter(&self, needed_memory: usize, level: usize) -> usize {
-        self.storage
-            .write()
-            .await
-            .offload_buffer(needed_memory, level)
-            .await
+        let mut storage = self.storage.write().await;
+        if let Some(storage) = storage.get_mut() {
+            storage.offload_buffer(needed_memory, level).await
+        } else {
+            0
+        }
     }
 
     pub(crate) fn timestamp(&self) -> u64 {
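
Review note, not part of the patch: below is a minimal, self-contained sketch of the PearlSync state machine this change introduces, assuming a dummy PearlStorage stand-in and a plain String error instead of the backend Error type; only the get / set_ready / reset transitions mirror the patch.

// Standalone sketch: PearlStorage here is a hypothetical stand-in, not the real pearl storage.
#[derive(Debug)]
struct PearlStorage {
    id: u32,
}

#[derive(Debug)]
enum PearlState {
    // pearl starting
    Initializing,
    // pearl is started and working
    Running(PearlStorage),
}

#[derive(Debug)]
struct PearlSync {
    state: PearlState,
}

impl PearlSync {
    fn new() -> Self {
        Self { state: PearlState::Initializing }
    }

    // Borrow the storage only while it is running; no clone is needed.
    fn get(&self) -> Option<&PearlStorage> {
        match &self.state {
            PearlState::Initializing => None,
            PearlState::Running(storage) => Some(storage),
        }
    }

    // Refuse to overwrite a running storage, as the patch does.
    fn set_ready(&mut self, storage: PearlStorage) -> Result<(), String> {
        if matches!(&self.state, PearlState::Running(_)) {
            return Err("storage already initialized".to_owned());
        }
        self.state = PearlState::Running(storage);
        Ok(())
    }

    // Take the storage out, leaving the holder in Initializing state,
    // so the caller can close it outside the lock.
    fn reset(&mut self) -> Option<PearlStorage> {
        match std::mem::replace(&mut self.state, PearlState::Initializing) {
            PearlState::Initializing => None,
            PearlState::Running(storage) => Some(storage),
        }
    }
}

fn main() {
    let mut sync = PearlSync::new();
    assert!(sync.get().is_none());

    sync.set_ready(PearlStorage { id: 1 }).unwrap();
    assert!(sync.get().is_some());
    // A second set_ready without a reset is rejected.
    assert!(sync.set_ready(PearlStorage { id: 2 }).is_err());

    // reset() hands the storage back so it can be closed outside the lock.
    let old = sync.reset();
    println!("took out storage {:?}, state is now {:?}", old.map(|s| s.id), sync.state);
    assert!(sync.get().is_none());
}

Keeping the storage inside the Running variant is what lets the holder hand out &PearlStorage borrows under the read lock instead of cloning the storage on every call, which is the point of the change (#791).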