Skip to content

Commit

Permalink
Add PRI and HUP events
Browse files Browse the repository at this point in the history
Signed-off-by: John Nunley <dev@notgull.net>
  • Loading branch information
notgull committed Aug 9, 2023
1 parent 74a162a commit 8983311
Show file tree
Hide file tree
Showing 7 changed files with 250 additions and 27 deletions.
24 changes: 22 additions & 2 deletions src/epoll.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ impl Poller {
&self.epoll_fd,
unsafe { rustix::fd::BorrowedFd::borrow_raw(fd) },
epoll::EventData::new_u64(ev.key as u64),
epoll_flags(&ev, mode),
epoll_flags(&ev, mode) | ev.extra.flags,
)?;

Ok(())
Expand All @@ -122,7 +122,7 @@ impl Poller {
&self.epoll_fd,
fd,
epoll::EventData::new_u64(ev.key as u64),
epoll_flags(&ev, mode),
epoll_flags(&ev, mode) | ev.extra.flags,
)?;

Ok(())
Expand Down Expand Up @@ -339,4 +339,24 @@ impl EventExtra {
flags: epoll::EventFlags::empty(),
}
}

/// Add the interrupt flag to this event.
pub fn set_hup(&mut self) {
self.flags |= epoll::EventFlags::HUP;
}

/// Add the priority flag to this event.
pub fn set_pri(&mut self) {
self.flags |= epoll::EventFlags::PRI;
}

/// Tell if the interrupt flag is set.
pub fn is_hup(&self) -> bool {
self.flags.contains(epoll::EventFlags::HUP)
}

/// Tell if the priority flag is set.
pub fn is_pri(&self) -> bool {
self.flags.contains(epoll::EventFlags::PRI)
}
}
14 changes: 14 additions & 0 deletions src/iocp/afd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,20 @@ impl ops::BitOrAssign for AfdPollMask {
}
}

impl ops::BitAnd for AfdPollMask {
type Output = Self;

fn bitand(self, rhs: Self) -> Self {
AfdPollMask(self.0 & rhs.0)
}
}

impl ops::BitAndAssign for AfdPollMask {
fn bitand_assign(&mut self, rhs: Self) {
self.0 &= rhs.0;
}
}

pub(super) trait HasAfdInfo {
fn afd_info(self: Pin<&Self>) -> Pin<&UnsafeCell<AfdPollInfo>>;
}
Expand Down
75 changes: 50 additions & 25 deletions src/iocp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,26 @@ impl EventExtra {
flags: AfdPollMask::empty(),
}
}

/// Is this a HUP event?
pub fn is_hup(&self) -> bool {
self.flags.intersects(AfdPollMask::ABORT)
}

/// Is this a PRI event?
pub fn is_pri(&self) -> bool {
self.flags.intersects(AfdPollMask::RECEIVE_EXPEDITED)
}

/// Set up a listener for HUP events.
pub fn set_hup(&mut self) {
self.flags |= AfdPollMask::ABORT;
}

/// Set up a listener for PRI events.
pub fn set_pri(&mut self) {
self.flags |= AfdPollMask::RECEIVE_EXPEDITED;
}
}

