diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index df2c19f022990a..47632cd2ed3634 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -53,11 +53,6 @@ impl Request { } } -/// Parallel verfication of a batch of requests. -pub fn filter_valid_requests(reqs: Vec<(Request, SocketAddr)>) -> Vec<(Request, SocketAddr)> { - reqs.into_par_iter().filter({ |x| x.0.verify() }).collect() -} - #[derive(Serialize, Deserialize, Debug)] pub enum Response { Balance { key: PublicKey, val: Option }, @@ -112,28 +107,39 @@ impl AccountantSkel { } } - fn verifier( - recvr: &streamer::PacketReceiver, - sendr: &Sender<(Vec, Vec>)>, - ) -> Result<()> { + fn recv_batch(recvr: &streamer::PacketReceiver) -> Result> { let timer = Duration::new(1, 0); let msgs = recvr.recv_timeout(timer)?; trace!("got msgs"); - let mut v = Vec::new(); - v.push(msgs); + let mut batch = vec![msgs]; while let Ok(more) = recvr.try_recv() { trace!("got more msgs"); - v.push(more); + batch.push(more); } - info!("batch {}", v.len()); - let chunk = max(1, (v.len() + 3) / 4); - let chunks: Vec<_> = v.chunks(chunk).collect(); - let rvs: Vec<_> = chunks + info!("batch len {}", batch.len()); + Ok(batch) + } + + fn verify_batch(batch: Vec) -> Vec)>> { + let chunk_size = max(1, (batch.len() + 3) / 4); + let batches: Vec<_> = batch.chunks(chunk_size).map(|x| x.to_vec()).collect(); + batches .into_par_iter() - .map(|x| ecdsa::ed25519_verify(&x.to_vec())) - .collect(); - for (v, r) in v.chunks(chunk).zip(rvs) { - sendr.send((v.to_vec(), r))?; + .map(|batch| { + let r = ecdsa::ed25519_verify(&batch); + batch.into_iter().zip(r).collect() + }) + .collect() + } + + fn verifier( + recvr: &streamer::PacketReceiver, + sendr: &Sender)>>, + ) -> Result<()> { + let batch = Self::recv_batch(recvr)?; + let verified_batches = Self::verify_batch(batch); + for xs in verified_batches { + sendr.send(xs)?; } Ok(()) } @@ -151,21 +157,16 @@ impl AccountantSkel { fn process_packets( obj: &Arc>>, - reqs: Vec>, - vers: Vec, + req_vers: Vec<(Request, SocketAddr, u8)>, ) -> Vec<(Response, SocketAddr)> { - let mut rsps = Vec::new(); - for (data, v) in reqs.into_iter().zip(vers.into_iter()) { - if let Some((req, rsp_addr)) = data { - if !req.verify() { - continue; - } - if let Some(resp) = obj.lock().unwrap().log_verified_request(req, v) { - rsps.push((resp, rsp_addr)); - } - } - } - rsps + req_vers + .into_iter() + .filter_map(|(req, rsp_addr, v)| { + let mut skel = obj.lock().unwrap(); + skel.log_verified_request(req, v) + .map(|resp| (resp, rsp_addr)) + }) + .collect() } fn serialize_response( @@ -198,16 +199,21 @@ impl AccountantSkel { fn process( obj: &Arc>>, - verified_receiver: &Receiver<(Vec, Vec>)>, + verified_receiver: &Receiver)>>, blob_sender: &streamer::BlobSender, packet_recycler: &packet::PacketRecycler, blob_recycler: &packet::BlobRecycler, ) -> Result<()> { let timer = Duration::new(1, 0); - let (mms, vvs) = verified_receiver.recv_timeout(timer)?; - for (msgs, vers) in mms.into_iter().zip(vvs.into_iter()) { + let mms = verified_receiver.recv_timeout(timer)?; + for (msgs, vers) in mms { let reqs = Self::deserialize_packets(&msgs.read().unwrap()); - let rsps = Self::process_packets(obj, reqs, vers); + let req_vers = reqs.into_iter() + .zip(vers) + .filter_map(|(req, ver)| req.map(|(msg, addr)| (msg, addr, ver))) + .filter(|x| x.0.verify()) + .collect(); + let rsps = Self::process_packets(obj, req_vers); let blobs = Self::serialize_responses(rsps, blob_recycler)?; if !blobs.is_empty() { //don't wake up the other side if there is nothing