Skip to content

Commit

Permalink
feat: argument based prefetcher (#7620)
Browse files Browse the repository at this point in the history
Prefetcher that loads IO requests in parallel if the keys are listed in the smart contract function call argument.

This is an experimental feature that has proven to stabilize mainnet nodes under current traffic, especially when the node is not running with fast local SSDs.

In the future we should generalize this implementation.
  • Loading branch information
jakmeier authored and nikurt committed Nov 9, 2022
1 parent 93ce19f commit 86aeccc
Show file tree
Hide file tree
Showing 11 changed files with 327 additions and 35 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions core/store/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,13 @@ pub struct StoreConfig {
/// Enable fetching account and access key data ahead of time to avoid IO latency.
pub enable_receipt_prefetching: bool,

/// Configured accounts will be prefetched as SWEAT token account, if predecessor is listed as receiver.
/// This config option is temporary and will be removed once flat storage is implemented.
pub sweat_prefetch_receivers: Vec<String>,
/// List of allowed predecessor accounts for SWEAT prefetching.
/// This config option is temporary and will be removed once flat storage is implemented.
pub sweat_prefetch_senders: Vec<String>,

/// Path where to create RocksDB checkpoints during database migrations or
/// `false` to disable that feature.
///
Expand Down Expand Up @@ -156,6 +163,14 @@ impl Default for StoreConfig {

trie_cache_capacities: vec![(ShardUId { version: 1, shard_id: 3 }, 45_000_000)],
enable_receipt_prefetching: false,
sweat_prefetch_receivers: vec![
"token.sweat".to_owned(),
"vfinal.token.sweat.testnet".to_owned(),
],
sweat_prefetch_senders: vec![
"oracle.sweat".to_owned(),
"sweat_the_oracle.testnet".to_owned(),
],

migration_snapshot: Default::default(),
}
Expand Down
63 changes: 63 additions & 0 deletions core/store/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,3 +142,66 @@ pub static REVERTED_TRIE_INSERTIONS: Lazy<IntCounterVec> = Lazy::new(|| {
)
.unwrap()
});
/// Counts prefetch requests submitted to the DB, labeled by shard.
pub static PREFETCH_SENT: Lazy<IntCounterVec> = Lazy::new(|| {
try_create_int_counter_vec("near_prefetch_sent", "Prefetch requests sent to DB", &["shard_id"])
.unwrap()
});
/// Counts trie keys that were served from prefetched data, labeled by shard.
pub static PREFETCH_HITS: Lazy<IntCounterVec> = Lazy::new(|| {
try_create_int_counter_vec("near_prefetch_hits", "Prefetched trie keys", &["shard_id"]).unwrap()
});
/// Counts prefetches that were still in flight when the main thread needed the data.
pub static PREFETCH_PENDING: Lazy<IntCounterVec> = Lazy::new(|| {
try_create_int_counter_vec(
"near_prefetch_pending",
"Prefetched trie keys that were still pending when main thread needed data",
&["shard_id"],
)
.unwrap()
});
/// Counts prefetch attempts that ended in an error.
pub static PREFETCH_FAIL: Lazy<IntCounterVec> = Lazy::new(|| {
try_create_int_counter_vec(
"near_prefetch_fail",
"Prefetching trie key failed with an error",
&["shard_id"],
)
.unwrap()
});
/// Counts values the main thread had to fetch itself because no prefetch was requested.
pub static PREFETCH_NOT_REQUESTED: Lazy<IntCounterVec> = Lazy::new(|| {
try_create_int_counter_vec(
"near_prefetch_not_requested",
"Number of values that had to be fetched without having been prefetched",
&["shard_id"],
)
.unwrap()
});
/// Counts values skipped because the prefetch staging area hit its size limit.
pub static PREFETCH_MEMORY_LIMIT_REACHED: Lazy<IntCounterVec> = Lazy::new(|| {
try_create_int_counter_vec(
"near_prefetch_memory_limit_reached",
"Number of values that could not be prefetched due to prefetch staging area size limitations",
&["shard_id"],
)
.unwrap()
});
/// Counts cases where the main thread waited on a prefetch but had to re-fetch afterwards.
pub static PREFETCH_RETRY: Lazy<IntCounterVec> = Lazy::new(|| {
try_create_int_counter_vec(
"near_prefetch_retries",
"Main thread was waiting for prefetched value but had to retry fetch afterwards.",
&["shard_id"],
)
.unwrap()
});
/// Gauge: upper bound on bytes currently held in the prefetch staging area, per shard.
pub static PREFETCH_STAGED_BYTES: Lazy<IntGaugeVec> = Lazy::new(|| {
try_create_int_gauge_vec(
"near_prefetch_staged_bytes",
"Upper bound on memory usage for holding prefetched data.",
&["shard_id"],
)
.unwrap()
});
/// Gauge: number of occupied slots in the prefetch staging area, per shard.
pub static PREFETCH_STAGED_SLOTS: Lazy<IntGaugeVec> = Lazy::new(|| {
try_create_int_gauge_vec(
"near_prefetch_staged_slots",
"Number of slots used in staging area.",
&["shard_id"],
)
.unwrap()
});
30 changes: 30 additions & 0 deletions core/store/src/trie/config.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
use crate::StoreConfig;
use near_primitives::shard_layout::ShardUId;
use near_primitives::types::AccountId;
use std::collections::HashMap;
use std::str::FromStr;
use tracing::error;

/// Default number of cache entries.
/// It was chosen to fit into RAM well. RAM spend on trie cache should not exceed 50_000 * 4 (number of shards) *
Expand Down Expand Up @@ -28,6 +32,11 @@ pub struct TrieConfig {
pub shard_cache_config: ShardCacheConfig,
pub view_shard_cache_config: ShardCacheConfig,
pub enable_receipt_prefetching: bool,

/// Configured accounts will be prefetched as SWEAT token account, if predecessor is listed as sender.
pub sweat_prefetch_receivers: Vec<AccountId>,
/// List of allowed predecessor accounts for SWEAT prefetching.
pub sweat_prefetch_senders: Vec<AccountId>,
}

pub struct ShardCacheConfig {
Expand All @@ -46,6 +55,27 @@ pub struct ShardCacheConfig {
}

impl TrieConfig {
pub fn from_config(config: &StoreConfig) -> Self {
let mut this = Self::default();
this.shard_cache_config
.override_max_entries
.extend(config.trie_cache_capacities.iter().cloned());
this.enable_receipt_prefetching = config.enable_receipt_prefetching;
for account in &config.sweat_prefetch_receivers {
match AccountId::from_str(account) {
Ok(account_id) => this.sweat_prefetch_receivers.push(account_id),
Err(e) => error!(target: "config", "invalid account id {account}: {e}"),
}
}
for account in &config.sweat_prefetch_senders {
match AccountId::from_str(account) {
Ok(account_id) => this.sweat_prefetch_senders.push(account_id),
Err(e) => error!(target: "config", "invalid account id {account}: {e}"),
}
}
this
}

/// Shard cache capacity in number of trie nodes.
pub fn shard_cache_capacity(&self, shard_uid: ShardUId, is_view: bool) -> u64 {
if is_view { &self.view_shard_cache_config } else { &self.shard_cache_config }
Expand Down
86 changes: 78 additions & 8 deletions core/store/src/trie/prefetching_trie_storage.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
use crate::trie::POISONED_LOCK_ERR;
use crate::{DBCol, StorageError, Store, Trie, TrieCache, TrieCachingStorage, TrieStorage};
use crate::{
metrics, DBCol, StorageError, Store, Trie, TrieCache, TrieCachingStorage, TrieConfig,
TrieStorage,
};
use near_o11y::metrics::prometheus;
use near_o11y::metrics::prometheus::core::GenericGauge;
use near_primitives::hash::CryptoHash;
use near_primitives::shard_layout::ShardUId;
use near_primitives::trie_key::TrieKey;
use near_primitives::types::{StateRoot, TrieNodesCount};
use near_primitives::types::{AccountId, ShardId, StateRoot, TrieNodesCount};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

Expand Down Expand Up @@ -63,6 +68,14 @@ pub struct PrefetchApi {
/// to mark what is already being fetched, to avoid fetching the same data
/// multiple times.
pub(crate) prefetching: PrefetchStagingArea,

pub enable_receipt_prefetching: bool,
/// Configured accounts will be prefetched as SWEAT token account, if predecessor is listed as receiver.
pub sweat_prefetch_receivers: Vec<AccountId>,
/// List of allowed predecessor accounts for SWEAT prefetching.
pub sweat_prefetch_senders: Vec<AccountId>,

pub shard_uid: ShardUId,
}

/// Staging area for in-flight prefetch requests and a buffer for prefetched data.
Expand All @@ -78,10 +91,9 @@ pub struct PrefetchApi {
/// This design also ensures the shard cache works exactly the same with or
/// without the prefetcher, because the order in which it sees accesses is
/// independent of the prefetcher.
#[derive(Default, Clone)]
#[derive(Clone)]
pub(crate) struct PrefetchStagingArea(Arc<Mutex<InnerPrefetchStagingArea>>);

#[derive(Default)]
struct InnerPrefetchStagingArea {
slots: SizeTrackedHashMap,
}
Expand All @@ -94,6 +106,22 @@ pub(crate) enum PrefetcherResult {
MemoryLimitReached,
}

/// Cached per-shard handles to the Prometheus gauges that track the
/// prefetcher staging area (bytes reserved and slots occupied).
struct StagedMetrics {
/// Gauge behind `near_prefetch_staged_bytes` for this shard.
prefetch_staged_bytes: GenericGauge<prometheus::core::AtomicI64>,
/// Gauge behind `near_prefetch_staged_slots` for this shard.
prefetch_staged_items: GenericGauge<prometheus::core::AtomicI64>,
}

impl StagedMetrics {
    /// Resolves the staging-area gauges for the given shard.
    fn new(shard_id: ShardId) -> Self {
        // Format the shard id once and reuse it for both label lookups
        // instead of allocating the label string twice.
        let shard_label = shard_id.to_string();
        Self {
            prefetch_staged_bytes: metrics::PREFETCH_STAGED_BYTES
                .with_label_values(&[&shard_label]),
            prefetch_staged_items: metrics::PREFETCH_STAGED_SLOTS
                .with_label_values(&[&shard_label]),
        }
    }
}

/// Type used internally in the staging area to keep track of requests.
#[derive(Clone, Debug)]
enum PrefetchSlot {
Expand All @@ -102,10 +130,10 @@ enum PrefetchSlot {
Done(Arc<[u8]>),
}

#[derive(Default)]
/// Hash map that tracks an upper bound on the memory reserved by its values
/// and mirrors its size into the per-shard Prometheus gauges on every change.
struct SizeTrackedHashMap {
/// Staged prefetch slots, keyed by `CryptoHash`.
map: HashMap<CryptoHash, PrefetchSlot>,
/// Upper bound on memory reserved by the values currently in `map`.
size_bytes: usize,
/// Per-shard gauges, refreshed via `update_metrics` after each mutation.
metrics: StagedMetrics,
}

impl SizeTrackedHashMap {
Expand All @@ -115,6 +143,7 @@ impl SizeTrackedHashMap {
if let Some(dropped) = &dropped {
self.size_bytes -= Self::reserved_memory(dropped);
}
self.update_metrics();
dropped
}

Expand All @@ -123,12 +152,19 @@ impl SizeTrackedHashMap {
if let Some(dropped) = &dropped {
self.size_bytes -= Self::reserved_memory(dropped);
}
self.update_metrics();
dropped
}

/// Removes all entries and resets the tracked memory usage, then publishes
/// the now-empty state to the gauges.
fn clear(&mut self) {
self.map.clear();
self.size_bytes = 0;
self.update_metrics();
}

/// Pushes the current byte bound and slot count to this shard's gauges.
/// Must be called after every mutation so the metrics stay in sync.
fn update_metrics(&self) {
self.metrics.prefetch_staged_bytes.set(self.size_bytes as i64);
self.metrics.prefetch_staged_items.set(self.map.len() as i64);
}

/// Reserved memory capacity for a value from the prefetching area.
Expand Down Expand Up @@ -249,6 +285,18 @@ impl TriePrefetchingStorage {
}

impl PrefetchStagingArea {
fn new(shard_id: ShardId) -> Self {
let inner = InnerPrefetchStagingArea {
slots: SizeTrackedHashMap {
map: Default::default(),
size_bytes: 0,
metrics: StagedMetrics::new(shard_id),
},
};
inner.slots.update_metrics();
Self(Arc::new(Mutex::new(inner)))
}

/// Release a slot in the prefetcher staging area.
///
/// This must only be called after inserting the value to the shard cache.
Expand Down Expand Up @@ -329,10 +377,26 @@ impl PrefetchStagingArea {
}

impl PrefetchApi {
pub fn new(store: Store, shard_cache: TrieCache, shard_uid: ShardUId) -> Self {
pub fn new(
store: Store,
shard_cache: TrieCache,
shard_uid: ShardUId,
trie_config: &TrieConfig,
) -> Self {
let (work_queue_tx, work_queue_rx) = crossbeam::channel::bounded(MAX_QUEUED_WORK_ITEMS);

let this = Self { work_queue_tx, work_queue_rx, prefetching: Default::default() };
let sweat_prefetch_receivers = trie_config.sweat_prefetch_receivers.clone();
let sweat_prefetch_senders = trie_config.sweat_prefetch_senders.clone();
let enable_receipt_prefetching = trie_config.enable_receipt_prefetching;

let this = Self {
work_queue_tx,
work_queue_rx,
prefetching: PrefetchStagingArea::new(shard_uid.shard_id()),
enable_receipt_prefetching,
sweat_prefetch_receivers,
sweat_prefetch_senders,
shard_uid,
};
for _ in 0..NUM_IO_THREADS {
this.start_io_thread(store.clone(), shard_cache.clone(), shard_uid.clone());
}
Expand All @@ -357,6 +421,10 @@ impl PrefetchApi {
let prefetcher_storage =
TriePrefetchingStorage::new(store, shard_uid, shard_cache, self.prefetching.clone());
let work_queue = self.work_queue_rx.clone();
let metric_prefetch_sent =
metrics::PREFETCH_SENT.with_label_values(&[&shard_uid.shard_id.to_string()]);
let metric_prefetch_fail =
metrics::PREFETCH_FAIL.with_label_values(&[&shard_uid.shard_id.to_string()]);
std::thread::spawn(move || {
while let Ok((trie_root, trie_key)) = work_queue.recv() {
// Since the trie root can change, and since the root is not known at the time when the IO threads start,
Expand All @@ -365,12 +433,14 @@ impl PrefetchApi {
let prefetcher_trie =
Trie::new(Box::new(prefetcher_storage.clone()), trie_root, None);
let storage_key = trie_key.to_vec();
metric_prefetch_sent.inc();
if let Ok(_maybe_value) = prefetcher_trie.get(&storage_key) {
near_o11y::io_trace!(count: "prefetch");
} else {
// This may happen in rare occasions and can be ignored safely.
// See comments in `TriePrefetchingStorage::retrieve_raw_bytes`.
near_o11y::io_trace!(count: "prefetch_failure");
metric_prefetch_fail.inc();
}
}
})
Expand Down
14 changes: 10 additions & 4 deletions core/store/src/trie/shard_tries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,22 +113,28 @@ impl ShardTries {
.or_insert_with(|| TrieCache::new(&self.0.trie_config, shard_uid, is_view))
.clone()
};

// Do not enable prefetching on view caches.
// 1) Performance of view calls is not crucial.
// 2) A lot of the prefetcher code assumes there is only one "main-thread" per shard active.
// If you want to enable it for view calls, at least make sure they don't share
// the `PrefetchApi` instances with the normal calls.
let prefetch_enabled = !is_view && self.0.trie_config.enable_receipt_prefetching;

let prefetch_enabled = !is_view
&& (self.0.trie_config.enable_receipt_prefetching
|| (!self.0.trie_config.sweat_prefetch_receivers.is_empty()
&& !self.0.trie_config.sweat_prefetch_senders.is_empty()));
let prefetch_api = prefetch_enabled.then(|| {
self.0
.prefetchers
.write()
.expect(POISONED_LOCK_ERR)
.entry(shard_uid)
.or_insert_with(|| {
PrefetchApi::new(self.0.store.clone(), cache.clone(), shard_uid.clone())
PrefetchApi::new(
self.0.store.clone(),
cache.clone(),
shard_uid.clone(),
&self.0.trie_config,
)
})
.clone()
});
Expand Down
Loading

0 comments on commit 86aeccc

Please sign in to comment.