/// A packet used to wake up the poller with an event.
Expand Down Expand Up @@ -620,8 +640,11 @@ impl PacketUnwrapped {

// If there was a change, indicate that we need an update.
match socket.status {
SocketStatus::Polling { readable, writable } => {
(interest.readable && !readable) || (interest.writable && !writable)
SocketStatus::Polling { flags } => {
let our_flags = event_to_afd_mask(interest.readable, interest.writable, true)
| interest.extra.flags;

our_flags != flags
}
_ => true,
}
Expand All @@ -648,12 +671,16 @@ impl PacketUnwrapped {

// Check the current status.
match socket.status {
SocketStatus::Polling { readable, writable } => {
SocketStatus::Polling { flags } => {
// If we need to poll for events aside from what we are currently polling, we need
// to update the packet. Cancel the ongoing poll.
if (socket.interest.readable && !readable)
|| (socket.interest.writable && !writable)
{
let our_flags = event_to_afd_mask(
socket.interest.readable,
socket.interest.writable,
socket.interest_error,
) | socket.interest.extra.flags;

if our_flags != flags {
return self.cancel(socket);
}

Expand All @@ -669,15 +696,12 @@ impl PacketUnwrapped {

SocketStatus::Idle => {
// Start a new poll.
let result = socket.afd.poll(
self.clone(),
socket.base_socket,
event_to_afd_mask(
socket.interest.readable,
socket.interest.writable,
socket.interest_error,
),
);
let mask = event_to_afd_mask(
socket.interest.readable,
socket.interest.writable,
socket.interest_error,
) | socket.interest.extra.flags;
let result = socket.afd.poll(self.clone(), socket.base_socket, mask);

match result {
Ok(()) => {}
Expand All @@ -698,10 +722,7 @@ impl PacketUnwrapped {
}

// We are now polling for the current events.
socket.status = SocketStatus::Polling {
readable: socket.interest.readable,
writable: socket.interest.writable,
};
socket.status = SocketStatus::Polling { flags: mask };

Ok(())
}
Expand Down Expand Up @@ -774,6 +795,7 @@ impl PacketUnwrapped {
let (readable, writable) = afd_mask_to_event(events);
event.readable = readable;
event.writable = writable;
event.extra.flags = events;
}
}
}
Expand All @@ -785,7 +807,13 @@ impl PacketUnwrapped {

// If this event doesn't have anything that interests us, don't return or
// update the oneshot state.
let return_value = if event.readable || event.writable {
let return_value = if event.readable
|| event.writable
|| event
.extra
.flags
.intersects(socket_state.interest.extra.flags)
{
// If we are in oneshot mode, remove the interest.
if matches!(socket_state.mode, PollMode::Oneshot) {
socket_state.interest = Event::none(socket_state.interest.key);
Expand Down Expand Up @@ -887,11 +915,8 @@ enum SocketStatus {

/// We are currently polling these events.
Polling {
/// We are currently polling for readable events.
readable: bool,

/// We are currently polling for writable events.
writable: bool,
/// The flags we are currently polling for.
flags: AfdPollMask,
},

/// The last poll operation was cancelled, and we're waiting for it to
Expand Down
20 changes: 20 additions & 0 deletions src/kqueue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,26 @@ impl EventExtra {
pub fn empty() -> EventExtra {
EventExtra
}

/// Set the interrupt flag.
pub fn set_hup(&mut self) {
// No-op.
}

/// Set the priority flag.
pub fn set_pri(&mut self) {
// No-op.
}

/// Is the interrupt flag set?
pub fn is_hup(&self) -> bool {
false
}

/// Is the priority flag set?
pub fn is_pri(&self) -> bool {
false
}
}

pub(crate) fn mode_to_flags(mode: PollMode) -> kqueue::EventFlags {
Expand Down
104 changes: 104 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,110 @@ impl Event {
extra: sys::EventExtra::empty(),
}
}

/// Add interruption events to this interest.
///
/// This usually indicates that the file descriptor or socket has been closed. It corresponds
/// to the `EPOLLHUP` and `POLLHUP` events.
///
/// Interruption events are only supported on the following platforms:
///
/// - `epoll`
/// - `poll`
/// - IOCP
/// - Event Ports
///
/// On other platforms, this function is a no-op.
pub fn set_interrupt(&mut self) {
self.extra.set_hup();
}

/// Add interruption events to this interest.
///
/// This usually indicates that the file descriptor or socket has been closed. It corresponds
/// to the `EPOLLHUP` and `POLLHUP` events.
///
/// Interruption events are only supported on the following platforms:
///
/// - `epoll`
/// - `poll`
/// - IOCP
/// - Event Ports
///
/// On other platforms, this function is a no-op.
pub fn with_interrupt(mut self) -> Self {
self.set_interrupt();
self
}

/// Add priority events to this interest.
///
/// This indicates that there is urgent data to read. It corresponds to the `EPOLLPRI` and
/// `POLLPRI` events.
///
/// Priority events are only supported on the following platforms:
///
/// - `epoll`
/// - `poll`
/// - IOCP
/// - Event Ports
///
/// On other platforms, this function is a no-op.
pub fn set_priority(&mut self) {
self.extra.set_pri();
}

/// Add priority events to this interest.
///
/// This indicates that there is urgent data to read. It corresponds to the `EPOLLPRI` and
/// `POLLPRI` events.
///
/// Priority events are only supported on the following platforms:
///
/// - `epoll`
/// - `poll`
/// - IOCP
/// - Event Ports
///
/// On other platforms, this function is a no-op.
pub fn with_priority(mut self) -> Self {
self.set_priority();
self
}

/// Tell if this event is the result of an interrupt notification.
///
/// This usually indicates that the file descriptor or socket has been closed. It corresponds
/// to the `EPOLLHUP` and `POLLHUP` events.
///
/// Interruption events are only supported on the following platforms:
///
/// - `epoll`
/// - `poll`
/// - IOCP
/// - Event Ports
///
/// On other platforms, this always returns `false`.
pub fn is_interrupt(&self) -> bool {
self.extra.is_hup()
}

/// Tell if this event is the result of a priority notification.
///
/// This indicates that there is urgent data to read. It corresponds to the `EPOLLPRI` and
/// `POLLPRI` events.
///
/// Priority events are only supported on the following platforms:
///
/// - `epoll`
/// - `poll`
/// - IOCP
/// - Event Ports
///
/// On other platforms, this always returns `false`.
pub fn is_priority(&self) -> bool {
self.extra.is_pri()
}
}

/// Waits for I/O events.
Expand Down
20 changes: 20 additions & 0 deletions src/poll.rs
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,26 @@ impl EventExtra {
flags: PollFlags::empty(),
}
}

/// Set the interrupt flag.
pub fn set_hup(&mut self) {
self.flags.set(PollFlags::HUP, true);
}

/// Set the priority flag.
pub fn set_pri(&mut self) {
self.flags.set(PollFlags::PRI, true);
}

/// Is this an interrupt event?
pub fn is_hup(&self) -> bool {
self.flags.contains(PollFlags::HUP)
}

/// Is this a priority event?
pub fn is_pri(&self) -> bool {
self.flags.contains(PollFlags::PRI)
}
}

fn cvt_mode_as_remove(mode: PollMode) -> io::Result<bool> {
Expand Down
20 changes: 20 additions & 0 deletions src/port.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,4 +225,24 @@ impl EventExtra {
flags: PollFlags::empty(),
}
}

/// Set the interrupt flag.
pub fn set_hup(&mut self) {
self.flags.set(PollFlags::HUP, true);
}

/// Set the priority flag.
pub fn set_pri(&mut self) {
self.flags.set(PollFlags::PRI, true);
}

/// Is this an interrupt event?
pub fn is_hup(&self) -> bool {
self.flags.contains(PollFlags::HUP)
}

/// Is this a priority event?
pub fn is_pri(&self) -> bool {
self.flags.contains(PollFlags::PRI)
}
}

0 comments on commit 8983311

Please sign in to comment.