Skip to content

Commit

Permalink
For TcpStream fixed issue tokio-rs#1073, deregistering not working on…
Browse files Browse the repository at this point in the history
… windows. When deregistering a socket make sure it is marked for deletion so it won't be used in poll updates. Also added a new more complicated test for TcpStream deregistering and fixed the existing test which was mistankenly registering only READABLE instead of WRITABLE.

Signed-off-by: Daniel Tacalau <dst4096@gmail.com>
  • Loading branch information
dtacalau committed Nov 8, 2019
1 parent a8e61b6 commit c806d5a
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 4 deletions.
22 changes: 22 additions & 0 deletions src/sys/windows/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,17 @@ impl super::SocketState for TcpStream {
let mut internal = self.internal.lock().unwrap();
match &mut *internal {
Some(internal) => {
// action of setting a None state it's a sign of deregistering a socket, so
// existing socket must be marked for deletion so it won't be used by selector
// for subsequent updates (atm it will be removed during first selector poll update)
if sock_state.is_none() {
if internal.sock_state.is_some() {
let sock_state = internal.sock_state.as_ref();
let mut sock_internal = sock_state.unwrap().lock().unwrap();
sock_internal.mark_delete();
}
}

internal.sock_state = sock_state;
}
None => {}
Expand All @@ -156,6 +167,17 @@ impl<'a> super::SocketState for &'a TcpStream {
let mut internal = self.internal.lock().unwrap();
match &mut *internal {
Some(internal) => {
// action of setting a None state it's a sign of deregistering a socket, so
// existing socket must be marked for deletion so it won't be used by selector
// for subsequent updates (atm it will be removed during first selector poll update)
if sock_state.is_none() {
if internal.sock_state.is_some() {
let sock_state = internal.sock_state.as_ref();
let mut sock_internal = sock_state.unwrap().lock().unwrap();
sock_internal.mark_delete();
}
}

internal.sock_state = sock_state;
}
None => {}
Expand Down
43 changes: 42 additions & 1 deletion tests/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ use mio::{Events, Interests, Poll, Registry, Token};

mod util;

use util::{any_local_address, assert_send, assert_sync, init, TryRead, TryWrite};
use util::{
any_local_address, assert_send, assert_sync, expect_no_events, init, init_with_poll, TryRead,
TryWrite,
};

const LISTEN: Token = Token(0);
const CLIENT: Token = Token(1);
Expand Down Expand Up @@ -1188,3 +1191,41 @@ fn write_then_deregister() {
assert_eq!(s.read(&mut buf).unwrap(), 4);
assert_eq!(&buf[0..4], &[1, 2, 3, 4]);
}

#[test]
fn tcp_no_events_after_deregister() {
let (mut poll, mut events) = init_with_poll();

let listener = TcpListener::bind("127.0.0.1:0".parse().unwrap()).unwrap();
let addr = listener.local_addr().unwrap();
let mut stream = TcpStream::connect(addr).unwrap();

poll.registry()
.register(&listener, Token(1), Interests::READABLE)
.unwrap();
poll.registry()
.register(&stream, Token(3), Interests::READABLE)
.unwrap();

// Wait a moment for stream to connect before trying accept
thread::sleep(Duration::from_millis(100));

let mut stream2 = listener.accept().unwrap().0;
poll.registry()
.register(&stream2, Token(2), Interests::WRITABLE)
.unwrap();

stream2.write_all(&[1, 2, 3, 4]).unwrap();

poll.registry().deregister(&listener).unwrap();
poll.registry().deregister(&stream).unwrap();
poll.registry().deregister(&stream2).unwrap();

// note: without deregister, poll would have retrieved 3 events:
// listener-READABLE, stream2-WRITABLE, stream-READABLE
expect_no_events(&mut poll, &mut events);

let mut buf = [0; 10];
assert_eq!(stream.read(&mut buf).unwrap(), 4);
assert_eq!(&buf[0..4], &[1, 2, 3, 4]);
}
19 changes: 16 additions & 3 deletions tests/tcp_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -510,26 +510,39 @@ fn reregistering() {
}

#[test]
fn deregistering() {
fn no_events_after_deregister() {
let (mut poll, mut events) = init_with_poll();

let (thread_handle, address) = echo_listener(any_local_address(), 1);

let stream = TcpStream::connect(address).unwrap();
let mut stream = TcpStream::connect(address).unwrap();

poll.registry()
.register(&stream, ID1, Interests::READABLE)
.register(&stream, ID1, Interests::WRITABLE.add(Interests::READABLE))
.expect("unable to register TCP stream");

poll.registry()
.deregister(&stream)
.expect("unable to deregister TCP stream");

// note: without deregistering at this point poll would retrieve Interests::WRITABLE
expect_no_events(&mut poll, &mut events);

// We do expect to be connected.
assert_eq!(stream.peer_addr().unwrap(), address);

// Also, write should work
let mut buf = [0; 16];
assert_would_block(stream.peek(&mut buf));
assert_would_block(stream.read(&mut buf));

let n = stream.write(&DATA1).expect("unable to write to stream");
assert_eq!(n, DATA1.len());
stream.flush().unwrap();

// note: without deregistering at this point poll would retrieve Interests::READABLE
expect_no_events(&mut poll, &mut events);

drop(stream);
thread_handle.join().expect("unable to join thread");
}
Expand Down

0 comments on commit c806d5a

Please sign in to comment.