Skip to content

Commit

Permalink
Fix race condition related bugs (#243)
Browse files Browse the repository at this point in the history
* Fix races.

This mostly pulls in changes from rust-lang/futures-rs#881, but
also updates Registration to be a bit more obvious as to what is going
on.

* Reduce spurious wakeups caused by Reactor

This patch adds an ABA guard on token values before registering them
with Mio. This allows catching token reuse and avoid the notification.

This is needed for OS X as the notification is used to determine that a
TCP connect has completed. A spurious notification can potentially cause
write failures.
  • Loading branch information
carllerche authored Mar 22, 2018
1 parent 8786741 commit 08c21e7
Show file tree
Hide file tree
Showing 9 changed files with 414 additions and 188 deletions.
82 changes: 49 additions & 33 deletions tests/global.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ extern crate env_logger;

use std::{io, thread};
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::Relaxed;

use futures::prelude::*;
use tokio::net::{TcpStream, TcpListener};
Expand All @@ -18,7 +20,7 @@ macro_rules! t {
}

#[test]
fn hammer() {
fn hammer_old() {
let _ = env_logger::init();

let threads = (0..10).map(|_| {
Expand Down Expand Up @@ -73,48 +75,62 @@ fn hammer_split() {
use tokio_io::io;

const N: usize = 100;
const ITER: usize = 100;

let _ = env_logger::init();

let srv = t!(TcpListener::bind(&"127.0.0.1:0".parse().unwrap()));
let addr = t!(srv.local_addr());
for _ in 0..ITER {
let srv = t!(TcpListener::bind(&"127.0.0.1:0".parse().unwrap()));
let addr = t!(srv.local_addr());

let mut rt = Runtime::new().unwrap();
let cnt = Arc::new(AtomicUsize::new(0));

fn split(socket: TcpStream) {
let socket = Arc::new(socket);
let rd = Rd(socket.clone());
let wr = Wr(socket);
let mut rt = Runtime::new().unwrap();

let rd = io::read(rd, vec![0; 1])
.map(|_| ())
.map_err(|e| panic!("read error = {:?}", e));
fn split(socket: TcpStream, cnt: Arc<AtomicUsize>) {
let socket = Arc::new(socket);
let rd = Rd(socket.clone());
let wr = Wr(socket);

let wr = io::write_all(wr, b"1")
.map(|_| ())
.map_err(|e| panic!("write error = {:?}", e));
let cnt2 = cnt.clone();

tokio::spawn(rd);
tokio::spawn(wr);
}
let rd = io::read(rd, vec![0; 1])
.map(move |_| {
cnt2.fetch_add(1, Relaxed);
})
.map_err(|e| panic!("read error = {:?}", e));

let wr = io::write_all(wr, b"1")
.map(move |_| {
cnt.fetch_add(1, Relaxed);
})
.map_err(move |e| panic!("write error = {:?}", e));

tokio::spawn(rd);
tokio::spawn(wr);
}

rt.spawn({
srv.incoming()
.map_err(|e| panic!("accept error = {:?}", e))
.take(N as u64)
.for_each(|socket| {
split(socket);
Ok(())
})
});

for _ in 0..N {
rt.spawn({
TcpStream::connect(&addr)
.map_err(|e| panic!("connect error = {:?}", e))
.map(|socket| split(socket))
let cnt = cnt.clone();
srv.incoming()
.map_err(|e| panic!("accept error = {:?}", e))
.take(N as u64)
.for_each(move |socket| {
split(socket, cnt.clone());
Ok(())
})
});
}

rt.shutdown_on_idle().wait().unwrap();
for _ in 0..N {
rt.spawn({
let cnt = cnt.clone();
TcpStream::connect(&addr)
.map_err(move |e| panic!("connect error = {:?}", e))
.map(move |socket| split(socket, cnt))
});
}

rt.shutdown_on_idle().wait().unwrap();
assert_eq!(N * 4, cnt.load(Relaxed));
}
}
Loading

0 comments on commit 08c21e7

Please sign in to comment.