From 02c5a2a23d0751d31a597fe4c1b882a499a0e462 Mon Sep 17 00:00:00 2001 From: Thomas de Zeeuw Date: Fri, 4 Aug 2023 13:52:53 +0200 Subject: [PATCH 1/3] Reuse alloction for closed_raw_fds in poll(2) Selector --- src/sys/unix/selector/poll.rs | 29 +++++++++++++++++++---------- 1 file changed, 19 insertions(+), 10 deletions(-) diff --git a/src/sys/unix/selector/poll.rs b/src/sys/unix/selector/poll.rs index ab2eea3c8..32739d32e 100644 --- a/src/sys/unix/selector/poll.rs +++ b/src/sys/unix/selector/poll.rs @@ -25,6 +25,8 @@ pub struct Selector { /// Whether this selector currently has an associated waker. #[cfg(debug_assertions)] has_waker: AtomicBool, + /// Buffer used in [`SelectorState::select`]. + fd_bufs: Vec, } impl Selector { @@ -35,6 +37,7 @@ impl Selector { state: Arc::new(state), #[cfg(debug_assertions)] has_waker: AtomicBool::new(false), + fd_bufs: Vec::new(), }) } @@ -48,11 +51,12 @@ impl Selector { state, #[cfg(debug_assertions)] has_waker: AtomicBool::new(self.has_waker.load(Ordering::Acquire)), + fd_bufs: Vec::new(), }) } - pub fn select(&self, events: &mut Events, timeout: Option) -> io::Result<()> { - self.state.select(events, timeout) + pub fn select(&mut self, events: &mut Events, timeout: Option) -> io::Result<()> { + self.state.select(events, timeout, &mut self.fd_bufs) } pub fn register(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> { @@ -194,17 +198,17 @@ impl SelectorState { }) } - pub fn select(&self, events: &mut Events, timeout: Option) -> io::Result<()> { + pub fn select( + &self, + events: &mut Events, + timeout: Option, + closed_raw_fds: &mut Vec, + ) -> io::Result<()> { events.clear(); + closed_raw_fds.clear(); let mut fds = self.fds.lock().unwrap(); - // Keep track of fds that receive POLLHUP or POLLERR (i.e. won't receive further - // events) and internally deregister them before they are externally deregister'd. See - // IoSourceState below to track how the external deregister call will be handled - // when this state occurs. - let mut closed_raw_fds = Vec::new(); - loop { // Complete all current operations. loop { @@ -272,6 +276,12 @@ impl SelectorState { events: poll_fd.revents, }); + // Keep track of fds that receive POLLHUP or POLLERR + // (i.e. won't receive further events) and internally + // deregister them before they are externally + // deregistered. See IoSourceState below to track how + // the external deregister call will be handled when + // this state occurs. if poll_fd.revents & (libc::POLLHUP | libc::POLLERR) != 0 { pending_removal.push(poll_fd.fd); closed_raw_fds.push(poll_fd.fd); @@ -299,7 +309,6 @@ impl SelectorState { drop(fds); let _ = self.deregister_all(&closed_raw_fds); - Ok(()) } From 7b3afc25dc7f30afb75a2938abca08d853dd3d1f Mon Sep 17 00:00:00 2001 From: Thomas de Zeeuw Date: Fri, 4 Aug 2023 14:02:06 +0200 Subject: [PATCH 2/3] Small formatting cleanups To be more consistent with the other code. --- src/sys/unix/selector/poll.rs | 76 ++++++++++++++++------------------- 1 file changed, 34 insertions(+), 42 deletions(-) diff --git a/src/sys/unix/selector/poll.rs b/src/sys/unix/selector/poll.rs index 32739d32e..d5fd7f4da 100644 --- a/src/sys/unix/selector/poll.rs +++ b/src/sys/unix/selector/poll.rs @@ -3,18 +3,18 @@ // Permission to use this code has been granted by original author: // https://github.com/tokio-rs/mio/pull/1602#issuecomment-1218441031 -use crate::sys::unix::selector::LOWEST_FD; -use crate::sys::unix::waker::WakerInternal; -use crate::{Interest, Token}; use std::collections::HashMap; use std::fmt::{Debug, Formatter}; use std::os::unix::io::{AsRawFd, RawFd}; -use std::sync::atomic::AtomicBool; -use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::{Arc, Condvar, Mutex}; use std::time::Duration; use std::{cmp, fmt, io}; +use crate::sys::unix::selector::LOWEST_FD; +use crate::sys::unix::waker::WakerInternal; +use crate::{Interest, Token}; + /// Unique id for use as `SelectorId`. #[cfg(debug_assertions)] static NEXT_ID: AtomicUsize = AtomicUsize::new(1); @@ -31,10 +31,8 @@ pub struct Selector { impl Selector { pub fn new() -> io::Result { - let state = SelectorState::new()?; - Ok(Selector { - state: Arc::new(state), + state: Arc::new(SelectorState::new()?), #[cfg(debug_assertions)] has_waker: AtomicBool::new(false), fd_bufs: Vec::new(), @@ -42,13 +40,9 @@ impl Selector { } pub fn try_clone(&self) -> io::Result { - // Just to keep the compiler happy :) - let _ = LOWEST_FD; - - let state = self.state.clone(); - + let _ = LOWEST_FD; // Just to keep the compiler happy :) Ok(Selector { - state, + state: self.state.clone(), #[cfg(debug_assertions)] has_waker: AtomicBool::new(self.has_waker.load(Ordering::Acquire)), fd_bufs: Vec::new(), @@ -89,6 +83,7 @@ impl Selector { pub fn wake(&self, token: Token) -> io::Result<()> { self.state.wake(token) } + cfg_io_source! { #[cfg(debug_assertions)] pub fn id(&self) -> usize { @@ -102,33 +97,30 @@ impl Selector { struct SelectorState { /// File descriptors to poll. fds: Mutex, - /// File descriptors which will be removed before the next poll call. /// - /// When a file descriptor is deregistered while a poll is running, we need to filter - /// out all removed descriptors after that poll is finished running. + /// When a file descriptor is deregistered while a poll is running, we need + /// to filter out all removed descriptors after that poll is finished + /// running. pending_removal: Mutex>, - /// Token associated with Waker that have recently asked to wake. This will - /// cause a synthetic behaviour where on any wakeup we add all pending tokens - /// to the list of emitted events. + /// cause a synthetic behaviour where on any wakeup we add all pending + /// tokens to the list of emitted events. pending_wake_token: Mutex>, - - /// Data is written to this to wake up the current instance of `wait`, which can occur when the - /// user notifies it (in which case `notified` would have been set) or when an operation needs - /// to occur (in which case `waiting_operations` would have been incremented). + /// Data is written to this to wake up the current instance of `wait`, which + /// can occur when the user notifies it (in which case `notified` would have + /// been set) or when an operation needs to occur (in which case + /// `waiting_operations` would have been incremented). notify_waker: WakerInternal, - - /// The number of operations (`add`, `modify` or `delete`) that are currently waiting on the - /// mutex to become free. When this is nonzero, `wait` must be suspended until it reaches zero - /// again. + /// The number of operations (`add`, `modify` or `delete`) that are + /// currently waiting on the mutex to become free. When this is nonzero, + /// `wait` must be suspended until it reaches zero again. waiting_operations: AtomicUsize, - /// The condition variable that gets notified when `waiting_operations` reaches zero or - /// `notified` becomes true. + /// The condition variable that gets notified when `waiting_operations` + /// reaches zero or `notified` becomes true. /// /// This is used with the `fds` mutex. operations_complete: Condvar, - /// This selectors id. #[cfg(debug_assertions)] #[allow(dead_code)] @@ -140,15 +132,16 @@ struct SelectorState { struct Fds { /// The list of `pollfds` taken by poll. /// - /// The first file descriptor is always present and is used to notify the poller. + /// The first file descriptor is always present and is used to notify the + /// poller. poll_fds: Vec, - /// The map of each file descriptor to data associated with it. This does not include the file - /// descriptors created by the internal notify waker. + /// The map of each file descriptor to data associated with it. This does + /// not include the file descriptors created by the internal notify waker. fd_data: HashMap, } -/// Transparent wrapper around `libc::pollfd`, used to support `Debug` derives without adding the -/// `extra_traits` feature of `libc`. +/// Transparent wrapper around `libc::pollfd`, used to support `Debug` derives +/// without adding the `extra_traits` feature of `libc`. #[repr(transparent)] #[derive(Clone)] struct PollFd(libc::pollfd); @@ -170,16 +163,15 @@ struct FdData { poll_fds_index: usize, /// The key of the `Event` associated with this file descriptor. token: Token, - /// Used to communicate with IoSourceState when we need to internally deregister - /// based on a closed fd. + /// Used to communicate with IoSourceState when we need to internally + /// deregister based on a closed fd. shared_record: Arc, } impl SelectorState { pub fn new() -> io::Result { let notify_waker = WakerInternal::new()?; - - Ok(Self { + Ok(SelectorState { fds: Mutex::new(Fds { poll_fds: vec![PollFd(libc::pollfd { fd: notify_waker.as_raw_fd(), @@ -262,7 +254,7 @@ impl SelectorState { } for fd_data in fds.fd_data.values_mut() { - let PollFd(poll_fd) = &mut fds.poll_fds[fd_data.poll_fds_index]; + let poll_fd = &mut fds.poll_fds[fd_data.poll_fds_index].0; if pending_removal.contains(&poll_fd.fd) { // Fd was removed while poll was running @@ -674,7 +666,7 @@ cfg_io_source! { pub fn do_io(&self, f: F, io: &T) -> io::Result where - F: FnOnce(&T) -> io::Result, + F: FnOnce(&T) -> io::Result, { let result = f(io); From ec27b1983b8cc3b280d42cc241470dfb47cf00b8 Mon Sep 17 00:00:00 2001 From: Thomas de Zeeuw Date: Fri, 4 Aug 2023 14:19:10 +0200 Subject: [PATCH 3/3] Reuse allocation for pending_removal in poll(2) impl --- src/sys/unix/selector/poll.rs | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/src/sys/unix/selector/poll.rs b/src/sys/unix/selector/poll.rs index d5fd7f4da..beb3e44fa 100644 --- a/src/sys/unix/selector/poll.rs +++ b/src/sys/unix/selector/poll.rs @@ -5,6 +5,7 @@ use std::collections::HashMap; use std::fmt::{Debug, Formatter}; +use std::mem::swap; use std::os::unix::io::{AsRawFd, RawFd}; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::{Arc, Condvar, Mutex}; @@ -26,7 +27,8 @@ pub struct Selector { #[cfg(debug_assertions)] has_waker: AtomicBool, /// Buffer used in [`SelectorState::select`]. - fd_bufs: Vec, + fd_bufs0: Vec, + fd_bufs1: Vec, } impl Selector { @@ -35,7 +37,8 @@ impl Selector { state: Arc::new(SelectorState::new()?), #[cfg(debug_assertions)] has_waker: AtomicBool::new(false), - fd_bufs: Vec::new(), + fd_bufs0: Vec::new(), + fd_bufs1: Vec::new(), }) } @@ -45,12 +48,14 @@ impl Selector { state: self.state.clone(), #[cfg(debug_assertions)] has_waker: AtomicBool::new(self.has_waker.load(Ordering::Acquire)), - fd_bufs: Vec::new(), + fd_bufs0: Vec::new(), + fd_bufs1: Vec::new(), }) } pub fn select(&mut self, events: &mut Events, timeout: Option) -> io::Result<()> { - self.state.select(events, timeout, &mut self.fd_bufs) + self.state + .select(events, timeout, &mut self.fd_bufs0, &mut self.fd_bufs1) } pub fn register(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> { @@ -195,9 +200,11 @@ impl SelectorState { events: &mut Events, timeout: Option, closed_raw_fds: &mut Vec, + pending_removal: &mut Vec, ) -> io::Result<()> { events.clear(); closed_raw_fds.clear(); + pending_removal.clear(); let mut fds = self.fds.lock().unwrap(); @@ -236,7 +243,7 @@ impl SelectorState { // We now check whether this poll was performed with descriptors which were pending // for removal and filter out any matching. let mut pending_removal_guard = self.pending_removal.lock().unwrap(); - let mut pending_removal = std::mem::replace(pending_removal_guard.as_mut(), Vec::new()); + swap(pending_removal_guard.as_mut(), pending_removal); drop(pending_removal_guard); // Store the events if there were any.