Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tvu cleanup2 #241

Merged
merged 8 commits into from
May 23, 2018
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/bin/testnode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ fn main() {
gossip_sock.local_addr().unwrap(),
replicate_sock.local_addr().unwrap(),
serve_sock.local_addr().unwrap(),
events_sock.local_addr().unwrap(),
);

let mut local = serve_sock.local_addr().unwrap();
Expand All @@ -139,7 +140,7 @@ fn main() {
let respond_socket = UdpSocket::bind(local.clone()).unwrap();

eprintln!("starting server...");
let server = Server::new(
let server = Server::leader(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you rename that to new_leader?

bank,
last_id,
Some(Duration::from_millis(1000)),
Expand Down
15 changes: 11 additions & 4 deletions src/crdt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use std::thread::{sleep, spawn, JoinHandle};
use std::time::Duration;

/// Structure to be replicated by the network
#[derive(Serialize, Deserialize, Clone)]
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct ReplicatedData {
pub id: PublicKey,
sig: Signature,
Expand All @@ -42,7 +42,9 @@ pub struct ReplicatedData {
/// address to connect to for replication
pub replicate_addr: SocketAddr,
/// address to connect to when this node is leader
pub serve_addr: SocketAddr,
pub requests_addr: SocketAddr,
/// events address
pub events_addr: SocketAddr,
/// current leader identity
current_leader_id: PublicKey,
/// last verified hash that was submitted to the leader
Expand All @@ -56,15 +58,17 @@ impl ReplicatedData {
id: PublicKey,
gossip_addr: SocketAddr,
replicate_addr: SocketAddr,
serve_addr: SocketAddr,
requests_addr: SocketAddr,
events_addr: SocketAddr,
) -> ReplicatedData {
ReplicatedData {
id,
sig: Signature::default(),
version: 0,
gossip_addr,
replicate_addr,
serve_addr,
requests_addr,
events_addr,
current_leader_id: PublicKey::default(),
last_verified_hash: Hash::default(),
last_verified_count: 0,
Expand Down Expand Up @@ -515,12 +519,14 @@ mod test {
let gossip = UdpSocket::bind("0.0.0.0:0").unwrap();
let replicate = UdpSocket::bind("0.0.0.0:0").unwrap();
let serve = UdpSocket::bind("0.0.0.0:0").unwrap();
let events = UdpSocket::bind("0.0.0.0:0").unwrap();
let pubkey = KeyPair::new().pubkey();
let d = ReplicatedData::new(
pubkey,
gossip.local_addr().unwrap(),
replicate.local_addr().unwrap(),
serve.local_addr().unwrap(),
events.local_addr().unwrap(),
);
let crdt = Crdt::new(d);
trace!(
Expand Down Expand Up @@ -632,6 +638,7 @@ mod test {
"127.0.0.1:1234".parse().unwrap(),
"127.0.0.1:1235".parse().unwrap(),
"127.0.0.1:1236".parse().unwrap(),
"127.0.0.1:1237".parse().unwrap(),
);
assert_eq!(d.version, 0);
let mut crdt = Crdt::new(d.clone());
Expand Down
34 changes: 29 additions & 5 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@ use std::sync::atomic::AtomicBool;
use std::thread::JoinHandle;
use std::time::Duration;
use tpu::Tpu;
use tvu::Tvu;

pub struct Server {
pub thread_hdls: Vec<JoinHandle<()>>,
}

impl Server {
pub fn new<W: Write + Send + 'static>(
pub fn leader<W: Write + Send + 'static>(
bank: Bank,
start_hash: Hash,
tick_duration: Option<Duration>,
Expand All @@ -26,28 +27,51 @@ impl Server {
events_socket: UdpSocket,
broadcast_socket: UdpSocket,
respond_socket: UdpSocket,
gossip: UdpSocket,
gossip_socket: UdpSocket,
exit: Arc<AtomicBool>,
writer: W,
) -> Self {
let bank = Arc::new(bank);
let mut thread_hdls = vec![];
let rpu = Rpu::new(bank.clone(), requests_socket, respond_socket, exit.clone());
thread_hdls.extend(rpu.thread_hdls);

let tpu = Tpu::new(
bank.clone(),
start_hash,
tick_duration,
me,
events_socket,
broadcast_socket,
gossip,
gossip_socket,
exit.clone(),
writer,
);
thread_hdls.extend(tpu.thread_hdls);

Server { thread_hdls }
}
pub fn validator(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you rename that to new_validator?

bank: Bank,
me: ReplicatedData,
requests_socket: UdpSocket,
respond_socket: UdpSocket,
replicate_socket: UdpSocket,
gossip_socket: UdpSocket,
leader_repl_data: ReplicatedData,
exit: Arc<AtomicBool>,
) -> Self {
let bank = Arc::new(bank);
let mut thread_hdls = vec![];
let rpu = Rpu::new(bank.clone(), requests_socket, respond_socket, exit.clone());
thread_hdls.extend(rpu.thread_hdls);
let tvu = Tvu::new(
bank.clone(),
me,
gossip_socket,
replicate_socket,
leader_repl_data,
exit.clone(),
);
thread_hdls.extend(tvu.thread_hdls);
Server { thread_hdls }
}
}
12 changes: 5 additions & 7 deletions src/streamer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -650,12 +650,14 @@ mod test {
let addr = read.local_addr().unwrap();
let send = UdpSocket::bind("127.0.0.1:0").expect("bind");
let serve = UdpSocket::bind("127.0.0.1:0").expect("bind");
let event = UdpSocket::bind("127.0.0.1:0").expect("bind");
let exit = Arc::new(AtomicBool::new(false));
let rep_data = ReplicatedData::new(
pubkey_me,
read.local_addr().unwrap(),
send.local_addr().unwrap(),
serve.local_addr().unwrap(),
event.local_addr().unwrap(),
);
let mut crdt_me = Crdt::new(rep_data);
let me_id = crdt_me.my_data().id;
Expand Down Expand Up @@ -712,21 +714,17 @@ mod test {
let gossip = UdpSocket::bind("127.0.0.1:0").unwrap();
let replicate = UdpSocket::bind("127.0.0.1:0").unwrap();
let serve = UdpSocket::bind("127.0.0.1:0").unwrap();
let event = UdpSocket::bind("127.0.0.1:0").unwrap();
let pubkey = KeyPair::new().pubkey();
let d = ReplicatedData::new(
pubkey,
gossip.local_addr().unwrap(),
replicate.local_addr().unwrap(),
serve.local_addr().unwrap(),
event.local_addr().unwrap(),
);
trace!("data: {:?}", d);
let crdt = Crdt::new(d);
trace!(
"id: {} gossip: {} replicate: {} serve: {}",
crdt.my_data().id[0],
gossip.local_addr().unwrap(),
replicate.local_addr().unwrap(),
serve.local_addr().unwrap(),
);
(Arc::new(RwLock::new(crdt)), gossip, replicate, serve)
}

Expand Down
Loading