Skip to content

Commit

Permalink
doc: add comments for psn backend
Browse files Browse the repository at this point in the history
  • Loading branch information
Berrysoft committed Jun 11, 2024
1 parent 5dacb1a commit 6104c0d
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 3 deletions.
5 changes: 5 additions & 0 deletions src/iocp/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
//! Bindings to Windows I/O Completion Ports.
//!
//! There are two implementations. The `wepoll` one is classical and uses AFD subsystem.
//! The `psn` one uses `ProcessSocketNotifications` and need Windows 10 21H1+.
cfg_if::cfg_if! {
if #[cfg(feature = "iocp-psn")] {
mod psn;
Expand Down
31 changes: 28 additions & 3 deletions src/iocp/psn/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
//! Bindings to Windows IOCP with `ProcessSocketNotifications` and
//! `NtAssociateWaitCompletionPacket` support.
//!
//! `ProcessSocketNotifications` is a new Windows API after 21H1. It is much like kqueue,
//! and support edge triggers. The implementation is easier to be adapted to the crate's API.
//! However, there are some behaviors different from other platforms:
//! - The `psn` poller distingushes "disabled" state and "removed" state. When the registration
//! disabled, the notifications won't be queued to the poller.
//! - The edge trigger only triggers condition changes after it is enabled. You cannot expect
//! an event coming if you change the condition before registering the notification.
//! - A socket can be registered to only one IOCP at a time.
//!
//! `NtAssociateWaitCompletionPacket` is an undocumented API and it's the back of thread pool
//! APIs like `RegisterWaitForSingleObject`. We use it to avoid starting thread pools. It only
//! supports `Oneshot` mode.
mod wait;

use std::collections::HashMap;
Expand Down Expand Up @@ -33,14 +49,18 @@ use crate::{Event, PollMode, NOTIFY_KEY};
pub struct Poller {
/// The I/O completion port.
port: Arc<OwnedHandle>,
/// Attribute map.
sources: RwLock<HashMap<usize, SourceAttr>>,
}

/// Attributes of added sources.
#[derive(Debug)]
pub(crate) enum SourceAttr {
Socket {
key: usize,
},
/// A socket with key.
Socket { key: usize },
/// A waitable object with key and [`WaitCompletionPacket`].
///
/// [`WaitCompletionPacket`]: wait::WaitCompletionPacket
Waitable {
key: usize,
packet: wait::WaitCompletionPacket,
Expand Down Expand Up @@ -139,6 +159,7 @@ impl Poller {
}
}

/// Add a new waitable to the poller.
pub(crate) fn add_waitable(
&self,
handle: RawHandle,
Expand Down Expand Up @@ -180,6 +201,7 @@ impl Poller {
)
}

/// Update a waitable in the poller.
pub(crate) fn modify_waitable(
&self,
waitable: RawHandle,
Expand Down Expand Up @@ -215,6 +237,7 @@ impl Poller {
})
}

/// Delete a waitable from the poller.
pub(crate) fn remove_waitable(&self, waitable: RawHandle) -> io::Result<()> {
tracing::trace!("remove: handle={:?}, waitable={:p}", self.port, waitable);

Expand Down Expand Up @@ -286,6 +309,7 @@ impl Poller {
.ok_or_else(|| io::Error::from(io::ErrorKind::NotFound))
}

/// Add or modify the registration.
unsafe fn update_source(&self, mut reg: SOCK_NOTIFY_REGISTRATION) -> io::Result<()> {
let res = unsafe {
ProcessSocketNotifications(
Expand Down Expand Up @@ -353,6 +377,7 @@ impl Poller {
self.post(CompletionPacket::new(Event::none(NOTIFY_KEY)))
}

/// Push an IOCP packet into the queue.
pub fn post(&self, packet: CompletionPacket) -> io::Result<()> {
let span = tracing::trace_span!(
"post",
Expand Down
10 changes: 10 additions & 0 deletions src/iocp/psn/wait.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//! Safe wrapper around `NtAssociateWaitCompletionPacket` API series.
use std::ffi::c_void;
use std::io;
use std::os::windows::io::{AsRawHandle, FromRawHandle, OwnedHandle, RawHandle};
Expand All @@ -9,6 +11,7 @@ use windows_sys::Win32::Foundation::{
STATUS_CANCELLED, STATUS_PENDING, STATUS_SUCCESS,
};

#[link(name = "ntdll")]
extern "system" {
fn NtCreateWaitCompletionPacket(
WaitCompletionPacketHandle: *mut HANDLE,
Expand All @@ -33,6 +36,7 @@ extern "system" {
) -> NTSTATUS;
}

/// Wrapper of NT WaitCompletionPacket.
#[derive(Debug)]
pub struct WaitCompletionPacket {
handle: OwnedHandle,
Expand All @@ -58,6 +62,8 @@ impl WaitCompletionPacket {
Ok(Self { handle })
}

/// Associate waitable object to IOCP. The parameter `info` is the
/// field `dwNumberOfBytesTransferred` in `OVERLAPPED_ENTRY`
pub fn associate(
&mut self,
port: RawHandle,
Expand All @@ -80,6 +86,10 @@ impl WaitCompletionPacket {
Ok(())
}

/// Cancels the completion packet. The return value means:
/// - `Ok(true)`: cancellation is successful.
/// - `Ok(false)`: cancellation failed, the packet is still in use.
/// - `Err(e)`: other errors.
pub fn cancel(&mut self) -> io::Result<bool> {
let status = unsafe { NtCancelWaitCompletionPacket(self.handle.as_raw_handle() as _, 0) };
match status {
Expand Down

0 comments on commit 6104c0d

Please sign in to comment.