Skip to content

Commit

Permalink
Clean up exit flag handing across TVU
Browse files Browse the repository at this point in the history
  • Loading branch information
mvines committed Mar 5, 2019
1 parent a39b4f4 commit 0fd3c0e
Show file tree
Hide file tree
Showing 21 changed files with 96 additions and 138 deletions.
7 changes: 1 addition & 6 deletions bench-streamer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,7 @@ fn main() -> Result<()> {

let (s_reader, r_reader) = channel();
read_channels.push(r_reader);
read_threads.push(receiver(
Arc::new(read),
exit.clone(),
s_reader,
"bench-streamer",
));
read_threads.push(receiver(Arc::new(read), &exit, s_reader, "bench-streamer"));
}

let t_producer1 = producer(&addr, exit.clone());
Expand Down
11 changes: 7 additions & 4 deletions benches/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use solana_sdk::signature::{KeypairUtil, Signature};
use solana_sdk::system_transaction::SystemTransaction;
use solana_sdk::timing::{DEFAULT_TICKS_PER_SLOT, MAX_RECENT_BLOCKHASHES};
use std::iter;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver};
use std::sync::{Arc, RwLock};
use std::time::Duration;
Expand Down Expand Up @@ -102,7 +103,7 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) {
(x, iter::repeat(1).take(len).collect())
})
.collect();
let (poh_recorder, poh_service, signal_receiver) = create_test_recorder(&bank);
let (exit, poh_recorder, poh_service, signal_receiver) = create_test_recorder(&bank);
let cluster_info = ClusterInfo::new(Node::new_localhost().info);
let cluster_info = Arc::new(RwLock::new(cluster_info));
let _banking_stage = BankingStage::new(&cluster_info, &poh_recorder, verified_receiver);
Expand All @@ -127,7 +128,8 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) {
start += half_len;
start %= verified.len();
});
poh_service.close().unwrap();
exit.store(true, Ordering::Relaxed);
poh_service.join().unwrap();
}

