Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update dependencies and fix unsoundness and CI failure #29

Merged
merged 13 commits into from
Jun 19, 2022
5 changes: 2 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@ codecov = { repository = "jonhoo/bus", branch = "master", service = "github" }
maintenance = { status = "passively-maintained" }

[dependencies]
atomic-option = "0.1"
num_cpus = "1.6.2"
parking_lot_core = "0.7"
crossbeam-channel = "0.4"
parking_lot_core = "0.9"
crossbeam-channel = "0.5"

[profile.release]
debug = true
21 changes: 10 additions & 11 deletions azure-pipelines.yml
Original file line number Diff line number Diff line change
@@ -1,24 +1,23 @@
jobs:
- template: default.yml@templates
parameters:
minrust: 1.36.0 # MaybeUninit
minrust: 1.49.0 # parking_lot_core
- job: miri
displayName: "Run miri on test suite"
pool:
vmImage: ubuntu-16.04
vmImage: ubuntu-latest
steps:
- template: install-rust.yml@templates
parameters:
rust: nightly
components:
- miri
- bash: rustup toolchain install nightly --component miri && rustup default nightly
displayName: install rust
# ignore leaks due to https://github.com/crossbeam-rs/crossbeam/issues/464
- bash: yes | cargo miri -Zmiri-ignore-leaks test
# disable preemption due to https://github.com/rust-lang/rust/issues/55005
# disable weak memory emulation due to https://github.com/rust-lang/miri/issues/2223
- bash: MIRIFLAGS="-Zmiri-disable-isolation -Zmiri-ignore-leaks -Zmiri-preemption-rate=0 -Zmiri-disable-weak-memory-emulation" cargo miri test
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you need to disable isolation now when that was not needed before?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is needed for recv_timeout that uses Instant. (recv_timeout has only been tested at doc test, so that flag was not needed before.)

error: unsupported operation: `clock_gettime` not available when isolation is enabled
   --> /home/vsts/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/std/src/sys/unix/time.rs:341:26
    |
341 |             cvt(unsafe { libc::clock_gettime(clock, t.as_mut_ptr()) }).unwrap();
    |                          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ `clock_gettime` not available when isolation is enabled
    |
    = help: pass the flag `-Zmiri-disable-isolation` to disable isolation;
    = help: or pass `-Zmiri-isolation-error=warn` to configure Miri to return an error code from isolated operations (if supported for that operation) and continue with a warning
            
    = note: inside `std::sys::unix::time::inner::<impl std::sys::unix::time::Timespec>::now` at /home/vsts/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/std/src/sys/unix/time.rs:341:26
    = note: inside `std::sys::unix::time::inner::Instant::now` at /home/vsts/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/std/src/sys/unix/time.rs:275:26
    = note: inside `std::time::Instant::now` at /home/vsts/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/std/src/time.rs:276:17
note: inside `bus::BusReader::<bool>::recv_inner` at /home/vsts/work/1/s/src/lib.rs:587:47
   --> /home/vsts/work/1/s/src/lib.rs:587:47
    |
587 |             RecvCondition::Timeout(_) => Some(time::Instant::now()),
    |                                               ^^^^^^^^^^^^^^^^^^^^
note: inside `bus::BusReader::<bool>::recv_timeout` at /home/vsts/work/1/s/src/lib.rs:762:9
   --> /home/vsts/work/1/s/src/lib.rs:762:9
    |
762 |         self.recv_inner(RecvCondition::Timeout(timeout))
    |         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
note: inside `main::_doctest_main_src_lib_rs_747_0` at src/lib.rs:12:44
   --> src/lib.rs:756:44
    |
12  | assert_eq!(Err(RecvTimeoutError::Timeout), rx.recv_timeout(timeout));
    |                                            ^^^^^^^^^^^^^^^^^^^^^^^^

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, makes sense.

Btw, the Futex issue is fixed in Miri (should ship with the next nightly).

