Skip to content

Commit

Permalink
Add voting service (solana-labs#18552) (solana-labs#18722)
Browse files Browse the repository at this point in the history
Co-authored-by: sakridge <sakridge@gmail.com>
  • Loading branch information
mergify[bot] and sakridge authored Jul 16, 2021
1 parent 4145c62 commit 2a93147
Show file tree
Hide file tree
Showing 4 changed files with 138 additions and 28 deletions.
1 change: 1 addition & 0 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ pub mod unfrozen_gossip_verified_vote_hashes;
pub mod validator;
pub mod verified_vote_packets;
pub mod vote_stake_tracker;
pub mod voting_service;
pub mod window_service;

#[macro_use]
Expand Down
77 changes: 49 additions & 28 deletions core/src/replay_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use crate::{
result::Result,
rewards_recorder_service::RewardsRecorderSender,
unfrozen_gossip_verified_vote_hashes::UnfrozenGossipVerifiedVoteHashes,
voting_service::VoteOp,
window_service::DuplicateSlotReceiver,
};
use solana_client::rpc_response::SlotUpdate;
Expand Down Expand Up @@ -309,6 +310,7 @@ impl ReplayStage {
gossip_duplicate_confirmed_slots_receiver: GossipDuplicateConfirmedSlotsReceiver,
gossip_verified_vote_hash_receiver: GossipVerifiedVoteHashReceiver,
cluster_slots_update_sender: ClusterSlotsUpdateSender,
voting_sender: Sender<VoteOp>,
) -> Self {
let ReplayStageConfig {
my_pubkey,
Expand Down Expand Up @@ -513,12 +515,14 @@ impl ReplayStage {
if let Some(my_latest_landed_vote) = progress.my_latest_landed_vote(heaviest_bank_on_same_voted_fork.slot()) {
Self::refresh_last_vote(&mut tower, &cluster_info,
heaviest_bank_on_same_voted_fork,
&poh_recorder, my_latest_landed_vote,
my_latest_landed_vote,
&vote_account,
&authorized_voter_keypairs.read().unwrap(),
&mut voted_signatures,
has_new_vote_been_rooted, &mut
last_vote_refresh_time);
last_vote_refresh_time,
&voting_sender,
);
}
}

Expand Down Expand Up @@ -576,7 +580,6 @@ impl ReplayStage {

Self::handle_votable_bank(
vote_bank,
&poh_recorder,
switch_fork_decision,
&bank_forks,
&mut tower,
Expand All @@ -599,6 +602,7 @@ impl ReplayStage {
&mut voted_signatures,
&mut has_new_vote_been_rooted,
&mut replay_timing,
&voting_sender,
);
};
voting_time.stop();
Expand Down Expand Up @@ -1285,7 +1289,6 @@ impl ReplayStage {
#[allow(clippy::too_many_arguments)]
fn handle_votable_bank(
bank: &Arc<Bank>,
poh_recorder: &Arc<Mutex<PohRecorder>>,
switch_fork_decision: &SwitchForkDecision,
bank_forks: &Arc<RwLock<BankForks>>,
tower: &mut Tower,
Expand All @@ -1308,6 +1311,7 @@ impl ReplayStage {
vote_signatures: &mut Vec<Signature>,
has_new_vote_been_rooted: &mut bool,
replay_timing: &mut ReplayTiming,
voting_sender: &Sender<VoteOp>,
) {
if bank.is_empty() {
inc_new_counter_info!("replay_stage-voted_empty_bank", 1);
Expand Down Expand Up @@ -1385,14 +1389,14 @@ impl ReplayStage {
Self::push_vote(
cluster_info,
bank,
poh_recorder,
vote_account_pubkey,
authorized_voter_keypairs,
tower,
switch_fork_decision,
vote_signatures,
*has_new_vote_been_rooted,
replay_timing,
voting_sender,
);
}

Expand Down Expand Up @@ -1486,13 +1490,13 @@ impl ReplayStage {
tower: &mut Tower,
cluster_info: &ClusterInfo,
heaviest_bank_on_same_fork: &Bank,
poh_recorder: &Mutex<PohRecorder>,
my_latest_landed_vote: Slot,
vote_account_pubkey: &Pubkey,
authorized_voter_keypairs: &[Arc<Keypair>],
vote_signatures: &mut Vec<Signature>,
has_new_vote_been_rooted: bool,
last_vote_refresh_time: &mut LastVoteRefreshTime,
voting_sender: &Sender<VoteOp>,
) {
let last_voted_slot = tower.last_voted_slot();
if last_voted_slot.is_none() {
Expand Down Expand Up @@ -1549,11 +1553,12 @@ impl ReplayStage {
("target_bank_slot", heaviest_bank_on_same_fork.slot(), i64),
("target_bank_hash", hash_string, String),
);
let _ = cluster_info.send_vote(
&vote_tx,
crate::banking_stage::next_leader_tpu(cluster_info, poh_recorder),
);
cluster_info.refresh_vote(vote_tx, last_voted_slot);
voting_sender
.send(VoteOp::RefreshVote {
tx: vote_tx,
last_voted_slot,
})
.unwrap_or_else(|err| warn!("Error: {:?}", err));
last_vote_refresh_time.last_refresh_time = Instant::now();
}
}
Expand All @@ -1562,14 +1567,14 @@ impl ReplayStage {
fn push_vote(
cluster_info: &ClusterInfo,
bank: &Bank,
poh_recorder: &Mutex<PohRecorder>,
vote_account_pubkey: &Pubkey,
authorized_voter_keypairs: &[Arc<Keypair>],
tower: &mut Tower,
switch_fork_decision: &SwitchForkDecision,
vote_signatures: &mut Vec<Signature>,
has_new_vote_been_rooted: bool,
replay_timing: &mut ReplayTiming,
voting_sender: &Sender<VoteOp>,
) {
let mut generate_time = Measure::start("generate_vote");
let vote_tx = Self::generate_vote_tx(
Expand All @@ -1586,16 +1591,14 @@ impl ReplayStage {
replay_timing.generate_vote_us += generate_time.as_us();
if let Some(vote_tx) = vote_tx {
tower.refresh_last_vote_tx_blockhash(vote_tx.message.recent_blockhash);
let mut send_time = Measure::start("send_vote");
let _ = cluster_info.send_vote(
&vote_tx,
crate::banking_stage::next_leader_tpu(cluster_info, poh_recorder),
);
send_time.stop();
let mut push_time = Measure::start("push_vote");
cluster_info.push_vote(&tower.tower_slots(), vote_tx);
push_time.stop();
replay_timing.vote_push_us += push_time.as_us();

let tower_slots = tower.tower_slots();
voting_sender
.send(VoteOp::PushVote {
tx: vote_tx,
tower_slots,
})
.unwrap_or_else(|err| warn!("Error: {:?}", err));
}
}

Expand Down Expand Up @@ -2546,6 +2549,7 @@ mod tests {
vote_state::{VoteState, VoteStateVersions},
vote_transaction,
};
use std::sync::mpsc::channel;
use std::{
fs::remove_dir_all,
iter,
Expand Down Expand Up @@ -4672,6 +4676,7 @@ mod tests {
}
}
}
let (voting_sender, voting_receiver) = channel();

// Simulate landing a vote for slot 0 landing in slot 1
let bank1 = Arc::new(Bank::new_from_parent(&bank0, &Pubkey::default(), 1));
Expand All @@ -4680,15 +4685,20 @@ mod tests {
ReplayStage::push_vote(
&cluster_info,
&bank0,
&poh_recorder,
&my_vote_pubkey,
&my_vote_keypair,
&mut tower,
&SwitchForkDecision::SameFork,
&mut voted_signatures,
has_new_vote_been_rooted,
&mut ReplayTiming::default(),
&voting_sender,
);
let vote_info = voting_receiver
.recv_timeout(Duration::from_secs(1))
.unwrap();
crate::voting_service::VotingService::handle_vote(&cluster_info, &poh_recorder, vote_info);

let mut cursor = Cursor::default();
let (_, votes) = cluster_info.get_votes(&mut cursor);
assert_eq!(votes.len(), 1);
Expand All @@ -4709,13 +4719,13 @@ mod tests {
&mut tower,
&cluster_info,
refresh_bank,
&poh_recorder,
Tower::last_voted_slot_in_bank(refresh_bank, &my_vote_pubkey).unwrap(),
&my_vote_pubkey,
&my_vote_keypair,
&mut voted_signatures,
has_new_vote_been_rooted,
&mut last_vote_refresh_time,
&voting_sender,
);

// No new votes have been submitted to gossip
Expand All @@ -4732,15 +4742,19 @@ mod tests {
ReplayStage::push_vote(
&cluster_info,
&bank1,
&poh_recorder,
&my_vote_pubkey,
&my_vote_keypair,
&mut tower,
&SwitchForkDecision::SameFork,
&mut voted_signatures,
has_new_vote_been_rooted,
&mut ReplayTiming::default(),
&voting_sender,
);
let vote_info = voting_receiver
.recv_timeout(Duration::from_secs(1))
.unwrap();
crate::voting_service::VotingService::handle_vote(&cluster_info, &poh_recorder, vote_info);
let (_, votes) = cluster_info.get_votes(&mut cursor);
assert_eq!(votes.len(), 1);
let vote_tx = &votes[0];
Expand All @@ -4754,14 +4768,15 @@ mod tests {
&mut tower,
&cluster_info,
&bank2,
&poh_recorder,
Tower::last_voted_slot_in_bank(&bank2, &my_vote_pubkey).unwrap(),
&my_vote_pubkey,
&my_vote_keypair,
&mut voted_signatures,
has_new_vote_been_rooted,
&mut last_vote_refresh_time,
&voting_sender,
);

// No new votes have been submitted to gossip
let (_, votes) = cluster_info.get_votes(&mut cursor);
assert!(votes.is_empty());
Expand Down Expand Up @@ -4790,14 +4805,19 @@ mod tests {
&mut tower,
&cluster_info,
&expired_bank,
&poh_recorder,
Tower::last_voted_slot_in_bank(&expired_bank, &my_vote_pubkey).unwrap(),
&my_vote_pubkey,
&my_vote_keypair,
&mut voted_signatures,
has_new_vote_been_rooted,
&mut last_vote_refresh_time,
&voting_sender,
);
let vote_info = voting_receiver
.recv_timeout(Duration::from_secs(1))
.unwrap();
crate::voting_service::VotingService::handle_vote(&cluster_info, &poh_recorder, vote_info);

assert!(last_vote_refresh_time.last_refresh_time > clone_refresh_time);
let (_, votes) = cluster_info.get_votes(&mut cursor);
assert_eq!(votes.len(), 1);
Expand Down Expand Up @@ -4846,14 +4866,15 @@ mod tests {
&mut tower,
&cluster_info,
&expired_bank_sibling,
&poh_recorder,
Tower::last_voted_slot_in_bank(&expired_bank_sibling, &my_vote_pubkey).unwrap(),
&my_vote_pubkey,
&my_vote_keypair,
&mut voted_signatures,
has_new_vote_been_rooted,
&mut last_vote_refresh_time,
&voting_sender,
);

let (_, votes) = cluster_info.get_votes(&mut cursor);
assert!(votes.is_empty());
assert_eq!(
Expand Down
9 changes: 9 additions & 0 deletions core/src/tvu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use crate::{
sigverify_shreds::ShredSigVerifier,
sigverify_stage::SigVerifyStage,
snapshot_packager_service::PendingSnapshotPackage,
voting_service::VotingService,
};
use crossbeam_channel::unbounded;
use solana_gossip::cluster_info::ClusterInfo;
Expand Down Expand Up @@ -66,6 +67,7 @@ pub struct Tvu {
ledger_cleanup_service: Option<LedgerCleanupService>,
accounts_background_service: AccountsBackgroundService,
accounts_hash_verifier: AccountsHashVerifier,
voting_service: VotingService,
}

pub struct Sockets {
Expand Down Expand Up @@ -276,6 +278,10 @@ impl Tvu {
wait_for_vote_to_start_leader: tvu_config.wait_for_vote_to_start_leader,
};

let (voting_sender, voting_receiver) = channel();
let voting_service =
VotingService::new(voting_receiver, cluster_info.clone(), poh_recorder.clone());

let replay_stage = ReplayStage::new(
replay_stage_config,
blockstore.clone(),
Expand All @@ -293,6 +299,7 @@ impl Tvu {
gossip_confirmed_slots_receiver,
gossip_verified_vote_hash_receiver,
cluster_slots_update_sender,
voting_sender,
);

let ledger_cleanup_service = tvu_config.max_ledger_shreds.map(|max_ledger_shreds| {
Expand Down Expand Up @@ -323,6 +330,7 @@ impl Tvu {
ledger_cleanup_service,
accounts_background_service,
accounts_hash_verifier,
voting_service,
}
}

Expand All @@ -336,6 +344,7 @@ impl Tvu {
self.accounts_background_service.join()?;
self.replay_stage.join()?;
self.accounts_hash_verifier.join()?;
self.voting_service.join()?;
Ok(())
}
}
Expand Down
Loading

0 comments on commit 2a93147

Please sign in to comment.