Skip to content

Commit

Permalink
Refactor - Delay visibility of program un-/re-/deployment (solana-lab…
Browse files Browse the repository at this point in the history
…s#29654)

* Use three separate HashMaps instead of the enum TxBankExecutorCacheDiff.

* Replaces all places which deploy programs by a macro.

* Adds a feature gate.

* Adjust tests.

* Makes undeployment visible immediately.
  • Loading branch information
Lichtso authored and nickfrosty committed Mar 12, 2023
1 parent eb6d3d5 commit 758d955
Show file tree
Hide file tree
Showing 6 changed files with 258 additions and 228 deletions.
155 changes: 72 additions & 83 deletions program-runtime/src/executor_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,87 +9,77 @@ use {
ops::Div,
sync::{
atomic::{AtomicU64, Ordering::Relaxed},
Arc, RwLock,
Arc,
},
},
};

/// Relation between a TransactionExecutorCacheEntry and its matching BankExecutorCacheEntry
#[repr(u8)]
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum TxBankExecutorCacheDiff {
/// The TransactionExecutorCacheEntry did not change and is the same as the BankExecutorCacheEntry.
None,
/// The TransactionExecutorCacheEntry was inserted, no matching BankExecutorCacheEntry exists, so it needs to be inserted.
Inserted,
/// The TransactionExecutorCacheEntry was replaced, the matching BankExecutorCacheEntry needs to be updated.
Updated,
}

/// An entry of the TransactionExecutorCache
#[derive(Debug)]
pub struct TransactionExecutorCacheEntry {
executor: Arc<dyn Executor>,
difference: TxBankExecutorCacheDiff,
}

/// A subset of the BankExecutorCache containing only the executors relevant to one transaction
///
/// The BankExecutorCache can not be updated directly as transaction batches are
/// processed in parallel, which would cause a race condition.
#[derive(Default, Debug)]
pub struct TransactionExecutorCache {
pub executors: HashMap<Pubkey, TransactionExecutorCacheEntry>,
/// Executors or tombstones which are visible during the transaction
pub visible: HashMap<Pubkey, Option<Arc<dyn Executor>>>,
/// Executors of programs which were re-/deploymed during the transaction
pub deployments: HashMap<Pubkey, Arc<dyn Executor>>,
/// Executors which were missing in the cache and not re-/deploymed during the transaction
pub add_to_cache: HashMap<Pubkey, Arc<dyn Executor>>,
}