displayName: cargo miri test
- job: asan
displayName: "Run address sanitizer on test suite"
pool:
vmImage: ubuntu-16.04
vmImage: ubuntu-latest
steps:
- template: install-rust.yml@templates
parameters:
Expand All @@ -33,7 +32,7 @@ jobs:
- job: lsan
displayName: "Run leak sanitizer on test suite"
pool:
vmImage: ubuntu-16.04
vmImage: ubuntu-latest
steps:
- template: install-rust.yml@templates
parameters:
Expand All @@ -58,5 +57,5 @@ resources:
- repository: templates
type: github
name: crate-ci/azure-pipelines
ref: refs/heads/v0.3
ref: refs/heads/v0.4
endpoint: jonhoo
69 changes: 59 additions & 10 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
//! Multi-send, multi-consumer example
//!
//! ```rust
//! # if cfg!(miri) { return } // Miri is too slow
//! use bus::Bus;
//! use std::thread;
//!
Expand Down Expand Up @@ -99,12 +100,13 @@
#![deny(missing_docs)]
#![warn(rust_2018_idioms)]

use atomic_option::AtomicOption;
use crossbeam_channel as mpsc;
use parking_lot_core::SpinWait;

use std::cell::UnsafeCell;
use std::marker::PhantomData;
use std::ops::Deref;
use std::ptr;
use std::sync::atomic;
use std::sync::mpsc as std_mpsc;
use std::sync::Arc;
Expand Down Expand Up @@ -189,22 +191,24 @@ impl<T: Clone + Sync> Seat<T> {
// we're the last reader, so we may need to notify the writer there's space in the buf.
// can be relaxed, since the acquire at the top already guarantees that we'll see
// updates.
waiting = self.waiting.take(atomic::Ordering::Relaxed);
waiting = self.waiting.take();

// since we're the last reader, no-one else will be cloning this value, so we can
// safely take a mutable reference, and just take the val instead of cloning it.
unsafe { &mut *self.state.get() }.val.take().unwrap()
} else {
state
let v = state
.val
.clone()
.expect("seat that should be occupied was empty")
.expect("seat that should be occupied was empty");

// let writer know that we no longer need this item.
// state is no longer safe to access.
#[allow(clippy::drop_ref)]
drop(state);
v
};

// let writer know that we no longer need this item.
// state is no longer safe to access.
#[allow(clippy::drop_ref)]
drop(state);
jonhoo marked this conversation as resolved.
Show resolved Hide resolved
self.read.fetch_add(1, atomic::Ordering::AcqRel);

if let Some(t) = waiting {
Expand Down Expand Up @@ -378,7 +382,7 @@ impl<T> Bus<T> {
// no, so block by parking and telling readers to notify on last read
self.state.ring[fence]
.waiting
.replace(Some(Box::new(thread::current())), atomic::Ordering::Relaxed);
.swap(Some(Box::new(thread::current())));

// need the atomic fetch_add to ensure reader threads will see the new .waiting
self.state.ring[fence]
Expand Down Expand Up @@ -416,7 +420,7 @@ impl<T> Bus<T> {
let state = unsafe { &mut *next.state.get() };
state.max = readers;
state.val = Some(val);
next.waiting.replace(None, atomic::Ordering::Relaxed);
next.waiting.take();
next.read.store(0, atomic::Ordering::Release);
}
self.rleft[tail] = 0;
Expand Down Expand Up @@ -817,3 +821,48 @@ impl<T: Clone + Sync> Iterator for BusIntoIter<T> {
self.0.recv().ok()
}
}

struct AtomicOption<T> {
ptr: atomic::AtomicPtr<T>,
_marker: PhantomData<Option<Box<T>>>,
}

unsafe impl<T: Send> Send for AtomicOption<T> {}
unsafe impl<T: Send> Sync for AtomicOption<T> {}

impl<T> AtomicOption<T> {
fn empty() -> Self {
Self {
ptr: atomic::AtomicPtr::new(ptr::null_mut()),
_marker: PhantomData,
}
}

fn swap(&self, val: Option<Box<T>>) -> Option<Box<T>> {
let old = match val {
Some(val) => self.ptr.swap(Box::into_raw(val), atomic::Ordering::AcqRel),
// Acquire is needed to synchronize with the store of a non-null ptr, but since a null ptr
// will never be dereferenced, there is no need to synchronize the store of a null ptr.
None => self.ptr.swap(ptr::null_mut(), atomic::Ordering::Acquire),
};
if old.is_null() {
None
} else {
// SAFETY:
// - AcqRel/Acquire ensures that it does not read a pointer to potentially invalid memory.
// - We've checked that old is not null.
// - We do not store invalid pointers other than null in self.ptr.
Some(unsafe { Box::from_raw(old) })
}
}

fn take(&self) -> Option<Box<T>> {
self.swap(None)
}
}

impl<T> Drop for AtomicOption<T> {
fn drop(&mut self) {
drop(self.take());
}
}