diff --git a/Cargo.lock b/Cargo.lock index f147ed283c7..ef8ab5d88a6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3735,6 +3735,7 @@ dependencies = [ "rayon", "serde", "serde_json", + "sha2 0.10.2", "tempfile", "testlib", "thiserror", diff --git a/core/store/src/config.rs b/core/store/src/config.rs index d083fe45a45..6c810c9c4a3 100644 --- a/core/store/src/config.rs +++ b/core/store/src/config.rs @@ -46,6 +46,13 @@ pub struct StoreConfig { /// Enable fetching account and access key data ahead of time to avoid IO latency. pub enable_receipt_prefetching: bool, + /// Configured accounts will be prefetched as SWEAT token account, if predecessor is listed as receiver. + /// This config option is temporary and will be removed once flat storage is implemented. + pub sweat_prefetch_receivers: Vec, + /// List of allowed predecessor accounts for SWEAT prefetching. + /// This config option is temporary and will be removed once flat storage is implemented. + pub sweat_prefetch_senders: Vec, + /// Path where to create RocksDB checkpoints during database migrations or /// `false` to disable that feature. /// @@ -156,6 +163,14 @@ impl Default for StoreConfig { trie_cache_capacities: vec![(ShardUId { version: 1, shard_id: 3 }, 45_000_000)], enable_receipt_prefetching: false, + sweat_prefetch_receivers: vec![ + "token.sweat".to_owned(), + "vfinal.token.sweat.testnet".to_owned(), + ], + sweat_prefetch_senders: vec![ + "oracle.sweat".to_owned(), + "sweat_the_oracle.testnet".to_owned(), + ], migration_snapshot: Default::default(), } diff --git a/core/store/src/metrics.rs b/core/store/src/metrics.rs index e8d9f5ba1db..68d3fb0e71e 100644 --- a/core/store/src/metrics.rs +++ b/core/store/src/metrics.rs @@ -142,3 +142,66 @@ pub static REVERTED_TRIE_INSERTIONS: Lazy = Lazy::new(|| { ) .unwrap() }); +pub static PREFETCH_SENT: Lazy = Lazy::new(|| { + try_create_int_counter_vec("near_prefetch_sent", "Prefetch requests sent to DB", &["shard_id"]) + .unwrap() +}); +pub static PREFETCH_HITS: Lazy = Lazy::new(|| { + try_create_int_counter_vec("near_prefetch_hits", "Prefetched trie keys", &["shard_id"]).unwrap() +}); +pub static PREFETCH_PENDING: Lazy = Lazy::new(|| { + try_create_int_counter_vec( + "near_prefetch_pending", + "Prefetched trie keys that were still pending when main thread needed data", + &["shard_id"], + ) + .unwrap() +}); +pub static PREFETCH_FAIL: Lazy = Lazy::new(|| { + try_create_int_counter_vec( + "near_prefetch_fail", + "Prefetching trie key failed with an error", + &["shard_id"], + ) + .unwrap() +}); +pub static PREFETCH_NOT_REQUESTED: Lazy = Lazy::new(|| { + try_create_int_counter_vec( + "near_prefetch_not_requested", + "Number of values that had to be fetched without having been prefetched", + &["shard_id"], + ) + .unwrap() +}); +pub static PREFETCH_MEMORY_LIMIT_REACHED: Lazy = Lazy::new(|| { + try_create_int_counter_vec( + "near_prefetch_memory_limit_reached", + "Number of values that could not be prefetched due to prefetch staging area size limitations", + &["shard_id"], + ) + .unwrap() +}); +pub static PREFETCH_RETRY: Lazy = Lazy::new(|| { + try_create_int_counter_vec( + "near_prefetch_retries", + "Main thread was waiting for prefetched value but had to retry fetch afterwards.", + &["shard_id"], + ) + .unwrap() +}); +pub static PREFETCH_STAGED_BYTES: Lazy = Lazy::new(|| { + try_create_int_gauge_vec( + "near_prefetch_staged_bytes", + "Upper bound on memory usage for holding prefetched data.", + &["shard_id"], + ) + .unwrap() +}); +pub static PREFETCH_STAGED_SLOTS: Lazy = Lazy::new(|| { + try_create_int_gauge_vec( + "near_prefetch_staged_slots", + "Number of slots used in staging area.", + &["shard_id"], + ) + .unwrap() +}); diff --git a/core/store/src/trie/config.rs b/core/store/src/trie/config.rs index 31a1699de6b..18ff5f1adec 100644 --- a/core/store/src/trie/config.rs +++ b/core/store/src/trie/config.rs @@ -1,5 +1,9 @@ +use crate::StoreConfig; use near_primitives::shard_layout::ShardUId; +use near_primitives::types::AccountId; use std::collections::HashMap; +use std::str::FromStr; +use tracing::error; /// Default number of cache entries. /// It was chosen to fit into RAM well. RAM spend on trie cache should not exceed 50_000 * 4 (number of shards) * @@ -28,6 +32,11 @@ pub struct TrieConfig { pub shard_cache_config: ShardCacheConfig, pub view_shard_cache_config: ShardCacheConfig, pub enable_receipt_prefetching: bool, + + /// Configured accounts will be prefetched as SWEAT token account, if predecessor is listed as sender. + pub sweat_prefetch_receivers: Vec, + /// List of allowed predecessor accounts for SWEAT prefetching. + pub sweat_prefetch_senders: Vec, } pub struct ShardCacheConfig { @@ -46,6 +55,27 @@ pub struct ShardCacheConfig { } impl TrieConfig { + pub fn from_config(config: &StoreConfig) -> Self { + let mut this = Self::default(); + this.shard_cache_config + .override_max_entries + .extend(config.trie_cache_capacities.iter().cloned()); + this.enable_receipt_prefetching = config.enable_receipt_prefetching; + for account in &config.sweat_prefetch_receivers { + match AccountId::from_str(account) { + Ok(account_id) => this.sweat_prefetch_receivers.push(account_id), + Err(e) => error!(target: "config", "invalid account id {account}: {e}"), + } + } + for account in &config.sweat_prefetch_senders { + match AccountId::from_str(account) { + Ok(account_id) => this.sweat_prefetch_senders.push(account_id), + Err(e) => error!(target: "config", "invalid account id {account}: {e}"), + } + } + this + } + /// Shard cache capacity in number of trie nodes. pub fn shard_cache_capacity(&self, shard_uid: ShardUId, is_view: bool) -> u64 { if is_view { &self.view_shard_cache_config } else { &self.shard_cache_config } diff --git a/core/store/src/trie/prefetching_trie_storage.rs b/core/store/src/trie/prefetching_trie_storage.rs index 7ca417a823d..b00aff22bee 100644 --- a/core/store/src/trie/prefetching_trie_storage.rs +++ b/core/store/src/trie/prefetching_trie_storage.rs @@ -1,9 +1,14 @@ use crate::trie::POISONED_LOCK_ERR; -use crate::{DBCol, StorageError, Store, Trie, TrieCache, TrieCachingStorage, TrieStorage}; +use crate::{ + metrics, DBCol, StorageError, Store, Trie, TrieCache, TrieCachingStorage, TrieConfig, + TrieStorage, +}; +use near_o11y::metrics::prometheus; +use near_o11y::metrics::prometheus::core::GenericGauge; use near_primitives::hash::CryptoHash; use near_primitives::shard_layout::ShardUId; use near_primitives::trie_key::TrieKey; -use near_primitives::types::{StateRoot, TrieNodesCount}; +use near_primitives::types::{AccountId, ShardId, StateRoot, TrieNodesCount}; use std::collections::HashMap; use std::sync::{Arc, Mutex}; @@ -63,6 +68,14 @@ pub struct PrefetchApi { /// to mark what is already being fetched, to avoid fetching the same data /// multiple times. pub(crate) prefetching: PrefetchStagingArea, + + pub enable_receipt_prefetching: bool, + /// Configured accounts will be prefetched as SWEAT token account, if predecessor is listed as receiver. + pub sweat_prefetch_receivers: Vec, + /// List of allowed predecessor accounts for SWEAT prefetching. + pub sweat_prefetch_senders: Vec, + + pub shard_uid: ShardUId, } /// Staging area for in-flight prefetch requests and a buffer for prefetched data. @@ -78,10 +91,9 @@ pub struct PrefetchApi { /// This design also ensures the shard cache works exactly the same with or /// without the prefetcher, because the order in which it sees accesses is /// independent of the prefetcher. -#[derive(Default, Clone)] +#[derive(Clone)] pub(crate) struct PrefetchStagingArea(Arc>); -#[derive(Default)] struct InnerPrefetchStagingArea { slots: SizeTrackedHashMap, } @@ -94,6 +106,22 @@ pub(crate) enum PrefetcherResult { MemoryLimitReached, } +struct StagedMetrics { + prefetch_staged_bytes: GenericGauge, + prefetch_staged_items: GenericGauge, +} + +impl StagedMetrics { + fn new(shard_id: ShardId) -> Self { + Self { + prefetch_staged_bytes: metrics::PREFETCH_STAGED_BYTES + .with_label_values(&[&shard_id.to_string()]), + prefetch_staged_items: metrics::PREFETCH_STAGED_SLOTS + .with_label_values(&[&shard_id.to_string()]), + } + } +} + /// Type used internally in the staging area to keep track of requests. #[derive(Clone, Debug)] enum PrefetchSlot { @@ -102,10 +130,10 @@ enum PrefetchSlot { Done(Arc<[u8]>), } -#[derive(Default)] struct SizeTrackedHashMap { map: HashMap, size_bytes: usize, + metrics: StagedMetrics, } impl SizeTrackedHashMap { @@ -115,6 +143,7 @@ impl SizeTrackedHashMap { if let Some(dropped) = &dropped { self.size_bytes -= Self::reserved_memory(dropped); } + self.update_metrics(); dropped } @@ -123,12 +152,19 @@ impl SizeTrackedHashMap { if let Some(dropped) = &dropped { self.size_bytes -= Self::reserved_memory(dropped); } + self.update_metrics(); dropped } fn clear(&mut self) { self.map.clear(); self.size_bytes = 0; + self.update_metrics(); + } + + fn update_metrics(&self) { + self.metrics.prefetch_staged_bytes.set(self.size_bytes as i64); + self.metrics.prefetch_staged_items.set(self.map.len() as i64); } /// Reserved memory capacity for a value from the prefetching area. @@ -249,6 +285,18 @@ impl TriePrefetchingStorage { } impl PrefetchStagingArea { + fn new(shard_id: ShardId) -> Self { + let inner = InnerPrefetchStagingArea { + slots: SizeTrackedHashMap { + map: Default::default(), + size_bytes: 0, + metrics: StagedMetrics::new(shard_id), + }, + }; + inner.slots.update_metrics(); + Self(Arc::new(Mutex::new(inner))) + } + /// Release a slot in the prefetcher staging area. /// /// This must only be called after inserting the value to the shard cache. @@ -329,10 +377,26 @@ impl PrefetchStagingArea { } impl PrefetchApi { - pub fn new(store: Store, shard_cache: TrieCache, shard_uid: ShardUId) -> Self { + pub fn new( + store: Store, + shard_cache: TrieCache, + shard_uid: ShardUId, + trie_config: &TrieConfig, + ) -> Self { let (work_queue_tx, work_queue_rx) = crossbeam::channel::bounded(MAX_QUEUED_WORK_ITEMS); - - let this = Self { work_queue_tx, work_queue_rx, prefetching: Default::default() }; + let sweat_prefetch_receivers = trie_config.sweat_prefetch_receivers.clone(); + let sweat_prefetch_senders = trie_config.sweat_prefetch_senders.clone(); + let enable_receipt_prefetching = trie_config.enable_receipt_prefetching; + + let this = Self { + work_queue_tx, + work_queue_rx, + prefetching: PrefetchStagingArea::new(shard_uid.shard_id()), + enable_receipt_prefetching, + sweat_prefetch_receivers, + sweat_prefetch_senders, + shard_uid, + }; for _ in 0..NUM_IO_THREADS { this.start_io_thread(store.clone(), shard_cache.clone(), shard_uid.clone()); } @@ -357,6 +421,10 @@ impl PrefetchApi { let prefetcher_storage = TriePrefetchingStorage::new(store, shard_uid, shard_cache, self.prefetching.clone()); let work_queue = self.work_queue_rx.clone(); + let metric_prefetch_sent = + metrics::PREFETCH_SENT.with_label_values(&[&shard_uid.shard_id.to_string()]); + let metric_prefetch_fail = + metrics::PREFETCH_FAIL.with_label_values(&[&shard_uid.shard_id.to_string()]); std::thread::spawn(move || { while let Ok((trie_root, trie_key)) = work_queue.recv() { // Since the trie root can change,and since the root is not known at the time when the IO threads starts, @@ -365,12 +433,14 @@ impl PrefetchApi { let prefetcher_trie = Trie::new(Box::new(prefetcher_storage.clone()), trie_root, None); let storage_key = trie_key.to_vec(); + metric_prefetch_sent.inc(); if let Ok(_maybe_value) = prefetcher_trie.get(&storage_key) { near_o11y::io_trace!(count: "prefetch"); } else { // This may happen in rare occasions and can be ignored safely. // See comments in `TriePrefetchingStorage::retrieve_raw_bytes`. near_o11y::io_trace!(count: "prefetch_failure"); + metric_prefetch_fail.inc(); } } }) diff --git a/core/store/src/trie/shard_tries.rs b/core/store/src/trie/shard_tries.rs index 4f8d2fbd2d7..6c6d43ff5b9 100644 --- a/core/store/src/trie/shard_tries.rs +++ b/core/store/src/trie/shard_tries.rs @@ -113,14 +113,15 @@ impl ShardTries { .or_insert_with(|| TrieCache::new(&self.0.trie_config, shard_uid, is_view)) .clone() }; - // Do not enable prefetching on view caches. // 1) Performance of view calls is not crucial. // 2) A lot of the prefetcher code assumes there is only one "main-thread" per shard active. // If you want to enable it for view calls, at least make sure they don't share // the `PrefetchApi` instances with the normal calls. - let prefetch_enabled = !is_view && self.0.trie_config.enable_receipt_prefetching; - + let prefetch_enabled = !is_view + && (self.0.trie_config.enable_receipt_prefetching + || (!self.0.trie_config.sweat_prefetch_receivers.is_empty() + && !self.0.trie_config.sweat_prefetch_senders.is_empty())); let prefetch_api = prefetch_enabled.then(|| { self.0 .prefetchers @@ -128,7 +129,12 @@ impl ShardTries { .expect(POISONED_LOCK_ERR) .entry(shard_uid) .or_insert_with(|| { - PrefetchApi::new(self.0.store.clone(), cache.clone(), shard_uid.clone()) + PrefetchApi::new( + self.0.store.clone(), + cache.clone(), + shard_uid.clone(), + &self.0.trie_config, + ) }) .clone() }); diff --git a/core/store/src/trie/trie_storage.rs b/core/store/src/trie/trie_storage.rs index 0f2edafca18..7d56fafa2d6 100644 --- a/core/store/src/trie/trie_storage.rs +++ b/core/store/src/trie/trie_storage.rs @@ -376,6 +376,11 @@ struct TrieCacheInnerMetrics { shard_cache_size: GenericGauge, chunk_cache_size: GenericGauge, shard_cache_current_total_size: GenericGauge, + prefetch_hits: GenericCounter, + prefetch_pending: GenericCounter, + prefetch_not_requested: GenericCounter, + prefetch_memory_limit_reached: GenericCounter, + prefetch_retry: GenericCounter, } impl TrieCachingStorage { @@ -399,6 +404,13 @@ impl TrieCachingStorage { chunk_cache_size: metrics::CHUNK_CACHE_SIZE.with_label_values(&metrics_labels), shard_cache_current_total_size: metrics::SHARD_CACHE_CURRENT_TOTAL_SIZE .with_label_values(&metrics_labels), + prefetch_hits: metrics::PREFETCH_HITS.with_label_values(&metrics_labels[..1]), + prefetch_pending: metrics::PREFETCH_PENDING.with_label_values(&metrics_labels[..1]), + prefetch_not_requested: metrics::PREFETCH_NOT_REQUESTED + .with_label_values(&metrics_labels[..1]), + prefetch_memory_limit_reached: metrics::PREFETCH_MEMORY_LIMIT_REACHED + .with_label_values(&metrics_labels[..1]), + prefetch_retry: metrics::PREFETCH_RETRY.with_label_values(&metrics_labels[..1]), }; TrieCachingStorage { store, @@ -480,23 +492,30 @@ impl TrieStorage for TrieCachingStorage { std::mem::drop(guard); val = match prefetch_state { - // Slot reserved for us, the main thread, or no space left. + // Slot reserved for us, the main thread. // `SlotReserved` for the main thread means, either we have not submitted a prefetch request for // this value, or maybe it is just still queued up. Either way, prefetching is not going to help // so the main thread should fetch data from DB on its own. + PrefetcherResult::SlotReserved => { + self.metrics.prefetch_not_requested.inc(); + self.read_from_db(hash)? + } // `MemoryLimitReached` is not really relevant for the main thread, // we always have to go to DB even if we could not stage a new prefetch. // It only means we were not able to mark it as already being fetched, which in turn could lead to // a prefetcher trying to fetch the same value before we can put it in the shard cache. - PrefetcherResult::SlotReserved | PrefetcherResult::MemoryLimitReached => { + PrefetcherResult::MemoryLimitReached => { + self.metrics.prefetch_memory_limit_reached.inc(); self.read_from_db(hash)? } PrefetcherResult::Prefetched(value) => { near_o11y::io_trace!(count: "prefetch_hit"); + self.metrics.prefetch_hits.inc(); value } PrefetcherResult::Pending => { near_o11y::io_trace!(count: "prefetch_pending"); + self.metrics.prefetch_pending.inc(); std::thread::yield_now(); // If data is already being prefetched, wait for that instead of sending a new request. match prefetcher.prefetching.blocking_get(hash.clone()) { @@ -506,7 +525,10 @@ impl TrieStorage for TrieCachingStorage { // was a storage error. Or in the case of forks and parallel chunk // processing where one chunk cleans up prefetched data from the other. // In any case, we can try again from the main thread. - None => self.read_from_db(hash)?, + None => { + self.metrics.prefetch_retry.inc(); + self.read_from_db(hash)? + } } } }; diff --git a/nearcore/src/runtime/mod.rs b/nearcore/src/runtime/mod.rs index d775070615f..046379f4f11 100644 --- a/nearcore/src/runtime/mod.rs +++ b/nearcore/src/runtime/mod.rs @@ -97,12 +97,7 @@ pub struct NightshadeRuntime { impl NightshadeRuntime { pub fn from_config(home_dir: &Path, store: Store, config: &NearConfig) -> Self { - let mut trie_config = TrieConfig::default(); - trie_config - .shard_cache_config - .override_max_entries - .extend(config.config.store.trie_cache_capacities.iter().cloned()); - trie_config.enable_receipt_prefetching = config.config.store.enable_receipt_prefetching; + let trie_config = TrieConfig::from_config(&config.config.store); Self::new( home_dir, diff --git a/runtime/runtime/Cargo.toml b/runtime/runtime/Cargo.toml index 2f1525efa39..f82b7b70554 100644 --- a/runtime/runtime/Cargo.toml +++ b/runtime/runtime/Cargo.toml @@ -21,6 +21,7 @@ num-traits = "0.2.11" hex = "0.4.2" rayon = "1.5" thiserror = "1.0" +sha2 = "0.10" borsh = "0.9" diff --git a/runtime/runtime/src/metrics.rs b/runtime/runtime/src/metrics.rs index 854e558b47b..8b2e90b8e4b 100644 --- a/runtime/runtime/src/metrics.rs +++ b/runtime/runtime/src/metrics.rs @@ -33,6 +33,22 @@ pub static TRANSACTION_PROCESSED_FAILED_TOTAL: Lazy = Lazy::new(|| { ) .unwrap() }); +pub static PREFETCH_ENQUEUED: Lazy = Lazy::new(|| { + try_create_int_counter_vec( + "near_prefetch_enqueued", + "Prefetch requests queued up", + &["shard_id"], + ) + .unwrap() +}); +pub static PREFETCH_QUEUE_FULL: Lazy = Lazy::new(|| { + try_create_int_counter_vec( + "near_prefetch_queue_full", + "Prefetch requests failed to queue up", + &["shard_id"], + ) + .unwrap() +}); pub static FUNCTION_CALL_PROCESSED: Lazy = Lazy::new(|| { try_create_int_counter_vec( "near_function_call_processed", diff --git a/runtime/runtime/src/prefetch.rs b/runtime/runtime/src/prefetch.rs index 76c981b6a09..3d684c5efad 100644 --- a/runtime/runtime/src/prefetch.rs +++ b/runtime/runtime/src/prefetch.rs @@ -41,17 +41,25 @@ //! in the prefetcher. Implementation details for most limits are in //! `core/store/src/trie/prefetching_trie_storage.rs` +use near_o11y::metrics::prometheus; +use near_o11y::metrics::prometheus::core::GenericCounter; use near_primitives::receipt::{Receipt, ReceiptEnum}; -use near_primitives::transaction::SignedTransaction; +use near_primitives::transaction::{Action, SignedTransaction}; use near_primitives::trie_key::TrieKey; +use near_primitives::types::AccountId; use near_primitives::types::StateRoot; use near_store::{PrefetchApi, Trie}; +use sha2::Digest; use std::rc::Rc; use tracing::debug; + +use crate::metrics; /// Transaction runtime view of the prefetching subsystem. pub(crate) struct TriePrefetcher { prefetch_api: PrefetchApi, trie_root: StateRoot, + prefetch_enqueued: GenericCounter, + prefetch_queue_full: GenericCounter, } impl TriePrefetcher { @@ -59,7 +67,16 @@ impl TriePrefetcher { if let Some(caching_storage) = trie.storage.as_caching_storage() { if let Some(prefetch_api) = caching_storage.prefetch_api().clone() { let trie_root = *trie.get_root(); - return Some(Self { prefetch_api, trie_root }); + let shard_uid = prefetch_api.shard_uid; + let metrics_labels: [&str; 1] = [&shard_uid.shard_id.to_string()]; + return Some(Self { + prefetch_api, + trie_root, + prefetch_enqueued: metrics::PREFETCH_ENQUEUED + .with_label_values(&metrics_labels), + prefetch_queue_full: metrics::PREFETCH_QUEUE_FULL + .with_label_values(&metrics_labels), + }); } } None @@ -70,10 +87,30 @@ impl TriePrefetcher { /// Returns an error if the prefetching queue is full. pub(crate) fn input_receipts(&mut self, receipts: &[Receipt]) -> Result<(), ()> { for receipt in receipts.iter() { - if let ReceiptEnum::Action(_) = &receipt.receipt { + if let ReceiptEnum::Action(action_receipt) = &receipt.receipt { let account_id = receipt.receiver_id.clone(); - let trie_key = TrieKey::Account { account_id }; - self.prefetch_trie_key(trie_key)?; + + // general-purpose account prefetching + if self.prefetch_api.enable_receipt_prefetching { + let trie_key = TrieKey::Account { account_id: account_id.clone() }; + self.prefetch_trie_key(trie_key)?; + } + + // SWEAT specific argument prefetcher + if self.prefetch_api.sweat_prefetch_receivers.contains(&account_id) + && self.prefetch_api.sweat_prefetch_senders.contains(&receipt.predecessor_id) + { + for action in &action_receipt.actions { + if let Action::FunctionCall(fn_call) = action { + if fn_call.method_name == "record_batch" { + self.prefetch_sweat_record_batch( + account_id.clone(), + &fn_call.args, + )?; + } + } + } + } } } Ok(()) @@ -86,16 +123,18 @@ impl TriePrefetcher { &mut self, transactions: &[SignedTransaction], ) -> Result<(), ()> { - for t in transactions { - let account_id = t.transaction.signer_id.clone(); - let trie_key = TrieKey::Account { account_id }; - self.prefetch_trie_key(trie_key)?; + if self.prefetch_api.enable_receipt_prefetching { + for t in transactions { + let account_id = t.transaction.signer_id.clone(); + let trie_key = TrieKey::Account { account_id }; + self.prefetch_trie_key(trie_key)?; - let trie_key = TrieKey::AccessKey { - account_id: t.transaction.signer_id.clone(), - public_key: t.transaction.public_key.clone(), - }; - self.prefetch_trie_key(trie_key)?; + let trie_key = TrieKey::AccessKey { + account_id: t.transaction.signer_id.clone(), + public_key: t.transaction.public_key.clone(), + }; + self.prefetch_trie_key(trie_key)?; + } } Ok(()) } @@ -121,12 +160,46 @@ impl TriePrefetcher { fn prefetch_trie_key(&self, trie_key: TrieKey) -> Result<(), ()> { let queue_full = self.prefetch_api.prefetch_trie_key(self.trie_root, trie_key).is_err(); if queue_full { + self.prefetch_queue_full.inc(); debug!(target: "prefetcher", "I/O scheduler input queue full, dropping prefetch request"); Err(()) } else { + self.prefetch_enqueued.inc(); Ok(()) } } + + /// Prefetcher specifically tuned for SWEAT record batch + /// + /// Temporary hack, consider removing after merging flat storage. + /// https://github.com/near/nearcore/issues/7327 + fn prefetch_sweat_record_batch(&self, account_id: AccountId, arg: &[u8]) -> Result<(), ()> { + if let Ok(json) = serde_json::de::from_slice::(arg) { + if json.is_object() { + if let Some(list) = json.get("steps_batch") { + if let Some(list) = list.as_array() { + for tuple in list.iter() { + if let Some(tuple) = tuple.as_array() { + if let Some(user_account) = tuple.first().and_then(|a| a.as_str()) { + let hashed_account = + sha2::Sha256::digest(user_account.as_bytes()).into_iter(); + let mut key = vec![0x74, 0x00]; + key.extend(hashed_account); + let trie_key = TrieKey::ContractData { + account_id: account_id.clone(), + key: key.to_vec(), + }; + near_o11y::io_trace!(count: "prefetch"); + self.prefetch_trie_key(trie_key)?; + } + } + } + } + } + } + } + Ok(()) + } } #[cfg(test)]