Skip to content

Commit

Permalink
breaking: Extract the Events struct and make the Event struct opaque
Browse files Browse the repository at this point in the history
* Add better documentation for the IOCP module

* Extract Events from Poller

This prevents the need to have an intermediate buffer to read events
from, reducing the need for an allocation and a copy. This is a breaking
change.

* Add event extra information

Foundation for more details later on.

* Add PRI and HUP events
* Fix various failing tests

- Make sure that waitable handles interact properly with the new
  infrastructure
- Fix failing doctests

* Review comments

- Make set_* take a boolean for the value of the flag
- Make Events !Sync
- Fix visibility modifiers
- Inline more methods
- Use a better strategy for testing

* Move completion packets into the Events buffer

This removes one of the mutexes that we have to lock.

* Review comments

Signed-off-by: John Nunley <dev@notgull.net>
  • Loading branch information
notgull authored Aug 14, 2023
1 parent e42664d commit a521cd2
Show file tree
Hide file tree
Showing 20 changed files with 878 additions and 160 deletions.
6 changes: 3 additions & 3 deletions examples/two-listeners.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::io;
use std::net::TcpListener;

use polling::{Event, Poller};
use polling::{Event, Events, Poller};

fn main() -> io::Result<()> {
let l1 = TcpListener::bind("127.0.0.1:8001")?;
Expand All @@ -19,12 +19,12 @@ fn main() -> io::Result<()> {
println!(" $ nc 127.0.0.1 8001");
println!(" $ nc 127.0.0.1 8002");

let mut events = Vec::new();
let mut events = Events::new();
loop {
events.clear();
poller.wait(&mut events, None)?;

for ev in &events {
for ev in events.iter() {
match ev.key {
1 => {
println!("Accept on l1");
Expand Down
8 changes: 5 additions & 3 deletions examples/wait-signal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
))]
mod example {
use polling::os::kqueue::{PollerKqueueExt, Signal};
use polling::{PollMode, Poller};
use polling::{Events, PollMode, Poller};

pub(super) fn main2() {
// Create a poller.
Expand All @@ -23,7 +23,7 @@ mod example {
let sigint = Signal(libc::SIGINT);
poller.add_filter(sigint, 1, PollMode::Oneshot).unwrap();

let mut events = vec![];
let mut events = Events::new();

println!("Press Ctrl+C to exit...");

Expand All @@ -32,7 +32,7 @@ mod example {
poller.wait(&mut events, None).unwrap();

// Process events.
for ev in events.drain(..) {
for ev in events.iter() {
match ev.key {
1 => {
println!("SIGINT received");
Expand All @@ -41,6 +41,8 @@ mod example {
_ => unreachable!(),
}
}

events.clear();
}
}
}
Expand Down
77 changes: 58 additions & 19 deletions src/epoll.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,7 @@ impl Poller {

poller.add(
poller.event_fd.as_raw_fd(),
Event {
key: crate::NOTIFY_KEY,
readable: true,
writable: false,
},
Event::readable(crate::NOTIFY_KEY),
PollMode::Oneshot,
)?;
}
Expand Down Expand Up @@ -106,7 +102,7 @@ impl Poller {
&self.epoll_fd,
unsafe { rustix::fd::BorrowedFd::borrow_raw(fd) },
epoll::EventData::new_u64(ev.key as u64),
epoll_flags(&ev, mode),
epoll_flags(&ev, mode) | ev.extra.flags,
)?;

Ok(())
Expand All @@ -126,7 +122,7 @@ impl Poller {
&self.epoll_fd,
fd,
epoll::EventData::new_u64(ev.key as u64),
epoll_flags(&ev, mode),
epoll_flags(&ev, mode) | ev.extra.flags,
)?;

Ok(())
Expand Down Expand Up @@ -177,11 +173,7 @@ impl Poller {
// Set interest in timerfd.
self.modify(
timer_fd.as_fd(),
Event {
key: crate::NOTIFY_KEY,
readable: true,
writable: false,
},
Event::readable(crate::NOTIFY_KEY),
PollMode::Oneshot,
)?;
}
Expand Down Expand Up @@ -213,11 +205,7 @@ impl Poller {
let _ = read(&self.event_fd, &mut buf);
self.modify(
self.event_fd.as_fd(),
Event {
key: crate::NOTIFY_KEY,
readable: true,
writable: false,
},
Event::readable(crate::NOTIFY_KEY),
PollMode::Oneshot,
)?;
Ok(())
Expand Down Expand Up @@ -308,9 +296,9 @@ unsafe impl Send for Events {}

impl Events {
/// Creates an empty list.
pub fn new() -> Events {
pub fn with_capacity(cap: usize) -> Events {
Events {
list: epoll::EventVec::with_capacity(1024),
list: epoll::EventVec::with_capacity(cap),
}
}

Expand All @@ -322,7 +310,58 @@ impl Events {
key: ev.data.u64() as usize,
readable: flags.intersects(read_flags()),
writable: flags.intersects(write_flags()),
extra: EventExtra { flags },
}
})
}

/// Clear the list.
pub fn clear(&mut self) {
self.list.clear();
}

/// Get the capacity of the list.
pub fn capacity(&self) -> usize {
self.list.capacity()
}
}

/// Extra information about this event.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct EventExtra {
flags: epoll::EventFlags,
}

impl EventExtra {
/// Create an empty version of the data.
#[inline]
pub fn empty() -> EventExtra {
EventExtra {
flags: epoll::EventFlags::empty(),
}
}

/// Add the interrupt flag to this event.
#[inline]
pub fn set_hup(&mut self, active: bool) {
self.flags.set(epoll::EventFlags::HUP, active);
}

/// Add the priority flag to this event.
#[inline]
pub fn set_pri(&mut self, active: bool) {
self.flags.set(epoll::EventFlags::PRI, active);
}

/// Tell if the interrupt flag is set.
#[inline]
pub fn is_hup(&self) -> bool {
self.flags.contains(epoll::EventFlags::HUP)
}

/// Tell if the priority flag is set.
#[inline]
pub fn is_pri(&self) -> bool {
self.flags.contains(epoll::EventFlags::PRI)
}
}
54 changes: 53 additions & 1 deletion src/iocp/afd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ impl AfdPollInfo {
}
}

#[derive(Default, Copy, Clone)]
#[derive(Default, Copy, Clone, PartialEq, Eq)]
#[repr(transparent)]
pub(super) struct AfdPollMask(u32);

Expand All @@ -89,6 +89,44 @@ impl AfdPollMask {
pub(crate) fn intersects(self, other: AfdPollMask) -> bool {
(self.0 & other.0) != 0
}

/// Sets a flag.
pub(crate) fn set(&mut self, other: AfdPollMask, value: bool) {
if value {
*self |= other;
} else {
self.0 &= !other.0;
}
}
}

impl fmt::Debug for AfdPollMask {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
const FLAGS: &[(&str, AfdPollMask)] = &[
("RECEIVE", AfdPollMask::RECEIVE),
("RECEIVE_EXPEDITED", AfdPollMask::RECEIVE_EXPEDITED),
("SEND", AfdPollMask::SEND),
("DISCONNECT", AfdPollMask::DISCONNECT),
("ABORT", AfdPollMask::ABORT),
("LOCAL_CLOSE", AfdPollMask::LOCAL_CLOSE),
("ACCEPT", AfdPollMask::ACCEPT),
("CONNECT_FAIL", AfdPollMask::CONNECT_FAIL),
];

let mut first = true;
for (name, value) in FLAGS {
if self.intersects(*value) {
if !first {
write!(f, " | ")?;
}

first = false;
write!(f, "{}", name)?;
}
}

Ok(())
}
}

impl ops::BitOr for AfdPollMask {
Expand All @@ -105,6 +143,20 @@ impl ops::BitOrAssign for AfdPollMask {
}
}

impl ops::BitAnd for AfdPollMask {
type Output = Self;

fn bitand(self, rhs: Self) -> Self {
AfdPollMask(self.0 & rhs.0)
}
}

impl ops::BitAndAssign for AfdPollMask {
fn bitand_assign(&mut self, rhs: Self) {
self.0 &= rhs.0;
}
}

pub(super) trait HasAfdInfo {
fn afd_info(self: Pin<&Self>) -> Pin<&UnsafeCell<AfdPollInfo>>;
}
Expand Down
Loading

0 comments on commit a521cd2

Please sign in to comment.