Skip to content

Commit

Permalink
feat(storage): use Once for storage cells
Browse files Browse the repository at this point in the history
  • Loading branch information
Rexagon committed Dec 6, 2024
1 parent e85a8ad commit bd4bc78
Showing 1 changed file with 26 additions and 144 deletions.
170 changes: 26 additions & 144 deletions storage/src/store/shard_state/cell_storage.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::cell::UnsafeCell;
use std::collections::{hash_map, VecDeque};
use std::mem::{ManuallyDrop, MaybeUninit};
use std::sync::atomic::{AtomicI64, AtomicU8, Ordering};
use std::sync::{Arc, Weak};
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::{Arc, Once, Weak};

use anyhow::{Context, Result};
use bumpalo::Bump;
Expand Down Expand Up @@ -632,19 +632,14 @@ pub struct StorageCell {
cell_storage: Arc<CellStorage>,
descriptor: CellDescriptor,
bit_len: u16,
data: Vec<u8>,
hashes: Vec<(HashBytes, u16)>,
data: Box<[u8]>,
hashes: Box<[(HashBytes, u16)]>,

reference_states: [AtomicU8; 4],
reference_states: [Once; 4],
reference_data: [UnsafeCell<StorageCellReferenceData>; 4],
}

impl StorageCell {
const REF_EMPTY: u8 = 0x0;
const REF_RUNNING: u8 = 0x1;
const REF_STORAGE: u8 = 0x2;
const REF_REPLACED: u8 = 0x3;

pub fn deserialize(cell_storage: Arc<CellStorage>, buffer: &[u8]) -> Option<Self> {
if buffer.len() < 4 {
return None;
Expand All @@ -661,9 +656,11 @@ impl StorageCell {
return None;
}

let data = buffer[4..4 + byte_len].to_vec();
let data = Box::from(&buffer[4..4 + byte_len]);

let mut hashes = Vec::new();
hashes.reserve_exact(hash_count);

let mut hashes = Vec::with_capacity(hash_count);
let mut offset = 4 + byte_len;
for _ in 0..hash_count {
hashes.push((
Expand All @@ -673,7 +670,7 @@ impl StorageCell {
offset += 32 + 2;
}

let reference_states = Default::default();
let reference_states = [(); 4].map(|_| Once::new());
let reference_data = unsafe {
MaybeUninit::<[UnsafeCell<StorageCellReferenceData>; 4]>::uninit().assume_init()
};
Expand All @@ -689,7 +686,7 @@ impl StorageCell {
bit_len,
descriptor,
data,
hashes,
hashes: hashes.into_boxed_slice(),
reference_states,
reference_data,
})
Expand Down Expand Up @@ -767,88 +764,22 @@ impl StorageCell {
return None;
}

let state = &self.reference_states[index as usize];
let slot = self.reference_data[index as usize].get();
self.reference_states[index as usize].call_once(|| {
let cell = self
.cell_storage
.load_cell(unsafe { (*slot).hash })
.unwrap();

let current_state = state.load(Ordering::Acquire);
if current_state == Self::REF_STORAGE {
return Some(unsafe { &(*slot).storage_cell });
}

let mut res = Ok(());
Self::initialize_inner(state, &mut || match self
.cell_storage
.load_cell(unsafe { (*slot).hash })
{
Ok(cell) => unsafe {
unsafe {
*slot = StorageCellReferenceData {
storage_cell: ManuallyDrop::new(cell),
};
true
},
Err(err) => {
res = Err(err);
false
}
}
};
});

// TODO: just return none?
res.unwrap();

Some(unsafe { &(*slot).storage_cell })
}

fn initialize_inner(state: &AtomicU8, init: &mut impl FnMut() -> bool) {
struct Guard<'a> {
state: &'a AtomicU8,
new_state: u8,
}

impl Drop for Guard<'_> {
fn drop(&mut self) {
self.state.store(self.new_state, Ordering::Release);
unsafe {
let key = self.state as *const AtomicU8 as usize;
parking_lot_core::unpark_all(key, parking_lot_core::DEFAULT_UNPARK_TOKEN);
}
}
}

loop {
let exchange = state.compare_exchange_weak(
Self::REF_EMPTY,
Self::REF_RUNNING,
Ordering::Acquire,
Ordering::Acquire,
);
match exchange {
Ok(_) => {
let mut guard = Guard {
state,
new_state: Self::REF_EMPTY,
};
if init() {
guard.new_state = Self::REF_STORAGE;
}
return;
}
Err(Self::REF_STORAGE) => return,
Err(Self::REF_RUNNING) => unsafe {
let key = state as *const AtomicU8 as usize;
parking_lot_core::park(
key,
|| state.load(Ordering::Relaxed) == Self::REF_RUNNING,
|| (),
|_, _| (),
parking_lot_core::DEFAULT_PARK_TOKEN,
None,
);
},
Err(Self::REF_EMPTY) => (),
Err(_) => debug_assert!(false),
}
}
}
}

impl CellImpl for StorageCell {
Expand Down Expand Up @@ -892,48 +823,14 @@ impl CellImpl for StorageCell {
}

fn take_first_child(&mut self) -> Option<Cell> {
let state = self.reference_states[0].swap(Self::REF_EMPTY, Ordering::AcqRel);
let data = self.reference_data[0].get_mut();
match state {
Self::REF_STORAGE => Some(unsafe { data.take_storage_cell() }),
Self::REF_REPLACED => Some(unsafe { data.take_replaced_cell() }),
_ => None,
}
None
}

fn replace_first_child(&mut self, parent: Cell) -> std::result::Result<Cell, Cell> {
let state = self.reference_states[0].load(Ordering::Acquire);
if state < Self::REF_STORAGE {
return Err(parent);
}

self.reference_states[0].store(Self::REF_REPLACED, Ordering::Release);
let data = self.reference_data[0].get_mut();

let cell = match state {
Self::REF_STORAGE => unsafe { data.take_storage_cell() },
Self::REF_REPLACED => unsafe { data.take_replaced_cell() },
_ => return Err(parent),
};
data.replaced_cell = ManuallyDrop::new(parent);
Ok(cell)
Err(parent)
}

fn take_next_child(&mut self) -> Option<Cell> {
while self.descriptor.reference_count() > 1 {
self.descriptor.d1 -= 1;
let idx = (self.descriptor.d1 & CellDescriptor::REF_COUNT_MASK) as usize;

let state = self.reference_states[idx].swap(Self::REF_EMPTY, Ordering::AcqRel);
let data = self.reference_data[idx].get_mut();

return Some(match state {
Self::REF_STORAGE => unsafe { data.take_storage_cell() },
Self::REF_REPLACED => unsafe { data.take_replaced_cell() },
_ => continue,
});
}

None
}

Expand All @@ -948,16 +845,13 @@ impl CellImpl for StorageCell {
impl Drop for StorageCell {
fn drop(&mut self) {
self.cell_storage.drop_cell(DynCell::repr_hash(self));

for i in 0..4 {
let state = self.reference_states[i].load(Ordering::Acquire);
let initialized = self.reference_states[i].is_completed();
let data = self.reference_data[i].get_mut();

unsafe {
match state {
Self::REF_STORAGE => ManuallyDrop::drop(&mut data.storage_cell),
Self::REF_REPLACED => ManuallyDrop::drop(&mut data.replaced_cell),
_ => {}
}
if initialized {
unsafe { ManuallyDrop::drop(&mut data.storage_cell) }
}
}
}
Expand All @@ -967,22 +861,10 @@ unsafe impl Send for StorageCell {}
unsafe impl Sync for StorageCell {}

pub union StorageCellReferenceData {
/// Incplmete state.
/// Incomplete state.
hash: HashBytes,
/// Complete state.
storage_cell: ManuallyDrop<Arc<StorageCell>>,
/// Replaced state.
replaced_cell: ManuallyDrop<Cell>,
}

impl StorageCellReferenceData {
unsafe fn take_storage_cell(&mut self) -> Cell {
Cell::from(ManuallyDrop::take(&mut self.storage_cell) as Arc<_>)
}

unsafe fn take_replaced_cell(&mut self) -> Cell {
ManuallyDrop::take(&mut self.replaced_cell)
}
}

struct RawCellsCache(Cache<HashBytes, RawCellsCacheItem, CellSizeEstimator, FastHasherState>);
Expand Down

0 comments on commit bd4bc78

Please sign in to comment.