Skip to content

Commit

Permalink
perf: add allocation size hint for WriteBatch
Browse files Browse the repository at this point in the history
  • Loading branch information
0xdeafbeef authored and Rexagon committed Dec 6, 2024
1 parent 64b497f commit 91b3996
Show file tree
Hide file tree
Showing 14 changed files with 107 additions and 33 deletions.
2 changes: 1 addition & 1 deletion cli/src/cmd/debug/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ impl Mempool {

self.storage
.shard_state_storage()
.store_state(&mc_zerostate_handle, &mc_zerostate)
.store_state(&mc_zerostate_handle, &mc_zerostate, Default::default())
.await?;

Ok(mc_zerostate)
Expand Down
7 changes: 5 additions & 2 deletions collator/src/collator/do_collate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use phase::{ActualState, Phase};
use prepare::PrepareState;
use sha2::Digest;
use tycho_block_util::state::MinRefMcStateTracker;
use tycho_storage::NewBlockMeta;
use tycho_storage::{NewBlockMeta, StoreStateHint};
use tycho_util::futures::JoinTask;
use tycho_util::metrics::HistogramGuard;
use tycho_util::time::now_millis;
Expand Down Expand Up @@ -1052,13 +1052,16 @@ impl CollatorStdImpl {
let adapter = self.state_node_adapter.clone();
let labels = labels.clone();
let new_state_root = finalized.new_state_root.clone();
let hint = StoreStateHint {
block_data_size: Some(finalized.block_candidate.block.data_size()),
};
async move {
let _histogram = HistogramGuard::begin_with_labels(
"tycho_collator_build_new_state_time",
&labels,
);
adapter
.store_state_root(&block_id, meta, new_state_root)
.store_state_root(&block_id, meta, new_state_root, hint)
.await
}
});
Expand Down
6 changes: 4 additions & 2 deletions collator/src/state_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use tycho_block_util::block::{BlockProofStuff, BlockStuff, BlockStuffAug};
use tycho_block_util::queue::QueueDiffStuff;
use tycho_block_util::state::ShardStateStuff;
use tycho_network::PeerId;
use tycho_storage::{BlockHandle, MaybeExistingHandle, NewBlockMeta, Storage};
use tycho_storage::{BlockHandle, MaybeExistingHandle, NewBlockMeta, Storage, StoreStateHint};
use tycho_util::metrics::HistogramGuard;
use tycho_util::{FastDashMap, FastHashMap};

