Skip to content

Commit

Permalink
Refactor RPC subscriptions account handling (solana-labs#9888)
Browse files Browse the repository at this point in the history
* Switch subscriptions to use commitment instead of confirmations

* Add bank method to return account and last-modified slot

* Add last_modified_slot to subscription data and use to filter account subscriptions

* Update tests to non-zero last_notified_slot

* Add accounts subscriptions to test; fails at higher tx load

* Pass BankForks to RpcSubscriptions

* Use BankForks on add_account_subscription to properly initialize last_notified_slot

* Bundle subscriptions

* Check for non-equality

* Use commitment to initialize last_notified_slot; revert context.slot chage
  • Loading branch information
CriesofCarrots authored May 7, 2020
1 parent f6e26f6 commit 754c65c
Show file tree
Hide file tree
Showing 10 changed files with 460 additions and 273 deletions.
18 changes: 18 additions & 0 deletions core/src/commitment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,24 @@ impl BlockCommitmentCache {
}
}

#[cfg(test)]
pub fn new_for_tests_with_blockstore_bank(
blockstore: Arc<Blockstore>,
bank: Arc<Bank>,
root: Slot,
) -> Self {
let mut block_commitment: HashMap<Slot, BlockCommitment> = HashMap::new();
block_commitment.insert(0, BlockCommitment::default());
Self {
block_commitment,
blockstore,
total_stake: 42,
largest_confirmed_root: root,
bank,
root,
}
}

#[cfg(test)]
pub(crate) fn set_get_largest_confirmed_root(&mut self, root: Slot) {
self.largest_confirmed_root = root;
Expand Down
21 changes: 10 additions & 11 deletions core/src/replay_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,10 +308,8 @@ impl ReplayStage {

// Vote on a fork
if let Some(ref vote_bank) = vote_bank {
subscriptions.notify_subscribers(
block_commitment_cache.read().unwrap().slot(),
&bank_forks,
);
subscriptions
.notify_subscribers(block_commitment_cache.read().unwrap().slot());
if let Some(votable_leader) =
leader_schedule_cache.slot_leader_at(vote_bank.slot(), Some(vote_bank))
{
Expand Down Expand Up @@ -2060,12 +2058,6 @@ pub(crate) mod tests {
);
let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank0));
let exit = Arc::new(AtomicBool::new(false));
let subscriptions = Arc::new(RpcSubscriptions::new(
&exit,
Arc::new(RwLock::new(BlockCommitmentCache::default_with_blockstore(
blockstore.clone(),
))),
));
let mut bank_forks = BankForks::new(0, bank0);

// Insert a non-root bank so that the propagation logic will update this
Expand All @@ -2089,7 +2081,14 @@ pub(crate) mod tests {
assert!(progress.get_propagated_stats(1).unwrap().is_leader_slot);
bank1.freeze();
bank_forks.insert(bank1);
let bank_forks = RwLock::new(bank_forks);
let bank_forks = Arc::new(RwLock::new(bank_forks));
let subscriptions = Arc::new(RpcSubscriptions::new(
&exit,
bank_forks.clone(),
Arc::new(RwLock::new(BlockCommitmentCache::default_with_blockstore(
blockstore.clone(),
))),
));

// Insert shreds for slot NUM_CONSECUTIVE_LEADER_SLOTS,
// chaining to slot 1
Expand Down
Loading

0 comments on commit 754c65c

Please sign in to comment.