Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: multi-threaded graph traversal during cell store #495

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 14 additions & 15 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 7 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ bytesize = { version = "1.3.0", features = ["serde"] }
castaway = "0.2"
clap = { version = "4.5.3", features = ["derive"] }
crc32c = "0.6"
crossbeam-deque = "0.8.5"
crossbeam-utils = "0.8.20"
dashmap = "5.5.3"
dirs = "5.0.1"
ed25519 = "2.0"
Expand Down Expand Up @@ -97,11 +99,11 @@ tarpc = { version = "0.34", features = [
] }
tempfile = "3.10"
thiserror = "1.0"
tikv-jemallocator = { version = "0.5", features = [
tikv-jemallocator = { version = "0.6.0", features = [
"unprefixed_malloc_on_supported_platforms",
"background_threads",
] }
tikv-jemalloc-ctl = { version = "0.5" }
tikv-jemalloc-ctl = { version = "0.6.0", features = ["stats"] }
tl-proto = "0.4"
tokio = { version = "1", default-features = false }
tokio-stream = "0.1.15"
Expand Down Expand Up @@ -134,6 +136,9 @@ tycho-rpc = { path = "./rpc", version = "0.1.4" }
tycho-storage = { path = "./storage", version = "0.1.4" }
tycho-util = { path = "./util", version = "0.1.4" }

[patch.crates-io]
weedb = { version = "0.3.8", git = "https://github.com/broxus/weedb.git", branch = "next-rocksdb" }

[workspace.lints.rust]
future_incompatible = "warn"
nonstandard_style = "warn"
Expand Down
2 changes: 1 addition & 1 deletion cli/src/cmd/debug/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ impl Mempool {

self.storage
.shard_state_storage()
.store_state(&mc_zerostate_handle, &mc_zerostate)
.store_state(&mc_zerostate_handle, &mc_zerostate, Default::default())
.await?;

Ok(mc_zerostate)
Expand Down
7 changes: 5 additions & 2 deletions collator/src/collator/do_collate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use phase::{ActualState, Phase};
use prepare::PrepareState;
use sha2::Digest;
use tycho_block_util::state::MinRefMcStateTracker;
use tycho_storage::NewBlockMeta;
use tycho_storage::{NewBlockMeta, StoreStateHint};
use tycho_util::futures::JoinTask;
use tycho_util::metrics::HistogramGuard;
use tycho_util::time::now_millis;
Expand Down Expand Up @@ -1052,13 +1052,16 @@ impl CollatorStdImpl {
let adapter = self.state_node_adapter.clone();
let labels = labels.clone();
let new_state_root = finalized.new_state_root.clone();
let hint = StoreStateHint {
block_data_size: Some(finalized.block_candidate.block.data_size()),
};
async move {
let _histogram = HistogramGuard::begin_with_labels(
"tycho_collator_build_new_state_time",
&labels,
);
adapter
.store_state_root(&block_id, meta, new_state_root)
.store_state_root(&block_id, meta, new_state_root, hint)
.await
}
});
Expand Down
6 changes: 4 additions & 2 deletions collator/src/state_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use tycho_block_util::block::{BlockProofStuff, BlockStuff, BlockStuffAug};
use tycho_block_util::queue::QueueDiffStuff;
use tycho_block_util::state::ShardStateStuff;
use tycho_network::PeerId;
use tycho_storage::{BlockHandle, MaybeExistingHandle, NewBlockMeta, Storage};
use tycho_storage::{BlockHandle, MaybeExistingHandle, NewBlockMeta, Storage, StoreStateHint};
use tycho_util::metrics::HistogramGuard;
use tycho_util::{FastDashMap, FastHashMap};

Expand Down Expand Up @@ -61,6 +61,7 @@ pub trait StateNodeAdapter: Send + Sync + 'static {
block_id: &BlockId,
meta: NewBlockMeta,
state_root: Cell,
hint: StoreStateHint,
) -> Result<bool>;
/// Return block by its id from node local state
async fn load_block(&self, block_id: &BlockId) -> Result<Option<BlockStuff>>;
Expand Down Expand Up @@ -191,6 +192,7 @@ impl StateNodeAdapter for StateNodeAdapterStdImpl {
block_id: &BlockId,
meta: NewBlockMeta,
state_root: Cell,
hint: StoreStateHint,
) -> Result<bool> {
let _histogram = HistogramGuard::begin("tycho_collator_state_store_state_root_time");

Expand All @@ -204,7 +206,7 @@ impl StateNodeAdapter for StateNodeAdapterStdImpl {
let updated = self
.storage
.shard_state_storage()
.store_state_root(&handle, state_root)
.store_state_root(&handle, state_root, hint)
.await?;

Ok(updated)
Expand Down
4 changes: 2 additions & 2 deletions collator/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ pub async fn prepare_test_storage() -> anyhow::Result<(Storage, tempfile::TempDi

storage
.shard_state_storage()
.store_state(&handle, &master_state_stuff)
.store_state(&handle, &master_state_stuff, Default::default())
.await?;

// first master block
Expand Down Expand Up @@ -142,7 +142,7 @@ pub async fn prepare_test_storage() -> anyhow::Result<(Storage, tempfile::TempDi

storage
.shard_state_storage()
.store_state(&handle, &shard_state_stuff)
.store_state(&handle, &shard_state_stuff, Default::default())
.await?;
}

Expand Down
2 changes: 1 addition & 1 deletion core/src/block_strider/starter/cold_boot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ impl StarterInner {
});

let stored = state_storage
.store_state(&handle, &state)
.store_state(&handle, &state, Default::default())
.await
.with_context(|| {
format!("failed to import zerostate for {}", state.block_id().shard)
Expand Down
6 changes: 4 additions & 2 deletions core/src/block_strider/state_applier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use futures_util::future::BoxFuture;
use tycho_block_util::archive::ArchiveData;
use tycho_block_util::block::BlockStuff;
use tycho_block_util::state::{MinRefMcStateTracker, RefMcStateHandle, ShardStateStuff};
use tycho_storage::{BlockConnection, BlockHandle, NewBlockMeta, Storage};
use tycho_storage::{BlockConnection, BlockHandle, NewBlockMeta, Storage, StoreStateHint};
use tycho_util::metrics::HistogramGuard;
use tycho_util::sync::rayon_run;

Expand Down Expand Up @@ -254,7 +254,9 @@ where

let state_storage = self.inner.storage.shard_state_storage();
state_storage
.store_state(handle, &new_state)
.store_state(handle, &new_state, StoreStateHint {
block_data_size: Some(block.data_size()),
})
.await
.context("Failed to store new state")?;

Expand Down
4 changes: 2 additions & 2 deletions core/tests/archives.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ async fn prepare_storage(config: StorageConfig, zerostate: ShardStateStuff) -> R

storage
.shard_state_storage()
.store_state(&handle, &zerostate)
.store_state(&handle, &zerostate, Default::default())
.await?;

let tracker = MinRefMcStateTracker::default();
Expand Down Expand Up @@ -144,7 +144,7 @@ async fn prepare_storage(config: StorageConfig, zerostate: ShardStateStuff) -> R

storage
.shard_state_storage()
.store_state(&handle, &state)
.store_state(&handle, &state, Default::default())
.await?;
}

Expand Down
2 changes: 1 addition & 1 deletion core/tests/overlay_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ async fn overlay_server_persistent_state() -> Result<()> {
shard_states.min_ref_mc_state(),
)?;
shard_states
.store_state(&zerostate_handle, &zerostate)
.store_state(&zerostate_handle, &zerostate, Default::default())
.await?;

{
Expand Down
2 changes: 1 addition & 1 deletion core/tests/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub(crate) async fn init_storage() -> Result<(Storage, TempDir)> {

storage
.shard_state_storage()
.store_state(&handle, &zerostate)
.store_state(&handle, &zerostate, Default::default())
.await?;

// Init blocks
Expand Down
31 changes: 30 additions & 1 deletion scripts/gen-dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -776,6 +776,34 @@ def storage() -> RowPanel:
create_heatmap_panel(
"tycho_storage_state_update_time", "Time to write state update to rocksdb"
),
create_heatmap_panel(
"tycho_storage_state_store_time",
"Time to store single root with rocksdb write etc",
),
create_heatmap_panel(
"tycho_storage_cell_in_mem_store_time", "Time to store cell without write"
),
create_heatmap_panel(
"tycho_storage_cell_in_mem_store_time", "Time to store cell without write"
),
create_heatmap_panel(
"tycho_storage_batch_write_time", "Time to write merge in write batch"
),
create_heatmap_quantile_panel(
"tycho_storage_state_update_size_bytes",
"State update size",
UNITS.BYTES,
"0.999",
),
create_heatmap_quantile_panel(
"tycho_storage_state_update_size_predicted_bytes",
"Predicted state update size",
UNITS.BYTES,
"0.999",
),
create_heatmap_panel(
"tycho_storage_batch_write_time", "Time to write merge in write batch"
),
create_heatmap_panel(
"tycho_storage_state_store_time", "Time to store state with cell traversal"
),
Expand Down Expand Up @@ -1682,7 +1710,8 @@ def mempool_engine() -> RowPanel:
metrics = [
create_counter_panel(
expr_sum_increase(
"tycho_mempool_rounds_dag_behind_consensus", range_selector="$__interval"
"tycho_mempool_rounds_dag_behind_consensus",
range_selector="$__interval",
),
"Dag: rounds behind consensus",
),
Expand Down
2 changes: 2 additions & 0 deletions storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ bumpalo = { workspace = true }
bytes = { workspace = true }
bytesize = { workspace = true }
crc32c = { workspace = true }
crossbeam-deque = { workspace = true }
crossbeam-utils = { workspace = true }
dashmap = { workspace = true }
everscale-types = { workspace = true, features = ["tycho", "stats"] }
fdlimit = { workspace = true }
Expand Down
9 changes: 0 additions & 9 deletions storage/src/db/kv_db/config.rs

This file was deleted.

26 changes: 24 additions & 2 deletions storage/src/db/kv_db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,15 +103,18 @@ impl BaseDbExt for BaseDb {

impl WithMigrations for BaseDb {
const NAME: &'static str = "base";
const VERSION: Semver = [0, 0, 2];
const VERSION: Semver = [0, 0, 3];

fn register_migrations(
migrations: &mut Migrations<Self>,
cancelled: CancellationFlag,
) -> Result<(), MigrationError> {
migrations.register([0, 0, 1], [0, 0, 2], move |db| {
base_migrations::v0_0_1_to_0_0_2(db, cancelled.clone())
})
})?;
migrations.register([0, 0, 2], [0, 0, 3], base_migrations::v_0_0_2_to_v_0_0_3)?;

Ok(())
}
}

Expand Down Expand Up @@ -139,6 +142,7 @@ mod base_migrations {

use everscale_types::boc::Boc;
use tycho_block_util::archive::ArchiveEntryType;
use weedb::rocksdb::CompactOptions;

use super::*;
use crate::util::StoredValue;
Expand Down Expand Up @@ -192,6 +196,24 @@ mod base_migrations {
);
Ok(())
}

pub fn v_0_0_2_to_v_0_0_3(db: &BaseDb) -> Result<(), MigrationError> {
let mut opts = CompactOptions::default();
opts.set_exclusive_manual_compaction(true);
let null = Option::<&[u8]>::None;

let started_at = Instant::now();
tracing::info!("started cells compaction");
db.cells
.db()
.compact_range_cf_opt(&db.cells.cf(), null, null, &opts);
tracing::info!(
elapsed = %humantime::format_duration(started_at.elapsed()),
"finished cells compaction"
);

Ok(())
}
}

// === RPC DB ===
Expand Down
Loading
Loading