impl TransactionExecutorCache {
pub fn new(executable_keys: impl Iterator<Item = (Pubkey, Arc<dyn Executor>)>) -> Self {
Self {
executors: executable_keys
.map(|(key, executor)| {
let entry = TransactionExecutorCacheEntry {
executor,
difference: TxBankExecutorCacheDiff::None,
};
(key, entry)
})
visible: executable_keys
.map(|(id, executor)| (id, Some(executor)))
.collect(),
deployments: HashMap::new(),
add_to_cache: HashMap::new(),
}
}

pub fn get(&self, key: &Pubkey) -> Option<Arc<dyn Executor>> {
self.executors.get(key).map(|entry| entry.executor.clone())
pub fn get(&self, key: &Pubkey) -> Option<Option<Arc<dyn Executor>>> {
self.visible.get(key).cloned()
}

pub fn set(&mut self, key: Pubkey, executor: Arc<dyn Executor>, replacement: bool) {
let difference = if replacement {
TxBankExecutorCacheDiff::Updated
} else {
TxBankExecutorCacheDiff::Inserted
};
let entry = TransactionExecutorCacheEntry {
executor,
difference,
};
let _was_replaced = self.executors.insert(key, entry).is_some();
pub fn set_tombstone(&mut self, key: Pubkey) {
self.visible.insert(key, None);
}

pub fn update_global_cache(
&self,
global_cache: &RwLock<BankExecutorCache>,
selector: impl Fn(TxBankExecutorCacheDiff) -> bool,
pub fn set(
&mut self,
key: Pubkey,
executor: Arc<dyn Executor>,
upgrade: bool,
delay_visibility_of_program_deployment: bool,
) {
let executors_delta: Vec<_> = self
.executors
.iter()
.filter_map(|(key, entry)| {
selector(entry.difference).then(|| (key, entry.executor.clone()))
})
.collect();
if !executors_delta.is_empty() {
global_cache.write().unwrap().put(&executors_delta);
if upgrade {
if delay_visibility_of_program_deployment {
// Place a tombstone in the cache so that
// we don't load the new version from the database as it should remain invisible
self.set_tombstone(key);
} else {
self.visible.insert(key, Some(executor.clone()));
}
self.deployments.insert(key, executor);
} else {
self.visible.insert(key, Some(executor.clone()));
self.add_to_cache.insert(key, executor);
}
}

pub fn get_executors_added_to_the_cache(&mut self) -> HashMap<Pubkey, Arc<dyn Executor>> {
let mut executors = HashMap::new();
std::mem::swap(&mut executors, &mut self.add_to_cache);
executors
}

pub fn get_executors_which_were_deployed(&mut self) -> HashMap<Pubkey, Arc<dyn Executor>> {
let mut executors = HashMap::new();
std::mem::swap(&mut executors, &mut self.deployments);
executors
}
}

/// Capacity of `BankExecutorCache`
Expand Down Expand Up @@ -197,18 +187,17 @@ impl BankExecutorCache {
}
}

fn put(&mut self, executors: &[(&Pubkey, Arc<dyn Executor>)]) {
pub fn put(&mut self, executors: impl Iterator<Item = (Pubkey, Arc<dyn Executor>)>) {
let mut new_executors: Vec<_> = executors
.iter()
.filter_map(|(key, executor)| {
if let Some(mut entry) = self.remove(key) {
if let Some(mut entry) = self.remove(&key) {
self.stats.replacements.fetch_add(1, Relaxed);
entry.executor = executor.clone();
let _ = self.executors.insert(**key, entry);
let _ = self.executors.insert(key, entry);
None
} else {
self.stats.insertions.fetch_add(1, Relaxed);
Some((*key, executor))
Some((key, executor))
}
})
.collect();
Expand Down Expand Up @@ -251,7 +240,7 @@ impl BankExecutorCache {
executor: executor.clone(),
hit_count: AtomicU64::new(1),
};
let _ = self.executors.insert(*key, entry);
let _ = self.executors.insert(key, entry);
}
}
}
Expand Down Expand Up @@ -388,17 +377,17 @@ mod tests {
let executor: Arc<dyn Executor> = Arc::new(TestExecutor {});
let mut cache = BankExecutorCache::new(3, 0);

cache.put(&[(&key1, executor.clone())]);
cache.put(&[(&key2, executor.clone())]);
cache.put(&[(&key3, executor.clone())]);
cache.put([(key1, executor.clone())].into_iter());
cache.put([(key2, executor.clone())].into_iter());
cache.put([(key3, executor.clone())].into_iter());
assert!(cache.get(&key1).is_some());
assert!(cache.get(&key2).is_some());
assert!(cache.get(&key3).is_some());

assert!(cache.get(&key1).is_some());
assert!(cache.get(&key1).is_some());
assert!(cache.get(&key2).is_some());
cache.put(&[(&key4, executor.clone())]);
cache.put([(key4, executor.clone())].into_iter());
assert!(cache.get(&key4).is_some());
let num_retained = [&key1, &key2, &key3]
.iter()
Expand All @@ -409,7 +398,7 @@ mod tests {
assert!(cache.get(&key4).is_some());
assert!(cache.get(&key4).is_some());
assert!(cache.get(&key4).is_some());
cache.put(&[(&key3, executor.clone())]);
cache.put([(key3, executor.clone())].into_iter());
assert!(cache.get(&key3).is_some());
let num_retained = [&key1, &key2, &key4]
.iter()
Expand All @@ -428,9 +417,9 @@ mod tests {
let mut cache = BankExecutorCache::new(3, 0);
assert!(cache.current_epoch == 0);

cache.put(&[(&key1, executor.clone())]);
cache.put(&[(&key2, executor.clone())]);
cache.put(&[(&key3, executor.clone())]);
cache.put([(key1, executor.clone())].into_iter());
cache.put([(key2, executor.clone())].into_iter());
cache.put([(key3, executor.clone())].into_iter());
assert!(cache.get(&key1).is_some());
assert!(cache.get(&key1).is_some());
assert!(cache.get(&key1).is_some());
Expand All @@ -441,7 +430,7 @@ mod tests {
assert!(cache.get(&key2).is_some());
assert!(cache.get(&key2).is_some());
assert!(cache.get(&key3).is_some());
cache.put(&[(&key4, executor.clone())]);
cache.put([(key4, executor.clone())].into_iter());

assert!(cache.get(&key4).is_some());
let num_retained = [&key1, &key2, &key3]
Expand All @@ -450,8 +439,8 @@ mod tests {
.count();
assert_eq!(num_retained, 2);

cache.put(&[(&key1, executor.clone())]);
cache.put(&[(&key3, executor.clone())]);
cache.put([(key1, executor.clone())].into_iter());
cache.put([(key3, executor.clone())].into_iter());
assert!(cache.get(&key1).is_some());
assert!(cache.get(&key3).is_some());
let num_retained = [&key2, &key4]
Expand All @@ -463,7 +452,7 @@ mod tests {
cache = BankExecutorCache::new_from_parent_bank_executors(&cache, 2);
assert!(cache.current_epoch == 2);

cache.put(&[(&key3, executor.clone())]);
cache.put([(key3, executor.clone())].into_iter());
assert!(cache.get(&key3).is_some());
}

Expand All @@ -475,11 +464,11 @@ mod tests {
let executor: Arc<dyn Executor> = Arc::new(TestExecutor {});
let mut cache = BankExecutorCache::new(2, 0);

cache.put(&[(&key1, executor.clone())]);
cache.put([(key1, executor.clone())].into_iter());
for _ in 0..5 {
let _ = cache.get(&key1);
}
cache.put(&[(&key2, executor.clone())]);
cache.put([(key2, executor.clone())].into_iter());
// make key1's use-count for sure greater than key2's
let _ = cache.get(&key1);

Expand All @@ -491,7 +480,7 @@ mod tests {
entries.sort_by_key(|(_, v)| *v);
assert!(entries[0].1 < entries[1].1);

cache.put(&[(&key3, executor.clone())]);
cache.put([(key3, executor.clone())].into_iter());
assert!(cache.get(&entries[0].0).is_none());
assert!(cache.get(&entries[1].0).is_some());
}
Expand All @@ -508,10 +497,10 @@ mod tests {
assert_eq!(cache.stats.one_hit_wonders.load(Relaxed), 0);

// add our one-hit-wonder
cache.put(&[(&one_hit_wonder, executor.clone())]);
cache.put([(one_hit_wonder, executor.clone())].into_iter());
assert_eq!(cache.executors[&one_hit_wonder].hit_count.load(Relaxed), 1);
// displace the one-hit-wonder with "popular program"
cache.put(&[(&popular, executor.clone())]);
cache.put([(popular, executor.clone())].into_iter());
assert_eq!(cache.executors[&popular].hit_count.load(Relaxed), 1);

// one-hit-wonder counter incremented
Expand All @@ -522,7 +511,7 @@ mod tests {
assert_eq!(cache.executors[&popular].hit_count.load(Relaxed), 2);

// evict "popular program"
cache.put(&[(&one_hit_wonder, executor.clone())]);
cache.put([(one_hit_wonder, executor.clone())].into_iter());
assert_eq!(cache.executors[&one_hit_wonder].hit_count.load(Relaxed), 1);

// one-hit-wonder counter not incremented
Expand Down Expand Up @@ -593,13 +582,13 @@ mod tests {
assert_eq!(ComparableStats::from(&cache.stats), expected_stats,);

// insert some executors
cache.put(&[(&program_id1, executor.clone())]);
cache.put(&[(&program_id2, executor.clone())]);
cache.put([(program_id1, executor.clone())].into_iter());
cache.put([(program_id2, executor.clone())].into_iter());
expected_stats.insertions += 2;
assert_eq!(ComparableStats::from(&cache.stats), expected_stats);

// replace a one-hit-wonder executor
cache.put(&[(&program_id1, executor.clone())]);
cache.put([(program_id1, executor.clone())].into_iter());
expected_stats.replacements += 1;
expected_stats.one_hit_wonders += 1;
assert_eq!(ComparableStats::from(&cache.stats), expected_stats);
Expand All @@ -617,7 +606,7 @@ mod tests {
assert_eq!(ComparableStats::from(&cache.stats), expected_stats);

// evict an executor
cache.put(&[(&Pubkey::new_unique(), executor.clone())]);
cache.put([(Pubkey::new_unique(), executor.clone())].into_iter());
expected_stats.insertions += 1;
expected_stats.evictions.insert(program_id2, 1);
assert_eq!(ComparableStats::from(&cache.stats), expected_stats);
Expand Down
Loading

0 comments on commit 758d955

Please sign in to comment.