From 99416a63b521c5e0a0f7b09ed65b9c7a62674196 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Fri, 31 Aug 2018 15:39:46 -0700 Subject: [PATCH 1/3] fix race with dropping mpsc::Receiver Fix a bug where messages sent into a channel at the same time as the `Receiver` is dropped are never removed. Fixes #909 --- src/sync/mpsc/mod.rs | 53 +++++++++++------ tests/mpsc-close.rs | 131 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 168 insertions(+), 16 deletions(-) diff --git a/src/sync/mpsc/mod.rs b/src/sync/mpsc/mod.rs index 7187579666..d9f36924f2 100644 --- a/src/sync/mpsc/mod.rs +++ b/src/sync/mpsc/mod.rs @@ -813,6 +813,12 @@ impl Receiver { loop { match unsafe { self.inner.message_queue.pop() } { PopResult::Data(msg) => { + // If there are any parked task handles in the parked queue, + // pop one and unpark it. + self.unpark_one(); + // Decrement number of messages + self.dec_num_messages(); + return Async::Ready(msg); } PopResult::Empty => { @@ -863,7 +869,7 @@ impl Receiver { let state = decode_state(curr); // If the channel is closed, then there is no need to park. - if !state.is_open && state.num_messages == 0 { + if state.is_closed() { return TryPark::Closed; } @@ -904,8 +910,8 @@ impl Stream for Receiver { fn poll(&mut self) -> Poll, ()> { loop { // Try to read a message off of the message queue. - let msg = match self.next_message() { - Async::Ready(msg) => msg, + match self.next_message() { + Async::Ready(msg) => return Ok(Async::Ready(msg)), Async::NotReady => { // There are no messages to read, in this case, attempt to // park. The act of parking will verify that the channel is @@ -929,17 +935,7 @@ impl Stream for Receiver { } } } - }; - - // If there are any parked task handles in the parked queue, pop - // one and unpark it. - self.unpark_one(); - - // Decrement number of messages - self.dec_num_messages(); - - // Return the message - return Ok(Async::Ready(msg)); + } } } } @@ -948,8 +944,27 @@ impl Drop for Receiver { fn drop(&mut self) { // Drain the channel of all pending messages self.close(); - while self.next_message().is_ready() { - // ... + + loop { + match self.next_message() { + Async::Ready(_) => {} + Async::NotReady => { + let curr = self.inner.state.load(SeqCst); + let state = decode_state(curr); + + // If the channel is closed, then there is no need to park. + if state.is_closed() { + return; + } + + // TODO: Spinning isn't ideal, it might be worth + // investigating using a condvar or some other strategy + // here. That said, if this case is hit, then another thread + // is about to push the value into the queue and this isn't + // the only spinlock in the impl right now. + thread::yield_now(); + } + } } } } @@ -1125,6 +1140,12 @@ impl Inner { unsafe impl Send for Inner {} unsafe impl Sync for Inner {} +impl State { + fn is_closed(&self) -> bool { + !self.is_open && self.num_messages == 0 + } +} + /* * * ===== Helpers ===== diff --git a/tests/mpsc-close.rs b/tests/mpsc-close.rs index 253e015705..9097d70249 100644 --- a/tests/mpsc-close.rs +++ b/tests/mpsc-close.rs @@ -1,9 +1,12 @@ extern crate futures; +use std::sync::{Arc, Weak}; use std::thread; +use std::time::{Duration, Instant}; use futures::prelude::*; use futures::sync::mpsc::*; +use futures::task; #[test] fn smoke() { @@ -19,3 +22,131 @@ fn smoke() { t.join().unwrap() } + +// Stress test that `try_send()`s occurring concurrently with receiver +// close/drops don't appear as successful sends. +#[test] +fn stress_try_send_as_receiver_closes() { + const AMT: usize = 10000; + // To provide variable timing characteristics (in the hopes of + // reproducing the collision that leads to a race), we busy-re-poll + // the test MPSC receiver a variable number of times before actually + // stopping. We vary this countdown between 1 and the following + // value. + const MAX_COUNTDOWN: usize = 20; + // When we detect that a successfully sent item is still in the + // queue after a disconnect, we spin for up to 100ms to confirm that + // it is a persistent condition and not a concurrency illusion. + const SPIN_TIMEOUT: Duration = Duration::from_secs(10); + const SPIN_SLEEP: Duration = Duration::from_millis(10); + struct TestRx { + rx: Receiver>, + // The number of times to query `rx` before dropping it. + poll_count: usize + } + struct TestTask { + command_rx: Receiver, + test_rx: Option>>, + countdown: usize, + } + impl TestTask { + /// Create a new TestTask + fn new() -> (TestTask, Sender) { + let (command_tx, command_rx) = channel::(0); + ( + TestTask { + command_rx, + test_rx: None, + countdown: 0, // 0 means no countdown is in progress. + }, + command_tx, + ) + } + } + impl Future for TestTask { + type Item = (); + type Error = (); + fn poll(&mut self) -> Poll<(), ()> { + // Poll the test channel, if one is present. + if let Some(ref mut rx) = self.test_rx { + if let Ok(Async::Ready(v)) = rx.poll() { + let _ = v.expect("test finished unexpectedly!"); + } + self.countdown -= 1; + // Busy-poll until the countdown is finished. + task::current().notify(); + } + // Accept any newly submitted MPSC channels for testing. + match self.command_rx.poll()? { + Async::Ready(Some(TestRx { rx, poll_count })) => { + self.test_rx = Some(rx); + self.countdown = poll_count; + task::current().notify(); + }, + Async::Ready(None) => return Ok(Async::Ready(())), + _ => {}, + } + if self.countdown == 0 { + // Countdown complete -- drop the Receiver. + self.test_rx = None; + } + Ok(Async::NotReady) + } + } + let (f, mut cmd_tx) = TestTask::new(); + let bg = thread::spawn(move || f.wait()); + for i in 0..AMT { + let (mut test_tx, rx) = channel(0); + let poll_count = i % MAX_COUNTDOWN; + cmd_tx.try_send(TestRx { rx, poll_count }).unwrap(); + let mut prev_weak: Option> = None; + let mut attempted_sends = 0; + let mut successful_sends = 0; + loop { + // Create a test item. + let item = Arc::new(()); + let weak = Arc::downgrade(&item); + match test_tx.try_send(item) { + Ok(_) => { + prev_weak = Some(weak); + successful_sends += 1; + } + Err(ref e) if e.is_full() => {} + Err(ref e) if e.is_disconnected() => { + // Test for evidence of the race condition. + if let Some(prev_weak) = prev_weak { + if prev_weak.upgrade().is_some() { + // The previously sent item is still allocated. + // However, there appears to be some aspect of the + // concurrency that can legitimately cause the Arc + // to be momentarily valid. Spin for up to 100ms + // waiting for the previously sent item to be + // dropped. + let t0 = Instant::now(); + let mut spins = 0; + loop { + if prev_weak.upgrade().is_none() { + break; + } + assert!(t0.elapsed() < SPIN_TIMEOUT, + "item not dropped on iteration {} after \ + {} sends ({} successful). spin=({})", + i, attempted_sends, successful_sends, spins + ); + spins += 1; + thread::sleep(SPIN_SLEEP); + } + } + } + break; + } + Err(ref e) => panic!("unexpected error: {}", e), + } + attempted_sends += 1; + } + } + drop(cmd_tx); + bg.join() + .expect("background thread join") + .expect("background thread result"); +} From 0a0b2d56a47c49d6c0bbf79520f60621a1063eb8 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Fri, 31 Aug 2018 17:05:05 -0700 Subject: [PATCH 2/3] Get tests passing on 1.15 --- tests/mpsc-close.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/mpsc-close.rs b/tests/mpsc-close.rs index 9097d70249..d88592cade 100644 --- a/tests/mpsc-close.rs +++ b/tests/mpsc-close.rs @@ -55,7 +55,7 @@ fn stress_try_send_as_receiver_closes() { let (command_tx, command_rx) = channel::(0); ( TestTask { - command_rx, + command_rx: command_rx, test_rx: None, countdown: 0, // 0 means no countdown is in progress. }, @@ -98,7 +98,7 @@ fn stress_try_send_as_receiver_closes() { for i in 0..AMT { let (mut test_tx, rx) = channel(0); let poll_count = i % MAX_COUNTDOWN; - cmd_tx.try_send(TestRx { rx, poll_count }).unwrap(); + cmd_tx.try_send(TestRx { rx: rx, poll_count: poll_count }).unwrap(); let mut prev_weak: Option> = None; let mut attempted_sends = 0; let mut successful_sends = 0; From 7b38d8f1332b2816e99300da1ade373f738d124b Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Fri, 31 Aug 2018 17:22:03 -0700 Subject: [PATCH 3/3] Try to fix for 1.15 again --- tests/mpsc-close.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/mpsc-close.rs b/tests/mpsc-close.rs index d88592cade..061616ae06 100644 --- a/tests/mpsc-close.rs +++ b/tests/mpsc-close.rs @@ -37,8 +37,8 @@ fn stress_try_send_as_receiver_closes() { // When we detect that a successfully sent item is still in the // queue after a disconnect, we spin for up to 100ms to confirm that // it is a persistent condition and not a concurrency illusion. - const SPIN_TIMEOUT: Duration = Duration::from_secs(10); - const SPIN_SLEEP: Duration = Duration::from_millis(10); + const SPIN_TIMEOUT_S: u64 = 10; + const SPIN_SLEEP_MS: u64 = 10; struct TestRx { rx: Receiver>, // The number of times to query `rx` before dropping it. @@ -128,13 +128,13 @@ fn stress_try_send_as_receiver_closes() { if prev_weak.upgrade().is_none() { break; } - assert!(t0.elapsed() < SPIN_TIMEOUT, + assert!(t0.elapsed() < Duration::from_secs(SPIN_TIMEOUT_S), "item not dropped on iteration {} after \ {} sends ({} successful). spin=({})", i, attempted_sends, successful_sends, spins ); spins += 1; - thread::sleep(SPIN_SLEEP); + thread::sleep(Duration::from_millis(SPIN_SLEEP_MS)); } } }