diff --git a/src/sys/windows/selector.rs b/src/sys/windows/selector.rs index b490e970e0..6f197192ec 100644 --- a/src/sys/windows/selector.rs +++ b/src/sys/windows/selector.rs @@ -104,6 +104,9 @@ pub struct SockState { poll_status: SockPollStatus, delete_pending: bool, + // last raw os error + error: Option, + pinned: PhantomPinned, } @@ -111,6 +114,9 @@ impl SockState { fn update(&mut self, self_arc: &Pin>>) -> io::Result<()> { assert!(!self.delete_pending); + // make sure to reset previous error before a new update + self.error = None; + if let SockPollStatus::Pending = self.poll_status { if (self.user_evts & afd::KNOWN_EVENTS & !self.pending_evts) == 0 { /* All the events the user is interested in are already being monitored by @@ -122,7 +128,11 @@ impl SockState { * events that the user is interested in. Therefore, cancel the pending * poll operation; when we receive it's completion package, a new poll * operation will be submitted with the correct event mask. */ - self.cancel()?; + if let Err(e) = self.cancel() { + self.error = e.raw_os_error(); + return Err(e); + } + return Ok(()); } } else if let SockPollStatus::Cancelled = self.poll_status { /* The poll operation has already been cancelled, we're still waiting for @@ -156,6 +166,7 @@ impl SockState { self.mark_delete(); return Ok(()); } else { + self.error = e.raw_os_error(); return Err(e); } } @@ -166,6 +177,7 @@ impl SockState { } else { unreachable!("Invalid poll status during update, {:#?}", self) } + Ok(()) } @@ -246,6 +258,10 @@ impl SockState { self.delete_pending = true; } } + + fn has_error(&self) -> bool { + self.error.is_some() + } } cfg_net! { @@ -262,6 +278,7 @@ cfg_net! { user_data: 0, poll_status: SockPollStatus::Idle, delete_pending: false, + error: None, pinned: PhantomPinned, }) } @@ -464,16 +481,18 @@ impl SelectorInner { unsafe fn update_sockets_events(&self) -> io::Result<()> { let mut update_queue = self.update_queue.lock().unwrap(); - loop { - let sock = match update_queue.pop_front() { - Some(sock) => sock, - None => break, - }; + for sock in update_queue.iter_mut() { let mut sock_internal = sock.lock().unwrap(); if !sock_internal.is_pending_deletion() { - sock_internal.update(&sock).unwrap(); + let _ = sock_internal.update(&sock); } } + + // remove all sock which do not have error, they have afd op pending + update_queue.retain(|sock| { + sock.lock().unwrap().has_error() + }); + self.afd_group.release_unused_afd(); Ok(()) } @@ -574,6 +593,8 @@ cfg_net! { sock.lock().unwrap().set_event(event); } unsafe { + // FIXME: a sock which has_error true should not be re-added to + // the update queue because it's already there. self.add_socket_to_update_queue(socket); self.update_sockets_events_if_polling()?; }