Skip to content

Commit 443e50c

Browse files
committed
Rollup merge of #55963 - stepancheg:mpsc-take-2, r=alexcrichton
Stress test for MPSC `concurrent_recv_timeout_and_upgrade` reproduces a problem 100% times on my MacBook with command: ``` ./x.py test --stage 0 ./src/test/run-pass/mpsc_stress.rs ``` Thus it is commented out. Other tests cases were useful for catching another test cases which may arise during the fix. This diff is a part of my previous rewrite attempt: #42883 CC #39364
2 parents fd65e25 + a1f83e7 commit 443e50c

File tree

1 file changed

+172
-0
lines changed

1 file changed

+172
-0
lines changed

src/test/run-pass/mpsc_stress.rs

+172
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
// Copyright 2017 The Rust Project Developers. See the COPYRIGHT
2+
// file at the top-level directory of this distribution and at
3+
// http://rust-lang.org/COPYRIGHT.
4+
//
5+
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6+
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7+
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8+
// option. This file may not be copied, modified, or distributed
9+
// except according to those terms.
10+
11+
// compile-flags:--test
12+
// ignore-emscripten
13+
14+
use std::sync::mpsc::channel;
15+
use std::sync::mpsc::TryRecvError;
16+
use std::sync::mpsc::RecvError;
17+
use std::sync::mpsc::RecvTimeoutError;
18+
use std::sync::Arc;
19+
use std::sync::atomic::AtomicUsize;
20+
use std::sync::atomic::Ordering;
21+
22+
use std::thread;
23+
use std::time::Duration;
24+
25+
26+
/// Simple thread synchronization utility
27+
struct Barrier {
28+
// Not using mutex/condvar for precision
29+
shared: Arc<AtomicUsize>,
30+
count: usize,
31+
}
32+
33+
impl Barrier {
34+
fn new(count: usize) -> Vec<Barrier> {
35+
let shared = Arc::new(AtomicUsize::new(0));
36+
(0..count).map(|_| Barrier { shared: shared.clone(), count: count }).collect()
37+
}
38+
39+
fn new2() -> (Barrier, Barrier) {
40+
let mut v = Barrier::new(2);
41+
(v.pop().unwrap(), v.pop().unwrap())
42+
}
43+
44+
/// Returns when `count` threads enter `wait`
45+
fn wait(self) {
46+
self.shared.fetch_add(1, Ordering::SeqCst);
47+
while self.shared.load(Ordering::SeqCst) != self.count {
48+
}
49+
}
50+
}
51+
52+
53+
fn shared_close_sender_does_not_lose_messages_iter() {
54+
let (tb, rb) = Barrier::new2();
55+
56+
let (tx, rx) = channel();
57+
let _ = tx.clone(); // convert to shared
58+
59+
thread::spawn(move || {
60+
tb.wait();
61+
thread::sleep(Duration::from_micros(1));
62+
tx.send(17).expect("send");
63+
drop(tx);
64+
});
65+
66+
let i = rx.into_iter();
67+
rb.wait();
68+
// Make sure it doesn't return disconnected before returning an element
69+
assert_eq!(vec![17], i.collect::<Vec<_>>());
70+
}
71+
72+
#[test]
73+
fn shared_close_sender_does_not_lose_messages() {
74+
for _ in 0..10000 {
75+
shared_close_sender_does_not_lose_messages_iter();
76+
}
77+
}
78+
79+
80+
// https://github.com/rust-lang/rust/issues/39364
81+
fn concurrent_recv_timeout_and_upgrade_iter() {
82+
// 1 us
83+
let sleep = Duration::new(0, 1_000);
84+
85+
let (a, b) = Barrier::new2();
86+
let (tx, rx) = channel();
87+
let th = thread::spawn(move || {
88+
a.wait();
89+
loop {
90+
match rx.recv_timeout(sleep) {
91+
Ok(_) => {
92+
break;
93+
},
94+
Err(_) => {},
95+
}
96+
}
97+
});
98+
b.wait();
99+
thread::sleep(sleep);
100+
tx.clone().send(()).expect("send");
101+
th.join().unwrap();
102+
}
103+
104+
#[test]
105+
fn concurrent_recv_timeout_and_upgrade() {
106+
// FIXME: fix and enable
107+
if true { return }
108+
109+
// at the moment of writing this test fails like this:
110+
// thread '<unnamed>' panicked at 'assertion failed: `(left == right)`
111+
// left: `4561387584`,
112+
// right: `0`', libstd/sync/mpsc/shared.rs:253:13
113+
114+
for _ in 0..10000 {
115+
concurrent_recv_timeout_and_upgrade_iter();
116+
}
117+
}
118+
119+
120+
fn concurrent_writes_iter() {
121+
const THREADS: usize = 4;
122+
const PER_THR: usize = 100;
123+
124+
let mut bs = Barrier::new(THREADS + 1);
125+
let (tx, rx) = channel();
126+
127+
let mut threads = Vec::new();
128+
for j in 0..THREADS {
129+
let tx = tx.clone();
130+
let b = bs.pop().unwrap();
131+
threads.push(thread::spawn(move || {
132+
b.wait();
133+
for i in 0..PER_THR {
134+
tx.send(j * 1000 + i).expect("send");
135+
}
136+
}));
137+
}
138+
139+
let b = bs.pop().unwrap();
140+
b.wait();
141+
142+
let mut v: Vec<_> = rx.iter().take(THREADS * PER_THR).collect();
143+
v.sort();
144+
145+
for j in 0..THREADS {
146+
for i in 0..PER_THR {
147+
assert_eq!(j * 1000 + i, v[j * PER_THR + i]);
148+
}
149+
}
150+
151+
for t in threads {
152+
t.join().unwrap();
153+
}
154+
155+
let one_us = Duration::new(0, 1000);
156+
157+
assert_eq!(TryRecvError::Empty, rx.try_recv().unwrap_err());
158+
assert_eq!(RecvTimeoutError::Timeout, rx.recv_timeout(one_us).unwrap_err());
159+
160+
drop(tx);
161+
162+
assert_eq!(RecvError, rx.recv().unwrap_err());
163+
assert_eq!(RecvTimeoutError::Disconnected, rx.recv_timeout(one_us).unwrap_err());
164+
assert_eq!(TryRecvError::Disconnected, rx.try_recv().unwrap_err());
165+
}
166+
167+
#[test]
168+
fn concurrent_writes() {
169+
for _ in 0..100 {
170+
concurrent_writes_iter();
171+
}
172+
}

0 commit comments

Comments
 (0)