More refactoring #115

Merged · 4 commits · Apr 11, 2018
src/accountant_skel.rs: 82 changes (44 additions, 38 deletions)
@@ -53,11 +53,6 @@ impl Request {
}
}

/// Parallel verification of a batch of requests.
pub fn filter_valid_requests(reqs: Vec<(Request, SocketAddr)>) -> Vec<(Request, SocketAddr)> {
reqs.into_par_iter().filter({ |x| x.0.verify() }).collect()
}

#[derive(Serialize, Deserialize, Debug)]
pub enum Response {
Balance { key: PublicKey, val: Option<i64> },
@@ -112,28 +107,39 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
}
}

fn verifier(
recvr: &streamer::PacketReceiver,
sendr: &Sender<(Vec<SharedPackets>, Vec<Vec<u8>>)>,
) -> Result<()> {
fn recv_batch(recvr: &streamer::PacketReceiver) -> Result<Vec<SharedPackets>> {
let timer = Duration::new(1, 0);
let msgs = recvr.recv_timeout(timer)?;
trace!("got msgs");
let mut v = Vec::new();
v.push(msgs);
let mut batch = vec![msgs];
while let Ok(more) = recvr.try_recv() {
trace!("got more msgs");
v.push(more);
batch.push(more);
}
info!("batch {}", v.len());
let chunk = max(1, (v.len() + 3) / 4);
let chunks: Vec<_> = v.chunks(chunk).collect();
let rvs: Vec<_> = chunks
info!("batch len {}", batch.len());
Ok(batch)
}

fn verify_batch(batch: Vec<SharedPackets>) -> Vec<Vec<(SharedPackets, Vec<u8>)>> {
let chunk_size = max(1, (batch.len() + 3) / 4);
let batches: Vec<_> = batch.chunks(chunk_size).map(|x| x.to_vec()).collect();
batches
.into_par_iter()
.map(|x| ecdsa::ed25519_verify(&x.to_vec()))
.collect();
for (v, r) in v.chunks(chunk).zip(rvs) {
sendr.send((v.to_vec(), r))?;
.map(|batch| {
let r = ecdsa::ed25519_verify(&batch);
batch.into_iter().zip(r).collect()
})
.collect()
}

fn verifier(
recvr: &streamer::PacketReceiver,
sendr: &Sender<Vec<(SharedPackets, Vec<u8>)>>,
) -> Result<()> {
let batch = Self::recv_batch(recvr)?;
let verified_batches = Self::verify_batch(batch);
for xs in verified_batches {
sendr.send(xs)?;
}
Ok(())
}
@@ -151,21 +157,16 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {

fn process_packets(
obj: &Arc<Mutex<AccountantSkel<W>>>,
reqs: Vec<Option<(Request, SocketAddr)>>,
vers: Vec<u8>,
req_vers: Vec<(Request, SocketAddr, u8)>,
) -> Vec<(Response, SocketAddr)> {
let mut rsps = Vec::new();
for (data, v) in reqs.into_iter().zip(vers.into_iter()) {
if let Some((req, rsp_addr)) = data {
if !req.verify() {
continue;
}
if let Some(resp) = obj.lock().unwrap().log_verified_request(req, v) {
rsps.push((resp, rsp_addr));
}
}
}
rsps
req_vers
.into_iter()
.filter_map(|(req, rsp_addr, v)| {
let mut skel = obj.lock().unwrap();
skel.log_verified_request(req, v)
.map(|resp| (resp, rsp_addr))
})
.collect()
}

fn serialize_response(
@@ -198,16 +199,21 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {

fn process(
obj: &Arc<Mutex<AccountantSkel<W>>>,
verified_receiver: &Receiver<(Vec<SharedPackets>, Vec<Vec<u8>>)>,
verified_receiver: &Receiver<Vec<(SharedPackets, Vec<u8>)>>,
blob_sender: &streamer::BlobSender,
packet_recycler: &packet::PacketRecycler,
blob_recycler: &packet::BlobRecycler,
) -> Result<()> {
let timer = Duration::new(1, 0);
let (mms, vvs) = verified_receiver.recv_timeout(timer)?;
for (msgs, vers) in mms.into_iter().zip(vvs.into_iter()) {
let mms = verified_receiver.recv_timeout(timer)?;
for (msgs, vers) in mms {
let reqs = Self::deserialize_packets(&msgs.read().unwrap());
let rsps = Self::process_packets(obj, reqs, vers);
let req_vers = reqs.into_iter()
.zip(vers)
.filter_map(|(req, ver)| req.map(|(msg, addr)| (msg, addr, ver)))
.filter(|x| x.0.verify())
.collect();
Member:
this is going to be one big alloc

Contributor Author:
It's using into_iter(). Should reduce to memmove().

Member:
Vec<Vec<>> into Vec, right? So the output is going to be all of the requests we just pulled up. Basically this will kill the improvement we are trying to get with the allocator, since it will allocate all the space for all the I/O buffers, copy into it, and then drop that buffer later.

Member:
Or do you mean it's smart enough to know about the into_iter on line 163 and compile the allocation out?
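
For reference, a minimal, self-contained sketch of the allocation question under discussion. The Request struct, its verify() method, and the sample data below are simplified stand-ins, not the real types in this crate; only the iterator chain mirrors the diff above. collect() does allocate one new Vec sized to the surviving elements, but because the chain starts from into_iter(), each (Request, SocketAddr) pair is moved into the output rather than cloned, so the packet payloads themselves are not copied.

use std::net::SocketAddr;

// Hypothetical stand-in for the real Request type; only the shape matters here.
#[derive(Debug)]
struct Request(u64);

impl Request {
    fn verify(&self) -> bool {
        // Placeholder check standing in for signature verification.
        self.0 % 2 == 0
    }
}

fn main() {
    let addr: SocketAddr = "127.0.0.1:8000".parse().unwrap();
    let reqs: Vec<Option<(Request, SocketAddr)>> =
        vec![Some((Request(2), addr)), None, Some((Request(3), addr))];
    let vers: Vec<u8> = vec![1, 1, 0];

    // collect() allocates one new Vec sized to the filtered output, but
    // into_iter() moves each (Request, SocketAddr) pair into it; no deep
    // copies of the underlying buffers are made along the way.
    let req_vers: Vec<(Request, SocketAddr, u8)> = reqs
        .into_iter()
        .zip(vers)
        .filter_map(|(req, ver)| req.map(|(msg, addr)| (msg, addr, ver)))
        .filter(|x| x.0.verify())
        .collect();

    println!("{} verified requests", req_vers.len());
}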

let rsps = Self::process_packets(obj, req_vers);
let blobs = Self::serialize_responses(rsps, blob_recycler)?;
if !blobs.is_empty() {
//don't wake up the other side if there is nothing