Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

[wip] replicator #130

Closed
wants to merge 39 commits into from
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
1f456c5
state replication
aeyakovenko Apr 18, 2018
efeebf5
wip
aeyakovenko Apr 18, 2018
d620ce5
update
aeyakovenko Apr 18, 2018
073e5ab
docs
aeyakovenko Apr 18, 2018
0c610c4
docs
aeyakovenko Apr 18, 2018
5e11078
Add Stephen
garious Apr 18, 2018
dea5ab2
Add erasure rust logic under feature flag
sakridge Apr 19, 2018
e949211
Merge pull request #131 from sakridge/erasure
garious Apr 19, 2018
3622533
wip
aeyakovenko Apr 19, 2018
d366a07
add gregs abstract as an intro
aeyakovenko Apr 19, 2018
903374a
Merge pull request #132 from aeyakovenko/readme
garious Apr 19, 2018
13a2f05
Remove out for immutable variable
kination Apr 19, 2018
64e2f1b
Merge pull request #133 from djKooks/rm-mut
garious Apr 19, 2018
b91f6bc
report parse errors to stderr
rleungx Apr 19, 2018
43e6741
Merge pull request #134 from rleungx/report-parse-errors-to-stderr
garious Apr 19, 2018
60015ae
report serde parse errors to stderr
rleungx Apr 19, 2018
c6048e2
Workaround linux hang
garious Apr 19, 2018
ca877e6
Merge pull request #136 from rleungx/report-errors-to-stderr
garious Apr 19, 2018
8181bc5
Add -h/--help options for client-demo and testnode
sakridge Apr 19, 2018
7390d6c
update
aeyakovenko Apr 19, 2018
89bf376
Merge pull request #138 from sakridge/help_options
garious Apr 19, 2018
10a0c47
Merge pull request #137 from garious/linux-hang
garious Apr 19, 2018
8cbb7d7
git clone instruction
Apr 20, 2018
39df21d
Merge pull request #142 from ansrivas/master
garious Apr 20, 2018
3da1fa4
improve the error messages
rleungx Apr 21, 2018
041de80
Merge pull request #144 from rleungx/improve-error-messages
garious Apr 21, 2018
3d7969d
initial crdt implementation
aeyakovenko Apr 21, 2018
55b8d0d
cleanup
aeyakovenko Apr 24, 2018
0b39c6f
Merge pull request #145 from aeyakovenko/crdt
aeyakovenko Apr 24, 2018
d69a86a
state replication
aeyakovenko Apr 18, 2018
c579f9b
wip
aeyakovenko Apr 18, 2018
3bd74cc
update
aeyakovenko Apr 18, 2018
97176dc
docs
aeyakovenko Apr 18, 2018
e493d70
docs
aeyakovenko Apr 18, 2018
37448de
wip
aeyakovenko Apr 19, 2018
9dfd18a
update
aeyakovenko Apr 19, 2018
92b8bb6
Fix some compilation issues
sakridge Apr 19, 2018
58d1ddd
Work on test_replicate to test replicate service
sakridge Apr 24, 2018
3789405
update
aeyakovenko Apr 24, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 68 additions & 3 deletions src/accountant_skel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::io::Write;
use std::net::{SocketAddr, UdpSocket};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::sync::{Arc, Mutex, RwLock};
use std::thread::{spawn, JoinHandle};
use std::time::Duration;
use streamer;
Expand Down Expand Up @@ -247,6 +247,7 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
}

/// Create a UDP microservice that forwards messages the given AccountantSkel.
/// This service is the network leader
/// Set `exit` to shutdown its threads.
pub fn serve(
obj: &Arc<Mutex<AccountantSkel<W>>>,
Expand All @@ -271,15 +272,15 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {

let exit_ = exit.clone();
let t_verifier = spawn(move || loop {
let e = Self::verifier(&packet_receiver, &verified_sender);
let e = Self::blob_verifier(&blob_receiver, &verified_sender);
if e.is_err() && exit_.load(Ordering::Relaxed) {
break;
}
});

let skel = obj.clone();
let t_server = spawn(move || loop {
let e = AccountantSkel::process(
let e = Self::process(
&skel,
&verified_receiver,
&blob_sender,
Expand All @@ -292,6 +293,70 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
});
Ok(vec![t_receiver, t_responder, t_server, t_verifier])
}

/// This service receives messages from a leader in the network and processes the transactions
/// on the accountant state.
/// # Arguments
/// * `obj` - The accoutnant state.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

accountant

/// * `rsubs` - The subscribers.
/// * `exit` - The exit signal.
/// # Remarks
/// The pipeline is constructed as follows
/// 1. receive blobs from the network, these are out of order
/// 2. verify blobs, PoH, signatures (TODO)
/// 3. reconstruct contiguous window
/// a. order the blobs
/// b. use erasure coding to reconstruct missing blobs
/// c. ask the network for missing blobs
/// d. make sure that the blobs PoH sequences connect (TODO)
/// 4. process the transaction state machine
/// 5. respond with the hash of the state back to the leader
pub fn replicate(
obj: &Arc<Mutex<AccountantSkel<W>>>,
rsubs: Subscribers,
exit: Arc<AtomicBool>,
) -> Result<Vec<JoinHandle<()>>> {
let read = UdpSocket::bind(rsubs.me.addr)?;
// make sure we are on the same interface
let mut local = read.local_addr()?;
local.set_port(0);
let write = UdpSocket::bind(local)?;

let blob_recycler = packet::BlobRecycler::default();
let (blob_sender, blob_receiver) = channel();
let t_blob_receiver =
streamer::blob_receiver(exit.clone(), blob_recycler.clone(), read, blob_sender)?;
let (window_sender, window_receiver) = channel();
let (retransmit_sender, retransmit_receiver) = channel();

let subs = Arc::new(RwLock::new(rsubs));
let t_retransmit = retransmitter(
write,
exit.clone(),
subs,
blob_recycler.clone(),
retransmit_receiver,
);
//TODO
//the packets comming out of blob_receiver need to be sent to the GPU and verified
//then sent to the window, which does the erasure coding reconstruction
let t_window = streamer::window(
exit.clone(),
blob_recycler.clone(),
blob_receiver,
window_sender,
retransmit_sender,
);

let skel = obj.clone();
let t_server = spawn(move || loop {
let e = Self::replicate_state(&skel, &window_receiver, &blob_sender, &blob_recycler);
if e.is_err() && exit.load(Ordering::Relaxed) {
break;
}
});
Ok(vec![t_blob_receiver, t_retransmit, t_window, t_server])
}
}

#[cfg(test)]
Expand Down
5 changes: 3 additions & 2 deletions src/subscribers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,19 @@ impl Node {

pub struct Subscribers {
data: Vec<Node>,
me: Node,
pub me: Node,
pub leader: Node,
}

impl Subscribers {
pub fn new(me: Node, leader: Node) -> Subscribers {
pub fn new(me: Node, leader: Node, network: &[Node]) -> Subscribers {
let mut h = Subscribers {
data: vec![],
me: me.clone(),
leader: leader.clone(),
};
h.insert(&[me, leader]);
h.insert(network);
h
}

Expand Down