diff --git a/Cargo.lock b/Cargo.lock
index 6eb57781a..ddd4c4d24 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -306,7 +306,7 @@ dependencies = [
  "bitflags",
  "cexpr",
  "clang-sys",
- "itertools 0.12.1",
+ "itertools 0.10.5",
  "lazy_static",
  "lazycell",
  "log",
@@ -1507,7 +1507,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "4979f22fdb869068da03c9f7528f8297c6fd2606bc3a4affe42e6a823fdb8da4"
 dependencies = [
  "cfg-if",
- "windows-targets 0.52.6",
+ "windows-targets 0.48.5",
 ]
 
 [[package]]
@@ -1522,9 +1522,8 @@ dependencies = [
 
 [[package]]
 name = "librocksdb-sys"
-version = "0.16.0+8.10.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ce3d60bc059831dc1c83903fb45c103f75db65c5a7bf22272764d9cc683e348c"
+version = "0.17.1+9.7.4"
+source = "git+https://github.com/rust-rocksdb/rust-rocksdb.git?rev=95d01b183cdb45f80a470cadfc030a545ab66156#95d01b183cdb45f80a470cadfc030a545ab66156"
 dependencies = [
  "bindgen",
  "bzip2-sys",
@@ -2491,8 +2490,7 @@ dependencies = [
 
 [[package]]
 name = "rocksdb"
 version = "0.22.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6bd13e55d6d7b8cd0ea569161127567cd587676c99f4472f779a0279aa60a7a7"
+source = "git+https://github.com/rust-rocksdb/rust-rocksdb.git?rev=95d01b183cdb45f80a470cadfc030a545ab66156#95d01b183cdb45f80a470cadfc030a545ab66156"
 dependencies = [
  "libc",
  "librocksdb-sys",
@@ -3002,9 +3000,9 @@ dependencies = [
 
 [[package]]
 name = "tikv-jemalloc-ctl"
-version = "0.5.4"
+version = "0.6.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "619bfed27d807b54f7f776b9430d4f8060e66ee138a28632ca898584d462c31c"
+checksum = "f21f216790c8df74ce3ab25b534e0718da5a1916719771d3fec23315c99e468b"
 dependencies = [
  "libc",
  "paste",
@@ -3013,9 +3011,9 @@ dependencies = [
 
 [[package]]
 name = "tikv-jemalloc-sys"
-version = "0.5.4+5.3.0-patched"
+version = "0.6.0+5.3.0-1-ge13ca993e8ccb9ba9847cc330696e02839f328f7"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9402443cb8fd499b6f327e40565234ff34dbda27460c5b47db0db77443dd85d1"
+checksum = "cd3c60906412afa9c2b5b5a48ca6a5abe5736aec9eb48ad05037a677e52e4e2d"
 dependencies = [
  "cc",
  "libc",
@@ -3023,9 +3021,9 @@ dependencies = [
 
 [[package]]
 name = "tikv-jemallocator"
-version = "0.5.4"
+version = "0.6.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "965fe0c26be5c56c94e38ba547249074803efd52adfb66de62107d95aab3eaca"
+checksum = "4cec5ff18518d81584f477e9bfdf957f5bb0979b0bac3af4ca30b5b3ae2d2865"
 dependencies = [
  "libc",
  "tikv-jemalloc-sys",
@@ -3762,6 +3760,8 @@ dependencies = [
  "bytes",
  "bytesize",
  "crc32c",
+ "crossbeam-deque",
+ "crossbeam-utils",
  "dashmap",
  "everscale-types",
  "fdlimit",
@@ -4028,8 +4028,7 @@ dependencies = [
 
 [[package]]
 name = "weedb"
 version = "0.3.8"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c65a1543619bd05013dfe6092797d2c3b16aa7b41862d588735ff7def453ce04"
+source = "git+https://github.com/broxus/weedb.git?branch=next-rocksdb#be76187ed31348144bdab3e113ad7de114d99ac6"
 dependencies = [
  "librocksdb-sys",
  "metrics",
diff --git a/Cargo.toml b/Cargo.toml
index 398602dac..17c12cc57 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -41,6 +41,8 @@ bytesize = { version = "1.3.0", features = ["serde"] }
 castaway = "0.2"
 clap = { version = "4.5.3", features = ["derive"] }
 crc32c = "0.6"
+crossbeam-deque = "0.8.5"
+crossbeam-utils = "0.8.20"
 dashmap = "5.5.3"
 dirs = "5.0.1"
 ed25519 = "2.0"
@@ -97,11 +99,11 @@ tarpc = { version = "0.34", features = [
 ] }
 tempfile = "3.10"
 thiserror = "1.0"
-tikv-jemallocator = { version = "0.5", features = [
+tikv-jemallocator = { version = "0.6.0", features = [
     "unprefixed_malloc_on_supported_platforms",
     "background_threads",
 ] }
-tikv-jemalloc-ctl = { version = "0.5" }
+tikv-jemalloc-ctl = { version = "0.6.0", features = ["stats"] }
 tl-proto = "0.4"
 tokio = { version = "1", default-features = false }
 tokio-stream = "0.1.15"
@@ -134,6 +136,9 @@ tycho-rpc = { path = "./rpc", version = "0.1.4" }
 tycho-storage = { path = "./storage", version = "0.1.4" }
 tycho-util = { path = "./util", version = "0.1.4" }
 
+[patch.crates-io]
+weedb = { version = "0.3.8", git = "https://github.com/broxus/weedb.git", branch = "next-rocksdb" }
+
 [workspace.lints.rust]
 future_incompatible = "warn"
 nonstandard_style = "warn"
diff --git a/cli/src/cmd/debug/mempool.rs b/cli/src/cmd/debug/mempool.rs
index 556eaf578..5dacb74e7 100644
--- a/cli/src/cmd/debug/mempool.rs
+++ b/cli/src/cmd/debug/mempool.rs
@@ -292,7 +292,7 @@ impl Mempool {
         self.storage
             .shard_state_storage()
-            .store_state(&mc_zerostate_handle, &mc_zerostate)
+            .store_state(&mc_zerostate_handle, &mc_zerostate, Default::default())
             .await?;
 
         Ok(mc_zerostate)
diff --git a/collator/src/collator/do_collate/mod.rs b/collator/src/collator/do_collate/mod.rs
index c6394e89c..ddf0be459 100644
--- a/collator/src/collator/do_collate/mod.rs
+++ b/collator/src/collator/do_collate/mod.rs
@@ -11,7 +11,7 @@ use phase::{ActualState, Phase};
 use prepare::PrepareState;
 use sha2::Digest;
 use tycho_block_util::state::MinRefMcStateTracker;
-use tycho_storage::NewBlockMeta;
+use tycho_storage::{NewBlockMeta, StoreStateHint};
 use tycho_util::futures::JoinTask;
 use tycho_util::metrics::HistogramGuard;
 use tycho_util::time::now_millis;
@@ -1052,13 +1052,16 @@ impl CollatorStdImpl {
             let adapter = self.state_node_adapter.clone();
             let labels = labels.clone();
             let new_state_root = finalized.new_state_root.clone();
+            let hint = StoreStateHint {
+                block_data_size: Some(finalized.block_candidate.block.data_size()),
+            };
             async move {
                 let _histogram = HistogramGuard::begin_with_labels(
                     "tycho_collator_build_new_state_time",
                     &labels,
                 );
                 adapter
-                    .store_state_root(&block_id, meta, new_state_root)
+                    .store_state_root(&block_id, meta, new_state_root, hint)
                     .await
             }
         });
diff --git a/collator/src/state_node.rs b/collator/src/state_node.rs
index 60455a33c..9bcf5286a 100644
--- a/collator/src/state_node.rs
+++ b/collator/src/state_node.rs
@@ -13,7 +13,7 @@ use tycho_block_util::block::{BlockProofStuff, BlockStuff, BlockStuffAug};
 use tycho_block_util::queue::QueueDiffStuff;
 use tycho_block_util::state::ShardStateStuff;
 use tycho_network::PeerId;
-use tycho_storage::{BlockHandle, MaybeExistingHandle, NewBlockMeta, Storage};
+use tycho_storage::{BlockHandle, MaybeExistingHandle, NewBlockMeta, Storage, StoreStateHint};
 use tycho_util::metrics::HistogramGuard;
 use tycho_util::{FastDashMap, FastHashMap};
@@ -61,6 +61,7 @@ pub trait StateNodeAdapter: Send + Sync + 'static {
         block_id: &BlockId,
         meta: NewBlockMeta,
         state_root: Cell,
+        hint: StoreStateHint,
     ) -> Result<bool>;
     /// Return block by its id from node local state
     async fn load_block(&self, block_id: &BlockId) -> Result<Option<BlockStuff>>;
@@ -191,6 +192,7 @@ impl StateNodeAdapter for StateNodeAdapterStdImpl {
         block_id: &BlockId,
         meta: NewBlockMeta,
         state_root: Cell,
+        hint: StoreStateHint,
     ) -> Result<bool> {
         let _histogram = HistogramGuard::begin("tycho_collator_state_store_state_root_time");
@@ -204,7 +206,7 @@ impl StateNodeAdapter for StateNodeAdapterStdImpl {
         let updated = self
            .storage
             .shard_state_storage()
-            .store_state_root(&handle, state_root)
+            .store_state_root(&handle, state_root, hint)
             .await?;
 
         Ok(updated)
diff --git a/collator/src/test_utils.rs b/collator/src/test_utils.rs
index 7ed3bef4b..b2666116c 100644
--- a/collator/src/test_utils.rs
+++ b/collator/src/test_utils.rs
@@ -69,7 +69,7 @@ pub async fn prepare_test_storage() -> anyhow::Result<(Storage, tempfile::TempDi
     storage
         .shard_state_storage()
-        .store_state(&handle, &master_state_stuff)
+        .store_state(&handle, &master_state_stuff, Default::default())
         .await?;
 
     // first master block
@@ -142,7 +142,7 @@ pub async fn prepare_test_storage() -> anyhow::Result<(Storage, tempfile::TempDi
         storage
             .shard_state_storage()
-            .store_state(&handle, &shard_state_stuff)
+            .store_state(&handle, &shard_state_stuff, Default::default())
             .await?;
     }
diff --git a/core/src/block_strider/starter/cold_boot.rs b/core/src/block_strider/starter/cold_boot.rs
index 2bfa98a2e..3702cb858 100644
--- a/core/src/block_strider/starter/cold_boot.rs
+++ b/core/src/block_strider/starter/cold_boot.rs
@@ -411,7 +411,7 @@ impl StarterInner {
         });
 
         let stored = state_storage
-            .store_state(&handle, &state)
+            .store_state(&handle, &state, Default::default())
             .await
             .with_context(|| {
                 format!("failed to import zerostate for {}", state.block_id().shard)
diff --git a/core/src/block_strider/state_applier.rs b/core/src/block_strider/state_applier.rs
index 93bca86f1..39a332cff 100644
--- a/core/src/block_strider/state_applier.rs
+++ b/core/src/block_strider/state_applier.rs
@@ -7,7 +7,7 @@ use futures_util::future::BoxFuture;
 use tycho_block_util::archive::ArchiveData;
 use tycho_block_util::block::BlockStuff;
 use tycho_block_util::state::{MinRefMcStateTracker, RefMcStateHandle, ShardStateStuff};
-use tycho_storage::{BlockConnection, BlockHandle, NewBlockMeta, Storage};
+use tycho_storage::{BlockConnection, BlockHandle, NewBlockMeta, Storage, StoreStateHint};
 use tycho_util::metrics::HistogramGuard;
 use tycho_util::sync::rayon_run;
@@ -254,7 +254,9 @@ where
         let state_storage = self.inner.storage.shard_state_storage();
 
         state_storage
-            .store_state(handle, &new_state)
+            .store_state(handle, &new_state, StoreStateHint {
+                block_data_size: Some(block.data_size()),
+            })
             .await
             .context("Failed to store new state")?;
diff --git a/core/tests/archives.rs b/core/tests/archives.rs
index 58bff776d..ecc146bef 100644
--- a/core/tests/archives.rs
+++ b/core/tests/archives.rs
@@ -101,7 +101,7 @@ async fn prepare_storage(config: StorageConfig, zerostate: ShardStateStuff) -> R
     storage
         .shard_state_storage()
-        .store_state(&handle, &zerostate)
+        .store_state(&handle, &zerostate, Default::default())
         .await?;
 
     let tracker = MinRefMcStateTracker::default();
@@ -144,7 +144,7 @@ async fn prepare_storage(config: StorageConfig, zerostate: ShardStateStuff) -> R
         storage
             .shard_state_storage()
-            .store_state(&handle, &state)
+            .store_state(&handle, &state, Default::default())
             .await?;
     }
diff --git a/core/tests/overlay_server.rs b/core/tests/overlay_server.rs
index c13aaf990..1fe09c38b 100644
--- a/core/tests/overlay_server.rs
+++ b/core/tests/overlay_server.rs
@@ -292,7 +292,7 @@ async fn overlay_server_persistent_state() -> Result<()> {
         shard_states.min_ref_mc_state(),
     )?;
     shard_states
-        .store_state(&zerostate_handle, &zerostate)
+        .store_state(&zerostate_handle, &zerostate, Default::default())
         .await?;
 
     {
diff --git a/core/tests/storage/mod.rs b/core/tests/storage/mod.rs
index 7e0a8d0fa..aa98b5fbb 100644
--- a/core/tests/storage/mod.rs
+++ b/core/tests/storage/mod.rs
@@ -24,7 +24,7 @@ pub(crate) async fn init_storage() -> Result<(Storage, TempDir)> {
     storage
         .shard_state_storage()
-        .store_state(&handle, &zerostate)
+        .store_state(&handle, &zerostate, Default::default())
         .await?;
 
     // Init blocks
diff --git a/scripts/gen-dashboard.py b/scripts/gen-dashboard.py
index 647afc792..e95f46718 100644
--- a/scripts/gen-dashboard.py
+++ b/scripts/gen-dashboard.py
@@ -776,6 +776,28 @@ def storage() -> RowPanel:
         create_heatmap_panel(
             "tycho_storage_state_update_time", "Time to write state update to rocksdb"
         ),
+        create_heatmap_panel(
+            "tycho_storage_state_store_time",
+            "Time to store a single root, including the rocksdb write",
+        ),
+        create_heatmap_panel(
+            "tycho_storage_cell_in_mem_store_time", "Time to store cell without write"
+        ),
+        create_heatmap_panel(
+            "tycho_storage_batch_write_time", "Time to write merge in write batch"
+        ),
+        create_heatmap_quantile_panel(
+            "tycho_storage_state_update_size_bytes",
+            "State update size",
+            UNITS.BYTES,
+            "0.999",
+        ),
+        create_heatmap_quantile_panel(
+            "tycho_storage_state_update_size_predicted_bytes",
+            "Predicted state update size",
+            UNITS.BYTES,
+            "0.999",
+        ),
         create_heatmap_panel(
             "tycho_storage_state_store_time", "Time to store state with cell traversal"
         ),
@@ -1682,7 +1710,8 @@ def mempool_engine() -> RowPanel:
     metrics = [
         create_counter_panel(
             expr_sum_increase(
-                "tycho_mempool_rounds_dag_behind_consensus", range_selector="$__interval"
+                "tycho_mempool_rounds_dag_behind_consensus",
+                range_selector="$__interval",
             ),
             "Dag: rounds behind consensus",
         ),
diff --git a/storage/Cargo.toml b/storage/Cargo.toml
index 57e9e60ea..59544635f 100644
--- a/storage/Cargo.toml
+++ b/storage/Cargo.toml
@@ -18,6 +18,8 @@ bumpalo = { workspace = true }
 bytes = { workspace = true }
 bytesize = { workspace = true }
 crc32c = { workspace = true }
+crossbeam-deque = { workspace = true }
+crossbeam-utils = { workspace = true }
 dashmap = { workspace = true }
 everscale-types = { workspace = true, features = ["tycho", "stats"] }
 fdlimit = { workspace = true }
diff --git a/storage/src/db/kv_db/config.rs b/storage/src/db/kv_db/config.rs
deleted file mode 100644
index 93839d9e4..000000000
--- a/storage/src/db/kv_db/config.rs
+++ /dev/null
@@ -1,9 +0,0 @@
-use bytesize::ByteSize;
-use serde::{Deserialize, Serialize};
-
-#[derive(Debug, Copy, Clone, Serialize, Deserialize)]
-#[serde(deny_unknown_fields, default)]
-pub struct DbConfig {
-    pub rocksdb_lru_capacity: ByteSize,
-    pub cells_cache_size: ByteSize,
-}
diff --git a/storage/src/db/kv_db/mod.rs b/storage/src/db/kv_db/mod.rs
index a8d032abe..095ca3320 100644
--- a/storage/src/db/kv_db/mod.rs
+++ b/storage/src/db/kv_db/mod.rs
@@ -103,7 +103,7 @@ impl BaseDbExt for BaseDb {
 
 impl WithMigrations for BaseDb {
     const NAME: &'static str = "base";
-    const VERSION: Semver = [0, 0, 2];
+    const VERSION: Semver = [0, 0, 3];
 
     fn register_migrations(
         migrations: &mut Migrations,
     ) -> Result<(), MigrationError> {
         migrations.register([0, 0, 1], [0, 0, 2], move |db| {
             base_migrations::v0_0_1_to_0_0_2(db, cancelled.clone())
-        })
+        })?;
+        migrations.register([0, 0, 2], [0, 0, 3], base_migrations::v_0_0_2_to_v_0_0_3)?;
+
+        Ok(())
     }
 }
@@ -139,6 +142,7 @@ mod base_migrations {
     use everscale_types::boc::Boc;
     use tycho_block_util::archive::ArchiveEntryType;
+    use weedb::rocksdb::CompactOptions;
     use super::*;
     use crate::util::StoredValue;
@@ -192,6 +196,24 @@ mod base_migrations {
         );
         Ok(())
     }
+
+    pub fn v_0_0_2_to_v_0_0_3(db: &BaseDb) -> Result<(), MigrationError> {
+        let mut opts = CompactOptions::default();
+        opts.set_exclusive_manual_compaction(true);
+        let null = Option::<&[u8]>::None;
+
+        let started_at = Instant::now();
+        tracing::info!("started cells compaction");
+        db.cells
+            .db()
+            .compact_range_cf_opt(&db.cells.cf(), null, null, &opts);
+        tracing::info!(
+            elapsed = %humantime::format_duration(started_at.elapsed()),
+            "finished cells compaction"
+        );
+
+        Ok(())
+    }
 }
 
 // === RPC DB ===
diff --git a/storage/src/db/kv_db/tables.rs b/storage/src/db/kv_db/tables.rs
index e0ec52ede..d426f0f40 100644
--- a/storage/src/db/kv_db/tables.rs
+++ b/storage/src/db/kv_db/tables.rs
@@ -1,7 +1,7 @@
 use bytesize::ByteSize;
 use weedb::rocksdb::{
-    BlockBasedIndexType, BlockBasedOptions, DBCompressionType, DataBlockIndexType, MergeOperands,
-    Options, ReadOptions,
+    BlockBasedIndexType, BlockBasedOptions, CompactionPri, DBCompressionType, DataBlockIndexType,
+    MemtableFactory, MergeOperands, Options, ReadOptions, SliceTransform,
 };
 use weedb::{rocksdb, Caches, ColumnFamily, ColumnFamilyOptions};
 
@@ -225,28 +225,108 @@ impl ColumnFamily for Cells {
 }
 
 impl ColumnFamilyOptions<Caches> for Cells {
-    fn options(opts: &mut Options, caches: &mut Caches) {
+    fn options(opts: &mut Options, block_cache: &mut Caches) {
         opts.set_level_compaction_dynamic_level_bytes(true);
 
         opts.set_merge_operator_associative("cell_merge", refcount::merge_operator);
         opts.set_compaction_filter("cell_compaction", refcount::compaction_filter);
 
-        optimize_for_level_compaction(opts, ByteSize::gib(1u64));
+        // optimize for bulk inserts and a single writer
+        opts.set_max_write_buffer_number(8); // 8 * 512MB = 4GB
+        opts.set_min_write_buffer_number_to_merge(2); // allow early flush
+        opts.set_write_buffer_size(512 * 1024 * 1024); // 512MB per memtable
+
+        opts.set_max_successive_merges(0); // successive merges eat CPU, and we already do the first merge in a hashmap
+
+        // - Write batch size: 500K entries
+        // - Entry size: ~244 bytes (32 SHA + 8 seq + 192 value + 12 overhead)
+        // - Memtable size: 512MB
+        //
+        // 1. Entries per memtable = 512MB / 244B ≈ 2.2M entries
+        // 2. Target bucket load factor = 10-12 entries per bucket (RocksDB recommendation)
+        // 3. Bucket count = entries / target_load = 2.2M / 11 ≈ 200K
+        opts.set_memtable_factory(MemtableFactory::HashLinkList {
+            bucket_count: 200_000,
+        });
+        opts.set_bloom_locality(1); // optimize bloom filter locality
 
         let mut block_factory = BlockBasedOptions::default();
-        block_factory.set_block_cache(&caches.block_cache);
-        block_factory.set_data_block_index_type(DataBlockIndexType::BinaryAndHash);
-        block_factory.set_whole_key_filtering(true);
-        block_factory.set_checksum_type(rocksdb::ChecksumType::NoChecksum);
+        // todo: somehow make the block cache separate for cells,
+        // using 3/4 of all available cache space
+        block_factory.set_block_cache(&block_cache.block_cache);
+
+        // 10 bits per key, stored at the end of the sst
         block_factory.set_bloom_filter(10.0, false);
-        block_factory.set_block_size(16 * 1024);
-        block_factory.set_format_version(5);
+        block_factory.set_optimize_filters_for_memory(true);
+        block_factory.set_whole_key_filtering(true);
+
+        // to match the fs block size
+        block_factory.set_block_size(4096);
+        block_factory.set_format_version(6);
+
+        // we have 4096 / 256 = 16 keys per block, so binary search is enough
+        block_factory.set_data_block_index_type(DataBlockIndexType::BinarySearch);
+
+        block_factory.set_index_type(BlockBasedIndexType::HashSearch);
+        block_factory.set_pin_l0_filter_and_index_blocks_in_cache(true);
 
         opts.set_block_based_table_factory(&block_factory);
+        opts.set_prefix_extractor(SliceTransform::create_noop());
+
+        opts.set_memtable_whole_key_filtering(true);
+        opts.set_memtable_prefix_bloom_ratio(0.25);
+
+        opts.set_compression_type(DBCompressionType::None);
+
+        opts.set_compaction_pri(CompactionPri::OldestSmallestSeqFirst);
+        opts.set_level_zero_file_num_compaction_trigger(8);
+
+        // 512MB per file: fewer files mean fewer compactions, and smaller files make GC more efficient
+        opts.set_target_file_size_base(512 * 1024 * 1024);
+
+        opts.set_max_bytes_for_level_base(4 * 1024 * 1024 * 1024); // 4GB per level
+        opts.set_max_bytes_for_level_multiplier(8.0);
+
+        // L1: 4GB
+        // L2: ~32GB
+        // L3: ~256GB
+        // L4: ~2TB
+        opts.set_num_levels(5);
 
         opts.set_optimize_filters_for_hits(true);
-
-        // option is set for cf
-        opts.set_compression_type(DBCompressionType::Lz4);
+
+        // we have our own cache and don't want `kcompactd` to go brrr
+        opts.set_use_direct_reads(true);
+        opts.set_use_direct_io_for_flush_and_compaction(true);
+
+        opts.add_compact_on_deletion_collector_factory(
+            100, // N: examine 100 consecutive entries
+            // Small enough window to detect local delete patterns
+            // Large enough to avoid spurious compactions
+            45, // D: trigger on 45 deletions in window
+            // Balance between space reclaim and compaction frequency
+            // ~45% deletion density trigger
+            0.5, /* deletion_ratio: trigger if 50% of the whole file is deleted
+                  * Backup trigger for overall file health
+                  * Higher than the window trigger to prefer local optimization */
+        );
+
+        // single-writer optimizations
+        opts.set_enable_write_thread_adaptive_yield(false);
+        opts.set_allow_concurrent_memtable_write(false);
+        opts.set_enable_pipelined_write(true);
+        opts.set_inplace_update_support(false);
+        opts.set_unordered_write(true); // we don't use snapshots
+        opts.set_avoid_unnecessary_blocking_io(true); // schedule non-critical IO in the background
+
+        opts.set_auto_tuned_ratelimiter(
+            256 * 1024 * 1024, // 256MB/s base rate
+            100_000,           // 100ms refill (standard value)
+            10,                // fairness (standard value)
+        );
+
+        opts.set_periodic_compaction_seconds(3600 * 24); // force compaction once a day
     }
 }
diff --git a/storage/src/lib.rs b/storage/src/lib.rs
index 77165ebb8..bc8c24ce8 100644
--- a/storage/src/lib.rs
+++ b/storage/src/lib.rs
@@ -61,9 +61,7 @@ impl StorageBuilder {
         let update_options = |opts: &mut rocksdb::Options, threads: usize, fdlimit: u64| {
             opts.set_paranoid_checks(false);
 
-            // bigger base level size - less compactions
             // parallel compactions finishes faster - less write stalls
-
             opts.set_max_subcompactions(threads as u32 / 2);
 
             // io
@@ -83,7 +81,6 @@ impl StorageBuilder {
             opts.increase_parallelism(threads as i32);
 
             opts.set_allow_concurrent_memtable_write(false);
-            opts.set_enable_write_thread_adaptive_yield(true);
 
             // debug
             // NOTE: could slower everything a bit in some cloud environments.
diff --git a/storage/src/store/persistent_state/tests.rs b/storage/src/store/persistent_state/tests.rs
index 72b3ebead..0be0ccd35 100644
--- a/storage/src/store/persistent_state/tests.rs
+++ b/storage/src/store/persistent_state/tests.rs
@@ -46,7 +46,9 @@ async fn persistent_shard_state() -> Result<()> {
         NewBlockMeta::zero_state(zerostate.as_ref().gen_utime, true),
     );
 
-    shard_states.store_state(&handle, &zerostate).await?;
+    shard_states
+        .store_state(&handle, &zerostate, Default::default())
+        .await?;
 
     // Check seqno
     let min_ref_mc_state = shard_states.min_ref_mc_state();
diff --git a/storage/src/store/shard_state/cell_storage.rs b/storage/src/store/shard_state/cell_storage.rs
index cc42984d2..e728d680e 100644
--- a/storage/src/store/shard_state/cell_storage.rs
+++ b/storage/src/store/shard_state/cell_storage.rs
@@ -1,29 +1,34 @@
 use std::cell::UnsafeCell;
-use std::collections::hash_map;
+use std::collections::{hash_map, VecDeque};
 use std::mem::{ManuallyDrop, MaybeUninit};
 use std::sync::atomic::{AtomicI64, AtomicU8, Ordering};
 use std::sync::{Arc, Weak};
 
 use anyhow::{Context, Result};
 use bumpalo::Bump;
+use crossbeam_deque::{Steal, Stealer, Worker};
+use crossbeam_utils::Backoff;
+use dashmap::mapref::entry::Entry;
+use dashmap::Map;
 use everscale_types::cell::*;
 use parking_lot::Mutex;
 use quick_cache::sync::{Cache, DefaultLifecycle};
 use triomphe::ThinArc;
 use tycho_util::metrics::HistogramGuard;
 use tycho_util::{FastDashMap, FastHashMap, FastHasherState};
+use weedb::rocksdb::WriteBatch;
 use weedb::{rocksdb, BoundedCfHandle};
 
 use crate::db::*;
 
 pub struct CellStorage {
     db: BaseDb,
-    cells_cache: Arc<CellsIndex>,
-    raw_cells_cache: RawCellsCache,
+    cells_cache: Arc<CellsCache>,
+    raw_cells_cache: Arc<RawCellsCache>,
     pending: PendingOperations,
 }
 
-type CellsIndex = FastDashMap<HashBytes, Weak<StorageCell>>;
+type CellsCache = FastDashMap<HashBytes, Weak<StorageCell>>;
 
 impl CellStorage {
     pub fn new(db: BaseDb, cache_size_bytes: u64) -> Arc<Self> {
@@ -33,7 +38,7 @@ impl CellStorage {
         Arc::new(Self {
             db,
             cells_cache,
-            raw_cells_cache,
+            raw_cells_cache: Arc::new(raw_cells_cache),
             pending: PendingOperations::default(),
         })
     }
@@ -234,144 +239,65 @@ impl CellStorage {
 
     pub fn store_cell(
         &self,
-        batch: &mut rocksdb::WriteBatch,
+        batch: &mut WriteBatch,
         root: Cell,
+        estimated_cell_count: usize,
     ) -> Result<(PendingOperation<'_>, usize), CellStorageError> {
-        struct CellWithRefs<'a> {
-            rc: u32,
-            data: Option<&'a [u8]>,
-        }
-
-        struct Context<'a> {
-            db: &'a BaseDb,
-            raw_cache: &'a RawCellsCache,
-            alloc: &'a Bump,
-            transaction: FastHashMap<HashBytes, CellWithRefs<'a>>,
-            buffer: Vec<u8>,
-        }
+        let pending_op = self.pending.begin();
 
-        impl Context<'_> {
-            fn insert_cell(
-                &mut self,
-                key: &HashBytes,
-                cell: &DynCell,
-                depth: usize,
-            ) -> Result<bool, CellStorageError> {
-                Ok(match self.transaction.entry(*key) {
-                    hash_map::Entry::Occupied(mut value) => {
-                        value.get_mut().rc += 1;
-                        false
-                    }
-                    hash_map::Entry::Vacant(entry) => {
-                        // A constant which tells since which depth we should start to use cache.
-                        // This method is used mostly for inserting new states, so we can assume
-                        // that first N levels will mostly be new.
-                        //
-                        // This value was chosen empirically.
-                        const NEW_CELLS_DEPTH_THRESHOLD: usize = 4;
-
-                        let (old_rc, has_value) = 'value: {
-                            if depth >= NEW_CELLS_DEPTH_THRESHOLD {
-                                // NOTE: `get` here is used to affect a "hotness" of the value, because
-                                // there is a big chance that we will need it soon during state processing
-                                if let Some(entry) = self.raw_cache.0.get(key) {
-                                    let rc = entry.header.header.load(Ordering::Acquire);
-                                    break 'value (rc, rc > 0);
-                                }
-                            }
+        let walk_hist = HistogramGuard::begin("tycho_storage_walk_tree_time");
+        let ctx = StoreContext::new(&self.db, &self.raw_cells_cache, estimated_cell_count);
 
-                            match self.db.cells.get(key).map_err(CellStorageError::Internal)? {
-                                Some(value) => {
-                                    let (rc, value) =
-                                        refcount::decode_value_with_rc(value.as_ref());
-                                    (rc, value.is_some())
-                                }
-                                None => (0, false),
-                            }
-                        };
+        let mut queue = VecDeque::new();
+        queue.push_back((root, 0usize));
 
-                        // TODO: lower to `debug_assert` when sure
-                        assert!(has_value && old_rc > 0 || !has_value && old_rc == 0);
-
-                        let data = if !has_value {
-                            self.buffer.clear();
-                            if StorageCell::serialize_to(cell, &mut self.buffer).is_err() {
-                                return Err(CellStorageError::InvalidCell);
-                            }
-                            Some(self.alloc.alloc_slice_copy(self.buffer.as_slice()) as &[u8])
-                        } else {
-                            None
-                        };
-                        entry.insert(CellWithRefs { rc: 1, data });
-                        !has_value
-                    }
-                })
+        while let Some((current_cell, current_depth)) = queue.pop_front() {
+            if !ctx.insert_cell(
+                current_cell.repr_hash(),
+                current_cell.as_ref(),
+                Some(current_depth),
+            )? {
+                continue;
+            }
+            for next_cell in current_cell.references().cloned() {
+                queue.push_back((next_cell, current_depth + 1));
             }
 
-            fn finalize(mut self, batch: &mut rocksdb::WriteBatch) -> usize {
-                let total = self.transaction.len();
-                let cells_cf = &self.db.cells.cf();
-                for (key, CellWithRefs { rc, data }) in self.transaction {
-                    self.buffer.clear();
-                    refcount::add_positive_refount(rc, data, &mut self.buffer);
-                    if let Some(data) = data {
-                        self.raw_cache.insert(&key, rc, data);
-                    } else {
-                        self.raw_cache.add_refs(&key, rc);
-                    }
-                    batch.merge_cf(cells_cf, key.as_slice(), &self.buffer);
-                }
-                total
+            if current_depth == 6 {
+                break;
             }
         }
 
-        let pending_op = self.pending.begin();
-
-        // Prepare context and handles
-        let alloc = Bump::new();
+        let num_cpus = std::thread::available_parallelism()
+            .expect("We don't use platforms where it's not supported")
+            .get();
+        if !queue.is_empty() {
+            let queues = (0..num_cpus)
+                .map(|_| Worker::new_lifo())
+                .collect::<Vec<_>>();
 
-        let mut ctx = Context {
-            db: &self.db,
-            raw_cache: &self.raw_cells_cache,
-            alloc: &alloc,
-            transaction: FastHashMap::with_capacity_and_hasher(128, Default::default()),
-            buffer: Vec::with_capacity(512),
-        };
-
-        // Check root cell
-        {
-            let key = root.repr_hash();
-
-            if !ctx.insert_cell(key, root.as_ref(), 0)? {
-                return Ok((pending_op, 0));
+            for (idx, cell) in queue.into_iter().enumerate() {
+                queues[idx % num_cpus].push(cell.0);
             }
-        }
 
-        let mut stack = Vec::with_capacity(16);
-        stack.push(root.references());
+            let stealers: Vec<_> = queues.iter().map(|w| w.stealer()).collect();
 
-        // Check other cells
-        'outer: loop {
-            let depth = stack.len();
-            let Some(iter) = stack.last_mut() else {
-                break;
-            };
+            std::thread::scope(|s| {
+                for (index, worker) in queues.into_iter().enumerate() {
+                    let mut stealers = stealers.clone();
+                    stealers.remove(index); // we don't want to steal from ourselves
 
-            for child in &mut *iter {
-                let key = child.repr_hash();
-
-                if ctx.insert_cell(key, child, depth)? {
-                    stack.push(child.references());
-                    continue 'outer;
+                    let ctxt = ctx.clone();
+                    s.spawn(move || {
+                        process_worker_queue(worker, stealers, &ctxt)
+                            .expect("todo somehow propagate error");
+                    });
                 }
-            }
-
-            stack.pop();
+            });
         }
+        drop(walk_hist);
 
-        // Clear big chunks of data before finalization
-        drop(stack);
-
+        let ctx = Arc::into_inner(ctx).unwrap();
         // Write transaction to the `WriteBatch`
         Ok((pending_op, ctx.finalize(batch)))
     }
@@ -417,10 +343,9 @@ impl CellStorage {
 
     pub fn remove_cell(
         &self,
-        batch: &mut rocksdb::WriteBatch,
         alloc: &Bump,
         hash: &HashBytes,
-    ) -> Result<(PendingOperation<'_>, usize), CellStorageError> {
+    ) -> Result<(PendingOperation<'_>, usize, WriteBatch), CellStorageError> {
         #[derive(Clone, Copy)]
         struct CellState<'a> {
             rc: i64,
@@ -491,7 +416,13 @@ impl CellStorage {
         drop(stack);
 
         // Write transaction to the `WriteBatch`
+        let _hist = HistogramGuard::begin("tycho_storage_batch_write_time");
         let total = transaction.len();
+
+        // NOTE: For each cell we have 32 bytes for key and 8 bytes for RC,
+        // and a bit more just in case.
+        let mut batch = WriteBatch::with_capacity_bytes(total * (32 + 8 + 8));
+
         for (key, CellState { removes, .. }) in transaction {
             self.raw_cells_cache.remove_refs(key, removes);
             batch.merge_cf(
@@ -500,7 +431,7 @@ impl CellStorage {
                 refcount::encode_negative_refcount(removes),
             );
         }
-        Ok((pending_op, total))
+        Ok((pending_op, total, batch))
     }
 
     pub fn drop_cell(&self, hash: &HashBytes) {
@@ -510,6 +441,181 @@ impl CellStorage {
     }
 }
 
+fn process_worker_queue(
+    worker: Worker<Cell>,
+    stealers: Vec<Stealer<Cell>>,
+    ctx: &StoreContext,
+) -> Result<(), CellStorageError> {
+    loop {
+        let Some(cell) = find_task(&worker, &stealers) else {
+            break Ok(());
+        };
+
+        let cell_hash = *cell.repr_hash();
+        if !ctx.insert_cell(&cell_hash, cell.as_ref(), None)? {
+            continue;
+        }
+
+        for c in cell.references().cloned() {
+            worker.push(c);
+        }
+    }
+}
+
+fn find_task(local: &Worker<Cell>, stealers: &[Stealer<Cell>]) -> Option<Cell> {
+    if let Some(task) = local.pop() {
+        return Some(task);
+    };
+
+    let backoff = Backoff::new();
+    for stealer in stealers {
+        'inner: loop {
+            match stealer.steal_batch_and_pop(local) {
+                Steal::Empty => {
+                    // this stealer is drained; move on to the next one
+                    break 'inner;
+                }
+                Steal::Success(t) => {
+                    return Some(t);
+                }
+                Steal::Retry => {
+                    backoff.snooze();
+                    continue 'inner;
+                }
+            }
+        }
+    }
+    None
+}
+
+struct CellWithRefs {
+    rc: u32,
+    data: Option<Vec<u8>>,
+}
+
+struct StoreContext {
+    db: BaseDb,
+    raw_cache: Arc<RawCellsCache>,
+    transaction: FastDashMap<HashBytes, CellWithRefs>,
+}
+
+impl StoreContext {
+    fn new(db: &BaseDb, raw_cache: &Arc<RawCellsCache>, capacity: usize) -> Arc<Self> {
+        Arc::new(Self {
+            db: db.clone(),
+            raw_cache: raw_cache.clone(),
+            transaction: FastDashMap::with_capacity_and_hasher_and_shard_amount(
+                capacity,
+                Default::default(),
+                512,
+            ),
+        })
+    }
+
+    fn insert_cell(
+        &self,
+        key: &HashBytes,
+        cell: &DynCell,
+        depth: Option<usize>,
+    ) -> Result<bool, CellStorageError> {
+        let mut buffer = [0; 512];
+        Ok(match self.transaction.entry(*key) {
+            Entry::Occupied(mut value) => {
+                value.get_mut().rc += 1;
+                false
+            }
+            Entry::Vacant(entry) => {
+                // A constant which tells since which depth we should start to use cache.
+                // This method is used mostly for inserting new states, so we can assume
+                // that first N levels will mostly be new.
+                //
+                // This value was chosen empirically.
+                const NEW_CELLS_DEPTH_THRESHOLD: usize = 4;
+
+                let (old_rc, has_value) = 'value: {
+                    match depth {
+                        Some(d) if d >= NEW_CELLS_DEPTH_THRESHOLD => {
+                            // NOTE: `get` here is used to affect a "hotness" of the value, because
+                            // there is a big chance that we will need it soon during state processing
+                            if let Some(entry) = self.raw_cache.0.get(key) {
+                                let rc = entry.header.header.load(Ordering::Acquire);
+                                break 'value (rc, rc > 0);
+                            }
+                        }
+                        _ => {}
+                    }
+
+                    match self.db.cells.get(key).map_err(CellStorageError::Internal)? {
+                        Some(value) => {
+                            let (rc, value) = refcount::decode_value_with_rc(value.as_ref());
+                            (rc, value.is_some())
+                        }
+                        None => (0, false),
+                    }
+                };
+
+                // TODO: lower to `debug_assert` when sure
+                assert!(has_value && old_rc > 0 || !has_value && old_rc == 0);
+
+                let data = if !has_value {
+                    match StorageCell::serialize_to(cell, &mut buffer) {
+                        Err(_) => return Err(CellStorageError::InvalidCell),
+                        Ok(size) => Some(buffer[..size].to_vec()),
+                    }
+                } else {
+                    None
+                };
+                entry.insert(CellWithRefs { rc: 1, data });
+                !has_value
+            }
+        })
+    }
+
+    fn finalize(self, batch: &mut WriteBatch) -> usize {
+        std::thread::scope(|s| {
+            let number_shards = self.transaction._shard_count();
+            // safety: we hold only read locks
+            let shards = unsafe { (0..number_shards).map(|i| self.transaction._get_read_shard(i)) };
+            let cache = &self.raw_cache;
+
+            // todo: clamp to number of cpus x2
+            for shard in shards {
+                // spawned threads will be joined at the end of the scope, so we don't need to store them
+                s.spawn(move || {
+                    for (key, value) in shard {
+                        let value = value.get();
+                        let rc = value.rc;
+                        if let Some(data) = &value.data {
+                            cache.insert(key, rc, data);
+                        } else {
+                            cache.add_refs(key, rc);
+                        }
+                    }
+                });
+            }
+
+            let batch_update = s.spawn(|| {
+                let mut buffer = Vec::with_capacity(512);
+                let total = self.transaction.len();
+                let cells_cf = &self.db.cells.cf();
+                for kv in self.transaction.iter() {
+                    let key = kv.key();
+                    let value = kv.value();
+                    let rc = value.rc;
+                    let data = value.data.as_deref();
+
+                    buffer.clear();
+                    refcount::add_positive_refount(rc, data, &mut buffer);
+                    batch.merge_cf(cells_cf, key.as_slice(), &buffer);
+                }
+                total
+            });
+
+            batch_update.join().expect("thread panicked")
+        })
+    }
+}
+
 #[derive(thiserror::Error, Debug)]
 pub enum CellStorageError {
     #[error("Cell not found in cell db")]
@@ -612,34 +718,48 @@ impl StorageCell {
         true
     }
 
-    pub fn serialize_to(cell: &DynCell, target: &mut Vec<u8>) -> Result<()> {
+    pub fn serialize_to(cell: &DynCell, target: &mut [u8]) -> Result<usize> {
         let descriptor = cell.descriptor();
         let hash_count = descriptor.hash_count();
         let ref_count = descriptor.reference_count();
 
-        target.reserve(
-            4usize
-                + descriptor.byte_len() as usize
-                + (32 + 2) * hash_count as usize
-                + 32 * ref_count as usize,
-        );
+        let expected_len = 4usize
+            + descriptor.byte_len() as usize
+            + (32 + 2) * hash_count as usize
+            + 32 * ref_count as usize;
+        assert!(target.len() >= expected_len);
+
+        let mut cursor = 0;
+
+        // Copy descriptor bytes
+        target[cursor..cursor + 2].copy_from_slice(&[descriptor.d1, descriptor.d2]);
+        cursor += 2;
+
+        // Copy bit length
+        target[cursor..cursor + 2].copy_from_slice(&cell.bit_len().to_le_bytes());
+        cursor += 2;
+
+        // Copy cell data
+        let data_len = descriptor.byte_len() as usize;
+        target[cursor..cursor + data_len].copy_from_slice(cell.data());
+        cursor += data_len;
 
-        target.extend_from_slice(&[descriptor.d1, descriptor.d2]);
-        target.extend_from_slice(&cell.bit_len().to_le_bytes());
-        target.extend_from_slice(cell.data());
         assert_eq!(cell.data().len(), descriptor.byte_len() as usize);
 
+        for i in 0..hash_count {
+            target[cursor..cursor + 32].copy_from_slice(cell.hash(i).as_array());
+            cursor += 32;
 
-        for i in 0..descriptor.hash_count() {
-            target.extend_from_slice(cell.hash(i).as_array());
-            target.extend_from_slice(&cell.depth(i).to_le_bytes());
+            target[cursor..cursor + 2].copy_from_slice(&cell.depth(i).to_le_bytes());
+            cursor += 2;
         }
 
-        for i in 0..descriptor.reference_count() {
+        for i in 0..ref_count {
             let cell = cell.reference(i).context("Child not found")?;
-            target.extend_from_slice(cell.repr_hash().as_array());
+            target[cursor..cursor + 32].copy_from_slice(cell.repr_hash().as_array());
+            cursor += 32;
         }
 
-        Ok(())
+        Ok(cursor)
     }
 
     pub fn reference_raw(&self, index: u8) -> Option<&Arc<StorageCell>> {
diff --git a/storage/src/store/shard_state/mod.rs b/storage/src/store/shard_state/mod.rs
index d09a41a5b..d263c1257 100644
--- a/storage/src/store/shard_state/mod.rs
+++ b/storage/src/store/shard_state/mod.rs
@@ -74,16 +74,26 @@ impl ShardStateStorage {
         &self.min_ref_mc_state
     }
 
-    pub async fn store_state(&self, handle: &BlockHandle, state: &ShardStateStuff) -> Result<bool> {
+    pub async fn store_state(
+        &self,
+        handle: &BlockHandle,
+        state: &ShardStateStuff,
+        hint: StoreStateHint,
+    ) -> Result<bool> {
         if handle.id() != state.block_id() {
             return Err(ShardStateStorageError::BlockHandleIdMismatch.into());
         }
 
-        self.store_state_root(handle, state.root_cell().clone())
+        self.store_state_root(handle, state.root_cell().clone(), hint)
             .await
     }
 
-    pub async fn store_state_root(&self, handle: &BlockHandle, root_cell: Cell) -> Result<bool> {
+    pub async fn store_state_root(
+        &self,
+        handle: &BlockHandle,
+        root_cell: Cell,
+        hint: StoreStateHint,
+    ) -> Result<bool> {
         if handle.has_state() {
             return Ok(false);
         }
@@ -105,15 +115,25 @@ impl ShardStateStorage {
         // NOTE: `spawn_blocking` is used here instead of `rayon_run` as it is IO-bound task.
         let (new_cell_count, updated) = tokio::task::spawn_blocking(move || {
             let root_hash = *root_cell.repr_hash();
+            let estimated_merkle_update_size = hint.estimate_cell_count();
 
-            let mut batch = rocksdb::WriteBatch::default();
+            let estimated_update_size_bytes = estimated_merkle_update_size * 192; // p50 cell size in bytes
+            let mut batch = rocksdb::WriteBatch::with_capacity_bytes(estimated_update_size_bytes);
 
-            let (pending_op, new_cell_count) = cell_storage.store_cell(&mut batch, root_cell)?;
+            let in_mem_store = HistogramGuard::begin("tycho_storage_cell_in_mem_store_time");
+            let (pending_op, new_cell_count) =
+                cell_storage.store_cell(&mut batch, root_cell, estimated_merkle_update_size)?;
+            in_mem_store.finish();
             metrics::histogram!("tycho_storage_cell_count").record(new_cell_count as f64);
 
             batch.put_cf(&cf.bound(), block_id.to_vec(), root_hash.as_slice());
 
             let hist = HistogramGuard::begin("tycho_storage_state_update_time");
+            metrics::histogram!("tycho_storage_state_update_size_bytes")
+                .record(batch.size_in_bytes() as f64);
+            metrics::histogram!("tycho_storage_state_update_size_predicted_bytes")
+                .record(estimated_update_size_bytes as f64);
+
             raw_db.write(batch)?;
 
             // Ensure that pending operation guard is dropped after the batch is written
@@ -220,11 +240,10 @@ impl ShardStateStorage {
         let db = self.db.clone();
         let cell_storage = self.cell_storage.clone();
         let key = key.to_vec();
-        let mut batch = rocksdb::WriteBatch::default();
 
         let (total, inner_alloc) = tokio::task::spawn_blocking(move || {
-            let (pending_op, stats) =
-                cell_storage.remove_cell(&mut batch, &alloc, &root_hash)?;
+            let (pending_op, stats, mut batch) =
+                cell_storage.remove_cell(&alloc, &root_hash)?;
 
             batch.delete_cf(&db.shard_states.get_unbounded_cf().bound(), key);
             db.raw()
@@ -365,6 +384,23 @@ impl ShardStateStorage {
     }
 }
 
+#[derive(Default, Debug, Clone, Copy)]
+pub struct StoreStateHint {
+    pub block_data_size: Option<usize>,
+}
+
+impl StoreStateHint {
+    fn estimate_cell_count(&self) -> usize {
+        const MIN_BLOCK_SIZE: usize = 4 << 10; // 4 KB
+
+        let block_data_size = self.block_data_size.unwrap_or(MIN_BLOCK_SIZE);
+
+        // y = 3889.9821 + 14.7480 × √x
+        // R-squared: 0.7035
+        ((3889.9821 + 14.7480 * (block_data_size as f64).sqrt()) as usize).next_power_of_two()
+    }
+}
+
 #[derive(Debug, Copy, Clone)]
 pub struct ShardStateStorageMetrics {
     pub max_new_mc_cell_count: usize,
diff --git a/storage/src/store/shard_state/store_state_raw.rs b/storage/src/store/shard_state/store_state_raw.rs
index 11e8a23ed..37e991b71 100644
--- a/storage/src/store/shard_state/store_state_raw.rs
+++ b/storage/src/store/shard_state/store_state_raw.rs
@@ -547,7 +547,7 @@ mod test {
     use bytesize::ByteSize;
     use everscale_types::models::ShardIdent;
     use tycho_util::project_root;
-    use weedb::rocksdb::{IteratorMode, WriteBatch};
+    use weedb::rocksdb::IteratorMode;
 
     use super::*;
     use crate::{Storage, StorageConfig};
@@ -626,9 +626,8 @@ mod test {
         // check that state actually exists
         let cell = cell_storage.load_cell(HashBytes::from_slice(value.as_ref()))?;
 
-        let mut batch = WriteBatch::default();
-        let (pending_op, _) =
-            cell_storage.remove_cell(&mut batch, &bump, cell.hash(LevelMask::MAX_LEVEL))?;
+        let (pending_op, _, batch) =
+            cell_storage.remove_cell(&bump, cell.hash(LevelMask::MAX_LEVEL))?;
 
         // execute batch
         db.rocksdb().write_opt(batch, db.cells.write_config())?;
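
The StoreStateHint::estimate_cell_count heuristic above converts a block's serialized size into a pre-allocation hint for the cell transaction map, using the fitted regression y = 3889.9821 + 14.7480 × √x (R² ≈ 0.70) rounded up to a power of two. A minimal standalone sketch of that arithmetic; the function mirrors the patch but the snippet itself is illustrative and not part of the tree:

fn estimate_cell_count(block_data_size: usize) -> usize {
    // Fitted on observed (block size, new cell count) pairs per the patch comment;
    // rounded up to a power of two so the dashmap capacity stays shard-friendly.
    ((3889.9821 + 14.7480 * (block_data_size as f64).sqrt()) as usize).next_power_of_two()
}

fn main() {
    // A 64 KiB block: 3889.98 + 14.748 * 256 ≈ 7665 cells, rounded up to 8192.
    assert_eq!(estimate_cell_count(64 * 1024), 8192);
    // The 4 KiB floor (MIN_BLOCK_SIZE): 3889.98 + 14.748 * 64 ≈ 4833, also 8192.
    assert_eq!(estimate_cell_count(4 << 10), 8192);
}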
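store_cell now walks the first few tree levels breadth-first to seed per-thread deques, then switches to work stealing: each thread pops from its own LIFO Worker and steals batches from sibling Stealers when it runs dry (the patch additionally snoozes with crossbeam_utils::Backoff on Steal::Retry). A self-contained sketch of that find_task pattern against the crossbeam-deque 0.8 API, generic over the task type and not the tycho code itself:

use crossbeam_deque::{Steal, Stealer, Worker};

fn find_task<T>(local: &Worker<T>, stealers: &[Stealer<T>]) -> Option<T> {
    // Fast path: our own deque.
    if let Some(task) = local.pop() {
        return Some(task);
    }
    // Slow path: move a batch from a sibling into `local` and pop one task.
    for stealer in stealers {
        loop {
            match stealer.steal_batch_and_pop(local) {
                Steal::Success(task) => return Some(task),
                Steal::Empty => break,    // this sibling is drained
                Steal::Retry => continue, // lost a race; retry the same sibling
            }
        }
    }
    None
}

fn main() {
    let workers: Vec<Worker<u32>> = (0..4).map(|_| Worker::new_lifo()).collect();
    let stealers: Vec<Stealer<u32>> = workers.iter().map(|w| w.stealer()).collect();
    for (i, item) in (0..100u32).enumerate() {
        workers[i % 4].push(item);
    }
    // Worker 0 drains everything, stealing from the other three once its deque is empty.
    let mut seen = 0;
    while find_task(&workers[0], &stealers[1..]).is_some() {
        seen += 1;
    }
    assert_eq!(seen, 100);
}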
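The HashLinkList bucket_count in tables.rs follows directly from the sizing comments above it (~244-byte entries, one 512MB memtable, 10-12 entries per bucket). A quick check of that arithmetic, illustrative only and assuming the patch's entry-size estimate:

fn main() {
    let memtable_bytes = 512.0 * 1024.0 * 1024.0; // one 512MB memtable
    let entry_bytes = 244.0; // ~32 key + 8 seq + 192 value + 12 overhead
    let entries = memtable_bytes / entry_bytes; // ≈ 2.2M entries per memtable
    let target_load = 11.0; // 10-12 entries per bucket (RocksDB recommendation)
    let buckets = entries / target_load; // ≈ 200K buckets
    println!("entries ≈ {:.2}M, buckets ≈ {:.0}K", entries / 1e6, buckets / 1e3);
}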