diff --git a/src/bin/testnode.rs b/src/bin/testnode.rs index 1907589cbd2065..1663e1e60da53e 100644 --- a/src/bin/testnode.rs +++ b/src/bin/testnode.rs @@ -131,6 +131,7 @@ fn main() { gossip_sock.local_addr().unwrap(), replicate_sock.local_addr().unwrap(), serve_sock.local_addr().unwrap(), + events_sock.local_addr().unwrap(), ); let mut local = serve_sock.local_addr().unwrap(); @@ -139,7 +140,7 @@ fn main() { let respond_socket = UdpSocket::bind(local.clone()).unwrap(); eprintln!("starting server..."); - let server = Server::new( + let server = Server::new_leader( bank, last_id, Some(Duration::from_millis(1000)), diff --git a/src/crdt.rs b/src/crdt.rs index 7029e79b21be9e..eef6981f4720dd 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -31,7 +31,7 @@ use std::thread::{sleep, spawn, JoinHandle}; use std::time::Duration; /// Structure to be replicated by the network -#[derive(Serialize, Deserialize, Clone)] +#[derive(Serialize, Deserialize, Clone, Debug)] pub struct ReplicatedData { pub id: PublicKey, sig: Signature, @@ -42,7 +42,9 @@ pub struct ReplicatedData { /// address to connect to for replication pub replicate_addr: SocketAddr, /// address to connect to when this node is leader - pub serve_addr: SocketAddr, + pub requests_addr: SocketAddr, + /// events address + pub events_addr: SocketAddr, /// current leader identity current_leader_id: PublicKey, /// last verified hash that was submitted to the leader @@ -56,7 +58,8 @@ impl ReplicatedData { id: PublicKey, gossip_addr: SocketAddr, replicate_addr: SocketAddr, - serve_addr: SocketAddr, + requests_addr: SocketAddr, + events_addr: SocketAddr, ) -> ReplicatedData { ReplicatedData { id, @@ -64,7 +67,8 @@ impl ReplicatedData { version: 0, gossip_addr, replicate_addr, - serve_addr, + requests_addr, + events_addr, current_leader_id: PublicKey::default(), last_verified_hash: Hash::default(), last_verified_count: 0, @@ -515,12 +519,14 @@ mod test { let gossip = UdpSocket::bind("0.0.0.0:0").unwrap(); let replicate = UdpSocket::bind("0.0.0.0:0").unwrap(); let serve = UdpSocket::bind("0.0.0.0:0").unwrap(); + let events = UdpSocket::bind("0.0.0.0:0").unwrap(); let pubkey = KeyPair::new().pubkey(); let d = ReplicatedData::new( pubkey, gossip.local_addr().unwrap(), replicate.local_addr().unwrap(), serve.local_addr().unwrap(), + events.local_addr().unwrap(), ); let crdt = Crdt::new(d); trace!( @@ -632,6 +638,7 @@ mod test { "127.0.0.1:1234".parse().unwrap(), "127.0.0.1:1235".parse().unwrap(), "127.0.0.1:1236".parse().unwrap(), + "127.0.0.1:1237".parse().unwrap(), ); assert_eq!(d.version, 0); let mut crdt = Crdt::new(d.clone()); diff --git a/src/server.rs b/src/server.rs index b250e86d84bf0a..12828128ebeb5c 100644 --- a/src/server.rs +++ b/src/server.rs @@ -11,13 +11,14 @@ use std::sync::atomic::AtomicBool; use std::thread::JoinHandle; use std::time::Duration; use tpu::Tpu; +use tvu::Tvu; pub struct Server { pub thread_hdls: Vec>, } impl Server { - pub fn new( + pub fn new_leader( bank: Bank, start_hash: Hash, tick_duration: Option, @@ -26,7 +27,7 @@ impl Server { events_socket: UdpSocket, broadcast_socket: UdpSocket, respond_socket: UdpSocket, - gossip: UdpSocket, + gossip_socket: UdpSocket, exit: Arc, writer: W, ) -> Self { @@ -34,7 +35,6 @@ impl Server { let mut thread_hdls = vec![]; let rpu = Rpu::new(bank.clone(), requests_socket, respond_socket, exit.clone()); thread_hdls.extend(rpu.thread_hdls); - let tpu = Tpu::new( bank.clone(), start_hash, @@ -42,12 +42,36 @@ impl Server { me, events_socket, broadcast_socket, - gossip, + gossip_socket, exit.clone(), writer, ); thread_hdls.extend(tpu.thread_hdls); - + Server { thread_hdls } + } + pub fn new_validator( + bank: Bank, + me: ReplicatedData, + requests_socket: UdpSocket, + respond_socket: UdpSocket, + replicate_socket: UdpSocket, + gossip_socket: UdpSocket, + leader_repl_data: ReplicatedData, + exit: Arc, + ) -> Self { + let bank = Arc::new(bank); + let mut thread_hdls = vec![]; + let rpu = Rpu::new(bank.clone(), requests_socket, respond_socket, exit.clone()); + thread_hdls.extend(rpu.thread_hdls); + let tvu = Tvu::new( + bank.clone(), + me, + gossip_socket, + replicate_socket, + leader_repl_data, + exit.clone(), + ); + thread_hdls.extend(tvu.thread_hdls); Server { thread_hdls } } } diff --git a/src/streamer.rs b/src/streamer.rs index ee83cd25d027c6..719aa46aa4b6fc 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -650,12 +650,14 @@ mod test { let addr = read.local_addr().unwrap(); let send = UdpSocket::bind("127.0.0.1:0").expect("bind"); let serve = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let event = UdpSocket::bind("127.0.0.1:0").expect("bind"); let exit = Arc::new(AtomicBool::new(false)); let rep_data = ReplicatedData::new( pubkey_me, read.local_addr().unwrap(), send.local_addr().unwrap(), serve.local_addr().unwrap(), + event.local_addr().unwrap(), ); let mut crdt_me = Crdt::new(rep_data); let me_id = crdt_me.my_data().id; @@ -712,21 +714,17 @@ mod test { let gossip = UdpSocket::bind("127.0.0.1:0").unwrap(); let replicate = UdpSocket::bind("127.0.0.1:0").unwrap(); let serve = UdpSocket::bind("127.0.0.1:0").unwrap(); + let event = UdpSocket::bind("127.0.0.1:0").unwrap(); let pubkey = KeyPair::new().pubkey(); let d = ReplicatedData::new( pubkey, gossip.local_addr().unwrap(), replicate.local_addr().unwrap(), serve.local_addr().unwrap(), + event.local_addr().unwrap(), ); + trace!("data: {:?}", d); let crdt = Crdt::new(d); - trace!( - "id: {} gossip: {} replicate: {} serve: {}", - crdt.my_data().id[0], - gossip.local_addr().unwrap(), - replicate.local_addr().unwrap(), - serve.local_addr().unwrap(), - ); (Arc::new(RwLock::new(crdt)), gossip, replicate, serve) } diff --git a/src/thin_client.rs b/src/thin_client.rs index b5632c9ae3e3dc..249119e0af6aa1 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -192,47 +192,28 @@ mod tests { use std::thread::sleep; use std::time::Duration; use streamer::default_window; - use tvu::{self, Tvu}; + use tvu::tests::TestNode; #[test] fn test_thin_client() { logger::setup(); - let gossip = UdpSocket::bind("0.0.0.0:0").unwrap(); - let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - requests_socket - .set_read_timeout(Some(Duration::new(1, 0))) - .unwrap(); - let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - let events_addr = events_socket.local_addr().unwrap(); - let addr = requests_socket.local_addr().unwrap(); - let pubkey = KeyPair::new().pubkey(); - let d = ReplicatedData::new( - pubkey, - gossip.local_addr().unwrap(), - "0.0.0.0:0".parse().unwrap(), - requests_socket.local_addr().unwrap(), - ); + let leader = TestNode::new(); let alice = Mint::new(10_000); let bank = Bank::new(&alice); let bob_pubkey = KeyPair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); - let mut local = requests_socket.local_addr().unwrap(); - local.set_port(0); - let broadcast_socket = UdpSocket::bind(local).unwrap(); - let respond_socket = UdpSocket::bind(local.clone()).unwrap(); - - let server = Server::new( + let server = Server::new_leader( bank, alice.last_id(), Some(Duration::from_millis(30)), - d, - requests_socket, - events_socket, - broadcast_socket, - respond_socket, - gossip, + leader.data.clone(), + leader.sockets.requests, + leader.sockets.event, + leader.sockets.broadcast, + leader.sockets.respond, + leader.sockets.gossip, exit.clone(), sink(), ); @@ -241,7 +222,12 @@ mod tests { let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - let mut client = ThinClient::new(addr, requests_socket, events_addr, events_socket); + let mut client = ThinClient::new( + leader.data.requests_addr, + requests_socket, + leader.data.events_addr, + events_socket, + ); let last_id = client.get_last_id().wait().unwrap(); let _sig = client .transfer(500, &alice.keypair(), bob_pubkey, &last_id) @@ -257,30 +243,22 @@ mod tests { #[test] fn test_bad_sig() { logger::setup(); - let (leader_data, leader_gossip, _, leader_serve, _leader_events) = tvu::test_node(); + let leader = TestNode::new(); let alice = Mint::new(10_000); let bank = Bank::new(&alice); let bob_pubkey = KeyPair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); - let serve_addr = leader_serve.local_addr().unwrap(); - let mut local = leader_serve.local_addr().unwrap(); - local.set_port(0); - let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - let broadcast_socket = UdpSocket::bind(local).unwrap(); - let respond_socket = UdpSocket::bind(local.clone()).unwrap(); - let events_addr = events_socket.local_addr().unwrap(); - - let server = Server::new( + let server = Server::new_leader( bank, alice.last_id(), Some(Duration::from_millis(30)), - leader_data, - leader_serve, - events_socket, - broadcast_socket, - respond_socket, - leader_gossip, + leader.data.clone(), + leader.sockets.requests, + leader.sockets.event, + leader.sockets.broadcast, + leader.sockets.respond, + leader.sockets.gossip, exit.clone(), sink(), ); @@ -291,7 +269,12 @@ mod tests { .set_read_timeout(Some(Duration::new(5, 0))) .unwrap(); let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - let mut client = ThinClient::new(serve_addr, requests_socket, events_addr, events_socket); + let mut client = ThinClient::new( + leader.data.requests_addr, + requests_socket, + leader.data.events_addr, + events_socket, + ); let last_id = client.get_last_id().wait().unwrap(); let tr = Transaction::new(&alice.keypair(), bob_pubkey, 500, last_id); @@ -312,45 +295,25 @@ mod tests { t.join().unwrap(); } } - - fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket, UdpSocket) { - let gossip = UdpSocket::bind("0.0.0.0:0").unwrap(); - let serve = UdpSocket::bind("0.0.0.0:0").unwrap(); - serve.set_read_timeout(Some(Duration::new(1, 0))).unwrap(); - - let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - let replicate = UdpSocket::bind("0.0.0.0:0").unwrap(); - let pubkey = KeyPair::new().pubkey(); - let leader = ReplicatedData::new( - pubkey, - gossip.local_addr().unwrap(), - replicate.local_addr().unwrap(), - serve.local_addr().unwrap(), - ); - (leader, gossip, serve, replicate, events_socket) - } - fn replicant( leader: &ReplicatedData, exit: Arc, alice: &Mint, threads: &mut Vec>, ) { - let replicant = test_node(); - let replicant_bank = { - let bank = Bank::new(&alice); - Arc::new(Tvu::new(bank, alice.last_id(), None)) - }; - let mut ts = Tvu::serve( - &replicant_bank, - replicant.0.clone(), - replicant.1, - replicant.2, - replicant.3, + let replicant = TestNode::new(); + let replicant_bank = Bank::new(&alice); + let mut ts = Server::new_validator( + replicant_bank, + replicant.data.clone(), + replicant.sockets.requests, + replicant.sockets.respond, + replicant.sockets.replicate, + replicant.sockets.gossip, leader.clone(), exit.clone(), - ).unwrap(); - threads.append(&mut ts); + ); + threads.append(&mut ts.thread_hdls); } fn converge( @@ -360,18 +323,23 @@ mod tests { threads: &mut Vec>, ) -> Vec { //lets spy on the network - let (mut spy, spy_gossip, _, _, _) = test_node(); + let mut spy = TestNode::new(); let daddr = "0.0.0.0:0".parse().unwrap(); - let me = spy.id.clone(); - spy.replicate_addr = daddr; - spy.serve_addr = daddr; - let mut spy_crdt = Crdt::new(spy); + let me = spy.data.id.clone(); + spy.data.replicate_addr = daddr; + spy.data.requests_addr = daddr; + let mut spy_crdt = Crdt::new(spy.data); spy_crdt.insert(&leader); spy_crdt.set_leader(leader.id); let spy_ref = Arc::new(RwLock::new(spy_crdt)); let spy_window = default_window(); - let t_spy_listen = Crdt::listen(spy_ref.clone(), spy_window, spy_gossip, exit.clone()); + let t_spy_listen = Crdt::listen( + spy_ref.clone(), + spy_window, + spy.sockets.gossip, + exit.clone(), + ); let t_spy_gossip = Crdt::gossip(spy_ref.clone(), exit.clone()); //wait for the network to converge let mut converged = false; @@ -393,7 +361,7 @@ mod tests { .values() .into_iter() .filter(|x| x.id != me) - .map(|x| x.serve_addr) + .map(|x| x.requests_addr) .collect(); v.clone() } @@ -403,38 +371,32 @@ mod tests { logger::setup(); const N: usize = 5; trace!("test_multi_accountant_stub"); - let leader = test_node(); + let leader = TestNode::new(); let alice = Mint::new(10_000); let bob_pubkey = KeyPair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); let leader_bank = Bank::new(&alice); - - let mut local = leader.2.local_addr().unwrap(); - local.set_port(0); - let broadcast_socket = UdpSocket::bind(local).unwrap(); - let respond_socket = UdpSocket::bind(local.clone()).unwrap(); - let events_addr = leader.4.local_addr().unwrap(); - - let server = Server::new( + let events_addr = leader.data.events_addr; + let server = Server::new_leader( leader_bank, alice.last_id(), None, - leader.0.clone(), - leader.2, - leader.4, - broadcast_socket, - respond_socket, - leader.1, + leader.data.clone(), + leader.sockets.requests, + leader.sockets.event, + leader.sockets.broadcast, + leader.sockets.respond, + leader.sockets.gossip, exit.clone(), sink(), ); let mut threads = server.thread_hdls; for _ in 0..N { - replicant(&leader.0, exit.clone(), &alice, &mut threads); + replicant(&leader.data, exit.clone(), &alice, &mut threads); } - let addrs = converge(&leader.0, exit.clone(), N + 2, &mut threads); + let addrs = converge(&leader.data, exit.clone(), N + 2, &mut threads); //contains the leader addr as well assert_eq!(addrs.len(), N + 1); //verify leader can do transfer @@ -446,9 +408,9 @@ mod tests { let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let mut client = ThinClient::new( - leader.0.serve_addr, + leader.data.requests_addr, requests_socket, - events_addr, + leader.data.events_addr, events_socket, ); trace!("getting leader last_id"); diff --git a/src/tvu.rs b/src/tvu.rs index 18d09d08b1f322..d8faf3a315dc4b 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -1,67 +1,58 @@ //! The `tvu` module implements the Transaction Validation Unit, a //! 5-stage transaction validation pipeline in software. +//! 1. streamer +//! - Incoming blobs are picked up from the replicate socket. +//! 2. verifier +//! - TODO Blobs are sent to the GPU, and while the memory is there the PoH stream is verified +//! along with the ecdsa signature for the blob and each signature in all the transactions. Blobs +//! with errors are dropped, or marked for slashing. +//! 3.a retransmit +//! - Blobs originating from the parent (leader atm is the only parent), are retransmit to all the +//! peers in the crdt. Peers is everyone who is not me or the leader that has a known replicate +//! address. +//! 3.b window +//! - Verified blobs are placed into a window, indexed by the counter set by the leader.sockets. This could +//! be the PoH counter if its monitonically increasing in each blob. Easure coding is used to +//! recover any missing packets, and requests are made at random to peers and parents to retransmit +//! a missing packet. +//! 4. accountant +//! - Contigous blobs are sent to the accountant for processing transactions +//! 5. validator +//! - TODO Validation messages are sent back to the leader use bank::Bank; -use banking_stage::BankingStage; use crdt::{Crdt, ReplicatedData}; -use hash::Hash; use packet; -use record_stage::RecordStage; use replicate_stage::ReplicateStage; -use result::Result; -use sig_verify_stage::SigVerifyStage; use std::net::UdpSocket; use std::sync::atomic::AtomicBool; use std::sync::mpsc::channel; use std::sync::{Arc, RwLock}; use std::thread::JoinHandle; -use std::time::Duration; use streamer; -use write_stage::WriteStage; pub struct Tvu { - bank: Arc, - start_hash: Hash, - tick_duration: Option, + pub thread_hdls: Vec>, } impl Tvu { - /// Create a new Tvu that wraps the given Bank. - pub fn new(bank: Bank, start_hash: Hash, tick_duration: Option) -> Self { - Tvu { - bank: Arc::new(bank), - start_hash, - tick_duration, - } - } - /// This service receives messages from a leader in the network and processes the transactions /// on the bank state. /// # Arguments - /// * `obj` - The bank state. + /// * `bank` - The bank state. /// * `me` - my configuration + /// * `gossip` - my gosisp socket + /// * `replicte` - my replicte socket /// * `leader` - leader configuration /// * `exit` - The exit signal. - /// # Remarks - /// The pipeline is constructed as follows: - /// 1. receive blobs from the network, these are out of order - /// 2. verify blobs, PoH, signatures (TODO) - /// 3. reconstruct contiguous window - /// a. order the blobs - /// b. use erasure coding to reconstruct missing blobs - /// c. ask the network for missing blobs, if erasure coding is insufficient - /// d. make sure that the blobs PoH sequences connect (TODO) - /// 4. process the transaction state machine - /// 5. respond with the hash of the state back to the leader - pub fn serve( - obj: &Arc, + pub fn new( + bank: Arc, me: ReplicatedData, gossip: UdpSocket, - requests_socket: UdpSocket, replicate: UdpSocket, leader: ReplicatedData, exit: Arc, - ) -> Result>> { + ) -> Self { //replicate pipeline let crdt = Arc::new(RwLock::new(Crdt::new(me))); crdt.write() @@ -74,10 +65,11 @@ impl Tvu { let window = streamer::default_window(); let t_listen = Crdt::listen(crdt.clone(), window.clone(), gossip, exit.clone()); + // TODO pull this socket out through the public interface // make sure we are on the same interface - let mut local = replicate.local_addr()?; + let mut local = replicate.local_addr().expect("tvu: get local address"); local.set_port(0); - let write = UdpSocket::bind(local)?; + let write = UdpSocket::bind(local).expect("tvu: bind to local socket"); let blob_recycler = packet::BlobRecycler::default(); let (blob_sender, blob_receiver) = channel(); @@ -86,7 +78,7 @@ impl Tvu { blob_recycler.clone(), replicate, blob_sender.clone(), - )?; + ).expect("tvu: blob receiver creation"); let (window_sender, window_receiver) = channel(); let (retransmit_sender, retransmit_receiver) = channel(); @@ -112,45 +104,13 @@ impl Tvu { ); let replicate_stage = ReplicateStage::new( - obj.bank.clone(), + bank.clone(), exit.clone(), window_receiver, blob_recycler.clone(), ); - //serve pipeline - // make sure we are on the same interface - let mut local = requests_socket.local_addr()?; - local.set_port(0); - - let packet_recycler = packet::PacketRecycler::default(); - let (packet_sender, packet_receiver) = channel(); - let t_packet_receiver = streamer::receiver( - requests_socket, - exit.clone(), - packet_recycler.clone(), - packet_sender, - ); - - let sig_verify_stage = SigVerifyStage::new(exit.clone(), packet_receiver); - - let banking_stage = BankingStage::new( - obj.bank.clone(), - exit.clone(), - sig_verify_stage.verified_receiver, - packet_recycler.clone(), - ); - - let record_stage = RecordStage::new( - banking_stage.signal_receiver, - &obj.start_hash, - obj.tick_duration, - ); - - let write_stage = - WriteStage::new_drain(obj.bank.clone(), exit.clone(), record_stage.entry_receiver); - - let mut threads = vec![ + let threads = vec![ //replicate threads t_blob_receiver, t_retransmit, @@ -158,16 +118,16 @@ impl Tvu { replicate_stage.thread_hdl, t_gossip, t_listen, - //serve threads - t_packet_receiver, - banking_stage.thread_hdl, - write_stage.thread_hdl, ]; - threads.extend(sig_verify_stage.thread_hdls.into_iter()); - Ok(threads) + Tvu { + thread_hdls: threads, + } } } +#[cfg(test)] +use std::time::Duration; + #[cfg(test)] pub fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket, UdpSocket) { use signature::{KeyPair, KeyPairUtil}; @@ -185,16 +145,18 @@ pub fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket, UdpSocke gossip.local_addr().unwrap(), replicate.local_addr().unwrap(), requests_socket.local_addr().unwrap(), + events_socket.local_addr().unwrap(), ); (d, gossip, replicate, requests_socket, events_socket) } #[cfg(test)] -mod tests { +pub mod tests { use bank::Bank; use bincode::serialize; use chrono::prelude::*; use crdt::Crdt; + use crdt::ReplicatedData; use entry::Entry; use event::Event; use hash::{hash, Hash}; @@ -203,40 +165,41 @@ mod tests { use packet::BlobRecycler; use signature::{KeyPair, KeyPairUtil}; use std::collections::VecDeque; + use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::channel; use std::sync::{Arc, RwLock}; use std::time::Duration; use streamer; - use tvu::{test_node, Tvu}; + use tvu::Tvu; /// Test that mesasge sent from leader to target1 and repliated to target2 #[test] fn test_replicate() { logger::setup(); - let (leader_data, leader_gossip, _, leader_serve, _) = test_node(); - let (target1_data, target1_gossip, target1_replicate, target1_serve, _) = test_node(); - let (target2_data, target2_gossip, target2_replicate, _, _) = test_node(); + let leader = TestNode::new(); + let target1 = TestNode::new(); + let target2 = TestNode::new(); let exit = Arc::new(AtomicBool::new(false)); //start crdt_leader - let mut crdt_l = Crdt::new(leader_data.clone()); - crdt_l.set_leader(leader_data.id); + let mut crdt_l = Crdt::new(leader.data.clone()); + crdt_l.set_leader(leader.data.id); let cref_l = Arc::new(RwLock::new(crdt_l)); let t_l_gossip = Crdt::gossip(cref_l.clone(), exit.clone()); let window1 = streamer::default_window(); - let t_l_listen = Crdt::listen(cref_l, window1, leader_gossip, exit.clone()); + let t_l_listen = Crdt::listen(cref_l, window1, leader.sockets.gossip, exit.clone()); //start crdt2 - let mut crdt2 = Crdt::new(target2_data.clone()); - crdt2.insert(&leader_data); - crdt2.set_leader(leader_data.id); - let leader_id = leader_data.id; + let mut crdt2 = Crdt::new(target2.data.clone()); + crdt2.insert(&leader.data); + crdt2.set_leader(leader.data.id); + let leader_id = leader.data.id; let cref2 = Arc::new(RwLock::new(crdt2)); let t2_gossip = Crdt::gossip(cref2.clone(), exit.clone()); let window2 = streamer::default_window(); - let t2_listen = Crdt::listen(cref2, window2, target2_gossip, exit.clone()); + let t2_listen = Crdt::listen(cref2, window2, target2.sockets.gossip, exit.clone()); // setup some blob services to send blobs into the socket // to simulate the source peer and get blobs out of the socket to @@ -247,14 +210,14 @@ mod tests { let t_receiver = streamer::blob_receiver( exit.clone(), recv_recycler.clone(), - target2_replicate, + target2.sockets.replicate, s_reader, ).unwrap(); // simulate leader sending messages let (s_responder, r_responder) = channel(); let t_responder = streamer::responder( - leader_serve, + leader.sockets.requests, exit.clone(), resp_recycler.clone(), r_responder, @@ -262,22 +225,16 @@ mod tests { let starting_balance = 10_000; let mint = Mint::new(starting_balance); - let bank = Bank::new(&mint); - let tvu = Arc::new(Tvu::new( - bank, - mint.last_id(), - Some(Duration::from_millis(30)), - )); - let replicate_addr = target1_data.replicate_addr; - let threads = Tvu::serve( - &tvu, - target1_data, - target1_gossip, - target1_serve, - target1_replicate, - leader_data, + let replicate_addr = target1.data.replicate_addr; + let bank = Arc::new(Bank::new(&mint)); + let tvu = Tvu::new( + bank.clone(), + target1.data, + target1.sockets.gossip, + target1.sockets.replicate, + leader.data, exit.clone(), - ).unwrap(); + ); let mut alice_ref_balance = starting_balance; let mut msgs = VecDeque::new(); @@ -292,8 +249,6 @@ mod tests { w.set_index(i).unwrap(); w.set_id(leader_id).unwrap(); - let bank = &tvu.bank; - let tr0 = Event::new_timestamp(&bob_keypair, Utc::now()); let entry0 = Entry::new(&cur_hash, i, vec![tr0]); bank.register_entry_id(&cur_hash); @@ -333,7 +288,6 @@ mod tests { msgs.push(msg); } - let bank = &tvu.bank; let alice_balance = bank.get_balance(&mint.keypair().pubkey()).unwrap(); assert_eq!(alice_balance, alice_ref_balance); @@ -341,7 +295,7 @@ mod tests { assert_eq!(bob_balance, starting_balance - alice_ref_balance); exit.store(true, Ordering::Relaxed); - for t in threads { + for t in tvu.thread_hdls { t.join().expect("join"); } t2_gossip.join().expect("join"); @@ -351,4 +305,45 @@ mod tests { t_l_gossip.join().expect("join"); t_l_listen.join().expect("join"); } + pub struct Sockets { + pub gossip: UdpSocket, + pub requests: UdpSocket, + pub replicate: UdpSocket, + pub event: UdpSocket, + pub respond: UdpSocket, + pub broadcast: UdpSocket, + } + pub struct TestNode { + pub data: ReplicatedData, + pub sockets: Sockets, + } + impl TestNode { + pub fn new() -> TestNode { + let gossip = UdpSocket::bind("0.0.0.0:0").unwrap(); + let requests = UdpSocket::bind("0.0.0.0:0").unwrap(); + let event = UdpSocket::bind("0.0.0.0:0").unwrap(); + let replicate = UdpSocket::bind("0.0.0.0:0").unwrap(); + let respond = UdpSocket::bind("0.0.0.0:0").unwrap(); + let broadcast = UdpSocket::bind("0.0.0.0:0").unwrap(); + let pubkey = KeyPair::new().pubkey(); + let data = ReplicatedData::new( + pubkey, + gossip.local_addr().unwrap(), + replicate.local_addr().unwrap(), + requests.local_addr().unwrap(), + event.local_addr().unwrap(), + ); + TestNode { + data: data, + sockets: Sockets { + gossip, + requests, + replicate, + event, + respond, + broadcast, + }, + } + } + } }