Skip to content

Commit

Permalink
Fix bank executor stats and remove copy-on-write semantics (#25621)
Browse files Browse the repository at this point in the history
* Fix bank executor stats and remove copy-on-write semantics

* Remove clone implementation for CachedExecutors

* feedback
  • Loading branch information
jstarry authored Jun 4, 2022
1 parent 9851774 commit 61ad8fc
Showing 1 changed file with 160 additions and 76 deletions.
236 changes: 160 additions & 76 deletions runtime/src/bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,18 +346,31 @@ struct CachedExecutorsEntry {
executor: Arc<dyn Executor>,
hit_count: AtomicU64,
}

impl Clone for CachedExecutorsEntry {
fn clone(&self) -> Self {
Self {
prev_epoch_count: self.prev_epoch_count,
epoch_count: AtomicU64::new(self.epoch_count.load(Relaxed)),
executor: self.executor.clone(),
hit_count: AtomicU64::new(self.hit_count.load(Relaxed)),
}
}
}

/// LFU Cache of executors with single-epoch memory of usage counts
#[derive(Debug)]
struct CachedExecutors {
max: usize,
capacity: usize,
current_epoch: Epoch,
pub(self) executors: HashMap<Pubkey, CachedExecutorsEntry>,
stats: executor_cache::Stats,
}

impl Default for CachedExecutors {
fn default() -> Self {
Self {
max: MAX_CACHED_EXECUTORS,
capacity: MAX_CACHED_EXECUTORS,
current_epoch: Epoch::default(),
executors: HashMap::default(),
stats: executor_cache::Stats::default(),
Expand All @@ -375,55 +388,42 @@ impl AbiExample for CachedExecutors {
}
}

impl Clone for CachedExecutors {
fn clone(&self) -> Self {
let executors = self.executors.iter().map(|(&key, entry)| {
let entry = CachedExecutorsEntry {
prev_epoch_count: entry.prev_epoch_count,
epoch_count: AtomicU64::new(entry.epoch_count.load(Relaxed)),
executor: entry.executor.clone(),
hit_count: AtomicU64::new(entry.hit_count.load(Relaxed)),
};
(key, entry)
});
impl CachedExecutors {
fn new(max_capacity: usize, current_epoch: Epoch) -> Self {
Self {
max: self.max,
current_epoch: self.current_epoch,
executors: executors.collect(),
capacity: max_capacity,
current_epoch,
executors: HashMap::new(),
stats: executor_cache::Stats::default(),
}
}
}

impl CachedExecutors {
fn clone_with_epoch(self: &Arc<Self>, epoch: Epoch) -> Arc<Self> {
if self.current_epoch == epoch {
return self.clone();
}
let executors = self.executors.iter().map(|(&key, entry)| {
// The total_count = prev_epoch_count + epoch_count will be used for LFU eviction.
// If the epoch has changed, we store the prev_epoch_count and reset the epoch_count to 0.
let entry = CachedExecutorsEntry {
prev_epoch_count: entry.epoch_count.load(Relaxed),
epoch_count: AtomicU64::default(),
executor: entry.executor.clone(),
hit_count: AtomicU64::new(entry.hit_count.load(Relaxed)),
};
(key, entry)
});
Arc::new(Self {
max: self.max,
current_epoch: epoch,
executors: executors.collect(),
stats: executor_cache::Stats::default(),
})
}
fn new_from_parent_bank_executors(
parent_bank_executors: &CachedExecutors,
current_epoch: Epoch,
) -> Self {
let executors = if parent_bank_executors.current_epoch == current_epoch {
parent_bank_executors.executors.clone()
} else {
parent_bank_executors
.executors
.iter()
.map(|(&key, entry)| {
let entry = CachedExecutorsEntry {
prev_epoch_count: entry.epoch_count.load(Relaxed),
epoch_count: AtomicU64::default(),
executor: entry.executor.clone(),
hit_count: AtomicU64::new(entry.hit_count.load(Relaxed)),
};
(key, entry)
})
.collect()
};

fn new(max: usize, current_epoch: Epoch) -> Self {
Self {
max,
capacity: parent_bank_executors.capacity,
current_epoch,
executors: HashMap::new(),
executors,
stats: executor_cache::Stats::default(),
}
}
Expand Down Expand Up @@ -469,7 +469,7 @@ impl CachedExecutors {

let primer_counts = Self::get_primer_counts(counts.as_slice(), new_executors.len());

if self.executors.len() >= self.max {
if self.executors.len() >= self.capacity {
let mut least_keys = counts
.iter()
.take(new_executors.len())
Expand Down Expand Up @@ -1330,9 +1330,7 @@ pub struct Bank {
pub rewards_pool_pubkeys: Arc<HashSet<Pubkey>>,

/// Cached executors
// Inner Arc is meant to implement copy-on-write semantics as opposed to
// sharing mutations (hence RwLock<Arc<...>> instead of Arc<RwLock<...>>).
cached_executors: RwLock<Arc<CachedExecutors>>,
cached_executors: RwLock<CachedExecutors>,

transaction_debug_keys: Option<Arc<HashSet<Pubkey>>>,

Expand Down Expand Up @@ -1517,7 +1515,7 @@ impl Bank {
cluster_type: Option::<ClusterType>::default(),
lazy_rent_collection: AtomicBool::default(),
rewards_pool_pubkeys: Arc::<HashSet<Pubkey>>::default(),
cached_executors: RwLock::<Arc<CachedExecutors>>::default(),
cached_executors: RwLock::<CachedExecutors>::default(),
transaction_debug_keys: Option::<Arc<HashSet<Pubkey>>>::default(),
transaction_log_collector_config: Arc::<RwLock<TransactionLogCollectorConfig>>::default(
),
Expand Down Expand Up @@ -1777,8 +1775,11 @@ impl Bank {

let (cached_executors, cached_executors_time) = Measure::this(
|_| {
let cached_executors = parent.cached_executors.read().unwrap();
RwLock::new(cached_executors.clone_with_epoch(epoch))
let parent_bank_executors = parent.cached_executors.read().unwrap();
RwLock::new(CachedExecutors::new_from_parent_bank_executors(
&parent_bank_executors,
epoch,
))
},
(),
"cached_executors_creation",
Expand Down Expand Up @@ -2205,10 +2206,7 @@ impl Bank {
cluster_type: Some(genesis_config.cluster_type),
lazy_rent_collection: new(),
rewards_pool_pubkeys: new(),
cached_executors: RwLock::new(Arc::new(CachedExecutors::new(
MAX_CACHED_EXECUTORS,
fields.epoch,
))),
cached_executors: RwLock::new(CachedExecutors::new(MAX_CACHED_EXECUTORS, fields.epoch)),
transaction_debug_keys: debug_keys,
transaction_log_collector_config: new(),
transaction_log_collector: new(),
Expand Down Expand Up @@ -4193,15 +4191,17 @@ impl Bank {
return Rc::new(RefCell::new(Executors::default()));
}

let cache = self.cached_executors.read().unwrap();
let executors = executable_keys
.into_iter()
.filter_map(|key| {
cache
.get(key)
.map(|executor| (*key, TransactionExecutor::new_cached(executor)))
})
.collect();
let executors = {
let cache = self.cached_executors.read().unwrap();
executable_keys
.into_iter()
.filter_map(|key| {
cache
.get(key)
.map(|executor| (*key, TransactionExecutor::new_cached(executor)))
})
.collect()
};

Rc::new(RefCell::new(executors))
}
Expand Down Expand Up @@ -4229,21 +4229,17 @@ impl Bank {
.collect();

if !dirty_executors.is_empty() {
let mut cache = self.cached_executors.write().unwrap();
let cache = Arc::make_mut(&mut cache);
cache.put(&dirty_executors);
self.cached_executors.write().unwrap().put(&dirty_executors);
}
}

/// Remove an executor from the bank's cache
fn remove_executor(&self, pubkey: &Pubkey) {
let mut cache = self.cached_executors.write().unwrap();
let _ = Arc::make_mut(&mut cache).remove(pubkey);
let _ = self.cached_executors.write().unwrap().remove(pubkey);
}

pub fn clear_executors(&self) {
let mut cache = self.cached_executors.write().unwrap();
Arc::make_mut(&mut cache).clear();
self.cached_executors.write().unwrap().clear();
}

/// Execute a transaction using the provided loaded accounts and update
Expand Down Expand Up @@ -14664,13 +14660,13 @@ pub(crate) mod tests {
assert!(cache.get(&key1).is_some());
assert!(cache.get(&key1).is_some());

let mut cache = Arc::new(cache).clone_with_epoch(1);
let mut cache = CachedExecutors::new_from_parent_bank_executors(&cache, 1);
assert!(cache.current_epoch == 1);

assert!(cache.get(&key2).is_some());
assert!(cache.get(&key2).is_some());
assert!(cache.get(&key3).is_some());
Arc::make_mut(&mut cache).put(&[(&key4, executor.clone())]);
cache.put(&[(&key4, executor.clone())]);

assert!(cache.get(&key4).is_some());
let num_retained = [&key1, &key2, &key3]
Expand All @@ -14679,8 +14675,8 @@ pub(crate) mod tests {
.count();
assert_eq!(num_retained, 2);

Arc::make_mut(&mut cache).put(&[(&key1, executor.clone())]);
Arc::make_mut(&mut cache).put(&[(&key3, executor.clone())]);
cache.put(&[(&key1, executor.clone())]);
cache.put(&[(&key3, executor.clone())]);
assert!(cache.get(&key1).is_some());
assert!(cache.get(&key3).is_some());
let num_retained = [&key2, &key4]
Expand All @@ -14689,10 +14685,10 @@ pub(crate) mod tests {
.count();
assert_eq!(num_retained, 1);

cache = cache.clone_with_epoch(2);
cache = CachedExecutors::new_from_parent_bank_executors(&cache, 2);
assert!(cache.current_epoch == 2);

Arc::make_mut(&mut cache).put(&[(&key3, executor.clone())]);
cache.put(&[(&key3, executor.clone())]);
assert!(cache.get(&key3).is_some());
}

Expand Down Expand Up @@ -14758,6 +14754,94 @@ pub(crate) mod tests {
assert_eq!(cache.stats.one_hit_wonders.load(Relaxed), 1);
}

#[test]
fn test_cached_executors_stats() {
#[derive(Debug, Default, PartialEq)]
struct ComparableStats {
hits: u64,
misses: u64,
evictions: HashMap<Pubkey, u64>,
insertions: u64,
replacements: u64,
one_hit_wonders: u64,
}
impl From<&executor_cache::Stats> for ComparableStats {
fn from(stats: &executor_cache::Stats) -> Self {
let executor_cache::Stats {
hits,
misses,
evictions,
insertions,
replacements,
one_hit_wonders,
} = stats;
ComparableStats {
hits: hits.load(Relaxed),
misses: misses.load(Relaxed),
evictions: evictions.clone(),
insertions: insertions.load(Relaxed),
replacements: replacements.load(Relaxed),
one_hit_wonders: one_hit_wonders.load(Relaxed),
}
}
}

const CURRENT_EPOCH: Epoch = 0;
let mut cache = CachedExecutors::new(2, CURRENT_EPOCH);
let mut expected_stats = ComparableStats::default();

let program_id1 = Pubkey::new_unique();
let program_id2 = Pubkey::new_unique();
let executor: Arc<dyn Executor> = Arc::new(TestExecutor {});

// make sure we're starting from where we think we are
assert_eq!(ComparableStats::from(&cache.stats), expected_stats,);

// insert some executors
cache.put(&[(&program_id1, executor.clone())]);
cache.put(&[(&program_id2, executor.clone())]);
expected_stats.insertions += 2;
assert_eq!(ComparableStats::from(&cache.stats), expected_stats);

// replace a one-hit-wonder executor
cache.put(&[(&program_id1, executor.clone())]);
expected_stats.replacements += 1;
expected_stats.one_hit_wonders += 1;
assert_eq!(ComparableStats::from(&cache.stats), expected_stats);

// hit some executors
cache.get(&program_id1);
cache.get(&program_id1);
cache.get(&program_id2);
expected_stats.hits += 3;
assert_eq!(ComparableStats::from(&cache.stats), expected_stats);

// miss an executor
cache.get(&Pubkey::new_unique());
expected_stats.misses += 1;
assert_eq!(ComparableStats::from(&cache.stats), expected_stats);

// evict an executor
cache.put(&[(&Pubkey::new_unique(), executor.clone())]);
expected_stats.insertions += 1;
expected_stats.evictions.insert(program_id2, 1);
assert_eq!(ComparableStats::from(&cache.stats), expected_stats);

// make sure stats are cleared in new_from_parent
assert_eq!(
ComparableStats::from(
&CachedExecutors::new_from_parent_bank_executors(&cache, CURRENT_EPOCH).stats
),
ComparableStats::default()
);
assert_eq!(
ComparableStats::from(
&CachedExecutors::new_from_parent_bank_executors(&cache, CURRENT_EPOCH + 1).stats
),
ComparableStats::default()
);
}

#[test]
fn test_bank_executor_cache() {
solana_logger::setup();
Expand Down

0 comments on commit 61ad8fc

Please sign in to comment.