diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 9c0be40..9722b5f 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -164,7 +164,7 @@ jobs: strategy: fail-fast: false matrix: - sanitizer: [address, leak] + sanitizer: [address, leak, memory, thread] steps: - uses: actions/checkout@master - uses: actions-rs/toolchain@v1 @@ -172,13 +172,14 @@ jobs: override: true profile: minimal toolchain: nightly + components: rust-src - uses: actions-rs/install@v0.1 with: crate: cargo-fuzz - uses: actions-rs/cargo@v1 with: command: fuzz - args: run -s ${{ matrix.sanitizer }} fuzzer -- -max_total_time=120 + args: run -s ${{ matrix.sanitizer }} fuzzer -Z build-std -- -max_total_time=120 coverage: needs: [test] diff --git a/src/channel.rs b/src/channel.rs index 229c3cc..a2d357a 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -308,8 +308,8 @@ mod tests { let (tx, _rx) = ring_channel::<()>(NonZeroUsize::try_from(1)?); #[allow(clippy::redundant_clone)] let x = tx.clone(); - assert_eq!(x.handle.senders.load(Ordering::Relaxed), 2); - assert_eq!(x.handle.receivers.load(Ordering::Relaxed), 1); + assert_eq!(x.handle.senders.load(Ordering::SeqCst), 2); + assert_eq!(x.handle.receivers.load(Ordering::SeqCst), 1); } #[proptest] @@ -317,36 +317,36 @@ mod tests { let (_tx, rx) = ring_channel::<()>(NonZeroUsize::try_from(1)?); #[allow(clippy::redundant_clone)] let x = rx.clone(); - assert_eq!(x.handle.senders.load(Ordering::Relaxed), 1); - assert_eq!(x.handle.receivers.load(Ordering::Relaxed), 2); + assert_eq!(x.handle.senders.load(Ordering::SeqCst), 1); + assert_eq!(x.handle.receivers.load(Ordering::SeqCst), 2); } #[proptest] fn dropping_sender_decrements_senders_counter() { let (_, rx) = ring_channel::<()>(NonZeroUsize::try_from(1)?); - assert_eq!(rx.handle.senders.load(Ordering::Relaxed), 0); - assert_eq!(rx.handle.receivers.load(Ordering::Relaxed), 1); + assert_eq!(rx.handle.senders.load(Ordering::SeqCst), 0); + assert_eq!(rx.handle.receivers.load(Ordering::SeqCst), 1); } #[proptest] fn dropping_receiver_decrements_receivers_counter() { let (tx, _) = ring_channel::<()>(NonZeroUsize::try_from(1)?); - assert_eq!(tx.handle.senders.load(Ordering::Relaxed), 1); - assert_eq!(tx.handle.receivers.load(Ordering::Relaxed), 0); + assert_eq!(tx.handle.senders.load(Ordering::SeqCst), 1); + assert_eq!(tx.handle.receivers.load(Ordering::SeqCst), 0); } #[proptest] fn channel_is_disconnected_if_there_are_no_senders() { let (_, rx) = ring_channel::<()>(NonZeroUsize::try_from(1)?); - assert_eq!(rx.handle.senders.load(Ordering::Relaxed), 0); - assert!(!rx.handle.connected.load(Ordering::Relaxed)); + assert_eq!(rx.handle.senders.load(Ordering::SeqCst), 0); + assert!(!rx.handle.connected.load(Ordering::SeqCst)); } #[proptest] fn channel_is_disconnected_if_there_are_no_receivers() { let (tx, _) = ring_channel::<()>(NonZeroUsize::try_from(1)?); - assert_eq!(tx.handle.receivers.load(Ordering::Relaxed), 0); - assert!(!tx.handle.connected.load(Ordering::Relaxed)); + assert_eq!(tx.handle.receivers.load(Ordering::SeqCst), 0); + assert!(!tx.handle.connected.load(Ordering::SeqCst)); } #[proptest] @@ -682,7 +682,7 @@ mod tests { #[cfg(feature = "futures_api")] #[cfg(not(miri))] // https://github.com/rust-lang/miri/issues/1388 #[proptest] - fn stream_wakes_on_sink(#[strategy(1..=100usize)] n: usize) { + fn stream_wakes_on_sink(#[strategy(1..=10usize)] n: usize) { let rt = runtime::Builder::new_multi_thread().build()?; let (tx, rx) = ring_channel(NonZeroUsize::try_from(n)?); let _prevent_disconnection = tx.clone(); diff --git a/src/control.rs b/src/control.rs index 2345016..34ec6e6 100644 --- a/src/control.rs +++ b/src/control.rs @@ -62,9 +62,9 @@ impl Deref for ControlBlockRef { impl Drop for ControlBlockRef { fn drop(&mut self) { - debug_assert!(!self.connected.load(Ordering::Relaxed)); - debug_assert_eq!(self.senders.load(Ordering::Relaxed), 0); - debug_assert_eq!(self.receivers.load(Ordering::Relaxed), 0); + debug_assert!(!self.connected.load(Ordering::SeqCst)); + debug_assert_eq!(self.senders.load(Ordering::SeqCst), 0); + debug_assert_eq!(self.receivers.load(Ordering::SeqCst), 0); unsafe { Box::from_raw(&**self as *const ControlBlock as *mut ControlBlock) }; } @@ -78,14 +78,14 @@ mod tests { #[proptest] fn control_block_starts_connected() { let ctrl = ControlBlock::<()>::new(1); - assert!(ctrl.connected.load(Ordering::Relaxed)); + assert!(ctrl.connected.load(Ordering::SeqCst)); } #[proptest] fn control_block_starts_with_reference_counters_equal_to_one() { let ctrl = ControlBlock::<()>::new(1); - assert_eq!(ctrl.senders.load(Ordering::Relaxed), 1); - assert_eq!(ctrl.receivers.load(Ordering::Relaxed), 1); + assert_eq!(ctrl.senders.load(Ordering::SeqCst), 1); + assert_eq!(ctrl.receivers.load(Ordering::SeqCst), 1); } #[proptest] diff --git a/src/waitlist.rs b/src/waitlist.rs index c3314cb..7e5c892 100644 --- a/src/waitlist.rs +++ b/src/waitlist.rs @@ -1,4 +1,4 @@ -use core::sync::atomic::*; +use core::sync::atomic::{AtomicUsize, Ordering}; use crossbeam_queue::SegQueue; use crossbeam_utils::CachePadded; use derivative::Derivative; @@ -30,6 +30,12 @@ struct Drain<'a, T> { count: usize, } +impl<'a, T> Drop for Drain<'a, T> { + fn drop(&mut self) { + self.for_each(drop); + } +} + impl<'a, T> Iterator for Drain<'a, T> { type Item = T; @@ -66,7 +72,7 @@ mod tests { #[proptest] fn waitlist_starts_empty() { let waitlist = Waitlist::<()>::new(); - assert_eq!(waitlist.len.load(Ordering::Relaxed), 0); + assert_eq!(waitlist.len.load(Ordering::SeqCst), 0); assert_eq!(waitlist.queue.len(), 0); } @@ -80,7 +86,7 @@ mod tests { waitlist.push(item); } - assert_eq!(waitlist.len.load(Ordering::Relaxed), items.len()); + assert_eq!(waitlist.len.load(Ordering::SeqCst), items.len()); assert_eq!(waitlist.queue.len(), items.len()); } @@ -95,7 +101,23 @@ mod tests { } assert_eq!(waitlist.drain().collect::>(), items); - assert_eq!(waitlist.len.load(Ordering::Relaxed), 0); + assert_eq!(waitlist.len.load(Ordering::SeqCst), 0); + assert_eq!(waitlist.queue.len(), 0); + } + + #[proptest] + fn items_are_popped_if_drain_iterator_is_dropped( + #[any(size_range(1..=10).lift())] items: Vec, + ) { + let waitlist = Waitlist::new(); + + for &item in &items { + waitlist.push(item); + } + + drop(waitlist.drain()); + + assert_eq!(waitlist.len.load(Ordering::SeqCst), 0); assert_eq!(waitlist.queue.len(), 0); }