Skip to content

Commit

Permalink
mutex for send2
Browse files Browse the repository at this point in the history
  • Loading branch information
Evan Simmons committed Apr 18, 2022
1 parent 8256493 commit 76d5829
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 20 deletions.
47 changes: 28 additions & 19 deletions tokio/src/sync/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1025,29 +1025,38 @@ impl<T> Drop for Receiver<T> {
impl<T> Clone for Receiver<T> {
fn clone(&self) -> Self {
let next = self.next;
let shared = self.shared.clone();

// register interest in the slot that next points to
{
for n in next..=self.shared.tail.lock().pos {
let idx = (n & self.shared.mask as u64) as usize;
let slot = self.shared.buffer[idx].read().unwrap();

// a race with RecvGuard::drop would be bad, but is impossible since `self.next`
// is already incremented to the slot after the one that the `RecvGuard` points to. Additionally
// all methods that drop a `RecvGuard` require a &mut `Receiver` which ensures this method is not
// called concurrently.
slot.rem.fetch_add(1, SeqCst);
}
// let this be lock-free since we're not yet operating on the tail.
let tail_pos = shared.tail.lock().pos;
for n in next..tail_pos {
let idx = (n & shared.mask as u64) as usize;
let slot = shared.buffer[idx].read().unwrap();

// a race with RecvGuard::drop would be bad, but is impossible since `self.next`
// is already incremented to the slot after the one that the `RecvGuard` points to. Additionally
// all methods that drop a `RecvGuard` require a &mut `Receiver` which ensures this method is not
// called concurrently.
slot.rem.fetch_add(1, SeqCst);
}
// tail pos may have changed, we need a locked section here to prevent a race with `Sender::send2`
let mut n = tail_pos.wrapping_sub(1);
let mut tail = shared.tail.lock();
while n <= tail.pos {
let idx = (n & shared.mask as u64) as usize;
let slot = self.shared.buffer[idx].read().unwrap();
slot.rem.fetch_add(1, SeqCst);
n = n.wrapping_add(1);
}
let shared = self.shared.clone();
// register the new receiver with `Tail`
{
let mut tail = shared.tail.lock();

if tail.rx_cnt == MAX_RECEIVERS {
panic!("max receivers");
}
tail.rx_cnt = tail.rx_cnt.checked_add(1).expect("overflow");
// register the new receiver with `Tail`
if tail.rx_cnt == MAX_RECEIVERS {
panic!("max receivers");
}
tail.rx_cnt = tail.rx_cnt.checked_add(1).expect("overflow");

drop(tail);

Receiver { shared, next }
}
Expand Down
26 changes: 25 additions & 1 deletion tokio/src/sync/tests/loom_broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,6 @@ fn drop_cloned_rx() {
let v = assert_ok!(rx1.recv().await);
assert_eq!(v, "three");

let v = assert_ok!(rx1.recv().await);

match assert_err!(rx1.recv().await) {
Closed => {}
Expand Down Expand Up @@ -272,3 +271,28 @@ fn drop_multiple_cloned_rx_with_overflow() {
assert_ok!(th2.join());
});
}

#[test]
fn send_and_rx_clone() {
// test the interraction of Sender::send and Rx::clone
loom::model(move || {
let (tx, mut rx) = broadcast::channel(2);

let th1 = thread::spawn(move || {
block_on(async {
let mut rx2 = rx.clone();
let v = assert_ok!(rx.recv().await);
assert_eq!(v, 1);

// this would return closed if rem was incr'd in clone between
// read and write of rem for new tail entry.
let v2 = assert_ok!(rx2.recv().await);
assert_eq!(v2, 1);
});
});
assert_ok!(tx.send(1));

assert_ok!(th1.join());
});
}

0 comments on commit 76d5829

Please sign in to comment.