From ac8b7de45687815386a6ebcd23c07048d1826f38 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Sat, 10 Dec 2016 12:00:09 -0800 Subject: [PATCH] Allow registration of custom handles on Windows This commit intends to extend the functionality of mio on Windows to support custom handles being registered with the internal IOCP object. This in turn should unlock the ability to work with named pipes, filesystem changes, or any other IOCP-enabled object on Windows. Named pipes are in particular quite important as they're often a foundational IPC mechanism on Windows. This support is provided by exporting two new types in a `windows` module. A `Binding` serves as the ability to register with the actual IOCP port in an `Evented` implementation. Internally the `Binding` keeps track of what it was last associated with to implement IOCP semantics. This may one day be possible to make a zero-sized-type. The second type, `Overlapped`, is exported as a contract that all overlapped I/O operations must be executed with this particular type. This ensures that after an event is received from an IOCP object we know what to do with it. Essentially this is just a `OVERLAPPED` with a function pointer after it. Along the way this exposes the `winapi` crate as a public dependency of `mio`. The `OVERLAPPED_ENTRY` and `OVERLAPPED` types in `winapi` are exposed through the `Overlapped` type that mio itself exports. I've implemented [bindings to named pipes][bindings] and I've also got a proof-of-concept [process management library][tokio-process] using these bindings. So far it seems that this support in mio is sufficient for building up these applications, and it all appears to be working so far. I personally see this as a much bigger committment on the mio side of things than the Unix implementation. The `EventedFd` type on Unix is quite small and minimal, but the `Overlapped` and `binding` types on Windows are just pieces of a larger puzzle when dealing with overlapped operations. Questions about ownership of I/O objects arise along with the method of handling completion status notifications. For now this is essentially binding mio to stick to at least the same strategy for handling IOCP for the 0.6 series. A major version bump of mio could perhaps change these semantics, but it may be difficult to do so. It seems, though, that the Windows semantics are unlikely to change much in the near future. The overhead seems to have essentially reached its limit ("bolting readiness on completion") and otherwise the ownership management seems negligible. Closes #252 Closes #320 --- Cargo.toml | 2 +- src/lib.rs | 56 ++++++ src/sys/mod.rs | 2 + src/sys/windows/mod.rs | 7 +- src/sys/windows/selector.rs | 356 +++++++++++++++++++++++------------- src/sys/windows/tcp.rs | 43 +++-- src/sys/windows/udp.rs | 22 ++- 7 files changed, 332 insertions(+), 156 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index b5419fe9b..11a5095e0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,7 +29,7 @@ libc = "0.2.16" [target.'cfg(windows)'.dependencies] winapi = "0.2.1" -miow = "0.1.3" +miow = "0.1.4" kernel32-sys = "0.2" [dev-dependencies] diff --git a/src/lib.rs b/src/lib.rs index 213c0e075..79c954b46 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -146,6 +146,62 @@ pub mod unix { }; } +/// Windows-only extensions to the mio crate. +/// +/// Mio on windows is currently implemented with IOCP for a high-performance +/// implementation of asynchronous I/O. Mio then provides TCP and UDP as sample +/// bindings for the system to connect networking types to asynchronous I/O. On +/// Unix this scheme is then also extensible to all other file descriptors with +/// the `EventedFd` type, but on Windows no such analog is available. The +/// purpose of this module, however, is to similarly provide a mechanism for +/// foreign I/O types to get hooked up into the IOCP event loop. +/// +/// This module provides two types for interfacing with a custom IOCP handle: +/// +/// * `Binding` - this type is intended to govern binding with mio's `Poll` +/// type. Each I/O object should contain an instance of `Binding` that's +/// interfaced with for the implementation of the `Evented` trait. The +/// `register`, `reregister`, and `deregister` methods for the `Evented` trait +/// all have rough analogs with `Binding`. +/// +/// Note that this type **does not handle readiness**. That is, this type does +/// not handle whether sockets are readable/writable/etc. It's intended that +/// IOCP types will internally manage this state with a `SetReadiness` type +/// from the `poll` module. The `SetReadiness` is typically lazily created on +/// the first time that `Evented::register` is called and then stored in the +/// I/O object. +/// +/// Also note that for types which represent streams of bytes the mio +/// interface of *readiness* doesn't map directly to the Windows model of +/// *completion*. This means that types will have to perform internal +/// buffering to ensure that a readiness interface can be provided. For a +/// sample implementation see the TCP/UDP modules in mio itself. +/// +/// * `Overlapped` - this type is intended to be used as the concreate instances +/// of the `OVERLAPPED` type that most win32 methods expect. It's crucial, for +/// safety, that all asynchronous operations are initiated with an instance of +/// `Overlapped` and not another instantiation of `OVERLAPPED`. +/// +/// Mio's `Overlapped` type is created with a function pointer that receives +/// a `OVERLAPPED_ENTRY` type when called. This `OVERLAPPED_ENTRY` type is +/// defined in the `winapi` crate. Whenever a completion is posted to an IOCP +/// object the `OVERLAPPED` that was signaled will be interpreted as +/// `Overlapped` in the mio crate and this function pointer will be invoked. +/// Through this function pointer, and through the `OVERLAPPED` pointer, +/// implementations can handle management of I/O events. +/// +/// When put together these two types enable custom Windows handles to be +/// registered with mio's event loops. The `Binding` type is used to associate +/// handles and the `Overlapped` type is used to execute I/O operations. When +/// the I/O operations are completed a custom function pointer is called which +/// typically modifies a `SetReadiness` set by `Evented` methods which will get +/// later hooked into the mio event loop. +#[cfg(windows)] +pub mod windows { + + pub use sys::{Overlapped, Binding}; +} + // Conversion utilities mod convert { use std::time::Duration; diff --git a/src/sys/mod.rs b/src/sys/mod.rs index 9297f067d..96386619c 100644 --- a/src/sys/mod.rs +++ b/src/sys/mod.rs @@ -25,6 +25,8 @@ pub use self::windows::{ TcpStream, TcpListener, UdpSocket, + Overlapped, + Binding, }; #[cfg(windows)] diff --git a/src/sys/windows/mod.rs b/src/sys/windows/mod.rs index d73be4b72..6d493ef08 100644 --- a/src/sys/windows/mod.rs +++ b/src/sys/windows/mod.rs @@ -142,8 +142,6 @@ use std::os::windows::prelude::*; use kernel32; use winapi; -use self::selector::Overlapped; - mod awakener; #[macro_use] mod selector; @@ -153,7 +151,7 @@ mod from_raw_arc; mod buffer_pool; pub use self::awakener::Awakener; -pub use self::selector::{Events, Selector}; +pub use self::selector::{Events, Selector, Overlapped, Binding}; pub use self::tcp::{TcpStream, TcpListener}; pub use self::udp::UdpSocket; @@ -169,8 +167,7 @@ fn wouldblock() -> io::Error { unsafe fn cancel(socket: &AsRawSocket, overlapped: &Overlapped) -> io::Result<()> { let handle = socket.as_raw_socket() as winapi::HANDLE; - let overlapped = overlapped.get_mut().raw(); - let ret = kernel32::CancelIoEx(handle, overlapped); + let ret = kernel32::CancelIoEx(handle, overlapped.as_mut_ptr()); if ret == 0 { Err(io::Error::last_os_error()) } else { diff --git a/src/sys/windows/selector.rs b/src/sys/windows/selector.rs index 6a592fb46..bf7471142 100644 --- a/src/sys/windows/selector.rs +++ b/src/sys/windows/selector.rs @@ -1,10 +1,12 @@ -use std::{cmp, io, mem, u32}; +use std::{cmp, io, u32}; use std::cell::UnsafeCell; use std::os::windows::prelude::*; use std::sync::{Arc, Mutex}; use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT}; use std::time::Duration; +use lazycell::AtomicLazyCell; + use convert; use winapi::*; use miow; @@ -13,7 +15,6 @@ use miow::iocp::{CompletionPort, CompletionStatus}; use event::{Event, Ready}; use poll::{self, Poll}; use sys::windows::buffer_pool::BufferPool; -use sys::windows::from_raw_arc::FromRawArc; use {Token, PollOpt}; /// Each Selector has a globally unique(ish) ID associated with it. This ID @@ -93,11 +94,11 @@ impl Selector { } let callback = unsafe { - (*(status.overlapped() as *mut Overlapped)).callback() + (*(status.overlapped() as *mut Overlapped)).callback }; trace!("select; -> got overlapped"); - callback(status); + callback(status.entry()); } trace!("returning"); @@ -127,36 +128,169 @@ impl SelectorInner { } } -/// A registration is stored in each I/O object which keeps track of how it is -/// associated with a `Selector` above. +// A registration is stored in each I/O object which keeps track of how it is +// associated with a `Selector` above. +// +// Once associated with a `Selector`, a registration can never be un-associated +// (due to IOCP requirements). This is actually implemented through the +// `poll::Registration` and `poll::SetReadiness` APIs to keep track of all the +// level/edge/filtering business. +/// A `Binding` is embedded in all I/O objects associated with a `Poll` +/// object. +/// +/// Each registration keeps track of which selector the I/O object is +/// associated with, ensuring that implementations of `Evented` can be +/// conformant for the various methods on Windows. +/// +/// If you're working with custom IOCP-enabled objects then you'll want to +/// ensure that one of these instances is stored in your object and used in the +/// implementation of `Evented`. /// -/// Once associated with a `Selector`, a registration can never be un-associated -/// (due to IOCP requirements). This is actually implemented through the -/// `poll::Registration` and `poll::SetReadiness` APIs to keep track of all the -/// level/edge/filtering business. -pub struct Registration { - inner: Option, +/// For more information about how to use this see the `windows` module +/// documentation in this crate. +pub struct Binding { + selector: AtomicLazyCell>, } -struct RegistrationInner { - set_readiness: poll::SetReadiness, - selector: Arc, +impl Binding { + /// Creates a new blank binding ready to be inserted into an I/O + /// object. + /// + /// Won't actually do anything until associated with an `Poll` loop. + pub fn new() -> Binding { + Binding { selector: AtomicLazyCell::new() } + } + + /// Registers a new handle with the `Poll` specified, also assigning the + /// `token` specified. + /// + /// This function is intended to be used as part of `Evented::register` for + /// custom IOCP objects. It will add the specified handle to the internal + /// IOCP object with the provided `token`. All future events generated by + /// the handled provided will be recieved by the `Poll`'s internal IOCP + /// object. + /// + /// # Unsafety + /// + /// This function is unsafe as the `Poll` instance has assumptions about + /// what the `OVERLAPPED` pointer used for each I/O operation looks like. + /// Specifically they must all be instances of the `Overlapped` type in + /// this crate. More information about this can be found on the + /// `windows` module in this crate. + pub unsafe fn register_handle(&self, + handle: &AsRawHandle, + token: Token, + poll: &Poll) -> io::Result<()> { + let selector = poll::selector(poll); + + // Ignore errors, we'll see them on the next line. + drop(self.selector.fill(selector.inner.clone())); + try!(self.check_same_selector(poll)); + + selector.inner.port.add_handle(usize::from(token), handle) + } + + /// Same as `register_handle` but for sockets. + pub unsafe fn register_socket(&self, + handle: &AsRawSocket, + token: Token, + poll: &Poll) -> io::Result<()> { + let selector = poll::selector(poll); + drop(self.selector.fill(selector.inner.clone())); + try!(self.check_same_selector(poll)); + selector.inner.port.add_socket(usize::from(token), handle) + } + + /// Reregisters the handle provided from the `Poll` provided. + /// + /// This is intended to be used as part of `Evented::reregister` but note + /// that this function does not currently reregister the provided handle + /// with the `poll` specified. IOCP has a special binding for changing the + /// token which has not yet been implemented. Instead this function should + /// be used to assert that the call to `reregister` happened on the same + /// `Poll` that was passed into to `register`. + /// + /// Eventually, though, the provided `handle` will be re-assigned to have + /// the token `token` on the given `poll` assuming that it's been + /// previously registered with it. + /// + /// # Unsafety + /// + /// This function is unsafe for similar reasons to `register`. That is, + /// there may be pending I/O events and such which aren't handled correctly. + pub unsafe fn reregister_handle(&self, + _handle: &AsRawHandle, + _token: Token, + poll: &Poll) -> io::Result<()> { + self.check_same_selector(poll) + } + + /// Same as `reregister_handle`, but for sockets. + pub unsafe fn reregister_socket(&self, + _socket: &AsRawSocket, + _token: Token, + poll: &Poll) -> io::Result<()> { + self.check_same_selector(poll) + } + + /// Deregisters the handle provided from the `Poll` provided. + /// + /// This is intended to be used as part of `Evented::deregister` but note + /// that this function does not currently deregister the provided handle + /// from the `poll` specified. IOCP has a special binding for that which has + /// not yet been implemented. Instead this function should be used to assert + /// that the call to `deregister` happened on the same `Poll` that was + /// passed into to `register`. + /// + /// # Unsafety + /// + /// This function is unsafe for similar reasons to `register`. That is, + /// there may be pending I/O events and such which aren't handled correctly. + pub unsafe fn deregister_handle(&self, + _handle: &AsRawHandle, + poll: &Poll) -> io::Result<()> { + self.check_same_selector(poll) + } + + /// Same as `deregister_handle`, but for sockets. + pub unsafe fn deregister_socket(&self, + _socket: &AsRawSocket, + poll: &Poll) -> io::Result<()> { + self.check_same_selector(poll) + } + + fn check_same_selector(&self, poll: &Poll) -> io::Result<()> { + let selector = poll::selector(poll); + match self.selector.borrow() { + Some(prev) if prev.identical(&selector.inner) => Ok(()), + Some(_) | + None => Err(other("socket already registered")), + } + } } -impl Registration { - /// Creates a new blank registration ready to be inserted into an I/O object. +/// Helper struct used for TCP and UDP which bundles a `binding` with a +/// `SetReadiness` handle. +pub struct ReadyBinding { + binding: Binding, + readiness: Option, +} + +impl ReadyBinding { + /// Creates a new blank binding ready to be inserted into an I/O object. /// /// Won't actually do anything until associated with an `Selector` loop. - pub fn new() -> Registration { - Registration { - inner: None, + pub fn new() -> ReadyBinding { + ReadyBinding { + binding: Binding::new(), + readiness: None, } } - /// Returns whether this registration has been associated with a selector + /// Returns whether this binding has been associated with a selector /// yet. pub fn registered(&self) -> bool { - self.inner.is_some() + self.readiness.is_some() } /// Acquires a buffer with at least `size` capacity. @@ -165,19 +299,19 @@ impl Registration { /// that buffer pool. If not associated with a selector, this will allocate /// a fresh buffer. pub fn get_buffer(&self, size: usize) -> Vec { - match self.inner { - Some(ref i) => i.selector.buffers.lock().unwrap().get(size), + match self.binding.selector.borrow() { + Some(i) => i.buffers.lock().unwrap().get(size), None => Vec::with_capacity(size), } } - /// Returns a buffer to this registration. + /// Returns a buffer to this binding. /// /// If associated with a selector, this will push the buffer back into the /// selector's pool of buffers. Otherwise this will just drop the buffer. pub fn put_buffer(&self, buf: Vec) { - if let Some(ref i) = self.inner { - i.selector.buffers.lock().unwrap().put(buf); + if let Some(i) = self.binding.selector.borrow() { + i.buffers.lock().unwrap().put(buf); } } @@ -187,10 +321,9 @@ impl Registration { /// that this is all implemented through the `SetReadiness` structure in the /// `poll` module. pub fn set_readiness(&self, set: Ready) { - if let Some(ref i) = self.inner { + if let Some(ref i) = self.readiness { trace!("set readiness to {:?}", set); - let s = &i.set_readiness; - s.set_readiness(set).expect("event loop disappeared?"); + i.set_readiness(set).expect("event loop disappeared?"); } } @@ -198,8 +331,8 @@ impl Registration { /// /// This is what's being used to generate events returned by `poll`. pub fn readiness(&self) -> Ready { - match self.inner { - Some(ref i) => i.set_readiness.readiness(), + match self.readiness { + Some(ref i) => i.readiness(), None => Ready::none(), } } @@ -213,42 +346,41 @@ impl Registration { socket: &AsRawSocket, poll: &Poll, token: Token, - interest: Ready, + events: Ready, opts: PollOpt, registration: &Mutex>) -> io::Result<()> { - trace!("register {:?} {:?}", token, interest); - try!(self.associate(poll, token, interest, opts, registration)); - let selector = poll::selector(poll); - try!(selector.inner.port.add_socket(usize::from(token), socket)); + trace!("register {:?} {:?}", token, events); + unsafe { + try!(self.binding.register_socket(socket, token, poll)); + } + + // To keep the same semantics as epoll, if I/O objects are interested in + // being readable then they're also interested in listening for hup + let events = if events.is_readable() { + events | Ready::hup() + } else { + events + }; + let (r, s) = poll::Registration::new(poll, token, events, opts); + self.readiness = Some(s); + *registration.lock().unwrap() = Some(r); Ok(()) } /// Implementation of `Evented::reregister` function. pub fn reregister_socket(&mut self, - _socket: &AsRawSocket, + socket: &AsRawSocket, poll: &Poll, token: Token, - interest: Ready, + events: Ready, opts: PollOpt, registration: &Mutex>) -> io::Result<()> { - trace!("reregister {:?} {:?}", token, interest); - if self.inner.is_none() { - return Err(other("cannot reregister unregistered socket")) + trace!("reregister {:?} {:?}", token, events); + unsafe { + try!(self.binding.reregister_socket(socket, token, poll)); } - try!(self.associate(poll, token, interest, opts, registration)); - Ok(()) - } - - fn associate(&mut self, - poll: &Poll, - token: Token, - events: Ready, - opts: PollOpt, - registration: &Mutex>) - -> io::Result<()> { - let selector = poll::selector(poll); // To keep the same semantics as epoll, if I/O objects are interested in // being readable then they're also interested in listening for hup @@ -257,37 +389,9 @@ impl Registration { } else { events }; - - match self.inner { - // Ensure that we're only ever associated with at most one event - // loop. IOCP doesn't allow a handle to ever be associated with more - // than one event loop. - Some(ref i) if !i.selector.identical(&selector.inner) => { - return Err(other("socket already registered")); - } - - // If we're already registered, then just update the existing - // registration. - Some(_) => { - let registration = registration.lock().unwrap(); - let registration = registration.as_ref().unwrap(); - trace!("updating existing registration node"); - registration.update(poll, token, events, opts) - } - - // Create a new registration and we'll soon be added to the - // completion port for IOCP as well. - None => { - trace!("allocating new registration node"); - let (r, s) = poll::Registration::new(poll, token, events, opts); - self.inner = Some(RegistrationInner { - set_readiness: s, - selector: selector.inner.clone(), - }); - *registration.lock().unwrap() = Some(r); - Ok(()) - } - } + registration.lock().unwrap() + .as_mut().unwrap() + .update(poll, token, events, opts) } /// Implementation of the `Evented::deregister` function. @@ -295,23 +399,18 @@ impl Registration { /// Doesn't allow registration with another event loop, just shuts down /// readiness notifications and such. pub fn deregister(&mut self, + socket: &AsRawSocket, poll: &Poll, registration: &Mutex>) -> io::Result<()> { trace!("deregistering"); - let selector = poll::selector(poll); - match self.inner { - Some(ref mut i) => { - let registration = registration.lock().unwrap(); - let registration = registration.as_ref().unwrap(); - if !selector.inner.identical(&i.selector) { - return Err(other("socket already registered")); - } - try!(registration.deregister(poll)); - Ok(()) - } - None => Err(other("socket not registered")), + unsafe { + try!(self.binding.deregister_socket(socket, poll)); } + + registration.lock().unwrap() + .as_ref().unwrap() + .deregister(poll) } } @@ -361,10 +460,11 @@ impl Events { } macro_rules! overlapped2arc { - ($e:expr, $t:ty, $($field:ident).+) => ( - ::sys::windows::selector::Overlapped::cast_to_arc::<$t>($e, - offset_of!($t, $($field).+)) - ) + ($e:expr, $t:ty, $($field:ident).+) => ({ + let offset = offset_of!($t, $($field).+); + debug_assert!(offset < mem::size_of::<$t>()); + FromRawArc::from_raw(($e as usize - offset) as *mut $t) + }) } macro_rules! offset_of { @@ -373,42 +473,50 @@ macro_rules! offset_of { ) } -pub type Callback = fn(&CompletionStatus); - -/// See sys::windows module docs for why this exists. +// See sys::windows module docs for why this exists. +// +// The gist of it is that `Selector` assumes that all `OVERLAPPED` pointers are +// actually inside one of these structures so it can use the `Callback` stored +// right after it. +// +// We use repr(C) here to ensure that we can assume the overlapped pointer is +// at the start of the structure so we can just do a cast. +/// A wrapper around an internal instance over `miow::Overlapped` which is in +/// turn a wrapper around the Windows type `OVERLAPPED`. /// -/// The gist of it is that `Selector` assumes that all `OVERLAPPED` pointers are -/// actually inside one of these structures so it can use the `Callback` stored -/// right after it. -/// -/// We use repr(C) here to ensure that we can assume the overlapped pointer is -/// at the start of the structure so we can just do a cast. +/// This type is required to be used for all IOCP operations on handles that are +/// registered with an event loop. The event loop will receive notifications +/// over `OVERLAPPED` pointers that have completed, and it will cast that +/// pointer to a pointer to this structure and invoke the associated callback. #[repr(C)] pub struct Overlapped { inner: UnsafeCell, - callback: Callback, + callback: fn(&OVERLAPPED_ENTRY), } impl Overlapped { - pub fn new(cb: Callback) -> Overlapped { + /// Creates a new `Overlapped` which will invoke the provided `cb` callback + /// whenever it's triggered. + /// + /// The returned `Overlapped` must be used as the `OVERLAPPED` passed to all + /// I/O operations that are registered with mio's event loop. When the I/O + /// operation associated with an `OVERLAPPED` pointer completes the event + /// loop will invoke the function pointer provided by `cb`. + pub fn new(cb: fn(&OVERLAPPED_ENTRY)) -> Overlapped { Overlapped { inner: UnsafeCell::new(miow::Overlapped::zero()), callback: cb, } } - pub unsafe fn get_mut(&self) -> &mut miow::Overlapped { - &mut *self.inner.get() - } - - pub unsafe fn cast_to_arc(overlapped: *mut miow::Overlapped, - offset: usize) -> FromRawArc { - debug_assert!(offset < mem::size_of::()); - FromRawArc::from_raw((overlapped as usize - offset) as *mut T) - } - - pub unsafe fn callback(&self) -> &Callback { - &self.callback + /// Get the underlying `Overlapped` instance as a raw pointer. + /// + /// This can be useful when only a shared borrow is held and the overlapped + /// pointer needs to be passed down to winapi. + pub fn as_mut_ptr(&self) -> *mut OVERLAPPED { + unsafe { + (*self.inner.get()).raw() + } } } diff --git a/src/sys/windows/tcp.rs b/src/sys/windows/tcp.rs index ffdcafc3a..dea9af35f 100644 --- a/src/sys/windows/tcp.rs +++ b/src/sys/windows/tcp.rs @@ -4,7 +4,7 @@ use std::mem; use std::net::{self, SocketAddr}; use std::sync::{Mutex, MutexGuard}; -use io::would_block; +use miow; use miow::iocp::CompletionStatus; use miow::net::*; use net2::{TcpBuilder, TcpStreamExt as Net2TcpExt}; @@ -12,9 +12,10 @@ use net::tcp::Shutdown; use winapi::*; use {Evented, Ready, Poll, PollOpt, Token}; +use io::would_block; use poll; use sys::windows::from_raw_arc::FromRawArc; -use sys::windows::selector::{Overlapped, Registration}; +use sys::windows::selector::{Overlapped, ReadyBinding}; use sys::windows::{wouldblock, Family}; pub struct TcpStream { @@ -66,14 +67,14 @@ struct ListenerIo { } struct StreamInner { - iocp: Registration, + iocp: ReadyBinding, deferred_connect: Option, read: State<(), ()>, write: State<(Vec, usize), (Vec, usize)>, } struct ListenerInner { - iocp: Registration, + iocp: ReadyBinding, accept: State, accept_buf: AcceptAddrsBuf, } @@ -96,7 +97,7 @@ impl TcpStream { write: Overlapped::new(write_done), socket: socket, inner: Mutex::new(StreamInner { - iocp: Registration::new(), + iocp: ReadyBinding::new(), deferred_connect: deferred_connect, read: State::Empty, write: State::Empty, @@ -244,8 +245,8 @@ impl StreamImp { fn schedule_connect(&self, addr: &SocketAddr) -> io::Result<()> { unsafe { trace!("scheduling a connect"); - try!(self.inner.socket.connect_overlapped(addr, - self.inner.read.get_mut())); + let overlapped = miow::Overlapped::from_raw(self.inner.read.as_mut_ptr()); + try!(self.inner.socket.connect_overlapped(addr, overlapped)); } // see docs above on StreamImp.inner for rationale on forget mem::forget(self.clone()); @@ -273,8 +274,8 @@ impl StreamImp { trace!("scheduling a read"); let res = unsafe { - self.inner.socket.read_overlapped(&mut [], - self.inner.read.get_mut()) + let overlapped = miow::Overlapped::from_raw(self.inner.read.as_mut_ptr()); + self.inner.socket.read_overlapped(&mut [], overlapped) }; match res { // TODO: investigate better handling `Ok(true)` @@ -336,8 +337,8 @@ impl StreamImp { trace!("scheduling a write"); let err = unsafe { - self.inner.socket.write_overlapped(&buf[pos..], - self.inner.write.get_mut()) + let overlapped = miow::Overlapped::from_raw(self.inner.write.as_mut_ptr()); + self.inner.socket.write_overlapped(&buf[pos..], overlapped) }; match err { Ok(_) => { @@ -363,7 +364,8 @@ impl StreamImp { } } -fn read_done(status: &CompletionStatus) { +fn read_done(status: &OVERLAPPED_ENTRY) { + let status = CompletionStatus::from_entry(status); let me2 = StreamImp { inner: unsafe { overlapped2arc!(status.overlapped(), StreamIo, read) }, }; @@ -394,7 +396,8 @@ fn read_done(status: &CompletionStatus) { } } -fn write_done(status: &CompletionStatus) { +fn write_done(status: &OVERLAPPED_ENTRY) { + let status = CompletionStatus::from_entry(status); trace!("finished a write {}", status.bytes_transferred()); let me2 = StreamImp { inner: unsafe { overlapped2arc!(status.overlapped(), StreamIo, write) }, @@ -440,7 +443,8 @@ impl Evented for TcpStream { } fn deregister(&self, poll: &Poll) -> io::Result<()> { - self.inner().iocp.deregister(poll, &self.registration) + self.inner().iocp.deregister(&self.imp.inner.socket, + poll, &self.registration) } } @@ -489,7 +493,7 @@ impl TcpListener { family: family, socket: socket, inner: Mutex::new(ListenerInner { - iocp: Registration::new(), + iocp: ReadyBinding::new(), accept: State::Empty, accept_buf: AcceptAddrsBuf::new(), }), @@ -572,8 +576,9 @@ impl ListenerImp { Family::V6 => TcpBuilder::new_v6(), }.and_then(|builder| unsafe { trace!("scheduling an accept"); + let overlapped = miow::Overlapped::from_raw(self.inner.accept.as_mut_ptr()); self.inner.socket.accept_overlapped(&builder, &mut me.accept_buf, - self.inner.accept.get_mut()) + overlapped) }); match res { Ok((socket, _)) => { @@ -594,7 +599,8 @@ impl ListenerImp { } } -fn accept_done(status: &CompletionStatus) { +fn accept_done(status: &OVERLAPPED_ENTRY) { + let status = CompletionStatus::from_entry(status); let me2 = ListenerImp { inner: unsafe { overlapped2arc!(status.overlapped(), ListenerIo, accept) }, }; @@ -639,7 +645,8 @@ impl Evented for TcpListener { } fn deregister(&self, poll: &Poll) -> io::Result<()> { - self.inner().iocp.deregister(poll, &self.registration) + self.inner().iocp.deregister(&self.imp.inner.socket, + poll, &self.registration) } } diff --git a/src/sys/windows/udp.rs b/src/sys/windows/udp.rs index d90be9e26..6dd034617 100644 --- a/src/sys/windows/udp.rs +++ b/src/sys/windows/udp.rs @@ -13,6 +13,7 @@ use std::sync::{Mutex, MutexGuard}; #[allow(unused_imports)] use net2::{UdpBuilder, UdpSocketExt}; use winapi::*; +use miow; use miow::iocp::CompletionStatus; use miow::net::SocketAddrBuf; use miow::net::UdpSocketExt as MiowUdpSocketExt; @@ -20,7 +21,7 @@ use miow::net::UdpSocketExt as MiowUdpSocketExt; use {Evented, Ready, Poll, PollOpt, Token}; use poll; use sys::windows::from_raw_arc::FromRawArc; -use sys::windows::selector::{Overlapped, Registration}; +use sys::windows::selector::{Overlapped, ReadyBinding}; pub struct UdpSocket { imp: Imp, @@ -40,7 +41,7 @@ struct Io { } struct Inner { - iocp: Registration, + iocp: ReadyBinding, read: State, Vec>, write: State, (Vec, usize)>, read_buf: SocketAddrBuf, @@ -63,7 +64,7 @@ impl UdpSocket { write: Overlapped::new(send_done), socket: socket, inner: Mutex::new(Inner { - iocp: Registration::new(), + iocp: ReadyBinding::new(), read: State::Empty, write: State::Empty, read_buf: SocketAddrBuf::new(), @@ -108,8 +109,9 @@ impl UdpSocket { let amt = try!(owned_buf.write(buf)); try!(unsafe { trace!("scheduling a send"); + let overlapped = miow::Overlapped::from_raw(self.imp.inner.write.as_mut_ptr()); self.imp.inner.socket.send_to_overlapped(&owned_buf, target, - self.imp.inner.write.get_mut()) + overlapped) }); me.write = State::Pending(owned_buf); mem::forget(self.imp.clone()); @@ -252,8 +254,9 @@ impl Imp { trace!("scheduling a read"); let cap = buf.capacity(); buf.set_len(cap); + let overlapped = miow::Overlapped::from_raw(self.inner.read.as_mut_ptr()); self.inner.socket.recv_from_overlapped(&mut buf, &mut me.read_buf, - self.inner.read.get_mut()) + overlapped) }; match res { Ok(_) => { @@ -296,7 +299,8 @@ impl Evented for UdpSocket { } fn deregister(&self, poll: &Poll) -> io::Result<()> { - self.inner().iocp.deregister(poll, &self.registration) + self.inner().iocp.deregister(&self.imp.inner.socket, + poll, &self.registration) } } @@ -327,7 +331,8 @@ impl Drop for UdpSocket { } } -fn send_done(status: &CompletionStatus) { +fn send_done(status: &OVERLAPPED_ENTRY) { + let status = CompletionStatus::from_entry(status); trace!("finished a send {}", status.bytes_transferred()); let me2 = Imp { inner: unsafe { overlapped2arc!(status.overlapped(), Io, write) }, @@ -337,7 +342,8 @@ fn send_done(status: &CompletionStatus) { me2.add_readiness(&mut me, Ready::writable()); } -fn recv_done(status: &CompletionStatus) { +fn recv_done(status: &OVERLAPPED_ENTRY) { + let status = CompletionStatus::from_entry(status); trace!("finished a recv {}", status.bytes_transferred()); let me2 = Imp { inner: unsafe { overlapped2arc!(status.overlapped(), Io, read) },