Expand Down Expand Up @@ -61,6 +61,7 @@ pub trait StateNodeAdapter: Send + Sync + 'static {
block_id: &BlockId,
meta: NewBlockMeta,
state_root: Cell,
hint: StoreStateHint,
) -> Result<bool>;
/// Return block by its id from node local state
async fn load_block(&self, block_id: &BlockId) -> Result<Option<BlockStuff>>;
Expand Down Expand Up @@ -191,6 +192,7 @@ impl StateNodeAdapter for StateNodeAdapterStdImpl {
block_id: &BlockId,
meta: NewBlockMeta,
state_root: Cell,
hint: StoreStateHint,
) -> Result<bool> {
let _histogram = HistogramGuard::begin("tycho_collator_state_store_state_root_time");

Expand All @@ -204,7 +206,7 @@ impl StateNodeAdapter for StateNodeAdapterStdImpl {
let updated = self
.storage
.shard_state_storage()
.store_state_root(&handle, state_root)
.store_state_root(&handle, state_root, hint)
.await?;

Ok(updated)
Expand Down
4 changes: 2 additions & 2 deletions collator/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ pub async fn prepare_test_storage() -> anyhow::Result<(Storage, tempfile::TempDi

storage
.shard_state_storage()
.store_state(&handle, &master_state_stuff)
.store_state(&handle, &master_state_stuff, Default::default())
.await?;

// first master block
Expand Down Expand Up @@ -142,7 +142,7 @@ pub async fn prepare_test_storage() -> anyhow::Result<(Storage, tempfile::TempDi

storage
.shard_state_storage()
.store_state(&handle, &shard_state_stuff)
.store_state(&handle, &shard_state_stuff, Default::default())
.await?;
}

Expand Down
2 changes: 1 addition & 1 deletion core/src/block_strider/starter/cold_boot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ impl StarterInner {
});

let stored = state_storage
.store_state(&handle, &state)
.store_state(&handle, &state, Default::default())
.await
.with_context(|| {
format!("failed to import zerostate for {}", state.block_id().shard)
Expand Down
6 changes: 4 additions & 2 deletions core/src/block_strider/state_applier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use futures_util::future::BoxFuture;
use tycho_block_util::archive::ArchiveData;
use tycho_block_util::block::BlockStuff;
use tycho_block_util::state::{MinRefMcStateTracker, RefMcStateHandle, ShardStateStuff};
use tycho_storage::{BlockConnection, BlockHandle, NewBlockMeta, Storage};
use tycho_storage::{BlockConnection, BlockHandle, NewBlockMeta, Storage, StoreStateHint};
use tycho_util::metrics::HistogramGuard;
use tycho_util::sync::rayon_run;

Expand Down Expand Up @@ -254,7 +254,9 @@ where

let state_storage = self.inner.storage.shard_state_storage();
state_storage
.store_state(handle, &new_state)
.store_state(handle, &new_state, StoreStateHint {
block_data_size: Some(block.data_size()),
})
.await
.context("Failed to store new state")?;

Expand Down
4 changes: 2 additions & 2 deletions core/tests/archives.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ async fn prepare_storage(config: StorageConfig, zerostate: ShardStateStuff) -> R

storage
.shard_state_storage()
.store_state(&handle, &zerostate)
.store_state(&handle, &zerostate, Default::default())
.await?;

let tracker = MinRefMcStateTracker::default();
Expand Down Expand Up @@ -144,7 +144,7 @@ async fn prepare_storage(config: StorageConfig, zerostate: ShardStateStuff) -> R

storage
.shard_state_storage()
.store_state(&handle, &state)
.store_state(&handle, &state, Default::default())
.await?;
}

Expand Down
2 changes: 1 addition & 1 deletion core/tests/overlay_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ async fn overlay_server_persistent_state() -> Result<()> {
shard_states.min_ref_mc_state(),
)?;
shard_states
.store_state(&zerostate_handle, &zerostate)
.store_state(&zerostate_handle, &zerostate, Default::default())
.await?;

{
Expand Down
2 changes: 1 addition & 1 deletion core/tests/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub(crate) async fn init_storage() -> Result<(Storage, TempDir)> {

storage
.shard_state_storage()
.store_state(&handle, &zerostate)
.store_state(&handle, &zerostate, Default::default())
.await?;

// Init blocks
Expand Down
25 changes: 24 additions & 1 deletion scripts/gen-dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -776,12 +776,34 @@ def storage() -> RowPanel:
create_heatmap_panel(
"tycho_storage_state_update_time", "Time to write state update to rocksdb"
),
create_heatmap_panel(
"tycho_storage_state_store_time",
"Time to store single root with rocksdb write etc",
),
create_heatmap_panel(
"tycho_storage_cell_in_mem_store_time", "Time to store cell without write"
),
create_heatmap_panel(
"tycho_storage_cell_in_mem_store_time", "Time to store cell without write"
),
create_heatmap_panel(
"tycho_storage_batch_write_time", "Time to write merge in write batch"
),
create_heatmap_quantile_panel(
"tycho_storage_state_update_size_bytes",
"State update size",
UNITS.BYTES,
"0.999",
),
create_heatmap_quantile_panel(
"tycho_storage_state_update_size_predicted_bytes",
"Predicted state update size",
UNITS.BYTES,
"0.999",
),
create_heatmap_panel(
"tycho_storage_batch_write_time", "Time to write merge in write batch"
),
create_heatmap_panel(
"tycho_storage_state_store_time", "Time to store state with cell traversal"
),
Expand Down Expand Up @@ -1688,7 +1710,8 @@ def mempool_engine() -> RowPanel:
metrics = [
create_counter_panel(
expr_sum_increase(
"tycho_mempool_rounds_dag_behind_consensus", range_selector="$__interval"
"tycho_mempool_rounds_dag_behind_consensus",
range_selector="$__interval",
),
"Dag: rounds behind consensus",
),
Expand Down
4 changes: 3 additions & 1 deletion storage/src/store/persistent_state/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ async fn persistent_shard_state() -> Result<()> {
NewBlockMeta::zero_state(zerostate.as_ref().gen_utime, true),
);

shard_states.store_state(&handle, &zerostate).await?;
shard_states
.store_state(&handle, &zerostate, Default::default())
.await?;

// Check seqno
let min_ref_mc_state = shard_states.min_ref_mc_state();
Expand Down
19 changes: 14 additions & 5 deletions storage/src/store/shard_state/cell_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use quick_cache::sync::{Cache, DefaultLifecycle};
use triomphe::ThinArc;
use tycho_util::metrics::HistogramGuard;
use tycho_util::{FastDashMap, FastHashMap, FastHasherState};
use weedb::rocksdb::WriteBatch;
use weedb::{rocksdb, BoundedCfHandle};

use crate::db::*;
Expand Down Expand Up @@ -234,8 +235,9 @@ impl CellStorage {

pub fn store_cell(
&self,
batch: &mut rocksdb::WriteBatch,
batch: &mut WriteBatch,
root: Cell,
estimated_cell_count: usize,
) -> Result<(PendingOperation<'_>, usize), CellStorageError> {
struct CellWithRefs<'a> {
rc: u32,
Expand Down Expand Up @@ -334,7 +336,10 @@ impl CellStorage {
db: &self.db,
raw_cache: &self.raw_cells_cache,
alloc: &alloc,
transaction: FastHashMap::with_capacity_and_hasher(128, Default::default()),
transaction: FastHashMap::with_capacity_and_hasher(
estimated_cell_count,
Default::default(),
),
buffer: Vec::with_capacity(512),
};

Expand Down Expand Up @@ -417,10 +422,9 @@ impl CellStorage {

pub fn remove_cell(
&self,
batch: &mut rocksdb::WriteBatch,
alloc: &Bump,
hash: &HashBytes,
) -> Result<(PendingOperation<'_>, usize), CellStorageError> {
) -> Result<(PendingOperation<'_>, usize, WriteBatch), CellStorageError> {
#[derive(Clone, Copy)]
struct CellState<'a> {
rc: i64,
Expand Down Expand Up @@ -493,6 +497,11 @@ impl CellStorage {
// Write transaction to the `WriteBatch`
let _hist = HistogramGuard::begin("tycho_storage_batch_write_time");
let total = transaction.len();

// NOTE: For each cell we have 32 bytes for key and 8 bytes for RC,
// and a bit more just in case.
let mut batch = WriteBatch::with_capacity_bytes(total * (32 + 8 + 8));

for (key, CellState { removes, .. }) in transaction {
self.raw_cells_cache.remove_refs(key, removes);
batch.merge_cf(
Expand All @@ -501,7 +510,7 @@ impl CellStorage {
refcount::encode_negative_refcount(removes),
);
}
Ok((pending_op, total))
Ok((pending_op, total, batch))
}

pub fn drop_cell(&self, hash: &HashBytes) {
Expand Down
50 changes: 42 additions & 8 deletions storage/src/store/shard_state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,16 +74,26 @@ impl ShardStateStorage {
&self.min_ref_mc_state
}

pub async fn store_state(&self, handle: &BlockHandle, state: &ShardStateStuff) -> Result<bool> {
pub async fn store_state(
&self,
handle: &BlockHandle,
state: &ShardStateStuff,
hint: StoreStateHint,
) -> Result<bool> {
if handle.id() != state.block_id() {
return Err(ShardStateStorageError::BlockHandleIdMismatch.into());
}

self.store_state_root(handle, state.root_cell().clone())
self.store_state_root(handle, state.root_cell().clone(), hint)
.await
}

pub async fn store_state_root(&self, handle: &BlockHandle, root_cell: Cell) -> Result<bool> {
pub async fn store_state_root(
&self,
handle: &BlockHandle,
root_cell: Cell,
hint: StoreStateHint,
) -> Result<bool> {
if handle.has_state() {
return Ok(false);
}
Expand All @@ -105,17 +115,25 @@ impl ShardStateStorage {
// NOTE: `spawn_blocking` is used here instead of `rayon_run` as it is IO-bound task.
let (new_cell_count, updated) = tokio::task::spawn_blocking(move || {
let root_hash = *root_cell.repr_hash();
let estimated_merkle_update_size = hint.estimate_cell_count();

let mut batch = rocksdb::WriteBatch::default();
let estimated_update_size_bytes = estimated_merkle_update_size * 192; // p50 cell size in bytes
let mut batch = rocksdb::WriteBatch::with_capacity_bytes(estimated_update_size_bytes);

let in_mem_store = HistogramGuard::begin("tycho_storage_cell_in_mem_store_time");
let (pending_op, new_cell_count) = cell_storage.store_cell(&mut batch, root_cell)?;
let (pending_op, new_cell_count) =
cell_storage.store_cell(&mut batch, root_cell, estimated_merkle_update_size)?;
in_mem_store.finish();
metrics::histogram!("tycho_storage_cell_count").record(new_cell_count as f64);

batch.put_cf(&cf.bound(), block_id.to_vec(), root_hash.as_slice());

let hist = HistogramGuard::begin("tycho_storage_state_update_time");
metrics::histogram!("tycho_storage_state_update_size_bytes")
.record(batch.size_in_bytes() as f64);
metrics::histogram!("tycho_storage_state_update_size_predicted_bytes")
.record(estimated_update_size_bytes as f64);

raw_db.write(batch)?;

// Ensure that pending operation guard is dropped after the batch is written
Expand Down Expand Up @@ -222,11 +240,10 @@ impl ShardStateStorage {
let db = self.db.clone();
let cell_storage = self.cell_storage.clone();
let key = key.to_vec();
let mut batch = rocksdb::WriteBatch::default();

let (total, inner_alloc) = tokio::task::spawn_blocking(move || {
let (pending_op, stats) =
cell_storage.remove_cell(&mut batch, &alloc, &root_hash)?;
let (pending_op, stats, mut batch) =
cell_storage.remove_cell(&alloc, &root_hash)?;

batch.delete_cf(&db.shard_states.get_unbounded_cf().bound(), key);
db.raw()
Expand Down Expand Up @@ -367,6 +384,23 @@ impl ShardStateStorage {
}
}

/// Optional sizing information supplied by the caller when storing a state.
///
/// The hint is only used to derive an up-front cell-count estimate; leaving
/// every field unset (`Default::default()`) is always valid.
#[derive(Default, Debug, Clone, Copy)]
pub struct StoreStateHint {
    /// Size of the corresponding block data in bytes, if the caller knows it.
    pub block_data_size: Option<usize>,
}

impl StoreStateHint {
    /// Predicts how many cells the state update will contain.
    ///
    /// When no block size was provided, a 4 KB block is assumed. The raw
    /// prediction is truncated to an integer and then rounded up to the
    /// next power of two.
    fn estimate_cell_count(&self) -> usize {
        // Block size assumed when the caller gave no hint.
        const MIN_BLOCK_SIZE: usize = 4 << 10; // 4 KB

        // Fitted model: y = 3889.9821 + 14.7480 × √x
        // R-squared: 0.7035
        const INTERCEPT: f64 = 3889.9821;
        const SLOPE: f64 = 14.7480;

        let size_bytes = match self.block_data_size {
            Some(size) => size,
            None => MIN_BLOCK_SIZE,
        };

        let predicted = INTERCEPT + SLOPE * (size_bytes as f64).sqrt();

        // Float-to-int `as` truncates toward zero (and saturates), so the
        // rounding to a power of two happens on the integer part only.
        (predicted as usize).next_power_of_two()
    }
}

#[derive(Debug, Copy, Clone)]
pub struct ShardStateStorageMetrics {
pub max_new_mc_cell_count: usize,
Expand Down
7 changes: 3 additions & 4 deletions storage/src/store/shard_state/store_state_raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,7 @@ mod test {
use bytesize::ByteSize;
use everscale_types::models::ShardIdent;
use tycho_util::project_root;
use weedb::rocksdb::{IteratorMode, WriteBatch};
use weedb::rocksdb::IteratorMode;

use super::*;
use crate::{Storage, StorageConfig};
Expand Down Expand Up @@ -626,9 +626,8 @@ mod test {
// check that state actually exists
let cell = cell_storage.load_cell(HashBytes::from_slice(value.as_ref()))?;

let mut batch = WriteBatch::default();
let (pending_op, _) =
cell_storage.remove_cell(&mut batch, &bump, cell.hash(LevelMask::MAX_LEVEL))?;
let (pending_op, _, batch) =
cell_storage.remove_cell(&bump, cell.hash(LevelMask::MAX_LEVEL))?;

// execute batch
db.rocksdb().write_opt(batch, db.cells.write_config())?;
Expand Down

0 comments on commit 91b3996

Please sign in to comment.