From 476e6a967b9eb6c7058a2ced2eed5f13ed6de7a8 Mon Sep 17 00:00:00 2001 From: Stephen Akridge Date: Mon, 25 Jun 2018 13:29:53 -0700 Subject: [PATCH 1/2] Chunk blobs into window size to avoid window overrun Fixes #447 --- src/streamer.rs | 99 +++++++++++++++++++++++++++---------------------- 1 file changed, 54 insertions(+), 45 deletions(-) diff --git a/src/streamer.rs b/src/streamer.rs index 072fc39d903a0d..7ebeeadb5e0bd4 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -445,63 +445,72 @@ fn broadcast( while let Ok(mut nq) = r.try_recv() { dq.append(&mut nq); } - let mut blobs: Vec<_> = dq.into_iter().collect(); + + // flatten deque to vec + let blobs_vec: Vec<_> = dq.into_iter().collect(); + + // We could receive more blobs than window slots so + // break them up into window-sized chunks to process + let blobs_chunked: Vec<_> = blobs_vec + .chunks(WINDOW_SIZE) + .map(|x| x.to_vec()) + .collect(); print_window(window, *receive_index as usize); - // Insert the coding blobs into the blob stream - #[cfg(feature = "erasure")] - erasure::add_coding_blobs(recycler, &mut blobs, *receive_index); + for mut blobs in blobs_chunked { + // Insert the coding blobs into the blob stream + #[cfg(feature = "erasure")] + erasure::add_coding_blobs(recycler, &mut blobs, *receive_index); - let blobs_len = blobs.len(); - info!("broadcast blobs.len: {}", blobs_len); + let blobs_len = blobs.len(); + debug!("broadcast blobs.len: {}", blobs_len); - // Index the blobs - Crdt::index_blobs(crdt, &blobs, receive_index)?; - // keep the cache of blobs that are broadcast - { - let mut win = window.write().unwrap(); - for b in &blobs { - let ix = b.read().unwrap().get_index().expect("blob index"); - let pos = (ix as usize) % WINDOW_SIZE; - if let Some(x) = &win[pos] { - trace!( - "popped {} at {}", - x.read().unwrap().get_index().unwrap(), - pos - ); - recycler.recycle(x.clone()); + // Index the blobs + Crdt::index_blobs(crdt, &blobs, receive_index)?; + // keep the cache of blobs that are broadcast + { + let mut win = window.write().unwrap(); + assert!(blobs.len() <= win.len()); + for b in &blobs { + let ix = b.read().unwrap().get_index().expect("blob index"); + let pos = (ix as usize) % WINDOW_SIZE; + if let Some(x) = &win[pos] { + trace!( + "popped {} at {}", + x.read().unwrap().get_index().unwrap(), + pos + ); + recycler.recycle(x.clone()); + } + trace!("null {}", pos); + win[pos] = None; + assert!(win[pos].is_none()); + } + while let Some(b) = blobs.pop() { + let ix = b.read().unwrap().get_index().expect("blob index"); + let pos = (ix as usize) % WINDOW_SIZE; + trace!("caching {} at {}", ix, pos); + assert!(win[pos].is_none()); + win[pos] = Some(b); } - trace!("null {}", pos); - win[pos] = None; - assert!(win[pos].is_none()); - } - while let Some(b) = blobs.pop() { - let ix = b.read().unwrap().get_index().expect("blob index"); - let pos = (ix as usize) % WINDOW_SIZE; - trace!("caching {} at {}", ix, pos); - assert!(win[pos].is_none()); - win[pos] = Some(b); } - } - // Fill in the coding blob data from the window data blobs - #[cfg(feature = "erasure")] - { - if erasure::generate_coding( - &mut window.write().unwrap(), - *receive_index as usize, - blobs_len, - ).is_err() + // Fill in the coding blob data from the window data blobs + #[cfg(feature = "erasure")] { - return Err(Error::GenericError); + erasure::generate_coding( + &mut window.write().unwrap(), + *receive_index as usize, + blobs_len, + ).map_err(|_| Error::GenericError)?; } - } - *receive_index += blobs_len as u64; + *receive_index += blobs_len as u64; - // Send blobs out from the window - Crdt::broadcast(crdt, &window, &sock, transmit_index, *receive_index)?; + // Send blobs out from the window + Crdt::broadcast(crdt, &window, &sock, transmit_index, *receive_index)?; + } Ok(()) } From bc2892e2af4313ac7d2aad3ac28fe7042c01de33 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Mon, 25 Jun 2018 16:50:58 -0600 Subject: [PATCH 2/2] Don't clone() Arc before recycling This might fix an awful bug where the streamer reuses a Blob before the current user is done with it. Recycler should probably assert ref count is one? * Also don't collect() an iterator before iterating over it. --- src/streamer.rs | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/src/streamer.rs b/src/streamer.rs index 7ebeeadb5e0bd4..8fe1b949d44311 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -6,6 +6,7 @@ use erasure; use packet::{Blob, BlobRecycler, PacketRecycler, SharedBlob, SharedPackets, BLOB_SIZE}; use result::{Error, Result}; use std::collections::VecDeque; +use std::mem; use std::net::{SocketAddr, UdpSocket}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc; @@ -451,10 +452,7 @@ fn broadcast( // We could receive more blobs than window slots so // break them up into window-sized chunks to process - let blobs_chunked: Vec<_> = blobs_vec - .chunks(WINDOW_SIZE) - .map(|x| x.to_vec()) - .collect(); + let blobs_chunked = blobs_vec.chunks(WINDOW_SIZE).map(|x| x.to_vec()); print_window(window, *receive_index as usize); @@ -475,17 +473,15 @@ fn broadcast( for b in &blobs { let ix = b.read().unwrap().get_index().expect("blob index"); let pos = (ix as usize) % WINDOW_SIZE; - if let Some(x) = &win[pos] { + if let Some(x) = mem::replace(&mut win[pos], None) { trace!( "popped {} at {}", x.read().unwrap().get_index().unwrap(), pos ); - recycler.recycle(x.clone()); + recycler.recycle(x); } trace!("null {}", pos); - win[pos] = None; - assert!(win[pos].is_none()); } while let Some(b) = blobs.pop() { let ix = b.read().unwrap().get_index().expect("blob index");