Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract the Events struct and make the Event struct opaque #133

Merged
merged 8 commits into from
Aug 14, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions examples/two-listeners.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::io;
use std::net::TcpListener;

use polling::{Event, Poller};
use polling::{Event, Events, Poller};

fn main() -> io::Result<()> {
let l1 = TcpListener::bind("127.0.0.1:8001")?;
Expand All @@ -19,12 +19,12 @@ fn main() -> io::Result<()> {
println!(" $ nc 127.0.0.1 8001");
println!(" $ nc 127.0.0.1 8002");

let mut events = Vec::new();
let mut events = Events::new();
loop {
events.clear();
poller.wait(&mut events, None)?;

for ev in &events {
for ev in events.iter() {
match ev.key {
1 => {
println!("Accept on l1");
Expand Down
8 changes: 5 additions & 3 deletions examples/wait-signal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
))]
mod example {
use polling::os::kqueue::{PollerKqueueExt, Signal};
use polling::{PollMode, Poller};
use polling::{Events, PollMode, Poller};

pub(super) fn main2() {
// Create a poller.
Expand All @@ -23,7 +23,7 @@ mod example {
let sigint = Signal(libc::SIGINT);
poller.add_filter(sigint, 1, PollMode::Oneshot).unwrap();

let mut events = vec![];
let mut events = Events::new();

println!("Press Ctrl+C to exit...");

Expand All @@ -32,7 +32,7 @@ mod example {
poller.wait(&mut events, None).unwrap();

// Process events.
for ev in events.drain(..) {
for ev in events.iter() {
match ev.key {
1 => {
println!("SIGINT received");
Expand All @@ -41,6 +41,8 @@ mod example {
_ => unreachable!(),
}
}

events.clear();
}
}
}
Expand Down
14 changes: 12 additions & 2 deletions src/epoll.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,9 +308,9 @@ unsafe impl Send for Events {}

impl Events {
/// Creates an empty list.
pub fn new() -> Events {
pub fn with_capacity(cap: usize) -> Events {
Events {
list: epoll::EventVec::with_capacity(1024),
list: epoll::EventVec::with_capacity(cap),
}
}

Expand All @@ -325,4 +325,14 @@ impl Events {
}
})
}

/// Clear the list.
pub fn clear(&mut self) {
self.list.clear();
}

/// Get the capacity of the list.
pub fn capacity(&self) -> usize {
self.list.capacity()
}
}
33 changes: 19 additions & 14 deletions src/iocp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,10 @@ pub(super) struct Poller {
port: Arc<IoCompletionPort<Packet>>,

/// List of currently active AFD instances.
///
///
/// AFD acts as the actual source of the socket events. It's essentially running `WSAPoll` on
/// the sockets and then posting the events to the IOCP.
///
///
/// AFD instances can be keyed to an unlimited number of sockets. However, each AFD instance
/// polls their sockets linearly. Therefore, it is best to limit the number of sockets each AFD
/// instance is responsible for. The limit of 32 is chosen because that's what `wepoll` uses.
Expand All @@ -87,34 +87,34 @@ pub(super) struct Poller {
afd: Mutex<Vec<Weak<Afd<Packet>>>>,

/// The state of the sources registered with this poller.
///
///
/// Each source is keyed by its raw socket ID.
sources: RwLock<HashMap<RawSocket, Packet>>,

/// The state of the waitable handles registered with this poller.
waitables: RwLock<HashMap<RawHandle, Packet>>,

/// Sockets with pending updates.
///
///
/// This list contains packets with sockets that need to have their AFD state adjusted by
/// calling the `update()` function on them. It's best to queue up packets as they need to
/// be updated and then run all of the updates before we start waiting on the IOCP, rather than
/// updating them as we come. If we're waiting on the IOCP updates should be run immediately.
pending_updates: ConcurrentQueue<Packet>,

/// Are we currently polling?
///
///
/// This indicates whether or not we are blocking on the IOCP, and is used to determine
/// whether pending updates should be run immediately or queued.
polling: AtomicBool,

/// A list of completion packets.
///
///
/// The IOCP writes to this list when it has new events.
packets: Mutex<Vec<OverlappedEntry<Packet>>>,

/// The packet used to notify the poller.
///
///
/// This is a special-case packet that is used to wake up the poller when it is waiting.
notifier: Packet,
}
Expand Down Expand Up @@ -548,7 +548,7 @@ impl Poller {
}

/// Get a handle to the AFD reference.
///
///
/// This finds an AFD handle with less than 32 associated sockets, or creates a new one if
/// one does not exist.
fn afd_handle(&self) -> io::Result<Arc<Afd<Packet>>> {
Expand Down Expand Up @@ -622,16 +622,21 @@ unsafe impl Send for Events {}

impl Events {
/// Creates an empty list of events.
pub(super) fn new() -> Events {
pub(super) fn with_capacity(cap: usize) -> Events {
Events {
packets: Vec::with_capacity(1024),
packets: Vec::with_capacity(cap),
}
}

/// Iterate over I/O events.
pub(super) fn iter(&self) -> impl Iterator<Item = Event> + '_ {
self.packets.iter().copied()
}

/// Clear the list of events.
pub(super) fn clear(&mut self) {
self.packets.clear();
}
notgull marked this conversation as resolved.
Show resolved Hide resolved
}

/// A packet used to wake up the poller with an event.
Expand All @@ -656,7 +661,7 @@ impl CompletionPacket {
}

/// The type of our completion packet.
///
///
/// It needs to be pinned, since it contains data that is expected by IOCP not to be moved.
type Packet = Pin<Arc<PacketUnwrapped>>;
type PacketUnwrapped = IoStatusBlock<PacketInner>;
Expand Down Expand Up @@ -754,9 +759,9 @@ impl PacketUnwrapped {
}

/// Update the socket and install the new status in AFD.
///
///
/// This function does one of the following:
///
///
/// - Nothing, if the packet is waiting on being dropped anyways.
/// - Cancels the ongoing poll, if we want to poll for different events than we are currently
/// polling for.
Expand Down Expand Up @@ -882,7 +887,7 @@ impl PacketUnwrapped {
}

/// This socket state was notified; see if we need to update it.
///
///
/// This indicates that this packet was indicated as "ready" by the IOCP and needs to be
/// processed.
fn feed_event(self: Pin<Arc<Self>>, poller: &Poller) -> io::Result<FeedEventResult> {
Expand Down
14 changes: 12 additions & 2 deletions src/kqueue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,9 +222,9 @@ unsafe impl Send for Events {}

impl Events {
/// Creates an empty list.
pub fn new() -> Events {
pub fn with_capacity(cap: usize) -> Events {
Events {
list: Vec::with_capacity(1024),
list: Vec::with_capacity(cap),
}
}

Expand All @@ -249,6 +249,16 @@ impl Events {
&& (ev.flags().intersects(kqueue::EventFlags::EOF))),
})
}

/// Clears the list.
pub fn clear(&mut self) {
self.list.clear();
}

/// Get the capacity of the list.
pub fn capacity(&self) -> usize {
self.list.capacity()
}
}

pub(crate) fn mode_to_flags(mode: PollMode) -> kqueue::EventFlags {
Expand Down
Loading