From bd67037bb479371545d45b48b761b6a7c8884495 Mon Sep 17 00:00:00 2001 From: Thomas de Zeeuw Date: Tue, 20 Aug 2019 14:51:31 +0200 Subject: [PATCH 01/16] Add tests for the registration process Check that the event::Source implementation is correctly called. --- tests/poll.rs | 146 +++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 144 insertions(+), 2 deletions(-) diff --git a/tests/poll.rs b/tests/poll.rs index 8d22cefa8..21abba0cf 100644 --- a/tests/poll.rs +++ b/tests/poll.rs @@ -1,9 +1,11 @@ use mio::net::{TcpListener, TcpStream}; -use mio::*; +use mio::{event, Events, Interests, Poll, Registry, Token}; + use std::net; -use std::sync::{Arc, Barrier}; +use std::sync::{Arc, Barrier, Mutex}; use std::thread::{self, sleep}; use std::time::Duration; +use std::{fmt, io}; mod util; @@ -210,3 +212,143 @@ pub fn test_double_register() { .register(&l, Token(1), Interests::READABLE) .is_err()); } + +struct TestEventSource(Mutex); + +struct TestEventSourceData { + registrations: Vec<(Token, Interests)>, + reregistrations: Vec<(Token, Interests)>, + deregister_count: usize, +} + +impl TestEventSource { + fn new() -> TestEventSource { + TestEventSource(Mutex::new(TestEventSourceData { + registrations: Vec::new(), + reregistrations: Vec::new(), + deregister_count: 0, + })) + } +} + +impl event::Source for TestEventSource { + fn register(&self, _registry: &Registry, token: Token, interests: Interests) -> io::Result<()> { + let mut inner = self.0.lock().unwrap(); + inner.registrations.push((token, interests)); + Ok(()) + } + + fn reregister( + &self, + _registry: &Registry, + token: Token, + interests: Interests, + ) -> io::Result<()> { + let mut inner = self.0.lock().unwrap(); + inner.reregistrations.push((token, interests)); + Ok(()) + } + + fn deregister(&self, _registry: &Registry) -> io::Result<()> { + let mut inner = self.0.lock().unwrap(); + inner.deregister_count += 1; + Ok(()) + } +} + +#[test] +fn poll_registration() { + init(); + let poll = Poll::new().unwrap(); + let registry = poll.registry(); + + let source = TestEventSource::new(); + let token = Token(0); + let interests = Interests::READABLE; + registry.register(&source, token, interests).unwrap(); + { + let source = source.0.lock().unwrap(); + assert_eq!(source.registrations.len(), 1); + assert_eq!(source.registrations.get(0), Some(&(token, interests))); + assert!(source.reregistrations.is_empty()); + assert_eq!(source.deregister_count, 0); + } + + let re_token = Token(0); + let re_interests = Interests::READABLE; + registry + .reregister(&source, re_token, re_interests) + .unwrap(); + { + let source = source.0.lock().unwrap(); + assert_eq!(source.registrations.len(), 1); + assert_eq!(source.reregistrations.len(), 1); + assert_eq!( + source.reregistrations.get(0), + Some(&(re_token, re_interests)) + ); + assert_eq!(source.deregister_count, 0); + } + + registry.deregister(&source).unwrap(); + { + let source = source.0.lock().unwrap(); + assert_eq!(source.registrations.len(), 1); + assert_eq!(source.reregistrations.len(), 1); + assert_eq!(source.deregister_count, 1); + } +} + +struct ErroneousTestEventSource; + +impl event::Source for ErroneousTestEventSource { + fn register( + &self, + _registry: &Registry, + _token: Token, + _interests: Interests, + ) -> io::Result<()> { + Err(io::Error::new(io::ErrorKind::Other, "register")) + } + + fn reregister( + &self, + _registry: &Registry, + _token: Token, + _interests: Interests, + ) -> io::Result<()> { + Err(io::Error::new(io::ErrorKind::Other, "reregister")) + } + + fn deregister(&self, _registry: &Registry) -> io::Result<()> { + Err(io::Error::new(io::ErrorKind::Other, "deregister")) + } +} + +#[test] +fn poll_erroneous_registration() { + init(); + let poll = Poll::new().unwrap(); + let registry = poll.registry(); + + let source = ErroneousTestEventSource; + let token = Token(0); + let interests = Interests::READABLE; + assert_error(registry.register(&source, token, interests), "register"); + assert_error(registry.reregister(&source, token, interests), "reregister"); + assert_error(registry.deregister(&source), "deregister"); +} + +/// Assert that `result` is an error and the formatted error (via +/// `fmt::Display`) equals `expected_msg`. +pub fn assert_error(result: Result, expected_msg: &str) { + match result { + Ok(_) => panic!("unexpected OK result"), + Err(err) => assert!( + err.to_string().contains(expected_msg), + "wanted: {}, got: {}", + err, + expected_msg + ), + } +} From 2a91ee08d51162a0c5a68caefc9c80aff826ffb8 Mon Sep 17 00:00:00 2001 From: Thomas de Zeeuw Date: Thu, 29 Aug 2019 18:54:35 +0200 Subject: [PATCH 02/16] Port tests for UdpSocket from Gaea There is some overlap in what is tested now however. --- tests/udp_socket.rs | 548 +++++++++++++++++++++++++++++++++++++++++++- tests/util/mod.rs | 90 +++++++- 2 files changed, 632 insertions(+), 6 deletions(-) diff --git a/tests/udp_socket.rs b/tests/udp_socket.rs index dad87add7..c9ce33e6e 100644 --- a/tests/udp_socket.rs +++ b/tests/udp_socket.rs @@ -1,7 +1,11 @@ use std::io::ErrorKind; -use std::net::IpAddr; +use std::net::{self, IpAddr, SocketAddr}; +#[cfg(unix)] +use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd}; use std::str; -use std::time; +use std::sync::{Arc, Barrier}; +use std::thread; +use std::time::Duration; use bytes::{Buf, BufMut, Bytes, BytesMut}; use log::{debug, info}; @@ -11,10 +15,19 @@ use mio::{Events, Interests, Poll, Registry, Token}; mod util; -use util::{any_local_address, assert_send, assert_sync, init}; +use util::{ + any_local_address, any_local_ipv6_address, assert_error, assert_send, assert_sync, + expect_events, expect_no_events, init, init_with_poll, ExpectEvent, +}; + +const DATA1: &[u8] = b"Hello world!"; +const DATA2: &[u8] = b"Hello mars!"; const LISTENER: Token = Token(0); const SENDER: Token = Token(1); +const ID1: Token = Token(2); +const ID2: Token = Token(3); +const ID3: Token = Token(4); #[test] fn is_send_and_sync() { @@ -22,6 +35,533 @@ fn is_send_and_sync() { assert_sync::(); } +#[test] +fn unconnected_udp_socket_ipv4() { + let socket1 = UdpSocket::bind(any_local_address()).unwrap(); + let socket2 = UdpSocket::bind(any_local_address()).unwrap(); + smoke_test_unconnected_udp_socket(socket1, socket2); +} + +#[test] +fn unconnected_udp_socket_ipv6() { + let socket1 = UdpSocket::bind(any_local_ipv6_address()).unwrap(); + let socket2 = UdpSocket::bind(any_local_ipv6_address()).unwrap(); + smoke_test_unconnected_udp_socket(socket1, socket2); +} + +fn smoke_test_unconnected_udp_socket(socket1: UdpSocket, socket2: UdpSocket) { + let (mut poll, mut events) = init_with_poll(); + + let address1 = socket1.local_addr().unwrap(); + let address2 = socket2.local_addr().unwrap(); + + poll.registry() + .register(&socket1, ID1, Interests::READABLE.add(Interests::WRITABLE)) + .expect("unable to register UDP socket"); + poll.registry() + .register(&socket2, ID2, Interests::READABLE.add(Interests::WRITABLE)) + .expect("unable to register UDP socket"); + + expect_events( + &mut poll, + &mut events, + vec![ + ExpectEvent::new(ID1, Interests::WRITABLE), + ExpectEvent::new(ID2, Interests::WRITABLE), + ], + ); + + socket1.send_to(DATA1, address2).unwrap(); + socket2.send_to(DATA2, address1).unwrap(); + + expect_events( + &mut poll, + &mut events, + vec![ + ExpectEvent::new(ID1, Interests::READABLE), + ExpectEvent::new(ID2, Interests::READABLE), + ], + ); + + let mut buf = [0; 20]; + let (n, got_address1) = socket1.peek_from(&mut buf).unwrap(); + assert_eq!(n, DATA2.len()); + assert_eq!(buf[..n], DATA2[..]); + assert_eq!(got_address1, address2); + + let (n, got_address2) = socket2.peek_from(&mut buf).unwrap(); + assert_eq!(n, DATA1.len()); + assert_eq!(buf[..n], DATA1[..]); + assert_eq!(got_address2, address1); + + let (n, got_address1) = socket1.recv_from(&mut buf).unwrap(); + assert_eq!(n, DATA2.len()); + assert_eq!(buf[..n], DATA2[..]); + assert_eq!(got_address1, address2); + + let (n, got_address2) = socket2.recv_from(&mut buf).unwrap(); + assert_eq!(n, DATA1.len()); + assert_eq!(buf[..n], DATA1[..]); + assert_eq!(got_address2, address1); + + assert!(socket1.take_error().unwrap().is_none()); + assert!(socket2.take_error().unwrap().is_none()); +} + +#[test] +fn connected_udp_socket_ipv4() { + let socket1 = UdpSocket::bind(any_local_address()).unwrap(); + let address1 = socket1.local_addr().unwrap(); + + let socket2 = UdpSocket::bind(any_local_address()).unwrap(); + let address2 = socket2.local_addr().unwrap(); + + socket1.connect(address2).unwrap(); + socket2.connect(address1).unwrap(); + + smoke_test_connected_udp_socket(socket1, socket2); +} + +#[test] +fn connected_udp_socket_ipv6() { + let socket1 = UdpSocket::bind(any_local_ipv6_address()).unwrap(); + let address1 = socket1.local_addr().unwrap(); + + let socket2 = UdpSocket::bind(any_local_ipv6_address()).unwrap(); + let address2 = socket2.local_addr().unwrap(); + + socket1.connect(address2).unwrap(); + socket2.connect(address1).unwrap(); + + smoke_test_connected_udp_socket(socket1, socket2); +} + +fn smoke_test_connected_udp_socket(socket1: UdpSocket, socket2: UdpSocket) { + let (mut poll, mut events) = init_with_poll(); + + poll.registry() + .register(&socket1, ID1, Interests::READABLE.add(Interests::WRITABLE)) + .expect("unable to register UDP socket"); + poll.registry() + .register(&socket2, ID2, Interests::READABLE.add(Interests::WRITABLE)) + .expect("unable to register UDP socket"); + + expect_events( + &mut poll, + &mut events, + vec![ + ExpectEvent::new(ID1, Interests::WRITABLE), + ExpectEvent::new(ID2, Interests::WRITABLE), + ], + ); + + socket1.send(DATA1).unwrap(); + socket2.send(DATA2).unwrap(); + + expect_events( + &mut poll, + &mut events, + vec![ + ExpectEvent::new(ID1, Interests::READABLE), + ExpectEvent::new(ID2, Interests::READABLE), + ], + ); + + let mut buf = [0; 20]; + let n = socket1.peek(&mut buf).unwrap(); + assert_eq!(n, DATA2.len()); + assert_eq!(buf[..n], DATA2[..]); + + let n = socket2.peek(&mut buf).unwrap(); + assert_eq!(n, DATA1.len()); + assert_eq!(buf[..n], DATA1[..]); + + let n = socket1.recv(&mut buf).unwrap(); + assert_eq!(n, DATA2.len()); + assert_eq!(buf[..n], DATA2[..]); + + let n = socket2.recv(&mut buf).unwrap(); + assert_eq!(n, DATA1.len()); + assert_eq!(buf[..n], DATA1[..]); + + assert!(socket1.take_error().unwrap().is_none()); + assert!(socket2.take_error().unwrap().is_none()); +} + +#[test] +fn reconnect_udp_socket_sending() { + let (mut poll, mut events) = init_with_poll(); + + let socket1 = UdpSocket::bind(any_local_address()).unwrap(); + let socket2 = UdpSocket::bind(any_local_address()).unwrap(); + let socket3 = UdpSocket::bind(any_local_address()).unwrap(); + + let address1 = socket1.local_addr().unwrap(); + let address2 = socket2.local_addr().unwrap(); + let address3 = socket3.local_addr().unwrap(); + + socket1.connect(address2).unwrap(); + socket2.connect(address1).unwrap(); + socket3.connect(address1).unwrap(); + + poll.registry() + .register(&socket1, ID1, Interests::READABLE.add(Interests::WRITABLE)) + .unwrap(); + poll.registry() + .register(&socket2, ID2, Interests::READABLE) + .unwrap(); + poll.registry() + .register(&socket3, ID3, Interests::READABLE) + .unwrap(); + + expect_events( + &mut poll, + &mut events, + vec![ExpectEvent::new(ID1, Interests::WRITABLE)], + ); + + socket1.send(DATA1).unwrap(); + + expect_events( + &mut poll, + &mut events, + vec![ExpectEvent::new(ID2, Interests::READABLE)], + ); + + let mut buf = [0; 20]; + let n = socket2.recv(&mut buf).unwrap(); + assert_eq!(n, DATA1.len()); + assert_eq!(buf[..n], DATA1[..]); + + socket1.connect(address3).unwrap(); + socket1.send(DATA2).unwrap(); + + expect_events( + &mut poll, + &mut events, + vec![ExpectEvent::new(ID3, Interests::READABLE)], + ); + + let n = socket3.recv(&mut buf).unwrap(); + assert_eq!(n, DATA2.len()); + assert_eq!(buf[..n], DATA2[..]); + + assert!(socket1.take_error().unwrap().is_none()); + assert!(socket2.take_error().unwrap().is_none()); + assert!(socket3.take_error().unwrap().is_none()); +} + +#[test] +fn reconnect_udp_socket_receiving() { + let (mut poll, mut events) = init_with_poll(); + + let socket1 = UdpSocket::bind(any_local_address()).unwrap(); + let socket2 = UdpSocket::bind(any_local_address()).unwrap(); + let socket3 = UdpSocket::bind(any_local_address()).unwrap(); + + let address1 = socket1.local_addr().unwrap(); + let address2 = socket2.local_addr().unwrap(); + let address3 = socket3.local_addr().unwrap(); + + socket1.connect(address2).unwrap(); + socket2.connect(address1).unwrap(); + socket3.connect(address1).unwrap(); + + poll.registry() + .register(&socket1, ID1, Interests::READABLE) + .unwrap(); + poll.registry() + .register(&socket2, ID2, Interests::WRITABLE) + .unwrap(); + poll.registry() + .register(&socket3, ID3, Interests::WRITABLE) + .unwrap(); + + expect_events( + &mut poll, + &mut events, + vec![ + ExpectEvent::new(ID2, Interests::WRITABLE), + ExpectEvent::new(ID3, Interests::WRITABLE), + ], + ); + + socket2.send(DATA1).unwrap(); + + expect_events( + &mut poll, + &mut events, + vec![ExpectEvent::new(ID1, Interests::READABLE)], + ); + + let mut buf = [0; 20]; + let n = socket1.recv(&mut buf).unwrap(); + assert_eq!(n, DATA1.len()); + assert_eq!(buf[..n], DATA1[..]); + + socket1.connect(address3).unwrap(); + socket3.send(DATA2).unwrap(); + + expect_events( + &mut poll, + &mut events, + vec![ExpectEvent::new(ID1, Interests::READABLE)], + ); + + // Read only a part of the data. + let max = 4; + let n = socket1.recv(&mut buf[..max]).unwrap(); + assert_eq!(n, max); + assert_eq!(buf[..max], DATA2[..max]); + + // Now connect back to socket 2, dropping the unread data. + socket1.connect(address2).unwrap(); + socket2.send(DATA2).unwrap(); + + expect_events( + &mut poll, + &mut events, + vec![ExpectEvent::new(ID1, Interests::READABLE)], + ); + + let n = socket1.recv(&mut buf).unwrap(); + assert_eq!(n, DATA2.len()); + assert_eq!(buf[..n], DATA2[..]); + + assert!(socket1.take_error().unwrap().is_none()); + assert!(socket2.take_error().unwrap().is_none()); + assert!(socket3.take_error().unwrap().is_none()); +} + +#[test] +fn unconnected_udp_socket_connected_methods() { + let (mut poll, mut events) = init_with_poll(); + + let socket1 = UdpSocket::bind(any_local_address()).unwrap(); + let socket2 = UdpSocket::bind(any_local_address()).unwrap(); + let address2 = socket2.local_addr().unwrap(); + + poll.registry() + .register(&socket1, ID1, Interests::WRITABLE) + .unwrap(); + poll.registry() + .register(&socket2, ID2, Interests::READABLE) + .unwrap(); + + expect_events( + &mut poll, + &mut events, + vec![ExpectEvent::new(ID1, Interests::WRITABLE)], + ); + + // Socket is unconnected, but we're using an connected method. + assert_error(socket1.send(DATA1), "address required"); + + // Now send some actual data. + socket1.send_to(DATA1, address2).unwrap(); + + expect_events( + &mut poll, + &mut events, + vec![ExpectEvent::new(ID2, Interests::READABLE)], + ); + + // Receive methods don't require the socket to be connected, you just won't + // know the sender. + let mut buf = [0; 20]; + let n = socket2.peek(&mut buf).unwrap(); + assert_eq!(n, DATA1.len()); + assert_eq!(buf[..n], DATA1[..]); + + let n = socket2.recv(&mut buf).unwrap(); + assert_eq!(n, DATA1.len()); + assert_eq!(buf[..n], DATA1[..]); + + assert!(socket1.take_error().unwrap().is_none()); + assert!(socket2.take_error().unwrap().is_none()); +} + +#[test] +fn connected_udp_socket_unconnected_methods() { + let (mut poll, mut events) = init_with_poll(); + + let socket1 = UdpSocket::bind(any_local_address()).unwrap(); + let socket2 = UdpSocket::bind(any_local_address()).unwrap(); + let socket3 = UdpSocket::bind(any_local_address()).unwrap(); + + let address2 = socket2.local_addr().unwrap(); + let address3 = socket3.local_addr().unwrap(); + + socket1.connect(address3).unwrap(); + socket3.connect(address2).unwrap(); + + poll.registry() + .register(&socket1, ID1, Interests::WRITABLE) + .unwrap(); + poll.registry() + .register(&socket2, ID2, Interests::WRITABLE) + .unwrap(); + poll.registry() + .register(&socket3, ID3, Interests::READABLE) + .unwrap(); + + expect_events( + &mut poll, + &mut events, + vec![ + ExpectEvent::new(ID1, Interests::WRITABLE), + ExpectEvent::new(ID2, Interests::WRITABLE), + ], + ); + + // Can't use `send_to`. + // Linux (and Android) actually allow `send_to` even if the socket is + // connected. + #[cfg(not(any(target_os = "android", target_os = "linux")))] + assert_error(socket1.send_to(DATA1, address2), "already connected"); + // Even if the address is the same. + #[cfg(not(any(target_os = "android", target_os = "linux")))] + assert_error(socket1.send_to(DATA1, address3), "already connected"); + + socket2.send_to(DATA2, address3).unwrap(); + + expect_events( + &mut poll, + &mut events, + vec![ExpectEvent::new(ID3, Interests::READABLE)], + ); + + let mut buf = [0; 20]; + let (n, got_address1) = socket3.peek_from(&mut buf).unwrap(); + assert_eq!(n, DATA2.len()); + assert_eq!(buf[..n], DATA2[..]); + assert_eq!(got_address1, address2); + + let (n, got_address2) = socket3.recv_from(&mut buf).unwrap(); + assert_eq!(n, DATA2.len()); + assert_eq!(buf[..n], DATA2[..]); + assert_eq!(got_address2, address2); + + assert!(socket1.take_error().unwrap().is_none()); + assert!(socket2.take_error().unwrap().is_none()); + assert!(socket3.take_error().unwrap().is_none()); +} + +#[test] +#[cfg(unix)] +fn udp_socket_raw_fd() { + init(); + + let socket = UdpSocket::bind(any_local_address()).unwrap(); + let address = socket.local_addr().unwrap(); + + let raw_fd1 = socket.as_raw_fd(); + let raw_fd2 = socket.into_raw_fd(); + assert_eq!(raw_fd1, raw_fd2); + + let socket = unsafe { UdpSocket::from_raw_fd(raw_fd2) }; + assert_eq!(socket.as_raw_fd(), raw_fd1); + assert_eq!(socket.local_addr().unwrap(), address); +} + +#[test] +fn udp_socket_register() { + let (mut poll, mut events) = init_with_poll(); + + let socket = UdpSocket::bind(any_local_address()).unwrap(); + poll.registry() + .register(&socket, ID1, Interests::READABLE) + .expect("unable to register UDP socket"); + + expect_no_events(&mut poll, &mut events); + + // NOTE: more tests are done in the smoke tests above. +} + +#[test] +fn udp_socket_reregister() { + let (mut poll, mut events) = init_with_poll(); + + let socket = UdpSocket::bind(any_local_address()).unwrap(); + let address = socket.local_addr().unwrap(); + + let barrier = Arc::new(Barrier::new(2)); + let thread_handle = send_packets(address, 1, barrier.clone()); + + poll.registry() + .register(&socket, ID1, Interests::WRITABLE) + .unwrap(); + // Let the first packet be send. + barrier.wait(); + expect_events( + &mut poll, + &mut events, + vec![ExpectEvent::new(ID1, Interests::WRITABLE)], // Not readable! + ); + + poll.registry() + .reregister(&socket, ID2, Interests::READABLE) + .unwrap(); + expect_events( + &mut poll, + &mut events, + vec![ExpectEvent::new(ID2, Interests::READABLE)], + ); + + let mut buf = [0; 20]; + let (n, _) = socket.recv_from(&mut buf).unwrap(); + assert_eq!(n, DATA1.len()); + assert_eq!(buf[..n], DATA1[..]); + + thread_handle.join().expect("unable to join thread"); +} + +#[test] +fn udp_socket_deregister() { + let (mut poll, mut events) = init_with_poll(); + + let socket = UdpSocket::bind(any_local_address()).unwrap(); + let address = socket.local_addr().unwrap(); + + let barrier = Arc::new(Barrier::new(2)); + let thread_handle = send_packets(address, 1, barrier.clone()); + + poll.registry() + .register(&socket, ID1, Interests::READABLE) + .unwrap(); + + // Let the packet be send. + barrier.wait(); + + poll.registry().deregister(&socket).unwrap(); + + expect_no_events(&mut poll, &mut events); + + // But we do expect a packet to be send. + let mut buf = [0; 20]; + let (n, _) = socket.recv_from(&mut buf).unwrap(); + assert_eq!(n, DATA1.len()); + assert_eq!(buf[..n], DATA1[..]); + + thread_handle.join().expect("unable to join thread"); +} + +/// Sends `n_packets` packets to `address`, over UDP, after the `barrier` is +/// waited (before each send) on in another thread. +fn send_packets( + address: SocketAddr, + n_packets: usize, + barrier: Arc, +) -> thread::JoinHandle<()> { + thread::spawn(move || { + let socket = net::UdpSocket::bind(any_local_address()).unwrap(); + for _ in 0..n_packets { + barrier.wait(); + assert_eq!(socket.send_to(DATA1, address).unwrap(), DATA1.len()); + } + }) +} + pub struct UdpHandlerSendRecv { tx: UdpSocket, rx: UdpSocket, @@ -176,7 +716,7 @@ pub fn test_udp_socket_discard() { let mut events = Events::with_capacity(1024); - poll.poll(&mut events, Some(time::Duration::from_secs(5))) + poll.poll(&mut events, Some(Duration::from_secs(5))) .unwrap(); for event in &events { diff --git a/tests/util/mod.rs b/tests/util/mod.rs index 12c1bd79b..ba5b6519e 100644 --- a/tests/util/mod.rs +++ b/tests/util/mod.rs @@ -1,13 +1,16 @@ // Not all functions are used by all tests. #![allow(dead_code)] +use std::fmt; use std::io::{self, Read, Write}; use std::net::SocketAddr; use std::sync::Once; use std::time::Duration; use bytes::{Buf, BufMut}; -use mio::{Events, Poll}; +use log::{error, warn}; +use mio::event::Event; +use mio::{Events, Interests, Poll, Token}; pub fn init() { static INIT: Once = Once::new(); @@ -17,6 +20,14 @@ pub fn init() { }) } +pub fn init_with_poll() -> (Poll, Events) { + init(); + + let poll = Poll::new().expect("unable to create Poll instance"); + let events = Events::with_capacity(16); + (poll, events) +} + pub fn assert_sync() {} pub fn assert_send() {} @@ -103,10 +114,85 @@ impl MapNonBlock for io::Result { } } +/// An event that is expected to show up when `Poll` is polled, see +/// `expect_events`. +#[derive(Debug)] +pub struct ExpectEvent { + token: Token, + // We're (ab)using `Interests` as readiness in `matches`. + interests: Interests, +} + +impl ExpectEvent { + pub const fn new(token: Token, interests: Interests) -> ExpectEvent { + ExpectEvent { token, interests } + } + + fn matches(&self, event: &Event) -> bool { + event.token() == self.token && + // If we expect a readiness then also match on the event. + // In maths terms that is p -> q, which is the same as !p || q. + (!self.interests.is_readable() || event.is_readable()) && + (!self.interests.is_writable() || event.is_writable()) && + (!self.interests.is_aio() || event.is_aio()) && + (!self.interests.is_lio() || event.is_lio()) + } +} + +pub fn expect_events(poll: &mut Poll, events: &mut Events, mut expected: Vec) { + // In a lot of calls we expect more then one event, but it could be that + // poll returns the first event only in a single call. To be a bit more + // lenient we'll poll a couple of times. + for _ in 0..3 { + poll.poll(events, Some(Duration::from_millis(500))) + .expect("unable to poll"); + + for event in events.iter() { + let index = expected.iter().position(|expected| expected.matches(event)); + + if let Some(index) = index { + expected.swap_remove(index); + } else { + // Must accept sporadic events. + warn!("got unexpected event: {:?}", event); + } + } + + if expected.is_empty() { + return; + } + } + + assert!( + expected.is_empty(), + "the following expected events were not found: {:?}", + expected + ); +} + pub fn expect_no_events(poll: &mut Poll, events: &mut Events) { poll.poll(events, Some(Duration::from_millis(50))) .expect("unable to poll"); - assert!(events.is_empty(), "received events, but didn't expect any"); + if !events.is_empty() { + for event in events.iter() { + error!("unexpected event: {:?}", event); + } + panic!("received events, but didn't expect any, see above"); + } +} + +/// Assert that `result` is an error and the formatted error (via +/// `fmt::Display`) equals `expected_msg`. +pub fn assert_error(result: Result, expected_msg: &str) { + match result { + Ok(_) => panic!("unexpected OK result"), + Err(err) => assert!( + err.to_string().contains(expected_msg), + "wanted: {}, got: {}", + err, + expected_msg + ), + } } /// Bind to any port on localhost. From 7a24525b1716097169a025abf5f50d0d71883e7a Mon Sep 17 00:00:00 2001 From: Thomas de Zeeuw Date: Fri, 30 Aug 2019 13:07:31 +0200 Subject: [PATCH 03/16] Check that UdpSocket is non-blocking in smoke test When no data is available to read/peek the socket should return a WouldBlock error. --- tests/udp_socket.rs | 11 +++++++++-- tests/util/mod.rs | 9 +++++++++ 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/tests/udp_socket.rs b/tests/udp_socket.rs index c9ce33e6e..64bfa458f 100644 --- a/tests/udp_socket.rs +++ b/tests/udp_socket.rs @@ -17,7 +17,7 @@ mod util; use util::{ any_local_address, any_local_ipv6_address, assert_error, assert_send, assert_sync, - expect_events, expect_no_events, init, init_with_poll, ExpectEvent, + assert_would_block, expect_events, expect_no_events, init, init_with_poll, ExpectEvent, }; const DATA1: &[u8] = b"Hello world!"; @@ -71,6 +71,10 @@ fn smoke_test_unconnected_udp_socket(socket1: UdpSocket, socket2: UdpSocket) { ], ); + let mut buf = [0; 20]; + assert_would_block(socket1.peek_from(&mut buf)); + assert_would_block(socket1.recv_from(&mut buf)); + socket1.send_to(DATA1, address2).unwrap(); socket2.send_to(DATA2, address1).unwrap(); @@ -83,7 +87,6 @@ fn smoke_test_unconnected_udp_socket(socket1: UdpSocket, socket2: UdpSocket) { ], ); - let mut buf = [0; 20]; let (n, got_address1) = socket1.peek_from(&mut buf).unwrap(); assert_eq!(n, DATA2.len()); assert_eq!(buf[..n], DATA2[..]); @@ -155,6 +158,10 @@ fn smoke_test_connected_udp_socket(socket1: UdpSocket, socket2: UdpSocket) { ], ); + let mut buf = [0; 20]; + assert_would_block(socket1.peek(&mut buf)); + assert_would_block(socket1.recv(&mut buf)); + socket1.send(DATA1).unwrap(); socket2.send(DATA2).unwrap(); diff --git a/tests/util/mod.rs b/tests/util/mod.rs index ba5b6519e..8fe897840 100644 --- a/tests/util/mod.rs +++ b/tests/util/mod.rs @@ -195,6 +195,15 @@ pub fn assert_error(result: Result, expected_msg: &str } } +/// Assert that the provided result is an `io::Error` with kind `WouldBlock`. +pub fn assert_would_block(result: io::Result) { + match result { + Ok(_) => panic!("unexpected OK result, expected a `WouldBlock` error"), + Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {} + Err(err) => panic!("unexpected error result: {}", err), + } +} + /// Bind to any port on localhost. pub fn any_local_address() -> SocketAddr { "127.0.0.1:0".parse().unwrap() From 49da93883e58e6ed1e42a127fc63164d1a67a4da Mon Sep 17 00:00:00 2001 From: Thomas de Zeeuw Date: Fri, 30 Aug 2019 13:33:26 +0200 Subject: [PATCH 04/16] Port tests for TcpStream from Gaea --- tests/tcp_stream.rs | 464 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 464 insertions(+) create mode 100644 tests/tcp_stream.rs diff --git a/tests/tcp_stream.rs b/tests/tcp_stream.rs new file mode 100644 index 000000000..88289f1a8 --- /dev/null +++ b/tests/tcp_stream.rs @@ -0,0 +1,464 @@ +use std::io::{self, IoSlice, IoSliceMut, Read, Write}; +use std::net::{self, Shutdown, SocketAddr}; +#[cfg(unix)] +use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd}; +use std::sync::mpsc::channel; +use std::sync::{Arc, Barrier}; +use std::thread; + +use mio::net::TcpStream; +use mio::{Interests, Token}; + +mod util; + +use util::{ + any_local_address, any_local_ipv6_address, assert_send, assert_sync, assert_would_block, + expect_events, expect_no_events, init, init_with_poll, ExpectEvent, +}; + +const DATA1: &[u8] = b"Hello world!"; +const DATA2: &[u8] = b"Hello mars!"; +// TODO: replace with `DATA1.len()` once `const_slice_len` is stable. +const DATA1_LEN: usize = 12; +const DATA2_LEN: usize = 11; + +const ID1: Token = Token(0); +const ID2: Token = Token(1); + +#[test] +fn is_send_and_sync() { + assert_send::(); + assert_sync::(); +} + +#[test] +fn tcp_stream_ipv4() { + smoke_test_tcp_stream(any_local_address()); +} + +#[test] +fn tcp_stream_ipv6() { + smoke_test_tcp_stream(any_local_ipv6_address()); +} + +fn smoke_test_tcp_stream(addr: SocketAddr) { + let (mut poll, mut events) = init_with_poll(); + + let (thread_handle, addr) = echo_listener(addr, 1); + let mut stream = TcpStream::connect(addr).unwrap(); + + poll.registry() + .register(&stream, ID1, Interests::WRITABLE.add(Interests::READABLE)) + .expect("unable to register TCP stream"); + + expect_events( + &mut poll, + &mut events, + vec![ExpectEvent::new(ID1, Interests::WRITABLE)], + ); + + let mut buf = [0; 16]; + assert_would_block(stream.peek(&mut buf)); + assert_would_block(stream.read(&mut buf)); + + // NOTE: the call to `peer_addr` must happen after we received a writable + // event as the stream might not yet be connected. + assert_eq!(stream.peer_addr().unwrap(), addr); + assert!(stream.local_addr().unwrap().ip().is_loopback()); + + let n = stream.write(&DATA1).expect("unable to write to stream"); + assert_eq!(n, DATA1.len()); + + stream.flush().unwrap(); + + expect_events( + &mut poll, + &mut events, + vec![ExpectEvent::new(ID1, Interests::READABLE)], + ); + + let n = stream.peek(&mut buf).expect("unable to peek from stream"); + assert_eq!(n, DATA1.len()); + assert_eq!(&buf[..n], DATA1); + + let n = stream.read(&mut buf).expect("unable to read from stream"); + assert_eq!(n, DATA1.len()); + assert_eq!(&buf[..n], DATA1); + + assert!(stream.take_error().unwrap().is_none()); + + let bufs = [IoSlice::new(&DATA1), IoSlice::new(&DATA2)]; + let n = stream + .write_vectored(&bufs) + .expect("unable to write vectored to stream"); + assert_eq!(n, DATA1.len() + DATA2.len()); + + expect_events( + &mut poll, + &mut events, + vec![ExpectEvent::new(ID1, Interests::READABLE)], + ); + + let mut buf1 = [1; DATA1_LEN]; + let mut buf2 = [2; DATA2_LEN + 1]; + let mut bufs = [IoSliceMut::new(&mut buf1), IoSliceMut::new(&mut buf2)]; + let n = stream + .read_vectored(&mut bufs) + .expect("unable to read vectored from stream"); + assert_eq!(n, DATA1.len() + DATA2.len()); + assert_eq!(&buf1, DATA1); + assert_eq!(&buf2[..DATA2.len()], DATA2); + assert_eq!(buf2[DATA2.len()], 2); // Last byte should be unchanged. + + // Close the connection to allow the listener to shutdown. + drop(stream); + + thread_handle.join().expect("unable to join thread"); +} + +#[test] +fn try_clone() { + let (mut poll, mut events) = init_with_poll(); + + let (thread_handle, address) = echo_listener(any_local_address(), 1); + + let mut stream1 = TcpStream::connect(address).unwrap(); + + poll.registry() + .register(&stream1, ID1, Interests::WRITABLE) + .expect("unable to register TCP stream"); + + expect_events( + &mut poll, + &mut events, + vec![ExpectEvent::new(ID1, Interests::WRITABLE)], + ); + + let n = stream1.write(DATA1).unwrap(); + assert_eq!(n, DATA1.len()); + + let mut stream2 = stream1.try_clone().unwrap(); + + // When using `try_clone` the `TcpStream` needs to be deregistered! + poll.registry().deregister(&stream1).unwrap(); + drop(stream1); + + poll.registry() + .register(&stream2, ID2, Interests::READABLE) + .expect("unable to register TCP stream"); + + expect_events( + &mut poll, + &mut events, + vec![ExpectEvent::new(ID2, Interests::READABLE)], + ); + + let mut buf = [0; 20]; + let n = stream2.read(&mut buf).unwrap(); + assert_eq!(n, DATA1.len()); + assert_eq!(&buf[..n], DATA1); + + drop(stream2); + thread_handle.join().expect("unable to join thread"); +} + +#[test] +fn ttl() { + init(); + + let barrier = Arc::new(Barrier::new(2)); + let (thread_handle, address) = start_listener(1, Some(barrier.clone())); + + let stream = TcpStream::connect(address).unwrap(); + + const TTL: u32 = 10; + stream.set_ttl(TTL).unwrap(); + assert_eq!(stream.ttl().unwrap(), TTL); + assert!(stream.take_error().unwrap().is_none()); + + barrier.wait(); + thread_handle.join().expect("unable to join thread"); +} + +#[test] +fn nodelay() { + init(); + + let barrier = Arc::new(Barrier::new(2)); + let (thread_handle, address) = start_listener(1, Some(barrier.clone())); + + let stream = TcpStream::connect(address).unwrap(); + + const NO_DELAY: bool = true; + stream.set_nodelay(NO_DELAY).unwrap(); + assert_eq!(stream.nodelay().unwrap(), NO_DELAY); + assert!(stream.take_error().unwrap().is_none()); + + barrier.wait(); + thread_handle.join().expect("unable to join thread"); +} + +#[test] +fn shutdown_read() { + let (mut poll, mut events) = init_with_poll(); + + let (thread_handle, address) = echo_listener(any_local_address(), 1); + + let mut stream = TcpStream::connect(address).unwrap(); + + poll.registry() + .register(&stream, ID1, Interests::WRITABLE.add(Interests::READABLE)) + .expect("unable to register TCP stream"); + + expect_events( + &mut poll, + &mut events, + vec![ExpectEvent::new(ID1, Interests::WRITABLE)], + ); + + let n = stream.write(DATA2).unwrap(); + assert_eq!(n, DATA2.len()); + + expect_events( + &mut poll, + &mut events, + vec![ExpectEvent::new(ID1, Interests::READABLE)], + ); + + stream.shutdown(Shutdown::Read).unwrap(); + + let mut buf = [0; 20]; + let n = stream.read(&mut buf).unwrap(); + assert_eq!(n, 0); + + drop(stream); + thread_handle.join().expect("unable to join thread"); +} + +#[test] +fn shutdown_write() { + let (mut poll, mut events) = init_with_poll(); + + let (thread_handle, address) = echo_listener(any_local_address(), 1); + + let mut stream = TcpStream::connect(address).unwrap(); + + poll.registry() + .register(&stream, ID1, Interests::WRITABLE.add(Interests::READABLE)) + .expect("unable to register TCP stream"); + + expect_events( + &mut poll, + &mut events, + vec![ExpectEvent::new(ID1, Interests::WRITABLE)], + ); + + let n = stream.write(DATA1).unwrap(); + assert_eq!(n, DATA1.len()); + + stream.shutdown(Shutdown::Write).unwrap(); + + let err = stream.write(DATA2).unwrap_err(); + assert_eq!(err.kind(), io::ErrorKind::BrokenPipe); + + // TODO: this can be flaky, fix that. + expect_events( + &mut poll, + &mut events, + vec![ExpectEvent::new(ID1, Interests::READABLE)], + ); + + // Read should be ok. + let mut buf = [0; 20]; + let n = stream.read(&mut buf).unwrap(); + assert_eq!(n, DATA1.len()); + assert_eq!(&buf[..n], DATA1); + + drop(stream); + thread_handle.join().expect("unable to join thread"); +} + +#[test] +fn shutdown_both() { + let (mut poll, mut events) = init_with_poll(); + + let (thread_handle, address) = echo_listener(any_local_address(), 1); + + let mut stream = TcpStream::connect(address).unwrap(); + + poll.registry() + .register(&stream, ID1, Interests::WRITABLE.add(Interests::READABLE)) + .expect("unable to register TCP stream"); + + expect_events( + &mut poll, + &mut events, + vec![ExpectEvent::new(ID1, Interests::WRITABLE)], + ); + + let n = stream.write(DATA1).unwrap(); + assert_eq!(n, DATA1.len()); + + expect_events( + &mut poll, + &mut events, + vec![ExpectEvent::new(ID1, Interests::READABLE)], + ); + + stream.shutdown(Shutdown::Both).unwrap(); + + let mut buf = [0; 20]; + let n = stream.read(&mut buf).unwrap(); + assert_eq!(n, 0); + + let err = stream.write(DATA2).unwrap_err(); + assert_eq!(err.kind(), io::ErrorKind::BrokenPipe); + + drop(stream); + thread_handle.join().expect("unable to join thread"); +} + +#[test] +#[cfg(unix)] +fn raw_fd() { + init(); + + let (thread_handle, address) = start_listener(1, None); + + let stream = TcpStream::connect(address).unwrap(); + let address = stream.local_addr().unwrap(); + + let raw_fd1 = stream.as_raw_fd(); + let raw_fd2 = stream.into_raw_fd(); + assert_eq!(raw_fd1, raw_fd2); + + let stream = unsafe { TcpStream::from_raw_fd(raw_fd2) }; + assert_eq!(stream.as_raw_fd(), raw_fd1); + assert_eq!(stream.local_addr().unwrap(), address); + + thread_handle.join().expect("unable to join thread"); +} + +#[test] +fn registering() { + let (mut poll, mut events) = init_with_poll(); + + let (thread_handle, address) = echo_listener(any_local_address(), 1); + + let stream = TcpStream::connect(address).unwrap(); + + poll.registry() + .register(&stream, ID1, Interests::READABLE) + .expect("unable to register TCP stream"); + + expect_no_events(&mut poll, &mut events); + + // NOTE: more tests are done in the smoke tests above. + + drop(stream); + thread_handle.join().expect("unable to join thread"); +} + +#[test] +fn reregistering() { + let (mut poll, mut events) = init_with_poll(); + + let (thread_handle, address) = echo_listener(any_local_address(), 1); + + let stream = TcpStream::connect(address).unwrap(); + + poll.registry() + .register(&stream, ID1, Interests::READABLE) + .expect("unable to register TCP stream"); + + poll.registry() + .reregister(&stream, ID2, Interests::WRITABLE) + .expect("unable to reregister TCP stream"); + + expect_events( + &mut poll, + &mut events, + vec![ExpectEvent::new(ID2, Interests::WRITABLE)], + ); + + assert_eq!(stream.peer_addr().unwrap(), address); + + drop(stream); + thread_handle.join().expect("unable to join thread"); +} + +#[test] +fn deregistering() { + let (mut poll, mut events) = init_with_poll(); + + let (thread_handle, address) = echo_listener(any_local_address(), 1); + + let stream = TcpStream::connect(address).unwrap(); + + poll.registry() + .register(&stream, ID1, Interests::READABLE) + .expect("unable to register TCP stream"); + + poll.registry() + .deregister(&stream) + .expect("unable to deregister TCP stream"); + + expect_no_events(&mut poll, &mut events); + + // We do expect to be connected. + assert_eq!(stream.peer_addr().unwrap(), address); + + drop(stream); + thread_handle.join().expect("unable to join thread"); +} + +/// Start a listener that accepts `n_connections` connections on the returned +/// address. It echos back any data it reads from the connection before +/// accepting another one. +fn echo_listener(addr: SocketAddr, n_connections: usize) -> (thread::JoinHandle<()>, SocketAddr) { + let (sender, receiver) = channel(); + let thread_handle = thread::spawn(move || { + let listener = net::TcpListener::bind(addr).unwrap(); + let local_address = listener.local_addr().unwrap(); + sender.send(local_address).unwrap(); + + let mut buf = [0; 128]; + for _ in 0..n_connections { + let (mut stream, _) = listener.accept().unwrap(); + + loop { + let n = stream.read(&mut buf).expect("error reading"); + if n == 0 { + break; + } + let written = stream.write(&buf[..n]).expect("error writing"); + assert_eq!(written, n, "short write"); + } + } + }); + (thread_handle, receiver.recv().unwrap()) +} + +/// Start a listener that accepts `n_connections` connections on the returned +/// address. If a barrier is provided it will wait on it before closing the +/// connection. +fn start_listener( + n_connections: usize, + barrier: Option>, +) -> (thread::JoinHandle<()>, SocketAddr) { + let (sender, receiver) = channel(); + let thread_handle = thread::spawn(move || { + let listener = net::TcpListener::bind(any_local_address()).unwrap(); + let local_address = listener.local_addr().unwrap(); + sender.send(local_address).unwrap(); + + for _ in 0..n_connections { + let (stream, _) = listener.accept().unwrap(); + if let Some(ref barrier) = barrier { + barrier.wait(); + } + drop(stream); + } + }); + (thread_handle, receiver.recv().unwrap()) +} From add262520f777d5564c26b7f2fc8596a07c4e83f Mon Sep 17 00:00:00 2001 From: Thomas de Zeeuw Date: Sat, 31 Aug 2019 13:09:17 +0200 Subject: [PATCH 05/16] Port tests for TcpListener from Gaea --- tests/tcp_listener.rs | 310 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 310 insertions(+) create mode 100644 tests/tcp_listener.rs diff --git a/tests/tcp_listener.rs b/tests/tcp_listener.rs new file mode 100644 index 000000000..e8ec352f8 --- /dev/null +++ b/tests/tcp_listener.rs @@ -0,0 +1,310 @@ +use std::io::Read; +use std::net::{self, SocketAddr}; +#[cfg(unix)] +use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd}; +use std::sync::{Arc, Barrier}; +use std::thread; + +use mio::net::TcpListener; +use mio::{Interests, Poll, Token}; + +mod util; + +use util::{ + any_local_address, any_local_ipv6_address, assert_send, assert_sync, assert_would_block, + expect_events, expect_no_events, init, init_with_poll, ExpectEvent, +}; + +const ID1: Token = Token(0); +const ID2: Token = Token(1); + +#[test] +fn is_send_and_sync() { + assert_send::(); + assert_sync::(); +} + +#[test] +fn tcp_listener() { + smoke_test_tcp_listener(any_local_address()); +} + +#[test] +fn tcp_listener_ipv6() { + smoke_test_tcp_listener(any_local_ipv6_address()); +} + +fn smoke_test_tcp_listener(addr: SocketAddr) { + let (mut poll, mut events) = init_with_poll(); + + let listener = TcpListener::bind(addr).unwrap(); + let address = listener.local_addr().unwrap(); + + poll.registry() + .register(&listener, ID1, Interests::READABLE) + .expect("unable to register TCP listener"); + + let barrier = Arc::new(Barrier::new(2)); + let thread_handle = start_connections(address, 1, barrier.clone()); + + expect_events( + &mut poll, + &mut events, + vec![ExpectEvent::new(ID1, Interests::READABLE)], + ); + + // Expect a single connection. + let (mut stream, peer_address) = listener.accept().expect("unable to accept connection"); + assert!(peer_address.ip().is_loopback()); + assert_eq!(stream.peer_addr().unwrap(), peer_address); + assert_eq!(stream.local_addr().unwrap(), address); + + // Expect the stream to be non-blocking. + let mut buf = [0; 20]; + assert_would_block(stream.read(&mut buf)); + + // Expect no more connections. + assert_would_block(listener.accept()); + + assert!(listener.take_error().unwrap().is_none()); + + barrier.wait(); + thread_handle.join().expect("unable to join thread"); +} + +#[test] +fn ttl() { + init(); + + let listener = TcpListener::bind(any_local_address()).unwrap(); + + const TTL: u32 = 10; + listener.set_ttl(TTL).unwrap(); + assert_eq!(listener.ttl().unwrap(), TTL); + assert!(listener.take_error().unwrap().is_none()); +} + +#[test] +#[cfg(unix)] +fn raw_fd() { + init(); + + let listener = TcpListener::bind(any_local_address()).unwrap(); + let address = listener.local_addr().unwrap(); + + let raw_fd1 = listener.as_raw_fd(); + let raw_fd2 = listener.into_raw_fd(); + assert_eq!(raw_fd1, raw_fd2); + + let listener = unsafe { TcpListener::from_raw_fd(raw_fd2) }; + assert_eq!(listener.as_raw_fd(), raw_fd1); + assert_eq!(listener.local_addr().unwrap(), address); +} + +#[test] +fn registering() { + let (mut poll, mut events) = init_with_poll(); + + let stream = TcpListener::bind(any_local_address()).unwrap(); + + poll.registry() + .register(&stream, ID1, Interests::READABLE) + .expect("unable to register TCP listener"); + + expect_no_events(&mut poll, &mut events); + + // NOTE: more tests are done in the smoke tests above. +} + +#[test] +fn reregister() { + let (mut poll, mut events) = init_with_poll(); + + let listener = TcpListener::bind(any_local_address()).unwrap(); + let address = listener.local_addr().unwrap(); + + poll.registry() + .register(&listener, ID1, Interests::READABLE) + .unwrap(); + poll.registry() + .reregister(&listener, ID2, Interests::READABLE) + .unwrap(); + + let barrier = Arc::new(Barrier::new(2)); + let thread_handle = start_connections(address, 1, barrier.clone()); + + expect_events( + &mut poll, + &mut events, + vec![ExpectEvent::new(ID2, Interests::READABLE)], + ); + + let (stream, peer_address) = listener.accept().expect("unable to accept connection"); + assert!(peer_address.ip().is_loopback()); + assert_eq!(stream.peer_addr().unwrap(), peer_address); + assert_eq!(stream.local_addr().unwrap(), address); + + assert_would_block(listener.accept()); + + assert!(listener.take_error().unwrap().is_none()); + + barrier.wait(); + thread_handle.join().expect("unable to join thread"); +} + +#[test] +fn deregister() { + let (mut poll, mut events) = init_with_poll(); + + let listener = TcpListener::bind(any_local_address()).unwrap(); + let address = listener.local_addr().unwrap(); + + poll.registry() + .register(&listener, ID1, Interests::READABLE) + .unwrap(); + + let barrier = Arc::new(Barrier::new(2)); + let thread_handle = start_connections(address, 1, barrier.clone()); + + poll.registry().deregister(&listener).unwrap(); + + expect_no_events(&mut poll, &mut events); + + // Should still be able to accept the connection. + let (stream, peer_address) = listener.accept().expect("unable to accept connection"); + assert!(peer_address.ip().is_loopback()); + assert_eq!(stream.peer_addr().unwrap(), peer_address); + assert_eq!(stream.local_addr().unwrap(), address); + + assert_would_block(listener.accept()); + + assert!(listener.take_error().unwrap().is_none()); + + barrier.wait(); + thread_handle.join().expect("unable to join thread"); +} + +#[test] +fn try_clone_same_poll() { + let (mut poll, mut events) = init_with_poll(); + + let listener1 = TcpListener::bind(any_local_address()).unwrap(); + let listener2 = listener1.try_clone().expect("unable to clone TCP listener"); + #[cfg(unix)] + assert_ne!(listener1.as_raw_fd(), listener2.as_raw_fd()); + let address = listener1.local_addr().unwrap(); + assert_eq!(address, listener2.local_addr().unwrap()); + + let barrier = Arc::new(Barrier::new(3)); + let thread_handle1 = start_connections(address, 1, barrier.clone()); + let thread_handle2 = start_connections(address, 1, barrier.clone()); + + poll.registry() + .register(&listener1, ID1, Interests::READABLE) + .unwrap(); + poll.registry() + .register(&listener2, ID2, Interests::READABLE) + .unwrap(); + + expect_events( + &mut poll, + &mut events, + vec![ + ExpectEvent::new(ID1, Interests::READABLE), + ExpectEvent::new(ID2, Interests::READABLE), + ], + ); + + let (stream, peer_address) = listener1.accept().expect("unable to accept connection"); + assert!(peer_address.ip().is_loopback()); + assert_eq!(stream.peer_addr().unwrap(), peer_address); + assert_eq!(stream.local_addr().unwrap(), address); + + let (stream, peer_address) = listener2.accept().expect("unable to accept connection"); + assert!(peer_address.ip().is_loopback()); + assert_eq!(stream.peer_addr().unwrap(), peer_address); + assert_eq!(stream.local_addr().unwrap(), address); + + assert_would_block(listener1.accept()); + assert_would_block(listener2.accept()); + + assert!(listener1.take_error().unwrap().is_none()); + assert!(listener2.take_error().unwrap().is_none()); + + barrier.wait(); + thread_handle1.join().expect("unable to join thread"); + thread_handle2.join().expect("unable to join thread"); +} + +#[test] +fn try_clone_different_poll() { + let (mut poll1, mut events) = init_with_poll(); + let mut poll2 = Poll::new().unwrap(); + + let listener1 = TcpListener::bind(any_local_address()).unwrap(); + let listener2 = listener1.try_clone().expect("unable to clone TCP listener"); + #[cfg(unix)] + assert_ne!(listener1.as_raw_fd(), listener2.as_raw_fd()); + let address = listener1.local_addr().unwrap(); + assert_eq!(address, listener2.local_addr().unwrap()); + + let barrier = Arc::new(Barrier::new(3)); + let thread_handle1 = start_connections(address, 1, barrier.clone()); + let thread_handle2 = start_connections(address, 1, barrier.clone()); + + poll1 + .registry() + .register(&listener1, ID1, Interests::READABLE) + .unwrap(); + poll2 + .registry() + .register(&listener2, ID2, Interests::READABLE) + .unwrap(); + + expect_events( + &mut poll1, + &mut events, + vec![ExpectEvent::new(ID1, Interests::READABLE)], + ); + expect_events( + &mut poll2, + &mut events, + vec![ExpectEvent::new(ID2, Interests::READABLE)], + ); + + let (stream, peer_address) = listener1.accept().expect("unable to accept connection"); + assert!(peer_address.ip().is_loopback()); + assert_eq!(stream.peer_addr().unwrap(), peer_address); + assert_eq!(stream.local_addr().unwrap(), address); + + let (stream, peer_address) = listener2.accept().expect("unable to accept connection"); + assert!(peer_address.ip().is_loopback()); + assert_eq!(stream.peer_addr().unwrap(), peer_address); + assert_eq!(stream.local_addr().unwrap(), address); + + assert_would_block(listener1.accept()); + assert_would_block(listener2.accept()); + + assert!(listener1.take_error().unwrap().is_none()); + assert!(listener2.take_error().unwrap().is_none()); + + barrier.wait(); + thread_handle1.join().expect("unable to join thread"); + thread_handle2.join().expect("unable to join thread"); +} + +/// Start `n_connections` connections to `address`. If a `barrier` is provided +/// it will wait on it after each connection is made before it is dropped. +fn start_connections( + address: SocketAddr, + n_connections: usize, + barrier: Arc, +) -> thread::JoinHandle<()> { + thread::spawn(move || { + for _ in 0..n_connections { + let conn = net::TcpStream::connect(address).unwrap(); + barrier.wait(); + drop(conn); + } + }) +} From 3b81bbefa6702b9ab19600c6c1f2b235b6a84514 Mon Sep 17 00:00:00 2001 From: Thomas de Zeeuw Date: Sat, 31 Aug 2019 13:22:35 +0200 Subject: [PATCH 06/16] Ignore flaky TcpStream shutdown_write test --- tests/tcp_stream.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/tcp_stream.rs b/tests/tcp_stream.rs index 88289f1a8..c337f0b79 100644 --- a/tests/tcp_stream.rs +++ b/tests/tcp_stream.rs @@ -236,6 +236,7 @@ fn shutdown_read() { } #[test] +#[ignore = "This test is flaky, it doesn't always receive an event after shutting down the write side"] fn shutdown_write() { let (mut poll, mut events) = init_with_poll(); @@ -261,7 +262,7 @@ fn shutdown_write() { let err = stream.write(DATA2).unwrap_err(); assert_eq!(err.kind(), io::ErrorKind::BrokenPipe); - // TODO: this can be flaky, fix that. + // FIXME: we don't always receive the following event. expect_events( &mut poll, &mut events, From a7e531e04dafa15f10b11e389feb4a12f83d31b1 Mon Sep 17 00:00:00 2001 From: Thomas de Zeeuw Date: Sat, 31 Aug 2019 13:34:18 +0200 Subject: [PATCH 07/16] Set accepted TcpStreams in non-blocking mode --- src/sys/unix/tcp_listener.rs | 8 +++++--- src/sys/windows/tcp.rs | 18 ++++++++++-------- 2 files changed, 15 insertions(+), 11 deletions(-) diff --git a/src/sys/unix/tcp_listener.rs b/src/sys/unix/tcp_listener.rs index 1395df56a..9fdf3751c 100644 --- a/src/sys/unix/tcp_listener.rs +++ b/src/sys/unix/tcp_listener.rs @@ -49,9 +49,11 @@ impl TcpListener { } pub fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> { - self.inner - .accept() - .map(|(inner, addr)| (TcpStream::new(inner), addr)) + self.inner.accept().and_then(|(inner, addr)| { + inner + .set_nonblocking(true) + .map(|()| (TcpStream::new(inner), addr)) + }) } pub fn set_ttl(&self, ttl: u32) -> io::Result<()> { diff --git a/src/sys/windows/tcp.rs b/src/sys/windows/tcp.rs index d34b82178..113ce0141 100644 --- a/src/sys/windows/tcp.rs +++ b/src/sys/windows/tcp.rs @@ -359,14 +359,16 @@ impl TcpListener { } pub fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> { - wouldblock!(self, accept).map(|(inner, addr)| { - ( - TcpStream { - internal: Arc::new(Mutex::new(None)), - inner, - }, - addr, - ) + wouldblock!(self, accept).and_then(|(inner, addr)| { + inner.set_nonblocking(true).map(|()| { + ( + TcpStream { + internal: Arc::new(Mutex::new(None)), + inner, + }, + addr, + ) + }) }) } From d88f51a313ab4b5e32bf76c835689a82680790c6 Mon Sep 17 00:00:00 2001 From: Thomas de Zeeuw Date: Sat, 31 Aug 2019 13:36:32 +0200 Subject: [PATCH 08/16] Disable TcpListener deregister test on Windows Doesn't work, see issue #1070. --- tests/tcp_listener.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/tcp_listener.rs b/tests/tcp_listener.rs index e8ec352f8..affb2fd11 100644 --- a/tests/tcp_listener.rs +++ b/tests/tcp_listener.rs @@ -153,6 +153,7 @@ fn reregister() { } #[test] +#[cfg_attr(windows, ignore = "deregister doesn't work, see #1073")] fn deregister() { let (mut poll, mut events) = init_with_poll(); From b20377a6d6f378093d40bf263472967d86abe890 Mon Sep 17 00:00:00 2001 From: Thomas de Zeeuw Date: Sat, 31 Aug 2019 13:46:57 +0200 Subject: [PATCH 09/16] Make TcpListener::try_clone tests more robust Before it could be that only one of the connections would be ready to be accepted, result in would-block errors. --- tests/tcp_listener.rs | 26 ++++++++++++++++++++++++-- 1 file changed, 24 insertions(+), 2 deletions(-) diff --git a/tests/tcp_listener.rs b/tests/tcp_listener.rs index affb2fd11..3de0ba9bd 100644 --- a/tests/tcp_listener.rs +++ b/tests/tcp_listener.rs @@ -198,7 +198,6 @@ fn try_clone_same_poll() { let barrier = Arc::new(Barrier::new(3)); let thread_handle1 = start_connections(address, 1, barrier.clone()); - let thread_handle2 = start_connections(address, 1, barrier.clone()); poll.registry() .register(&listener1, ID1, Interests::READABLE) @@ -221,6 +220,17 @@ fn try_clone_same_poll() { assert_eq!(stream.peer_addr().unwrap(), peer_address); assert_eq!(stream.local_addr().unwrap(), address); + let thread_handle2 = start_connections(address, 1, barrier.clone()); + + expect_events( + &mut poll, + &mut events, + vec![ + ExpectEvent::new(ID1, Interests::READABLE), + ExpectEvent::new(ID2, Interests::READABLE), + ], + ); + let (stream, peer_address) = listener2.accept().expect("unable to accept connection"); assert!(peer_address.ip().is_loopback()); assert_eq!(stream.peer_addr().unwrap(), peer_address); @@ -251,7 +261,6 @@ fn try_clone_different_poll() { let barrier = Arc::new(Barrier::new(3)); let thread_handle1 = start_connections(address, 1, barrier.clone()); - let thread_handle2 = start_connections(address, 1, barrier.clone()); poll1 .registry() @@ -278,6 +287,19 @@ fn try_clone_different_poll() { assert_eq!(stream.peer_addr().unwrap(), peer_address); assert_eq!(stream.local_addr().unwrap(), address); + let thread_handle2 = start_connections(address, 1, barrier.clone()); + + expect_events( + &mut poll1, + &mut events, + vec![ExpectEvent::new(ID1, Interests::READABLE)], + ); + expect_events( + &mut poll2, + &mut events, + vec![ExpectEvent::new(ID2, Interests::READABLE)], + ); + let (stream, peer_address) = listener2.accept().expect("unable to accept connection"); assert!(peer_address.ip().is_loopback()); assert_eq!(stream.peer_addr().unwrap(), peer_address); From ac477b41bd106826b582f3e87ff1dd099013a57f Mon Sep 17 00:00:00 2001 From: Thomas de Zeeuw Date: Sat, 31 Aug 2019 18:08:44 +0200 Subject: [PATCH 10/16] Loosen TcpStream shutdown reading side test What happens is different on each OS, for example on Linux we can still read but on macOS and FreeBSD we can't. --- tests/tcp_stream.rs | 36 ++++++++++++++++++++++++++++++------ 1 file changed, 30 insertions(+), 6 deletions(-) diff --git a/tests/tcp_stream.rs b/tests/tcp_stream.rs index c337f0b79..95f0a54d7 100644 --- a/tests/tcp_stream.rs +++ b/tests/tcp_stream.rs @@ -227,9 +227,21 @@ fn shutdown_read() { stream.shutdown(Shutdown::Read).unwrap(); - let mut buf = [0; 20]; - let n = stream.read(&mut buf).unwrap(); - assert_eq!(n, 0); + // Shutting down the reading side is different on each platform. For example + // on Linux based systems we can still read. + #[cfg(any( + target_os = "dragonfly", + target_os = "freebsd", + target_os = "ios", + target_os = "macos", + target_os = "netbsd", + target_os = "openbsd" + ))] + { + let mut buf = [0; 20]; + let n = stream.read(&mut buf).unwrap(); + assert_eq!(n, 0); + } drop(stream); thread_handle.join().expect("unable to join thread"); @@ -308,9 +320,21 @@ fn shutdown_both() { stream.shutdown(Shutdown::Both).unwrap(); - let mut buf = [0; 20]; - let n = stream.read(&mut buf).unwrap(); - assert_eq!(n, 0); + // Shutting down the reading side is different on each platform. For example + // on Linux based systems we can still read. + #[cfg(any( + target_os = "dragonfly", + target_os = "freebsd", + target_os = "ios", + target_os = "macos", + target_os = "netbsd", + target_os = "openbsd" + ))] + { + let mut buf = [0; 20]; + let n = stream.read(&mut buf).unwrap(); + assert_eq!(n, 0); + } let err = stream.write(DATA2).unwrap_err(); assert_eq!(err.kind(), io::ErrorKind::BrokenPipe); From 9a49628bc998e9c70cb4789a819b6d9037ad8ce3 Mon Sep 17 00:00:00 2001 From: Thomas de Zeeuw Date: Sat, 31 Aug 2019 18:26:41 +0200 Subject: [PATCH 11/16] Ignore connection reset error in TcpStream test In a attempt to fix the TcpStream shutdown_read test. --- tests/tcp_stream.rs | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/tests/tcp_stream.rs b/tests/tcp_stream.rs index 95f0a54d7..fe5b3b5ca 100644 --- a/tests/tcp_stream.rs +++ b/tests/tcp_stream.rs @@ -452,7 +452,16 @@ fn echo_listener(addr: SocketAddr, n_connections: usize) -> (thread::JoinHandle< let (mut stream, _) = listener.accept().unwrap(); loop { - let n = stream.read(&mut buf).expect("error reading"); + let n = stream + .read(&mut buf) + // On Linux based system it will cause a connection reset + // error when the reading side of the peer connection is + // shutdown, we don't consider it an actual here. + .or_else(|err| match err { + ref err if err.kind() == io::ErrorKind::ConnectionReset => Ok(0), + err => Err(err), + }) + .expect("error reading"); if n == 0 { break; } From e99caf3856d73614a0a0c90ff8d779b5d4bfc421 Mon Sep 17 00:00:00 2001 From: Thomas de Zeeuw Date: Sat, 31 Aug 2019 18:36:48 +0200 Subject: [PATCH 12/16] Disable TcpListener::try_clone tests on Windows They both fails, be it's unknown to me why. It needs further investigation for which I've opened #1074. --- tests/tcp_listener.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/tcp_listener.rs b/tests/tcp_listener.rs index 3de0ba9bd..471f4c7bb 100644 --- a/tests/tcp_listener.rs +++ b/tests/tcp_listener.rs @@ -186,6 +186,7 @@ fn deregister() { } #[test] +#[cfg_attr(windows, ignore = "fails on Windows, see #1073")] fn try_clone_same_poll() { let (mut poll, mut events) = init_with_poll(); @@ -248,6 +249,7 @@ fn try_clone_same_poll() { } #[test] +#[cfg_attr(windows, ignore = "fails on Windows, see #1073")] fn try_clone_different_poll() { let (mut poll1, mut events) = init_with_poll(); let mut poll2 = Poll::new().unwrap(); From d479097cd83569dbd8644c744eb8e187345d0e48 Mon Sep 17 00:00:00 2001 From: Thomas de Zeeuw Date: Wed, 4 Sep 2019 10:16:43 +0200 Subject: [PATCH 13/16] Fix TcpStream's shutdown_both tests On Windows the error is ConnectionAborted rather then BrokenPipe. --- tests/tcp_stream.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/tcp_stream.rs b/tests/tcp_stream.rs index fe5b3b5ca..d109ff3e5 100644 --- a/tests/tcp_stream.rs +++ b/tests/tcp_stream.rs @@ -337,7 +337,10 @@ fn shutdown_both() { } let err = stream.write(DATA2).unwrap_err(); + #[cfg(unix)] assert_eq!(err.kind(), io::ErrorKind::BrokenPipe); + #[cfg(windows)] + assert_eq!(err.kind(), io::ErrorKind::ConnectionAborted); drop(stream); thread_handle.join().expect("unable to join thread"); From 8c7af5fe4268c68e62053552b0fe61ceaf1f22e4 Mon Sep 17 00:00:00 2001 From: Thomas de Zeeuw Date: Sat, 7 Sep 2019 13:15:46 +0200 Subject: [PATCH 14/16] Ignore TcpStream smoke tests on Windows Issue #1078 is opened to investigate why they fail on Windows only. --- tests/tcp_stream.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/tcp_stream.rs b/tests/tcp_stream.rs index d109ff3e5..75521f6c4 100644 --- a/tests/tcp_stream.rs +++ b/tests/tcp_stream.rs @@ -32,11 +32,13 @@ fn is_send_and_sync() { } #[test] +#[cfg_attr(windows, ignore = "fails on Windows, see #1078")] fn tcp_stream_ipv4() { smoke_test_tcp_stream(any_local_address()); } #[test] +#[cfg_attr(windows, ignore = "fails on Windows, see #1078")] fn tcp_stream_ipv6() { smoke_test_tcp_stream(any_local_ipv6_address()); } From 4f2913d384bc92d190007d107923b4f6041f860c Mon Sep 17 00:00:00 2001 From: Thomas de Zeeuw Date: Sat, 7 Sep 2019 13:30:47 +0200 Subject: [PATCH 15/16] Ignore TcpStream nodelay test on Windows Issue #1079 is opened to investigate why it fails on Windows only. --- tests/tcp_stream.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/tcp_stream.rs b/tests/tcp_stream.rs index 75521f6c4..f2c64e0ae 100644 --- a/tests/tcp_stream.rs +++ b/tests/tcp_stream.rs @@ -183,6 +183,7 @@ fn ttl() { } #[test] +#[cfg_attr(windows, ignore = "fails on Windows, see #1079")] fn nodelay() { init(); From 112a4250a2c8e873ac96cb7a996755706e0212ae Mon Sep 17 00:00:00 2001 From: Thomas de Zeeuw Date: Sun, 8 Sep 2019 12:58:38 +0200 Subject: [PATCH 16/16] Ignore some UdpSocket tests on Windows Issues #1080 is opened to investigate why. --- tests/udp_socket.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/tests/udp_socket.rs b/tests/udp_socket.rs index 64bfa458f..3cc08c95d 100644 --- a/tests/udp_socket.rs +++ b/tests/udp_socket.rs @@ -259,6 +259,7 @@ fn reconnect_udp_socket_sending() { } #[test] +#[cfg_attr(windows, ignore = "fails on Windows, see #1080")] fn reconnect_udp_socket_receiving() { let (mut poll, mut events) = init_with_poll(); @@ -341,6 +342,7 @@ fn reconnect_udp_socket_receiving() { } #[test] +#[cfg_attr(windows, ignore = "fails on Windows, see #1080")] fn unconnected_udp_socket_connected_methods() { let (mut poll, mut events) = init_with_poll(); @@ -422,12 +424,12 @@ fn connected_udp_socket_unconnected_methods() { ); // Can't use `send_to`. - // Linux (and Android) actually allow `send_to` even if the socket is - // connected. - #[cfg(not(any(target_os = "android", target_os = "linux")))] + // Linux (and Android) and Windows actually allow `send_to` even if the + // socket is connected. + #[cfg(not(any(target_os = "android", target_os = "linux", target_os = "windows")))] assert_error(socket1.send_to(DATA1, address2), "already connected"); // Even if the address is the same. - #[cfg(not(any(target_os = "android", target_os = "linux")))] + #[cfg(not(any(target_os = "android", target_os = "linux", target_os = "windows")))] assert_error(socket1.send_to(DATA1, address3), "already connected"); socket2.send_to(DATA2, address3).unwrap(); @@ -524,6 +526,7 @@ fn udp_socket_reregister() { } #[test] +#[cfg_attr(windows, ignore = "fails on Windows, see #1080")] fn udp_socket_deregister() { let (mut poll, mut events) = init_with_poll();