Boot CRDT from TPU and add Fetch stage #278

Merged · 3 commits · May 29, 2018
4 changes: 2 additions & 2 deletions src/entry_writer.rs
@@ -2,7 +2,7 @@

use bank::Bank;
use entry::Entry;
use ledger;
use ledger::Block;
use packet;
use result::Result;
use serde_json;
@@ -63,7 +63,7 @@ impl<'a> EntryWriter<'a> {
        let mut q = VecDeque::new();
        let list = self.write_entries(writer, entry_receiver)?;
        trace!("New blobs? {}", list.len());
        ledger::process_entry_list_into_blobs(&list, blob_recycler, &mut q);
        list.to_blobs(blob_recycler, &mut q);
        if !q.is_empty() {
            trace!("broadcasting {}", q.len());
            broadcast.send(q)?;
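Reviewer note: the call-site change above swaps the free function for the new `Block` trait method on `[Entry]`. A minimal sketch of the new idiom, assuming an `entries: Vec<Entry>` and the crate's recycler are already in scope (the setup here is illustrative, not part of this diff):

```rust
use std::collections::VecDeque;

// Bring the trait into scope so the method is callable on slices of entries.
use ledger::Block;

let blob_recycler = packet::BlobRecycler::default();
let mut q = VecDeque::new();
// Previously: ledger::process_entry_list_into_blobs(&entries, &blob_recycler, &mut q);
entries.to_blobs(&blob_recycler, &mut q);
```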
31 changes: 31 additions & 0 deletions src/fetch_stage.rs
@@ -0,0 +1,31 @@
//! The `fetch_stage` batches input from a UDP socket and sends it to a channel.

use packet;
use std::net::UdpSocket;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::mpsc::channel;
use std::thread::JoinHandle;
use streamer;

pub struct FetchStage {
    pub packet_receiver: streamer::PacketReceiver,
    pub thread_hdl: JoinHandle<()>,
}

impl FetchStage {
    pub fn new(
        socket: UdpSocket,
        exit: Arc<AtomicBool>,
        packet_recycler: packet::PacketRecycler,
    ) -> Self {
        let (packet_sender, packet_receiver) = channel();
        let thread_hdl =
            streamer::receiver(socket, exit.clone(), packet_recycler.clone(), packet_sender);

        FetchStage {
            packet_receiver,
            thread_hdl,
        }
    }
}
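For reviewers, a minimal usage sketch of the new stage. The bind address, the consuming loop, and the shutdown sequence are illustrative assumptions, not part of this diff:

```rust
use std::net::UdpSocket;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};

// Assumes this crate's FetchStage and packet::PacketRecycler are in scope.
fn main() {
    let socket = UdpSocket::bind("0.0.0.0:9999").expect("bind transactions socket");
    let exit = Arc::new(AtomicBool::new(false));
    let fetch_stage = FetchStage::new(socket, exit.clone(), packet::PacketRecycler::default());

    // A downstream stage (e.g. SigVerifyStage) consumes batched packets from the channel.
    if let Ok(packets) = fetch_stage.packet_receiver.recv() {
        // hand `packets` off to the next stage...
        drop(packets);
    }

    // Signal the receiver thread to stop, then join it.
    exit.store(true, Ordering::Relaxed);
    fetch_stage.thread_hdl.join().expect("fetch stage thread panicked");
}
```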
132 changes: 64 additions & 68 deletions src/ledger.rs
@@ -16,6 +16,7 @@ use transaction::Transaction;
pub trait Block {
    /// Verifies the hashes and counts of a slice of transactions are all consistent.
    fn verify(&self, start_hash: &Hash) -> bool;
    fn to_blobs(&self, blob_recycler: &packet::BlobRecycler, q: &mut VecDeque<SharedBlob>);
}

impl Block for [Entry] {
@@ -24,6 +25,66 @@ impl Block for [Entry] {
        let entry_pairs = genesis.par_iter().chain(self).zip(self);
        entry_pairs.all(|(x0, x1)| x1.verify(&x0.id))
    }

    fn to_blobs(&self, blob_recycler: &packet::BlobRecycler, q: &mut VecDeque<SharedBlob>) {
        let mut start = 0;
        let mut end = 0;
        while start < self.len() {
            let mut entries: Vec<Vec<Entry>> = Vec::new();
            let mut total = 0;
            for i in &self[start..] {
                total += size_of::<Transaction>() * i.transactions.len();
                total += size_of::<Entry>();
                if total >= BLOB_DATA_SIZE {
                    break;
                }
                end += 1;
            }
            // See if we need to split the transactions
            if end <= start {
                let mut transaction_start = 0;
                let num_transactions_per_blob = BLOB_DATA_SIZE / size_of::<Transaction>();
                let total_entry_chunks = (self[end].transactions.len() + num_transactions_per_blob
                    - 1) / num_transactions_per_blob;
                trace!(
                    "splitting transactions end: {} total_chunks: {}",
                    end,
                    total_entry_chunks
                );
                for _ in 0..total_entry_chunks {
                    let transaction_end = min(
                        transaction_start + num_transactions_per_blob,
                        self[end].transactions.len(),
                    );
                    let mut entry = Entry {
                        num_hashes: self[end].num_hashes,
                        id: self[end].id,
                        transactions: self[end].transactions[transaction_start..transaction_end]
                            .to_vec(),
                    };
                    entries.push(vec![entry]);
                    transaction_start = transaction_end;
                }
                end += 1;
            } else {
                entries.push(self[start..end].to_vec());
            }

            for entry in entries {
                let b = blob_recycler.allocate();
                let pos = {
                    let mut bd = b.write().unwrap();
                    let mut out = Cursor::new(bd.data_mut());
                    serialize_into(&mut out, &entry).expect("failed to serialize output");
                    out.position() as usize
                };
                assert!(pos < BLOB_SIZE);
                b.write().unwrap().set_size(pos);
                q.push_back(b);
            }
            start = end;
        }
    }
}
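A note on the splitting arithmetic in `to_blobs`: entries are packed greedily until the running size reaches `BLOB_DATA_SIZE`; an entry too large for any single blob is split into `ceil(n / per_blob)` single-entry chunks using the `(n + d - 1) / d` idiom. A self-contained sketch with illustrative constants (the real values come from `packet::BLOB_DATA_SIZE` and `size_of::<Transaction>()`):

```rust
// Illustrative sizes only, chosen to make the arithmetic easy to follow.
const BLOB_DATA_SIZE: usize = 65_536;
const TRANSACTION_SIZE: usize = 256;

/// Number of blobs needed for an entry holding `num_transactions` transactions.
fn chunks_needed(num_transactions: usize) -> usize {
    let per_blob = BLOB_DATA_SIZE / TRANSACTION_SIZE; // 256 with these sizes
    (num_transactions + per_blob - 1) / per_blob // ceiling division
}

fn main() {
    assert_eq!(chunks_needed(256), 1); // exactly fills one blob
    assert_eq!(chunks_needed(257), 2); // one transaction spills into a second blob
    assert_eq!(chunks_needed(10_000), 40); // matches the 10,000-transaction test below
}
```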

/// Create a vector of Entries of length `transaction_batches.len()` from `start_hash` hash, `num_hashes`, and `transaction_batches`.
@@ -43,70 +104,6 @@ pub fn next_entries(
    entries
}

pub fn process_entry_list_into_blobs(
    list: &Vec<Entry>,
    blob_recycler: &packet::BlobRecycler,
    q: &mut VecDeque<SharedBlob>,
) {
    let mut start = 0;
    let mut end = 0;
    while start < list.len() {
        let mut entries: Vec<Vec<Entry>> = Vec::new();
        let mut total = 0;
        for i in &list[start..] {
            total += size_of::<Transaction>() * i.transactions.len();
            total += size_of::<Entry>();
            if total >= BLOB_DATA_SIZE {
                break;
            }
            end += 1;
        }
        // See if we need to split the transactions
        if end <= start {
            let mut transaction_start = 0;
            let num_transactions_per_blob = BLOB_DATA_SIZE / size_of::<Transaction>();
            let total_entry_chunks = (list[end].transactions.len() + num_transactions_per_blob - 1)
                / num_transactions_per_blob;
            trace!(
                "splitting transactions end: {} total_chunks: {}",
                end,
                total_entry_chunks
            );
            for _ in 0..total_entry_chunks {
                let transaction_end = min(
                    transaction_start + num_transactions_per_blob,
                    list[end].transactions.len(),
                );
                let mut entry = Entry {
                    num_hashes: list[end].num_hashes,
                    id: list[end].id,
                    transactions: list[end].transactions[transaction_start..transaction_end]
                        .to_vec(),
                };
                entries.push(vec![entry]);
                transaction_start = transaction_end;
            }
            end += 1;
        } else {
            entries.push(list[start..end].to_vec());
        }

        for entry in entries {
            let b = blob_recycler.allocate();
            let pos = {
                let mut bd = b.write().unwrap();
                let mut out = Cursor::new(bd.data_mut());
                serialize_into(&mut out, &entry).expect("failed to serialize output");
                out.position() as usize
            };
            assert!(pos < BLOB_SIZE);
            b.write().unwrap().set_size(pos);
            q.push_back(b);
        }
        start = end;
    }
}

pub fn reconstruct_entries_from_blobs(blobs: &VecDeque<SharedBlob>) -> Vec<Entry> {
    let mut entries_to_apply: Vec<Entry> = Vec::new();
    let mut last_id = Hash::default();
@@ -159,13 +156,12 @@ mod tests {
        let transactions = vec![tx0.clone(); 10000];
        let e0 = Entry::new(&zero, 0, transactions);

        let entry_list = vec![e0.clone(); 1];
        let entries = vec![e0.clone(); 1];
        let blob_recycler = BlobRecycler::default();
        let mut blob_q = VecDeque::new();
        process_entry_list_into_blobs(&entry_list, &blob_recycler, &mut blob_q);
        let entries = reconstruct_entries_from_blobs(&blob_q);
        entries.to_blobs(&blob_recycler, &mut blob_q);

        assert_eq!(entry_list, entries);
        assert_eq!(reconstruct_entries_from_blobs(&blob_q), entries);
    }

    #[test]
1 change: 1 addition & 0 deletions src/lib.rs
@@ -6,6 +6,7 @@ pub mod entry;
pub mod entry_writer;
#[cfg(feature = "erasure")]
pub mod erasure;
pub mod fetch_stage;
pub mod hash;
pub mod ledger;
pub mod logger;
28 changes: 23 additions & 5 deletions src/server.rs
@@ -1,15 +1,17 @@
//! The `server` module hosts all the server microservices.

use bank::Bank;
use crdt::ReplicatedData;
use crdt::{Crdt, ReplicatedData};
use hash::Hash;
use packet;
use rpu::Rpu;
use std::io::Write;
use std::net::UdpSocket;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::{Arc, RwLock};
use std::thread::JoinHandle;
use std::time::Duration;
use streamer;
use tpu::Tpu;
use tvu::Tvu;

@@ -35,18 +37,34 @@ impl Server {
        let mut thread_hdls = vec![];
        let rpu = Rpu::new(bank.clone(), requests_socket, respond_socket, exit.clone());
        thread_hdls.extend(rpu.thread_hdls);

        let blob_recycler = packet::BlobRecycler::default();
        let tpu = Tpu::new(
            bank.clone(),
            start_hash,
            tick_duration,
            me,
            transactions_socket,
            broadcast_socket,
            gossip_socket,
            blob_recycler.clone(),
            exit.clone(),
            writer,
        );
        thread_hdls.extend(tpu.thread_hdls);

        let crdt = Arc::new(RwLock::new(Crdt::new(me)));
        let t_gossip = Crdt::gossip(crdt.clone(), exit.clone());
        let window = streamer::default_window();
        let t_listen = Crdt::listen(crdt.clone(), window.clone(), gossip_socket, exit.clone());

        let t_broadcast = streamer::broadcaster(
            broadcast_socket,
            exit.clone(),
            crdt.clone(),
            window,
            blob_recycler.clone(),
            tpu.blob_receiver,
        );
        thread_hdls.extend(vec![t_gossip, t_listen, t_broadcast]);

        Server { thread_hdls }
    }
    pub fn new_validator(
53 changes: 16 additions & 37 deletions src/tpu.rs
@@ -3,22 +3,22 @@

use bank::Bank;
use banking_stage::BankingStage;
use crdt::{Crdt, ReplicatedData};
use fetch_stage::FetchStage;
use hash::Hash;
use packet;
use packet::{BlobRecycler, PacketRecycler};
use record_stage::RecordStage;
use sigverify_stage::SigVerifyStage;
use std::io::Write;
use std::net::UdpSocket;
use std::sync::atomic::AtomicBool;
use std::sync::mpsc::channel;
use std::sync::{Arc, Mutex, RwLock};
use std::sync::{Arc, Mutex};
use std::thread::JoinHandle;
use std::time::Duration;
use streamer;
use streamer::BlobReceiver;
use write_stage::WriteStage;

pub struct Tpu {
    pub blob_receiver: BlobReceiver,
    pub thread_hdls: Vec<JoinHandle<()>>,
}

@@ -27,25 +27,18 @@ impl Tpu {
        bank: Arc<Bank>,
        start_hash: Hash,
        tick_duration: Option<Duration>,
        me: ReplicatedData,
        transactions_socket: UdpSocket,
        broadcast_socket: UdpSocket,
        gossip: UdpSocket,
        blob_recycler: BlobRecycler,
        exit: Arc<AtomicBool>,
        writer: W,
    ) -> Self {
        let packet_recycler = packet::PacketRecycler::default();
        let (packet_sender, packet_receiver) = channel();
        let t_receiver = streamer::receiver(
            transactions_socket,
            exit.clone(),
            packet_recycler.clone(),
            packet_sender,
        );
        let packet_recycler = PacketRecycler::default();

        let sigverify_stage = SigVerifyStage::new(exit.clone(), packet_receiver);
        let fetch_stage =
            FetchStage::new(transactions_socket, exit.clone(), packet_recycler.clone());

        let sigverify_stage = SigVerifyStage::new(exit.clone(), fetch_stage.packet_receiver);

        let blob_recycler = packet::BlobRecycler::default();
        let banking_stage = BankingStage::new(
            bank.clone(),
            exit.clone(),
@@ -64,30 +57,16 @@
            record_stage.entry_receiver,
        );

        let crdt = Arc::new(RwLock::new(Crdt::new(me)));
        let t_gossip = Crdt::gossip(crdt.clone(), exit.clone());
        let window = streamer::default_window();
        let t_listen = Crdt::listen(crdt.clone(), window.clone(), gossip, exit.clone());

        let t_broadcast = streamer::broadcaster(
            broadcast_socket,
            exit.clone(),
            crdt.clone(),
            window,
            blob_recycler.clone(),
            write_stage.blob_receiver,
        );

        let mut thread_hdls = vec![
            t_receiver,
            fetch_stage.thread_hdl,
            banking_stage.thread_hdl,
            record_stage.thread_hdl,
            write_stage.thread_hdl,
            t_gossip,
            t_listen,
            t_broadcast,
        ];
        thread_hdls.extend(sigverify_stage.thread_hdls.into_iter());
        Tpu { thread_hdls }
        Tpu {
            blob_receiver: write_stage.blob_receiver,
            thread_hdls,
        }
    }
}
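Net effect for reviewers: `Tpu` no longer owns the CRDT, gossip, or broadcast threads; it exposes `blob_receiver`, and `Server` feeds that into the broadcaster it now owns. A condensed sketch of the resulting wiring inside `Server::new` (arguments elided; see the `server.rs` hunk above for the exact calls):

```rust
// Sketch only: the TPU produces blobs, the Server broadcasts them.
let tpu = Tpu::new(/* bank, start_hash, tick_duration, sockets, recycler, exit, writer */);

let crdt = Arc::new(RwLock::new(Crdt::new(me)));
let t_gossip = Crdt::gossip(crdt.clone(), exit.clone());
let t_listen = Crdt::listen(crdt.clone(), window.clone(), gossip_socket, exit.clone());
let t_broadcast = streamer::broadcaster(
    broadcast_socket,
    exit.clone(),
    crdt.clone(),
    window,
    blob_recycler.clone(),
    tpu.blob_receiver, // the TPU's output stream feeds the Server-owned broadcaster
);
```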