From 89833118c2d62e1ff7df26f7244559c75c6ded0a Mon Sep 17 00:00:00 2001 From: John Nunley Date: Tue, 8 Aug 2023 19:52:43 -0700 Subject: [PATCH] Add PRI and HUP events Signed-off-by: John Nunley --- src/epoll.rs | 24 ++++++++++- src/iocp/afd.rs | 14 +++++++ src/iocp/mod.rs | 75 ++++++++++++++++++++++------------ src/kqueue.rs | 20 ++++++++++ src/lib.rs | 104 ++++++++++++++++++++++++++++++++++++++++++++++++ src/poll.rs | 20 ++++++++++ src/port.rs | 20 ++++++++++ 7 files changed, 250 insertions(+), 27 deletions(-) diff --git a/src/epoll.rs b/src/epoll.rs index 1dd570f..024af0f 100644 --- a/src/epoll.rs +++ b/src/epoll.rs @@ -102,7 +102,7 @@ impl Poller { &self.epoll_fd, unsafe { rustix::fd::BorrowedFd::borrow_raw(fd) }, epoll::EventData::new_u64(ev.key as u64), - epoll_flags(&ev, mode), + epoll_flags(&ev, mode) | ev.extra.flags, )?; Ok(()) @@ -122,7 +122,7 @@ impl Poller { &self.epoll_fd, fd, epoll::EventData::new_u64(ev.key as u64), - epoll_flags(&ev, mode), + epoll_flags(&ev, mode) | ev.extra.flags, )?; Ok(()) @@ -339,4 +339,24 @@ impl EventExtra { flags: epoll::EventFlags::empty(), } } + + /// Add the interrupt flag to this event. + pub fn set_hup(&mut self) { + self.flags |= epoll::EventFlags::HUP; + } + + /// Add the priority flag to this event. + pub fn set_pri(&mut self) { + self.flags |= epoll::EventFlags::PRI; + } + + /// Tell if the interrupt flag is set. + pub fn is_hup(&self) -> bool { + self.flags.contains(epoll::EventFlags::HUP) + } + + /// Tell if the priority flag is set. + pub fn is_pri(&self) -> bool { + self.flags.contains(epoll::EventFlags::PRI) + } } diff --git a/src/iocp/afd.rs b/src/iocp/afd.rs index 89f478b..f6ff1c0 100644 --- a/src/iocp/afd.rs +++ b/src/iocp/afd.rs @@ -134,6 +134,20 @@ impl ops::BitOrAssign for AfdPollMask { } } +impl ops::BitAnd for AfdPollMask { + type Output = Self; + + fn bitand(self, rhs: Self) -> Self { + AfdPollMask(self.0 & rhs.0) + } +} + +impl ops::BitAndAssign for AfdPollMask { + fn bitand_assign(&mut self, rhs: Self) { + self.0 &= rhs.0; + } +} + pub(super) trait HasAfdInfo { fn afd_info(self: Pin<&Self>) -> Pin<&UnsafeCell>; } diff --git a/src/iocp/mod.rs b/src/iocp/mod.rs index c7640b3..2fe5256 100644 --- a/src/iocp/mod.rs +++ b/src/iocp/mod.rs @@ -526,6 +526,26 @@ impl EventExtra { flags: AfdPollMask::empty(), } } + + /// Is this a HUP event? + pub fn is_hup(&self) -> bool { + self.flags.intersects(AfdPollMask::ABORT) + } + + /// Is this a PRI event? + pub fn is_pri(&self) -> bool { + self.flags.intersects(AfdPollMask::RECEIVE_EXPEDITED) + } + + /// Set up a listener for HUP events. + pub fn set_hup(&mut self) { + self.flags |= AfdPollMask::ABORT; + } + + /// Set up a listener for PRI events. + pub fn set_pri(&mut self) { + self.flags |= AfdPollMask::RECEIVE_EXPEDITED; + } } /// A packet used to wake up the poller with an event. @@ -620,8 +640,11 @@ impl PacketUnwrapped { // If there was a change, indicate that we need an update. match socket.status { - SocketStatus::Polling { readable, writable } => { - (interest.readable && !readable) || (interest.writable && !writable) + SocketStatus::Polling { flags } => { + let our_flags = event_to_afd_mask(interest.readable, interest.writable, true) + | interest.extra.flags; + + our_flags != flags } _ => true, } @@ -648,12 +671,16 @@ impl PacketUnwrapped { // Check the current status. match socket.status { - SocketStatus::Polling { readable, writable } => { + SocketStatus::Polling { flags } => { // If we need to poll for events aside from what we are currently polling, we need // to update the packet. Cancel the ongoing poll. - if (socket.interest.readable && !readable) - || (socket.interest.writable && !writable) - { + let our_flags = event_to_afd_mask( + socket.interest.readable, + socket.interest.writable, + socket.interest_error, + ) | socket.interest.extra.flags; + + if our_flags != flags { return self.cancel(socket); } @@ -669,15 +696,12 @@ impl PacketUnwrapped { SocketStatus::Idle => { // Start a new poll. - let result = socket.afd.poll( - self.clone(), - socket.base_socket, - event_to_afd_mask( - socket.interest.readable, - socket.interest.writable, - socket.interest_error, - ), - ); + let mask = event_to_afd_mask( + socket.interest.readable, + socket.interest.writable, + socket.interest_error, + ) | socket.interest.extra.flags; + let result = socket.afd.poll(self.clone(), socket.base_socket, mask); match result { Ok(()) => {} @@ -698,10 +722,7 @@ impl PacketUnwrapped { } // We are now polling for the current events. - socket.status = SocketStatus::Polling { - readable: socket.interest.readable, - writable: socket.interest.writable, - }; + socket.status = SocketStatus::Polling { flags: mask }; Ok(()) } @@ -774,6 +795,7 @@ impl PacketUnwrapped { let (readable, writable) = afd_mask_to_event(events); event.readable = readable; event.writable = writable; + event.extra.flags = events; } } } @@ -785,7 +807,13 @@ impl PacketUnwrapped { // If this event doesn't have anything that interests us, don't return or // update the oneshot state. - let return_value = if event.readable || event.writable { + let return_value = if event.readable + || event.writable + || event + .extra + .flags + .intersects(socket_state.interest.extra.flags) + { // If we are in oneshot mode, remove the interest. if matches!(socket_state.mode, PollMode::Oneshot) { socket_state.interest = Event::none(socket_state.interest.key); @@ -887,11 +915,8 @@ enum SocketStatus { /// We are currently polling these events. Polling { - /// We are currently polling for readable events. - readable: bool, - - /// We are currently polling for writable events. - writable: bool, + /// The flags we are currently polling for. + flags: AfdPollMask, }, /// The last poll operation was cancelled, and we're waiting for it to diff --git a/src/kqueue.rs b/src/kqueue.rs index 2c7e511..aca51d5 100644 --- a/src/kqueue.rs +++ b/src/kqueue.rs @@ -271,6 +271,26 @@ impl EventExtra { pub fn empty() -> EventExtra { EventExtra } + + /// Set the interrupt flag. + pub fn set_hup(&mut self) { + // No-op. + } + + /// Set the priority flag. + pub fn set_pri(&mut self) { + // No-op. + } + + /// Is the interrupt flag set? + pub fn is_hup(&self) -> bool { + false + } + + /// Is the priority flag set? + pub fn is_pri(&self) -> bool { + false + } } pub(crate) fn mode_to_flags(mode: PollMode) -> kqueue::EventFlags { diff --git a/src/lib.rs b/src/lib.rs index c02b117..bd748ce 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -228,6 +228,110 @@ impl Event { extra: sys::EventExtra::empty(), } } + + /// Add interruption events to this interest. + /// + /// This usually indicates that the file descriptor or socket has been closed. It corresponds + /// to the `EPOLLHUP` and `POLLHUP` events. + /// + /// Interruption events are only supported on the following platforms: + /// + /// - `epoll` + /// - `poll` + /// - IOCP + /// - Event Ports + /// + /// On other platforms, this function is a no-op. + pub fn set_interrupt(&mut self) { + self.extra.set_hup(); + } + + /// Add interruption events to this interest. + /// + /// This usually indicates that the file descriptor or socket has been closed. It corresponds + /// to the `EPOLLHUP` and `POLLHUP` events. + /// + /// Interruption events are only supported on the following platforms: + /// + /// - `epoll` + /// - `poll` + /// - IOCP + /// - Event Ports + /// + /// On other platforms, this function is a no-op. + pub fn with_interrupt(mut self) -> Self { + self.set_interrupt(); + self + } + + /// Add priority events to this interest. + /// + /// This indicates that there is urgent data to read. It corresponds to the `EPOLLPRI` and + /// `POLLPRI` events. + /// + /// Priority events are only supported on the following platforms: + /// + /// - `epoll` + /// - `poll` + /// - IOCP + /// - Event Ports + /// + /// On other platforms, this function is a no-op. + pub fn set_priority(&mut self) { + self.extra.set_pri(); + } + + /// Add priority events to this interest. + /// + /// This indicates that there is urgent data to read. It corresponds to the `EPOLLPRI` and + /// `POLLPRI` events. + /// + /// Priority events are only supported on the following platforms: + /// + /// - `epoll` + /// - `poll` + /// - IOCP + /// - Event Ports + /// + /// On other platforms, this function is a no-op. + pub fn with_priority(mut self) -> Self { + self.set_priority(); + self + } + + /// Tell if this event is the result of an interrupt notification. + /// + /// This usually indicates that the file descriptor or socket has been closed. It corresponds + /// to the `EPOLLHUP` and `POLLHUP` events. + /// + /// Interruption events are only supported on the following platforms: + /// + /// - `epoll` + /// - `poll` + /// - IOCP + /// - Event Ports + /// + /// On other platforms, this always returns `false`. + pub fn is_interrupt(&self) -> bool { + self.extra.is_hup() + } + + /// Tell if this event is the result of a priority notification. + /// + /// This indicates that there is urgent data to read. It corresponds to the `EPOLLPRI` and + /// `POLLPRI` events. + /// + /// Priority events are only supported on the following platforms: + /// + /// - `epoll` + /// - `poll` + /// - IOCP + /// - Event Ports + /// + /// On other platforms, this always returns `false`. + pub fn is_priority(&self) -> bool { + self.extra.is_pri() + } } /// Waits for I/O events. diff --git a/src/poll.rs b/src/poll.rs index 59f3ce7..e73ea59 100644 --- a/src/poll.rs +++ b/src/poll.rs @@ -402,6 +402,26 @@ impl EventExtra { flags: PollFlags::empty(), } } + + /// Set the interrupt flag. + pub fn set_hup(&mut self) { + self.flags.set(PollFlags::HUP, true); + } + + /// Set the priority flag. + pub fn set_pri(&mut self) { + self.flags.set(PollFlags::PRI, true); + } + + /// Is this an interrupt event? + pub fn is_hup(&self) -> bool { + self.flags.contains(PollFlags::HUP) + } + + /// Is this a priority event? + pub fn is_pri(&self) -> bool { + self.flags.contains(PollFlags::PRI) + } } fn cvt_mode_as_remove(mode: PollMode) -> io::Result { diff --git a/src/port.rs b/src/port.rs index 6225228..be8668b 100644 --- a/src/port.rs +++ b/src/port.rs @@ -225,4 +225,24 @@ impl EventExtra { flags: PollFlags::empty(), } } + + /// Set the interrupt flag. + pub fn set_hup(&mut self) { + self.flags.set(PollFlags::HUP, true); + } + + /// Set the priority flag. + pub fn set_pri(&mut self) { + self.flags.set(PollFlags::PRI, true); + } + + /// Is this an interrupt event? + pub fn is_hup(&self) -> bool { + self.flags.contains(PollFlags::HUP) + } + + /// Is this a priority event? + pub fn is_pri(&self) -> bool { + self.flags.contains(PollFlags::PRI) + } }