From 86aeccc0ddd0596906a3e5ef38a2c2726db35468 Mon Sep 17 00:00:00 2001 From: Jakob Meier Date: Thu, 15 Sep 2022 15:11:11 +0100 Subject: [PATCH] feat: argument based prefetcher (#7620) Prefetcher that loads IO requests in parallel if the keys are listed in the smart contract function call argument. This is an experimental feature that has proven to stabilize mainnet nodes under current traffic. Especially when the node is not running with fast local SSDs. In the future we should generalize this implementation. --- Cargo.lock | 1 + core/store/src/config.rs | 15 +++ core/store/src/metrics.rs | 63 +++++++++++ core/store/src/trie/config.rs | 30 ++++++ .../src/trie/prefetching_trie_storage.rs | 86 +++++++++++++-- core/store/src/trie/shard_tries.rs | 14 ++- core/store/src/trie/trie_storage.rs | 28 ++++- nearcore/src/runtime/mod.rs | 7 +- runtime/runtime/Cargo.toml | 1 + runtime/runtime/src/metrics.rs | 16 +++ runtime/runtime/src/prefetch.rs | 101 +++++++++++++++--- 11 files changed, 327 insertions(+), 35 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f147ed283c7..ef8ab5d88a6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3735,6 +3735,7 @@ dependencies = [ "rayon", "serde", "serde_json", + "sha2 0.10.2", "tempfile", "testlib", "thiserror", diff --git a/core/store/src/config.rs b/core/store/src/config.rs index d083fe45a45..6c810c9c4a3 100644 --- a/core/store/src/config.rs +++ b/core/store/src/config.rs @@ -46,6 +46,13 @@ pub struct StoreConfig { /// Enable fetching account and access key data ahead of time to avoid IO latency. pub enable_receipt_prefetching: bool, + /// Configured accounts will be prefetched as SWEAT token account, if predecessor is listed as receiver. + /// This config option is temporary and will be removed once flat storage is implemented. + pub sweat_prefetch_receivers: Vec, + /// List of allowed predecessor accounts for SWEAT prefetching. + /// This config option is temporary and will be removed once flat storage is implemented. 
+ pub sweat_prefetch_senders: Vec, + /// Path where to create RocksDB checkpoints during database migrations or /// `false` to disable that feature. /// @@ -156,6 +163,14 @@ impl Default for StoreConfig { trie_cache_capacities: vec![(ShardUId { version: 1, shard_id: 3 }, 45_000_000)], enable_receipt_prefetching: false, + sweat_prefetch_receivers: vec![ + "token.sweat".to_owned(), + "vfinal.token.sweat.testnet".to_owned(), + ], + sweat_prefetch_senders: vec![ + "oracle.sweat".to_owned(), + "sweat_the_oracle.testnet".to_owned(), + ], migration_snapshot: Default::default(), } diff --git a/core/store/src/metrics.rs b/core/store/src/metrics.rs index e8d9f5ba1db..68d3fb0e71e 100644 --- a/core/store/src/metrics.rs +++ b/core/store/src/metrics.rs @@ -142,3 +142,66 @@ pub static REVERTED_TRIE_INSERTIONS: Lazy = Lazy::new(|| { ) .unwrap() }); +pub static PREFETCH_SENT: Lazy = Lazy::new(|| { + try_create_int_counter_vec("near_prefetch_sent", "Prefetch requests sent to DB", &["shard_id"]) + .unwrap() +}); +pub static PREFETCH_HITS: Lazy = Lazy::new(|| { + try_create_int_counter_vec("near_prefetch_hits", "Prefetched trie keys", &["shard_id"]).unwrap() +}); +pub static PREFETCH_PENDING: Lazy = Lazy::new(|| { + try_create_int_counter_vec( + "near_prefetch_pending", + "Prefetched trie keys that were still pending when main thread needed data", + &["shard_id"], + ) + .unwrap() +}); +pub static PREFETCH_FAIL: Lazy = Lazy::new(|| { + try_create_int_counter_vec( + "near_prefetch_fail", + "Prefetching trie key failed with an error", + &["shard_id"], + ) + .unwrap() +}); +pub static PREFETCH_NOT_REQUESTED: Lazy = Lazy::new(|| { + try_create_int_counter_vec( + "near_prefetch_not_requested", + "Number of values that had to be fetched without having been prefetched", + &["shard_id"], + ) + .unwrap() +}); +pub static PREFETCH_MEMORY_LIMIT_REACHED: Lazy = Lazy::new(|| { + try_create_int_counter_vec( + "near_prefetch_memory_limit_reached", + "Number of values that could not be prefetched due 
to prefetch staging area size limitations", + &["shard_id"], + ) + .unwrap() +}); +pub static PREFETCH_RETRY: Lazy = Lazy::new(|| { + try_create_int_counter_vec( + "near_prefetch_retries", + "Main thread was waiting for prefetched value but had to retry fetch afterwards.", + &["shard_id"], + ) + .unwrap() +}); +pub static PREFETCH_STAGED_BYTES: Lazy = Lazy::new(|| { + try_create_int_gauge_vec( + "near_prefetch_staged_bytes", + "Upper bound on memory usage for holding prefetched data.", + &["shard_id"], + ) + .unwrap() +}); +pub static PREFETCH_STAGED_SLOTS: Lazy = Lazy::new(|| { + try_create_int_gauge_vec( + "near_prefetch_staged_slots", + "Number of slots used in staging area.", + &["shard_id"], + ) + .unwrap() +}); diff --git a/core/store/src/trie/config.rs b/core/store/src/trie/config.rs index 31a1699de6b..18ff5f1adec 100644 --- a/core/store/src/trie/config.rs +++ b/core/store/src/trie/config.rs @@ -1,5 +1,9 @@ +use crate::StoreConfig; use near_primitives::shard_layout::ShardUId; +use near_primitives::types::AccountId; use std::collections::HashMap; +use std::str::FromStr; +use tracing::error; /// Default number of cache entries. /// It was chosen to fit into RAM well. RAM spend on trie cache should not exceed 50_000 * 4 (number of shards) * @@ -28,6 +32,11 @@ pub struct TrieConfig { pub shard_cache_config: ShardCacheConfig, pub view_shard_cache_config: ShardCacheConfig, pub enable_receipt_prefetching: bool, + + /// Configured accounts will be prefetched as SWEAT token account, if predecessor is listed as sender. + pub sweat_prefetch_receivers: Vec, + /// List of allowed predecessor accounts for SWEAT prefetching. 
+ pub sweat_prefetch_senders: Vec, } pub struct ShardCacheConfig { @@ -46,6 +55,27 @@ pub struct ShardCacheConfig { } impl TrieConfig { + pub fn from_config(config: &StoreConfig) -> Self { + let mut this = Self::default(); + this.shard_cache_config + .override_max_entries + .extend(config.trie_cache_capacities.iter().cloned()); + this.enable_receipt_prefetching = config.enable_receipt_prefetching; + for account in &config.sweat_prefetch_receivers { + match AccountId::from_str(account) { + Ok(account_id) => this.sweat_prefetch_receivers.push(account_id), + Err(e) => error!(target: "config", "invalid account id {account}: {e}"), + } + } + for account in &config.sweat_prefetch_senders { + match AccountId::from_str(account) { + Ok(account_id) => this.sweat_prefetch_senders.push(account_id), + Err(e) => error!(target: "config", "invalid account id {account}: {e}"), + } + } + this + } + /// Shard cache capacity in number of trie nodes. pub fn shard_cache_capacity(&self, shard_uid: ShardUId, is_view: bool) -> u64 { if is_view { &self.view_shard_cache_config } else { &self.shard_cache_config } diff --git a/core/store/src/trie/prefetching_trie_storage.rs b/core/store/src/trie/prefetching_trie_storage.rs index 7ca417a823d..b00aff22bee 100644 --- a/core/store/src/trie/prefetching_trie_storage.rs +++ b/core/store/src/trie/prefetching_trie_storage.rs @@ -1,9 +1,14 @@ use crate::trie::POISONED_LOCK_ERR; -use crate::{DBCol, StorageError, Store, Trie, TrieCache, TrieCachingStorage, TrieStorage}; +use crate::{ + metrics, DBCol, StorageError, Store, Trie, TrieCache, TrieCachingStorage, TrieConfig, + TrieStorage, +}; +use near_o11y::metrics::prometheus; +use near_o11y::metrics::prometheus::core::GenericGauge; use near_primitives::hash::CryptoHash; use near_primitives::shard_layout::ShardUId; use near_primitives::trie_key::TrieKey; -use near_primitives::types::{StateRoot, TrieNodesCount}; +use near_primitives::types::{AccountId, ShardId, StateRoot, TrieNodesCount}; use 
std::collections::HashMap; use std::sync::{Arc, Mutex}; @@ -63,6 +68,14 @@ pub struct PrefetchApi { /// to mark what is already being fetched, to avoid fetching the same data /// multiple times. pub(crate) prefetching: PrefetchStagingArea, + + pub enable_receipt_prefetching: bool, + /// Configured accounts will be prefetched as SWEAT token account, if predecessor is listed as receiver. + pub sweat_prefetch_receivers: Vec, + /// List of allowed predecessor accounts for SWEAT prefetching. + pub sweat_prefetch_senders: Vec, + + pub shard_uid: ShardUId, } /// Staging area for in-flight prefetch requests and a buffer for prefetched data. @@ -78,10 +91,9 @@ pub struct PrefetchApi { /// This design also ensures the shard cache works exactly the same with or /// without the prefetcher, because the order in which it sees accesses is /// independent of the prefetcher. -#[derive(Default, Clone)] +#[derive(Clone)] pub(crate) struct PrefetchStagingArea(Arc>); -#[derive(Default)] struct InnerPrefetchStagingArea { slots: SizeTrackedHashMap, } @@ -94,6 +106,22 @@ pub(crate) enum PrefetcherResult { MemoryLimitReached, } +struct StagedMetrics { + prefetch_staged_bytes: GenericGauge, + prefetch_staged_items: GenericGauge, +} + +impl StagedMetrics { + fn new(shard_id: ShardId) -> Self { + Self { + prefetch_staged_bytes: metrics::PREFETCH_STAGED_BYTES + .with_label_values(&[&shard_id.to_string()]), + prefetch_staged_items: metrics::PREFETCH_STAGED_SLOTS + .with_label_values(&[&shard_id.to_string()]), + } + } +} + /// Type used internally in the staging area to keep track of requests. 
#[derive(Clone, Debug)] enum PrefetchSlot { @@ -102,10 +130,10 @@ enum PrefetchSlot { Done(Arc<[u8]>), } -#[derive(Default)] struct SizeTrackedHashMap { map: HashMap, size_bytes: usize, + metrics: StagedMetrics, } impl SizeTrackedHashMap { @@ -115,6 +143,7 @@ impl SizeTrackedHashMap { if let Some(dropped) = &dropped { self.size_bytes -= Self::reserved_memory(dropped); } + self.update_metrics(); dropped } @@ -123,12 +152,19 @@ impl SizeTrackedHashMap { if let Some(dropped) = &dropped { self.size_bytes -= Self::reserved_memory(dropped); } + self.update_metrics(); dropped } fn clear(&mut self) { self.map.clear(); self.size_bytes = 0; + self.update_metrics(); + } + + fn update_metrics(&self) { + self.metrics.prefetch_staged_bytes.set(self.size_bytes as i64); + self.metrics.prefetch_staged_items.set(self.map.len() as i64); } /// Reserved memory capacity for a value from the prefetching area. @@ -249,6 +285,18 @@ impl TriePrefetchingStorage { } impl PrefetchStagingArea { + fn new(shard_id: ShardId) -> Self { + let inner = InnerPrefetchStagingArea { + slots: SizeTrackedHashMap { + map: Default::default(), + size_bytes: 0, + metrics: StagedMetrics::new(shard_id), + }, + }; + inner.slots.update_metrics(); + Self(Arc::new(Mutex::new(inner))) + } + /// Release a slot in the prefetcher staging area. /// /// This must only be called after inserting the value to the shard cache. 
@@ -329,10 +377,26 @@ impl PrefetchStagingArea { } impl PrefetchApi { - pub fn new(store: Store, shard_cache: TrieCache, shard_uid: ShardUId) -> Self { + pub fn new( + store: Store, + shard_cache: TrieCache, + shard_uid: ShardUId, + trie_config: &TrieConfig, + ) -> Self { let (work_queue_tx, work_queue_rx) = crossbeam::channel::bounded(MAX_QUEUED_WORK_ITEMS); - - let this = Self { work_queue_tx, work_queue_rx, prefetching: Default::default() }; + let sweat_prefetch_receivers = trie_config.sweat_prefetch_receivers.clone(); + let sweat_prefetch_senders = trie_config.sweat_prefetch_senders.clone(); + let enable_receipt_prefetching = trie_config.enable_receipt_prefetching; + + let this = Self { + work_queue_tx, + work_queue_rx, + prefetching: PrefetchStagingArea::new(shard_uid.shard_id()), + enable_receipt_prefetching, + sweat_prefetch_receivers, + sweat_prefetch_senders, + shard_uid, + }; for _ in 0..NUM_IO_THREADS { this.start_io_thread(store.clone(), shard_cache.clone(), shard_uid.clone()); } @@ -357,6 +421,10 @@ impl PrefetchApi { let prefetcher_storage = TriePrefetchingStorage::new(store, shard_uid, shard_cache, self.prefetching.clone()); let work_queue = self.work_queue_rx.clone(); + let metric_prefetch_sent = + metrics::PREFETCH_SENT.with_label_values(&[&shard_uid.shard_id.to_string()]); + let metric_prefetch_fail = + metrics::PREFETCH_FAIL.with_label_values(&[&shard_uid.shard_id.to_string()]); std::thread::spawn(move || { while let Ok((trie_root, trie_key)) = work_queue.recv() { // Since the trie root can change,and since the root is not known at the time when the IO threads starts, @@ -365,12 +433,14 @@ impl PrefetchApi { let prefetcher_trie = Trie::new(Box::new(prefetcher_storage.clone()), trie_root, None); let storage_key = trie_key.to_vec(); + metric_prefetch_sent.inc(); if let Ok(_maybe_value) = prefetcher_trie.get(&storage_key) { near_o11y::io_trace!(count: "prefetch"); } else { // This may happen in rare occasions and can be ignored safely. 
// See comments in `TriePrefetchingStorage::retrieve_raw_bytes`. near_o11y::io_trace!(count: "prefetch_failure"); + metric_prefetch_fail.inc(); } } }) diff --git a/core/store/src/trie/shard_tries.rs b/core/store/src/trie/shard_tries.rs index 4f8d2fbd2d7..6c6d43ff5b9 100644 --- a/core/store/src/trie/shard_tries.rs +++ b/core/store/src/trie/shard_tries.rs @@ -113,14 +113,15 @@ impl ShardTries { .or_insert_with(|| TrieCache::new(&self.0.trie_config, shard_uid, is_view)) .clone() }; - // Do not enable prefetching on view caches. // 1) Performance of view calls is not crucial. // 2) A lot of the prefetcher code assumes there is only one "main-thread" per shard active. // If you want to enable it for view calls, at least make sure they don't share // the `PrefetchApi` instances with the normal calls. - let prefetch_enabled = !is_view && self.0.trie_config.enable_receipt_prefetching; - + let prefetch_enabled = !is_view + && (self.0.trie_config.enable_receipt_prefetching + || (!self.0.trie_config.sweat_prefetch_receivers.is_empty() + && !self.0.trie_config.sweat_prefetch_senders.is_empty())); let prefetch_api = prefetch_enabled.then(|| { self.0 .prefetchers @@ -128,7 +129,12 @@ impl ShardTries { .expect(POISONED_LOCK_ERR) .entry(shard_uid) .or_insert_with(|| { - PrefetchApi::new(self.0.store.clone(), cache.clone(), shard_uid.clone()) + PrefetchApi::new( + self.0.store.clone(), + cache.clone(), + shard_uid.clone(), + &self.0.trie_config, + ) }) .clone() }); diff --git a/core/store/src/trie/trie_storage.rs b/core/store/src/trie/trie_storage.rs index 0f2edafca18..7d56fafa2d6 100644 --- a/core/store/src/trie/trie_storage.rs +++ b/core/store/src/trie/trie_storage.rs @@ -376,6 +376,11 @@ struct TrieCacheInnerMetrics { shard_cache_size: GenericGauge, chunk_cache_size: GenericGauge, shard_cache_current_total_size: GenericGauge, + prefetch_hits: GenericCounter, + prefetch_pending: GenericCounter, + prefetch_not_requested: GenericCounter, + prefetch_memory_limit_reached: 
GenericCounter, + prefetch_retry: GenericCounter, } impl TrieCachingStorage { @@ -399,6 +404,13 @@ impl TrieCachingStorage { chunk_cache_size: metrics::CHUNK_CACHE_SIZE.with_label_values(&metrics_labels), shard_cache_current_total_size: metrics::SHARD_CACHE_CURRENT_TOTAL_SIZE .with_label_values(&metrics_labels), + prefetch_hits: metrics::PREFETCH_HITS.with_label_values(&metrics_labels[..1]), + prefetch_pending: metrics::PREFETCH_PENDING.with_label_values(&metrics_labels[..1]), + prefetch_not_requested: metrics::PREFETCH_NOT_REQUESTED + .with_label_values(&metrics_labels[..1]), + prefetch_memory_limit_reached: metrics::PREFETCH_MEMORY_LIMIT_REACHED + .with_label_values(&metrics_labels[..1]), + prefetch_retry: metrics::PREFETCH_RETRY.with_label_values(&metrics_labels[..1]), }; TrieCachingStorage { store, @@ -480,23 +492,30 @@ impl TrieStorage for TrieCachingStorage { std::mem::drop(guard); val = match prefetch_state { - // Slot reserved for us, the main thread, or no space left. + // Slot reserved for us, the main thread. // `SlotReserved` for the main thread means, either we have not submitted a prefetch request for // this value, or maybe it is just still queued up. Either way, prefetching is not going to help // so the main thread should fetch data from DB on its own. + PrefetcherResult::SlotReserved => { + self.metrics.prefetch_not_requested.inc(); + self.read_from_db(hash)? + } // `MemoryLimitReached` is not really relevant for the main thread, // we always have to go to DB even if we could not stage a new prefetch. // It only means we were not able to mark it as already being fetched, which in turn could lead to // a prefetcher trying to fetch the same value before we can put it in the shard cache. - PrefetcherResult::SlotReserved | PrefetcherResult::MemoryLimitReached => { + PrefetcherResult::MemoryLimitReached => { + self.metrics.prefetch_memory_limit_reached.inc(); self.read_from_db(hash)? 
} PrefetcherResult::Prefetched(value) => { near_o11y::io_trace!(count: "prefetch_hit"); + self.metrics.prefetch_hits.inc(); value } PrefetcherResult::Pending => { near_o11y::io_trace!(count: "prefetch_pending"); + self.metrics.prefetch_pending.inc(); std::thread::yield_now(); // If data is already being prefetched, wait for that instead of sending a new request. match prefetcher.prefetching.blocking_get(hash.clone()) { @@ -506,7 +525,10 @@ impl TrieStorage for TrieCachingStorage { // was a storage error. Or in the case of forks and parallel chunk // processing where one chunk cleans up prefetched data from the other. // In any case, we can try again from the main thread. - None => self.read_from_db(hash)?, + None => { + self.metrics.prefetch_retry.inc(); + self.read_from_db(hash)? + } } } }; diff --git a/nearcore/src/runtime/mod.rs b/nearcore/src/runtime/mod.rs index d775070615f..046379f4f11 100644 --- a/nearcore/src/runtime/mod.rs +++ b/nearcore/src/runtime/mod.rs @@ -97,12 +97,7 @@ pub struct NightshadeRuntime { impl NightshadeRuntime { pub fn from_config(home_dir: &Path, store: Store, config: &NearConfig) -> Self { - let mut trie_config = TrieConfig::default(); - trie_config - .shard_cache_config - .override_max_entries - .extend(config.config.store.trie_cache_capacities.iter().cloned()); - trie_config.enable_receipt_prefetching = config.config.store.enable_receipt_prefetching; + let trie_config = TrieConfig::from_config(&config.config.store); Self::new( home_dir, diff --git a/runtime/runtime/Cargo.toml b/runtime/runtime/Cargo.toml index 2f1525efa39..f82b7b70554 100644 --- a/runtime/runtime/Cargo.toml +++ b/runtime/runtime/Cargo.toml @@ -21,6 +21,7 @@ num-traits = "0.2.11" hex = "0.4.2" rayon = "1.5" thiserror = "1.0" +sha2 = "0.10" borsh = "0.9" diff --git a/runtime/runtime/src/metrics.rs b/runtime/runtime/src/metrics.rs index 854e558b47b..8b2e90b8e4b 100644 --- a/runtime/runtime/src/metrics.rs +++ b/runtime/runtime/src/metrics.rs @@ -33,6 +33,22 @@ pub static 
TRANSACTION_PROCESSED_FAILED_TOTAL: Lazy = Lazy::new(|| { ) .unwrap() }); +pub static PREFETCH_ENQUEUED: Lazy = Lazy::new(|| { + try_create_int_counter_vec( + "near_prefetch_enqueued", + "Prefetch requests queued up", + &["shard_id"], + ) + .unwrap() +}); +pub static PREFETCH_QUEUE_FULL: Lazy = Lazy::new(|| { + try_create_int_counter_vec( + "near_prefetch_queue_full", + "Prefetch requests failed to queue up", + &["shard_id"], + ) + .unwrap() +}); pub static FUNCTION_CALL_PROCESSED: Lazy = Lazy::new(|| { try_create_int_counter_vec( "near_function_call_processed", diff --git a/runtime/runtime/src/prefetch.rs b/runtime/runtime/src/prefetch.rs index 76c981b6a09..3d684c5efad 100644 --- a/runtime/runtime/src/prefetch.rs +++ b/runtime/runtime/src/prefetch.rs @@ -41,17 +41,25 @@ //! in the prefetcher. Implementation details for most limits are in //! `core/store/src/trie/prefetching_trie_storage.rs` +use near_o11y::metrics::prometheus; +use near_o11y::metrics::prometheus::core::GenericCounter; use near_primitives::receipt::{Receipt, ReceiptEnum}; -use near_primitives::transaction::SignedTransaction; +use near_primitives::transaction::{Action, SignedTransaction}; use near_primitives::trie_key::TrieKey; +use near_primitives::types::AccountId; use near_primitives::types::StateRoot; use near_store::{PrefetchApi, Trie}; +use sha2::Digest; use std::rc::Rc; use tracing::debug; + +use crate::metrics; /// Transaction runtime view of the prefetching subsystem. 
pub(crate) struct TriePrefetcher { prefetch_api: PrefetchApi, trie_root: StateRoot, + prefetch_enqueued: GenericCounter, + prefetch_queue_full: GenericCounter, } impl TriePrefetcher { @@ -59,7 +67,16 @@ impl TriePrefetcher { if let Some(caching_storage) = trie.storage.as_caching_storage() { if let Some(prefetch_api) = caching_storage.prefetch_api().clone() { let trie_root = *trie.get_root(); - return Some(Self { prefetch_api, trie_root }); + let shard_uid = prefetch_api.shard_uid; + let metrics_labels: [&str; 1] = [&shard_uid.shard_id.to_string()]; + return Some(Self { + prefetch_api, + trie_root, + prefetch_enqueued: metrics::PREFETCH_ENQUEUED + .with_label_values(&metrics_labels), + prefetch_queue_full: metrics::PREFETCH_QUEUE_FULL + .with_label_values(&metrics_labels), + }); } } None @@ -70,10 +87,30 @@ impl TriePrefetcher { /// Returns an error if the prefetching queue is full. pub(crate) fn input_receipts(&mut self, receipts: &[Receipt]) -> Result<(), ()> { for receipt in receipts.iter() { - if let ReceiptEnum::Action(_) = &receipt.receipt { + if let ReceiptEnum::Action(action_receipt) = &receipt.receipt { let account_id = receipt.receiver_id.clone(); - let trie_key = TrieKey::Account { account_id }; - self.prefetch_trie_key(trie_key)?; + + // general-purpose account prefetching + if self.prefetch_api.enable_receipt_prefetching { + let trie_key = TrieKey::Account { account_id: account_id.clone() }; + self.prefetch_trie_key(trie_key)?; + } + + // SWEAT specific argument prefetcher + if self.prefetch_api.sweat_prefetch_receivers.contains(&account_id) + && self.prefetch_api.sweat_prefetch_senders.contains(&receipt.predecessor_id) + { + for action in &action_receipt.actions { + if let Action::FunctionCall(fn_call) = action { + if fn_call.method_name == "record_batch" { + self.prefetch_sweat_record_batch( + account_id.clone(), + &fn_call.args, + )?; + } + } + } + } } } Ok(()) @@ -86,16 +123,18 @@ impl TriePrefetcher { &mut self, transactions: &[SignedTransaction], 
) -> Result<(), ()> { - for t in transactions { - let account_id = t.transaction.signer_id.clone(); - let trie_key = TrieKey::Account { account_id }; - self.prefetch_trie_key(trie_key)?; + if self.prefetch_api.enable_receipt_prefetching { + for t in transactions { + let account_id = t.transaction.signer_id.clone(); + let trie_key = TrieKey::Account { account_id }; + self.prefetch_trie_key(trie_key)?; - let trie_key = TrieKey::AccessKey { - account_id: t.transaction.signer_id.clone(), - public_key: t.transaction.public_key.clone(), - }; - self.prefetch_trie_key(trie_key)?; + let trie_key = TrieKey::AccessKey { + account_id: t.transaction.signer_id.clone(), + public_key: t.transaction.public_key.clone(), + }; + self.prefetch_trie_key(trie_key)?; + } } Ok(()) } @@ -121,12 +160,46 @@ impl TriePrefetcher { fn prefetch_trie_key(&self, trie_key: TrieKey) -> Result<(), ()> { let queue_full = self.prefetch_api.prefetch_trie_key(self.trie_root, trie_key).is_err(); if queue_full { + self.prefetch_queue_full.inc(); debug!(target: "prefetcher", "I/O scheduler input queue full, dropping prefetch request"); Err(()) } else { + self.prefetch_enqueued.inc(); Ok(()) } } + + /// Prefetcher specifically tuned for SWEAT record batch + /// + /// Temporary hack, consider removing after merging flat storage. 
+ /// https://github.com/near/nearcore/issues/7327 + fn prefetch_sweat_record_batch(&self, account_id: AccountId, arg: &[u8]) -> Result<(), ()> { + if let Ok(json) = serde_json::de::from_slice::(arg) { + if json.is_object() { + if let Some(list) = json.get("steps_batch") { + if let Some(list) = list.as_array() { + for tuple in list.iter() { + if let Some(tuple) = tuple.as_array() { + if let Some(user_account) = tuple.first().and_then(|a| a.as_str()) { + let hashed_account = + sha2::Sha256::digest(user_account.as_bytes()).into_iter(); + let mut key = vec![0x74, 0x00]; + key.extend(hashed_account); + let trie_key = TrieKey::ContractData { + account_id: account_id.clone(), + key: key.to_vec(), + }; + near_o11y::io_trace!(count: "prefetch"); + self.prefetch_trie_key(trie_key)?; + } + } + } + } + } + } + } + Ok(()) + } } #[cfg(test)]