From 1f456c55919c07a248e267eda7e7db93c7c99b1b Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Tue, 17 Apr 2018 19:26:19 -0700 Subject: [PATCH 01/28] state replication --- src/accountant_skel.rs | 53 ++++++++++++++++++++++++++++++++++++++++++ src/subscribers.rs | 5 ++-- 2 files changed, 56 insertions(+), 2 deletions(-) diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index 41d8f90ede1292..aba4488f96c50b 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -247,6 +247,7 @@ impl AccountantSkel { } /// Create a UDP microservice that forwards messages the given AccountantSkel. + /// This service is the network leader /// Set `exit` to shutdown its threads. pub fn serve( obj: &Arc>>, @@ -269,6 +270,57 @@ impl AccountantSkel { streamer::responder(write, exit.clone(), blob_recycler.clone(), blob_receiver); let (verified_sender, verified_receiver) = channel(); + let exit_ = exit.clone(); + let t_verifier = spawn(move || loop { + let e = Self::blob_verifier(&blob_receiver, &verified_sender); + if e.is_err() && exit_.load(Ordering::Relaxed) { + break; + } + }); + + let skel = obj.clone(); + let t_server = spawn(move || loop { + let e = AccountantSkel::process( + &skel, + &verified_receiver, + &blob_sender, + &packet_recycler, + &blob_recycler, + ); + if e.is_err() && exit.load(Ordering::Relaxed) { + break; + } + }); + Ok(vec![t_receiver, t_responder, t_server, t_verifier]) + } + + /// Create a UDP microservice that forwards messages the given AccountantSkel. + /// This service receives messages from a leader in the network + /// Set `exit` to shutdown its threads. 
+ pub fn replicate( + obj: &Arc>>, + rsubs: Subscribers, + addr: &str, + exit: Arc, + ) -> Result>> { + let read = UdpSocket::bind(rsubs.me.addr)?; + // make sure we are on the same interface + let mut local = read.local_addr()?; + local.set_port(0); + let write = UdpSocket::bind(local)?; + + let blob_recycler = packet::BlobRecycler::default(); + let (blob_sender, blob_receiver) = channel(); + let t_blob_receiver = + streamer::blob_receiver(exit.clone(), blob_recycler.clone(), read, blob_sender)?; + let (window_sender, window_receiver) = channel(); + + let subs = Arc::new(RwLock::new(rsubs)); + + let t_window = + streamer::window(exit.clone(), blob_recycler.clone(), blob_receiver); + let (verified_sender, verified_receiver) = channel(); + let exit_ = exit.clone(); let t_verifier = spawn(move || loop { let e = Self::verifier(&packet_receiver, &verified_sender); @@ -292,6 +344,7 @@ impl AccountantSkel { }); Ok(vec![t_receiver, t_responder, t_server, t_verifier]) } + } #[cfg(test)] diff --git a/src/subscribers.rs b/src/subscribers.rs index 153246d12adb31..3484ea01b30e52 100644 --- a/src/subscribers.rs +++ b/src/subscribers.rs @@ -40,18 +40,19 @@ impl Node { pub struct Subscribers { data: Vec, - me: Node, + pub me: Node, pub leader: Node, } impl Subscribers { - pub fn new(me: Node, leader: Node) -> Subscribers { + pub fn new(me: Node, leader: Node, network: &[Node]) -> Subscribers { let mut h = Subscribers { data: vec![], me: me.clone(), leader: leader.clone(), }; h.insert(&[me, leader]); + h.insert(network); h } From efeebf5394ed98131babb1c7aafa85065b1a1423 Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Tue, 17 Apr 2018 20:09:37 -0700 Subject: [PATCH 02/28] wip --- src/accountant_skel.rs | 24 +++++++++++------------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index aba4488f96c50b..a8fa460e9255f1 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -300,7 +300,6 @@ impl 
AccountantSkel { pub fn replicate( obj: &Arc>>, rsubs: Subscribers, - addr: &str, exit: Arc, ) -> Result>> { let read = UdpSocket::bind(rsubs.me.addr)?; @@ -314,28 +313,27 @@ impl AccountantSkel { let t_blob_receiver = streamer::blob_receiver(exit.clone(), blob_recycler.clone(), read, blob_sender)?; let (window_sender, window_receiver) = channel(); + let (retransmit_sender, retransmit_receiver) = channel(); let subs = Arc::new(RwLock::new(rsubs)); + let t_retransmit = retransmitter( + write, + exit.clone(), + subs, + blob_recycler.clone(), + retransmit_receiver, + ); - let t_window = - streamer::window(exit.clone(), blob_recycler.clone(), blob_receiver); - let (verified_sender, verified_receiver) = channel(); - let exit_ = exit.clone(); - let t_verifier = spawn(move || loop { - let e = Self::verifier(&packet_receiver, &verified_sender); - if e.is_err() && exit_.load(Ordering::Relaxed) { - break; - } - }); + let t_window = + streamer::window(exit.clone(), blob_recycler.clone(), blob_receiver, window_sender, retransmit_sender); let skel = obj.clone(); let t_server = spawn(move || loop { - let e = AccountantSkel::process( + let e = AccountantSkel::replicate( &skel, &verified_receiver, &blob_sender, - &packet_recycler, &blob_recycler, ); if e.is_err() && exit.load(Ordering::Relaxed) { From d620ce53ebfec91e8f0923932b8c5bcea652f6bd Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Wed, 18 Apr 2018 12:02:54 -0700 Subject: [PATCH 03/28] update --- src/accountant_skel.rs | 47 +++++++++++++++++++++++++++--------------- 1 file changed, 30 insertions(+), 17 deletions(-) diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index a8fa460e9255f1..10daa11b653337 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -22,7 +22,7 @@ use std::io::Write; use std::net::{SocketAddr, UdpSocket}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::{channel, Receiver, Sender}; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, Mutex, RwLock}; use 
std::thread::{spawn, JoinHandle}; use std::time::Duration; use streamer; @@ -280,7 +280,7 @@ impl AccountantSkel { let skel = obj.clone(); let t_server = spawn(move || loop { - let e = AccountantSkel::process( + let e = Self::process( &skel, &verified_receiver, &blob_sender, @@ -294,9 +294,22 @@ impl AccountantSkel { Ok(vec![t_receiver, t_responder, t_server, t_verifier]) } - /// Create a UDP microservice that forwards messages the given AccountantSkel. - /// This service receives messages from a leader in the network - /// Set `exit` to shutdown its threads. + /// This service receives messages from a leader in the network and processes the transactions + /// on the accountant state. + /// # Arguments + /// * `obj` - The accoutnant state. + /// * `rsubs` - The subscribers. + /// * `exit` - The exit signal. + /// # Remarks + /// The pipeline is constructed as follows + /// 1. receive blobs from the network, these are out of order + /// 2. verify blobs, PoH, signatures + /// 3. reconstruct consequitive window + /// a. order the blobs + /// b. use erasure coding to reconstruct missing blobs + /// c. ask the network for missing blobs + /// 4. process the transaction state machine + /// 5. 
respond with the hash of the state back to the leader pub fn replicate( obj: &Arc>>, rsubs: Subscribers, @@ -323,26 +336,26 @@ impl AccountantSkel { blob_recycler.clone(), retransmit_receiver, ); - - - let t_window = - streamer::window(exit.clone(), blob_recycler.clone(), blob_receiver, window_sender, retransmit_sender); + //TODO + //the packets comming out of blob_receiver need to be sent to the GPU and verified + //then sent to the window, which does the erasure coding reconstruction + let t_window = streamer::window( + exit.clone(), + blob_recycler.clone(), + blob_receiver, + window_sender, + retransmit_sender, + ); let skel = obj.clone(); let t_server = spawn(move || loop { - let e = AccountantSkel::replicate( - &skel, - &verified_receiver, - &blob_sender, - &blob_recycler, - ); + let e = Self::replicate_state(&skel, &window_receiver, &blob_sender, &blob_recycler); if e.is_err() && exit.load(Ordering::Relaxed) { break; } }); - Ok(vec![t_receiver, t_responder, t_server, t_verifier]) + Ok(vec![t_blob_receiver, t_retransmit, t_window, t_server]) } - } #[cfg(test)] From 073e5abbdedb6ad66db95572a7ba69dca3813807 Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Wed, 18 Apr 2018 12:05:12 -0700 Subject: [PATCH 04/28] docs --- src/accountant_skel.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index 10daa11b653337..c6d2333f05b138 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -303,11 +303,12 @@ impl AccountantSkel { /// # Remarks /// The pipeline is constructed as follows /// 1. receive blobs from the network, these are out of order - /// 2. verify blobs, PoH, signatures - /// 3. reconstruct consequitive window + /// 2. verify blobs, PoH, signatures (TODO) + /// 3. reconstruct contiguous window /// a. order the blobs /// b. use erasure coding to reconstruct missing blobs /// c. ask the network for missing blobs + /// d. 
make sure that the blobs PoH sequences connect (TODO) /// 4. process the transaction state machine /// 5. respond with the hash of the state back to the leader pub fn replicate( From 0c610c4c1619ef20d8dfc7f45f9ce0909502d5a2 Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Wed, 18 Apr 2018 12:29:33 -0700 Subject: [PATCH 05/28] docs --- src/accountant_skel.rs | 30 +++++++++++++++++++++++++++++- 1 file changed, 29 insertions(+), 1 deletion(-) diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index c6d2333f05b138..feb20a03da8b55 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -245,6 +245,34 @@ impl AccountantSkel { } Ok(()) } + /// Process verified blobs, already in order + /// Respond with a signed hash of the state + fn replicate_state( + obj: &Arc>>, + verified_receiver: &BlobReceiver, + blob_sender: &streamer::BlobSender, + blob_recycler: &packet::BlobRecycler, + ) -> Result<()> { + let timer = Duration::new(1, 0); + let blobs = verified_receiver.recv_timeout(timer)?; + for msgs in blobs { + let entries = b.read().unwrap().data.deserialize(); + let req_vers = reqs.into_iter() + .zip(vers) + .filter_map(|(req, ver)| req.map(|(msg, addr)| (msg, addr, ver))) + .filter(|x| x.0.verify()) + .collect(); + let rsps = obj.lock().unwrap().process_packets(req_vers)?; + let blobs = Self::serialize_responses(rsps, blob_recycler)?; + if !blobs.is_empty() { + //don't wake up the other side if there is nothing + blob_sender.send(blobs)?; + } + packet_recycler.recycle(msgs); + } + Ok(()) + } + /// Create a UDP microservice that forwards messages the given AccountantSkel. /// This service is the network leader @@ -307,7 +335,7 @@ impl AccountantSkel { /// 3. reconstruct contiguous window /// a. order the blobs /// b. use erasure coding to reconstruct missing blobs - /// c. ask the network for missing blobs + /// c. ask the network for missing blobs, if erasure coding is insufficient /// d. 
make sure that the blobs PoH sequences connect (TODO) /// 4. process the transaction state machine /// 5. respond with the hash of the state back to the leader From 5e11078f34ca1acac868b0047fd4969c858acc86 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Wed, 18 Apr 2018 17:22:58 -0600 Subject: [PATCH 06/28] Add Stephen --- LICENSE | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/LICENSE b/LICENSE index cae77a0e837cb0..1dee745e3c7e92 100644 --- a/LICENSE +++ b/LICENSE @@ -1,4 +1,4 @@ -Copyright 2018 Anatoly Yakovenko and Greg Fitzgerald +Copyright 2018 Anatoly Yakovenko, Greg Fitzgerald and Stephen Akridge Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. From dea5ab2f7983ff1df0d51e5a92afbf88037605e4 Mon Sep 17 00:00:00 2001 From: Stephen Akridge Date: Wed, 18 Apr 2018 19:34:57 -0700 Subject: [PATCH 07/28] Add erasure rust logic under feature flag --- Cargo.toml | 6 +- build.rs | 5 +- src/erasure.rs | 420 +++++++++++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 2 + 4 files changed, 430 insertions(+), 3 deletions(-) create mode 100644 src/erasure.rs diff --git a/Cargo.toml b/Cargo.toml index 65fd08cf2c7674..038a854775ebe1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,8 +6,9 @@ documentation = "https://docs.rs/solana" homepage = "http://loomprotocol.com/" repository = "https://github.com/solana-labs/solana" authors = [ - "Anatoly Yakovenko ", - "Greg Fitzgerald ", + "Anatoly Yakovenko ", + "Greg Fitzgerald ", + "Stephen Akridge ", ] license = "Apache-2.0" @@ -42,6 +43,7 @@ codecov = { repository = "solana-labs/solana", branch = "master", service = "git unstable = [] ipv6 = [] cuda = [] +erasure = [] [dependencies] rayon = "1.0.0" diff --git a/build.rs b/build.rs index b8d72fe8ed5652..844a92dc7edb97 100644 --- a/build.rs +++ b/build.rs @@ -1,12 +1,15 @@ use std::env; fn main() { + println!("cargo:rustc-link-search=native=."); if 
!env::var("CARGO_FEATURE_CUDA").is_err() { - println!("cargo:rustc-link-search=native=."); println!("cargo:rustc-link-lib=static=cuda_verify_ed25519"); println!("cargo:rustc-link-search=native=/usr/local/cuda/lib64"); println!("cargo:rustc-link-lib=dylib=cudart"); println!("cargo:rustc-link-lib=dylib=cuda"); println!("cargo:rustc-link-lib=dylib=cudadevrt"); } + if !env::var("CARGO_FEATURE_ERASURE").is_err() { + println!("cargo:rustc-link-lib=dylib=Jerasure"); + } } diff --git a/src/erasure.rs b/src/erasure.rs new file mode 100644 index 00000000000000..b8480a73d7529a --- /dev/null +++ b/src/erasure.rs @@ -0,0 +1,420 @@ +// Support erasure coding + +use packet::{BlobRecycler, SharedBlob}; +use std::result; + +//TODO(sakridge) pick these values +const NUM_CODED: usize = 10; +const MAX_MISSING: usize = 2; +const NUM_DATA: usize = NUM_CODED - MAX_MISSING; + +#[derive(Debug, PartialEq, Eq)] +pub enum ErasureError { + NotEnoughBlocksToDecode, + DecodeError, + InvalidBlockSize, +} + +pub type Result = result::Result; + +// k = number of data devices +// m = number of coding devices +// w = word size + +extern "C" { + fn jerasure_matrix_encode( + k: i32, + m: i32, + w: i32, + matrix: *const i32, + data_ptrs: *const *const u8, + coding_ptrs: *const *mut u8, + size: i32, + ); + fn jerasure_matrix_decode( + k: i32, + m: i32, + w: i32, + matrix: *const i32, + row_k_ones: i32, + erasures: *const i32, + data_ptrs: *const *mut u8, + coding_ptrs: *const *const u8, + size: i32, + ) -> i32; + fn galois_single_divide(a: i32, b: i32, w: i32) -> i32; +} + +fn get_matrix(m: i32, k: i32, w: i32) -> Vec { + let mut matrix = vec![0; (m * k) as usize]; + for i in 0..m { + for j in 0..k { + unsafe { + matrix[(i * k + j) as usize] = galois_single_divide(1, i ^ (m + j), w); + } + } + } + matrix +} + +pub const ERASURE_W: i32 = 32; + +// Generate coding blocks into coding +// There are some alignment restrictions, blocks should be aligned by 16 bytes +// which means their size should be >= 16 
bytes +pub fn generate_coding_blocks(coding: &mut [&mut [u8]], data: &[&[u8]]) -> Result<()> { + if data.len() == 0 { + return Ok(()); + } + let m = coding.len() as i32; + let block_len = data[0].len(); + let matrix: Vec = get_matrix(m, data.len() as i32, ERASURE_W); + let mut coding_arg = Vec::new(); + let mut data_arg = Vec::new(); + for block in data { + if block_len != block.len() { + return Err(ErasureError::InvalidBlockSize); + } + data_arg.push(block.as_ptr()); + } + for mut block in coding { + if block_len != block.len() { + return Err(ErasureError::InvalidBlockSize); + } + coding_arg.push(block.as_mut_ptr()); + } + + unsafe { + jerasure_matrix_encode( + data.len() as i32, + m, + ERASURE_W, + matrix.as_ptr(), + data_arg.as_ptr(), + coding_arg.as_ptr(), + data[0].len() as i32, + ); + } + Ok(()) +} + +// Recover data + coding blocks into data blocks +// data: array of blocks to recover into +// coding: arry of coding blocks +// erasures: list of indices in data where blocks should be recovered +pub fn decode_blocks(data: &mut [&mut [u8]], coding: &[&[u8]], erasures: &[i32]) -> Result<()> { + if data.len() == 0 { + return Ok(()); + } + let block_len = data[0].len(); + let matrix: Vec = get_matrix(coding.len() as i32, data.len() as i32, ERASURE_W); + + // generate coding pointers, blocks should be the same size + let mut coding_arg: Vec<*const u8> = Vec::new(); + for x in coding.iter() { + if x.len() != block_len { + return Err(ErasureError::InvalidBlockSize); + } + coding_arg.push(x.as_ptr()); + } + + // generate data pointers, blocks should be the same size + let mut data_arg: Vec<*mut u8> = Vec::new(); + for x in data.iter_mut() { + if x.len() != block_len { + return Err(ErasureError::InvalidBlockSize); + } + data_arg.push(x.as_mut_ptr()); + } + unsafe { + let ret = jerasure_matrix_decode( + data.len() as i32, + coding.len() as i32, + ERASURE_W, + matrix.as_ptr(), + 0, + erasures.as_ptr(), + data_arg.as_ptr(), + coding_arg.as_ptr(), + data[0].len() as i32, + 
); + trace!("jerasure_matrix_decode ret: {}", ret); + for x in data[erasures[0] as usize][0..8].iter() { + trace!("{} ", x) + } + trace!(""); + if ret < 0 { + return Err(ErasureError::DecodeError); + } + } + Ok(()) +} + +// Generate coding blocks in window from consumed to consumed+NUM_DATA +pub fn generate_coding( + re: &BlobRecycler, + window: &mut Vec>, + consumed: usize, +) -> Result<()> { + let mut data_blobs = Vec::new(); + let mut coding_blobs = Vec::new(); + let mut data_locks = Vec::new(); + let mut data_ptrs: Vec<&[u8]> = Vec::new(); + let mut coding_locks = Vec::new(); + let mut coding_ptrs: Vec<&mut [u8]> = Vec::new(); + for i in consumed..consumed + NUM_DATA { + let n = i % window.len(); + data_blobs.push(window[n].clone().unwrap()); + } + for b in &data_blobs { + data_locks.push(b.write().unwrap()); + } + for (i, l) in data_locks.iter_mut().enumerate() { + trace!("i: {} data: {}", i, l.data[0]); + data_ptrs.push(&l.data); + } + + // generate coding ptr array + let coding_start = consumed + NUM_DATA; + let coding_end = consumed + NUM_CODED; + for i in coding_start..coding_end { + let n = i % window.len(); + window[n] = Some(re.allocate()); + coding_blobs.push(window[n].clone().unwrap()); + } + for b in &coding_blobs { + coding_locks.push(b.write().unwrap()); + } + for (i, l) in coding_locks.iter_mut().enumerate() { + trace!("i: {} data: {}", i, l.data[0]); + coding_ptrs.push(&mut l.data); + } + + generate_coding_blocks(coding_ptrs.as_mut_slice(), &data_ptrs)?; + trace!("consumed: {}", consumed); + Ok(()) +} + +// Recover missing blocks into window +// missing blocks should be None, will use re +// to allocate new ones. 
Returns err if not enough +// coding blocks are present to restore +pub fn recover( + re: &BlobRecycler, + window: &mut Vec>, + consumed: usize, +) -> Result<()> { + //recover with erasure coding + let mut data_missing = 0; + let mut coded_missing = 0; + let coding_start = consumed + NUM_DATA; + let coding_end = consumed + NUM_CODED; + for i in consumed..coding_end { + let n = i % window.len(); + if window[n].is_none() { + if i >= coding_start { + coded_missing += 1; + } else { + data_missing += 1; + } + } + } + trace!("missing: data: {} coding: {}", data_missing, coded_missing); + if data_missing > 0 { + if (data_missing + coded_missing) <= MAX_MISSING { + let mut blobs: Vec = Vec::new(); + let mut locks = Vec::new(); + let mut data_ptrs: Vec<&mut [u8]> = Vec::new(); + let mut coding_ptrs: Vec<&[u8]> = Vec::new(); + let mut erasures: Vec = Vec::new(); + for i in consumed..coding_end { + let j = i % window.len(); + let mut b = &mut window[j]; + if b.is_some() { + blobs.push(b.clone().unwrap()); + continue; + } + let n = re.allocate(); + *b = Some(n.clone()); + //mark the missing memory + blobs.push(n); + erasures.push((i - consumed) as i32); + } + erasures.push(-1); + trace!("erasures: {:?}", erasures); + //lock everything + for b in &blobs { + locks.push(b.write().unwrap()); + } + for (i, l) in locks.iter_mut().enumerate() { + if i >= NUM_DATA { + trace!("pushing coding: {}", i); + coding_ptrs.push(&l.data); + } else { + trace!("pushing data: {}", i); + data_ptrs.push(&mut l.data); + } + } + trace!( + "coding_ptrs.len: {} data_ptrs.len {}", + coding_ptrs.len(), + data_ptrs.len() + ); + decode_blocks(data_ptrs.as_mut_slice(), &coding_ptrs, &erasures)?; + } else { + return Err(ErasureError::NotEnoughBlocksToDecode); + } + } + Ok(()) +} + +#[cfg(test)] +mod test { + use erasure; + use packet::{BlobRecycler, SharedBlob, PACKET_DATA_SIZE}; + extern crate env_logger; + + #[test] + pub fn test_coding() { + let zero_vec = vec![0; 16]; + let mut vs: Vec> = (0..4).map(|i| 
(i..(16 + i)).collect()).collect(); + let v_orig: Vec = vs[0].clone(); + + let m = 2; + let mut coding_blocks: Vec<_> = (0..m).map(|_| vec![0u8; 16]).collect(); + + { + let mut coding_blocks_slices: Vec<_> = + coding_blocks.iter_mut().map(|x| x.as_mut_slice()).collect(); + let v_slices: Vec<_> = vs.iter().map(|x| x.as_slice()).collect(); + + assert!( + erasure::generate_coding_blocks( + coding_blocks_slices.as_mut_slice(), + v_slices.as_slice() + ).is_ok() + ); + } + trace!("coding blocks:"); + for b in &coding_blocks { + trace!("{:?}", b); + } + let erasure: i32 = 1; + let erasures = vec![erasure, -1]; + // clear an entry + vs[erasure as usize].copy_from_slice(zero_vec.as_slice()); + + { + let coding_blocks_slices: Vec<_> = coding_blocks.iter().map(|x| x.as_slice()).collect(); + let mut v_slices: Vec<_> = vs.iter_mut().map(|x| x.as_mut_slice()).collect(); + + assert!( + erasure::decode_blocks( + v_slices.as_mut_slice(), + coding_blocks_slices.as_slice(), + erasures.as_slice(), + ).is_ok() + ); + } + + trace!("vs:"); + for v in &vs { + trace!("{:?}", v); + } + assert_eq!(v_orig, vs[0]); + } + + fn print_window(window: &Vec>) { + for (i, w) in window.iter().enumerate() { + print!("window({}): ", i); + if w.is_some() { + let window_lock = w.clone().unwrap(); + let window_data = window_lock.read().unwrap().data; + for i in 0..8 { + print!("{} ", window_data[i]); + } + } else { + print!("null"); + } + println!(""); + } + } + + #[test] + pub fn test_window_recover() { + let mut window = Vec::new(); + let blob_recycler = BlobRecycler::default(); + let offset = 4; + for i in 0..(4 * erasure::NUM_CODED + 1) { + let b = blob_recycler.allocate(); + let b_ = b.clone(); + let data_len = b.read().unwrap().data.len(); + let mut w = b.write().unwrap(); + w.set_index(i as u64).unwrap(); + assert_eq!(i as u64, w.get_index().unwrap()); + w.meta.size = PACKET_DATA_SIZE; + for k in 0..data_len { + w.data[k] = (k + i) as u8; + } + window.push(Some(b_)); + } + println!("** after-gen:"); 
+ print_window(&window); + assert!(erasure::generate_coding(&blob_recycler, &mut window, offset).is_ok()); + assert!( + erasure::generate_coding(&blob_recycler, &mut window, offset + erasure::NUM_CODED) + .is_ok() + ); + assert!( + erasure::generate_coding( + &blob_recycler, + &mut window, + offset + (2 * erasure::NUM_CODED) + ).is_ok() + ); + assert!( + erasure::generate_coding( + &blob_recycler, + &mut window, + offset + (3 * erasure::NUM_CODED) + ).is_ok() + ); + println!("** after-coding:"); + print_window(&window); + let refwindow = window[offset + 1].clone(); + window[offset + 1] = None; + window[offset + 2] = None; + window[offset + erasure::NUM_CODED + 3] = None; + window[offset + (2 * erasure::NUM_CODED) + 0] = None; + window[offset + (2 * erasure::NUM_CODED) + 1] = None; + window[offset + (2 * erasure::NUM_CODED) + 2] = None; + let window_l0 = &(window[offset + (3 * erasure::NUM_CODED)]).clone().unwrap(); + window_l0.write().unwrap().data[0] = 55; + println!("** after-nulling:"); + print_window(&window); + assert!(erasure::recover(&blob_recycler, &mut window, offset).is_ok()); + assert!(erasure::recover(&blob_recycler, &mut window, offset + erasure::NUM_CODED).is_ok()); + assert!( + erasure::recover( + &blob_recycler, + &mut window, + offset + (2 * erasure::NUM_CODED) + ).is_err() + ); + assert!( + erasure::recover( + &blob_recycler, + &mut window, + offset + (3 * erasure::NUM_CODED) + ).is_ok() + ); + println!("** after-restore:"); + print_window(&window); + let window_l = window[offset + 1].clone().unwrap(); + let ref_l = refwindow.clone().unwrap(); + assert_eq!( + window_l.read().unwrap().data.to_vec(), + ref_l.read().unwrap().data.to_vec() + ); + } +} diff --git a/src/lib.rs b/src/lib.rs index 54ae1656b8e266..7cb82ff53327c3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -17,6 +17,8 @@ pub mod signature; pub mod streamer; pub mod subscribers; pub mod transaction; +#[cfg(feature = "erasure")] +pub mod erasure; extern crate bincode; extern crate byteorder; 
extern crate chrono; From 3622533ffc703418930dd257dd5e472a100512ad Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Wed, 18 Apr 2018 20:12:30 -0700 Subject: [PATCH 08/28] wip --- src/accountant_skel.rs | 22 ++++++++-------------- src/streamer.rs | 36 ++++++++++++++++++------------------ src/subscribers.rs | 4 ++-- 3 files changed, 28 insertions(+), 34 deletions(-) diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index feb20a03da8b55..99f846cf45263d 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -256,20 +256,13 @@ impl AccountantSkel { let timer = Duration::new(1, 0); let blobs = verified_receiver.recv_timeout(timer)?; for msgs in blobs { - let entries = b.read().unwrap().data.deserialize(); - let req_vers = reqs.into_iter() - .zip(vers) - .filter_map(|(req, ver)| req.map(|(msg, addr)| (msg, addr, ver))) - .filter(|x| x.0.verify()) - .collect(); - let rsps = obj.lock().unwrap().process_packets(req_vers)?; - let blobs = Self::serialize_responses(rsps, blob_recycler)?; - if !blobs.is_empty() { - //don't wake up the other side if there is nothing - blob_sender.send(blobs)?; + let entries:Vec = b.read().unwrap().data.deserialize()?; + for e in entries { + obj.lock().unwrap().acc.process_verified_events(e.events)?; } - packet_recycler.recycle(msgs); + //TODO respond back to leader with hash of the state } + blob_recycler.recycle(msgs); Ok(()) } @@ -325,11 +318,11 @@ impl AccountantSkel { /// This service receives messages from a leader in the network and processes the transactions /// on the accountant state. /// # Arguments - /// * `obj` - The accoutnant state. + /// * `obj` - The accountant state. /// * `rsubs` - The subscribers. /// * `exit` - The exit signal. /// # Remarks - /// The pipeline is constructed as follows + /// The pipeline is constructed as follows: /// 1. receive blobs from the network, these are out of order /// 2. verify blobs, PoH, signatures (TODO) /// 3. 
reconstruct contiguous window @@ -370,6 +363,7 @@ impl AccountantSkel { //then sent to the window, which does the erasure coding reconstruction let t_window = streamer::window( exit.clone(), + subs, blob_recycler.clone(), blob_receiver, window_sender, diff --git a/src/streamer.rs b/src/streamer.rs index 368365ec7f4550..d0847d13f61b05 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -8,7 +8,7 @@ use std::sync::mpsc; use std::sync::{Arc, RwLock}; use std::thread::{spawn, JoinHandle}; use std::time::Duration; -use subscribers; +use subscribers::Subscribers; pub type PacketReceiver = mpsc::Receiver; pub type PacketSender = mpsc::Sender; @@ -106,12 +106,12 @@ pub fn blob_receiver( fn recv_window( window: &mut Vec>, - subs: &Arc>, + subs: &Arc>, recycler: &BlobRecycler, consumed: &mut usize, r: &BlobReceiver, s: &BlobSender, - cast: &BlobSender, + retransmit: &BlobSender, ) -> Result<()> { let timer = Duration::new(1, 0); let mut dq = r.recv_timeout(timer)?; @@ -120,7 +120,7 @@ fn recv_window( } { //retransmit all leader blocks - let mut castq = VecDeque::new(); + let mut retransmitq = VecDeque::new(); let rsubs = subs.read().unwrap(); for b in &dq { let p = b.read().unwrap(); @@ -141,11 +141,11 @@ fn recv_window( mnv.meta.size = sz; mnv.data[..sz].copy_from_slice(&p.data[..sz]); } - castq.push_back(nv); + retransmitq.push_back(nv); } } - if !castq.is_empty() { - cast.send(castq)?; + if !retransmitq.is_empty() { + retransmit.send(retransmitq)?; } } //send a contiguous set of blocks @@ -183,11 +183,11 @@ fn recv_window( pub fn window( exit: Arc, - subs: Arc>, + subs: Arc>, recycler: BlobRecycler, r: BlobReceiver, s: BlobSender, - cast: BlobSender, + retransmit: BlobSender, ) -> JoinHandle<()> { spawn(move || { let mut window = vec![None; NUM_BLOBS]; @@ -196,13 +196,13 @@ pub fn window( if exit.load(Ordering::Relaxed) { break; } - let _ = recv_window(&mut window, &subs, &recycler, &mut consumed, &r, &s, &cast); + let _ = recv_window(&mut window, &subs, &recycler, &mut 
consumed, &r, &s, &retransmit); } }) } fn retransmit( - subs: &Arc>, + subs: &Arc>, recycler: &BlobRecycler, r: &BlobReceiver, sock: &UdpSocket, @@ -237,7 +237,7 @@ fn retransmit( pub fn retransmitter( sock: UdpSocket, exit: Arc, - subs: Arc>, + subs: Arc>, recycler: BlobRecycler, r: BlobReceiver, ) -> JoinHandle<()> { @@ -442,20 +442,21 @@ mod test { let subs = Arc::new(RwLock::new(Subscribers::new( Node::default(), Node::new([0; 8], 0, send.local_addr().unwrap()), + &[], ))); let resp_recycler = BlobRecycler::default(); let (s_reader, r_reader) = channel(); let t_receiver = blob_receiver(exit.clone(), resp_recycler.clone(), read, s_reader).unwrap(); let (s_window, r_window) = channel(); - let (s_cast, r_cast) = channel(); + let (s_retransmit, r_retransmit) = channel(); let t_window = window( exit.clone(), subs, resp_recycler.clone(), r_reader, s_window, - s_cast, + s_retransmit, ); let (s_responder, r_responder) = channel(); let t_responder = responder(send, exit.clone(), resp_recycler.clone(), r_responder); @@ -475,8 +476,8 @@ mod test { let mut num = 0; get_blobs(r_window, &mut num); assert_eq!(num, 10); - let mut q = r_cast.recv().unwrap(); - while let Ok(mut nq) = r_cast.try_recv() { + let mut q = r_retransmit.recv().unwrap(); + while let Ok(mut nq) = r_retransmit.try_recv() { q.append(&mut nq); } assert_eq!(q.len(), 10); @@ -494,9 +495,8 @@ mod test { let subs = Arc::new(RwLock::new(Subscribers::new( Node::default(), Node::default(), + &[Node::new([0; 8], 1, read.local_addr().unwrap())] ))); - let n3 = Node::new([0; 8], 1, read.local_addr().unwrap()); - subs.write().unwrap().insert(&[n3]); let (s_retransmit, r_retransmit) = channel(); let blob_recycler = BlobRecycler::default(); let saddr = send.local_addr().unwrap(); diff --git a/src/subscribers.rs b/src/subscribers.rs index 3484ea01b30e52..b81a54941b599a 100644 --- a/src/subscribers.rs +++ b/src/subscribers.rs @@ -100,7 +100,7 @@ mod test { me.weight = 10; let mut leader = Node::default(); leader.weight = 
11; - let mut s = Subscribers::new(me, leader); + let mut s = Subscribers::new(me, leader, &[]); assert_eq!(s.data.len(), 2); assert_eq!(s.data[0].weight, 11); assert_eq!(s.data[1].weight, 10); @@ -117,7 +117,7 @@ mod test { let s3 = UdpSocket::bind("127.0.0.1:0").expect("bind"); let n1 = Node::new([0; 8], 0, s1.local_addr().unwrap()); let n2 = Node::new([0; 8], 0, s2.local_addr().unwrap()); - let mut s = Subscribers::new(n1.clone(), n2.clone()); + let mut s = Subscribers::new(n1.clone(), n2.clone(), &[]); let n3 = Node::new([0; 8], 0, s3.local_addr().unwrap()); s.insert(&[n3]); let mut b = Blob::default(); From d366a07403c9b42808aea769987b6ba64f427d81 Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Wed, 18 Apr 2018 20:17:37 -0700 Subject: [PATCH 09/28] add gregs abstract as an intro --- Cargo.toml | 2 +- README.md | 5 +++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 038a854775ebe1..5b8e9da12b485d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,7 @@ name = "solana" description = "High Performance Blockchain" version = "0.4.0" documentation = "https://docs.rs/solana" -homepage = "http://loomprotocol.com/" +homepage = "http://solana.io/" repository = "https://github.com/solana-labs/solana" authors = [ "Anatoly Yakovenko ", diff --git a/README.md b/README.md index fa3a9ba1fffea6..634ab131ed1a8d 100644 --- a/README.md +++ b/README.md @@ -14,6 +14,11 @@ Solana: High Performance Blockchain Solana™ is a new architecture for a high performance blockchain. It aims to support over 700 thousand transactions per second on a gigabit network. +Introduction +=== + +It's possible for a centralized database to process 710,000 transactions per second on a standard gigabit network if the transactions are, on average, no more than 178 bytes. 
A centralized database can also replicate itself and maintain high availability without significantly compromising that transaction rate using the distributed system technique known as Optimistic Concurrency Control [H.T.Kung, J.T.Robinson (1981)]. At Solana, we're demonstrating that these same theoretical limits apply just as well to blockchain on an adversarial network. The key ingredient? Finding a way to share time when nodes can't trust one another. Once nodes can trust time, suddenly ~40 years of distributed systems research becomes applicable to blockchain! Furthermore, and much to our surprise, it can be implemented using a mechanism that has existed in Bitcoin since day one. The Bitcoin feature is called nLocktime and it can be used to postdate transactions using block height instead of a timestamp. As a Bitcoin client, you'd use block height instead of a timestamp if you don't trust the network. Block height turns out to be an instance of what's being called a Verifiable Delay Function in cryptography circles. It's a cryptographically secure way to say time has passed. In Solana, we use a far more granular verifiable delay function, a SHA 256 hash chain, to checkpoint the ledger and coordinate consensus. With it, we implement Optimistic Concurrency Control and are now well en route toward that theoretical limit of 710,000 transactions per second. 
+ Running the demo === From 13a2f057763caaef86e740649b4a306f7f5c28e0 Mon Sep 17 00:00:00 2001 From: kwangin Date: Thu, 19 Apr 2018 23:00:16 +0900 Subject: [PATCH 10/28] Remove out for immutable variable --- src/streamer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/streamer.rs b/src/streamer.rs index 368365ec7f4550..33882c31dcffda 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -152,7 +152,7 @@ fn recv_window( let mut contq = VecDeque::new(); while let Some(b) = dq.pop_front() { let b_ = b.clone(); - let mut p = b.write().unwrap(); + let p = b.write().unwrap(); let pix = p.get_index()? as usize; let w = pix % NUM_BLOBS; //TODO, after the block are authenticated From b91f6bcbff4d03ad055aa14140c5ad4b8474b9d8 Mon Sep 17 00:00:00 2001 From: rleungx Date: Thu, 19 Apr 2018 22:06:19 +0800 Subject: [PATCH 11/28] report parse errors to stderr --- src/bin/client-demo.rs | 6 +++++- src/bin/testnode.rs | 6 +++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/src/bin/client-demo.rs b/src/bin/client-demo.rs index 0829c15e2f3a2e..50b2dd064029f3 100644 --- a/src/bin/client-demo.rs +++ b/src/bin/client-demo.rs @@ -12,6 +12,7 @@ use solana::transaction::Transaction; use std::env; use std::io::stdin; use std::net::UdpSocket; +use std::process::exit; use std::thread::sleep; use std::time::{Duration, Instant}; @@ -27,7 +28,10 @@ fn main() { let args: Vec = env::args().collect(); let matches = match opts.parse(&args[1..]) { Ok(m) => m, - Err(f) => panic!(f.to_string()), + Err(e) => { + eprintln!("{}", e); + exit(1); + } }; if matches.opt_present("s") { diff --git a/src/bin/testnode.rs b/src/bin/testnode.rs index 3e0ac842664e0c..068a4c839ba53f 100644 --- a/src/bin/testnode.rs +++ b/src/bin/testnode.rs @@ -11,6 +11,7 @@ use solana::event::Event; use solana::historian::Historian; use std::env; use std::io::{self, stdout, BufRead}; +use std::process::exit; use std::sync::atomic::AtomicBool; use std::sync::{Arc, Mutex}; @@ -22,7 +23,10 @@ fn 
main() { let args: Vec = env::args().collect(); let matches = match opts.parse(&args[1..]) { Ok(m) => m, - Err(f) => panic!(f.to_string()), + Err(e) => { + eprintln!("{}", e); + exit(1); + } }; if matches.opt_present("p") { port = matches.opt_str("p").unwrap().parse().expect("port"); From 60015aee04f9a8ec64046b37df2ce7607e534620 Mon Sep 17 00:00:00 2001 From: rleungx Date: Thu, 19 Apr 2018 22:55:47 +0800 Subject: [PATCH 12/28] report serde parse errors to stderr --- src/bin/client-demo.rs | 5 ++++- src/bin/genesis-demo.rs | 12 ++++++++++-- src/bin/genesis.rs | 12 ++++++++++-- src/bin/mint.rs | 7 ++++++- src/bin/testnode.rs | 10 ++++++---- 5 files changed, 36 insertions(+), 10 deletions(-) diff --git a/src/bin/client-demo.rs b/src/bin/client-demo.rs index 50b2dd064029f3..1482c17df2c8a0 100644 --- a/src/bin/client-demo.rs +++ b/src/bin/client-demo.rs @@ -43,7 +43,10 @@ fn main() { if matches.opt_present("t") { threads = matches.opt_str("t").unwrap().parse().expect("integer"); } - let mint: Mint = serde_json::from_reader(stdin()).unwrap(); + let mint: Mint = serde_json::from_reader(stdin()).unwrap_or_else(|e| { + eprintln!("failed to parse json: {}", e); + exit(1); + }); let mint_keypair = mint.keypair(); let mint_pubkey = mint.pubkey(); diff --git a/src/bin/genesis-demo.rs b/src/bin/genesis-demo.rs index bcf26bc0e0aea9..ff983d9a10b742 100644 --- a/src/bin/genesis-demo.rs +++ b/src/bin/genesis-demo.rs @@ -8,13 +8,17 @@ use solana::mint::Mint; use solana::signature::{KeyPair, KeyPairUtil, PublicKey}; use solana::transaction::Transaction; use std::io::stdin; +use std::process::exit; fn transfer(from: &KeyPair, (to, tokens): (PublicKey, i64), last_id: Hash) -> Event { Event::Transaction(Transaction::new(from, to, tokens, last_id)) } fn main() { - let mint: Mint = serde_json::from_reader(stdin()).unwrap(); + let mint: Mint = serde_json::from_reader(stdin()).unwrap_or_else(|e| { + eprintln!("failed to parse json: {}", e); + exit(1); + }); let mut entries = 
mint.create_entries(); let from = mint.keypair(); @@ -25,6 +29,10 @@ fn main() { entries.push(create_entry(&seed, 0, events)); for entry in entries { - println!("{}", serde_json::to_string(&entry).unwrap()); + let serialized = serde_json::to_string(&entry).unwrap_or_else(|e| { + eprintln!("failed to serialize: {}", e); + exit(1); + }); + println!("{}", serialized); } } diff --git a/src/bin/genesis.rs b/src/bin/genesis.rs index 66d2357b542250..10a1437d738a79 100644 --- a/src/bin/genesis.rs +++ b/src/bin/genesis.rs @@ -5,10 +5,18 @@ extern crate solana; use solana::mint::Mint; use std::io::stdin; +use std::process::exit; fn main() { - let mint: Mint = serde_json::from_reader(stdin()).unwrap(); + let mint: Mint = serde_json::from_reader(stdin()).unwrap_or_else(|e| { + eprintln!("failed to parse json: {}", e); + exit(1); + }); for x in mint.create_entries() { - println!("{}", serde_json::to_string(&x).unwrap()); + let serialized = serde_json::to_string(&x).unwrap_or_else(|e| { + eprintln!("failed to serialize: {}", e); + exit(1); + }); + println!("{}", serialized); } } diff --git a/src/bin/mint.rs b/src/bin/mint.rs index 880257af7fd514..9dab5bc76fe5ce 100644 --- a/src/bin/mint.rs +++ b/src/bin/mint.rs @@ -3,6 +3,7 @@ extern crate solana; use solana::mint::Mint; use std::io; +use std::process::exit; fn main() { let mut input_text = String::new(); @@ -11,5 +12,9 @@ fn main() { let tokens = trimmed.parse::().unwrap(); let mint = Mint::new(tokens); - println!("{}", serde_json::to_string(&mint).unwrap()); + let serialized = serde_json::to_string(&mint).unwrap_or_else(|e| { + eprintln!("failed to serialize: {}", e); + exit(1); + }); + println!("{}", serialized); } diff --git a/src/bin/testnode.rs b/src/bin/testnode.rs index 068a4c839ba53f..a8587e3ed7de71 100644 --- a/src/bin/testnode.rs +++ b/src/bin/testnode.rs @@ -33,10 +33,12 @@ fn main() { } let addr = format!("0.0.0.0:{}", port); let stdin = io::stdin(); - let mut entries = stdin - .lock() - .lines() - .map(|line| 
serde_json::from_str(&line.unwrap()).unwrap()); + let mut entries = stdin.lock().lines().map(|line| { + serde_json::from_str(&line.unwrap()).unwrap_or_else(|e| { + eprintln!("failed to parse json: {}", e); + exit(1); + }) + }); // The first item in the ledger is required to be an entry with zero num_hashes, // which implies its id can be used as the ledger's seed. From c6048e2bab4f8fa5f4cda6a9cd2f3961f2371f3c Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Thu, 19 Apr 2018 10:02:46 -0700 Subject: [PATCH 13/28] Workaround linux hang Without this patch, Linux systems would hang when running the demo. The root cause (why Linux is acting differently than macOS) was not determined, but we know the problem is caused by a known issue in the transaction pipeline - that entries are not pulled off the historian channel until after the full transaction batch is processed. This patch makes the sync_channel large enough that it should never block on a gigabit network. --- src/historian.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/historian.rs b/src/historian.rs index 93c49f733fe56a..412027846fab03 100644 --- a/src/historian.rs +++ b/src/historian.rs @@ -16,8 +16,8 @@ pub struct Historian { impl Historian { pub fn new(start_hash: &Hash, ms_per_tick: Option) -> Self { - let (sender, event_receiver) = sync_channel(1000); - let (entry_sender, receiver) = sync_channel(1000); + let (sender, event_receiver) = sync_channel(10_000); + let (entry_sender, receiver) = sync_channel(10_000); let thread_hdl = Historian::create_recorder(*start_hash, ms_per_tick, event_receiver, entry_sender); Historian { From 8181bc591b6ba5a0b9752f7c5c37e1d42cf5a9b5 Mon Sep 17 00:00:00 2001 From: Stephen Akridge Date: Thu, 19 Apr 2018 10:16:20 -0700 Subject: [PATCH 14/28] Add -h/--help options for client-demo and testnode --- src/bin/client-demo.rs | 15 +++++++++++++++ src/bin/testnode.rs | 15 +++++++++++++++ 2 files changed, 30 insertions(+) diff --git 
a/src/bin/client-demo.rs b/src/bin/client-demo.rs index 1482c17df2c8a0..bc059a3347ad29 100644 --- a/src/bin/client-demo.rs +++ b/src/bin/client-demo.rs @@ -16,6 +16,15 @@ use std::process::exit; use std::thread::sleep; use std::time::{Duration, Instant}; +fn print_usage(program: &str, opts: Options) { + let mut brief = format!("Usage: cat | {} [options]\n\n", program); + brief += " Solana client demo creates a number of transactions and\n"; + brief += " sends them to a target node."; + brief += " Takes json formatted mint file to stdin."; + + print!("{}", opts.usage(&brief)); +} + fn main() { let mut threads = 4usize; let mut addr: String = "127.0.0.1:8000".to_string(); @@ -25,6 +34,7 @@ fn main() { opts.optopt("s", "", "server address", "host:port"); opts.optopt("c", "", "client address", "host:port"); opts.optopt("t", "", "number of threads", "4"); + opts.optflag("h", "help", "print help"); let args: Vec = env::args().collect(); let matches = match opts.parse(&args[1..]) { Ok(m) => m, @@ -34,6 +44,11 @@ fn main() { } }; + if matches.opt_present("h") { + let program = args[0].clone(); + print_usage(&program, opts); + return; + } if matches.opt_present("s") { addr = matches.opt_str("s").unwrap(); } diff --git a/src/bin/testnode.rs b/src/bin/testnode.rs index a8587e3ed7de71..4d692f3f3f5fa6 100644 --- a/src/bin/testnode.rs +++ b/src/bin/testnode.rs @@ -15,11 +15,21 @@ use std::process::exit; use std::sync::atomic::AtomicBool; use std::sync::{Arc, Mutex}; +fn print_usage(program: &str, opts: Options) { + let mut brief = format!("Usage: cat | {} [options]\n\n", program); + brief += " Run a Solana node to handle transactions and\n"; + brief += " write a new transaction log to stdout.\n"; + brief += " Takes existing transaction log from stdin."; + + print!("{}", opts.usage(&brief)); +} + fn main() { env_logger::init().unwrap(); let mut port = 8000u16; let mut opts = Options::new(); opts.optopt("p", "", "port", "port"); + opts.optflag("h", "help", "print help"); let args: 
Vec = env::args().collect(); let matches = match opts.parse(&args[1..]) { Ok(m) => m, @@ -28,6 +38,11 @@ fn main() { exit(1); } }; + if matches.opt_present("h") { + let program = args[0].clone(); + print_usage(&program, opts); + return; + } if matches.opt_present("p") { port = matches.opt_str("p").unwrap().parse().expect("port"); } From 7390d6c97d584bd8c51e820a9ea4ed7ceac765c4 Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Thu, 19 Apr 2018 10:32:02 -0700 Subject: [PATCH 15/28] update --- src/accountant_skel.rs | 5 +++-- src/result.rs | 7 +++++++ 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index 99f846cf45263d..09b0c438a62f18 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -27,6 +27,7 @@ use std::thread::{spawn, JoinHandle}; use std::time::Duration; use streamer; use transaction::Transaction; +use subscribers::Subscribers; pub struct AccountantSkel { acc: Accountant, @@ -293,7 +294,7 @@ impl AccountantSkel { let exit_ = exit.clone(); let t_verifier = spawn(move || loop { - let e = Self::blob_verifier(&blob_receiver, &verified_sender); + let e = Self::verifier(&packet_receiver, &verified_sender); if e.is_err() && exit_.load(Ordering::Relaxed) { break; } @@ -351,7 +352,7 @@ impl AccountantSkel { let (retransmit_sender, retransmit_receiver) = channel(); let subs = Arc::new(RwLock::new(rsubs)); - let t_retransmit = retransmitter( + let t_retransmit = streamer::retransmitter( write, exit.clone(), subs, diff --git a/src/result.rs b/src/result.rs index 9b3c17a3695fde..01872dfbe1138c 100644 --- a/src/result.rs +++ b/src/result.rs @@ -4,6 +4,7 @@ use bincode; use serde_json; use std; use std::any::Any; +use accountant; #[derive(Debug)] pub enum Error { @@ -14,6 +15,7 @@ pub enum Error { RecvError(std::sync::mpsc::RecvError), RecvTimeoutError(std::sync::mpsc::RecvTimeoutError), Serialize(std::boxed::Box), + AccountingError(accountant::AccountingError), SendError, Services, } @@ 
-30,6 +32,11 @@ impl std::convert::From for Error { Error::RecvTimeoutError(e) } } +impl std::convert::From for Error { + fn from(e: accountant::AccountingError) -> Error { + Error::AccountingError(e) + } +} impl std::convert::From> for Error { fn from(_e: std::sync::mpsc::SendError) -> Error { Error::SendError From 8cbb7d736238b7cec0532ae99a8afa97aae9da29 Mon Sep 17 00:00:00 2001 From: Ankur Srivastava Date: Fri, 20 Apr 2018 23:02:10 +0200 Subject: [PATCH 16/28] git clone instruction --- README.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/README.md b/README.md index 634ab131ed1a8d..611be8a8361a77 100644 --- a/README.md +++ b/README.md @@ -29,6 +29,13 @@ $ curl https://sh.rustup.rs -sSf | sh $ source $HOME/.cargo/env ``` +Now checkout the code from github: + +```bash +$ git clone https://github.com/solana-labs/solana.git +$ cd solana +``` + The testnode server is initialized with a ledger from stdin and generates new ledger entries on stdout. To create the input ledger, we'll need to create *the mint* and use it to generate a *genesis ledger*. 
It's done in From 3da1fa4d88f0b8850aead69bbe0baea913f9b171 Mon Sep 17 00:00:00 2001 From: rleungx Date: Sat, 21 Apr 2018 21:12:57 +0800 Subject: [PATCH 17/28] improve the error messages --- Cargo.toml | 2 +- src/bin/client-demo.rs | 19 +++++++++++++++++-- src/bin/genesis-demo.rs | 18 ++++++++++++++++-- src/bin/genesis.rs | 18 ++++++++++++++++-- src/bin/mint.rs | 13 +++++++++++-- src/bin/testnode.rs | 22 ++++++++++++++++++---- src/lib.rs | 4 ++-- 7 files changed, 81 insertions(+), 15 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 5b8e9da12b485d..73e4022c86f578 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -62,4 +62,4 @@ matches = "^0.1.6" byteorder = "^1.2.1" libc = "^0.2.1" getopts = "^0.2" - +isatty = "0.1" diff --git a/src/bin/client-demo.rs b/src/bin/client-demo.rs index bc059a3347ad29..962141720bb861 100644 --- a/src/bin/client-demo.rs +++ b/src/bin/client-demo.rs @@ -1,16 +1,18 @@ extern crate getopts; +extern crate isatty; extern crate rayon; extern crate serde_json; extern crate solana; use getopts::Options; +use isatty::stdin_isatty; use rayon::prelude::*; use solana::accountant_stub::AccountantStub; use solana::mint::Mint; use solana::signature::{KeyPair, KeyPairUtil}; use solana::transaction::Transaction; use std::env; -use std::io::stdin; +use std::io::{stdin, Read}; use std::net::UdpSocket; use std::process::exit; use std::thread::sleep; @@ -58,7 +60,20 @@ fn main() { if matches.opt_present("t") { threads = matches.opt_str("t").unwrap().parse().expect("integer"); } - let mint: Mint = serde_json::from_reader(stdin()).unwrap_or_else(|e| { + + if stdin_isatty() { + eprintln!("nothing found on stdin, expected a json file"); + exit(1); + } + + let mut buffer = String::new(); + let num_bytes = stdin().read_to_string(&mut buffer).unwrap(); + if num_bytes == 0 { + eprintln!("empty file on stdin, expected a json file"); + exit(1); + } + + let mint: Mint = serde_json::from_str(&buffer).unwrap_or_else(|e| { eprintln!("failed to parse json: {}", e); exit(1); 
}); diff --git a/src/bin/genesis-demo.rs b/src/bin/genesis-demo.rs index ff983d9a10b742..340f09c6cfd36b 100644 --- a/src/bin/genesis-demo.rs +++ b/src/bin/genesis-demo.rs @@ -1,13 +1,15 @@ +extern crate isatty; extern crate serde_json; extern crate solana; +use isatty::stdin_isatty; use solana::entry::create_entry; use solana::event::Event; use solana::hash::Hash; use solana::mint::Mint; use solana::signature::{KeyPair, KeyPairUtil, PublicKey}; use solana::transaction::Transaction; -use std::io::stdin; +use std::io::{stdin, Read}; use std::process::exit; fn transfer(from: &KeyPair, (to, tokens): (PublicKey, i64), last_id: Hash) -> Event { @@ -15,7 +17,19 @@ fn transfer(from: &KeyPair, (to, tokens): (PublicKey, i64), last_id: Hash) -> Ev } fn main() { - let mint: Mint = serde_json::from_reader(stdin()).unwrap_or_else(|e| { + if stdin_isatty() { + eprintln!("nothing found on stdin, expected a json file"); + exit(1); + } + + let mut buffer = String::new(); + let num_bytes = stdin().read_to_string(&mut buffer).unwrap(); + if num_bytes == 0 { + eprintln!("empty file on stdin, expected a json file"); + exit(1); + } + + let mint: Mint = serde_json::from_str(&buffer).unwrap_or_else(|e| { eprintln!("failed to parse json: {}", e); exit(1); }); diff --git a/src/bin/genesis.rs b/src/bin/genesis.rs index 10a1437d738a79..07d7dc89fc0f85 100644 --- a/src/bin/genesis.rs +++ b/src/bin/genesis.rs @@ -1,14 +1,28 @@ //! A command-line executable for generating the chain's genesis block. 
+extern crate isatty; extern crate serde_json; extern crate solana; +use isatty::stdin_isatty; use solana::mint::Mint; -use std::io::stdin; +use std::io::{stdin, Read}; use std::process::exit; fn main() { - let mint: Mint = serde_json::from_reader(stdin()).unwrap_or_else(|e| { + if stdin_isatty() { + eprintln!("nothing found on stdin, expected a json file"); + exit(1); + } + + let mut buffer = String::new(); + let num_bytes = stdin().read_to_string(&mut buffer).unwrap(); + if num_bytes == 0 { + eprintln!("empty file on stdin, expected a json file"); + exit(1); + } + + let mint: Mint = serde_json::from_str(&buffer).unwrap_or_else(|e| { eprintln!("failed to parse json: {}", e); exit(1); }); diff --git a/src/bin/mint.rs b/src/bin/mint.rs index 9dab5bc76fe5ce..73a67fb129897a 100644 --- a/src/bin/mint.rs +++ b/src/bin/mint.rs @@ -1,16 +1,25 @@ +extern crate isatty; extern crate serde_json; extern crate solana; +use isatty::stdin_isatty; use solana::mint::Mint; use std::io; use std::process::exit; fn main() { let mut input_text = String::new(); + if stdin_isatty() { + eprintln!("nothing found on stdin, expected a token number"); + exit(1); + } + io::stdin().read_line(&mut input_text).unwrap(); let trimmed = input_text.trim(); - let tokens = trimmed.parse::().unwrap(); - + let tokens = trimmed.parse::().unwrap_or_else(|e| { + eprintln!("{}", e); + exit(1); + }); let mint = Mint::new(tokens); let serialized = serde_json::to_string(&mint).unwrap_or_else(|e| { eprintln!("failed to serialize: {}", e); diff --git a/src/bin/testnode.rs b/src/bin/testnode.rs index 4d692f3f3f5fa6..3fa995a718fa0b 100644 --- a/src/bin/testnode.rs +++ b/src/bin/testnode.rs @@ -1,16 +1,18 @@ extern crate env_logger; extern crate getopts; +extern crate isatty; extern crate serde_json; extern crate solana; use getopts::Options; +use isatty::stdin_isatty; use solana::accountant::Accountant; use solana::accountant_skel::AccountantSkel; use solana::entry::Entry; use solana::event::Event; use 
solana::historian::Historian; use std::env; -use std::io::{self, stdout, BufRead}; +use std::io::{stdin, stdout, Read}; use std::process::exit; use std::sync::atomic::AtomicBool; use std::sync::{Arc, Mutex}; @@ -47,9 +49,21 @@ fn main() { port = matches.opt_str("p").unwrap().parse().expect("port"); } let addr = format!("0.0.0.0:{}", port); - let stdin = io::stdin(); - let mut entries = stdin.lock().lines().map(|line| { - serde_json::from_str(&line.unwrap()).unwrap_or_else(|e| { + + if stdin_isatty() { + eprintln!("nothing found on stdin, expected a log file"); + exit(1); + } + + let mut buffer = String::new(); + let num_bytes = stdin().read_to_string(&mut buffer).unwrap(); + if num_bytes == 0 { + eprintln!("empty file on stdin, expected a log file"); + exit(1); + } + + let mut entries = buffer.lines().map(|line| { + serde_json::from_str(&line).unwrap_or_else(|e| { eprintln!("failed to parse json: {}", e); exit(1); }) diff --git a/src/lib.rs b/src/lib.rs index 7cb82ff53327c3..80a0f607790a4f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,6 +4,8 @@ pub mod accountant_skel; pub mod accountant_stub; pub mod ecdsa; pub mod entry; +#[cfg(feature = "erasure")] +pub mod erasure; pub mod event; pub mod hash; pub mod historian; @@ -17,8 +19,6 @@ pub mod signature; pub mod streamer; pub mod subscribers; pub mod transaction; -#[cfg(feature = "erasure")] -pub mod erasure; extern crate bincode; extern crate byteorder; extern crate chrono; From 3d7969d8a212ef2789770cbfca2ee36f912e58e1 Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Sat, 21 Apr 2018 11:02:49 -0700 Subject: [PATCH 18/28] initial crdt implementation --- src/crdt.rs | 346 ++++++++++++++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 1 + 2 files changed, 347 insertions(+) create mode 100644 src/crdt.rs diff --git a/src/crdt.rs b/src/crdt.rs new file mode 100644 index 00000000000000..fa32770a778593 --- /dev/null +++ b/src/crdt.rs @@ -0,0 +1,346 @@ +//! 
The `crdt` module defines a data structure that is shared by all the nodes in the network over +//! a gossip control plane. The goal is to share small bits of of-chain information and detect and +//! repair partitions. +//! +//! This CRDT only supports a very limited set of types. A map of PublicKey -> Versioned Struct. +//! The last version is always picked durring an update. + +use bincode::{deserialize, serialize}; +use byteorder::{LittleEndian, ReadBytesExt}; +use hash::Hash; +use result::Result; +use ring::rand::{SecureRandom, SystemRandom}; +use signature::{PublicKey, Signature}; +use std::collections::HashMap; +use std::io::Cursor; +use std::net::{SocketAddr, UdpSocket}; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{Arc, RwLock}; +use std::thread::{sleep, spawn, JoinHandle}; +use std::time::Duration; + +/// Structure to be replicated by the network +#[derive(Serialize, Deserialize, Clone)] +pub struct ReplicatedData { + id: PublicKey, + sig: Signature, + /// should always be increasing + version: u64, + /// address to connect to for gossip + gossip_addr: SocketAddr, + /// address to connect to for replication + replicate_addr: SocketAddr, + /// address to connect to when this node is leader + lead_addr: SocketAddr, + /// current leader identity + current_leader_id: PublicKey, + /// last verified hash that was submitted to the leader + last_verified_hash: Hash, + /// last verified count, always increasing + last_verified_count: u64, +} + +impl ReplicatedData { + pub fn new(id: PublicKey, gossip_addr: SocketAddr) -> ReplicatedData { + let daddr = "0.0.0.0:0".parse().unwrap(); + ReplicatedData { + id, + sig: Signature::default(), + version: 0, + gossip_addr, + replicate_addr: daddr, + lead_addr: daddr, + current_leader_id: PublicKey::default(), + last_verified_hash: Hash::default(), + last_verified_count: 0, + } + } + fn verify_sig(&self) -> bool { + //TODO implement this + true + } +} + +/// `Crdt` structure keeps a table of `ReplicatedData` 
structs +/// # Properties +/// * `table` - map of public id's to versioned and signed ReplicatedData structs +/// * `local` - map of public id's to what `self.update_index` `self.table` was updated +/// * `remote` - map of public id's to the `remote.update_index` was sent +/// * `update_index` - my update index +/// # Remarks +/// This implements two services, `gossip` and `listen`. +/// * `gossip` - asynchronously ask nodes to send updates +/// * `listen` - listen for requests and responses +/// No attempt to keep track of timeouts or dropped requests is made, or should be. +pub struct Crdt { + table: HashMap, + /// Value of my update index when entry in table was updated. + /// Nodes will ask for updates since `update_index`, and this node + /// should respond with all the identities that are greater then the + /// request's `update_index` in this list + local: HashMap, + /// The value of the remote update index that i have last seen + /// This Node will ask external nodes for updates since the value in this list + remote: HashMap, + update_index: u64, + me: PublicKey, + timeout: Duration, +} +// TODO These messages should be signed, and go through the gpu pipeline for spam filtering +#[derive(Serialize, Deserialize)] +enum Protocol { + RequestUpdates(u64, SocketAddr), + //TODO might need a since? + /// from id, form's last update index, ReplicatedData + ReceiveUpdates(PublicKey, u64, Vec), +} + +impl Crdt { + pub fn new(me: ReplicatedData) -> Crdt { + assert_eq!(me.version, 0); + let mut g = Crdt { + table: HashMap::new(), + local: HashMap::new(), + remote: HashMap::new(), + me: me.id, + update_index: 1, + timeout: Duration::new(0, 100_000), + }; + g.local.insert(me.id, g.update_index); + g.table.insert(me.id, me); + g + } + pub fn insert(&mut self, v: &ReplicatedData) { + if self.table.get(&v.id).is_none() || (v.version > self.table[&v.id].version) { + trace!("insert! 
{}", v.version); + self.update_index += 1; + let _ = self.table.insert(v.id, v.clone()); + let _ = self.local.insert(v.id, self.update_index); + } else { + trace!("INSERT FAILED {}", v.version); + } + } + fn random() -> u64 { + let rnd = SystemRandom::new(); + let mut buf = [0u8; 8]; + rnd.fill(&mut buf).unwrap(); + let mut rdr = Cursor::new(&buf); + rdr.read_u64::().unwrap() + } + fn get_updates_since(&self, v: u64) -> (PublicKey, u64, Vec) { + trace!("get updates since {}", v); + let data = self.table + .values() + .filter(|x| self.local[&x.id] > v) + .cloned() + .collect(); + let id = self.me; + let ups = self.update_index; + (id, ups, data) + } + + /// Create a random gossip request + /// # Returns + /// (A,B,C) + /// * A - Remote gossip address + /// * B - My gossip address + /// * C - Remote update index to request updates since + fn gossip_request(&self) -> (SocketAddr, SocketAddr, u64) { + let n = (Self::random() as usize) % self.table.len(); + trace!("random {:?} {}", &self.me[0..1], n); + let v = self.table.values().nth(n).unwrap().clone(); + let remote_update_index = *self.remote.get(&v.id).unwrap_or(&0); + let my_addr = self.table[&self.me].gossip_addr; + (v.gossip_addr, my_addr, remote_update_index) + } + + /// At random pick a node and try to get updated changes from them + fn run_gossip(obj: &Arc>) -> Result<()> { + //TODO we need to keep track of stakes and weight the selection by stake size + //TODO cache sockets + + // Lock the object only to do this operation and not for any longer + // especially not when doing the `sock.send_to` + let (remote_gossip_addr, my_addr, remote_update_index) = + obj.read().unwrap().gossip_request(); + let mut req_addr = my_addr; + req_addr.set_port(0); + let sock = UdpSocket::bind(req_addr)?; + // TODO this will get chatty, so we need to first ask for number of updates since + // then only ask for specific data that we dont have + let r = serialize(&Protocol::RequestUpdates(remote_update_index, my_addr))?; + 
sock.send_to(&r, remote_gossip_addr)?; + Ok(()) + } + + /// Apply updates that we received from the identity `from` + /// # Arguments + /// * `from` - identity of the sender of the updates + /// * `update_index` - the number of updates that `from` has completed and this set of `data` represents + /// * `data` - the update data + fn apply_updates(&mut self, from: PublicKey, update_index: u64, data: Vec) { + trace!("got updates {}", data.len()); + // TODO we need to punish/spam resist here + // sig verify the whole update and slash anyone who sends a bad update + for v in data { + if !v.verify_sig() { + continue; + } + // TODO probably an error or attack + if v.id == self.me { + continue; + } + // TODO check that last_verified types are always increasing + self.insert(&v); + } + *self.remote.entry(from).or_insert(update_index) = update_index; + } + + /// randomly pick a node and ask them for updates asynchronously + pub fn gossip(obj: Arc>, exit: Arc) -> JoinHandle<()> { + spawn(move || loop { + let _ = Self::run_gossip(&obj); + if exit.load(Ordering::Relaxed) { + return; + } + //TODO this should be a tuned parameter + sleep(obj.read().unwrap().timeout); + }) + } + + /// Process messages from the network + fn run_listen(obj: &Arc>, sock: &UdpSocket) -> Result<()> { + //TODO cache connections + let mut buf = vec![0u8; 1024 * 64]; + let (amt, src) = sock.recv_from(&mut buf)?; + trace!("got request from {}", src); + buf.resize(amt, 0); + let r = deserialize(&buf)?; + match r { + // TODO sigverify these + Protocol::RequestUpdates(v, addr) => { + trace!("RequestUpdates {}", v); + // only lock for this call, dont lock durring IO `sock.send_to` or `sock.recv_from` + let (from, ups, data) = obj.read().unwrap().get_updates_since(v); + trace!("get updates since response {} {}", v, data.len()); + let rsp = serialize(&Protocol::ReceiveUpdates(from, ups, data))?; + trace!("send_to {}", addr); + sock.send_to(&rsp, addr).unwrap(); + trace!("send_to done!"); + } + 
Protocol::ReceiveUpdates(from, ups, data) => { + trace!("ReceivedUpdates"); + obj.write().unwrap().apply_updates(from, ups, data); + } + } + Ok(()) + } + pub fn listen( + obj: Arc>, + sock: UdpSocket, + exit: Arc, + ) -> JoinHandle<()> { + sock.set_read_timeout(Some(Duration::new(2, 0))).unwrap(); + spawn(move || loop { + let _ = Self::run_listen(&obj, &sock); + if exit.load(Ordering::Relaxed) { + return; + } + }) + } +} + +#[cfg(test)] +mod test { + use crdt::{Crdt, ReplicatedData}; + use signature::KeyPair; + use signature::KeyPairUtil; + use std::net::UdpSocket; + use std::sync::atomic::{AtomicBool, Ordering}; + use std::sync::{Arc, RwLock}; + use std::thread::sleep; + use std::time::Duration; + + /// Test that the network converges. + /// Create a ring a -> b -> c -> d -> e -> a of size num. + /// Run until every node in the network has a full ReplicatedData set. + /// Check that nodes stop sending updates after all the ReplicatedData has been shared. + #[test] + fn gossip_test() { + let num: usize = 5; + let exit = Arc::new(AtomicBool::new(false)); + let listen: Vec<_> = (0..num) + .map(|_| { + let listener = UdpSocket::bind("0.0.0.0:0").unwrap(); + let pubkey = KeyPair::new().pubkey(); + let d = ReplicatedData::new(pubkey, listener.local_addr().unwrap()); + let crdt = Crdt::new(d); + let c = Arc::new(RwLock::new(crdt)); + let l = Crdt::listen(c.clone(), listener, exit.clone()); + (c, l) + }) + .collect(); + for n in 0..num { + let y = n % listen.len(); + let x = (n + 1) % listen.len(); + let mut xv = listen[x].0.write().unwrap(); + let yv = listen[y].0.read().unwrap(); + let mut d = yv.table[&yv.me].clone(); + d.version = 0; + xv.insert(&d); + } + let gossip: Vec<_> = listen + .iter() + .map(|&(ref c, _)| Crdt::gossip(c.clone(), exit.clone())) + .collect(); + let mut done = true; + for _ in 0..(num * 16) { + done = true; + for &(ref c, _) in listen.iter() { + trace!( + "done updates {} {}", + c.read().unwrap().table.len(), + c.read().unwrap().update_index + 
); + //make sure the number of updates doesn't grow unbounded + assert!(c.read().unwrap().update_index <= num as u64); + //make sure we got all the updates + if c.read().unwrap().table.len() != num { + done = false; + } + } + if done == true { + break; + } + sleep(Duration::new(1, 0)); + } + exit.store(true, Ordering::Relaxed); + for j in gossip { + j.join().unwrap(); + } + for (c, j) in listen.into_iter() { + j.join().unwrap(); + // make it clear what failed + // protocol is to chatty, updates should stop after everyone receives `num` + assert!(c.read().unwrap().update_index <= num as u64); + // protocol is not chatty enough, everyone should get `num` entries + assert_eq!(c.read().unwrap().table.len(), num); + } + assert!(done); + } + /// Test that insert drops messages that are older + #[test] + fn insert_test() { + let mut d = ReplicatedData::new(KeyPair::new().pubkey(), "127.0.0.1:1234".parse().unwrap()); + assert_eq!(d.version, 0); + let mut crdt = Crdt::new(d.clone()); + assert_eq!(crdt.table[&d.id].version, 0); + d.version = 2; + crdt.insert(&d); + assert_eq!(crdt.table[&d.id].version, 2); + d.version = 1; + crdt.insert(&d); + assert_eq!(crdt.table[&d.id].version, 2); + } + +} diff --git a/src/lib.rs b/src/lib.rs index 80a0f607790a4f..90200b5b84489b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,6 +2,7 @@ pub mod accountant; pub mod accountant_skel; pub mod accountant_stub; +pub mod crdt; pub mod ecdsa; pub mod entry; #[cfg(feature = "erasure")] From 55b8d0db4d3d76da54af86f79c5ceb2acbad7c1f Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Mon, 23 Apr 2018 23:33:21 -0700 Subject: [PATCH 19/28] cleanup --- src/crdt.rs | 7 ------- 1 file changed, 7 deletions(-) diff --git a/src/crdt.rs b/src/crdt.rs index fa32770a778593..69de86828dd2e7 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -55,10 +55,6 @@ impl ReplicatedData { last_verified_count: 0, } } - fn verify_sig(&self) -> bool { - //TODO implement this - true - } } /// `Crdt` structure keeps a table of 
`ReplicatedData` structs @@ -183,9 +179,6 @@ impl Crdt { // TODO we need to punish/spam resist here // sig verify the whole update and slash anyone who sends a bad update for v in data { - if !v.verify_sig() { - continue; - } // TODO probably an error or attack if v.id == self.me { continue; From d69a86aa3e33b88999738a76e2cdcfef5791e781 Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Tue, 17 Apr 2018 19:26:19 -0700 Subject: [PATCH 20/28] state replication --- src/accountant_skel.rs | 53 ++++++++++++++++++++++++++++++++++++++++++ src/subscribers.rs | 5 ++-- 2 files changed, 56 insertions(+), 2 deletions(-) diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index 41d8f90ede1292..aba4488f96c50b 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -247,6 +247,7 @@ impl AccountantSkel { } /// Create a UDP microservice that forwards messages the given AccountantSkel. + /// This service is the network leader /// Set `exit` to shutdown its threads. pub fn serve( obj: &Arc>>, @@ -269,6 +270,57 @@ impl AccountantSkel { streamer::responder(write, exit.clone(), blob_recycler.clone(), blob_receiver); let (verified_sender, verified_receiver) = channel(); + let exit_ = exit.clone(); + let t_verifier = spawn(move || loop { + let e = Self::blob_verifier(&blob_receiver, &verified_sender); + if e.is_err() && exit_.load(Ordering::Relaxed) { + break; + } + }); + + let skel = obj.clone(); + let t_server = spawn(move || loop { + let e = AccountantSkel::process( + &skel, + &verified_receiver, + &blob_sender, + &packet_recycler, + &blob_recycler, + ); + if e.is_err() && exit.load(Ordering::Relaxed) { + break; + } + }); + Ok(vec![t_receiver, t_responder, t_server, t_verifier]) + } + + /// Create a UDP microservice that forwards messages the given AccountantSkel. + /// This service receives messages from a leader in the network + /// Set `exit` to shutdown its threads. 
+ pub fn replicate( + obj: &Arc>>, + rsubs: Subscribers, + addr: &str, + exit: Arc, + ) -> Result>> { + let read = UdpSocket::bind(rsubs.me.addr)?; + // make sure we are on the same interface + let mut local = read.local_addr()?; + local.set_port(0); + let write = UdpSocket::bind(local)?; + + let blob_recycler = packet::BlobRecycler::default(); + let (blob_sender, blob_receiver) = channel(); + let t_blob_receiver = + streamer::blob_receiver(exit.clone(), blob_recycler.clone(), read, blob_sender)?; + let (window_sender, window_receiver) = channel(); + + let subs = Arc::new(RwLock::new(rsubs)); + + let t_window = + streamer::window(exit.clone(), blob_recycler.clone(), blob_receiver); + let (verified_sender, verified_receiver) = channel(); + let exit_ = exit.clone(); let t_verifier = spawn(move || loop { let e = Self::verifier(&packet_receiver, &verified_sender); @@ -292,6 +344,7 @@ impl AccountantSkel { }); Ok(vec![t_receiver, t_responder, t_server, t_verifier]) } + } #[cfg(test)] diff --git a/src/subscribers.rs b/src/subscribers.rs index 153246d12adb31..3484ea01b30e52 100644 --- a/src/subscribers.rs +++ b/src/subscribers.rs @@ -40,18 +40,19 @@ impl Node { pub struct Subscribers { data: Vec, - me: Node, + pub me: Node, pub leader: Node, } impl Subscribers { - pub fn new(me: Node, leader: Node) -> Subscribers { + pub fn new(me: Node, leader: Node, network: &[Node]) -> Subscribers { let mut h = Subscribers { data: vec![], me: me.clone(), leader: leader.clone(), }; h.insert(&[me, leader]); + h.insert(network); h } From c579f9b59a6d73128a60ca549bab4573e32fc12b Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Tue, 17 Apr 2018 20:09:37 -0700 Subject: [PATCH 21/28] wip --- src/accountant_skel.rs | 24 +++++++++++------------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index aba4488f96c50b..a8fa460e9255f1 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -300,7 +300,6 @@ impl 
AccountantSkel { pub fn replicate( obj: &Arc>>, rsubs: Subscribers, - addr: &str, exit: Arc, ) -> Result>> { let read = UdpSocket::bind(rsubs.me.addr)?; @@ -314,28 +313,27 @@ impl AccountantSkel { let t_blob_receiver = streamer::blob_receiver(exit.clone(), blob_recycler.clone(), read, blob_sender)?; let (window_sender, window_receiver) = channel(); + let (retransmit_sender, retransmit_receiver) = channel(); let subs = Arc::new(RwLock::new(rsubs)); + let t_retransmit = retransmitter( + write, + exit.clone(), + subs, + blob_recycler.clone(), + retransmit_receiver, + ); - let t_window = - streamer::window(exit.clone(), blob_recycler.clone(), blob_receiver); - let (verified_sender, verified_receiver) = channel(); - let exit_ = exit.clone(); - let t_verifier = spawn(move || loop { - let e = Self::verifier(&packet_receiver, &verified_sender); - if e.is_err() && exit_.load(Ordering::Relaxed) { - break; - } - }); + let t_window = + streamer::window(exit.clone(), blob_recycler.clone(), blob_receiver, window_sender, retransmit_sender); let skel = obj.clone(); let t_server = spawn(move || loop { - let e = AccountantSkel::process( + let e = AccountantSkel::replicate( &skel, &verified_receiver, &blob_sender, - &packet_recycler, &blob_recycler, ); if e.is_err() && exit.load(Ordering::Relaxed) { From 3bd74cc9abb8f547b64d75aef0e95007a41ab9de Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Wed, 18 Apr 2018 12:02:54 -0700 Subject: [PATCH 22/28] update --- src/accountant_skel.rs | 47 +++++++++++++++++++++++++++--------------- 1 file changed, 30 insertions(+), 17 deletions(-) diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index a8fa460e9255f1..10daa11b653337 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -22,7 +22,7 @@ use std::io::Write; use std::net::{SocketAddr, UdpSocket}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::{channel, Receiver, Sender}; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, Mutex, RwLock}; use 
std::thread::{spawn, JoinHandle}; use std::time::Duration; use streamer; @@ -280,7 +280,7 @@ impl AccountantSkel { let skel = obj.clone(); let t_server = spawn(move || loop { - let e = AccountantSkel::process( + let e = Self::process( &skel, &verified_receiver, &blob_sender, @@ -294,9 +294,22 @@ impl AccountantSkel { Ok(vec![t_receiver, t_responder, t_server, t_verifier]) } - /// Create a UDP microservice that forwards messages the given AccountantSkel. - /// This service receives messages from a leader in the network - /// Set `exit` to shutdown its threads. + /// This service receives messages from a leader in the network and processes the transactions + /// on the accountant state. + /// # Arguments + /// * `obj` - The accoutnant state. + /// * `rsubs` - The subscribers. + /// * `exit` - The exit signal. + /// # Remarks + /// The pipeline is constructed as follows + /// 1. receive blobs from the network, these are out of order + /// 2. verify blobs, PoH, signatures + /// 3. reconstruct consequitive window + /// a. order the blobs + /// b. use erasure coding to reconstruct missing blobs + /// c. ask the network for missing blobs + /// 4. process the transaction state machine + /// 5. 
respond with the hash of the state back to the leader pub fn replicate( obj: &Arc>>, rsubs: Subscribers, @@ -323,26 +336,26 @@ impl AccountantSkel { blob_recycler.clone(), retransmit_receiver, ); - - - let t_window = - streamer::window(exit.clone(), blob_recycler.clone(), blob_receiver, window_sender, retransmit_sender); + //TODO + //the packets comming out of blob_receiver need to be sent to the GPU and verified + //then sent to the window, which does the erasure coding reconstruction + let t_window = streamer::window( + exit.clone(), + blob_recycler.clone(), + blob_receiver, + window_sender, + retransmit_sender, + ); let skel = obj.clone(); let t_server = spawn(move || loop { - let e = AccountantSkel::replicate( - &skel, - &verified_receiver, - &blob_sender, - &blob_recycler, - ); + let e = Self::replicate_state(&skel, &window_receiver, &blob_sender, &blob_recycler); if e.is_err() && exit.load(Ordering::Relaxed) { break; } }); - Ok(vec![t_receiver, t_responder, t_server, t_verifier]) + Ok(vec![t_blob_receiver, t_retransmit, t_window, t_server]) } - } #[cfg(test)] From 97176dcabc9738a3060aff6b4a67cdf8e074b0f6 Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Wed, 18 Apr 2018 12:05:12 -0700 Subject: [PATCH 23/28] docs --- src/accountant_skel.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index 10daa11b653337..c6d2333f05b138 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -303,11 +303,12 @@ impl AccountantSkel { /// # Remarks /// The pipeline is constructed as follows /// 1. receive blobs from the network, these are out of order - /// 2. verify blobs, PoH, signatures - /// 3. reconstruct consequitive window + /// 2. verify blobs, PoH, signatures (TODO) + /// 3. reconstruct contiguous window /// a. order the blobs /// b. use erasure coding to reconstruct missing blobs /// c. ask the network for missing blobs + /// d. 
make sure that the blobs PoH sequences connect (TODO) /// 4. process the transaction state machine /// 5. respond with the hash of the state back to the leader pub fn replicate( From e493d7095138f753c2532a9d08887d1c31d21ce9 Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Wed, 18 Apr 2018 12:29:33 -0700 Subject: [PATCH 24/28] docs --- src/accountant_skel.rs | 30 +++++++++++++++++++++++++++++- 1 file changed, 29 insertions(+), 1 deletion(-) diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index c6d2333f05b138..feb20a03da8b55 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -245,6 +245,34 @@ impl AccountantSkel { } Ok(()) } + /// Process verified blobs, already in order + /// Respond with a signed hash of the state + fn replicate_state( + obj: &Arc>>, + verified_receiver: &BlobReceiver, + blob_sender: &streamer::BlobSender, + blob_recycler: &packet::BlobRecycler, + ) -> Result<()> { + let timer = Duration::new(1, 0); + let blobs = verified_receiver.recv_timeout(timer)?; + for msgs in blobs { + let entries = b.read().unwrap().data.deserialize(); + let req_vers = reqs.into_iter() + .zip(vers) + .filter_map(|(req, ver)| req.map(|(msg, addr)| (msg, addr, ver))) + .filter(|x| x.0.verify()) + .collect(); + let rsps = obj.lock().unwrap().process_packets(req_vers)?; + let blobs = Self::serialize_responses(rsps, blob_recycler)?; + if !blobs.is_empty() { + //don't wake up the other side if there is nothing + blob_sender.send(blobs)?; + } + packet_recycler.recycle(msgs); + } + Ok(()) + } + /// Create a UDP microservice that forwards messages the given AccountantSkel. /// This service is the network leader @@ -307,7 +335,7 @@ impl AccountantSkel { /// 3. reconstruct contiguous window /// a. order the blobs /// b. use erasure coding to reconstruct missing blobs - /// c. ask the network for missing blobs + /// c. ask the network for missing blobs, if erasure coding is insufficient /// d. 
make sure that the blobs PoH sequences connect (TODO) /// 4. process the transaction state machine /// 5. respond with the hash of the state back to the leader From 37448deb93c9cd784714ef27b9b4fcd9f0decdb1 Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Wed, 18 Apr 2018 20:12:30 -0700 Subject: [PATCH 25/28] wip --- src/accountant_skel.rs | 22 ++++++++-------------- src/streamer.rs | 36 ++++++++++++++++++------------------ src/subscribers.rs | 4 ++-- 3 files changed, 28 insertions(+), 34 deletions(-) diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index feb20a03da8b55..99f846cf45263d 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -256,20 +256,13 @@ impl AccountantSkel { let timer = Duration::new(1, 0); let blobs = verified_receiver.recv_timeout(timer)?; for msgs in blobs { - let entries = b.read().unwrap().data.deserialize(); - let req_vers = reqs.into_iter() - .zip(vers) - .filter_map(|(req, ver)| req.map(|(msg, addr)| (msg, addr, ver))) - .filter(|x| x.0.verify()) - .collect(); - let rsps = obj.lock().unwrap().process_packets(req_vers)?; - let blobs = Self::serialize_responses(rsps, blob_recycler)?; - if !blobs.is_empty() { - //don't wake up the other side if there is nothing - blob_sender.send(blobs)?; + let entries:Vec = b.read().unwrap().data.deserialize()?; + for e in entries { + obj.lock().unwrap().acc.process_verified_events(e.events)?; } - packet_recycler.recycle(msgs); + //TODO respond back to leader with hash of the state } + blob_recycler.recycle(msgs); Ok(()) } @@ -325,11 +318,11 @@ impl AccountantSkel { /// This service receives messages from a leader in the network and processes the transactions /// on the accountant state. /// # Arguments - /// * `obj` - The accoutnant state. + /// * `obj` - The accountant state. /// * `rsubs` - The subscribers. /// * `exit` - The exit signal. /// # Remarks - /// The pipeline is constructed as follows + /// The pipeline is constructed as follows: /// 1. 
receive blobs from the network, these are out of order /// 2. verify blobs, PoH, signatures (TODO) /// 3. reconstruct contiguous window @@ -370,6 +363,7 @@ impl AccountantSkel { //then sent to the window, which does the erasure coding reconstruction let t_window = streamer::window( exit.clone(), + subs, blob_recycler.clone(), blob_receiver, window_sender, diff --git a/src/streamer.rs b/src/streamer.rs index 33882c31dcffda..0fd8c1b758204d 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -8,7 +8,7 @@ use std::sync::mpsc; use std::sync::{Arc, RwLock}; use std::thread::{spawn, JoinHandle}; use std::time::Duration; -use subscribers; +use subscribers::Subscribers; pub type PacketReceiver = mpsc::Receiver; pub type PacketSender = mpsc::Sender; @@ -106,12 +106,12 @@ pub fn blob_receiver( fn recv_window( window: &mut Vec>, - subs: &Arc>, + subs: &Arc>, recycler: &BlobRecycler, consumed: &mut usize, r: &BlobReceiver, s: &BlobSender, - cast: &BlobSender, + retransmit: &BlobSender, ) -> Result<()> { let timer = Duration::new(1, 0); let mut dq = r.recv_timeout(timer)?; @@ -120,7 +120,7 @@ fn recv_window( } { //retransmit all leader blocks - let mut castq = VecDeque::new(); + let mut retransmitq = VecDeque::new(); let rsubs = subs.read().unwrap(); for b in &dq { let p = b.read().unwrap(); @@ -141,11 +141,11 @@ fn recv_window( mnv.meta.size = sz; mnv.data[..sz].copy_from_slice(&p.data[..sz]); } - castq.push_back(nv); + retransmitq.push_back(nv); } } - if !castq.is_empty() { - cast.send(castq)?; + if !retransmitq.is_empty() { + retransmit.send(retransmitq)?; } } //send a contiguous set of blocks @@ -183,11 +183,11 @@ fn recv_window( pub fn window( exit: Arc, - subs: Arc>, + subs: Arc>, recycler: BlobRecycler, r: BlobReceiver, s: BlobSender, - cast: BlobSender, + retransmit: BlobSender, ) -> JoinHandle<()> { spawn(move || { let mut window = vec![None; NUM_BLOBS]; @@ -196,13 +196,13 @@ pub fn window( if exit.load(Ordering::Relaxed) { break; } - let _ = recv_window(&mut window, 
&subs, &recycler, &mut consumed, &r, &s, &cast); + let _ = recv_window(&mut window, &subs, &recycler, &mut consumed, &r, &s, &retransmit); } }) } fn retransmit( - subs: &Arc>, + subs: &Arc>, recycler: &BlobRecycler, r: &BlobReceiver, sock: &UdpSocket, @@ -237,7 +237,7 @@ fn retransmit( pub fn retransmitter( sock: UdpSocket, exit: Arc, - subs: Arc>, + subs: Arc>, recycler: BlobRecycler, r: BlobReceiver, ) -> JoinHandle<()> { @@ -442,20 +442,21 @@ mod test { let subs = Arc::new(RwLock::new(Subscribers::new( Node::default(), Node::new([0; 8], 0, send.local_addr().unwrap()), + &[], ))); let resp_recycler = BlobRecycler::default(); let (s_reader, r_reader) = channel(); let t_receiver = blob_receiver(exit.clone(), resp_recycler.clone(), read, s_reader).unwrap(); let (s_window, r_window) = channel(); - let (s_cast, r_cast) = channel(); + let (s_retransmit, r_retransmit) = channel(); let t_window = window( exit.clone(), subs, resp_recycler.clone(), r_reader, s_window, - s_cast, + s_retransmit, ); let (s_responder, r_responder) = channel(); let t_responder = responder(send, exit.clone(), resp_recycler.clone(), r_responder); @@ -475,8 +476,8 @@ mod test { let mut num = 0; get_blobs(r_window, &mut num); assert_eq!(num, 10); - let mut q = r_cast.recv().unwrap(); - while let Ok(mut nq) = r_cast.try_recv() { + let mut q = r_retransmit.recv().unwrap(); + while let Ok(mut nq) = r_retransmit.try_recv() { q.append(&mut nq); } assert_eq!(q.len(), 10); @@ -494,9 +495,8 @@ mod test { let subs = Arc::new(RwLock::new(Subscribers::new( Node::default(), Node::default(), + &[Node::new([0; 8], 1, read.local_addr().unwrap())] ))); - let n3 = Node::new([0; 8], 1, read.local_addr().unwrap()); - subs.write().unwrap().insert(&[n3]); let (s_retransmit, r_retransmit) = channel(); let blob_recycler = BlobRecycler::default(); let saddr = send.local_addr().unwrap(); diff --git a/src/subscribers.rs b/src/subscribers.rs index 3484ea01b30e52..b81a54941b599a 100644 --- a/src/subscribers.rs +++ 
b/src/subscribers.rs @@ -100,7 +100,7 @@ mod test { me.weight = 10; let mut leader = Node::default(); leader.weight = 11; - let mut s = Subscribers::new(me, leader); + let mut s = Subscribers::new(me, leader, &[]); assert_eq!(s.data.len(), 2); assert_eq!(s.data[0].weight, 11); assert_eq!(s.data[1].weight, 10); @@ -117,7 +117,7 @@ mod test { let s3 = UdpSocket::bind("127.0.0.1:0").expect("bind"); let n1 = Node::new([0; 8], 0, s1.local_addr().unwrap()); let n2 = Node::new([0; 8], 0, s2.local_addr().unwrap()); - let mut s = Subscribers::new(n1.clone(), n2.clone()); + let mut s = Subscribers::new(n1.clone(), n2.clone(), &[]); let n3 = Node::new([0; 8], 0, s3.local_addr().unwrap()); s.insert(&[n3]); let mut b = Blob::default(); From 9dfd18a11026233730b9861f30da85949722fc00 Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Thu, 19 Apr 2018 10:32:02 -0700 Subject: [PATCH 26/28] update --- src/accountant_skel.rs | 5 +++-- src/result.rs | 7 +++++++ 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index 99f846cf45263d..09b0c438a62f18 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -27,6 +27,7 @@ use std::thread::{spawn, JoinHandle}; use std::time::Duration; use streamer; use transaction::Transaction; +use subscribers::Subscribers; pub struct AccountantSkel { acc: Accountant, @@ -293,7 +294,7 @@ impl AccountantSkel { let exit_ = exit.clone(); let t_verifier = spawn(move || loop { - let e = Self::blob_verifier(&blob_receiver, &verified_sender); + let e = Self::verifier(&packet_receiver, &verified_sender); if e.is_err() && exit_.load(Ordering::Relaxed) { break; } @@ -351,7 +352,7 @@ impl AccountantSkel { let (retransmit_sender, retransmit_receiver) = channel(); let subs = Arc::new(RwLock::new(rsubs)); - let t_retransmit = retransmitter( + let t_retransmit = streamer::retransmitter( write, exit.clone(), subs, diff --git a/src/result.rs b/src/result.rs index 9b3c17a3695fde..01872dfbe1138c 
100644 --- a/src/result.rs +++ b/src/result.rs @@ -4,6 +4,7 @@ use bincode; use serde_json; use std; use std::any::Any; +use accountant; #[derive(Debug)] pub enum Error { @@ -14,6 +15,7 @@ pub enum Error { RecvError(std::sync::mpsc::RecvError), RecvTimeoutError(std::sync::mpsc::RecvTimeoutError), Serialize(std::boxed::Box), + AccountingError(accountant::AccountingError), SendError, Services, } @@ -30,6 +32,11 @@ impl std::convert::From for Error { Error::RecvTimeoutError(e) } } +impl std::convert::From for Error { + fn from(e: accountant::AccountingError) -> Error { + Error::AccountingError(e) + } +} impl std::convert::From> for Error { fn from(_e: std::sync::mpsc::SendError) -> Error { Error::SendError From 92b8bb6445bc13775a690101b9eb2a8bc54be5ae Mon Sep 17 00:00:00 2001 From: Stephen Akridge Date: Thu, 19 Apr 2018 15:43:19 -0700 Subject: [PATCH 27/28] Fix some compilation issues --- src/accountant_skel.rs | 53 +++++++++++++++++++++++++++++++++++------- 1 file changed, 44 insertions(+), 9 deletions(-) diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index 09b0c438a62f18..579e260ea71e38 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -27,7 +27,9 @@ use std::thread::{spawn, JoinHandle}; use std::time::Duration; use streamer; use transaction::Transaction; -use subscribers::Subscribers; + +use subscribers; +use std::mem::size_of; pub struct AccountantSkel { acc: Accountant, @@ -250,20 +252,26 @@ impl AccountantSkel { /// Respond with a signed hash of the state fn replicate_state( obj: &Arc>>, - verified_receiver: &BlobReceiver, + verified_receiver: &streamer::BlobReceiver, blob_sender: &streamer::BlobSender, blob_recycler: &packet::BlobRecycler, ) -> Result<()> { let timer = Duration::new(1, 0); let blobs = verified_receiver.recv_timeout(timer)?; - for msgs in blobs { - let entries:Vec = b.read().unwrap().data.deserialize()?; + for msgs in &blobs { + let blob = msgs.read().unwrap(); + let mut entries:Vec = Vec::new(); + for i in 
0..blob.meta.size/size_of::() { + entries.push(deserialize(&blob.data[i..i+size_of::()]).unwrap()); + } for e in entries { obj.lock().unwrap().acc.process_verified_events(e.events)?; } //TODO respond back to leader with hash of the state } - blob_recycler.recycle(msgs); + for blob in blobs { + blob_recycler.recycle(blob); + } Ok(()) } @@ -335,7 +343,7 @@ impl AccountantSkel { /// 5. respond with the hash of the state back to the leader pub fn replicate( obj: &Arc>>, - rsubs: Subscribers, + rsubs: subscribers::Subscribers, exit: Arc, ) -> Result>> { let read = UdpSocket::bind(rsubs.me.addr)?; @@ -347,7 +355,7 @@ impl AccountantSkel { let blob_recycler = packet::BlobRecycler::default(); let (blob_sender, blob_receiver) = channel(); let t_blob_receiver = - streamer::blob_receiver(exit.clone(), blob_recycler.clone(), read, blob_sender)?; + streamer::blob_receiver(exit.clone(), blob_recycler.clone(), read, blob_sender.clone())?; let (window_sender, window_receiver) = channel(); let (retransmit_sender, retransmit_receiver) = channel(); @@ -355,7 +363,7 @@ impl AccountantSkel { let t_retransmit = streamer::retransmitter( write, exit.clone(), - subs, + subs.clone(), blob_recycler.clone(), retransmit_receiver, ); @@ -373,7 +381,8 @@ impl AccountantSkel { let skel = obj.clone(); let t_server = spawn(move || loop { - let e = Self::replicate_state(&skel, &window_receiver, &blob_sender, &blob_recycler); + let e = Self::replicate_state(&skel, &window_receiver, + &blob_sender, &blob_recycler); if e.is_err() && exit.load(Ordering::Relaxed) { break; } @@ -427,6 +436,8 @@ mod tests { use std::time::Duration; use transaction::Transaction; + use subscribers::{Node, Subscribers}; + #[test] fn test_layout() { let tr = test_tx(); @@ -531,6 +542,30 @@ mod tests { exit.store(true, Ordering::Relaxed); } + #[test] + fn test_replicate() { + let serve_port = 9004; + let send_port = 9005; + let addr = format!("127.0.0.1:{}", serve_port); + let send_addr = format!("127.0.0.1:{}", send_port); + 
let alice = Mint::new(10_000); + let acc = Accountant::new(&alice); + let bob_pubkey = KeyPair::new().pubkey(); + let exit = Arc::new(AtomicBool::new(false)); + let historian = Historian::new(&alice.last_id(), Some(30)); + let acc = Arc::new(Mutex::new(AccountantSkel::new( + acc, + alice.last_id(), + sink(), + historian, + ))); + let node_me = Node::default(); + let node_leader = Node::default(); + let subs = Subscribers::new(node_me, node_leader, &[]); + let _threads = AccountantSkel::replicate(&acc, subs, exit.clone()).unwrap(); + exit.store(true, Ordering::Relaxed); + } + } #[cfg(all(feature = "unstable", test))] From 58d1ddd2e67a118f7d306577b45252dcee5f8aeb Mon Sep 17 00:00:00 2001 From: Stephen Akridge Date: Tue, 24 Apr 2018 10:57:40 -0700 Subject: [PATCH 28/28] Work on test_replicate to test replicate service generate some messages to send to replicator service --- src/accountant_skel.rs | 46 +++++++++++++++++++++++++++++++++--------- 1 file changed, 37 insertions(+), 9 deletions(-) diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index 579e260ea71e38..3f06aafcb9e27e 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -416,7 +416,7 @@ mod tests { use accountant_skel::{to_packets, Request}; use bincode::serialize; use ecdsa; - use packet::{PacketRecycler, NUM_PACKETS}; + use packet::{BlobRecycler, PacketRecycler, NUM_PACKETS}; use transaction::{memfind, test_tx}; use accountant::Accountant; @@ -437,6 +437,10 @@ mod tests { use transaction::Transaction; use subscribers::{Node, Subscribers}; + use streamer; + use std::sync::mpsc::channel; + use std::collections::VecDeque; + use packet::{PACKET_DATA_SIZE}; #[test] fn test_layout() { @@ -544,14 +548,25 @@ mod tests { #[test] fn test_replicate() { - let serve_port = 9004; - let send_port = 9005; - let addr = format!("127.0.0.1:{}", serve_port); - let send_addr = format!("127.0.0.1:{}", send_port); + let read = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let addr = 
read.local_addr().unwrap(); + let send = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let exit = Arc::new(AtomicBool::new(false)); + + let node_me = Node::default(); + let node_leader = Node::new([0; 8], 0, send.local_addr().unwrap()); + let subs = Subscribers::new(node_me, node_leader, &[]); + + let recv_recycler = PacketRecycler::default(); + let resp_recycler = BlobRecycler::default(); + let (s_reader, r_reader) = channel(); + let t_receiver = streamer::receiver(read, exit.clone(), recv_recycler.clone(), s_reader).unwrap(); + let (s_responder, r_responder) = channel(); + let t_responder = streamer::responder(send, exit.clone(), resp_recycler.clone(), r_responder); + let alice = Mint::new(10_000); let acc = Accountant::new(&alice); let bob_pubkey = KeyPair::new().pubkey(); - let exit = Arc::new(AtomicBool::new(false)); let historian = Historian::new(&alice.last_id(), Some(30)); let acc = Arc::new(Mutex::new(AccountantSkel::new( acc, @@ -559,11 +574,24 @@ mod tests { sink(), historian, ))); - let node_me = Node::default(); - let node_leader = Node::default(); - let subs = Subscribers::new(node_me, node_leader, &[]); + let _threads = AccountantSkel::replicate(&acc, subs, exit.clone()).unwrap(); + + let mut msgs = VecDeque::new(); + for i in 0..10 { + let b = resp_recycler.allocate(); + let b_ = b.clone(); + let mut w = b.write().unwrap(); + w.data[0] = i as u8; + w.meta.size = PACKET_DATA_SIZE; + w.meta.set_addr(&addr); + msgs.push_back(b_); + } + s_responder.send(msgs).expect("send"); + exit.store(true, Ordering::Relaxed); + t_receiver.join().expect("join"); + t_responder.join().expect("join"); } }