From c41423f7f4f0f9858b0f77c56885210b4625c65e Mon Sep 17 00:00:00 2001 From: Ivan Kalinin Date: Mon, 9 Dec 2024 21:20:31 +0100 Subject: [PATCH 1/2] feat(storage): rework raw cells cache --- storage/src/store/shard_state/cell_storage.rs | 497 +++++++++--------- storage/src/store/shard_state/mod.rs | 20 +- .../src/store/shard_state/store_state_raw.rs | 6 +- 3 files changed, 272 insertions(+), 251 deletions(-) diff --git a/storage/src/store/shard_state/cell_storage.rs b/storage/src/store/shard_state/cell_storage.rs index dfdfde7ed..2e104e8b2 100644 --- a/storage/src/store/shard_state/cell_storage.rs +++ b/storage/src/store/shard_state/cell_storage.rs @@ -3,11 +3,11 @@ use std::collections::hash_map; use std::mem::{ManuallyDrop, MaybeUninit}; use std::sync::atomic::{AtomicI64, AtomicU8, Ordering}; use std::sync::{Arc, Weak}; +use std::time::Instant; use anyhow::{Context, Result}; use bumpalo::Bump; use everscale_types::cell::*; -use parking_lot::Mutex; use quick_cache::sync::{Cache, DefaultLifecycle}; use triomphe::ThinArc; use tycho_util::metrics::HistogramGuard; @@ -20,8 +20,7 @@ use crate::db::*; pub struct CellStorage { db: BaseDb, cells_cache: Arc, - raw_cells_cache: RawCellsCache, - pending: PendingOperations, + raw_cells_cache: Arc, } type CellsIndex = FastDashMap>; @@ -29,19 +28,23 @@ type CellsIndex = FastDashMap>; impl CellStorage { pub fn new(db: BaseDb, cache_size_bytes: u64) -> Arc { let cells_cache = Default::default(); - let raw_cells_cache = RawCellsCache::new(cache_size_bytes); + let raw_cells_cache = Arc::new(RawCellsCache::new(cache_size_bytes)); Arc::new(Self { db, cells_cache, raw_cells_cache, - pending: PendingOperations::default(), }) } pub fn apply_temp_cell(&self, root: &HashBytes) -> Result<()> { const MAX_NEW_CELLS_BATCH_SIZE: usize = 10000; + struct TempCell { + old_rc: i64, + additions: u32, + } + struct CellHashesIter<'a> { data: rocksdb::DBPinnableSlice<'a>, offset: usize, @@ -81,7 +84,7 @@ impl CellStorage { cells_cf: BoundedCfHandle<'a>, db: &'a BaseDb, buffer: Vec, - transaction: FastHashMap, + transaction: FastHashMap, new_cells_batch: rocksdb::WriteBatch, new_cell_count: usize, raw_cache: &'a RawCellsCache, @@ -136,7 +139,7 @@ impl CellStorage { ) -> Result, CellStorageError> { Ok(match self.transaction.entry(*key) { hash_map::Entry::Occupied(mut entry) => { - *entry.get_mut() += 1; // 1 new reference + entry.get_mut().additions += 1; // 1 new reference InsertedCell::Existing } hash_map::Entry::Vacant(entry) => { @@ -144,12 +147,18 @@ impl CellStorage { let (rc, value) = refcount::decode_value_with_rc(value.as_ref()); debug_assert!(rc > 0 && value.is_some() || rc == 0 && value.is_none()); if value.is_some() { - entry.insert(1); // 1 new reference + entry.insert(TempCell { + old_rc: rc, + additions: 1, // 1 new reference + }); return Ok(InsertedCell::Existing); } } - entry.insert(0); // 0 new references (the first one is included in the merge below) + entry.insert(TempCell { + old_rc: 0, + additions: 1, + }); let iter = self.load_temp(key)?; self.buffer.clear(); @@ -159,8 +168,6 @@ impl CellStorage { &mut self.buffer, ); - self.raw_cache.add_refs(key, 1); - self.new_cells_batch .put_cf(&self.cells_cf, key, self.buffer.as_slice()); @@ -184,18 +191,24 @@ impl CellStorage { Ok(()) } - fn flush_existing_cells(&mut self) -> Result<(), rocksdb::Error> { + fn flush_existing_cells(mut self) -> Result<(), rocksdb::Error> { let mut batch = rocksdb::WriteBatch::default(); - for (key, &refs_diff) in &self.transaction { - if refs_diff == 0 { - continue; + for (key, item) in self.transaction { + let mut refs_diff = item.additions; + if item.old_rc == 0 { + // 1 reference was added with the data while traversing the tree. + refs_diff -= 1; } - self.buffer.clear(); - refcount::add_positive_refount(refs_diff, None, &mut self.buffer); - self.raw_cache.add_refs(key, refs_diff); - batch.merge_cf(&self.cells_cf, key, self.buffer.as_slice()); + if refs_diff > 0 { + self.buffer.clear(); + refcount::add_positive_refount(refs_diff, None, &mut self.buffer); + batch.merge_cf(&self.cells_cf, key, self.buffer.as_slice()); + } + + let new_rc = item.old_rc + item.additions as i64; + self.raw_cache.on_insert_cell(&key, new_rc, None); } self.db.rocksdb().write(batch) @@ -236,66 +249,42 @@ impl CellStorage { pub fn store_cell( &self, batch: &mut WriteBatch, - root: Cell, + root: &DynCell, estimated_cell_count: usize, - ) -> Result<(PendingOperation<'_>, usize), CellStorageError> { - struct CellWithRefs<'a> { - rc: u32, + ) -> Result { + struct AddedCell<'a> { + old_rc: i64, + additions: u32, data: Option<&'a [u8]>, } struct Context<'a> { db: &'a BaseDb, - raw_cache: &'a RawCellsCache, + raw_cells_cache: &'a RawCellsCache, alloc: &'a Bump, - transaction: FastHashMap>, + transaction: FastHashMap<&'a HashBytes, AddedCell<'a>>, buffer: Vec, } - impl Context<'_> { + impl<'a> Context<'a> { fn insert_cell( &mut self, - key: &HashBytes, - cell: &DynCell, + cell: &'a DynCell, depth: usize, ) -> Result { - Ok(match self.transaction.entry(*key) { + let key = cell.repr_hash(); + Ok(match self.transaction.entry(key) { hash_map::Entry::Occupied(mut value) => { - value.get_mut().rc += 1; + value.get_mut().additions += 1; false } hash_map::Entry::Vacant(entry) => { - // A constant which tells since which depth we should start to use cache. - // This method is used mostly for inserting new states, so we can assume - // that first N levels will mostly be new. - // - // This value was chosen empirically. - const NEW_CELLS_DEPTH_THRESHOLD: usize = 4; - - let (old_rc, has_value) = 'value: { - if depth >= NEW_CELLS_DEPTH_THRESHOLD { - // NOTE: `get` here is used to affect a "hotness" of the value, because - // there is a big chance that we will need it soon during state processing - if let Some(entry) = self.raw_cache.0.get(key) { - let rc = entry.header.header.load(Ordering::Acquire); - break 'value (rc, rc > 0); - } - } - - match self.db.cells.get(key).map_err(CellStorageError::Internal)? { - Some(value) => { - let (rc, value) = - refcount::decode_value_with_rc(value.as_ref()); - (rc, value.is_some()) - } - None => (0, false), - } - }; + let old_rc = self + .raw_cells_cache + .get_rc_for_insert(self.db, key, depth)?; - // TODO: lower to `debug_assert` when sure - assert!(has_value && old_rc > 0 || !has_value && old_rc == 0); - - let data = if !has_value { + let is_new = old_rc == 0; + let data = if is_new { self.buffer.clear(); if StorageCell::serialize_to(cell, &mut self.buffer).is_err() { return Err(CellStorageError::InvalidCell); @@ -304,8 +293,13 @@ impl CellStorage { } else { None }; - entry.insert(CellWithRefs { rc: 1, data }); - !has_value + + entry.insert(AddedCell { + old_rc, + additions: 1, + data, + }); + is_new } }) } @@ -313,28 +307,26 @@ impl CellStorage { fn finalize(mut self, batch: &mut rocksdb::WriteBatch) -> usize { let total = self.transaction.len(); let cells_cf = &self.db.cells.cf(); - for (key, CellWithRefs { rc, data }) in self.transaction { + + for (key, item) in self.transaction { self.buffer.clear(); - refcount::add_positive_refount(rc, data, &mut self.buffer); - if let Some(data) = data { - self.raw_cache.insert(&key, rc, data); - } else { - self.raw_cache.add_refs(&key, rc); - } + refcount::add_positive_refount(item.additions, item.data, &mut self.buffer); batch.merge_cf(cells_cf, key.as_slice(), &self.buffer); + + let new_rc = item.old_rc + item.additions as i64; + self.raw_cells_cache.on_insert_cell(key, new_rc, item.data); } + total } } - let pending_op = self.pending.begin(); + let alloc = bumpalo::Bump::new(); // Prepare context and handles - let alloc = Bump::new(); - let mut ctx = Context { db: &self.db, - raw_cache: &self.raw_cells_cache, + raw_cells_cache: &self.raw_cells_cache, alloc: &alloc, transaction: FastHashMap::with_capacity_and_hasher( estimated_cell_count, @@ -343,42 +335,34 @@ impl CellStorage { buffer: Vec::with_capacity(512), }; - // Check root cell - { - let key = root.repr_hash(); - - if !ctx.insert_cell(key, root.as_ref(), 0)? { - return Ok((pending_op, 0)); + 'visit: { + // Check root cell + if !ctx.insert_cell(root.as_ref(), 0)? { + break 'visit; } - } - - let mut stack = Vec::with_capacity(16); - stack.push(root.references()); - - // Check other cells - 'outer: loop { - let depth = stack.len(); - let Some(iter) = stack.last_mut() else { - break; - }; - - for child in &mut *iter { - let key = child.repr_hash(); + let mut stack = Vec::with_capacity(16); + stack.push(root.references()); + + // Check other cells + 'outer: loop { + let depth = stack.len(); + let Some(iter) = stack.last_mut() else { + break; + }; - if ctx.insert_cell(key, child, depth)? { - stack.push(child.references()); - continue 'outer; + for child in &mut *iter { + if ctx.insert_cell(child, depth)? { + stack.push(child.references()); + continue 'outer; + } } - } - stack.pop(); + stack.pop(); + } } - // Clear big chunks of data before finalization - drop(stack); - // Write transaction to the `WriteBatch` - Ok((pending_op, ctx.finalize(batch))) + Ok(ctx.finalize(batch)) } pub fn load_cell( @@ -393,19 +377,12 @@ impl CellStorage { } } - let cell = match self.raw_cells_cache.get_raw(&self.db, &hash, &self.pending) { - Ok(value) => 'cell: { - if let Some(value) = value { - let rc = &value.header.header; - if rc.load(Ordering::Acquire) > 0 { - match StorageCell::deserialize(self.clone(), &value.slice) { - Some(cell) => break 'cell Arc::new(cell), - None => return Err(CellStorageError::InvalidCell), - } - } - } - return Err(CellStorageError::CellNotFound); - } + let cell = match self.raw_cells_cache.get_raw(&self.db, &hash) { + Ok(Some(value)) => match StorageCell::deserialize(self.clone(), &value.slice) { + Some(cell) => Arc::new(cell), + None => return Err(CellStorageError::InvalidCell), + }, + Ok(None) => return Err(CellStorageError::CellNotFound), Err(e) => return Err(CellStorageError::Internal(e)), }; @@ -424,18 +401,18 @@ impl CellStorage { &self, alloc: &Bump, hash: &HashBytes, - ) -> Result<(PendingOperation<'_>, usize, WriteBatch), CellStorageError> { + ) -> Result<(usize, WriteBatch), CellStorageError> { #[derive(Clone, Copy)] - struct CellState<'a> { - rc: i64, + struct RemovedCell<'a> { + old_rc: i64, removes: u32, refs: &'a [HashBytes], } - impl<'a> CellState<'a> { + impl<'a> RemovedCell<'a> { fn remove(&mut self) -> Result, CellStorageError> { self.removes += 1; - if self.removes as i64 <= self.rc { + if self.removes as i64 <= self.old_rc { Ok(self.next_refs()) } else { Err(CellStorageError::CounterMismatch) @@ -443,7 +420,7 @@ impl CellStorage { } fn next_refs(&self) -> Option<&'a [HashBytes]> { - if self.rc > self.removes as i64 { + if self.old_rc > self.removes as i64 { None } else { Some(self.refs) @@ -451,44 +428,52 @@ impl CellStorage { } } - let pending_op = self.pending.begin(); - let cells = &self.db.cells; let cells_cf = &cells.cf(); - let mut transaction: FastHashMap<&HashBytes, CellState<'_>> = + let mut transaction: FastHashMap<&HashBytes, RemovedCell<'_>> = FastHashMap::with_capacity_and_hasher(128, Default::default()); let mut buffer = Vec::with_capacity(4); let mut stack = Vec::with_capacity(16); - stack.push(hash); + stack.push(std::slice::from_ref(hash).iter()); // While some cells left - while let Some(cell_id) = stack.pop() { - let refs = match transaction.entry(cell_id) { - hash_map::Entry::Occupied(mut v) => v.get_mut().remove()?, - hash_map::Entry::Vacant(v) => { - let rc = - self.raw_cells_cache - .get_raw_for_delete(&self.db, cell_id, &mut buffer)?; - debug_assert!(rc > 0); - - v.insert(CellState { - rc, - removes: 1, - refs: alloc.alloc_slice_copy(buffer.as_slice()), - }) - .next_refs() - } + 'outer: loop { + let Some(iter) = stack.last_mut() else { + break; }; - if let Some(refs) = refs { - // Add all children - for cell_id in refs { - // Unknown cell, push to the stack to process it - stack.push(cell_id); + for cell_id in iter.by_ref() { + // Process the current cell. + let refs = match transaction.entry(cell_id) { + hash_map::Entry::Occupied(mut v) => v.get_mut().remove()?, + hash_map::Entry::Vacant(v) => { + let old_rc = self.raw_cells_cache.get_rc_for_delete( + &self.db, + cell_id, + &mut buffer, + )?; + debug_assert!(old_rc > 0); + + v.insert(RemovedCell { + old_rc, + removes: 1, + refs: alloc.alloc_slice_copy(buffer.as_slice()), + }) + .next_refs() + } + }; + + if let Some(refs) = refs { + // And proceed to its refs if any. + stack.push(refs.iter()); + continue 'outer; } } + + // Drop the current cell when all of its children were processed. + stack.pop(); } // Clear big chunks of data before finalization @@ -502,15 +487,18 @@ impl CellStorage { // and a bit more just in case. let mut batch = WriteBatch::with_capacity_bytes(total * (32 + 8 + 8)); - for (key, CellState { removes, .. }) in transaction { - self.raw_cells_cache.remove_refs(key, removes); + for (key, item) in transaction { batch.merge_cf( cells_cf, key.as_slice(), - refcount::encode_negative_refcount(removes), + refcount::encode_negative_refcount(item.removes), ); + + let new_rc = item.old_rc - item.removes as i64; + self.raw_cells_cache.on_remove_cell(key, new_rc); } - Ok((pending_op, total, batch)) + + Ok((total, batch)) } pub fn drop_cell(&self, hash: &HashBytes) { @@ -875,7 +863,10 @@ impl StorageCellReferenceData { } } -struct RawCellsCache(Cache); +struct RawCellsCache { + inner: Cache, + rocksdb_access_histogram: metrics::Histogram, +} type RawCellsCacheItem = ThinArc; @@ -893,6 +884,8 @@ impl quick_cache::Weighter for CellSizeEstimator { } impl RawCellsCache { + const RC_NAN: i64 = i64::MAX; + fn new(size_in_bytes: u64) -> Self { // Percentile 0.1% from 96 to 127 => 1725119 count // Percentile 10% from 128 to 191 => 82838849 count @@ -917,6 +910,7 @@ impl RawCellsCache { const MAX_CELL_SIZE: u64 = 192; const KEY_SIZE: u64 = 32; + const SHARDS: usize = 512; let estimated_cell_cache_capacity = size_in_bytes / (KEY_SIZE + MAX_CELL_SIZE); tracing::info!( @@ -924,54 +918,103 @@ impl RawCellsCache { max_cell_cache_size = %bytesize::ByteSize(size_in_bytes), ); - let raw_cache = Cache::with( - estimated_cell_cache_capacity as usize, - size_in_bytes, + let inner = Cache::with_options( + quick_cache::OptionsBuilder::new() + .shards(SHARDS) + .estimated_items_capacity(estimated_cell_cache_capacity as usize) + .weight_capacity(size_in_bytes) + .hot_allocation(0.8) + .build() + .unwrap(), CellSizeEstimator, FastHasherState::default(), DefaultLifecycle::default(), ); - Self(raw_cache) + Self { + inner, + rocksdb_access_histogram: metrics::histogram!( + "tycho_storage_get_cell_from_rocksdb_time" + ), + } } fn get_raw( &self, db: &BaseDb, key: &HashBytes, - pending: &PendingOperations, ) -> Result, rocksdb::Error> { use quick_cache::sync::GuardResult; - match self.0.get_value_or_guard(key, None) { + match self.inner.get_value_or_guard(key, None) { GuardResult::Value(value) => Ok(Some(value)), - GuardResult::Guard(g) => Ok( - if let Some(value) = { - let _histogram = - HistogramGuard::begin("tycho_storage_get_cell_from_rocksdb_time"); - db.cells.get(key.as_slice())? - } { - let (rc, data) = refcount::decode_value_with_rc(value.as_ref()); - data.map(|value| { - let value = - RawCellsCacheItem::from_header_and_slice(AtomicI64::new(rc), value); + GuardResult::Guard(g) => { + let value = { + let started_at = Instant::now(); + scopeguard::defer! { + self.rocksdb_access_histogram.record(started_at.elapsed()); + } - pending.run_if_none(|| { - // Insert value to the cache only if there are no pending operations - _ = g.insert(value.clone()); - }); + db.cells.get(key.as_slice())? + }; + Ok(if let Some(value) = value { + let (_, data) = refcount::decode_value_with_rc(value.as_ref()); + data.map(|value| { + let value = RawCellsCacheItem::from_header_and_slice( + AtomicI64::new(Self::RC_NAN), + value, + ); + _ = g.insert(value.clone()); value }) } else { None - }, - ), + }) + } GuardResult::Timeout => unreachable!(), } } - fn get_raw_for_delete( + fn get_rc_for_insert( + &self, + db: &BaseDb, + key: &HashBytes, + depth: usize, + ) -> Result { + // A constant which tells since which depth we should start to use cache. + // This method is used mostly for inserting new states, so we can assume + // that first N levels will mostly be new. + // + // This value was chosen empirically. + const NEW_CELLS_DEPTH_THRESHOLD: usize = 4; + + if depth >= NEW_CELLS_DEPTH_THRESHOLD { + // NOTE: `get` here is used to affect a "hotness" of the value, because + // there is a big chance that we will need it soon during state processing + if let Some(entry) = self.inner.get(key) { + let rc = entry.header.header.load(Ordering::Acquire); + if rc != Self::RC_NAN { + return Ok(rc); + } + } + } + + match db.cells.get(key).map_err(CellStorageError::Internal)? { + Some(value) => { + let (rc, value) = refcount::decode_value_with_rc(value.as_ref()); + + // TODO: lower to `debug_assert` when sure + let has_value = value.is_some(); + assert!(has_value && rc > 0 || !has_value && rc == 0); + + Ok(rc) + } + None => Ok(0), + } + } + + fn get_rc_for_delete( &self, db: &BaseDb, key: &HashBytes, @@ -980,84 +1023,64 @@ impl RawCellsCache { refs_buffer.clear(); // NOTE: `peek` here is used to avoid affecting a "hotness" of the value - if let Some(value) = self.0.peek(key) { + if let Some(value) = self.inner.peek(key) { let rc = value.header.header.load(Ordering::Acquire); if rc <= 0 { return Err(CellStorageError::CellNotFound); + } else if rc != i64::MAX { + return StorageCell::deserialize_references(&value.slice, refs_buffer) + .then_some(rc) + .ok_or(CellStorageError::InvalidCell); } + } - StorageCell::deserialize_references(&value.slice, refs_buffer) - .then_some(rc) - .ok_or(CellStorageError::InvalidCell) - } else { - match db.cells.get(key.as_slice()) { - Ok(value) => { - if let Some(value) = value { - if let (rc, Some(value)) = refcount::decode_value_with_rc(&value) { - return StorageCell::deserialize_references(value, refs_buffer) - .then_some(rc) - .ok_or(CellStorageError::InvalidCell); - } + match db.cells.get(key.as_slice()) { + Ok(value) => { + if let Some(value) = value { + if let (rc, Some(value)) = refcount::decode_value_with_rc(&value) { + return StorageCell::deserialize_references(value, refs_buffer) + .then_some(rc) + .ok_or(CellStorageError::InvalidCell); } - - Err(CellStorageError::CellNotFound) } - Err(e) => Err(CellStorageError::Internal(e)), + + Err(CellStorageError::CellNotFound) } + Err(e) => Err(CellStorageError::Internal(e)), } } - fn insert(&self, key: &HashBytes, refs: u32, value: &[u8]) { - let value = RawCellsCacheItem::from_header_and_slice(AtomicI64::new(refs as _), value); - self.0.insert(*key, value); - } - - fn add_refs(&self, key: &HashBytes, refs: u32) { - // NOTE: `peek` here is used to avoid affecting a "hotness" of the value - if let Some(v) = self.0.peek(key) { - v.header.header.fetch_add(refs as i64, Ordering::Release); + fn on_insert_cell(&self, key: &HashBytes, rc: i64, data: Option<&[u8]>) { + match data { + None => { + // NOTE: `get` here is used to affect a "hotness" of the value + if let Some(v) = self.inner.get(key) { + v.header.header.store(rc, Ordering::Release); + } + } + Some(data) => self.inner.insert( + *key, + RawCellsCacheItem::from_header_and_slice(AtomicI64::new(rc), data), + ), } } - fn remove_refs(&self, key: &HashBytes, refs: u32) { - // NOTE: `peek` here is used to avoid affecting a "hotness" of the value - if let Some(v) = self.0.peek(key) { - let old_refs = v.header.header.fetch_sub(refs as i64, Ordering::Release); - debug_assert!(old_refs >= refs as i64); - } - } -} + fn on_remove_cell(&self, key: &HashBytes, rc: i64) { + let v = if rc <= 0 { + debug_assert_eq!(rc, 0, "too many removed cells"); -#[derive(Default)] -struct PendingOperations { - // TODO: Replace with two atomic counters for inserts and pending operations - pending_count: Mutex, -} - -impl PendingOperations { - fn begin(&self) -> PendingOperation<'_> { - *self.pending_count.lock() += 1; - PendingOperation { operations: self } - } - - #[inline] - fn run_if_none(&self, f: F) { - let guard = self.pending_count.lock(); - if *guard == 0 { - f(); - } - - // NOTE: Make sure to drop the lock only after the operation is executed - drop(guard); - } -} - -pub struct PendingOperation<'a> { - operations: &'a PendingOperations, -} + match self.inner.remove(key) { + None => return, + Some((_, v)) => v, + } + } else { + // NOTE: `peek` here is used to avoid affecting a "hotness" of the value + match self.inner.peek(key) { + None => return, + Some(v) => v, + } + }; -impl Drop for PendingOperation<'_> { - fn drop(&mut self) { - *self.operations.pending_count.lock() -= 1; + v.header.header.store(rc, Ordering::Release); } } diff --git a/storage/src/store/shard_state/mod.rs b/storage/src/store/shard_state/mod.rs index d263c1257..5456c7adb 100644 --- a/storage/src/store/shard_state/mod.rs +++ b/storage/src/store/shard_state/mod.rs @@ -121,8 +121,13 @@ impl ShardStateStorage { let mut batch = rocksdb::WriteBatch::with_capacity_bytes(estimated_update_size_bytes); let in_mem_store = HistogramGuard::begin("tycho_storage_cell_in_mem_store_time"); - let (pending_op, new_cell_count) = - cell_storage.store_cell(&mut batch, root_cell, estimated_merkle_update_size)?; + + let new_cell_count = cell_storage.store_cell( + &mut batch, + root_cell.as_ref(), + estimated_merkle_update_size, + )?; + in_mem_store.finish(); metrics::histogram!("tycho_storage_cell_count").record(new_cell_count as f64); @@ -136,8 +141,7 @@ impl ShardStateStorage { raw_db.write(batch)?; - // Ensure that pending operation guard is dropped after the batch is written - drop(pending_op); + drop(root_cell); hist.finish(); @@ -170,6 +174,8 @@ impl ShardStateStorage { }; let block_id = *block_id; + + let _gc_lock = self.gc_lock.lock().await; tokio::task::spawn_blocking(move || ctx.store(&block_id, boc)).await? } @@ -242,17 +248,13 @@ impl ShardStateStorage { let key = key.to_vec(); let (total, inner_alloc) = tokio::task::spawn_blocking(move || { - let (pending_op, stats, mut batch) = - cell_storage.remove_cell(&alloc, &root_hash)?; + let (stats, mut batch) = cell_storage.remove_cell(&alloc, &root_hash)?; batch.delete_cf(&db.shard_states.get_unbounded_cf().bound(), key); db.raw() .rocksdb() .write_opt(batch, db.cells.write_config())?; - // Ensure that pending operation guard is dropped after the batch is written - drop(pending_op); - Ok::<_, anyhow::Error>((stats, alloc)) }) .await??; diff --git a/storage/src/store/shard_state/store_state_raw.rs b/storage/src/store/shard_state/store_state_raw.rs index 37e991b71..e3299cdef 100644 --- a/storage/src/store/shard_state/store_state_raw.rs +++ b/storage/src/store/shard_state/store_state_raw.rs @@ -626,15 +626,11 @@ mod test { // check that state actually exists let cell = cell_storage.load_cell(HashBytes::from_slice(value.as_ref()))?; - let (pending_op, _, batch) = - cell_storage.remove_cell(&bump, cell.hash(LevelMask::MAX_LEVEL))?; + let (_, batch) = cell_storage.remove_cell(&bump, cell.hash(LevelMask::MAX_LEVEL))?; // execute batch db.rocksdb().write_opt(batch, db.cells.write_config())?; - // Ensure that pending operation guard is dropped after the batch is written - drop(pending_op); - tracing::info!("State deleted. Progress: {}/{total_states}", deleted + 1); } From 4385a9a1f55260fe875d9b423b32a87bb9b43fb9 Mon Sep 17 00:00:00 2001 From: Vladimir Petrzhikovskii Date: Mon, 9 Dec 2024 13:49:53 +0100 Subject: [PATCH 2/2] feat: add cells cache metrics --- scripts/gen-dashboard.py | 77 +++++++++++++++---- storage/src/store/shard_state/cell_storage.rs | 14 +++- 2 files changed, 75 insertions(+), 16 deletions(-) diff --git a/scripts/gen-dashboard.py b/scripts/gen-dashboard.py index e95f46718..79934f177 100644 --- a/scripts/gen-dashboard.py +++ b/scripts/gen-dashboard.py @@ -759,13 +759,54 @@ def storage() -> RowPanel: create_heatmap_panel( "tycho_storage_load_cell_time", "Time to load cell from storage" ), + create_counter_panel( + expr_sum_rate("tycho_storage_load_cell_time_count"), + "Number of load_cell calls", + UNITS.OPS_PER_SEC, + ), create_heatmap_panel( "tycho_storage_get_cell_from_rocksdb_time", "Time to load cell from RocksDB" ), + create_counter_panel( + expr_sum_rate("tycho_storage_get_cell_from_rocksdb_time_count"), + "Number of cache missed cell loads", + UNITS.OPS_PER_SEC, + ), + timeseries_panel( + title="Storage Cache Hit Rate", + targets=[ + target( + expr=expr_operator( + expr_operator( + "1", + "-", + expr_operator( + expr_sum_rate( + "tycho_storage_get_cell_from_rocksdb_time_count", + ), + "/", + expr_sum_rate( + "tycho_storage_load_cell_time_count", + ), + ), + ), + "*", + "100", + ), + legend_format="Hit Rate", + ) + ], + unit=UNITS.PERCENT_FORMAT, + ), + create_counter_panel( + "tycho_storage_raw_cells_cache_size", + "Raw cells cache size", + UNITS.BYTES_IEC, + ), create_heatmap_quantile_panel( "tycho_storage_store_block_data_size", "Block data size", - UNITS.BYTES, + UNITS.BYTES_IEC, "0.999", ), create_heatmap_quantile_panel( @@ -1631,7 +1672,7 @@ def mempool_point_rates() -> RowPanel: create_counter_panel( "tycho_mempool_msgs_unique_bytes", "Adapter: unique externals size", - unit_format=UNITS.BYTES, + unit_format=UNITS.BYTES_IEC, ), create_counter_panel( "tycho_mempool_msgs_duplicates_count", @@ -1640,7 +1681,7 @@ def mempool_point_rates() -> RowPanel: create_counter_panel( "tycho_mempool_msgs_duplicates_bytes", "Adapter: removed duplicate externals size", - unit_format=UNITS.BYTES, + unit_format=UNITS.BYTES_IEC, ), create_counter_panel( "tycho_mempool_point_payload_count", @@ -1649,7 +1690,7 @@ def mempool_point_rates() -> RowPanel: create_counter_panel( "tycho_mempool_point_payload_bytes", "Engine: points payload size", - unit_format=UNITS.BYTES, + unit_format=UNITS.BYTES_IEC, ), create_counter_panel( "tycho_mempool_evicted_externals_count", @@ -1658,7 +1699,7 @@ def mempool_point_rates() -> RowPanel: create_counter_panel( "tycho_mempool_evicted_externals_size", "Input buffer: evicted externals size", - unit_format=UNITS.BYTES, + unit_format=UNITS.BYTES_IEC, ), ] return create_row("Mempool point rates", metrics) @@ -1983,15 +2024,23 @@ def collator_execution_manager() -> RowPanel: def allocator_stats() -> RowPanel: metrics = [ - create_gauge_panel("jemalloc_allocated_bytes", "Allocated Bytes", UNITS.BYTES), - create_gauge_panel("jemalloc_active_bytes", "Active Bytes", UNITS.BYTES), - create_gauge_panel("jemalloc_metadata_bytes", "Metadata Bytes", UNITS.BYTES), - create_gauge_panel("jemalloc_resident_bytes", "Resident Bytes", UNITS.BYTES), - create_gauge_panel("jemalloc_mapped_bytes", "Mapped Bytes", UNITS.BYTES), - create_gauge_panel("jemalloc_retained_bytes", "Retained Bytes", UNITS.BYTES), - create_gauge_panel("jemalloc_dirty_bytes", "Dirty Bytes", UNITS.BYTES), - create_gauge_panel( - "jemalloc_fragmentation_bytes", "Fragmentation Bytes", UNITS.BYTES + create_gauge_panel( + "jemalloc_allocated_bytes", "Allocated Bytes", UNITS.BYTES_IEC + ), + create_gauge_panel("jemalloc_active_bytes", "Active Bytes", UNITS.BYTES_IEC), + create_gauge_panel( + "jemalloc_metadata_bytes", "Metadata Bytes", UNITS.BYTES_IEC + ), + create_gauge_panel( + "jemalloc_resident_bytes", "Resident Bytes", UNITS.BYTES_IEC + ), + create_gauge_panel("jemalloc_mapped_bytes", "Mapped Bytes", UNITS.BYTES_IEC), + create_gauge_panel( + "jemalloc_retained_bytes", "Retained Bytes", UNITS.BYTES_IEC + ), + create_gauge_panel("jemalloc_dirty_bytes", "Dirty Bytes", UNITS.BYTES_IEC), + create_gauge_panel( + "jemalloc_fragmentation_bytes", "Fragmentation Bytes", UNITS.BYTES_IEC ), ] return create_row("Allocator Stats", metrics) diff --git a/storage/src/store/shard_state/cell_storage.rs b/storage/src/store/shard_state/cell_storage.rs index 2e104e8b2..b85f86356 100644 --- a/storage/src/store/shard_state/cell_storage.rs +++ b/storage/src/store/shard_state/cell_storage.rs @@ -3,14 +3,14 @@ use std::collections::hash_map; use std::mem::{ManuallyDrop, MaybeUninit}; use std::sync::atomic::{AtomicI64, AtomicU8, Ordering}; use std::sync::{Arc, Weak}; -use std::time::Instant; +use std::time::{Duration, Instant}; use anyhow::{Context, Result}; use bumpalo::Bump; use everscale_types::cell::*; use quick_cache::sync::{Cache, DefaultLifecycle}; use triomphe::ThinArc; -use tycho_util::metrics::HistogramGuard; +use tycho_util::metrics::{spawn_metrics_loop, HistogramGuard}; use tycho_util::{FastDashMap, FastHashMap, FastHasherState}; use weedb::rocksdb::WriteBatch; use weedb::{rocksdb, BoundedCfHandle}; @@ -30,6 +30,12 @@ impl CellStorage { let cells_cache = Default::default(); let raw_cells_cache = Arc::new(RawCellsCache::new(cache_size_bytes)); + spawn_metrics_loop( + &raw_cells_cache.clone(), + Duration::from_secs(5), + |c| async move { c.refresh_metrics() }, + ); + Arc::new(Self { db, cells_cache, @@ -1083,4 +1089,8 @@ impl RawCellsCache { v.header.header.store(rc, Ordering::Release); } + + fn refresh_metrics(&self) { + metrics::gauge!("tycho_storage_raw_cells_cache_size").set(self.inner.weight() as f64); + } }