diff --git a/Cargo.toml b/Cargo.toml index 65fd08cf2c7674..73e4022c86f578 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,11 +3,12 @@ name = "solana" description = "High Performance Blockchain" version = "0.4.0" documentation = "https://docs.rs/solana" -homepage = "http://loomprotocol.com/" +homepage = "http://solana.io/" repository = "https://github.com/solana-labs/solana" authors = [ - "Anatoly Yakovenko ", - "Greg Fitzgerald ", + "Anatoly Yakovenko ", + "Greg Fitzgerald ", + "Stephen Akridge ", ] license = "Apache-2.0" @@ -42,6 +43,7 @@ codecov = { repository = "solana-labs/solana", branch = "master", service = "git unstable = [] ipv6 = [] cuda = [] +erasure = [] [dependencies] rayon = "1.0.0" @@ -60,4 +62,4 @@ matches = "^0.1.6" byteorder = "^1.2.1" libc = "^0.2.1" getopts = "^0.2" - +isatty = "0.1" diff --git a/LICENSE b/LICENSE index cae77a0e837cb0..1dee745e3c7e92 100644 --- a/LICENSE +++ b/LICENSE @@ -1,4 +1,4 @@ -Copyright 2018 Anatoly Yakovenko and Greg Fitzgerald +Copyright 2018 Anatoly Yakovenko, Greg Fitzgerald and Stephen Akridge Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/README.md b/README.md index fa3a9ba1fffea6..611be8a8361a77 100644 --- a/README.md +++ b/README.md @@ -14,6 +14,11 @@ Solana: High Performance Blockchain Solana™ is a new architecture for a high performance blockchain. It aims to support over 700 thousand transactions per second on a gigabit network. +Introduction +=== + +It's possible for a centralized database to process 710,000 transactions per second on a standard gigabit network if the transactions are, on average, no more than 178 bytes. A centralized database can also replicate itself and maintain high availability without significantly compromising that transaction rate using the distributed system technique known as Optimistic Concurrency Control [H.T.Kung, J.T.Robinson (1981)]. 
At Solana, we're demonstrating that these same theoretical limits apply just as well to blockchain on an adversarial network. The key ingredient? Finding a way to share time when nodes can't trust one another. Once nodes can trust time, suddenly ~40 years of distributed systems research becomes applicable to blockchain! Furthermore, and much to our surprise, it can be implemented using a mechanism that has existed in Bitcoin since day one. The Bitcoin feature is called nLocktime and it can be used to postdate transactions using block height instead of a timestamp. As a Bitcoin client, you'd use block height instead of a timestamp if you don't trust the network. Block height turns out to be an instance of what's being called a Verifiable Delay Function in cryptography circles. It's a cryptographically secure way to say time has passed. In Solana, we use a far more granular verifiable delay function, a SHA 256 hash chain, to checkpoint the ledger and coordinate consensus. With it, we implement Optimistic Concurrency Control and are now well en route towards that theoretical limit of 710,000 transactions per second. + Running the demo === @@ -24,6 +29,13 @@ $ curl https://sh.rustup.rs -sSf | sh $ source $HOME/.cargo/env ``` +Now checkout the code from github: + +```bash +$ git clone https://github.com/solana-labs/solana.git +$ cd solana +``` + The testnode server is initialized with a ledger from stdin and generates new ledger entries on stdout. To create the input ledger, we'll need to create *the mint* and use it to generate a *genesis ledger*. 
It's done in diff --git a/build.rs b/build.rs index b8d72fe8ed5652..844a92dc7edb97 100644 --- a/build.rs +++ b/build.rs @@ -1,12 +1,15 @@ use std::env; fn main() { + println!("cargo:rustc-link-search=native=."); if !env::var("CARGO_FEATURE_CUDA").is_err() { - println!("cargo:rustc-link-search=native=."); println!("cargo:rustc-link-lib=static=cuda_verify_ed25519"); println!("cargo:rustc-link-search=native=/usr/local/cuda/lib64"); println!("cargo:rustc-link-lib=dylib=cudart"); println!("cargo:rustc-link-lib=dylib=cuda"); println!("cargo:rustc-link-lib=dylib=cudadevrt"); } + if !env::var("CARGO_FEATURE_ERASURE").is_err() { + println!("cargo:rustc-link-lib=dylib=Jerasure"); + } } diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index 41d8f90ede1292..f1e000c5c3afc2 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -22,11 +22,15 @@ use std::io::Write; use std::net::{SocketAddr, UdpSocket}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::{channel, Receiver, Sender}; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, Mutex, RwLock}; use std::thread::{spawn, JoinHandle}; use std::time::Duration; use streamer; use transaction::Transaction; +use subscribers::Subscribers; + +use subscribers; +use std::mem::size_of; pub struct AccountantSkel { acc: Accountant, @@ -245,8 +249,32 @@ impl AccountantSkel { } Ok(()) } + /// Process verified blobs, already in order + /// Respond with a signed hash of the state + fn replicate_state( + obj: &Arc>>, + verified_receiver: &BlobReceiver, + blob_sender: &streamer::BlobSender, + blob_recycler: &packet::BlobRecycler, + ) -> Result<()> { + let timer = Duration::new(1, 0); + let blobs = verified_receiver.recv_timeout(timer)?; + for msgs in blobs { + let entries:Vec = b.read().unwrap().data.deserialize()?; + for e in entries { + obj.lock().unwrap().acc.process_verified_events(e.events)?; + } + //TODO respond back to leader with hash of the state + } + for blob in blobs { + 
blob_recycler.recycle(blob); + } + Ok(()) + } + /// Create a UDP microservice that forwards messages the given AccountantSkel. + /// This service is the network leader /// Set `exit` to shutdown its threads. pub fn serve( obj: &Arc>>, @@ -279,7 +307,7 @@ impl AccountantSkel { let skel = obj.clone(); let t_server = spawn(move || loop { - let e = AccountantSkel::process( + let e = Self::process( &skel, &verified_receiver, &blob_sender, @@ -292,6 +320,71 @@ impl AccountantSkel { }); Ok(vec![t_receiver, t_responder, t_server, t_verifier]) } + + /// This service receives messages from a leader in the network and processes the transactions + /// on the accountant state. + /// # Arguments + /// * `obj` - The accountant state. + /// * `rsubs` - The subscribers. + /// * `exit` - The exit signal. + /// # Remarks + /// The pipeline is constructed as follows: + /// 1. receive blobs from the network, these are out of order + /// 2. verify blobs, PoH, signatures (TODO) + /// 3. reconstruct contiguous window + /// a. order the blobs + /// b. use erasure coding to reconstruct missing blobs + /// c. ask the network for missing blobs, if erasure coding is insufficient + /// d. make sure that the blobs PoH sequences connect (TODO) + /// 4. process the transaction state machine + /// 5. 
respond with the hash of the state back to the leader + pub fn replicate( + obj: &Arc>>, + rsubs: Subscribers, + exit: Arc, + ) -> Result>> { + let read = UdpSocket::bind(rsubs.me.addr)?; + // make sure we are on the same interface + let mut local = read.local_addr()?; + local.set_port(0); + let write = UdpSocket::bind(local)?; + + let blob_recycler = packet::BlobRecycler::default(); + let (blob_sender, blob_receiver) = channel(); + let t_blob_receiver = + streamer::blob_receiver(exit.clone(), blob_recycler.clone(), read, blob_sender)?; + let (window_sender, window_receiver) = channel(); + let (retransmit_sender, retransmit_receiver) = channel(); + + let subs = Arc::new(RwLock::new(rsubs)); + let t_retransmit = streamer::retransmitter( + write, + exit.clone(), + subs, + blob_recycler.clone(), + retransmit_receiver, + ); + //TODO + //the packets comming out of blob_receiver need to be sent to the GPU and verified + //then sent to the window, which does the erasure coding reconstruction + let t_window = streamer::window( + exit.clone(), + subs, + blob_recycler.clone(), + blob_receiver, + window_sender, + retransmit_sender, + ); + + let skel = obj.clone(); + let t_server = spawn(move || loop { + let e = Self::replicate_state(&skel, &window_receiver, &blob_sender, &blob_recycler); + if e.is_err() && exit.load(Ordering::Relaxed) { + break; + } + }); + Ok(vec![t_blob_receiver, t_retransmit, t_window, t_server]) + } } #[cfg(test)] @@ -319,7 +412,7 @@ mod tests { use accountant_skel::{to_packets, Request}; use bincode::serialize; use ecdsa; - use packet::{PacketRecycler, NUM_PACKETS}; + use packet::{BlobRecycler, PacketRecycler, NUM_PACKETS}; use transaction::{memfind, test_tx}; use accountant::Accountant; @@ -339,6 +432,12 @@ mod tests { use std::time::Duration; use transaction::Transaction; + use subscribers::{Node, Subscribers}; + use streamer; + use std::sync::mpsc::channel; + use std::collections::VecDeque; + use packet::{PACKET_DATA_SIZE}; + #[test] fn test_layout() 
{ let tr = test_tx(); @@ -443,6 +542,54 @@ mod tests { exit.store(true, Ordering::Relaxed); } + #[test] + fn test_replicate() { + let read = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let addr = read.local_addr().unwrap(); + let send = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let exit = Arc::new(AtomicBool::new(false)); + + let node_me = Node::default(); + let node_leader = Node::new([0; 8], 0, send.local_addr().unwrap()); + let subs = Subscribers::new(node_me, node_leader, &[]); + + let recv_recycler = PacketRecycler::default(); + let resp_recycler = BlobRecycler::default(); + let (s_reader, r_reader) = channel(); + let t_receiver = streamer::receiver(read, exit.clone(), recv_recycler.clone(), s_reader).unwrap(); + let (s_responder, r_responder) = channel(); + let t_responder = streamer::responder(send, exit.clone(), resp_recycler.clone(), r_responder); + + let alice = Mint::new(10_000); + let acc = Accountant::new(&alice); + let bob_pubkey = KeyPair::new().pubkey(); + let historian = Historian::new(&alice.last_id(), Some(30)); + let acc = Arc::new(Mutex::new(AccountantSkel::new( + acc, + alice.last_id(), + sink(), + historian, + ))); + + let _threads = AccountantSkel::replicate(&acc, subs, exit.clone()).unwrap(); + + let mut msgs = VecDeque::new(); + for i in 0..10 { + let b = resp_recycler.allocate(); + let b_ = b.clone(); + let mut w = b.write().unwrap(); + w.data[0] = i as u8; + w.meta.size = PACKET_DATA_SIZE; + w.meta.set_addr(&addr); + msgs.push_back(b_); + } + s_responder.send(msgs).expect("send"); + + exit.store(true, Ordering::Relaxed); + t_receiver.join().expect("join"); + t_responder.join().expect("join"); + } + } #[cfg(all(feature = "unstable", test))] diff --git a/src/bin/client-demo.rs b/src/bin/client-demo.rs index 0829c15e2f3a2e..962141720bb861 100644 --- a/src/bin/client-demo.rs +++ b/src/bin/client-demo.rs @@ -1,20 +1,32 @@ extern crate getopts; +extern crate isatty; extern crate rayon; extern crate serde_json; extern crate solana; 
use getopts::Options; +use isatty::stdin_isatty; use rayon::prelude::*; use solana::accountant_stub::AccountantStub; use solana::mint::Mint; use solana::signature::{KeyPair, KeyPairUtil}; use solana::transaction::Transaction; use std::env; -use std::io::stdin; +use std::io::{stdin, Read}; use std::net::UdpSocket; +use std::process::exit; use std::thread::sleep; use std::time::{Duration, Instant}; +fn print_usage(program: &str, opts: Options) { + let mut brief = format!("Usage: cat | {} [options]\n\n", program); + brief += " Solana client demo creates a number of transactions and\n"; + brief += " sends them to a target node."; + brief += " Takes json formatted mint file to stdin."; + + print!("{}", opts.usage(&brief)); +} + fn main() { let mut threads = 4usize; let mut addr: String = "127.0.0.1:8000".to_string(); @@ -24,12 +36,21 @@ fn main() { opts.optopt("s", "", "server address", "host:port"); opts.optopt("c", "", "client address", "host:port"); opts.optopt("t", "", "number of threads", "4"); + opts.optflag("h", "help", "print help"); let args: Vec = env::args().collect(); let matches = match opts.parse(&args[1..]) { Ok(m) => m, - Err(f) => panic!(f.to_string()), + Err(e) => { + eprintln!("{}", e); + exit(1); + } }; + if matches.opt_present("h") { + let program = args[0].clone(); + print_usage(&program, opts); + return; + } if matches.opt_present("s") { addr = matches.opt_str("s").unwrap(); } @@ -39,7 +60,23 @@ fn main() { if matches.opt_present("t") { threads = matches.opt_str("t").unwrap().parse().expect("integer"); } - let mint: Mint = serde_json::from_reader(stdin()).unwrap(); + + if stdin_isatty() { + eprintln!("nothing found on stdin, expected a json file"); + exit(1); + } + + let mut buffer = String::new(); + let num_bytes = stdin().read_to_string(&mut buffer).unwrap(); + if num_bytes == 0 { + eprintln!("empty file on stdin, expected a json file"); + exit(1); + } + + let mint: Mint = serde_json::from_str(&buffer).unwrap_or_else(|e| { + eprintln!("failed to 
parse json: {}", e); + exit(1); + }); let mint_keypair = mint.keypair(); let mint_pubkey = mint.pubkey(); diff --git a/src/bin/genesis-demo.rs b/src/bin/genesis-demo.rs index bcf26bc0e0aea9..340f09c6cfd36b 100644 --- a/src/bin/genesis-demo.rs +++ b/src/bin/genesis-demo.rs @@ -1,20 +1,38 @@ +extern crate isatty; extern crate serde_json; extern crate solana; +use isatty::stdin_isatty; use solana::entry::create_entry; use solana::event::Event; use solana::hash::Hash; use solana::mint::Mint; use solana::signature::{KeyPair, KeyPairUtil, PublicKey}; use solana::transaction::Transaction; -use std::io::stdin; +use std::io::{stdin, Read}; +use std::process::exit; fn transfer(from: &KeyPair, (to, tokens): (PublicKey, i64), last_id: Hash) -> Event { Event::Transaction(Transaction::new(from, to, tokens, last_id)) } fn main() { - let mint: Mint = serde_json::from_reader(stdin()).unwrap(); + if stdin_isatty() { + eprintln!("nothing found on stdin, expected a json file"); + exit(1); + } + + let mut buffer = String::new(); + let num_bytes = stdin().read_to_string(&mut buffer).unwrap(); + if num_bytes == 0 { + eprintln!("empty file on stdin, expected a json file"); + exit(1); + } + + let mint: Mint = serde_json::from_str(&buffer).unwrap_or_else(|e| { + eprintln!("failed to parse json: {}", e); + exit(1); + }); let mut entries = mint.create_entries(); let from = mint.keypair(); @@ -25,6 +43,10 @@ fn main() { entries.push(create_entry(&seed, 0, events)); for entry in entries { - println!("{}", serde_json::to_string(&entry).unwrap()); + let serialized = serde_json::to_string(&entry).unwrap_or_else(|e| { + eprintln!("failed to serialize: {}", e); + exit(1); + }); + println!("{}", serialized); } } diff --git a/src/bin/genesis.rs b/src/bin/genesis.rs index 66d2357b542250..07d7dc89fc0f85 100644 --- a/src/bin/genesis.rs +++ b/src/bin/genesis.rs @@ -1,14 +1,36 @@ //! A command-line executable for generating the chain's genesis block. 
+extern crate isatty; extern crate serde_json; extern crate solana; +use isatty::stdin_isatty; use solana::mint::Mint; -use std::io::stdin; +use std::io::{stdin, Read}; +use std::process::exit; fn main() { - let mint: Mint = serde_json::from_reader(stdin()).unwrap(); + if stdin_isatty() { + eprintln!("nothing found on stdin, expected a json file"); + exit(1); + } + + let mut buffer = String::new(); + let num_bytes = stdin().read_to_string(&mut buffer).unwrap(); + if num_bytes == 0 { + eprintln!("empty file on stdin, expected a json file"); + exit(1); + } + + let mint: Mint = serde_json::from_str(&buffer).unwrap_or_else(|e| { + eprintln!("failed to parse json: {}", e); + exit(1); + }); for x in mint.create_entries() { - println!("{}", serde_json::to_string(&x).unwrap()); + let serialized = serde_json::to_string(&x).unwrap_or_else(|e| { + eprintln!("failed to serialize: {}", e); + exit(1); + }); + println!("{}", serialized); } } diff --git a/src/bin/mint.rs b/src/bin/mint.rs index 880257af7fd514..73a67fb129897a 100644 --- a/src/bin/mint.rs +++ b/src/bin/mint.rs @@ -1,15 +1,29 @@ +extern crate isatty; extern crate serde_json; extern crate solana; +use isatty::stdin_isatty; use solana::mint::Mint; use std::io; +use std::process::exit; fn main() { let mut input_text = String::new(); + if stdin_isatty() { + eprintln!("nothing found on stdin, expected a token number"); + exit(1); + } + io::stdin().read_line(&mut input_text).unwrap(); let trimmed = input_text.trim(); - let tokens = trimmed.parse::().unwrap(); - + let tokens = trimmed.parse::().unwrap_or_else(|e| { + eprintln!("{}", e); + exit(1); + }); let mint = Mint::new(tokens); - println!("{}", serde_json::to_string(&mint).unwrap()); + let serialized = serde_json::to_string(&mint).unwrap_or_else(|e| { + eprintln!("failed to serialize: {}", e); + exit(1); + }); + println!("{}", serialized); } diff --git a/src/bin/testnode.rs b/src/bin/testnode.rs index 3e0ac842664e0c..3fa995a718fa0b 100644 --- a/src/bin/testnode.rs +++ 
b/src/bin/testnode.rs @@ -1,38 +1,73 @@ extern crate env_logger; extern crate getopts; +extern crate isatty; extern crate serde_json; extern crate solana; use getopts::Options; +use isatty::stdin_isatty; use solana::accountant::Accountant; use solana::accountant_skel::AccountantSkel; use solana::entry::Entry; use solana::event::Event; use solana::historian::Historian; use std::env; -use std::io::{self, stdout, BufRead}; +use std::io::{stdin, stdout, Read}; +use std::process::exit; use std::sync::atomic::AtomicBool; use std::sync::{Arc, Mutex}; +fn print_usage(program: &str, opts: Options) { + let mut brief = format!("Usage: cat | {} [options]\n\n", program); + brief += " Run a Solana node to handle transactions and\n"; + brief += " write a new transaction log to stdout.\n"; + brief += " Takes existing transaction log from stdin."; + + print!("{}", opts.usage(&brief)); +} + fn main() { env_logger::init().unwrap(); let mut port = 8000u16; let mut opts = Options::new(); opts.optopt("p", "", "port", "port"); + opts.optflag("h", "help", "print help"); let args: Vec = env::args().collect(); let matches = match opts.parse(&args[1..]) { Ok(m) => m, - Err(f) => panic!(f.to_string()), + Err(e) => { + eprintln!("{}", e); + exit(1); + } }; + if matches.opt_present("h") { + let program = args[0].clone(); + print_usage(&program, opts); + return; + } if matches.opt_present("p") { port = matches.opt_str("p").unwrap().parse().expect("port"); } let addr = format!("0.0.0.0:{}", port); - let stdin = io::stdin(); - let mut entries = stdin - .lock() - .lines() - .map(|line| serde_json::from_str(&line.unwrap()).unwrap()); + + if stdin_isatty() { + eprintln!("nothing found on stdin, expected a log file"); + exit(1); + } + + let mut buffer = String::new(); + let num_bytes = stdin().read_to_string(&mut buffer).unwrap(); + if num_bytes == 0 { + eprintln!("empty file on stdin, expected a log file"); + exit(1); + } + + let mut entries = buffer.lines().map(|line| { + 
serde_json::from_str(&line).unwrap_or_else(|e| { + eprintln!("failed to parse json: {}", e); + exit(1); + }) + }); // The first item in the ledger is required to be an entry with zero num_hashes, // which implies its id can be used as the ledger's seed. diff --git a/src/crdt.rs b/src/crdt.rs new file mode 100644 index 00000000000000..69de86828dd2e7 --- /dev/null +++ b/src/crdt.rs @@ -0,0 +1,339 @@ +//! The `crdt` module defines a data structure that is shared by all the nodes in the network over +//! a gossip control plane. The goal is to share small bits of of-chain information and detect and +//! repair partitions. +//! +//! This CRDT only supports a very limited set of types. A map of PublicKey -> Versioned Struct. +//! The last version is always picked durring an update. + +use bincode::{deserialize, serialize}; +use byteorder::{LittleEndian, ReadBytesExt}; +use hash::Hash; +use result::Result; +use ring::rand::{SecureRandom, SystemRandom}; +use signature::{PublicKey, Signature}; +use std::collections::HashMap; +use std::io::Cursor; +use std::net::{SocketAddr, UdpSocket}; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{Arc, RwLock}; +use std::thread::{sleep, spawn, JoinHandle}; +use std::time::Duration; + +/// Structure to be replicated by the network +#[derive(Serialize, Deserialize, Clone)] +pub struct ReplicatedData { + id: PublicKey, + sig: Signature, + /// should always be increasing + version: u64, + /// address to connect to for gossip + gossip_addr: SocketAddr, + /// address to connect to for replication + replicate_addr: SocketAddr, + /// address to connect to when this node is leader + lead_addr: SocketAddr, + /// current leader identity + current_leader_id: PublicKey, + /// last verified hash that was submitted to the leader + last_verified_hash: Hash, + /// last verified count, always increasing + last_verified_count: u64, +} + +impl ReplicatedData { + pub fn new(id: PublicKey, gossip_addr: SocketAddr) -> ReplicatedData { + let 
daddr = "0.0.0.0:0".parse().unwrap(); + ReplicatedData { + id, + sig: Signature::default(), + version: 0, + gossip_addr, + replicate_addr: daddr, + lead_addr: daddr, + current_leader_id: PublicKey::default(), + last_verified_hash: Hash::default(), + last_verified_count: 0, + } + } +} + +/// `Crdt` structure keeps a table of `ReplicatedData` structs +/// # Properties +/// * `table` - map of public id's to versioned and signed ReplicatedData structs +/// * `local` - map of public id's to what `self.update_index` `self.table` was updated +/// * `remote` - map of public id's to the `remote.update_index` was sent +/// * `update_index` - my update index +/// # Remarks +/// This implements two services, `gossip` and `listen`. +/// * `gossip` - asynchronously ask nodes to send updates +/// * `listen` - listen for requests and responses +/// No attempt to keep track of timeouts or dropped requests is made, or should be. +pub struct Crdt { + table: HashMap, + /// Value of my update index when entry in table was updated. + /// Nodes will ask for updates since `update_index`, and this node + /// should respond with all the identities that are greater then the + /// request's `update_index` in this list + local: HashMap, + /// The value of the remote update index that i have last seen + /// This Node will ask external nodes for updates since the value in this list + remote: HashMap, + update_index: u64, + me: PublicKey, + timeout: Duration, +} +// TODO These messages should be signed, and go through the gpu pipeline for spam filtering +#[derive(Serialize, Deserialize)] +enum Protocol { + RequestUpdates(u64, SocketAddr), + //TODO might need a since? 
+ /// from id, form's last update index, ReplicatedData + ReceiveUpdates(PublicKey, u64, Vec), +} + +impl Crdt { + pub fn new(me: ReplicatedData) -> Crdt { + assert_eq!(me.version, 0); + let mut g = Crdt { + table: HashMap::new(), + local: HashMap::new(), + remote: HashMap::new(), + me: me.id, + update_index: 1, + timeout: Duration::new(0, 100_000), + }; + g.local.insert(me.id, g.update_index); + g.table.insert(me.id, me); + g + } + pub fn insert(&mut self, v: &ReplicatedData) { + if self.table.get(&v.id).is_none() || (v.version > self.table[&v.id].version) { + trace!("insert! {}", v.version); + self.update_index += 1; + let _ = self.table.insert(v.id, v.clone()); + let _ = self.local.insert(v.id, self.update_index); + } else { + trace!("INSERT FAILED {}", v.version); + } + } + fn random() -> u64 { + let rnd = SystemRandom::new(); + let mut buf = [0u8; 8]; + rnd.fill(&mut buf).unwrap(); + let mut rdr = Cursor::new(&buf); + rdr.read_u64::().unwrap() + } + fn get_updates_since(&self, v: u64) -> (PublicKey, u64, Vec) { + trace!("get updates since {}", v); + let data = self.table + .values() + .filter(|x| self.local[&x.id] > v) + .cloned() + .collect(); + let id = self.me; + let ups = self.update_index; + (id, ups, data) + } + + /// Create a random gossip request + /// # Returns + /// (A,B,C) + /// * A - Remote gossip address + /// * B - My gossip address + /// * C - Remote update index to request updates since + fn gossip_request(&self) -> (SocketAddr, SocketAddr, u64) { + let n = (Self::random() as usize) % self.table.len(); + trace!("random {:?} {}", &self.me[0..1], n); + let v = self.table.values().nth(n).unwrap().clone(); + let remote_update_index = *self.remote.get(&v.id).unwrap_or(&0); + let my_addr = self.table[&self.me].gossip_addr; + (v.gossip_addr, my_addr, remote_update_index) + } + + /// At random pick a node and try to get updated changes from them + fn run_gossip(obj: &Arc>) -> Result<()> { + //TODO we need to keep track of stakes and weight the 
selection by stake size + //TODO cache sockets + + // Lock the object only to do this operation and not for any longer + // especially not when doing the `sock.send_to` + let (remote_gossip_addr, my_addr, remote_update_index) = + obj.read().unwrap().gossip_request(); + let mut req_addr = my_addr; + req_addr.set_port(0); + let sock = UdpSocket::bind(req_addr)?; + // TODO this will get chatty, so we need to first ask for number of updates since + // then only ask for specific data that we dont have + let r = serialize(&Protocol::RequestUpdates(remote_update_index, my_addr))?; + sock.send_to(&r, remote_gossip_addr)?; + Ok(()) + } + + /// Apply updates that we received from the identity `from` + /// # Arguments + /// * `from` - identity of the sender of the updates + /// * `update_index` - the number of updates that `from` has completed and this set of `data` represents + /// * `data` - the update data + fn apply_updates(&mut self, from: PublicKey, update_index: u64, data: Vec) { + trace!("got updates {}", data.len()); + // TODO we need to punish/spam resist here + // sig verify the whole update and slash anyone who sends a bad update + for v in data { + // TODO probably an error or attack + if v.id == self.me { + continue; + } + // TODO check that last_verified types are always increasing + self.insert(&v); + } + *self.remote.entry(from).or_insert(update_index) = update_index; + } + + /// randomly pick a node and ask them for updates asynchronously + pub fn gossip(obj: Arc>, exit: Arc) -> JoinHandle<()> { + spawn(move || loop { + let _ = Self::run_gossip(&obj); + if exit.load(Ordering::Relaxed) { + return; + } + //TODO this should be a tuned parameter + sleep(obj.read().unwrap().timeout); + }) + } + + /// Process messages from the network + fn run_listen(obj: &Arc>, sock: &UdpSocket) -> Result<()> { + //TODO cache connections + let mut buf = vec![0u8; 1024 * 64]; + let (amt, src) = sock.recv_from(&mut buf)?; + trace!("got request from {}", src); + buf.resize(amt, 0); 
+ let r = deserialize(&buf)?; + match r { + // TODO sigverify these + Protocol::RequestUpdates(v, addr) => { + trace!("RequestUpdates {}", v); + // only lock for this call, dont lock durring IO `sock.send_to` or `sock.recv_from` + let (from, ups, data) = obj.read().unwrap().get_updates_since(v); + trace!("get updates since response {} {}", v, data.len()); + let rsp = serialize(&Protocol::ReceiveUpdates(from, ups, data))?; + trace!("send_to {}", addr); + sock.send_to(&rsp, addr).unwrap(); + trace!("send_to done!"); + } + Protocol::ReceiveUpdates(from, ups, data) => { + trace!("ReceivedUpdates"); + obj.write().unwrap().apply_updates(from, ups, data); + } + } + Ok(()) + } + pub fn listen( + obj: Arc>, + sock: UdpSocket, + exit: Arc, + ) -> JoinHandle<()> { + sock.set_read_timeout(Some(Duration::new(2, 0))).unwrap(); + spawn(move || loop { + let _ = Self::run_listen(&obj, &sock); + if exit.load(Ordering::Relaxed) { + return; + } + }) + } +} + +#[cfg(test)] +mod test { + use crdt::{Crdt, ReplicatedData}; + use signature::KeyPair; + use signature::KeyPairUtil; + use std::net::UdpSocket; + use std::sync::atomic::{AtomicBool, Ordering}; + use std::sync::{Arc, RwLock}; + use std::thread::sleep; + use std::time::Duration; + + /// Test that the network converges. + /// Create a ring a -> b -> c -> d -> e -> a of size num. + /// Run until every node in the network has a full ReplicatedData set. + /// Check that nodes stop sending updates after all the ReplicatedData has been shared. 
+ #[test] + fn gossip_test() { + let num: usize = 5; + let exit = Arc::new(AtomicBool::new(false)); + let listen: Vec<_> = (0..num) + .map(|_| { + let listener = UdpSocket::bind("0.0.0.0:0").unwrap(); + let pubkey = KeyPair::new().pubkey(); + let d = ReplicatedData::new(pubkey, listener.local_addr().unwrap()); + let crdt = Crdt::new(d); + let c = Arc::new(RwLock::new(crdt)); + let l = Crdt::listen(c.clone(), listener, exit.clone()); + (c, l) + }) + .collect(); + for n in 0..num { + let y = n % listen.len(); + let x = (n + 1) % listen.len(); + let mut xv = listen[x].0.write().unwrap(); + let yv = listen[y].0.read().unwrap(); + let mut d = yv.table[&yv.me].clone(); + d.version = 0; + xv.insert(&d); + } + let gossip: Vec<_> = listen + .iter() + .map(|&(ref c, _)| Crdt::gossip(c.clone(), exit.clone())) + .collect(); + let mut done = true; + for _ in 0..(num * 16) { + done = true; + for &(ref c, _) in listen.iter() { + trace!( + "done updates {} {}", + c.read().unwrap().table.len(), + c.read().unwrap().update_index + ); + //make sure the number of updates doesn't grow unbounded + assert!(c.read().unwrap().update_index <= num as u64); + //make sure we got all the updates + if c.read().unwrap().table.len() != num { + done = false; + } + } + if done == true { + break; + } + sleep(Duration::new(1, 0)); + } + exit.store(true, Ordering::Relaxed); + for j in gossip { + j.join().unwrap(); + } + for (c, j) in listen.into_iter() { + j.join().unwrap(); + // make it clear what failed + // protocol is to chatty, updates should stop after everyone receives `num` + assert!(c.read().unwrap().update_index <= num as u64); + // protocol is not chatty enough, everyone should get `num` entries + assert_eq!(c.read().unwrap().table.len(), num); + } + assert!(done); + } + /// Test that insert drops messages that are older + #[test] + fn insert_test() { + let mut d = ReplicatedData::new(KeyPair::new().pubkey(), "127.0.0.1:1234".parse().unwrap()); + assert_eq!(d.version, 0); + let mut crdt = 
Crdt::new(d.clone()); + assert_eq!(crdt.table[&d.id].version, 0); + d.version = 2; + crdt.insert(&d); + assert_eq!(crdt.table[&d.id].version, 2); + d.version = 1; + crdt.insert(&d); + assert_eq!(crdt.table[&d.id].version, 2); + } + +} diff --git a/src/erasure.rs b/src/erasure.rs new file mode 100644 index 00000000000000..b8480a73d7529a --- /dev/null +++ b/src/erasure.rs @@ -0,0 +1,420 @@ +// Support erasure coding + +use packet::{BlobRecycler, SharedBlob}; +use std::result; + +//TODO(sakridge) pick these values +const NUM_CODED: usize = 10; +const MAX_MISSING: usize = 2; +const NUM_DATA: usize = NUM_CODED - MAX_MISSING; + +#[derive(Debug, PartialEq, Eq)] +pub enum ErasureError { + NotEnoughBlocksToDecode, + DecodeError, + InvalidBlockSize, +} + +pub type Result = result::Result; + +// k = number of data devices +// m = number of coding devices +// w = word size + +extern "C" { + fn jerasure_matrix_encode( + k: i32, + m: i32, + w: i32, + matrix: *const i32, + data_ptrs: *const *const u8, + coding_ptrs: *const *mut u8, + size: i32, + ); + fn jerasure_matrix_decode( + k: i32, + m: i32, + w: i32, + matrix: *const i32, + row_k_ones: i32, + erasures: *const i32, + data_ptrs: *const *mut u8, + coding_ptrs: *const *const u8, + size: i32, + ) -> i32; + fn galois_single_divide(a: i32, b: i32, w: i32) -> i32; +} + +fn get_matrix(m: i32, k: i32, w: i32) -> Vec { + let mut matrix = vec![0; (m * k) as usize]; + for i in 0..m { + for j in 0..k { + unsafe { + matrix[(i * k + j) as usize] = galois_single_divide(1, i ^ (m + j), w); + } + } + } + matrix +} + +pub const ERASURE_W: i32 = 32; + +// Generate coding blocks into coding +// There are some alignment restrictions, blocks should be aligned by 16 bytes +// which means their size should be >= 16 bytes +pub fn generate_coding_blocks(coding: &mut [&mut [u8]], data: &[&[u8]]) -> Result<()> { + if data.len() == 0 { + return Ok(()); + } + let m = coding.len() as i32; + let block_len = data[0].len(); + let matrix: Vec = get_matrix(m, 
data.len() as i32, ERASURE_W); + let mut coding_arg = Vec::new(); + let mut data_arg = Vec::new(); + for block in data { + if block_len != block.len() { + return Err(ErasureError::InvalidBlockSize); + } + data_arg.push(block.as_ptr()); + } + for mut block in coding { + if block_len != block.len() { + return Err(ErasureError::InvalidBlockSize); + } + coding_arg.push(block.as_mut_ptr()); + } + + unsafe { + jerasure_matrix_encode( + data.len() as i32, + m, + ERASURE_W, + matrix.as_ptr(), + data_arg.as_ptr(), + coding_arg.as_ptr(), + data[0].len() as i32, + ); + } + Ok(()) +} + +// Recover data + coding blocks into data blocks +// data: array of blocks to recover into +// coding: arry of coding blocks +// erasures: list of indices in data where blocks should be recovered +pub fn decode_blocks(data: &mut [&mut [u8]], coding: &[&[u8]], erasures: &[i32]) -> Result<()> { + if data.len() == 0 { + return Ok(()); + } + let block_len = data[0].len(); + let matrix: Vec = get_matrix(coding.len() as i32, data.len() as i32, ERASURE_W); + + // generate coding pointers, blocks should be the same size + let mut coding_arg: Vec<*const u8> = Vec::new(); + for x in coding.iter() { + if x.len() != block_len { + return Err(ErasureError::InvalidBlockSize); + } + coding_arg.push(x.as_ptr()); + } + + // generate data pointers, blocks should be the same size + let mut data_arg: Vec<*mut u8> = Vec::new(); + for x in data.iter_mut() { + if x.len() != block_len { + return Err(ErasureError::InvalidBlockSize); + } + data_arg.push(x.as_mut_ptr()); + } + unsafe { + let ret = jerasure_matrix_decode( + data.len() as i32, + coding.len() as i32, + ERASURE_W, + matrix.as_ptr(), + 0, + erasures.as_ptr(), + data_arg.as_ptr(), + coding_arg.as_ptr(), + data[0].len() as i32, + ); + trace!("jerasure_matrix_decode ret: {}", ret); + for x in data[erasures[0] as usize][0..8].iter() { + trace!("{} ", x) + } + trace!(""); + if ret < 0 { + return Err(ErasureError::DecodeError); + } + } + Ok(()) +} + +// Generate 
coding blocks in window from consumed to consumed+NUM_DATA +pub fn generate_coding( + re: &BlobRecycler, + window: &mut Vec>, + consumed: usize, +) -> Result<()> { + let mut data_blobs = Vec::new(); + let mut coding_blobs = Vec::new(); + let mut data_locks = Vec::new(); + let mut data_ptrs: Vec<&[u8]> = Vec::new(); + let mut coding_locks = Vec::new(); + let mut coding_ptrs: Vec<&mut [u8]> = Vec::new(); + for i in consumed..consumed + NUM_DATA { + let n = i % window.len(); + data_blobs.push(window[n].clone().unwrap()); + } + for b in &data_blobs { + data_locks.push(b.write().unwrap()); + } + for (i, l) in data_locks.iter_mut().enumerate() { + trace!("i: {} data: {}", i, l.data[0]); + data_ptrs.push(&l.data); + } + + // generate coding ptr array + let coding_start = consumed + NUM_DATA; + let coding_end = consumed + NUM_CODED; + for i in coding_start..coding_end { + let n = i % window.len(); + window[n] = Some(re.allocate()); + coding_blobs.push(window[n].clone().unwrap()); + } + for b in &coding_blobs { + coding_locks.push(b.write().unwrap()); + } + for (i, l) in coding_locks.iter_mut().enumerate() { + trace!("i: {} data: {}", i, l.data[0]); + coding_ptrs.push(&mut l.data); + } + + generate_coding_blocks(coding_ptrs.as_mut_slice(), &data_ptrs)?; + trace!("consumed: {}", consumed); + Ok(()) +} + +// Recover missing blocks into window +// missing blocks should be None, will use re +// to allocate new ones. 
Returns err if not enough +// coding blocks are present to restore +pub fn recover( + re: &BlobRecycler, + window: &mut Vec>, + consumed: usize, +) -> Result<()> { + //recover with erasure coding + let mut data_missing = 0; + let mut coded_missing = 0; + let coding_start = consumed + NUM_DATA; + let coding_end = consumed + NUM_CODED; + for i in consumed..coding_end { + let n = i % window.len(); + if window[n].is_none() { + if i >= coding_start { + coded_missing += 1; + } else { + data_missing += 1; + } + } + } + trace!("missing: data: {} coding: {}", data_missing, coded_missing); + if data_missing > 0 { + if (data_missing + coded_missing) <= MAX_MISSING { + let mut blobs: Vec = Vec::new(); + let mut locks = Vec::new(); + let mut data_ptrs: Vec<&mut [u8]> = Vec::new(); + let mut coding_ptrs: Vec<&[u8]> = Vec::new(); + let mut erasures: Vec = Vec::new(); + for i in consumed..coding_end { + let j = i % window.len(); + let mut b = &mut window[j]; + if b.is_some() { + blobs.push(b.clone().unwrap()); + continue; + } + let n = re.allocate(); + *b = Some(n.clone()); + //mark the missing memory + blobs.push(n); + erasures.push((i - consumed) as i32); + } + erasures.push(-1); + trace!("erasures: {:?}", erasures); + //lock everything + for b in &blobs { + locks.push(b.write().unwrap()); + } + for (i, l) in locks.iter_mut().enumerate() { + if i >= NUM_DATA { + trace!("pushing coding: {}", i); + coding_ptrs.push(&l.data); + } else { + trace!("pushing data: {}", i); + data_ptrs.push(&mut l.data); + } + } + trace!( + "coding_ptrs.len: {} data_ptrs.len {}", + coding_ptrs.len(), + data_ptrs.len() + ); + decode_blocks(data_ptrs.as_mut_slice(), &coding_ptrs, &erasures)?; + } else { + return Err(ErasureError::NotEnoughBlocksToDecode); + } + } + Ok(()) +} + +#[cfg(test)] +mod test { + use erasure; + use packet::{BlobRecycler, SharedBlob, PACKET_DATA_SIZE}; + extern crate env_logger; + + #[test] + pub fn test_coding() { + let zero_vec = vec![0; 16]; + let mut vs: Vec> = (0..4).map(|i| 
(i..(16 + i)).collect()).collect(); + let v_orig: Vec = vs[0].clone(); + + let m = 2; + let mut coding_blocks: Vec<_> = (0..m).map(|_| vec![0u8; 16]).collect(); + + { + let mut coding_blocks_slices: Vec<_> = + coding_blocks.iter_mut().map(|x| x.as_mut_slice()).collect(); + let v_slices: Vec<_> = vs.iter().map(|x| x.as_slice()).collect(); + + assert!( + erasure::generate_coding_blocks( + coding_blocks_slices.as_mut_slice(), + v_slices.as_slice() + ).is_ok() + ); + } + trace!("coding blocks:"); + for b in &coding_blocks { + trace!("{:?}", b); + } + let erasure: i32 = 1; + let erasures = vec![erasure, -1]; + // clear an entry + vs[erasure as usize].copy_from_slice(zero_vec.as_slice()); + + { + let coding_blocks_slices: Vec<_> = coding_blocks.iter().map(|x| x.as_slice()).collect(); + let mut v_slices: Vec<_> = vs.iter_mut().map(|x| x.as_mut_slice()).collect(); + + assert!( + erasure::decode_blocks( + v_slices.as_mut_slice(), + coding_blocks_slices.as_slice(), + erasures.as_slice(), + ).is_ok() + ); + } + + trace!("vs:"); + for v in &vs { + trace!("{:?}", v); + } + assert_eq!(v_orig, vs[0]); + } + + fn print_window(window: &Vec>) { + for (i, w) in window.iter().enumerate() { + print!("window({}): ", i); + if w.is_some() { + let window_lock = w.clone().unwrap(); + let window_data = window_lock.read().unwrap().data; + for i in 0..8 { + print!("{} ", window_data[i]); + } + } else { + print!("null"); + } + println!(""); + } + } + + #[test] + pub fn test_window_recover() { + let mut window = Vec::new(); + let blob_recycler = BlobRecycler::default(); + let offset = 4; + for i in 0..(4 * erasure::NUM_CODED + 1) { + let b = blob_recycler.allocate(); + let b_ = b.clone(); + let data_len = b.read().unwrap().data.len(); + let mut w = b.write().unwrap(); + w.set_index(i as u64).unwrap(); + assert_eq!(i as u64, w.get_index().unwrap()); + w.meta.size = PACKET_DATA_SIZE; + for k in 0..data_len { + w.data[k] = (k + i) as u8; + } + window.push(Some(b_)); + } + println!("** after-gen:"); 
+ print_window(&window); + assert!(erasure::generate_coding(&blob_recycler, &mut window, offset).is_ok()); + assert!( + erasure::generate_coding(&blob_recycler, &mut window, offset + erasure::NUM_CODED) + .is_ok() + ); + assert!( + erasure::generate_coding( + &blob_recycler, + &mut window, + offset + (2 * erasure::NUM_CODED) + ).is_ok() + ); + assert!( + erasure::generate_coding( + &blob_recycler, + &mut window, + offset + (3 * erasure::NUM_CODED) + ).is_ok() + ); + println!("** after-coding:"); + print_window(&window); + let refwindow = window[offset + 1].clone(); + window[offset + 1] = None; + window[offset + 2] = None; + window[offset + erasure::NUM_CODED + 3] = None; + window[offset + (2 * erasure::NUM_CODED) + 0] = None; + window[offset + (2 * erasure::NUM_CODED) + 1] = None; + window[offset + (2 * erasure::NUM_CODED) + 2] = None; + let window_l0 = &(window[offset + (3 * erasure::NUM_CODED)]).clone().unwrap(); + window_l0.write().unwrap().data[0] = 55; + println!("** after-nulling:"); + print_window(&window); + assert!(erasure::recover(&blob_recycler, &mut window, offset).is_ok()); + assert!(erasure::recover(&blob_recycler, &mut window, offset + erasure::NUM_CODED).is_ok()); + assert!( + erasure::recover( + &blob_recycler, + &mut window, + offset + (2 * erasure::NUM_CODED) + ).is_err() + ); + assert!( + erasure::recover( + &blob_recycler, + &mut window, + offset + (3 * erasure::NUM_CODED) + ).is_ok() + ); + println!("** after-restore:"); + print_window(&window); + let window_l = window[offset + 1].clone().unwrap(); + let ref_l = refwindow.clone().unwrap(); + assert_eq!( + window_l.read().unwrap().data.to_vec(), + ref_l.read().unwrap().data.to_vec() + ); + } +} diff --git a/src/historian.rs b/src/historian.rs index 93c49f733fe56a..412027846fab03 100644 --- a/src/historian.rs +++ b/src/historian.rs @@ -16,8 +16,8 @@ pub struct Historian { impl Historian { pub fn new(start_hash: &Hash, ms_per_tick: Option) -> Self { - let (sender, event_receiver) = 
sync_channel(1000); - let (entry_sender, receiver) = sync_channel(1000); + let (sender, event_receiver) = sync_channel(10_000); + let (entry_sender, receiver) = sync_channel(10_000); let thread_hdl = Historian::create_recorder(*start_hash, ms_per_tick, event_receiver, entry_sender); Historian { diff --git a/src/lib.rs b/src/lib.rs index 54ae1656b8e266..90200b5b84489b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,8 +2,11 @@ pub mod accountant; pub mod accountant_skel; pub mod accountant_stub; +pub mod crdt; pub mod ecdsa; pub mod entry; +#[cfg(feature = "erasure")] +pub mod erasure; pub mod event; pub mod hash; pub mod historian; diff --git a/src/result.rs b/src/result.rs index 9b3c17a3695fde..01872dfbe1138c 100644 --- a/src/result.rs +++ b/src/result.rs @@ -4,6 +4,7 @@ use bincode; use serde_json; use std; use std::any::Any; +use accountant; #[derive(Debug)] pub enum Error { @@ -14,6 +15,7 @@ pub enum Error { RecvError(std::sync::mpsc::RecvError), RecvTimeoutError(std::sync::mpsc::RecvTimeoutError), Serialize(std::boxed::Box), + AccountingError(accountant::AccountingError), SendError, Services, } @@ -30,6 +32,11 @@ impl std::convert::From for Error { Error::RecvTimeoutError(e) } } +impl std::convert::From for Error { + fn from(e: accountant::AccountingError) -> Error { + Error::AccountingError(e) + } +} impl std::convert::From> for Error { fn from(_e: std::sync::mpsc::SendError) -> Error { Error::SendError diff --git a/src/streamer.rs b/src/streamer.rs index 368365ec7f4550..0fd8c1b758204d 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -8,7 +8,7 @@ use std::sync::mpsc; use std::sync::{Arc, RwLock}; use std::thread::{spawn, JoinHandle}; use std::time::Duration; -use subscribers; +use subscribers::Subscribers; pub type PacketReceiver = mpsc::Receiver; pub type PacketSender = mpsc::Sender; @@ -106,12 +106,12 @@ pub fn blob_receiver( fn recv_window( window: &mut Vec>, - subs: &Arc>, + subs: &Arc>, recycler: &BlobRecycler, consumed: &mut usize, r: &BlobReceiver, 
s: &BlobSender, - cast: &BlobSender, + retransmit: &BlobSender, ) -> Result<()> { let timer = Duration::new(1, 0); let mut dq = r.recv_timeout(timer)?; @@ -120,7 +120,7 @@ fn recv_window( } { //retransmit all leader blocks - let mut castq = VecDeque::new(); + let mut retransmitq = VecDeque::new(); let rsubs = subs.read().unwrap(); for b in &dq { let p = b.read().unwrap(); @@ -141,18 +141,18 @@ fn recv_window( mnv.meta.size = sz; mnv.data[..sz].copy_from_slice(&p.data[..sz]); } - castq.push_back(nv); + retransmitq.push_back(nv); } } - if !castq.is_empty() { - cast.send(castq)?; + if !retransmitq.is_empty() { + retransmit.send(retransmitq)?; } } //send a contiguous set of blocks let mut contq = VecDeque::new(); while let Some(b) = dq.pop_front() { let b_ = b.clone(); - let mut p = b.write().unwrap(); + let p = b.write().unwrap(); let pix = p.get_index()? as usize; let w = pix % NUM_BLOBS; //TODO, after the block are authenticated @@ -183,11 +183,11 @@ fn recv_window( pub fn window( exit: Arc, - subs: Arc>, + subs: Arc>, recycler: BlobRecycler, r: BlobReceiver, s: BlobSender, - cast: BlobSender, + retransmit: BlobSender, ) -> JoinHandle<()> { spawn(move || { let mut window = vec![None; NUM_BLOBS]; @@ -196,13 +196,13 @@ pub fn window( if exit.load(Ordering::Relaxed) { break; } - let _ = recv_window(&mut window, &subs, &recycler, &mut consumed, &r, &s, &cast); + let _ = recv_window(&mut window, &subs, &recycler, &mut consumed, &r, &s, &retransmit); } }) } fn retransmit( - subs: &Arc>, + subs: &Arc>, recycler: &BlobRecycler, r: &BlobReceiver, sock: &UdpSocket, @@ -237,7 +237,7 @@ fn retransmit( pub fn retransmitter( sock: UdpSocket, exit: Arc, - subs: Arc>, + subs: Arc>, recycler: BlobRecycler, r: BlobReceiver, ) -> JoinHandle<()> { @@ -442,20 +442,21 @@ mod test { let subs = Arc::new(RwLock::new(Subscribers::new( Node::default(), Node::new([0; 8], 0, send.local_addr().unwrap()), + &[], ))); let resp_recycler = BlobRecycler::default(); let (s_reader, r_reader) = 
channel(); let t_receiver = blob_receiver(exit.clone(), resp_recycler.clone(), read, s_reader).unwrap(); let (s_window, r_window) = channel(); - let (s_cast, r_cast) = channel(); + let (s_retransmit, r_retransmit) = channel(); let t_window = window( exit.clone(), subs, resp_recycler.clone(), r_reader, s_window, - s_cast, + s_retransmit, ); let (s_responder, r_responder) = channel(); let t_responder = responder(send, exit.clone(), resp_recycler.clone(), r_responder); @@ -475,8 +476,8 @@ mod test { let mut num = 0; get_blobs(r_window, &mut num); assert_eq!(num, 10); - let mut q = r_cast.recv().unwrap(); - while let Ok(mut nq) = r_cast.try_recv() { + let mut q = r_retransmit.recv().unwrap(); + while let Ok(mut nq) = r_retransmit.try_recv() { q.append(&mut nq); } assert_eq!(q.len(), 10); @@ -494,9 +495,8 @@ mod test { let subs = Arc::new(RwLock::new(Subscribers::new( Node::default(), Node::default(), + &[Node::new([0; 8], 1, read.local_addr().unwrap())] ))); - let n3 = Node::new([0; 8], 1, read.local_addr().unwrap()); - subs.write().unwrap().insert(&[n3]); let (s_retransmit, r_retransmit) = channel(); let blob_recycler = BlobRecycler::default(); let saddr = send.local_addr().unwrap(); diff --git a/src/subscribers.rs b/src/subscribers.rs index 153246d12adb31..b81a54941b599a 100644 --- a/src/subscribers.rs +++ b/src/subscribers.rs @@ -40,18 +40,19 @@ impl Node { pub struct Subscribers { data: Vec, - me: Node, + pub me: Node, pub leader: Node, } impl Subscribers { - pub fn new(me: Node, leader: Node) -> Subscribers { + pub fn new(me: Node, leader: Node, network: &[Node]) -> Subscribers { let mut h = Subscribers { data: vec![], me: me.clone(), leader: leader.clone(), }; h.insert(&[me, leader]); + h.insert(network); h } @@ -99,7 +100,7 @@ mod test { me.weight = 10; let mut leader = Node::default(); leader.weight = 11; - let mut s = Subscribers::new(me, leader); + let mut s = Subscribers::new(me, leader, &[]); assert_eq!(s.data.len(), 2); assert_eq!(s.data[0].weight, 11); 
assert_eq!(s.data[1].weight, 10); @@ -116,7 +117,7 @@ mod test { let s3 = UdpSocket::bind("127.0.0.1:0").expect("bind"); let n1 = Node::new([0; 8], 0, s1.local_addr().unwrap()); let n2 = Node::new([0; 8], 0, s2.local_addr().unwrap()); - let mut s = Subscribers::new(n1.clone(), n2.clone()); + let mut s = Subscribers::new(n1.clone(), n2.clone(), &[]); let n3 = Node::new([0; 8], 0, s3.local_addr().unwrap()); s.insert(&[n3]); let mut b = Blob::default();