Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add SourceSocket #1183

Closed
wants to merge 10 commits into from
5 changes: 3 additions & 2 deletions src/event/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@ use std::ops::DerefMut;
/// Event sources are always backed by system handles, such as sockets or other
/// system handles. These `event::Source`s will be monitored by the system
/// selector. An implementation of `Source` will almost always delegates to a
/// lower level handle. Examples of this are [`TcpStream`]s, or the *unix only*
/// [`SourceFd`].
/// lower level handle. Examples of this are [`TcpStream`]s, the *unix only*
/// [`SourceFd`] or the *windows only* counterpart [`SourceSocket`].
///
/// [`TcpStream`]: crate::net::TcpStream
/// [`SourceFd`]: crate::unix::SourceFd
/// [`SourceSocket`]: crate::windows::SourceSocket
///
/// # Dropping `event::Source`s
///
Expand Down
11 changes: 9 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,15 @@ pub use waker::Waker;
#[cfg(unix)]
pub mod unix {
//! Unix only extensions.
pub use crate::sys::SocketAddr;
pub use crate::sys::SourceFd;

pub use crate::sys::{SocketAddr, SourceFd};
}

#[cfg(windows)]
pub mod windows {
//! Windows only extensions.

pub use crate::sys::{SocketState, SourceSocket};
}

// Enable with `cargo doc --features guide`.
Expand Down
5 changes: 4 additions & 1 deletion src/sys/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,10 @@ pub use self::unix::{
mod unix;

#[cfg(windows)]
pub use self::windows::{event, Event, Events, Selector, TcpListener, TcpStream, UdpSocket, Waker};
pub use self::windows::{
event, Event, Events, Selector, SocketState, SourceSocket, TcpListener, TcpStream, UdpSocket,
Waker,
};

#[cfg(windows)]
mod windows;
50 changes: 9 additions & 41 deletions src/sys/windows/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@ use std::mem::size_of_val;
use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
use std::pin::Pin;
use std::sync::{Arc, Mutex, Once};

use winapi::ctypes::c_int;
use winapi::shared::ws2def::SOCKADDR;
use winapi::um::winsock2::{
ioctlsocket, socket, FIONBIO, INVALID_SOCKET, PF_INET, PF_INET6, SOCKET,
};

use crate::{Interest, Token};

/// Helper macro to execute a system call that returns an `io::Result`.
//
// Macro must be defined before any modules that uses them.
Expand All @@ -23,69 +26,34 @@ macro_rules! syscall {
}};
}

/// Helper macro to execute an I/O operation and register interests if the
/// operation would block.
macro_rules! try_io {
($self: ident, $method: ident $(, $args: expr)*) => {{
let result = (&$self.inner).$method($($args),*);
if let Err(ref e) = result {
if e.kind() == io::ErrorKind::WouldBlock {
$self.io_blocked_reregister()?;
}
}
result
}};
}

mod afd;
pub mod event;
mod io_status_block;
mod selector;
mod source_socket;
mod tcp;
mod udp;
mod waker;

pub use event::{Event, Events};
pub use selector::{Selector, SelectorInner, SockState};
pub use source_socket::{SocketState, SourceSocket};
pub use tcp::{TcpListener, TcpStream};
pub use udp::UdpSocket;
pub use waker::Waker;

pub trait SocketState {
// The `SockState` struct needs to be pinned in memory because it contains
// `OVERLAPPED` and `AFD_POLL_INFO` fields which are modified in the
// background by the windows kernel, therefore we need to ensure they are
// never moved to a different memory address.
fn get_sock_state(&self) -> Option<Pin<Arc<Mutex<SockState>>>>;
fn set_sock_state(&self, sock_state: Option<Pin<Arc<Mutex<SockState>>>>);
}

use crate::{Interest, Token};

#[derive(Debug)]
struct InternalState {
selector: Arc<SelectorInner>,
token: Token,
interests: Interest,
sock_state: Option<Pin<Arc<Mutex<SockState>>>>,
}

impl InternalState {
fn new(selector: Arc<SelectorInner>, token: Token, interests: Interest) -> InternalState {
InternalState {
selector,
token,
interests,
sock_state: None,
}
}
sock_state: Pin<Arc<Mutex<SockState>>>,
}

impl Drop for InternalState {
fn drop(&mut self) {
if let Some(sock_state) = self.sock_state.as_ref() {
let mut sock_state = sock_state.lock().unwrap();
sock_state.mark_delete();
}
let mut sock_state = self.sock_state.lock().unwrap();
sock_state.mark_delete();
}
}

Expand Down
117 changes: 41 additions & 76 deletions src/sys/windows/selector.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
use super::afd::{self, Afd, AfdPollInfo};
use super::io_status_block::IoStatusBlock;
use super::Event;
use super::SocketState;
use crate::sys::Events;
use super::{Event, Events, InternalState};
use crate::{Interest, Token};

