Skip to content

Commit

Permalink
Miscellaneous windows implementation clean-ups
Browse files Browse the repository at this point in the history
  • Loading branch information
piscisaureus committed Nov 14, 2019
1 parent 1522d5f commit 6ab4edf
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 79 deletions.
5 changes: 5 additions & 0 deletions src/sys/windows/io_status_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,18 @@ use ntapi::ntioapi::IO_STATUS_BLOCK;
use std::fmt::{self, Debug, Formatter};
use std::mem::MaybeUninit;
use std::ops::{Deref, DerefMut};
use winapi::shared::ntdef::NTSTATUS;

pub struct IoStatusBlock(IO_STATUS_BLOCK);

impl IoStatusBlock {
pub fn zeroed() -> Self {
Self(unsafe { MaybeUninit::<IO_STATUS_BLOCK>::zeroed().assume_init() })
}

pub fn status(&self) -> NTSTATUS {
unsafe { self.u.Status }
}
}

unsafe impl Send for IoStatusBlock {}
Expand Down
154 changes: 76 additions & 78 deletions src/sys/windows/selector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ impl AfdGroup {
}
}

#[derive(Debug)]
#[derive(Debug, PartialEq, Eq)]
enum SockPollStatus {
Idle,
Pending,
Expand Down Expand Up @@ -134,70 +134,71 @@ impl SockState {
fn update(&mut self, self_arc: &Pin<Arc<Mutex<SockState>>>) -> io::Result<()> {
assert!(!self.delete_pending);

if let SockPollStatus::Pending = self.poll_status {
if (self.user_evts & afd::KNOWN_EVENTS & !self.pending_evts) == 0 {
/* All the events the user is interested in are already being monitored by
* the pending poll operation. It might spuriously complete because of an
* event that we're no longer interested in; when that happens we'll submit
* a new poll operation with the updated event mask. */
} else {
/* A poll operation is already pending, but it's not monitoring for all the
* events that the user is interested in. Therefore, cancel the pending
* poll operation; when we receive it's completion package, a new poll
* operation will be submitted with the correct event mask. */
self.cancel()?;
use SockPollStatus::*;
match self.poll_status {
Pending if (self.user_evts & afd::KNOWN_EVENTS & !self.pending_evts) == 0 => {
// All the events the user is interested in are already being monitored by
// the pending poll operation. It might spuriously complete because of an
// event that we're no longer interested in; when that happens we'll submit
// a new poll operation with the updated event mask.
Ok(())
}
} else if let SockPollStatus::Cancelled = self.poll_status {
/* The poll operation has already been cancelled, we're still waiting for
* it to return. For now, there's nothing that needs to be done. */
} else if let SockPollStatus::Idle = self.poll_status {
/* No poll operation is pending; start one. */
self.poll_info.exclusive = 0;
self.poll_info.number_of_handles = 1;
unsafe {
*self.poll_info.timeout.QuadPart_mut() = std::i64::MAX;
Pending => {
// A poll operation is already pending, but it's not monitoring for all the
// events that the user is interested in. Therefore, cancel the pending
// poll operation; when we receive it's completion package, a new poll
// operation will be submitted with the correct event mask.
self.cancel()
}
self.poll_info.handles[0].handle = self.base_socket as HANDLE;
self.poll_info.handles[0].status = 0;
self.poll_info.handles[0].events = self.user_evts | afd::POLL_LOCAL_CLOSE;

let result = unsafe {
self.afd.poll(
&mut self.poll_info,
&mut self.iosb,
transmute_copy(self_arc),
)
};
if let Err(e) = result {
let code = e.raw_os_error().unwrap();
if code == ERROR_IO_PENDING as i32 {
/* Overlapped poll operation in progress; this is expected. */
} else if code == ERROR_INVALID_HANDLE as i32 {
/* Socket closed; it'll be dropped. */
self.mark_delete();
return Ok(());
} else {
return Err(e);
Cancelled => {
// The poll operation has already been cancelled, we're still waiting for
// it to return. For now, there's nothing that needs to be done.
Ok(())
}
Idle => {
// No poll operation is pending; start one.
self.poll_info.exclusive = 0;
self.poll_info.number_of_handles = 1;
*unsafe { self.poll_info.timeout.QuadPart_mut() } = std::i64::MAX;

self.poll_info.handles[0].handle = self.base_socket as HANDLE;
self.poll_info.handles[0].status = 0;
self.poll_info.handles[0].events = self.user_evts | afd::POLL_LOCAL_CLOSE;

let result = unsafe {
self.afd.poll(
&mut self.poll_info,
&mut self.iosb,
transmute_copy(self_arc),
)
};

match result {
Err(ref e) if e.raw_os_error() == Some(ERROR_INVALID_HANDLE as i32) => {
// Socket has been closed behind our back; delete it.
self.mark_delete();
Ok(())
}
Err(ref e) if e.raw_os_error() == Some(ERROR_IO_PENDING as i32) => {
// Afd::poll() returns `Ok(false)` if the operation is pending,
// so this should never happen.
unreachable!();
}
Err(e) => Err(e),
Ok(_) => {
self.poll_status = SockPollStatus::Pending;
self.pending_evts = self.user_evts;
forget(self_arc.clone());
Ok(())
}
}
}

self.poll_status = SockPollStatus::Pending;
self.pending_evts = self.user_evts;
forget(self_arc.clone());
} else {
unreachable!();
}
Ok(())
}

fn cancel(&mut self) -> io::Result<()> {
match self.poll_status {
SockPollStatus::Pending => {}
_ => unreachable!(),
};
unsafe {
self.afd.cancel(&mut self.iosb)?;
}
assert_eq!(self.poll_status, SockPollStatus::Pending);
unsafe { self.afd.cancel(&mut self.iosb) }?;
self.poll_status = SockPollStatus::Cancelled;
self.pending_evts = 0;
Ok(())
Expand All @@ -209,24 +210,22 @@ impl SockState {
self.pending_evts = 0;

let mut afd_events = 0;
// We use the status info in IO_STATUS_BLOCK to determine the socket poll status. It is unsafe to use a pointer of IO_STATUS_BLOCK.
unsafe {
if self.delete_pending {
return None;
} else if self.iosb.u.Status == STATUS_CANCELLED {
/* The poll request was cancelled by CancelIoEx. */
} else if !NT_SUCCESS(self.iosb.u.Status) {
/* The overlapped request itself failed in an unexpected way. */
afd_events = afd::POLL_CONNECT_FAIL;
} else if self.poll_info.number_of_handles < 1 {
/* This poll operation succeeded but didn't report any socket events. */
} else if self.poll_info.handles[0].events & afd::POLL_LOCAL_CLOSE != 0 {
/* The poll operation reported that the socket was closed. */
self.mark_delete();
return None;
} else {
afd_events = self.poll_info.handles[0].events;
}

if self.delete_pending {
return None;
} else if self.iosb.status() == STATUS_CANCELLED {
// The poll request was cancelled by CancelIoEx.
} else if !NT_SUCCESS(self.iosb.status()) {
// The overlapped request itself failed in an unexpected way.
afd_events = afd::POLL_CONNECT_FAIL
} else if self.poll_info.number_of_handles < 1 {
// This poll operation succeeded but didn't report any socket events.
} else if self.poll_info.handles[0].events & afd::POLL_LOCAL_CLOSE != 0 {
// The poll operation reported that the socket was closed.
self.mark_delete();
return None;
} else {
afd_events = self.poll_info.handles[0].events
}

afd_events &= self.user_evts;
Expand Down Expand Up @@ -260,10 +259,9 @@ impl SockState {

pub fn mark_delete(&mut self) {
if !self.delete_pending {
if let SockPollStatus::Pending = self.poll_status {
drop(self.cancel());
if self.poll_status == SockPollStatus::Pending {
self.cancel().unwrap();
}

self.delete_pending = true;
}
}
Expand Down
5 changes: 4 additions & 1 deletion tests/tcp_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,12 @@ use mio::{Interests, Token};
#[macro_use]
mod util;

#[cfg(not(target_os = "windows"))]
use util::init;

use util::{
any_local_address, any_local_ipv6_address, assert_send, assert_sync, assert_would_block,
expect_events, expect_no_events, init, init_with_poll, ExpectEvent,
expect_events, expect_no_events, init_with_poll, ExpectEvent,
};

const DATA1: &[u8] = b"Hello world!";
Expand Down

0 comments on commit 6ab4edf

Please sign in to comment.