perf(core): mt cell store
0xdeafbeef authored and Rexagon committed Dec 6, 2024
1 parent 8a872b1 commit e85a8ad
Showing 4 changed files with 152 additions and 47 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
@@ -41,6 +41,8 @@ bytesize = { version = "1.3.0", features = ["serde"] }
castaway = "0.2"
clap = { version = "4.5.3", features = ["derive"] }
crc32c = "0.6"
crossbeam-deque = "0.8.5"
crossbeam-utils = "0.8.20"
dashmap = "5.5.3"
dirs = "5.0.1"
ed25519 = "2.0"
2 changes: 2 additions & 0 deletions storage/Cargo.toml
@@ -18,6 +18,8 @@ bumpalo = { workspace = true }
bytes = { workspace = true }
bytesize = { workspace = true }
crc32c = { workspace = true }
crossbeam-deque = { workspace = true }
crossbeam-utils = { workspace = true }
dashmap = { workspace = true }
everscale-types = { workspace = true, features = ["tycho", "stats"] }
fdlimit = { workspace = true }
193 changes: 146 additions & 47 deletions storage/src/store/shard_state/cell_storage.rs
@@ -1,12 +1,15 @@
use std::cell::UnsafeCell;
use std::collections::hash_map;
use std::collections::{hash_map, VecDeque};
use std::mem::{ManuallyDrop, MaybeUninit};
use std::sync::atomic::{AtomicI64, AtomicU8, Ordering};
use std::sync::{Arc, Weak};

use anyhow::{Context, Result};
use bumpalo::Bump;
use crossbeam_deque::{Steal, Stealer, Worker};
use crossbeam_utils::Backoff;
use dashmap::mapref::entry::Entry;
use dashmap::Map;
use everscale_types::cell::*;
use parking_lot::Mutex;
use quick_cache::sync::{Cache, DefaultLifecycle};
@@ -242,40 +245,59 @@ impl CellStorage {
) -> Result<(PendingOperation<'_>, usize), CellStorageError> {
let pending_op = self.pending.begin();

let walk_hist = HistogramGuard::begin("tycho_storage_walk_tree_time");
let ctx = StoreContext::new(&self.db, &self.raw_cells_cache, estimated_cell_count);

// Check root cell
let mut queue = VecDeque::new();
queue.push_back((root, 0usize));

let key = root.repr_hash();
if !ctx.insert_cell(key, root.as_ref(), 0)? {
return Ok((pending_op, 0));
}

let mut stack = Vec::with_capacity(16);
stack.push(root.references());
while let Some((current_cell, current_depth)) = queue.pop_front() {
if !ctx.insert_cell(
current_cell.repr_hash(),
current_cell.as_ref(),
Some(current_depth),
)? {
continue;
}
for next_cell in current_cell.references().cloned() {
queue.push_back((next_cell, current_depth + 1));
}

// Check other cells
'outer: loop {
let depth = stack.len();
let Some(iter) = stack.last_mut() else {
if current_depth == 6 {
break;
};
}
}

for child in &mut *iter {
let key = child.repr_hash();
let num_cpus = std::thread::available_parallelism()
.expect("We don't use platforms where it's not supported")
.get();
if !queue.is_empty() {
let queues = (0..num_cpus)
.map(|_| Worker::new_lifo())
.collect::<Vec<_>>();

if ctx.insert_cell(key, child, depth)? {
stack.push(child.references());
continue 'outer;
}
for (idx, cell) in queue.into_iter().enumerate() {
queues[idx % num_cpus].push(cell.0);
}

stack.pop();
}
let stealers: Vec<_> = queues.iter().map(|w| w.stealer()).collect();

// Clear big chunks of data before finalization
drop(stack);
std::thread::scope(|s| {
for (index, worker) in queues.into_iter().enumerate() {
let mut stealers = stealers.clone();
stealers.remove(index); // we don't want to steal from ourselves

let ctxt = ctx.clone();
s.spawn(move || {
process_worker_queue(worker, stealers, &ctxt)
.expect("todo somehow propagate error");
});
}
});
}
drop(walk_hist);

let ctx = Arc::into_inner(ctx).unwrap();
// Write transaction to the `WriteBatch`
Ok((pending_op, ctx.finalize(batch)))
}
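
Note on the hunk above: the new `store_cell` walks the top of the cell tree breadth-first on the calling thread, stops once it has expanded a cell at depth 6, deals whatever is left in the queue round-robin into one LIFO deque per available CPU, and drains those deques from scoped worker threads. A minimal, self-contained sketch of that seeding pattern follows; `Node`, `visit`, and `store_tree` are hypothetical stand-ins for `Cell`, `ctx.insert_cell`, and `store_cell`, and the sketch omits deduplication and work stealing.

```rust
use std::collections::VecDeque;
use std::thread;

use crossbeam_deque::Worker;

// Hypothetical tree node standing in for `Cell`; not part of the commit.
struct Node {
    children: Vec<Node>,
}

// Hypothetical per-node work, e.g. writing the node into a store.
fn visit(_node: &Node) {}

fn store_tree(root: &Node) {
    const SEED_DEPTH: usize = 6;

    // 1. Walk the top of the tree breadth-first on the calling thread and
    //    stop once a node at SEED_DEPTH has been expanded.
    let mut queue = VecDeque::new();
    queue.push_back((root, 0usize));
    while let Some((node, depth)) = queue.pop_front() {
        visit(node);
        for child in &node.children {
            queue.push_back((child, depth + 1));
        }
        if depth == SEED_DEPTH {
            break;
        }
    }

    // 2. Deal whatever is left in the queue round-robin into one LIFO deque
    //    per available CPU.
    let num_cpus = thread::available_parallelism().map_or(1, |n| n.get());
    let workers: Vec<Worker<&Node>> = (0..num_cpus).map(|_| Worker::new_lifo()).collect();
    for (idx, (node, _depth)) in queue.into_iter().enumerate() {
        workers[idx % num_cpus].push(node);
    }

    // 3. One scoped thread per deque; all threads are joined when the scope ends.
    thread::scope(|s| {
        for worker in workers {
            s.spawn(move || {
                // Plain local drain; the work-stealing variant lets idle
                // threads steal from their siblings instead of exiting early.
                while let Some(node) = worker.pop() {
                    visit(node);
                    for child in &node.children {
                        worker.push(child);
                    }
                }
            });
        }
    });
}
```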
@@ -419,6 +441,53 @@
}
}

fn process_worker_queue(
worker: Worker<Cell>,
stealers: Vec<Stealer<Cell>>,
ctx: &StoreContext,
) -> Result<(), CellStorageError> {
loop {
let Some(cell) = find_task(&worker, &stealers) else {
break Ok(());
};

let cell_hash = *cell.repr_hash();
if !ctx.insert_cell(&cell_hash, cell.as_ref(), None)? {
continue;
}

for c in cell.references().cloned() {
worker.push(c);
}
}
}

fn find_task<T>(local: &Worker<T>, stealers: &[Stealer<T>]) -> Option<T> {
if let Some(task) = local.pop() {
return Some(task);
};

let backoff = Backoff::new();
for stealer in stealers {
'inner: loop {
match stealer.steal_batch_and_pop(local) {
Steal::Empty => {
// todo : always skip it
break 'inner;
}
Steal::Success(t) => {
return Some(t);
}
Steal::Retry => {
backoff.snooze();
continue 'inner;
}
}
}
}
None
}
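
`find_task` above is the standard crossbeam-deque idiom: drain the local deque first, then try each sibling, snoozing with `Backoff` whenever a steal races with another thief. For readers unfamiliar with the library, the self-contained demo below (hypothetical, not part of the commit) shows what `steal_batch_and_pop` and `Backoff` do in isolation: a successful steal hands back one task and moves a batch of further tasks into the thief's local deque.

```rust
use crossbeam_deque::{Steal, Stealer, Worker};
use crossbeam_utils::Backoff;

// Try to take one task from a sibling ("victim") deque, moving a batch of its
// remaining tasks into `local` as a side effect.
fn steal_one<T>(local: &Worker<T>, victim: &Stealer<T>) -> Option<T> {
    let backoff = Backoff::new();
    loop {
        match victim.steal_batch_and_pop(local) {
            // The victim has nothing left: give up on it.
            Steal::Empty => return None,
            // One task is returned directly; up to about half of the victim's
            // remaining tasks were also moved into `local`.
            Steal::Success(task) => return Some(task),
            // A concurrent steal interfered: back off briefly and retry.
            Steal::Retry => backoff.snooze(),
        }
    }
}

fn main() {
    let victim: Worker<u32> = Worker::new_lifo();
    for i in 0..8 {
        victim.push(i);
    }

    let local: Worker<u32> = Worker::new_lifo();
    let first = steal_one(&local, &victim.stealer());
    assert!(first.is_some());

    // The batch that was moved into `local` can now be drained without
    // touching the victim again.
    let mut drained = 0;
    while local.pop().is_some() {
        drained += 1;
    }
    println!("stole one task directly plus a local batch of {drained}");
}
```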

struct CellWithRefs {
rc: u32,
data: Option<Vec<u8>>,
@@ -431,23 +500,23 @@ struct StoreContext {
}

impl StoreContext {
fn new(db: &BaseDb, raw_cache: &Arc<RawCellsCache>, capacity: usize) -> Self {
Self {
fn new(db: &BaseDb, raw_cache: &Arc<RawCellsCache>, capacity: usize) -> Arc<Self> {
Arc::new(Self {
db: db.clone(),
raw_cache: raw_cache.clone(),
transaction: FastDashMap::with_capacity_and_hasher_and_shard_amount(
capacity,
Default::default(),
512,
),
}
})
}

fn insert_cell(
&self,
key: &HashBytes,
cell: &DynCell,
depth: usize,
depth: Option<usize>,
) -> Result<bool, CellStorageError> {
let mut buffer = [0; 512];
Ok(match self.transaction.entry(*key) {
@@ -464,13 +533,16 @@ impl StoreContext {
const NEW_CELLS_DEPTH_THRESHOLD: usize = 4;

let (old_rc, has_value) = 'value: {
if depth >= NEW_CELLS_DEPTH_THRESHOLD {
// NOTE: `get` here is used to affect a "hotness" of the value, because
// there is a big chance that we will need it soon during state processing
if let Some(entry) = self.raw_cache.0.get(key) {
let rc = entry.header.header.load(Ordering::Acquire);
break 'value (rc, rc > 0);
match depth {
Some(d) if d >= NEW_CELLS_DEPTH_THRESHOLD => {
// NOTE: `get` here is used to affect a "hotness" of the value, because
// there is a big chance that we will need it soon during state processing
if let Some(entry) = self.raw_cache.0.get(key) {
let rc = entry.header.header.load(Ordering::Acquire);
break 'value (rc, rc > 0);
}
}
_ => {}
}

match self.db.cells.get(key).map_err(CellStorageError::Internal)? {
@@ -500,20 +572,47 @@
}

fn finalize(self, batch: &mut WriteBatch) -> usize {
let mut buffer = Vec::with_capacity(512);
let total = self.transaction.len();
let cells_cf = &self.db.cells.cf();
for (key, CellWithRefs { rc, data }) in self.transaction {
buffer.clear();
refcount::add_positive_refount(rc, data.as_deref(), &mut buffer);
if let Some(data) = data {
self.raw_cache.insert(&key, rc, &data);
} else {
self.raw_cache.add_refs(&key, rc);
std::thread::scope(|s| {
let number_shards = self.transaction._shard_count();
// safety: we hold only read locks
let shards = unsafe { (0..number_shards).map(|i| self.transaction._get_read_shard(i)) };
let cache = &self.raw_cache;

// todo: clamp to number of cpus x2
for shard in shards {
// spawned threads will be joined at the end of the scope, so we don't need to store them
s.spawn(move || {
for (key, value) in shard {
let value = value.get();
let rc = value.rc;
if let Some(data) = &value.data {
cache.insert(key, rc, data);
} else {
cache.add_refs(key, rc);
}
}
});
}
batch.merge_cf(cells_cf, key.as_slice(), &buffer);
}
total

let batch_update = s.spawn(|| {
let mut buffer = Vec::with_capacity(512);
let total = self.transaction.len();
let cells_cf = &self.db.cells.cf();
for kv in self.transaction.iter() {
let key = kv.key();
let value = kv.value();
let rc = value.rc;
let data = value.data.as_deref();

buffer.clear();
refcount::add_positive_refount(rc, data, &mut buffer);
batch.merge_cf(cells_cf, key.as_slice(), &buffer);
}
total
});

batch_update.join().expect("thread panicked")
})
}
}
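
The new `finalize` splits its work inside a single `std::thread::scope`: one spawned thread per DashMap read shard warms the in-memory cache, while a separate thread serializes every entry into the RocksDB `WriteBatch` and returns the total, which is joined explicitly so the scope yields it. Below is a minimal sketch of that shape, with plain vectors behind a `Mutex` standing in for the concurrent cache and the write batch; all names here are hypothetical and not part of the commit.

```rust
use std::sync::Mutex;
use std::thread;

// Hypothetical stand-ins: `shards` for the DashMap read shards, `cache` for
// RawCellsCache, `batch` for the RocksDB WriteBatch.
fn finalize(
    shards: Vec<Vec<(u64, u32)>>,
    cache: &Mutex<Vec<u64>>,
    batch: &Mutex<Vec<(u64, u32)>>,
) -> usize {
    thread::scope(|s| {
        // One cache-warming thread per shard; these are joined automatically
        // when the scope ends and their return values are ignored.
        for shard in &shards {
            s.spawn(move || {
                let mut cache = cache.lock().unwrap();
                for &(key, _rc) in shard {
                    cache.push(key);
                }
            });
        }

        // The write batch is built on one dedicated thread; joining its handle
        // explicitly makes its return value the value of the whole scope.
        let writer = s.spawn(|| {
            let mut batch = batch.lock().unwrap();
            let mut total = 0;
            for shard in &shards {
                for &(key, rc) in shard {
                    batch.push((key, rc));
                    total += 1;
                }
            }
            total
        });
        writer.join().expect("writer thread panicked")
    })
}
```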
