diff --git a/src/sys/unix/selector/poll.rs b/src/sys/unix/selector/poll.rs index ab2eea3c8..beb3e44fa 100644 --- a/src/sys/unix/selector/poll.rs +++ b/src/sys/unix/selector/poll.rs @@ -3,18 +3,19 @@ // Permission to use this code has been granted by original author: // https://github.com/tokio-rs/mio/pull/1602#issuecomment-1218441031 -use crate::sys::unix::selector::LOWEST_FD; -use crate::sys::unix::waker::WakerInternal; -use crate::{Interest, Token}; use std::collections::HashMap; use std::fmt::{Debug, Formatter}; +use std::mem::swap; use std::os::unix::io::{AsRawFd, RawFd}; -use std::sync::atomic::AtomicBool; -use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::{Arc, Condvar, Mutex}; use std::time::Duration; use std::{cmp, fmt, io}; +use crate::sys::unix::selector::LOWEST_FD; +use crate::sys::unix::waker::WakerInternal; +use crate::{Interest, Token}; + /// Unique id for use as `SelectorId`. #[cfg(debug_assertions)] static NEXT_ID: AtomicUsize = AtomicUsize::new(1); @@ -25,34 +26,36 @@ pub struct Selector { /// Whether this selector currently has an associated waker. #[cfg(debug_assertions)] has_waker: AtomicBool, + /// Buffer used in [`SelectorState::select`]. + fd_bufs0: Vec, + fd_bufs1: Vec, } impl Selector { pub fn new() -> io::Result { - let state = SelectorState::new()?; - Ok(Selector { - state: Arc::new(state), + state: Arc::new(SelectorState::new()?), #[cfg(debug_assertions)] has_waker: AtomicBool::new(false), + fd_bufs0: Vec::new(), + fd_bufs1: Vec::new(), }) } pub fn try_clone(&self) -> io::Result { - // Just to keep the compiler happy :) - let _ = LOWEST_FD; - - let state = self.state.clone(); - + let _ = LOWEST_FD; // Just to keep the compiler happy :) Ok(Selector { - state, + state: self.state.clone(), #[cfg(debug_assertions)] has_waker: AtomicBool::new(self.has_waker.load(Ordering::Acquire)), + fd_bufs0: Vec::new(), + fd_bufs1: Vec::new(), }) } - pub fn select(&self, events: &mut Events, timeout: Option) -> io::Result<()> { - self.state.select(events, timeout) + pub fn select(&mut self, events: &mut Events, timeout: Option) -> io::Result<()> { + self.state + .select(events, timeout, &mut self.fd_bufs0, &mut self.fd_bufs1) } pub fn register(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> { @@ -85,6 +88,7 @@ impl Selector { pub fn wake(&self, token: Token) -> io::Result<()> { self.state.wake(token) } + cfg_io_source! { #[cfg(debug_assertions)] pub fn id(&self) -> usize { @@ -98,33 +102,30 @@ impl Selector { struct SelectorState { /// File descriptors to poll. fds: Mutex, - /// File descriptors which will be removed before the next poll call. /// - /// When a file descriptor is deregistered while a poll is running, we need to filter - /// out all removed descriptors after that poll is finished running. + /// When a file descriptor is deregistered while a poll is running, we need + /// to filter out all removed descriptors after that poll is finished + /// running. pending_removal: Mutex>, - /// Token associated with Waker that have recently asked to wake. This will - /// cause a synthetic behaviour where on any wakeup we add all pending tokens - /// to the list of emitted events. + /// cause a synthetic behaviour where on any wakeup we add all pending + /// tokens to the list of emitted events. pending_wake_token: Mutex>, - - /// Data is written to this to wake up the current instance of `wait`, which can occur when the - /// user notifies it (in which case `notified` would have been set) or when an operation needs - /// to occur (in which case `waiting_operations` would have been incremented). + /// Data is written to this to wake up the current instance of `wait`, which + /// can occur when the user notifies it (in which case `notified` would have + /// been set) or when an operation needs to occur (in which case + /// `waiting_operations` would have been incremented). notify_waker: WakerInternal, - - /// The number of operations (`add`, `modify` or `delete`) that are currently waiting on the - /// mutex to become free. When this is nonzero, `wait` must be suspended until it reaches zero - /// again. + /// The number of operations (`add`, `modify` or `delete`) that are + /// currently waiting on the mutex to become free. When this is nonzero, + /// `wait` must be suspended until it reaches zero again. waiting_operations: AtomicUsize, - /// The condition variable that gets notified when `waiting_operations` reaches zero or - /// `notified` becomes true. + /// The condition variable that gets notified when `waiting_operations` + /// reaches zero or `notified` becomes true. /// /// This is used with the `fds` mutex. operations_complete: Condvar, - /// This selectors id. #[cfg(debug_assertions)] #[allow(dead_code)] @@ -136,15 +137,16 @@ struct SelectorState { struct Fds { /// The list of `pollfds` taken by poll. /// - /// The first file descriptor is always present and is used to notify the poller. + /// The first file descriptor is always present and is used to notify the + /// poller. poll_fds: Vec, - /// The map of each file descriptor to data associated with it. This does not include the file - /// descriptors created by the internal notify waker. + /// The map of each file descriptor to data associated with it. This does + /// not include the file descriptors created by the internal notify waker. fd_data: HashMap, } -/// Transparent wrapper around `libc::pollfd`, used to support `Debug` derives without adding the -/// `extra_traits` feature of `libc`. +/// Transparent wrapper around `libc::pollfd`, used to support `Debug` derives +/// without adding the `extra_traits` feature of `libc`. #[repr(transparent)] #[derive(Clone)] struct PollFd(libc::pollfd); @@ -166,16 +168,15 @@ struct FdData { poll_fds_index: usize, /// The key of the `Event` associated with this file descriptor. token: Token, - /// Used to communicate with IoSourceState when we need to internally deregister - /// based on a closed fd. + /// Used to communicate with IoSourceState when we need to internally + /// deregister based on a closed fd. shared_record: Arc, } impl SelectorState { pub fn new() -> io::Result { let notify_waker = WakerInternal::new()?; - - Ok(Self { + Ok(SelectorState { fds: Mutex::new(Fds { poll_fds: vec![PollFd(libc::pollfd { fd: notify_waker.as_raw_fd(), @@ -194,17 +195,19 @@ impl SelectorState { }) } - pub fn select(&self, events: &mut Events, timeout: Option) -> io::Result<()> { + pub fn select( + &self, + events: &mut Events, + timeout: Option, + closed_raw_fds: &mut Vec, + pending_removal: &mut Vec, + ) -> io::Result<()> { events.clear(); + closed_raw_fds.clear(); + pending_removal.clear(); let mut fds = self.fds.lock().unwrap(); - // Keep track of fds that receive POLLHUP or POLLERR (i.e. won't receive further - // events) and internally deregister them before they are externally deregister'd. See - // IoSourceState below to track how the external deregister call will be handled - // when this state occurs. - let mut closed_raw_fds = Vec::new(); - loop { // Complete all current operations. loop { @@ -240,7 +243,7 @@ impl SelectorState { // We now check whether this poll was performed with descriptors which were pending // for removal and filter out any matching. let mut pending_removal_guard = self.pending_removal.lock().unwrap(); - let mut pending_removal = std::mem::replace(pending_removal_guard.as_mut(), Vec::new()); + swap(pending_removal_guard.as_mut(), pending_removal); drop(pending_removal_guard); // Store the events if there were any. @@ -258,7 +261,7 @@ impl SelectorState { } for fd_data in fds.fd_data.values_mut() { - let PollFd(poll_fd) = &mut fds.poll_fds[fd_data.poll_fds_index]; + let poll_fd = &mut fds.poll_fds[fd_data.poll_fds_index].0; if pending_removal.contains(&poll_fd.fd) { // Fd was removed while poll was running @@ -272,6 +275,12 @@ impl SelectorState { events: poll_fd.revents, }); + // Keep track of fds that receive POLLHUP or POLLERR + // (i.e. won't receive further events) and internally + // deregister them before they are externally + // deregistered. See IoSourceState below to track how + // the external deregister call will be handled when + // this state occurs. if poll_fd.revents & (libc::POLLHUP | libc::POLLERR) != 0 { pending_removal.push(poll_fd.fd); closed_raw_fds.push(poll_fd.fd); @@ -299,7 +308,6 @@ impl SelectorState { drop(fds); let _ = self.deregister_all(&closed_raw_fds); - Ok(()) } @@ -665,7 +673,7 @@ cfg_io_source! { pub fn do_io(&self, f: F, io: &T) -> io::Result where - F: FnOnce(&T) -> io::Result, + F: FnOnce(&T) -> io::Result, { let result = f(io);