#[bench]
Expand Down Expand Up @@ -208,7 +210,7 @@ fn bench_banking_stage_multi_programs(bencher: &mut Bencher) {
(x, iter::repeat(1).take(len).collect())
})
.collect();
let (poh_recorder, poh_service, signal_receiver) = create_test_recorder(&bank);
let (exit, poh_recorder, poh_service, signal_receiver) = create_test_recorder(&bank);
let cluster_info = ClusterInfo::new(Node::new_localhost().info);
let cluster_info = Arc::new(RwLock::new(cluster_info));
let _banking_stage = BankingStage::new(&cluster_info, &poh_recorder, verified_receiver);
Expand All @@ -233,5 +235,6 @@ fn bench_banking_stage_multi_programs(bencher: &mut Bencher) {
start += half_len;
start %= verified.len();
});
poh_service.close().unwrap();
exit.store(true, Ordering::Relaxed);
poh_service.join().unwrap();
}
23 changes: 14 additions & 9 deletions core/src/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,7 @@ impl Service for BankingStage {
pub fn create_test_recorder(
bank: &Arc<Bank>,
) -> (
Arc<AtomicBool>,
Arc<Mutex<PohRecorder>>,
PohService,
Receiver<WorkingBankEntries>,
Expand All @@ -356,7 +357,7 @@ pub fn create_test_recorder(
PohRecorder::new(bank.tick_height(), bank.last_blockhash());
let poh_recorder = Arc::new(Mutex::new(poh_recorder));
let poh_service = PohService::new(poh_recorder.clone(), &PohServiceConfig::default(), &exit);
(poh_recorder, poh_service, entry_receiver)
(exit, poh_recorder, poh_service, entry_receiver)
}

#[cfg(test)]
Expand All @@ -378,13 +379,14 @@ mod tests {
let (genesis_block, _mint_keypair) = GenesisBlock::new(2);
let bank = Arc::new(Bank::new(&genesis_block));
let (verified_sender, verified_receiver) = channel();
let (poh_recorder, poh_service, _entry_receiever) = create_test_recorder(&bank);
let (exit, poh_recorder, poh_service, _entry_receiever) = create_test_recorder(&bank);
let cluster_info = ClusterInfo::new(Node::new_localhost().info);
let cluster_info = Arc::new(RwLock::new(cluster_info));
let banking_stage = BankingStage::new(&cluster_info, &poh_recorder, verified_receiver);
drop(verified_sender);
exit.store(true, Ordering::Relaxed);
banking_stage.join().unwrap();
poh_service.close().unwrap();
poh_service.join().unwrap();
}

#[test]
Expand All @@ -395,15 +397,16 @@ mod tests {
let bank = Arc::new(Bank::new(&genesis_block));
let start_hash = bank.last_blockhash();
let (verified_sender, verified_receiver) = channel();
let (poh_recorder, poh_service, entry_receiver) = create_test_recorder(&bank);
let (exit, poh_recorder, poh_service, entry_receiver) = create_test_recorder(&bank);
let cluster_info = ClusterInfo::new(Node::new_localhost().info);
let cluster_info = Arc::new(RwLock::new(cluster_info));
poh_recorder.lock().unwrap().set_bank(&bank);
let banking_stage = BankingStage::new(&cluster_info, &poh_recorder, verified_receiver);
trace!("sending bank");
sleep(Duration::from_millis(600));
drop(verified_sender);
poh_service.close().unwrap();
exit.store(true, Ordering::Relaxed);
poh_service.join().unwrap();
drop(poh_recorder);

trace!("getting entries");
Expand All @@ -424,7 +427,7 @@ mod tests {
let bank = Arc::new(Bank::new(&genesis_block));
let start_hash = bank.last_blockhash();
let (verified_sender, verified_receiver) = channel();
let (poh_recorder, poh_service, entry_receiver) = create_test_recorder(&bank);
let (exit, poh_recorder, poh_service, entry_receiver) = create_test_recorder(&bank);
let cluster_info = ClusterInfo::new(Node::new_localhost().info);
let cluster_info = Arc::new(RwLock::new(cluster_info));
poh_recorder.lock().unwrap().set_bank(&bank);
Expand Down Expand Up @@ -452,7 +455,8 @@ mod tests {
.unwrap();

drop(verified_sender);
poh_service.close().expect("close");
exit.store(true, Ordering::Relaxed);
poh_service.join().unwrap();
drop(poh_recorder);

//receive entries + ticks
Expand Down Expand Up @@ -481,7 +485,7 @@ mod tests {
let (genesis_block, mint_keypair) = GenesisBlock::new(2);
let bank = Arc::new(Bank::new(&genesis_block));
let (verified_sender, verified_receiver) = channel();
let (poh_recorder, poh_service, entry_receiver) = create_test_recorder(&bank);
let (exit, poh_recorder, poh_service, entry_receiver) = create_test_recorder(&bank);
let cluster_info = ClusterInfo::new(Node::new_localhost().info);
let cluster_info = Arc::new(RwLock::new(cluster_info));
poh_recorder.lock().unwrap().set_bank(&bank);
Expand Down Expand Up @@ -516,7 +520,8 @@ mod tests {
.unwrap();

drop(verified_sender);
poh_service.close().expect("close");;
exit.store(true, Ordering::Relaxed);
poh_service.join().unwrap();
drop(poh_recorder);

// Collect the ledger and feed it to a new bank.
Expand Down
15 changes: 5 additions & 10 deletions core/src/blob_fetch_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,34 +3,29 @@
use crate::service::Service;
use crate::streamer::{self, BlobSender};
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::thread::{self, JoinHandle};

pub struct BlobFetchStage {
exit: Arc<AtomicBool>,
thread_hdls: Vec<JoinHandle<()>>,
}

impl BlobFetchStage {
pub fn new(socket: Arc<UdpSocket>, sender: &BlobSender, exit: Arc<AtomicBool>) -> Self {
pub fn new(socket: Arc<UdpSocket>, sender: &BlobSender, exit: &Arc<AtomicBool>) -> Self {
Self::new_multi_socket(vec![socket], sender, exit)
}
pub fn new_multi_socket(
sockets: Vec<Arc<UdpSocket>>,
sender: &BlobSender,
exit: Arc<AtomicBool>,
exit: &Arc<AtomicBool>,
) -> Self {
let thread_hdls: Vec<_> = sockets
.into_iter()
.map(|socket| streamer::blob_receiver(socket, exit.clone(), sender.clone()))
.map(|socket| streamer::blob_receiver(socket, &exit, sender.clone()))
.collect();

Self { exit, thread_hdls }
}

pub fn close(&self) {
self.exit.store(true, Ordering::Relaxed);
Self { thread_hdls }
}
}

Expand Down
3 changes: 2 additions & 1 deletion core/src/blockstream_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ impl BlockstreamService {
slot_full_receiver: Receiver<(u64, Pubkey)>,
blocktree: Arc<Blocktree>,
blockstream_socket: String,
exit: Arc<AtomicBool>,
exit: &Arc<AtomicBool>,
) -> Self {
let mut blockstream = Blockstream::new(blockstream_socket);
let exit = exit.clone();
let t_blockstream = Builder::new()
.name("solana-blockstream".to_string())
.spawn(move || loop {
Expand Down
6 changes: 4 additions & 2 deletions core/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -868,8 +868,9 @@ impl ClusterInfo {
obj: Arc<RwLock<Self>>,
bank_forks: Option<Arc<RwLock<BankForks>>>,
blob_sender: BlobSender,
exit: Arc<AtomicBool>,
exit: &Arc<AtomicBool>,
) -> JoinHandle<()> {
let exit = exit.clone();
Builder::new()
.name("solana-gossip".to_string())
.spawn(move || {
Expand Down Expand Up @@ -1243,8 +1244,9 @@ impl ClusterInfo {
blocktree: Option<Arc<Blocktree>>,
requests_receiver: BlobReceiver,
response_sender: BlobSender,
exit: Arc<AtomicBool>,
exit: &Arc<AtomicBool>,
) -> JoinHandle<()> {
let exit = exit.clone();
Builder::new()
.name("solana-listen".to_string())
.spawn(move || loop {
Expand Down
7 changes: 1 addition & 6 deletions core/src/db_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,12 +134,7 @@ mod test {
let send = UdpSocket::bind("127.0.0.1:0").expect("bind");
let exit = Arc::new(AtomicBool::new(false));
let (s_reader, r_reader) = channel();
let t_receiver = receiver(
Arc::new(read),
exit.clone(),
s_reader,
"window-streamer-test",
);
let t_receiver = receiver(Arc::new(read), &exit, s_reader, "window-streamer-test");
let t_responder = {
let (s_responder, r_responder) = channel();
let t_responder = responder("streamer_send_test", Arc::new(send), r_responder);
Expand Down
2 changes: 1 addition & 1 deletion core/src/fetch_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ impl FetchStage {
) -> Self {
let thread_hdls: Vec<_> = sockets
.into_iter()
.map(|socket| streamer::receiver(socket, exit.clone(), sender.clone(), "fetch-stage"))
.map(|socket| streamer::receiver(socket, &exit, sender.clone(), "fetch-stage"))
.collect();

Self { thread_hdls }
Expand Down
2 changes: 1 addition & 1 deletion core/src/fullnode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ impl Fullnode {
// Used for notifying many nodes in parallel to exit
pub fn exit(&self) {
self.exit.store(true, Ordering::Relaxed);

// Need to force the poh_recorder to drop the WorkingBank,
// which contains the channel to BroadcastStage. This should be
// sufficient as long as no other rotations are happening that
Expand All @@ -272,7 +273,6 @@ impl Fullnode {
// in motion because exit()/close() are only called by the run() loop
// which is the sole initiator of rotations.
self.poh_recorder.lock().unwrap().clear_bank();
self.poh_service.exit();
}

pub fn close(self) -> Result<()> {
Expand Down
12 changes: 3 additions & 9 deletions core/src/gossip_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,23 +34,17 @@ impl GossipService {
&cluster_info.read().unwrap().my_data().id,
gossip_socket.local_addr().unwrap()
);
let t_receiver =
streamer::blob_receiver(gossip_socket.clone(), exit.clone(), request_sender);
let t_receiver = streamer::blob_receiver(gossip_socket.clone(), &exit, request_sender);
let (response_sender, response_receiver) = channel();
let t_responder = streamer::responder("gossip", gossip_socket, response_receiver);
let t_listen = ClusterInfo::listen(
cluster_info.clone(),
blocktree,
request_receiver,
response_sender.clone(),
exit.clone(),
);
let t_gossip = ClusterInfo::gossip(
cluster_info.clone(),
bank_forks,
response_sender,
exit.clone(),
exit,
);
let t_gossip = ClusterInfo::gossip(cluster_info.clone(), bank_forks, response_sender, exit);
let thread_hdls = vec![t_receiver, t_responder, t_listen, t_gossip];
Self { thread_hdls }
}
Expand Down
18 changes: 2 additions & 16 deletions core/src/poh_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,9 @@ impl Default for PohServiceConfig {

pub struct PohService {
tick_producer: JoinHandle<()>,
poh_exit: Arc<AtomicBool>,
}

impl PohService {
pub fn exit(&self) {
self.poh_exit.store(true, Ordering::Relaxed);
}

pub fn close(self) -> thread::Result<()> {
self.exit();
self.join()
}

pub fn new(
poh_recorder: Arc<Mutex<PohRecorder>>,
config: &PohServiceConfig,
Expand All @@ -64,10 +54,7 @@ impl PohService {
})
.unwrap();

Self {
tick_producer,
poh_exit: poh_exit.clone(),
}
Self { tick_producer }
}

fn tick_producer(
Expand Down Expand Up @@ -157,7 +144,7 @@ mod tests {
let poh_service = PohService::new(
poh_recorder.clone(),
&PohServiceConfig::Tick(HASHES_PER_TICK as usize),
&Arc::new(AtomicBool::new(false)),
&exit,
);
poh_recorder.lock().unwrap().set_working_bank(working_bank);

Expand Down Expand Up @@ -192,7 +179,6 @@ mod tests {
}
}
exit.store(true, Ordering::Relaxed);
poh_service.exit();
let _ = poh_service.join().unwrap();
let _ = entry_producer.join().unwrap();
}
Expand Down
7 changes: 4 additions & 3 deletions core/src/repair_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ pub struct RepairService {
impl RepairService {
fn run(
blocktree: &Arc<Blocktree>,
exit: &Arc<AtomicBool>,
exit: Arc<AtomicBool>,
repair_socket: &Arc<UdpSocket>,
cluster_info: &Arc<RwLock<ClusterInfo>>,
) {
Expand Down Expand Up @@ -112,13 +112,14 @@ impl RepairService {

pub fn new(
blocktree: Arc<Blocktree>,
exit: Arc<AtomicBool>,
exit: &Arc<AtomicBool>,
repair_socket: Arc<UdpSocket>,
cluster_info: Arc<RwLock<ClusterInfo>>,
) -> Self {
let exit = exit.clone();
let t_repair = Builder::new()
.name("solana-repair-service".to_string())
.spawn(move || Self::run(&blocktree, &exit, &repair_socket, &cluster_info))
.spawn(move || Self::run(&blocktree, exit, &repair_socket, &cluster_info))
.unwrap();

RepairService { t_repair }
Expand Down
Loading

0 comments on commit 0fd3c0e

Please sign in to comment.