use miow::iocp::{CompletionPort, CompletionStatus};
use miow::Overlapped;

use std::collections::VecDeque;
use std::marker::PhantomPinned;
use std::mem::size_of;
use std::os::windows::io::{AsRawSocket, RawSocket};
use std::os::windows::io::RawSocket;
use std::pin::Pin;
use std::ptr::null_mut;
#[cfg(debug_assertions)]
Expand Down Expand Up @@ -348,37 +347,29 @@ impl Selector {
self.inner.select(events, timeout)
}

pub fn register<S: SocketState + AsRawSocket>(
pub(super) fn register(
&self,
socket: &S,
socket: RawSocket,
token: Token,
interests: Interest,
) -> io::Result<()> {
self.inner.register(socket, token, interests)
) -> io::Result<InternalState> {
SelectorInner::register(&self.inner, socket, token, interests)
}

pub fn reregister<S: SocketState>(
pub fn reregister(
&self,
socket: &S,
state: &Pin<Arc<Mutex<SockState>>>,
token: Token,
interests: Interest,
) -> io::Result<()> {
self.inner.reregister(socket, token, interests)
}

pub fn deregister<S: SocketState>(&self, socket: &S) -> io::Result<()> {
self.inner.deregister(socket)
self.inner.reregister(state, token, interests)
}

#[cfg(debug_assertions)]
pub fn id(&self) -> usize {
self.id
}

pub(super) fn clone_inner(&self) -> Arc<SelectorInner> {
self.inner.clone()
}

pub(super) fn clone_port(&self) -> Arc<CompletionPort> {
self.inner.cp.clone()
}
Expand Down Expand Up @@ -486,71 +477,51 @@ impl SelectorInner {
}
}

pub fn register<S: SocketState + AsRawSocket>(
&self,
socket: &S,
fn register(
this: &Arc<Self>,
socket: RawSocket,
token: Token,
interests: Interest,
) -> io::Result<()> {
if socket.get_sock_state().is_some() {
return Err(io::Error::from(io::ErrorKind::AlreadyExists));
}

let flags = interests_to_afd_flags(interests);

let sock = self._alloc_sock_for_rawsocket(socket.as_raw_socket())?;
let event = Event {
flags,
data: token.0 as u64,
) -> io::Result<InternalState> {
let sock = {
let sock = this._alloc_sock_for_rawsocket(socket)?;
let event = Event {
flags: interests_to_afd_flags(interests),
data: token.0 as u64,
};
sock.lock().unwrap().set_event(event);
sock
};

{
sock.lock().unwrap().set_event(event);
}
socket.set_sock_state(Some(sock));
unsafe {
self.add_socket_to_update_queue(socket);
self.update_sockets_events_if_polling()?;
}
let state = InternalState {
selector: this.clone(),
token,
interests,
sock_state: sock.clone(),
};

Ok(())
let mut update_queue = this.update_queue.lock().unwrap();
update_queue.push_back(sock);
unsafe { this.update_sockets_events_if_polling().map(|()| state) }
}

pub fn reregister<S: SocketState>(
pub(super) fn reregister(
&self,
socket: &S,
state: &Pin<Arc<Mutex<SockState>>>,
token: Token,
interests: Interest,
) -> io::Result<()> {
let flags = interests_to_afd_flags(interests);

let sock = match socket.get_sock_state() {
Some(sock) => sock,
None => return Err(io::Error::from(io::ErrorKind::NotFound)),
};
let event = Event {
flags,
data: token.0 as u64,
};

{
sock.lock().unwrap().set_event(event);
}
unsafe {
self.add_socket_to_update_queue(socket);
self.update_sockets_events_if_polling()?;
let event = Event {
flags: interests_to_afd_flags(interests),
data: token.0 as u64,
};
state.lock().unwrap().set_event(event);
}

Ok(())
}

pub fn deregister<S: SocketState>(&self, socket: &S) -> io::Result<()> {
if socket.get_sock_state().is_none() {
return Err(io::Error::from(io::ErrorKind::NotFound));
}
socket.set_sock_state(None);
self.afd_group.release_unused_afd();
Ok(())
let mut update_queue = self.update_queue.lock().unwrap();
update_queue.push_back(state.clone());
unsafe { self.update_sockets_events_if_polling() }
}

unsafe fn update_sockets_events(&self) -> io::Result<()> {
Expand Down Expand Up @@ -594,12 +565,6 @@ impl SelectorInner {
}
}

unsafe fn add_socket_to_update_queue<S: SocketState>(&self, socket: &S) {
let sock_state = socket.get_sock_state().unwrap();
let mut update_queue = self.update_queue.lock().unwrap();
update_queue.push_back(sock_state);
}

// It returns processed count of iocp_events rather than the events itself.
unsafe fn feed_events(
&self,
Expand Down
Loading