Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

Commit

Permalink
Move block-time caching earlier (#17109) (#17150)
Browse files Browse the repository at this point in the history
* Require that blockstore block-time only be recognized slot, instead of root

* Move cache_block_time to after Bank freeze

* Single use statement

* Pass transaction_status_sender by reference

* Remove unnecessary slot-existence check before caching block time altogether

* Move block-time existence check into Blockstore::cache_block_time, Blockstore no longer needed in blockstore_processor helper

(cherry picked from commit 6e9deaf)

Co-authored-by: Tyera Eulberg <teulberg@gmail.com>
  • Loading branch information
mergify[bot] and CriesofCarrots authored May 10, 2021
1 parent 094271b commit 0cf8388
Show file tree
Hide file tree
Showing 8 changed files with 116 additions and 97 deletions.
24 changes: 13 additions & 11 deletions core/src/cache_block_time_service.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
use crossbeam_channel::{Receiver, RecvTimeoutError, Sender};
use solana_ledger::blockstore::Blockstore;
use solana_measure::measure::Measure;
use solana_runtime::bank::Bank;
use std::{
sync::{
atomic::{AtomicBool, Ordering},
Arc,
pub use solana_ledger::blockstore_processor::CacheBlockTimeSender;
use {
crossbeam_channel::{Receiver, RecvTimeoutError},
solana_ledger::blockstore::Blockstore,
solana_measure::measure::Measure,
solana_runtime::bank::Bank,
std::{
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
thread::{self, Builder, JoinHandle},
time::Duration,
},
thread::{self, Builder, JoinHandle},
time::Duration,
};

pub type CacheBlockTimeReceiver = Receiver<Arc<Bank>>;
pub type CacheBlockTimeSender = Sender<Arc<Bank>>;

pub struct CacheBlockTimeService {
thread_hdl: JoinHandle<()>,
Expand Down
53 changes: 9 additions & 44 deletions core/src/replay_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,8 @@ impl ReplayStage {
&my_pubkey,
&vote_account,
&mut progress,
transaction_status_sender.clone(),
transaction_status_sender.as_ref(),
cache_block_time_sender.as_ref(),
&verify_recyclers,
&mut heaviest_subtree_fork_choice,
&replay_vote_sender,
Expand Down Expand Up @@ -576,7 +577,6 @@ impl ReplayStage {
&subscriptions,
&block_commitment_cache,
&mut heaviest_subtree_fork_choice,
&cache_block_time_sender,
&bank_notification_sender,
&mut gossip_duplicate_confirmed_slots,
&mut unfrozen_gossip_verified_vote_hashes,
Expand Down Expand Up @@ -1216,7 +1216,7 @@ impl ReplayStage {
bank: &Arc<Bank>,
blockstore: &Blockstore,
bank_progress: &mut ForkProgress,
transaction_status_sender: Option<TransactionStatusSender>,
transaction_status_sender: Option<&TransactionStatusSender>,
replay_vote_sender: &ReplayVoteSender,
verify_recyclers: &VerifyRecyclers,
) -> result::Result<usize, BlockstoreProcessorError> {
Expand Down Expand Up @@ -1323,7 +1323,6 @@ impl ReplayStage {
subscriptions: &Arc<RpcSubscriptions>,
block_commitment_cache: &Arc<RwLock<BlockCommitmentCache>>,
heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice,
cache_block_time_sender: &Option<CacheBlockTimeSender>,
bank_notification_sender: &Option<BankNotificationSender>,
gossip_duplicate_confirmed_slots: &mut GossipDuplicateConfirmedSlots,
unfrozen_gossip_verified_vote_hashes: &mut UnfrozenGossipVerifiedVoteHashes,
Expand Down Expand Up @@ -1360,12 +1359,6 @@ impl ReplayStage {
blockstore
.set_roots(&rooted_slots)
.expect("Ledger set roots failed");
Self::cache_block_times(
blockstore,
bank_forks,
&rooted_slots,
cache_block_time_sender,
);
let highest_confirmed_root = Some(
block_commitment_cache
.read()
Expand Down Expand Up @@ -1659,7 +1652,8 @@ impl ReplayStage {
my_pubkey: &Pubkey,
vote_account: &Pubkey,
progress: &mut ProgressMap,
transaction_status_sender: Option<TransactionStatusSender>,
transaction_status_sender: Option<&TransactionStatusSender>,
cache_block_time_sender: Option<&CacheBlockTimeSender>,
verify_recyclers: &VerifyRecyclers,
heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice,
replay_vote_sender: &ReplayVoteSender,
Expand Down Expand Up @@ -1723,7 +1717,7 @@ impl ReplayStage {
&bank,
&blockstore,
bank_progress,
transaction_status_sender.clone(),
transaction_status_sender,
replay_vote_sender,
verify_recyclers,
);
Expand Down Expand Up @@ -1758,7 +1752,7 @@ impl ReplayStage {
);
did_complete_bank = true;
info!("bank frozen: {}", bank.slot());
if let Some(transaction_status_sender) = transaction_status_sender.clone() {
if let Some(transaction_status_sender) = transaction_status_sender {
transaction_status_sender.send_transaction_status_freeze_message(&bank);
}
bank.freeze();
Expand All @@ -1784,6 +1778,7 @@ impl ReplayStage {
.send(BankNotification::Frozen(bank.clone()))
.unwrap_or_else(|err| warn!("bank_notification_sender failed: {:?}", err));
}
blockstore_processor::cache_block_time(&bank, cache_block_time_sender);

let bank_hash = bank.hash();
if let Some(new_frozen_voters) =
Expand Down Expand Up @@ -2477,36 +2472,6 @@ impl ReplayStage {
}
}

fn cache_block_times(
blockstore: &Arc<Blockstore>,
bank_forks: &Arc<RwLock<BankForks>>,
rooted_slots: &[Slot],
cache_block_time_sender: &Option<CacheBlockTimeSender>,
) {
if let Some(cache_block_time_sender) = cache_block_time_sender {
for slot in rooted_slots {
if blockstore
.get_block_time(*slot)
.unwrap_or_default()
.is_none()
{
if let Some(rooted_bank) = bank_forks.read().unwrap().get(*slot) {
cache_block_time_sender
.send(rooted_bank.clone())
.unwrap_or_else(|err| {
warn!("cache_block_time_sender failed: {:?}", err)
});
} else {
error!(
"rooted_bank {:?} not available in BankForks; block time not cached",
slot
);
}
}
}
}
}

pub fn get_unlock_switch_vote_slot(cluster_type: ClusterType) -> Slot {
match cluster_type {
ClusterType::Development => 0,
Expand Down Expand Up @@ -3404,7 +3369,7 @@ pub(crate) mod tests {
&bank,
&mut entries,
true,
Some(TransactionStatusSender {
Some(&TransactionStatusSender {
sender: transaction_status_sender,
enable_cpi_and_log_storage: false,
}),
Expand Down
2 changes: 1 addition & 1 deletion core/src/retransmit_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -693,7 +693,7 @@ mod tests {
..ProcessOptions::default()
};
let (bank_forks, cached_leader_schedule) =
process_blockstore(&genesis_config, &blockstore, Vec::new(), opts).unwrap();
process_blockstore(&genesis_config, &blockstore, Vec::new(), opts, None).unwrap();
let leader_schedule_cache = Arc::new(cached_leader_schedule);
let bank_forks = Arc::new(RwLock::new(bank_forks));

Expand Down
5 changes: 4 additions & 1 deletion core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1119,7 +1119,10 @@ fn new_banks_from_ledger(
process_options,
transaction_history_services
.transaction_status_sender
.clone(),
.as_ref(),
transaction_history_services
.cache_block_time_sender
.as_ref(),
)
.unwrap_or_else(|err| {
error!("Failed to load ledger: {:?}", err);
Expand Down
1 change: 1 addition & 0 deletions ledger-tool/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -703,6 +703,7 @@ fn load_bank_forks(
snapshot_config.as_ref(),
process_options,
None,
None,
)
}

Expand Down
9 changes: 6 additions & 3 deletions ledger/src/bank_forks_utils.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use crate::{
blockstore::Blockstore,
blockstore_processor::{
self, BlockstoreProcessorError, BlockstoreProcessorResult, ProcessOptions,
TransactionStatusSender,
self, BlockstoreProcessorError, BlockstoreProcessorResult, CacheBlockTimeSender,
ProcessOptions, TransactionStatusSender,
},
entry::VerifyRecyclers,
leader_schedule_cache::LeaderScheduleCache,
Expand Down Expand Up @@ -36,7 +36,8 @@ pub fn load(
shrink_paths: Option<Vec<PathBuf>>,
snapshot_config: Option<&SnapshotConfig>,
process_options: ProcessOptions,
transaction_status_sender: Option<TransactionStatusSender>,
transaction_status_sender: Option<&TransactionStatusSender>,
cache_block_time_sender: Option<&CacheBlockTimeSender>,
) -> LoadResult {
if let Some(snapshot_config) = snapshot_config.as_ref() {
info!(
Expand Down Expand Up @@ -96,6 +97,7 @@ pub fn load(
&process_options,
&VerifyRecyclers::default(),
transaction_status_sender,
cache_block_time_sender,
),
Some(deserialized_snapshot_hash),
);
Expand All @@ -113,6 +115,7 @@ pub fn load(
&blockstore,
account_paths,
process_options,
cache_block_time_sender,
),
None,
)
Expand Down
7 changes: 4 additions & 3 deletions ledger/src/blockstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1712,10 +1712,11 @@ impl Blockstore {
}

pub fn cache_block_time(&self, slot: Slot, timestamp: UnixTimestamp) -> Result<()> {
if !self.is_root(slot) {
return Err(BlockstoreError::SlotNotRooted);
if self.get_block_time(slot).unwrap_or_default().is_none() {
self.blocktime_cf.put(slot, &timestamp)
} else {
Ok(())
}
self.blocktime_cf.put(slot, &timestamp)
}

pub fn get_first_available_block(&self) -> Result<Slot> {
Expand Down
Loading

0 comments on commit 0cf8388

Please sign in to comment.