From 78979c310386710f0be698737f4bae2f6bf8711f Mon Sep 17 00:00:00 2001 From: John Nunley Date: Tue, 8 Aug 2023 07:55:59 -0700 Subject: [PATCH 1/8] Add better documentation for the IOCP module Signed-off-by: John Nunley --- src/iocp/mod.rs | 61 +++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 54 insertions(+), 7 deletions(-) diff --git a/src/iocp/mod.rs b/src/iocp/mod.rs index 8d725a7..54a99ea 100644 --- a/src/iocp/mod.rs +++ b/src/iocp/mod.rs @@ -74,27 +74,48 @@ pub(super) struct Poller { port: Arc>, /// List of currently active AFD instances. + /// + /// AFD acts as the actual source of the socket events. It's essentially running `WSAPoll` on + /// the sockets and then posting the events to the IOCP. + /// + /// AFD instances can be keyed to an unlimited number of sockets. However, each AFD instance + /// polls their sockets linearly. Therefore, it is best to limit the number of sockets each AFD + /// instance is responsible for. The limit of 32 is chosen because that's what `wepoll` uses. /// - /// Weak references are kept here so that the AFD handle is automatically dropped - /// when the last associated socket is dropped. + /// Weak references are kept here so that the AFD handle is automatically dropped when the last + /// associated socket is dropped. afd: Mutex>>>, /// The state of the sources registered with this poller. + /// + /// Each source is keyed by its raw socket ID. sources: RwLock>, /// The state of the waitable handles registered with this poller. waitables: RwLock>, /// Sockets with pending updates. + /// + /// This list contains packets with sockets that need to have their AFD state adjusted by + /// calling the `update()` function on them. It's best to queue up packets as they need to + /// be updated and then run all of the updates before we start waiting on the IOCP, rather than + /// updating them as we come. If we're waiting on the IOCP updates should be run immediately. pending_updates: ConcurrentQueue, /// Are we currently polling? + /// + /// This indicates whether or not we are blocking on the IOCP, and is used to determine + /// whether pending updates should be run immediately or queued. polling: AtomicBool, /// A list of completion packets. + /// + /// The IOCP writes to this list when it has new events. packets: Mutex>>, /// The packet used to notify the poller. + /// + /// This is a special-case packet that is used to wake up the poller when it is waiting. notifier: Packet, } @@ -119,7 +140,6 @@ impl Poller { )))?; let port = IoCompletionPort::new(0)?; - tracing::trace!(handle = ?port, "new"); Ok(Poller { @@ -178,6 +198,7 @@ impl Poller { // Create a new packet. let socket_state = { + // Create a new socket state and assign an AFD handle to it. let state = SocketState { socket, base_socket: base_socket(socket)?, @@ -189,6 +210,7 @@ impl Poller { status: SocketStatus::Idle, }; + // We wrap this socket state in a Packet so the IOCP can use it. Arc::pin(IoStatusBlock::from(PacketInner::Socket { packet: UnsafeCell::new(AfdPollInfo::default()), socket: Mutex::new(state), @@ -249,6 +271,7 @@ impl Poller { // Set the new event. if source.as_ref().set_events(interest, mode) { + // The packet needs to be updated. self.update_packet(source)?; } @@ -264,7 +287,7 @@ impl Poller { ); let _enter = span.enter(); - // Get a reference to the source. + // Remove the source from our associative map. let source = { let mut sources = lock!(self.sources.write()); @@ -409,6 +432,7 @@ impl Poller { ); let _enter = span.enter(); + // Make sure we have a consistent timeout. let deadline = timeout.and_then(|timeout| Instant::now().checked_add(timeout)); let mut packets = lock!(self.packets.lock()); let mut notified = false; @@ -421,6 +445,7 @@ impl Poller { let was_polling = self.polling.swap(true, Ordering::SeqCst); debug_assert!(!was_polling); + // Even if we panic, we want to make sure we indicate that polling has stopped. let guard = CallOnDrop(|| { let was_polling = self.polling.swap(false, Ordering::SeqCst); debug_assert!(was_polling); @@ -500,18 +525,22 @@ impl Poller { // If we failed to queue the update, we need to drain the queue first. self.drain_update_queue(true)?; + + // Loop back and try again. } } /// Drain the update queue. fn drain_update_queue(&self, limit: bool) -> io::Result<()> { + // Determine how many packets to process. let max = if limit { + // Only drain the queue's capacity, since this could in theory run forever. self.pending_updates.capacity().unwrap() } else { + // Less of a concern if we're draining the queue prior to a poll operation. std::usize::MAX }; - // Only drain the queue's capacity, since this could in theory run forever. self.pending_updates .try_iter() .take(max) @@ -519,11 +548,14 @@ impl Poller { } /// Get a handle to the AFD reference. + /// + /// This finds an AFD handle with less than 32 associated sockets, or creates a new one if + /// one does not exist. fn afd_handle(&self) -> io::Result>> { const AFD_MAX_SIZE: usize = 32; // Crawl the list and see if there are any existing AFD instances that we can use. - // Remove any unused AFD pointers. + // While we're here, remove any unused AFD pointers. let mut afd_handles = lock!(self.afd.lock()); let mut i = 0; while i < afd_handles.len() { @@ -561,7 +593,7 @@ impl Poller { // Register the AFD instance with the I/O completion port. self.port.register(&*afd, true)?; - // Insert a weak pointer to the AFD instance into the list. + // Insert a weak pointer to the AFD instance into the list for other sockets. afd_handles.push(Arc::downgrade(&afd)); Ok(afd) @@ -624,6 +656,8 @@ impl CompletionPacket { } /// The type of our completion packet. +/// +/// It needs to be pinned, since it contains data that is expected by IOCP not to be moved. type Packet = Pin>; type PacketUnwrapped = IoStatusBlock; @@ -720,6 +754,13 @@ impl PacketUnwrapped { } /// Update the socket and install the new status in AFD. + /// + /// This function does one of the following: + /// + /// - Nothing, if the packet is waiting on being dropped anyways. + /// - Cancels the ongoing poll, if we want to poll for different events than we are currently + /// polling for. + /// - Starts a new AFD_POLL operation, if we are not currently polling. fn update(self: Pin>) -> io::Result<()> { let mut socket = match self.as_ref().data().project_ref() { PacketInnerProj::Socket { socket, .. } => lock!(socket.lock()), @@ -841,6 +882,9 @@ impl PacketUnwrapped { } /// This socket state was notified; see if we need to update it. + /// + /// This indicates that this packet was indicated as "ready" by the IOCP and needs to be + /// processed. fn feed_event(self: Pin>, poller: &Poller) -> io::Result { let inner = self.as_ref().data().project_ref(); @@ -902,6 +946,7 @@ impl PacketUnwrapped { // Check in on the AFD data. let afd_data = &*afd_info.get(); + // There was at least one event. if afd_data.handle_count() >= 1 { let events = afd_data.events(); @@ -1161,6 +1206,7 @@ impl WaitHandle { } } +/// Translate an event to the mask expected by AFD. fn event_to_afd_mask(readable: bool, writable: bool, error: bool) -> afd::AfdPollMask { use afd::AfdPollMask as AfdPoll; @@ -1182,6 +1228,7 @@ fn event_to_afd_mask(readable: bool, writable: bool, error: bool) -> afd::AfdPol mask } +/// Convert the mask reported by AFD to an event. fn afd_mask_to_event(mask: afd::AfdPollMask) -> (bool, bool) { use afd::AfdPollMask as AfdPoll; From cb9d851f7790dab5eb55de587c00ed96a7f598a6 Mon Sep 17 00:00:00 2001 From: John Nunley Date: Tue, 8 Aug 2023 12:26:31 -0700 Subject: [PATCH 2/8] Extract Events from Poller This prevents the need to have an intermediate buffer to read events from, reducing the need for an allocation and a copy. This is a breaking change. Signed-off-by: John Nunley --- examples/two-listeners.rs | 6 +- examples/wait-signal.rs | 8 +- src/epoll.rs | 14 ++- src/iocp/mod.rs | 33 +++--- src/kqueue.rs | 14 ++- src/lib.rs | 171 +++++++++++++++++++++++++++++-- src/poll.rs | 14 ++- src/port.rs | 14 ++- tests/concurrent_modification.rs | 10 +- tests/io.rs | 6 +- tests/many_connections.rs | 13 ++- tests/notify.rs | 5 +- tests/other_modes.rs | 56 +++++++--- tests/precision.rs | 6 +- tests/timeout.rs | 6 +- tests/windows_post.rs | 10 +- 16 files changed, 307 insertions(+), 79 deletions(-) diff --git a/examples/two-listeners.rs b/examples/two-listeners.rs index 02b2339..bf54eee 100644 --- a/examples/two-listeners.rs +++ b/examples/two-listeners.rs @@ -1,7 +1,7 @@ use std::io; use std::net::TcpListener; -use polling::{Event, Poller}; +use polling::{Event, Events, Poller}; fn main() -> io::Result<()> { let l1 = TcpListener::bind("127.0.0.1:8001")?; @@ -19,12 +19,12 @@ fn main() -> io::Result<()> { println!(" $ nc 127.0.0.1 8001"); println!(" $ nc 127.0.0.1 8002"); - let mut events = Vec::new(); + let mut events = Events::new(); loop { events.clear(); poller.wait(&mut events, None)?; - for ev in &events { + for ev in events.iter() { match ev.key { 1 => { println!("Accept on l1"); diff --git a/examples/wait-signal.rs b/examples/wait-signal.rs index 02a4c5d..4a645af 100644 --- a/examples/wait-signal.rs +++ b/examples/wait-signal.rs @@ -13,7 +13,7 @@ ))] mod example { use polling::os::kqueue::{PollerKqueueExt, Signal}; - use polling::{PollMode, Poller}; + use polling::{Events, PollMode, Poller}; pub(super) fn main2() { // Create a poller. @@ -23,7 +23,7 @@ mod example { let sigint = Signal(libc::SIGINT); poller.add_filter(sigint, 1, PollMode::Oneshot).unwrap(); - let mut events = vec![]; + let mut events = Events::new(); println!("Press Ctrl+C to exit..."); @@ -32,7 +32,7 @@ mod example { poller.wait(&mut events, None).unwrap(); // Process events. - for ev in events.drain(..) { + for ev in events.iter() { match ev.key { 1 => { println!("SIGINT received"); @@ -41,6 +41,8 @@ mod example { _ => unreachable!(), } } + + events.clear(); } } } diff --git a/src/epoll.rs b/src/epoll.rs index a878cfb..6337b2d 100644 --- a/src/epoll.rs +++ b/src/epoll.rs @@ -308,9 +308,9 @@ unsafe impl Send for Events {} impl Events { /// Creates an empty list. - pub fn new() -> Events { + pub fn with_capacity(cap: usize) -> Events { Events { - list: epoll::EventVec::with_capacity(1024), + list: epoll::EventVec::with_capacity(cap), } } @@ -325,4 +325,14 @@ impl Events { } }) } + + /// Clear the list. + pub fn clear(&mut self) { + self.list.clear(); + } + + /// Get the capacity of the list. + pub fn capacity(&self) -> usize { + self.list.capacity() + } } diff --git a/src/iocp/mod.rs b/src/iocp/mod.rs index 54a99ea..04423e1 100644 --- a/src/iocp/mod.rs +++ b/src/iocp/mod.rs @@ -74,10 +74,10 @@ pub(super) struct Poller { port: Arc>, /// List of currently active AFD instances. - /// + /// /// AFD acts as the actual source of the socket events. It's essentially running `WSAPoll` on /// the sockets and then posting the events to the IOCP. - /// + /// /// AFD instances can be keyed to an unlimited number of sockets. However, each AFD instance /// polls their sockets linearly. Therefore, it is best to limit the number of sockets each AFD /// instance is responsible for. The limit of 32 is chosen because that's what `wepoll` uses. @@ -87,7 +87,7 @@ pub(super) struct Poller { afd: Mutex>>>, /// The state of the sources registered with this poller. - /// + /// /// Each source is keyed by its raw socket ID. sources: RwLock>, @@ -95,7 +95,7 @@ pub(super) struct Poller { waitables: RwLock>, /// Sockets with pending updates. - /// + /// /// This list contains packets with sockets that need to have their AFD state adjusted by /// calling the `update()` function on them. It's best to queue up packets as they need to /// be updated and then run all of the updates before we start waiting on the IOCP, rather than @@ -103,18 +103,18 @@ pub(super) struct Poller { pending_updates: ConcurrentQueue, /// Are we currently polling? - /// + /// /// This indicates whether or not we are blocking on the IOCP, and is used to determine /// whether pending updates should be run immediately or queued. polling: AtomicBool, /// A list of completion packets. - /// + /// /// The IOCP writes to this list when it has new events. packets: Mutex>>, /// The packet used to notify the poller. - /// + /// /// This is a special-case packet that is used to wake up the poller when it is waiting. notifier: Packet, } @@ -548,7 +548,7 @@ impl Poller { } /// Get a handle to the AFD reference. - /// + /// /// This finds an AFD handle with less than 32 associated sockets, or creates a new one if /// one does not exist. fn afd_handle(&self) -> io::Result>> { @@ -622,9 +622,9 @@ unsafe impl Send for Events {} impl Events { /// Creates an empty list of events. - pub(super) fn new() -> Events { + pub(super) fn with_capacity(cap: usize) -> Events { Events { - packets: Vec::with_capacity(1024), + packets: Vec::with_capacity(cap), } } @@ -632,6 +632,11 @@ impl Events { pub(super) fn iter(&self) -> impl Iterator + '_ { self.packets.iter().copied() } + + /// Clear the list of events. + pub(super) fn clear(&mut self) { + self.packets.clear(); + } } /// A packet used to wake up the poller with an event. @@ -656,7 +661,7 @@ impl CompletionPacket { } /// The type of our completion packet. -/// +/// /// It needs to be pinned, since it contains data that is expected by IOCP not to be moved. type Packet = Pin>; type PacketUnwrapped = IoStatusBlock; @@ -754,9 +759,9 @@ impl PacketUnwrapped { } /// Update the socket and install the new status in AFD. - /// + /// /// This function does one of the following: - /// + /// /// - Nothing, if the packet is waiting on being dropped anyways. /// - Cancels the ongoing poll, if we want to poll for different events than we are currently /// polling for. @@ -882,7 +887,7 @@ impl PacketUnwrapped { } /// This socket state was notified; see if we need to update it. - /// + /// /// This indicates that this packet was indicated as "ready" by the IOCP and needs to be /// processed. fn feed_event(self: Pin>, poller: &Poller) -> io::Result { diff --git a/src/kqueue.rs b/src/kqueue.rs index 62b7ea3..e244188 100644 --- a/src/kqueue.rs +++ b/src/kqueue.rs @@ -222,9 +222,9 @@ unsafe impl Send for Events {} impl Events { /// Creates an empty list. - pub fn new() -> Events { + pub fn with_capacity(cap: usize) -> Events { Events { - list: Vec::with_capacity(1024), + list: Vec::with_capacity(cap), } } @@ -249,6 +249,16 @@ impl Events { && (ev.flags().intersects(kqueue::EventFlags::EOF))), }) } + + /// Clears the list. + pub fn clear(&mut self) { + self.list.clear(); + } + + /// Get the capacity of the list. + pub fn capacity(&self) -> usize { + self.list.capacity() + } } pub(crate) fn mode_to_flags(mode: PollMode) -> kqueue::EventFlags { diff --git a/src/lib.rs b/src/lib.rs index 188ab87..f2a792b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -67,6 +67,7 @@ use std::fmt; use std::io; +use std::num::NonZeroUsize; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Mutex; use std::time::Duration; @@ -226,7 +227,7 @@ impl Event { /// Waits for I/O events. pub struct Poller { poller: sys::Poller, - events: Mutex, + lock: Mutex<()>, notified: AtomicBool, } @@ -244,7 +245,7 @@ impl Poller { pub fn new() -> io::Result { Ok(Poller { poller: sys::Poller::new()?, - events: Mutex::new(sys::Events::new()), + lock: Mutex::new(()), notified: AtomicBool::new(false), }) } @@ -510,21 +511,19 @@ impl Poller { /// poller.delete(&socket)?; /// # std::io::Result::Ok(()) /// ``` - pub fn wait(&self, events: &mut Vec, timeout: Option) -> io::Result { + pub fn wait(&self, events: &mut Events, timeout: Option) -> io::Result { let span = tracing::trace_span!("Poller::wait", ?timeout); let _enter = span.enter(); - if let Ok(mut lock) = self.events.try_lock() { + if let Ok(_lock) = self.lock.try_lock() { // Wait for I/O events. - self.poller.wait(&mut lock, timeout)?; + self.poller.wait(&mut events.events, timeout)?; // Clear the notification, if any. self.notified.swap(false, Ordering::SeqCst); - // Collect events. - let len = events.len(); - events.extend(lock.iter().filter(|ev| ev.key != usize::MAX)); - Ok(events.len() - len) + // Indicate number of events. + Ok(events.len()) } else { tracing::trace!("wait: skipping because another thread is already waiting on I/O"); Ok(0) @@ -568,6 +567,160 @@ impl Poller { } } +/// A container for I/O events. +pub struct Events { + events: sys::Events, +} + +impl Default for Events { + #[inline] + fn default() -> Self { + Self::new() + } +} + +impl Events { + /// Create a new container for events, using the default capacity. + /// + /// The default capacity is 1024. + /// + /// # Examples + /// + /// ``` + /// use polling::Events; + /// + /// let events = Events::new(); + /// ``` + #[inline] + pub fn new() -> Self { + // ESP-IDF has a low amount of RAM, so we use a smaller default capacity. + #[cfg(target_os = "espidf")] + const DEFAULT_CAPACITY: usize = 32; + + #[cfg(not(target_os = "espidf"))] + const DEFAULT_CAPACITY: usize = 1024; + + Self::with_capacity(NonZeroUsize::new(DEFAULT_CAPACITY).unwrap()) + } + + /// Create a new container with the provided capacity. + /// + /// # Examples + /// + /// ``` + /// use polling::Events; + /// use std::num::NonZeroUsize; + /// + /// let capacity = NonZeroUsize::new(1024).unwrap(); + /// let events = Events::with_capacity(capacity); + /// ``` + #[inline] + pub fn with_capacity(capacity: NonZeroUsize) -> Self { + Self { + events: sys::Events::with_capacity(capacity.get()), + } + } + + /// Create a new iterator over I/O events. + /// + /// This returns all of the events in the container, excluding the notification event. + /// + /// # Examples + /// + /// ``` + /// use polling::{Event, Events, Poller}; + /// use std::time::Duration; + /// + /// # fn main() -> std::io::Result<()> { + /// let poller = Poller::new()?; + /// let mut events = Events::new(); + /// + /// poller.wait(&mut events, Some(Duration::from_secs(0)))?; + /// assert!(events.iter().next().is_none()); + /// # Ok(()) } + /// ``` + #[inline] + pub fn iter(&self) -> impl Iterator + '_ { + self.events.iter().filter(|ev| ev.key != NOTIFY_KEY) + } + + /// Delete all of the events in the container. + /// + /// # Examples + /// + /// ```no_run + /// use polling::{Event, Events, Poller}; + /// + /// # fn main() -> std::io::Result<()> { + /// let poller = Poller::new()?; + /// let mut events = Events::new(); + /// + /// /* register some sources */ + /// + /// poller.wait(&mut events, None)?; + /// + /// events.clear(); + /// # Ok(()) } + /// ``` + #[inline] + pub fn clear(&mut self) { + self.events.clear(); + } + + /// Returns the number of events in the container. + /// + /// # Examples + /// + /// ``` + /// use polling::Events; + /// + /// let events = Events::new(); + /// assert_eq!(events.len(), 0); + /// ``` + #[inline] + pub fn len(&self) -> usize { + self.iter().count() + } + + /// Returns `true` if the container contains no events. + /// + /// # Examples + /// + /// ``` + /// use polling::Events; + /// + /// let events = Events::new(); + /// assert!(events.is_empty()); + /// ``` + #[inline] + pub fn is_empty(&self) -> bool { + self.len() != 0 + } + + /// Get the total capacity of the list. + /// + /// # Examples + /// + /// ``` + /// use polling::Events; + /// use std::num::NonZeroUsize; + /// + /// let cap = NonZeroUsize::new(10).unwrap(); + /// let events = Events::with_capacity(std::num::NonZeroUsize::new(10).unwrap()); + /// assert_eq!(events.capacity(), cap); + /// ``` + #[inline] + pub fn capacity(&self) -> NonZeroUsize { + NonZeroUsize::new(self.events.capacity()).unwrap() + } +} + +impl fmt::Debug for Events { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("Events { .. }") + } +} + #[cfg(all( any( target_os = "linux", diff --git a/src/poll.rs b/src/poll.rs index b6e31ee..ee61379 100644 --- a/src/poll.rs +++ b/src/poll.rs @@ -364,14 +364,24 @@ pub struct Events { impl Events { /// Creates an empty list. - pub fn new() -> Events { - Self { inner: Vec::new() } + pub fn with_capacity(cap: usize) -> Events { + Self { inner: Vec::with_capacity(cap) } } /// Iterates over I/O events. pub fn iter(&self) -> impl Iterator + '_ { self.inner.iter().copied() } + + /// Clear the list. + pub fn clear(&mut self) { + self.inner.clear(); + } + + /// Get the capacity of the list. + pub fn capacity(&self) -> usize { + self.inner.capacity() + } } fn cvt_mode_as_remove(mode: PollMode) -> io::Result { diff --git a/src/port.rs b/src/port.rs index bd55b15..7b65ff6 100644 --- a/src/port.rs +++ b/src/port.rs @@ -181,9 +181,9 @@ unsafe impl Send for Events {} impl Events { /// Creates an empty list. - pub fn new() -> Events { + pub fn with_capacity(cap: usize) -> Events { Events { - list: Vec::with_capacity(1024), + list: Vec::with_capacity(cap), } } @@ -195,4 +195,14 @@ impl Events { writable: PollFlags::from_bits_truncate(ev.events() as _).intersects(write_flags()), }) } + + /// Clear the list. + pub fn clear(&mut self) { + self.list.clear(); + } + + /// Get the capacity of the list. + pub fn capacity(&self) -> usize { + self.list.capacity() + } } diff --git a/tests/concurrent_modification.rs b/tests/concurrent_modification.rs index 8cf6691..ff00ddb 100644 --- a/tests/concurrent_modification.rs +++ b/tests/concurrent_modification.rs @@ -4,14 +4,14 @@ use std::thread; use std::time::Duration; use easy_parallel::Parallel; -use polling::{Event, Poller}; +use polling::{Event, Events, Poller}; #[test] fn concurrent_add() -> io::Result<()> { let (reader, mut writer) = tcp_pair()?; let poller = Poller::new()?; - let mut events = Vec::new(); + let mut events = Events::new(); let result = Parallel::new() .add(|| { @@ -33,7 +33,7 @@ fn concurrent_add() -> io::Result<()> { poller.delete(&reader)?; result?; - assert_eq!(events, [Event::readable(0)]); + assert_eq!(events.iter().collect::>(), [Event::readable(0)]); Ok(()) } @@ -46,7 +46,7 @@ fn concurrent_modify() -> io::Result<()> { poller.add(&reader, Event::none(0))?; } - let mut events = Vec::new(); + let mut events = Events::new(); Parallel::new() .add(|| { @@ -63,7 +63,7 @@ fn concurrent_modify() -> io::Result<()> { .into_iter() .collect::>()?; - assert_eq!(events, [Event::readable(0)]); + assert_eq!(events.iter().collect::>(), [Event::readable(0)]); Ok(()) } diff --git a/tests/io.rs b/tests/io.rs index 10b3d48..965e477 100644 --- a/tests/io.rs +++ b/tests/io.rs @@ -1,4 +1,4 @@ -use polling::{Event, Poller}; +use polling::{Event, Events, Poller}; use std::io::{self, Write}; use std::net::{TcpListener, TcpStream}; use std::time::Duration; @@ -12,7 +12,7 @@ fn basic_io() { } // Nothing should be available at first. - let mut events = vec![]; + let mut events = Events::new(); assert_eq!( poller .wait(&mut events, Some(Duration::from_secs(0))) @@ -29,7 +29,7 @@ fn basic_io() { .unwrap(), 1 ); - assert_eq!(&*events, &[Event::readable(1)]); + assert_eq!(events.iter().collect::>(), &[Event::readable(1)]); poller.delete(&read).unwrap(); } diff --git a/tests/many_connections.rs b/tests/many_connections.rs index 41d640b..2a8807b 100644 --- a/tests/many_connections.rs +++ b/tests/many_connections.rs @@ -7,6 +7,8 @@ use std::io::{self, prelude::*}; use std::net::{TcpListener, TcpStream}; use std::time::Duration; +use polling::Events; + #[test] fn many_connections() { // Create 100 connections. @@ -25,7 +27,7 @@ fn many_connections() { } } - let mut events = vec![]; + let mut events = Events::new(); while !connections.is_empty() { // Choose a random connection to write to. let i = fastrand::usize(..connections.len()); @@ -40,10 +42,11 @@ fn many_connections() { .unwrap(); // Check that the connection is readable. - assert_eq!(events.len(), 1, "events: {:?}", events); - assert_eq!(events[0].key, id); - assert!(events[0].readable); - assert!(!events[0].writable); + let current_events = events.iter().collect::>(); + assert_eq!(current_events.len(), 1, "events: {:?}", current_events); + assert_eq!(current_events[0].key, id); + assert!(current_events[0].readable); + assert!(!current_events[0].writable); // Read the byte from the connection. let mut buf = [0]; diff --git a/tests/notify.rs b/tests/notify.rs index a5ca481..447fad1 100644 --- a/tests/notify.rs +++ b/tests/notify.rs @@ -3,12 +3,13 @@ use std::thread; use std::time::Duration; use easy_parallel::Parallel; +use polling::Events; use polling::Poller; #[test] fn simple() -> io::Result<()> { let poller = Poller::new()?; - let mut events = Vec::new(); + let mut events = Events::new(); for _ in 0..10 { poller.notify()?; @@ -21,7 +22,7 @@ fn simple() -> io::Result<()> { #[test] fn concurrent() -> io::Result<()> { let poller = Poller::new()?; - let mut events = Vec::new(); + let mut events = Events::new(); for _ in 0..2 { Parallel::new() diff --git a/tests/other_modes.rs b/tests/other_modes.rs index 12ef1bc..863ed36 100644 --- a/tests/other_modes.rs +++ b/tests/other_modes.rs @@ -6,7 +6,7 @@ use std::io::{self, prelude::*}; use std::net::{TcpListener, TcpStream}; use std::time::Duration; -use polling::{Event, PollMode, Poller}; +use polling::{Event, Events, PollMode, Poller}; #[test] fn level_triggered() { @@ -34,12 +34,15 @@ fn level_triggered() { writer.write_all(&data).unwrap(); // A "readable" notification should be delivered. - let mut events = Vec::new(); + let mut events = Events::new(); poller .wait(&mut events, Some(Duration::from_secs(10))) .unwrap(); - assert_eq!(events, [Event::readable(reader_token)]); + assert_eq!( + events.iter().collect::>(), + [Event::readable(reader_token)] + ); // If we read some of the data, the notification should still be available. reader.read_exact(&mut [0; 3]).unwrap(); @@ -47,7 +50,10 @@ fn level_triggered() { poller .wait(&mut events, Some(Duration::from_secs(10))) .unwrap(); - assert_eq!(events, [Event::readable(reader_token)]); + assert_eq!( + events.iter().collect::>(), + [Event::readable(reader_token)] + ); // If we read the rest of the data, the notification should be gone. reader.read_exact(&mut [0; 2]).unwrap(); @@ -56,7 +62,7 @@ fn level_triggered() { .wait(&mut events, Some(Duration::from_secs(0))) .unwrap(); - assert_eq!(events, []); + assert_eq!(events.iter().collect::>(), []); // After modifying the stream and sending more data, it should be oneshot. poller @@ -71,7 +77,10 @@ fn level_triggered() { .wait(&mut events, Some(Duration::from_secs(10))) .unwrap(); - assert_eq!(events, [Event::readable(reader_token)]); + assert_eq!( + events.iter().collect::>(), + [Event::readable(reader_token)] + ); // After reading, the notification should vanish. reader.read(&mut [0; 5]).unwrap(); @@ -80,7 +89,7 @@ fn level_triggered() { .wait(&mut events, Some(Duration::from_secs(0))) .unwrap(); - assert_eq!(events, []); + assert_eq!(events.iter().collect::>(), []); } #[test] @@ -123,12 +132,15 @@ fn edge_triggered() { writer.write_all(&data).unwrap(); // A "readable" notification should be delivered. - let mut events = Vec::new(); + let mut events = Events::new(); poller .wait(&mut events, Some(Duration::from_secs(10))) .unwrap(); - assert_eq!(events, [Event::readable(reader_token)]); + assert_eq!( + events.iter().collect::>(), + [Event::readable(reader_token)] + ); // If we read some of the data, the notification should not still be available. reader.read_exact(&mut [0; 3]).unwrap(); @@ -136,7 +148,7 @@ fn edge_triggered() { poller .wait(&mut events, Some(Duration::from_secs(0))) .unwrap(); - assert_eq!(events, []); + assert_eq!(events.iter().collect::>(), []); // If we write more data, a notification should be delivered. writer.write_all(&data).unwrap(); @@ -144,7 +156,10 @@ fn edge_triggered() { poller .wait(&mut events, Some(Duration::from_secs(10))) .unwrap(); - assert_eq!(events, [Event::readable(reader_token)]); + assert_eq!( + events.iter().collect::>(), + [Event::readable(reader_token)] + ); // After modifying the stream and sending more data, it should be oneshot. poller @@ -157,7 +172,10 @@ fn edge_triggered() { .wait(&mut events, Some(Duration::from_secs(10))) .unwrap(); - assert_eq!(events, [Event::readable(reader_token)]); + assert_eq!( + events.iter().collect::>(), + [Event::readable(reader_token)] + ); } #[test] @@ -206,12 +224,15 @@ fn edge_oneshot_triggered() { writer.write_all(&data).unwrap(); // A "readable" notification should be delivered. - let mut events = Vec::new(); + let mut events = Events::new(); poller .wait(&mut events, Some(Duration::from_secs(10))) .unwrap(); - assert_eq!(events, [Event::readable(reader_token)]); + assert_eq!( + events.iter().collect::>(), + [Event::readable(reader_token)] + ); // If we read some of the data, the notification should not still be available. reader.read_exact(&mut [0; 3]).unwrap(); @@ -219,7 +240,7 @@ fn edge_oneshot_triggered() { poller .wait(&mut events, Some(Duration::from_secs(0))) .unwrap(); - assert_eq!(events, []); + assert!(events.is_empty()); // If we modify to re-enable the notification, it should be delivered. poller @@ -233,7 +254,10 @@ fn edge_oneshot_triggered() { poller .wait(&mut events, Some(Duration::from_secs(0))) .unwrap(); - assert_eq!(events, [Event::readable(reader_token)]); + assert_eq!( + events.iter().collect::>(), + [Event::readable(reader_token)] + ); } fn tcp_pair() -> io::Result<(TcpStream, TcpStream)> { diff --git a/tests/precision.rs b/tests/precision.rs index de5d605..1776fa8 100644 --- a/tests/precision.rs +++ b/tests/precision.rs @@ -1,12 +1,12 @@ use std::io; use std::time::{Duration, Instant}; -use polling::Poller; +use polling::{Events, Poller}; #[test] fn below_ms() -> io::Result<()> { let poller = Poller::new()?; - let mut events = Vec::new(); + let mut events = Events::new(); let dur = Duration::from_micros(100); let margin = Duration::from_micros(500); @@ -42,7 +42,7 @@ fn below_ms() -> io::Result<()> { #[test] fn above_ms() -> io::Result<()> { let poller = Poller::new()?; - let mut events = Vec::new(); + let mut events = Events::new(); let dur = Duration::from_micros(3_100); let margin = Duration::from_micros(500); diff --git a/tests/timeout.rs b/tests/timeout.rs index abf902c..62763a1 100644 --- a/tests/timeout.rs +++ b/tests/timeout.rs @@ -1,12 +1,12 @@ use std::io; use std::time::{Duration, Instant}; -use polling::Poller; +use polling::{Events, Poller}; #[test] fn twice() -> io::Result<()> { let poller = Poller::new()?; - let mut events = Vec::new(); + let mut events = Events::new(); for _ in 0..2 { let start = Instant::now(); @@ -22,7 +22,7 @@ fn twice() -> io::Result<()> { #[test] fn non_blocking() -> io::Result<()> { let poller = Poller::new()?; - let mut events = Vec::new(); + let mut events = Events::new(); for _ in 0..100 { poller.wait(&mut events, Some(Duration::from_secs(0)))?; diff --git a/tests/windows_post.rs b/tests/windows_post.rs index 488fab3..425b09c 100644 --- a/tests/windows_post.rs +++ b/tests/windows_post.rs @@ -3,7 +3,7 @@ #![cfg(windows)] use polling::os::iocp::{CompletionPacket, PollerIocpExt}; -use polling::{Event, Poller}; +use polling::{Event, Events, Poller}; use std::sync::Arc; use std::thread; @@ -12,7 +12,7 @@ use std::time::Duration; #[test] fn post_smoke() { let poller = Poller::new().unwrap(); - let mut events = Vec::new(); + let mut events = Events::new(); poller .post(CompletionPacket::new(Event::readable(1))) @@ -20,13 +20,13 @@ fn post_smoke() { poller.wait(&mut events, None).unwrap(); assert_eq!(events.len(), 1); - assert_eq!(events[0], Event::readable(1)); + assert_eq!(events.iter().next().unwrap(), Event::readable(1)); } #[test] fn post_multithread() { let poller = Arc::new(Poller::new().unwrap()); - let mut events = Vec::new(); + let mut events = Events::new(); thread::spawn({ let poller = Arc::clone(&poller); @@ -47,7 +47,7 @@ fn post_multithread() { .unwrap(); assert_eq!(events.len(), 1); - assert_eq!(events.pop(), Some(Event::writable(i))); + assert_eq!(events.iter().next(), Some(Event::writable(i))); } poller From ce31d70d5bb5764e84a6756fa1da91dde22bfdd0 Mon Sep 17 00:00:00 2001 From: John Nunley Date: Tue, 8 Aug 2023 19:21:21 -0700 Subject: [PATCH 3/8] Add event extra information Foundation for more details later on. Signed-off-by: John Nunley --- src/epoll.rs | 34 +++++++++++++++++++--------------- src/iocp/afd.rs | 31 ++++++++++++++++++++++++++++++- src/iocp/mod.rs | 21 +++++++++++++++++++++ src/kqueue.rs | 12 ++++++++++++ src/lib.rs | 6 ++++++ src/poll.rs | 26 +++++++++++++++++++++++--- src/port.rs | 28 ++++++++++++++++++++++++---- 7 files changed, 135 insertions(+), 23 deletions(-) diff --git a/src/epoll.rs b/src/epoll.rs index 6337b2d..1dd570f 100644 --- a/src/epoll.rs +++ b/src/epoll.rs @@ -59,11 +59,7 @@ impl Poller { poller.add( poller.event_fd.as_raw_fd(), - Event { - key: crate::NOTIFY_KEY, - readable: true, - writable: false, - }, + Event::readable(crate::NOTIFY_KEY), PollMode::Oneshot, )?; } @@ -177,11 +173,7 @@ impl Poller { // Set interest in timerfd. self.modify( timer_fd.as_fd(), - Event { - key: crate::NOTIFY_KEY, - readable: true, - writable: false, - }, + Event::readable(crate::NOTIFY_KEY), PollMode::Oneshot, )?; } @@ -213,11 +205,7 @@ impl Poller { let _ = read(&self.event_fd, &mut buf); self.modify( self.event_fd.as_fd(), - Event { - key: crate::NOTIFY_KEY, - readable: true, - writable: false, - }, + Event::readable(crate::NOTIFY_KEY), PollMode::Oneshot, )?; Ok(()) @@ -322,6 +310,7 @@ impl Events { key: ev.data.u64() as usize, readable: flags.intersects(read_flags()), writable: flags.intersects(write_flags()), + extra: EventExtra { flags }, } }) } @@ -336,3 +325,18 @@ impl Events { self.list.capacity() } } + +/// Extra information about this event. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct EventExtra { + flags: epoll::EventFlags, +} + +impl EventExtra { + /// Create an empty version of the data. + pub fn empty() -> EventExtra { + EventExtra { + flags: epoll::EventFlags::empty(), + } + } +} diff --git a/src/iocp/afd.rs b/src/iocp/afd.rs index 4fb1bf8..c63b8a0 100644 --- a/src/iocp/afd.rs +++ b/src/iocp/afd.rs @@ -66,7 +66,7 @@ impl AfdPollInfo { } } -#[derive(Default, Copy, Clone)] +#[derive(Default, Copy, Clone, PartialEq, Eq)] #[repr(transparent)] pub(super) struct AfdPollMask(u32); @@ -91,6 +91,35 @@ impl AfdPollMask { } } +impl fmt::Debug for AfdPollMask { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + const FLAGS: &[(&str, AfdPollMask)] = &[ + ("RECEIVE", AfdPollMask::RECEIVE), + ("RECEIVE_EXPEDITED", AfdPollMask::RECEIVE_EXPEDITED), + ("SEND", AfdPollMask::SEND), + ("DISCONNECT", AfdPollMask::DISCONNECT), + ("ABORT", AfdPollMask::ABORT), + ("LOCAL_CLOSE", AfdPollMask::LOCAL_CLOSE), + ("ACCEPT", AfdPollMask::ACCEPT), + ("CONNECT_FAIL", AfdPollMask::CONNECT_FAIL), + ]; + + let mut first = true; + for (name, value) in FLAGS { + if self.intersects(*value) { + if !first { + write!(f, " | ")?; + } + + first = false; + write!(f, "{}", name)?; + } + } + + Ok(()) + } +} + impl ops::BitOr for AfdPollMask { type Output = Self; diff --git a/src/iocp/mod.rs b/src/iocp/mod.rs index 04423e1..4c4dea1 100644 --- a/src/iocp/mod.rs +++ b/src/iocp/mod.rs @@ -637,6 +637,27 @@ impl Events { pub(super) fn clear(&mut self) { self.packets.clear(); } + + /// The capacity of the list of events. + pub(super) fn capacity(&self) -> usize { + self.packets.capacity() + } +} + +/// Extra information about an event. +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub struct EventExtra { + /// Flags associated with this event. + flags: AfdPollMask, +} + +impl EventExtra { + /// Create a new, empty version of this struct. + pub fn empty() -> EventExtra { + EventExtra { + flags: AfdPollMask::empty(), + } + } } /// A packet used to wake up the poller with an event. diff --git a/src/kqueue.rs b/src/kqueue.rs index e244188..2c7e511 100644 --- a/src/kqueue.rs +++ b/src/kqueue.rs @@ -247,6 +247,7 @@ impl Events { writable: matches!(ev.filter(), kqueue::EventFilter::Write(..)) || (matches!(ev.filter(), kqueue::EventFilter::Read(..)) && (ev.flags().intersects(kqueue::EventFlags::EOF))), + extra: EventExtra, }) } @@ -261,6 +262,17 @@ impl Events { } } +/// Extra information associated with an event. +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub struct EventExtra; + +impl EventExtra { + /// Create a new, empty version of this struct. + pub fn empty() -> EventExtra { + EventExtra + } +} + pub(crate) fn mode_to_flags(mode: PollMode) -> kqueue::EventFlags { use kqueue::EventFlags as EV; diff --git a/src/lib.rs b/src/lib.rs index f2a792b..c02b117 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -132,6 +132,8 @@ pub struct Event { pub readable: bool, /// Can it do a write operation without blocking? pub writable: bool, + /// System-specific event data. + extra: sys::EventExtra, } /// The mode in which the poller waits for I/O events. @@ -187,6 +189,7 @@ impl Event { key, readable: true, writable: true, + extra: sys::EventExtra::empty(), } } @@ -198,6 +201,7 @@ impl Event { key, readable: true, writable: false, + extra: sys::EventExtra::empty(), } } @@ -209,6 +213,7 @@ impl Event { key, readable: false, writable: true, + extra: sys::EventExtra::empty(), } } @@ -220,6 +225,7 @@ impl Event { key, readable: false, writable: false, + extra: sys::EventExtra::empty(), } } } diff --git a/src/poll.rs b/src/poll.rs index ee61379..59f3ce7 100644 --- a/src/poll.rs +++ b/src/poll.rs @@ -268,10 +268,12 @@ impl Poller { let poll_fd = &mut fds.poll_fds[fd_data.poll_fds_index]; if !poll_fd.revents().is_empty() { // Store event + let revents = poll_fd.revents(); events.inner.push(Event { key: fd_data.key, - readable: poll_fd.revents().intersects(read_events()), - writable: poll_fd.revents().intersects(write_events()), + readable: revents.intersects(read_events()), + writable: revents.intersects(write_events()), + extra: EventExtra { flags: revents }, }); // Remove interest if necessary if fd_data.remove { @@ -365,7 +367,9 @@ pub struct Events { impl Events { /// Creates an empty list. pub fn with_capacity(cap: usize) -> Events { - Self { inner: Vec::with_capacity(cap) } + Self { + inner: Vec::with_capacity(cap), + } } /// Iterates over I/O events. @@ -384,6 +388,22 @@ impl Events { } } +/// Extra information associated with an event. +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub struct EventExtra { + /// Flags associated with this event. + flags: PollFlags, +} + +impl EventExtra { + /// Creates an empty set of extra information. + pub fn empty() -> Self { + Self { + flags: PollFlags::empty(), + } + } +} + fn cvt_mode_as_remove(mode: PollMode) -> io::Result { match mode { PollMode::Oneshot => Ok(true), diff --git a/src/port.rs b/src/port.rs index 7b65ff6..6225228 100644 --- a/src/port.rs +++ b/src/port.rs @@ -189,10 +189,14 @@ impl Events { /// Iterates over I/O events. pub fn iter(&self) -> impl Iterator + '_ { - self.list.iter().map(|ev| Event { - key: ev.userdata() as usize, - readable: PollFlags::from_bits_truncate(ev.events() as _).intersects(read_flags()), - writable: PollFlags::from_bits_truncate(ev.events() as _).intersects(write_flags()), + self.list.iter().map(|ev| { + let flags = PollFlags::from_bits_truncate(ev.events() as _); + Event { + key: ev.userdata() as usize, + readable: flags.intersects(read_flags()), + writable: flags.intersects(write_flags()), + extra: EventExtra { flags }, + } }) } @@ -206,3 +210,19 @@ impl Events { self.list.capacity() } } + +/// Extra information associated with an event. +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub struct EventExtra { + /// Flags associated with this event. + flags: PollFlags, +} + +impl EventExtra { + /// Create a new, empty version of this struct. + pub fn empty() -> EventExtra { + EventExtra { + flags: PollFlags::empty(), + } + } +} From 1951527cbbdc0f911d371fd1e92650f98644fbad Mon Sep 17 00:00:00 2001 From: John Nunley Date: Tue, 8 Aug 2023 19:52:43 -0700 Subject: [PATCH 4/8] Add PRI and HUP events Signed-off-by: John Nunley --- src/epoll.rs | 24 ++++++++++- src/iocp/afd.rs | 14 +++++++ src/iocp/mod.rs | 78 ++++++++++++++++++++++++------------ src/kqueue.rs | 20 ++++++++++ src/lib.rs | 104 ++++++++++++++++++++++++++++++++++++++++++++++++ src/poll.rs | 20 ++++++++++ src/port.rs | 20 ++++++++++ 7 files changed, 252 insertions(+), 28 deletions(-) diff --git a/src/epoll.rs b/src/epoll.rs index 1dd570f..024af0f 100644 --- a/src/epoll.rs +++ b/src/epoll.rs @@ -102,7 +102,7 @@ impl Poller { &self.epoll_fd, unsafe { rustix::fd::BorrowedFd::borrow_raw(fd) }, epoll::EventData::new_u64(ev.key as u64), - epoll_flags(&ev, mode), + epoll_flags(&ev, mode) | ev.extra.flags, )?; Ok(()) @@ -122,7 +122,7 @@ impl Poller { &self.epoll_fd, fd, epoll::EventData::new_u64(ev.key as u64), - epoll_flags(&ev, mode), + epoll_flags(&ev, mode) | ev.extra.flags, )?; Ok(()) @@ -339,4 +339,24 @@ impl EventExtra { flags: epoll::EventFlags::empty(), } } + + /// Add the interrupt flag to this event. + pub fn set_hup(&mut self) { + self.flags |= epoll::EventFlags::HUP; + } + + /// Add the priority flag to this event. + pub fn set_pri(&mut self) { + self.flags |= epoll::EventFlags::PRI; + } + + /// Tell if the interrupt flag is set. + pub fn is_hup(&self) -> bool { + self.flags.contains(epoll::EventFlags::HUP) + } + + /// Tell if the priority flag is set. + pub fn is_pri(&self) -> bool { + self.flags.contains(epoll::EventFlags::PRI) + } } diff --git a/src/iocp/afd.rs b/src/iocp/afd.rs index c63b8a0..70cd33b 100644 --- a/src/iocp/afd.rs +++ b/src/iocp/afd.rs @@ -134,6 +134,20 @@ impl ops::BitOrAssign for AfdPollMask { } } +impl ops::BitAnd for AfdPollMask { + type Output = Self; + + fn bitand(self, rhs: Self) -> Self { + AfdPollMask(self.0 & rhs.0) + } +} + +impl ops::BitAndAssign for AfdPollMask { + fn bitand_assign(&mut self, rhs: Self) { + self.0 &= rhs.0; + } +} + pub(super) trait HasAfdInfo { fn afd_info(self: Pin<&Self>) -> Pin<&UnsafeCell>; } diff --git a/src/iocp/mod.rs b/src/iocp/mod.rs index 4c4dea1..cdf6d6e 100644 --- a/src/iocp/mod.rs +++ b/src/iocp/mod.rs @@ -658,6 +658,26 @@ impl EventExtra { flags: AfdPollMask::empty(), } } + + /// Is this a HUP event? + pub fn is_hup(&self) -> bool { + self.flags.intersects(AfdPollMask::ABORT) + } + + /// Is this a PRI event? + pub fn is_pri(&self) -> bool { + self.flags.intersects(AfdPollMask::RECEIVE_EXPEDITED) + } + + /// Set up a listener for HUP events. + pub fn set_hup(&mut self) { + self.flags |= AfdPollMask::ABORT; + } + + /// Set up a listener for PRI events. + pub fn set_pri(&mut self) { + self.flags |= AfdPollMask::RECEIVE_EXPEDITED; + } } /// A packet used to wake up the poller with an event. @@ -758,9 +778,13 @@ impl PacketUnwrapped { socket.mode = mode; socket.interest_error = true; + // If there was a change, indicate that we need an update. match socket.status { - SocketStatus::Polling { readable, writable } => { - (interest.readable && !readable) || (interest.writable && !writable) + SocketStatus::Polling { flags } => { + let our_flags = event_to_afd_mask(interest.readable, interest.writable, true) + | interest.extra.flags; + + our_flags != flags } _ => true, } @@ -775,7 +799,7 @@ impl PacketUnwrapped { // Update if there is no ongoing wait. handle.status.is_idle() } - _ => false, + _ => true } } @@ -847,12 +871,16 @@ impl PacketUnwrapped { // Check the current status. match socket.status { - SocketStatus::Polling { readable, writable } => { + SocketStatus::Polling { flags } => { // If we need to poll for events aside from what we are currently polling, we need // to update the packet. Cancel the ongoing poll. - if (socket.interest.readable && !readable) - || (socket.interest.writable && !writable) - { + let our_flags = event_to_afd_mask( + socket.interest.readable, + socket.interest.writable, + socket.interest_error, + ) | socket.interest.extra.flags; + + if our_flags != flags { return self.cancel(socket); } @@ -868,15 +896,12 @@ impl PacketUnwrapped { SocketStatus::Idle => { // Start a new poll. - let result = socket.afd.poll( - self.clone(), - socket.base_socket, - event_to_afd_mask( - socket.interest.readable, - socket.interest.writable, - socket.interest_error, - ), - ); + let mask = event_to_afd_mask( + socket.interest.readable, + socket.interest.writable, + socket.interest_error, + ) | socket.interest.extra.flags; + let result = socket.afd.poll(self.clone(), socket.base_socket, mask); match result { Ok(()) => {} @@ -897,10 +922,7 @@ impl PacketUnwrapped { } // We are now polling for the current events. - socket.status = SocketStatus::Polling { - readable: socket.interest.readable, - writable: socket.interest.writable, - }; + socket.status = SocketStatus::Polling { flags: mask }; Ok(()) } @@ -988,6 +1010,7 @@ impl PacketUnwrapped { let (readable, writable) = afd_mask_to_event(events); event.readable = readable; event.writable = writable; + event.extra.flags = events; } } } @@ -999,7 +1022,13 @@ impl PacketUnwrapped { // If this event doesn't have anything that interests us, don't return or // update the oneshot state. - let return_value = if event.readable || event.writable { + let return_value = if event.readable + || event.writable + || event + .extra + .flags + .intersects(socket_state.interest.extra.flags) + { // If we are in oneshot mode, remove the interest. if matches!(socket_state.mode, PollMode::Oneshot) { socket_state.interest = Event::none(socket_state.interest.key); @@ -1099,11 +1128,8 @@ enum SocketStatus { /// We are currently polling these events. Polling { - /// We are currently polling for readable events. - readable: bool, - - /// We are currently polling for writable events. - writable: bool, + /// The flags we are currently polling for. + flags: AfdPollMask, }, /// The last poll operation was cancelled, and we're waiting for it to diff --git a/src/kqueue.rs b/src/kqueue.rs index 2c7e511..aca51d5 100644 --- a/src/kqueue.rs +++ b/src/kqueue.rs @@ -271,6 +271,26 @@ impl EventExtra { pub fn empty() -> EventExtra { EventExtra } + + /// Set the interrupt flag. + pub fn set_hup(&mut self) { + // No-op. + } + + /// Set the priority flag. + pub fn set_pri(&mut self) { + // No-op. + } + + /// Is the interrupt flag set? + pub fn is_hup(&self) -> bool { + false + } + + /// Is the priority flag set? + pub fn is_pri(&self) -> bool { + false + } } pub(crate) fn mode_to_flags(mode: PollMode) -> kqueue::EventFlags { diff --git a/src/lib.rs b/src/lib.rs index c02b117..bd748ce 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -228,6 +228,110 @@ impl Event { extra: sys::EventExtra::empty(), } } + + /// Add interruption events to this interest. + /// + /// This usually indicates that the file descriptor or socket has been closed. It corresponds + /// to the `EPOLLHUP` and `POLLHUP` events. + /// + /// Interruption events are only supported on the following platforms: + /// + /// - `epoll` + /// - `poll` + /// - IOCP + /// - Event Ports + /// + /// On other platforms, this function is a no-op. + pub fn set_interrupt(&mut self) { + self.extra.set_hup(); + } + + /// Add interruption events to this interest. + /// + /// This usually indicates that the file descriptor or socket has been closed. It corresponds + /// to the `EPOLLHUP` and `POLLHUP` events. + /// + /// Interruption events are only supported on the following platforms: + /// + /// - `epoll` + /// - `poll` + /// - IOCP + /// - Event Ports + /// + /// On other platforms, this function is a no-op. + pub fn with_interrupt(mut self) -> Self { + self.set_interrupt(); + self + } + + /// Add priority events to this interest. + /// + /// This indicates that there is urgent data to read. It corresponds to the `EPOLLPRI` and + /// `POLLPRI` events. + /// + /// Priority events are only supported on the following platforms: + /// + /// - `epoll` + /// - `poll` + /// - IOCP + /// - Event Ports + /// + /// On other platforms, this function is a no-op. + pub fn set_priority(&mut self) { + self.extra.set_pri(); + } + + /// Add priority events to this interest. + /// + /// This indicates that there is urgent data to read. It corresponds to the `EPOLLPRI` and + /// `POLLPRI` events. + /// + /// Priority events are only supported on the following platforms: + /// + /// - `epoll` + /// - `poll` + /// - IOCP + /// - Event Ports + /// + /// On other platforms, this function is a no-op. + pub fn with_priority(mut self) -> Self { + self.set_priority(); + self + } + + /// Tell if this event is the result of an interrupt notification. + /// + /// This usually indicates that the file descriptor or socket has been closed. It corresponds + /// to the `EPOLLHUP` and `POLLHUP` events. + /// + /// Interruption events are only supported on the following platforms: + /// + /// - `epoll` + /// - `poll` + /// - IOCP + /// - Event Ports + /// + /// On other platforms, this always returns `false`. + pub fn is_interrupt(&self) -> bool { + self.extra.is_hup() + } + + /// Tell if this event is the result of a priority notification. + /// + /// This indicates that there is urgent data to read. It corresponds to the `EPOLLPRI` and + /// `POLLPRI` events. + /// + /// Priority events are only supported on the following platforms: + /// + /// - `epoll` + /// - `poll` + /// - IOCP + /// - Event Ports + /// + /// On other platforms, this always returns `false`. + pub fn is_priority(&self) -> bool { + self.extra.is_pri() + } } /// Waits for I/O events. diff --git a/src/poll.rs b/src/poll.rs index 59f3ce7..e73ea59 100644 --- a/src/poll.rs +++ b/src/poll.rs @@ -402,6 +402,26 @@ impl EventExtra { flags: PollFlags::empty(), } } + + /// Set the interrupt flag. + pub fn set_hup(&mut self) { + self.flags.set(PollFlags::HUP, true); + } + + /// Set the priority flag. + pub fn set_pri(&mut self) { + self.flags.set(PollFlags::PRI, true); + } + + /// Is this an interrupt event? + pub fn is_hup(&self) -> bool { + self.flags.contains(PollFlags::HUP) + } + + /// Is this a priority event? + pub fn is_pri(&self) -> bool { + self.flags.contains(PollFlags::PRI) + } } fn cvt_mode_as_remove(mode: PollMode) -> io::Result { diff --git a/src/port.rs b/src/port.rs index 6225228..be8668b 100644 --- a/src/port.rs +++ b/src/port.rs @@ -225,4 +225,24 @@ impl EventExtra { flags: PollFlags::empty(), } } + + /// Set the interrupt flag. + pub fn set_hup(&mut self) { + self.flags.set(PollFlags::HUP, true); + } + + /// Set the priority flag. + pub fn set_pri(&mut self) { + self.flags.set(PollFlags::PRI, true); + } + + /// Is this an interrupt event? + pub fn is_hup(&self) -> bool { + self.flags.contains(PollFlags::HUP) + } + + /// Is this a priority event? + pub fn is_pri(&self) -> bool { + self.flags.contains(PollFlags::PRI) + } } From a568261202f12180a1bcf6935c0cee8a0607bb5b Mon Sep 17 00:00:00 2001 From: John Nunley Date: Tue, 8 Aug 2023 20:58:35 -0700 Subject: [PATCH 5/8] Fix various failing tests - Make sure that waitable handles interact properly with the new infrastructure - Fix failing doctests Signed-off-by: John Nunley --- src/iocp/mod.rs | 7 ++-- src/lib.rs | 16 ++++---- src/os/iocp.rs | 22 +++++------ src/os/kqueue.rs | 15 +++++--- tests/concurrent_modification.rs | 8 +++- tests/io.rs | 5 ++- tests/other_modes.rs | 65 ++++++++++++++++---------------- tests/windows_post.rs | 5 ++- tests/windows_waitable.rs | 10 +++-- 9 files changed, 86 insertions(+), 67 deletions(-) diff --git a/src/iocp/mod.rs b/src/iocp/mod.rs index cdf6d6e..d27d252 100644 --- a/src/iocp/mod.rs +++ b/src/iocp/mod.rs @@ -781,8 +781,9 @@ impl PacketUnwrapped { // If there was a change, indicate that we need an update. match socket.status { SocketStatus::Polling { flags } => { - let our_flags = event_to_afd_mask(interest.readable, interest.writable, true) - | interest.extra.flags; + let our_flags = + event_to_afd_mask(interest.readable, interest.writable, true) + | interest.extra.flags; our_flags != flags } @@ -799,7 +800,7 @@ impl PacketUnwrapped { // Update if there is no ongoing wait. handle.status.is_idle() } - _ => true + _ => true, } } diff --git a/src/lib.rs b/src/lib.rs index bd748ce..7674118 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -18,7 +18,7 @@ //! # Examples //! //! ```no_run -//! use polling::{Event, Poller}; +//! use polling::{Event, Events, Poller}; //! use std::net::TcpListener; //! //! // Create a TCP listener. @@ -33,13 +33,13 @@ //! } //! //! // The event loop. -//! let mut events = Vec::new(); +//! let mut events = Events::new(); //! loop { //! // Wait for at least one I/O event. //! events.clear(); //! poller.wait(&mut events, None)?; //! -//! for ev in &events { +//! for ev in events.iter() { //! if ev.key == key { //! // Perform a non-blocking accept operation. //! socket.accept()?; @@ -603,7 +603,7 @@ impl Poller { /// # Examples /// /// ``` - /// use polling::{Event, Poller}; + /// use polling::{Event, Events, Poller}; /// use std::net::TcpListener; /// use std::time::Duration; /// @@ -616,7 +616,7 @@ impl Poller { /// poller.add(&socket, Event::all(key))?; /// } /// - /// let mut events = Vec::new(); + /// let mut events = Events::new(); /// let n = poller.wait(&mut events, Some(Duration::from_secs(1)))?; /// poller.delete(&socket)?; /// # std::io::Result::Ok(()) @@ -650,14 +650,14 @@ impl Poller { /// # Examples /// /// ``` - /// use polling::Poller; + /// use polling::{Events, Poller}; /// /// let poller = Poller::new()?; /// /// // Notify the poller. /// poller.notify()?; /// - /// let mut events = Vec::new(); + /// let mut events = Events::new(); /// poller.wait(&mut events, None)?; // wakes up immediately /// assert!(events.is_empty()); /// # std::io::Result::Ok(()) @@ -804,7 +804,7 @@ impl Events { /// ``` #[inline] pub fn is_empty(&self) -> bool { - self.len() != 0 + self.len() == 0 } /// Get the total capacity of the list. diff --git a/src/os/iocp.rs b/src/os/iocp.rs index 625606c..bc37843 100644 --- a/src/os/iocp.rs +++ b/src/os/iocp.rs @@ -19,7 +19,7 @@ pub trait PollerIocpExt: PollerSealed { /// # Examples /// /// ```rust - /// use polling::{Poller, Event}; + /// use polling::{Poller, Event, Events}; /// use polling::os::iocp::{CompletionPacket, PollerIocpExt}; /// /// use std::thread; @@ -39,7 +39,7 @@ pub trait PollerIocpExt: PollerSealed { /// }); /// /// // Wait for the event. - /// let mut events = vec![]; + /// let mut events = Events::new(); /// poller.wait(&mut events, None)?; /// /// assert_eq!(events.len(), 1); @@ -72,7 +72,7 @@ pub trait PollerIocpExt: PollerSealed { /// # Examples /// /// ```no_run - /// use polling::{Poller, Event, PollMode}; + /// use polling::{Poller, Event, Events, PollMode}; /// use polling::os::iocp::PollerIocpExt; /// /// use std::process::Command; @@ -92,11 +92,11 @@ pub trait PollerIocpExt: PollerSealed { /// } /// /// // Wait for the child process to exit. - /// let mut events = vec![]; + /// let mut events = Events::new(); /// poller.wait(&mut events, None).unwrap(); /// /// assert_eq!(events.len(), 1); - /// assert_eq!(events[0], Event::all(0)); + /// assert_eq!(events.iter().next().unwrap(), Event::all(0)); /// ``` unsafe fn add_waitable( &self, @@ -115,7 +115,7 @@ pub trait PollerIocpExt: PollerSealed { /// # Examples /// /// ```no_run - /// use polling::{Poller, Event, PollMode}; + /// use polling::{Poller, Event, Events, PollMode}; /// use polling::os::iocp::PollerIocpExt; /// /// use std::process::Command; @@ -135,11 +135,11 @@ pub trait PollerIocpExt: PollerSealed { /// } /// /// // Wait for the child process to exit. - /// let mut events = vec![]; + /// let mut events = Events::new(); /// poller.wait(&mut events, None).unwrap(); /// /// assert_eq!(events.len(), 1); - /// assert_eq!(events[0], Event::all(0)); + /// assert_eq!(events.iter().next().unwrap(), Event::all(0)); /// /// // Modify the waitable handle. /// poller.modify_waitable(&child, Event::readable(0), PollMode::Oneshot).unwrap(); @@ -161,7 +161,7 @@ pub trait PollerIocpExt: PollerSealed { /// # Examples /// /// ```no_run - /// use polling::{Poller, Event, PollMode}; + /// use polling::{Poller, Event, Events, PollMode}; /// use polling::os::iocp::PollerIocpExt; /// /// use std::process::Command; @@ -181,11 +181,11 @@ pub trait PollerIocpExt: PollerSealed { /// } /// /// // Wait for the child process to exit. - /// let mut events = vec![]; + /// let mut events = Events::new(); /// poller.wait(&mut events, None).unwrap(); /// /// assert_eq!(events.len(), 1); - /// assert_eq!(events[0], Event::all(0)); + /// assert_eq!(events.iter().next().unwrap(), Event::all(0)); /// /// // Remove the waitable handle. /// poller.remove_waitable(&child).unwrap(); diff --git a/src/os/kqueue.rs b/src/os/kqueue.rs index 684bd3e..c3db033 100644 --- a/src/os/kqueue.rs +++ b/src/os/kqueue.rs @@ -31,7 +31,7 @@ pub trait PollerKqueueExt: PollerSealed { /// # Examples /// /// ```no_run - /// use polling::{Poller, PollMode}; + /// use polling::{Events, Poller, PollMode}; /// use polling::os::kqueue::{Filter, PollerKqueueExt, Signal}; /// /// let poller = Poller::new().unwrap(); @@ -40,7 +40,7 @@ pub trait PollerKqueueExt: PollerSealed { /// poller.add_filter(Signal(libc::SIGINT), 0, PollMode::Oneshot).unwrap(); /// /// // Wait for the signal. - /// let mut events = vec![]; + /// let mut events = Events::new(); /// poller.wait(&mut events, None).unwrap(); /// # let _ = events; /// ``` @@ -54,7 +54,7 @@ pub trait PollerKqueueExt: PollerSealed { /// # Examples /// /// ```no_run - /// use polling::{Poller, PollMode}; + /// use polling::{Events, Poller, PollMode}; /// use polling::os::kqueue::{Filter, PollerKqueueExt, Signal}; /// /// let poller = Poller::new().unwrap(); @@ -66,7 +66,7 @@ pub trait PollerKqueueExt: PollerSealed { /// poller.modify_filter(Signal(libc::SIGINT), 1, PollMode::Oneshot).unwrap(); /// /// // Wait for the signal. - /// let mut events = vec![]; + /// let mut events = Events::new(); /// poller.wait(&mut events, None).unwrap(); /// # let _ = events; /// ``` @@ -183,7 +183,12 @@ pub enum ProcessOps { impl<'a> Process<'a> { /// Monitor a child process. - pub fn new(child: &'a Child, ops: ProcessOps) -> Self { + /// + /// # Safety + /// + /// Once registered into the `Poller`, the `Child` object must outlive this filter's + /// registration into the poller. + pub unsafe fn new(child: &'a Child, ops: ProcessOps) -> Self { Self { child, ops } } } diff --git a/tests/concurrent_modification.rs b/tests/concurrent_modification.rs index ff00ddb..b0333b2 100644 --- a/tests/concurrent_modification.rs +++ b/tests/concurrent_modification.rs @@ -33,7 +33,9 @@ fn concurrent_add() -> io::Result<()> { poller.delete(&reader)?; result?; - assert_eq!(events.iter().collect::>(), [Event::readable(0)]); + assert_eq!(events.len(), 1); + assert!(events.iter().next().unwrap().readable); + assert!(!events.iter().next().unwrap().writable); Ok(()) } @@ -63,7 +65,9 @@ fn concurrent_modify() -> io::Result<()> { .into_iter() .collect::>()?; - assert_eq!(events.iter().collect::>(), [Event::readable(0)]); + assert_eq!(events.len(), 1); + assert!(events.iter().next().unwrap().readable); + assert!(!events.iter().next().unwrap().writable); Ok(()) } diff --git a/tests/io.rs b/tests/io.rs index 965e477..404f389 100644 --- a/tests/io.rs +++ b/tests/io.rs @@ -29,7 +29,10 @@ fn basic_io() { .unwrap(), 1 ); - assert_eq!(events.iter().collect::>(), &[Event::readable(1)]); + + assert_eq!(events.len(), 1); + assert!(events.iter().next().unwrap().readable); + assert!(!events.iter().next().unwrap().writable); poller.delete(&read).unwrap(); } diff --git a/tests/other_modes.rs b/tests/other_modes.rs index 863ed36..e0d8cf2 100644 --- a/tests/other_modes.rs +++ b/tests/other_modes.rs @@ -39,10 +39,10 @@ fn level_triggered() { .wait(&mut events, Some(Duration::from_secs(10))) .unwrap(); - assert_eq!( - events.iter().collect::>(), - [Event::readable(reader_token)] - ); + assert_eq!(events.len(), 1); + assert_eq!(events.iter().next().unwrap().key, reader_token); + assert!(events.iter().next().unwrap().readable); + assert!(!events.iter().next().unwrap().writable); // If we read some of the data, the notification should still be available. reader.read_exact(&mut [0; 3]).unwrap(); @@ -50,10 +50,11 @@ fn level_triggered() { poller .wait(&mut events, Some(Duration::from_secs(10))) .unwrap(); - assert_eq!( - events.iter().collect::>(), - [Event::readable(reader_token)] - ); + + assert_eq!(events.len(), 1); + assert_eq!(events.iter().next().unwrap().key, reader_token); + assert!(events.iter().next().unwrap().readable); + assert!(!events.iter().next().unwrap().writable); // If we read the rest of the data, the notification should be gone. reader.read_exact(&mut [0; 2]).unwrap(); @@ -77,10 +78,10 @@ fn level_triggered() { .wait(&mut events, Some(Duration::from_secs(10))) .unwrap(); - assert_eq!( - events.iter().collect::>(), - [Event::readable(reader_token)] - ); + assert_eq!(events.len(), 1); + assert_eq!(events.iter().next().unwrap().key, reader_token); + assert!(events.iter().next().unwrap().readable); + assert!(!events.iter().next().unwrap().writable); // After reading, the notification should vanish. reader.read(&mut [0; 5]).unwrap(); @@ -137,10 +138,10 @@ fn edge_triggered() { .wait(&mut events, Some(Duration::from_secs(10))) .unwrap(); - assert_eq!( - events.iter().collect::>(), - [Event::readable(reader_token)] - ); + assert_eq!(events.len(), 1); + assert_eq!(events.iter().next().unwrap().key, reader_token); + assert!(events.iter().next().unwrap().readable); + assert!(!events.iter().next().unwrap().writable); // If we read some of the data, the notification should not still be available. reader.read_exact(&mut [0; 3]).unwrap(); @@ -156,10 +157,11 @@ fn edge_triggered() { poller .wait(&mut events, Some(Duration::from_secs(10))) .unwrap(); - assert_eq!( - events.iter().collect::>(), - [Event::readable(reader_token)] - ); + + assert_eq!(events.len(), 1); + assert_eq!(events.iter().next().unwrap().key, reader_token); + assert!(events.iter().next().unwrap().readable); + assert!(!events.iter().next().unwrap().writable); // After modifying the stream and sending more data, it should be oneshot. poller @@ -172,10 +174,10 @@ fn edge_triggered() { .wait(&mut events, Some(Duration::from_secs(10))) .unwrap(); - assert_eq!( - events.iter().collect::>(), - [Event::readable(reader_token)] - ); + assert_eq!(events.len(), 1); + assert_eq!(events.iter().next().unwrap().key, reader_token); + assert!(events.iter().next().unwrap().readable); + assert!(!events.iter().next().unwrap().writable); } #[test] @@ -229,10 +231,9 @@ fn edge_oneshot_triggered() { .wait(&mut events, Some(Duration::from_secs(10))) .unwrap(); - assert_eq!( - events.iter().collect::>(), - [Event::readable(reader_token)] - ); + assert_eq!(events.len(), 1); + assert!(events.iter().next().unwrap().readable); + assert!(!events.iter().next().unwrap().writable); // If we read some of the data, the notification should not still be available. reader.read_exact(&mut [0; 3]).unwrap(); @@ -254,10 +255,10 @@ fn edge_oneshot_triggered() { poller .wait(&mut events, Some(Duration::from_secs(0))) .unwrap(); - assert_eq!( - events.iter().collect::>(), - [Event::readable(reader_token)] - ); + + assert_eq!(events.len(), 1); + assert!(events.iter().next().unwrap().readable); + assert!(!events.iter().next().unwrap().writable); } fn tcp_pair() -> io::Result<(TcpStream, TcpStream)> { diff --git a/tests/windows_post.rs b/tests/windows_post.rs index 425b09c..c0bef9f 100644 --- a/tests/windows_post.rs +++ b/tests/windows_post.rs @@ -47,7 +47,10 @@ fn post_multithread() { .unwrap(); assert_eq!(events.len(), 1); - assert_eq!(events.iter().next(), Some(Event::writable(i))); + assert_eq!(events.iter().next().unwrap().key, i); + assert!(!events.iter().next().unwrap().readable); + assert!(events.iter().next().unwrap().writable); + events.clear(); } poller diff --git a/tests/windows_waitable.rs b/tests/windows_waitable.rs index bd17a72..fd05983 100644 --- a/tests/windows_waitable.rs +++ b/tests/windows_waitable.rs @@ -3,7 +3,7 @@ #![cfg(windows)] use polling::os::iocp::PollerIocpExt; -use polling::{Event, PollMode, Poller}; +use polling::{Event, Events, PollMode, Poller}; use windows_sys::Win32::Foundation::CloseHandle; use windows_sys::Win32::System::Threading::{CreateEventW, ResetEvent, SetEvent}; @@ -85,7 +85,7 @@ fn smoke() { .unwrap(); } - let mut events = vec![]; + let mut events = Events::new(); poller .wait(&mut events, Some(Duration::from_millis(100))) .unwrap(); @@ -100,7 +100,8 @@ fn smoke() { .unwrap(); assert_eq!(events.len(), 1); - assert_eq!(events[0], Event::all(0)); + assert!(events.iter().next().unwrap().readable); + assert!(events.iter().next().unwrap().writable); // Interest should be cleared. events.clear(); @@ -121,7 +122,8 @@ fn smoke() { .unwrap(); assert_eq!(events.len(), 1); - assert_eq!(events[0], Event::all(0)); + assert!(events.iter().next().unwrap().readable); + assert!(events.iter().next().unwrap().writable); // If we reset the event, it should not be signaled. event.reset().unwrap(); From 1a10d2ac470e2c60abf3c2633e13a5d1ef926bb6 Mon Sep 17 00:00:00 2001 From: John Nunley Date: Sat, 12 Aug 2023 08:53:27 -0700 Subject: [PATCH 6/8] Review comments - Make set_* take a boolean for the value of the flag - Make Events !Sync - Fix visibility modifiers - Inline more methods - Use a better strategy for testing Signed-off-by: John Nunley --- src/epoll.rs | 13 ++++--- src/iocp/afd.rs | 9 +++++ src/iocp/mod.rs | 21 ++++++----- src/kqueue.rs | 9 +++-- src/lib.rs | 54 ++++++++++++++++++++++++---- src/poll.rs | 13 ++++--- src/port.rs | 13 ++++--- tests/concurrent_modification.rs | 12 ++++--- tests/io.rs | 7 ++-- tests/many_connections.rs | 7 ++-- tests/notify.rs | 1 + tests/other_modes.rs | 60 +++++++++++++++++++------------- tests/windows_post.rs | 12 ++++--- tests/windows_waitable.rs | 6 ++-- 14 files changed, 166 insertions(+), 71 deletions(-) diff --git a/src/epoll.rs b/src/epoll.rs index 024af0f..713a089 100644 --- a/src/epoll.rs +++ b/src/epoll.rs @@ -334,6 +334,7 @@ pub struct EventExtra { impl EventExtra { /// Create an empty version of the data. + #[inline] pub fn empty() -> EventExtra { EventExtra { flags: epoll::EventFlags::empty(), @@ -341,21 +342,25 @@ impl EventExtra { } /// Add the interrupt flag to this event. - pub fn set_hup(&mut self) { - self.flags |= epoll::EventFlags::HUP; + #[inline] + pub fn set_hup(&mut self, active: bool) { + self.flags.set(epoll::EventFlags::HUP, active); } /// Add the priority flag to this event. - pub fn set_pri(&mut self) { - self.flags |= epoll::EventFlags::PRI; + #[inline] + pub fn set_pri(&mut self, active: bool) { + self.flags.set(epoll::EventFlags::PRI, active); } /// Tell if the interrupt flag is set. + #[inline] pub fn is_hup(&self) -> bool { self.flags.contains(epoll::EventFlags::HUP) } /// Tell if the priority flag is set. + #[inline] pub fn is_pri(&self) -> bool { self.flags.contains(epoll::EventFlags::PRI) } diff --git a/src/iocp/afd.rs b/src/iocp/afd.rs index 70cd33b..faf3bab 100644 --- a/src/iocp/afd.rs +++ b/src/iocp/afd.rs @@ -89,6 +89,15 @@ impl AfdPollMask { pub(crate) fn intersects(self, other: AfdPollMask) -> bool { (self.0 & other.0) != 0 } + + /// Sets a flag. + pub(crate) fn set(&mut self, other: AfdPollMask, value: bool) { + if value { + *self |= other; + } else { + self.0 &= !other.0; + } + } } impl fmt::Debug for AfdPollMask { diff --git a/src/iocp/mod.rs b/src/iocp/mod.rs index d27d252..4d01cc3 100644 --- a/src/iocp/mod.rs +++ b/src/iocp/mod.rs @@ -622,24 +622,24 @@ unsafe impl Send for Events {} impl Events { /// Creates an empty list of events. - pub(super) fn with_capacity(cap: usize) -> Events { + pub fn with_capacity(cap: usize) -> Events { Events { packets: Vec::with_capacity(cap), } } /// Iterate over I/O events. - pub(super) fn iter(&self) -> impl Iterator + '_ { + pub fn iter(&self) -> impl Iterator + '_ { self.packets.iter().copied() } /// Clear the list of events. - pub(super) fn clear(&mut self) { + pub fn clear(&mut self) { self.packets.clear(); } /// The capacity of the list of events. - pub(super) fn capacity(&self) -> usize { + pub fn capacity(&self) -> usize { self.packets.capacity() } } @@ -653,6 +653,7 @@ pub struct EventExtra { impl EventExtra { /// Create a new, empty version of this struct. + #[inline] pub fn empty() -> EventExtra { EventExtra { flags: AfdPollMask::empty(), @@ -660,23 +661,27 @@ impl EventExtra { } /// Is this a HUP event? + #[inline] pub fn is_hup(&self) -> bool { self.flags.intersects(AfdPollMask::ABORT) } /// Is this a PRI event? + #[inline] pub fn is_pri(&self) -> bool { self.flags.intersects(AfdPollMask::RECEIVE_EXPEDITED) } /// Set up a listener for HUP events. - pub fn set_hup(&mut self) { - self.flags |= AfdPollMask::ABORT; + #[inline] + pub fn set_hup(&mut self, active: bool) { + self.flags.set(AfdPollMask::ABORT, active); } /// Set up a listener for PRI events. - pub fn set_pri(&mut self) { - self.flags |= AfdPollMask::RECEIVE_EXPEDITED; + #[inline] + pub fn set_pri(&mut self, active: bool) { + self.flags.set(AfdPollMask::RECEIVE_EXPEDITED, active); } } diff --git a/src/kqueue.rs b/src/kqueue.rs index aca51d5..dd0ce34 100644 --- a/src/kqueue.rs +++ b/src/kqueue.rs @@ -268,26 +268,31 @@ pub struct EventExtra; impl EventExtra { /// Create a new, empty version of this struct. + #[inline] pub fn empty() -> EventExtra { EventExtra } /// Set the interrupt flag. - pub fn set_hup(&mut self) { + #[inline] + pub fn set_hup(&mut self, _value: bool) { // No-op. } /// Set the priority flag. - pub fn set_pri(&mut self) { + #[inline] + pub fn set_pri(&mut self, _value: bool) { // No-op. } /// Is the interrupt flag set? + #[inline] pub fn is_hup(&self) -> bool { false } /// Is the priority flag set? + #[inline] pub fn is_pri(&self) -> bool { false } diff --git a/src/lib.rs b/src/lib.rs index 7674118..ea2e7ad 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -65,8 +65,10 @@ html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png" )] +use std::cell::Cell; use std::fmt; use std::io; +use std::marker::PhantomData; use std::num::NonZeroUsize; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Mutex; @@ -242,8 +244,9 @@ impl Event { /// - Event Ports /// /// On other platforms, this function is a no-op. - pub fn set_interrupt(&mut self) { - self.extra.set_hup(); + #[inline] + pub fn set_interrupt(&mut self, active: bool) { + self.extra.set_hup(active); } /// Add interruption events to this interest. @@ -259,8 +262,9 @@ impl Event { /// - Event Ports /// /// On other platforms, this function is a no-op. + #[inline] pub fn with_interrupt(mut self) -> Self { - self.set_interrupt(); + self.set_interrupt(true); self } @@ -277,8 +281,9 @@ impl Event { /// - Event Ports /// /// On other platforms, this function is a no-op. - pub fn set_priority(&mut self) { - self.extra.set_pri(); + #[inline] + pub fn set_priority(&mut self, active: bool) { + self.extra.set_pri(active); } /// Add priority events to this interest. @@ -294,8 +299,9 @@ impl Event { /// - Event Ports /// /// On other platforms, this function is a no-op. + #[inline] pub fn with_priority(mut self) -> Self { - self.set_priority(); + self.set_priority(true); self } @@ -312,6 +318,7 @@ impl Event { /// - Event Ports /// /// On other platforms, this always returns `false`. + #[inline] pub fn is_interrupt(&self) -> bool { self.extra.is_hup() } @@ -329,9 +336,25 @@ impl Event { /// - Event Ports /// /// On other platforms, this always returns `false`. + #[inline] pub fn is_priority(&self) -> bool { self.extra.is_pri() } + + /// Remove any extra information from this event. + #[inline] + pub fn clear_extra(&mut self) { + self.extra = sys::EventExtra::empty(); + } + + /// Get a version of this event with no extra information. + /// + /// This is useful for comparing events with `==`. + #[inline] + pub fn with_no_extra(mut self) -> Self { + self.clear_extra(); + self + } } /// Waits for I/O events. @@ -680,6 +703,10 @@ impl Poller { /// A container for I/O events. pub struct Events { events: sys::Events, + + /// This is intended to be used from &mut, thread locally, so we should make it !Sync + /// for consistency with the rest of the API. + _not_sync: PhantomData>, } impl Default for Events { @@ -728,6 +755,7 @@ impl Events { pub fn with_capacity(capacity: NonZeroUsize) -> Self { Self { events: sys::Events::with_capacity(capacity.get()), + _not_sync: PhantomData, } } @@ -975,3 +1003,17 @@ cfg_if! { fn unsupported_error(err: impl Into) -> io::Error { io::Error::new(io::ErrorKind::Unsupported, err.into()) } + +fn _assert_send_and_sync() { + fn assert_send() {} + fn assert_sync() {} + + assert_send::(); + assert_sync::(); + + assert_send::(); + assert_sync::(); + + assert_send::(); + // Events can be !Sync +} diff --git a/src/poll.rs b/src/poll.rs index e73ea59..5199f87 100644 --- a/src/poll.rs +++ b/src/poll.rs @@ -397,6 +397,7 @@ pub struct EventExtra { impl EventExtra { /// Creates an empty set of extra information. + #[inline] pub fn empty() -> Self { Self { flags: PollFlags::empty(), @@ -404,21 +405,25 @@ impl EventExtra { } /// Set the interrupt flag. - pub fn set_hup(&mut self) { - self.flags.set(PollFlags::HUP, true); + #[inline] + pub fn set_hup(&mut self, value: bool) { + self.flags.set(PollFlags::HUP, value); } /// Set the priority flag. - pub fn set_pri(&mut self) { - self.flags.set(PollFlags::PRI, true); + #[inline] + pub fn set_pri(&mut self, value: bool) { + self.flags.set(PollFlags::PRI, value); } /// Is this an interrupt event? + #[inline] pub fn is_hup(&self) -> bool { self.flags.contains(PollFlags::HUP) } /// Is this a priority event? + #[inline] pub fn is_pri(&self) -> bool { self.flags.contains(PollFlags::PRI) } diff --git a/src/port.rs b/src/port.rs index be8668b..039c851 100644 --- a/src/port.rs +++ b/src/port.rs @@ -220,6 +220,7 @@ pub struct EventExtra { impl EventExtra { /// Create a new, empty version of this struct. + #[inline] pub fn empty() -> EventExtra { EventExtra { flags: PollFlags::empty(), @@ -227,21 +228,25 @@ impl EventExtra { } /// Set the interrupt flag. - pub fn set_hup(&mut self) { - self.flags.set(PollFlags::HUP, true); + #[inline] + pub fn set_hup(&mut self, value: bool) { + self.flags.set(PollFlags::HUP, value); } /// Set the priority flag. - pub fn set_pri(&mut self) { - self.flags.set(PollFlags::PRI, true); + #[inline] + pub fn set_pri(&mut self, value: bool) { + self.flags.set(PollFlags::PRI, value); } /// Is this an interrupt event? + #[inline] pub fn is_hup(&self) -> bool { self.flags.contains(PollFlags::HUP) } /// Is this a priority event? + #[inline] pub fn is_pri(&self) -> bool { self.flags.contains(PollFlags::PRI) } diff --git a/tests/concurrent_modification.rs b/tests/concurrent_modification.rs index b0333b2..0797b0f 100644 --- a/tests/concurrent_modification.rs +++ b/tests/concurrent_modification.rs @@ -34,8 +34,10 @@ fn concurrent_add() -> io::Result<()> { result?; assert_eq!(events.len(), 1); - assert!(events.iter().next().unwrap().readable); - assert!(!events.iter().next().unwrap().writable); + assert_eq!( + events.iter().next().unwrap().with_no_extra(), + Event::readable(0) + ); Ok(()) } @@ -66,8 +68,10 @@ fn concurrent_modify() -> io::Result<()> { .collect::>()?; assert_eq!(events.len(), 1); - assert!(events.iter().next().unwrap().readable); - assert!(!events.iter().next().unwrap().writable); + assert_eq!( + events.iter().next().unwrap().with_no_extra(), + Event::readable(0) + ); Ok(()) } diff --git a/tests/io.rs b/tests/io.rs index 404f389..2e6ce04 100644 --- a/tests/io.rs +++ b/tests/io.rs @@ -31,9 +31,10 @@ fn basic_io() { ); assert_eq!(events.len(), 1); - assert!(events.iter().next().unwrap().readable); - assert!(!events.iter().next().unwrap().writable); - + assert_eq!( + events.iter().next().unwrap().with_no_extra(), + Event::readable(1) + ); poller.delete(&read).unwrap(); } diff --git a/tests/many_connections.rs b/tests/many_connections.rs index 2a8807b..6a74c9e 100644 --- a/tests/many_connections.rs +++ b/tests/many_connections.rs @@ -44,9 +44,10 @@ fn many_connections() { // Check that the connection is readable. let current_events = events.iter().collect::>(); assert_eq!(current_events.len(), 1, "events: {:?}", current_events); - assert_eq!(current_events[0].key, id); - assert!(current_events[0].readable); - assert!(!current_events[0].writable); + assert_eq!( + current_events[0].with_no_extra(), + polling::Event::readable(id) + ); // Read the byte from the connection. let mut buf = [0]; diff --git a/tests/notify.rs b/tests/notify.rs index 447fad1..7dff0c3 100644 --- a/tests/notify.rs +++ b/tests/notify.rs @@ -14,6 +14,7 @@ fn simple() -> io::Result<()> { for _ in 0..10 { poller.notify()?; poller.wait(&mut events, None)?; + assert!(events.is_empty()); } Ok(()) diff --git a/tests/other_modes.rs b/tests/other_modes.rs index e0d8cf2..407e42b 100644 --- a/tests/other_modes.rs +++ b/tests/other_modes.rs @@ -40,9 +40,10 @@ fn level_triggered() { .unwrap(); assert_eq!(events.len(), 1); - assert_eq!(events.iter().next().unwrap().key, reader_token); - assert!(events.iter().next().unwrap().readable); - assert!(!events.iter().next().unwrap().writable); + assert_eq!( + events.iter().next().unwrap().with_no_extra(), + Event::readable(reader_token) + ); // If we read some of the data, the notification should still be available. reader.read_exact(&mut [0; 3]).unwrap(); @@ -52,9 +53,10 @@ fn level_triggered() { .unwrap(); assert_eq!(events.len(), 1); - assert_eq!(events.iter().next().unwrap().key, reader_token); - assert!(events.iter().next().unwrap().readable); - assert!(!events.iter().next().unwrap().writable); + assert_eq!( + events.iter().next().unwrap().with_no_extra(), + Event::readable(reader_token) + ); // If we read the rest of the data, the notification should be gone. reader.read_exact(&mut [0; 2]).unwrap(); @@ -63,7 +65,7 @@ fn level_triggered() { .wait(&mut events, Some(Duration::from_secs(0))) .unwrap(); - assert_eq!(events.iter().collect::>(), []); + assert!(events.is_empty()); // After modifying the stream and sending more data, it should be oneshot. poller @@ -79,9 +81,10 @@ fn level_triggered() { .unwrap(); assert_eq!(events.len(), 1); - assert_eq!(events.iter().next().unwrap().key, reader_token); - assert!(events.iter().next().unwrap().readable); - assert!(!events.iter().next().unwrap().writable); + assert_eq!( + events.iter().next().unwrap().with_no_extra(), + Event::readable(reader_token) + ); // After reading, the notification should vanish. reader.read(&mut [0; 5]).unwrap(); @@ -90,7 +93,7 @@ fn level_triggered() { .wait(&mut events, Some(Duration::from_secs(0))) .unwrap(); - assert_eq!(events.iter().collect::>(), []); + assert!(events.is_empty()); } #[test] @@ -139,9 +142,10 @@ fn edge_triggered() { .unwrap(); assert_eq!(events.len(), 1); - assert_eq!(events.iter().next().unwrap().key, reader_token); - assert!(events.iter().next().unwrap().readable); - assert!(!events.iter().next().unwrap().writable); + assert_eq!( + events.iter().next().unwrap().with_no_extra(), + Event::readable(reader_token) + ); // If we read some of the data, the notification should not still be available. reader.read_exact(&mut [0; 3]).unwrap(); @@ -149,7 +153,7 @@ fn edge_triggered() { poller .wait(&mut events, Some(Duration::from_secs(0))) .unwrap(); - assert_eq!(events.iter().collect::>(), []); + assert!(events.is_empty()); // If we write more data, a notification should be delivered. writer.write_all(&data).unwrap(); @@ -159,9 +163,10 @@ fn edge_triggered() { .unwrap(); assert_eq!(events.len(), 1); - assert_eq!(events.iter().next().unwrap().key, reader_token); - assert!(events.iter().next().unwrap().readable); - assert!(!events.iter().next().unwrap().writable); + assert_eq!( + events.iter().next().unwrap().with_no_extra(), + Event::readable(reader_token) + ); // After modifying the stream and sending more data, it should be oneshot. poller @@ -175,9 +180,10 @@ fn edge_triggered() { .unwrap(); assert_eq!(events.len(), 1); - assert_eq!(events.iter().next().unwrap().key, reader_token); - assert!(events.iter().next().unwrap().readable); - assert!(!events.iter().next().unwrap().writable); + assert_eq!( + events.iter().next().unwrap().with_no_extra(), + Event::readable(reader_token) + ); } #[test] @@ -232,8 +238,10 @@ fn edge_oneshot_triggered() { .unwrap(); assert_eq!(events.len(), 1); - assert!(events.iter().next().unwrap().readable); - assert!(!events.iter().next().unwrap().writable); + assert_eq!( + events.iter().next().unwrap().with_no_extra(), + Event::readable(reader_token) + ); // If we read some of the data, the notification should not still be available. reader.read_exact(&mut [0; 3]).unwrap(); @@ -257,8 +265,10 @@ fn edge_oneshot_triggered() { .unwrap(); assert_eq!(events.len(), 1); - assert!(events.iter().next().unwrap().readable); - assert!(!events.iter().next().unwrap().writable); + assert_eq!( + events.iter().next().unwrap().with_no_extra(), + Event::readable(reader_token) + ); } fn tcp_pair() -> io::Result<(TcpStream, TcpStream)> { diff --git a/tests/windows_post.rs b/tests/windows_post.rs index c0bef9f..33c3b6c 100644 --- a/tests/windows_post.rs +++ b/tests/windows_post.rs @@ -20,7 +20,10 @@ fn post_smoke() { poller.wait(&mut events, None).unwrap(); assert_eq!(events.len(), 1); - assert_eq!(events.iter().next().unwrap(), Event::readable(1)); + assert_eq!( + events.iter().next().unwrap().with_no_extra(), + Event::readable(1) + ); } #[test] @@ -47,9 +50,10 @@ fn post_multithread() { .unwrap(); assert_eq!(events.len(), 1); - assert_eq!(events.iter().next().unwrap().key, i); - assert!(!events.iter().next().unwrap().readable); - assert!(events.iter().next().unwrap().writable); + assert_eq!( + events.iter().next().unwrap().with_no_extra(), + Event::writable(i) + ); events.clear(); } diff --git a/tests/windows_waitable.rs b/tests/windows_waitable.rs index fd05983..dcd0edc 100644 --- a/tests/windows_waitable.rs +++ b/tests/windows_waitable.rs @@ -100,8 +100,7 @@ fn smoke() { .unwrap(); assert_eq!(events.len(), 1); - assert!(events.iter().next().unwrap().readable); - assert!(events.iter().next().unwrap().writable); + assert_eq!(events.iter().next().unwrap().with_no_extra(), Event::all(0)); // Interest should be cleared. events.clear(); @@ -122,8 +121,7 @@ fn smoke() { .unwrap(); assert_eq!(events.len(), 1); - assert!(events.iter().next().unwrap().readable); - assert!(events.iter().next().unwrap().writable); + assert_eq!(events.iter().next().unwrap().with_no_extra(), Event::all(0)); // If we reset the event, it should not be signaled. event.reset().unwrap(); From c743a7c5e3ed18ab192145422f63c6b8554b88dc Mon Sep 17 00:00:00 2001 From: John Nunley Date: Sun, 13 Aug 2023 15:43:42 -0700 Subject: [PATCH 7/8] Move completion packets into the Events buffer This removes one of the mutexes that we have to lock. Signed-off-by: John Nunley --- src/iocp/mod.rs | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/src/iocp/mod.rs b/src/iocp/mod.rs index 4d01cc3..87f2950 100644 --- a/src/iocp/mod.rs +++ b/src/iocp/mod.rs @@ -108,11 +108,6 @@ pub(super) struct Poller { /// whether pending updates should be run immediately or queued. polling: AtomicBool, - /// A list of completion packets. - /// - /// The IOCP writes to this list when it has new events. - packets: Mutex>>, - /// The packet used to notify the poller. /// /// This is a special-case packet that is used to wake up the poller when it is waiting. @@ -149,7 +144,6 @@ impl Poller { waitables: RwLock::new(HashMap::new()), pending_updates: ConcurrentQueue::bounded(1024), polling: AtomicBool::new(false), - packets: Mutex::new(Vec::with_capacity(1024)), notifier: Arc::pin( PacketInner::Wakeup { _pinned: PhantomPinned, @@ -434,7 +428,6 @@ impl Poller { // Make sure we have a consistent timeout. let deadline = timeout.and_then(|timeout| Instant::now().checked_add(timeout)); - let mut packets = lock!(self.packets.lock()); let mut notified = false; events.packets.clear(); @@ -458,7 +451,7 @@ impl Poller { let timeout = deadline.map(|t| t.saturating_duration_since(Instant::now())); // Wait for I/O events. - let len = self.port.wait(&mut packets, timeout)?; + let len = self.port.wait(&mut events.completions, timeout)?; tracing::trace!( handle = ?self.port, res = ?len, @@ -468,7 +461,7 @@ impl Poller { drop(guard); // Process all of the events. - for entry in packets.drain(..) { + for entry in events.completions.drain(..) { let packet = entry.into_packet(); // Feed the event into the packet. @@ -616,6 +609,9 @@ impl AsHandle for Poller { pub(super) struct Events { /// List of IOCP packets. packets: Vec, + + /// Buffer for completion packets. + completions: Vec>, } unsafe impl Send for Events {} @@ -625,6 +621,7 @@ impl Events { pub fn with_capacity(cap: usize) -> Events { Events { packets: Vec::with_capacity(cap), + completions: Vec::with_capacity(cap), } } From 8e4e7d01ab8facfd753216639483b22541b256b3 Mon Sep 17 00:00:00 2001 From: John Nunley Date: Sun, 13 Aug 2023 16:02:43 -0700 Subject: [PATCH 8/8] Review comments Signed-off-by: John Nunley --- src/iocp/mod.rs | 28 ++++++++++++---------------- 1 file changed, 12 insertions(+), 16 deletions(-) diff --git a/src/iocp/mod.rs b/src/iocp/mod.rs index 87f2950..1597d43 100644 --- a/src/iocp/mod.rs +++ b/src/iocp/mod.rs @@ -783,10 +783,7 @@ impl PacketUnwrapped { // If there was a change, indicate that we need an update. match socket.status { SocketStatus::Polling { flags } => { - let our_flags = - event_to_afd_mask(interest.readable, interest.writable, true) - | interest.extra.flags; - + let our_flags = event_to_afd_mask(socket.interest, socket.interest_error); our_flags != flags } _ => true, @@ -877,12 +874,7 @@ impl PacketUnwrapped { SocketStatus::Polling { flags } => { // If we need to poll for events aside from what we are currently polling, we need // to update the packet. Cancel the ongoing poll. - let our_flags = event_to_afd_mask( - socket.interest.readable, - socket.interest.writable, - socket.interest_error, - ) | socket.interest.extra.flags; - + let our_flags = event_to_afd_mask(socket.interest, socket.interest_error); if our_flags != flags { return self.cancel(socket); } @@ -899,11 +891,7 @@ impl PacketUnwrapped { SocketStatus::Idle => { // Start a new poll. - let mask = event_to_afd_mask( - socket.interest.readable, - socket.interest.writable, - socket.interest_error, - ) | socket.interest.extra.flags; + let mask = event_to_afd_mask(socket.interest, socket.interest_error); let result = socket.afd.poll(self.clone(), socket.base_socket, mask); match result { @@ -1262,7 +1250,14 @@ impl WaitHandle { } /// Translate an event to the mask expected by AFD. -fn event_to_afd_mask(readable: bool, writable: bool, error: bool) -> afd::AfdPollMask { +#[inline] +fn event_to_afd_mask(event: Event, error: bool) -> afd::AfdPollMask { + event_properties_to_afd_mask(event.readable, event.writable, error) | event.extra.flags +} + +/// Translate an event to the mask expected by AFD. +#[inline] +fn event_properties_to_afd_mask(readable: bool, writable: bool, error: bool) -> afd::AfdPollMask { use afd::AfdPollMask as AfdPoll; let mut mask = AfdPoll::empty(); @@ -1284,6 +1279,7 @@ fn event_to_afd_mask(readable: bool, writable: bool, error: bool) -> afd::AfdPol } /// Convert the mask reported by AFD to an event. +#[inline] fn afd_mask_to_event(mask: afd::AfdPollMask) -> (bool, bool) { use afd::AfdPollMask as AfdPoll;