Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(oneshot channel): ensure msg won't be dropped on sender side when send returns ok #6558

Merged
22 changes: 20 additions & 2 deletions tokio/src/sync/oneshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1072,7 +1072,14 @@ impl<T> Receiver<T> {
impl<T> Drop for Receiver<T> {
fn drop(&mut self) {
if let Some(inner) = self.inner.as_ref() {
inner.close();
let state = inner.close();

if state.is_complete() {
// SAFETY: we have ensured that the `VALUE_SENT` bit has been set,
// so only the receiver can access the value.
drop(unsafe { inner.consume_value() });
}

#[cfg(all(tokio_unstable, feature = "tracing"))]
self.resource_span.in_scope(|| {
tracing::trace!(
Expand Down Expand Up @@ -1202,14 +1209,16 @@ impl<T> Inner<T> {
}

/// Called by `Receiver` to indicate that the value will never be received.
fn close(&self) {
fn close(&self) -> State {
let prev = State::set_closed(&self.state);

if prev.is_tx_task_set() && !prev.is_complete() {
unsafe {
self.tx_task.with_task(Waker::wake_by_ref);
}
}

prev
}

/// Consumes the value. This function does not check `state`.
Expand Down Expand Up @@ -1248,6 +1257,15 @@ impl<T> Drop for Inner<T> {
self.tx_task.drop_task();
}
}

// SAFETY: we have `&mut self`, and therefore we have
// exclusive access to the value.
unsafe {
// Note: the assertion holds because if the value has been sent by sender,
// we must ensure that the value must have been consumed by the receiver before
// dropping the `Inner`.
debug_assert!(self.consume_value().is_none());
}
}
}

Expand Down
47 changes: 47 additions & 0 deletions tokio/src/sync/tests/loom_oneshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,3 +138,50 @@ fn changing_tx_task() {
}
});
}

#[test]
fn checking_tx_send_ok_not_drop() {
use std::borrow::Borrow;
use std::cell::Cell;

loom::thread_local! {
static IS_RX: Cell<bool> = Cell::new(true);
}

struct Msg;

impl Drop for Msg {
fn drop(&mut self) {
IS_RX.with(|is_rx: &Cell<_>| {
// On `tx.send(msg)` returning `Err(msg)`,
// we call `std::mem::forget(msg)`, so that
// `drop` is not expected to be called in the
// tx thread.
assert!(is_rx.get());
});
}
}

let mut builder = loom::model::Builder::new();
builder.preemption_bound = Some(2);

builder.check(|| {
let (tx, rx) = oneshot::channel();

// tx thread
let tx_thread_join_handle = thread::spawn(move || {
// Ensure that `Msg::drop` in this thread will see is_rx == false
IS_RX.with(|is_rx: &Cell<_>| {
is_rx.set(false);
});
if let Err(msg) = tx.send(Msg) {
std::mem::forget(msg);
}
});

// main thread is the rx thread
drop(rx);

tx_thread_join_handle.join().unwrap();
});
}
Loading