diff --git a/Cargo.toml b/Cargo.toml index b5419fe9b..11a5095e0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,7 +29,7 @@ libc = "0.2.16" [target.'cfg(windows)'.dependencies] winapi = "0.2.1" -miow = "0.1.3" +miow = "0.1.4" kernel32-sys = "0.2" [dev-dependencies] diff --git a/src/lib.rs b/src/lib.rs index 213c0e075..79c954b46 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -146,6 +146,62 @@ pub mod unix { }; } +/// Windows-only extensions to the mio crate. +/// +/// Mio on windows is currently implemented with IOCP for a high-performance +/// implementation of asynchronous I/O. Mio then provides TCP and UDP as sample +/// bindings for the system to connect networking types to asynchronous I/O. On +/// Unix this scheme is then also extensible to all other file descriptors with +/// the `EventedFd` type, but on Windows no such analog is available. The +/// purpose of this module, however, is to similarly provide a mechanism for +/// foreign I/O types to get hooked up into the IOCP event loop. +/// +/// This module provides two types for interfacing with a custom IOCP handle: +/// +/// * `Binding` - this type is intended to govern binding with mio's `Poll` +/// type. Each I/O object should contain an instance of `Binding` that's +/// interfaced with for the implementation of the `Evented` trait. The +/// `register`, `reregister`, and `deregister` methods for the `Evented` trait +/// all have rough analogs with `Binding`. +/// +/// Note that this type **does not handle readiness**. That is, this type does +/// not handle whether sockets are readable/writable/etc. It's intended that +/// IOCP types will internally manage this state with a `SetReadiness` type +/// from the `poll` module. The `SetReadiness` is typically lazily created on +/// the first time that `Evented::register` is called and then stored in the +/// I/O object. +/// +/// Also note that for types which represent streams of bytes the mio +/// interface of *readiness* doesn't map directly to the Windows model of +/// *completion*. This means that types will have to perform internal +/// buffering to ensure that a readiness interface can be provided. For a +/// sample implementation see the TCP/UDP modules in mio itself. +/// +/// * `Overlapped` - this type is intended to be used as the concreate instances +/// of the `OVERLAPPED` type that most win32 methods expect. It's crucial, for +/// safety, that all asynchronous operations are initiated with an instance of +/// `Overlapped` and not another instantiation of `OVERLAPPED`. +/// +/// Mio's `Overlapped` type is created with a function pointer that receives +/// a `OVERLAPPED_ENTRY` type when called. This `OVERLAPPED_ENTRY` type is +/// defined in the `winapi` crate. Whenever a completion is posted to an IOCP +/// object the `OVERLAPPED` that was signaled will be interpreted as +/// `Overlapped` in the mio crate and this function pointer will be invoked. +/// Through this function pointer, and through the `OVERLAPPED` pointer, +/// implementations can handle management of I/O events. +/// +/// When put together these two types enable custom Windows handles to be +/// registered with mio's event loops. The `Binding` type is used to associate +/// handles and the `Overlapped` type is used to execute I/O operations. When +/// the I/O operations are completed a custom function pointer is called which +/// typically modifies a `SetReadiness` set by `Evented` methods which will get +/// later hooked into the mio event loop. +#[cfg(windows)] +pub mod windows { + + pub use sys::{Overlapped, Binding}; +} + // Conversion utilities mod convert { use std::time::Duration; diff --git a/src/sys/mod.rs b/src/sys/mod.rs index 9297f067d..96386619c 100644 --- a/src/sys/mod.rs +++ b/src/sys/mod.rs @@ -25,6 +25,8 @@ pub use self::windows::{ TcpStream, TcpListener, UdpSocket, + Overlapped, + Binding, }; #[cfg(windows)] diff --git a/src/sys/windows/mod.rs b/src/sys/windows/mod.rs index d73be4b72..6d493ef08 100644 --- a/src/sys/windows/mod.rs +++ b/src/sys/windows/mod.rs @@ -142,8 +142,6 @@ use std::os::windows::prelude::*; use kernel32; use winapi; -use self::selector::Overlapped; - mod awakener; #[macro_use] mod selector; @@ -153,7 +151,7 @@ mod from_raw_arc; mod buffer_pool; pub use self::awakener::Awakener; -pub use self::selector::{Events, Selector}; +pub use self::selector::{Events, Selector, Overlapped, Binding}; pub use self::tcp::{TcpStream, TcpListener}; pub use self::udp::UdpSocket; @@ -169,8 +167,7 @@ fn wouldblock() -> io::Error { unsafe fn cancel(socket: &AsRawSocket, overlapped: &Overlapped) -> io::Result<()> { let handle = socket.as_raw_socket() as winapi::HANDLE; - let overlapped = overlapped.get_mut().raw(); - let ret = kernel32::CancelIoEx(handle, overlapped); + let ret = kernel32::CancelIoEx(handle, overlapped.as_mut_ptr()); if ret == 0 { Err(io::Error::last_os_error()) } else { diff --git a/src/sys/windows/selector.rs b/src/sys/windows/selector.rs index 6a592fb46..bf7471142 100644 --- a/src/sys/windows/selector.rs +++ b/src/sys/windows/selector.rs @@ -1,10 +1,12 @@ -use std::{cmp, io, mem, u32}; +use std::{cmp, io, u32}; use std::cell::UnsafeCell; use std::os::windows::prelude::*; use std::sync::{Arc, Mutex}; use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT}; use std::time::Duration; +use lazycell::AtomicLazyCell; + use convert; use winapi::*; use miow; @@ -13,7 +15,6 @@ use miow::iocp::{CompletionPort, CompletionStatus}; use event::{Event, Ready}; use poll::{self, Poll}; use sys::windows::buffer_pool::BufferPool; -use sys::windows::from_raw_arc::FromRawArc; use {Token, PollOpt}; /// Each Selector has a globally unique(ish) ID associated with it. This ID @@ -93,11 +94,11 @@ impl Selector { } let callback = unsafe { - (*(status.overlapped() as *mut Overlapped)).callback() + (*(status.overlapped() as *mut Overlapped)).callback }; trace!("select; -> got overlapped"); - callback(status); + callback(status.entry()); } trace!("returning"); @@ -127,36 +128,169 @@ impl SelectorInner { } } -/// A registration is stored in each I/O object which keeps track of how it is -/// associated with a `Selector` above. +// A registration is stored in each I/O object which keeps track of how it is +// associated with a `Selector` above. +// +// Once associated with a `Selector`, a registration can never be un-associated +// (due to IOCP requirements). This is actually implemented through the +// `poll::Registration` and `poll::SetReadiness` APIs to keep track of all the +// level/edge/filtering business. +/// A `Binding` is embedded in all I/O objects associated with a `Poll` +/// object. +/// +/// Each registration keeps track of which selector the I/O object is +/// associated with, ensuring that implementations of `Evented` can be +/// conformant for the various methods on Windows. +/// +/// If you're working with custom IOCP-enabled objects then you'll want to +/// ensure that one of these instances is stored in your object and used in the +/// implementation of `Evented`. /// -/// Once associated with a `Selector`, a registration can never be un-associated -/// (due to IOCP requirements). This is actually implemented through the -/// `poll::Registration` and `poll::SetReadiness` APIs to keep track of all the -/// level/edge/filtering business. -pub struct Registration { - inner: Option, +/// For more information about how to use this see the `windows` module +/// documentation in this crate. +pub struct Binding { + selector: AtomicLazyCell>, } -struct RegistrationInner { - set_readiness: poll::SetReadiness, - selector: Arc, +impl Binding { + /// Creates a new blank binding ready to be inserted into an I/O + /// object. + /// + /// Won't actually do anything until associated with an `Poll` loop. + pub fn new() -> Binding { + Binding { selector: AtomicLazyCell::new() } + } + + /// Registers a new handle with the `Poll` specified, also assigning the + /// `token` specified. + /// + /// This function is intended to be used as part of `Evented::register` for + /// custom IOCP objects. It will add the specified handle to the internal + /// IOCP object with the provided `token`. All future events generated by + /// the handled provided will be recieved by the `Poll`'s internal IOCP + /// object. + /// + /// # Unsafety + /// + /// This function is unsafe as the `Poll` instance has assumptions about + /// what the `OVERLAPPED` pointer used for each I/O operation looks like. + /// Specifically they must all be instances of the `Overlapped` type in + /// this crate. More information about this can be found on the + /// `windows` module in this crate. + pub unsafe fn register_handle(&self, + handle: &AsRawHandle, + token: Token, + poll: &Poll) -> io::Result<()> { + let selector = poll::selector(poll); + + // Ignore errors, we'll see them on the next line. + drop(self.selector.fill(selector.inner.clone())); + try!(self.check_same_selector(poll)); + + selector.inner.port.add_handle(usize::from(token), handle) + } + + /// Same as `register_handle` but for sockets. + pub unsafe fn register_socket(&self, + handle: &AsRawSocket, + token: Token, + poll: &Poll) -> io::Result<()> { + let selector = poll::selector(poll); + drop(self.selector.fill(selector.inner.clone())); + try!(self.check_same_selector(poll)); + selector.inner.port.add_socket(usize::from(token), handle) + } + + /// Reregisters the handle provided from the `Poll` provided. + /// + /// This is intended to be used as part of `Evented::reregister` but note + /// that this function does not currently reregister the provided handle + /// with the `poll` specified. IOCP has a special binding for changing the + /// token which has not yet been implemented. Instead this function should + /// be used to assert that the call to `reregister` happened on the same + /// `Poll` that was passed into to `register`. + /// + /// Eventually, though, the provided `handle` will be re-assigned to have + /// the token `token` on the given `poll` assuming that it's been + /// previously registered with it. + /// + /// # Unsafety + /// + /// This function is unsafe for similar reasons to `register`. That is, + /// there may be pending I/O events and such which aren't handled correctly. + pub unsafe fn reregister_handle(&self, + _handle: &AsRawHandle, + _token: Token, + poll: &Poll) -> io::Result<()> { + self.check_same_selector(poll) + } + + /// Same as `reregister_handle`, but for sockets. + pub unsafe fn reregister_socket(&self, + _socket: &AsRawSocket, + _token: Token, + poll: &Poll) -> io::Result<()> { + self.check_same_selector(poll) + } + + /// Deregisters the handle provided from the `Poll` provided. + /// + /// This is intended to be used as part of `Evented::deregister` but note + /// that this function does not currently deregister the provided handle + /// from the `poll` specified. IOCP has a special binding for that which has + /// not yet been implemented. Instead this function should be used to assert + /// that the call to `deregister` happened on the same `Poll` that was + /// passed into to `register`. + /// + /// # Unsafety + /// + /// This function is unsafe for similar reasons to `register`. That is, + /// there may be pending I/O events and such which aren't handled correctly. + pub unsafe fn deregister_handle(&self, + _handle: &AsRawHandle, + poll: &Poll) -> io::Result<()> { + self.check_same_selector(poll) + } + + /// Same as `deregister_handle`, but for sockets. + pub unsafe fn deregister_socket(&self, + _socket: &AsRawSocket, + poll: &Poll) -> io::Result<()> { + self.check_same_selector(poll) + } + + fn check_same_selector(&self, poll: &Poll) -> io::Result<()> { + let selector = poll::selector(poll); + match self.selector.borrow() { + Some(prev) if prev.identical(&selector.inner) => Ok(()), + Some(_) | + None => Err(other("socket already registered")), + } + } } -impl Registration { - /// Creates a new blank registration ready to be inserted into an I/O object. +/// Helper struct used for TCP and UDP which bundles a `binding` with a +/// `SetReadiness` handle. +pub struct ReadyBinding { + binding: Binding, + readiness: Option, +} + +impl ReadyBinding { + /// Creates a new blank binding ready to be inserted into an I/O object. /// /// Won't actually do anything until associated with an `Selector` loop. - pub fn new() -> Registration { - Registration { - inner: None, + pub fn new() -> ReadyBinding { + ReadyBinding { + binding: Binding::new(), + readiness: None, } } - /// Returns whether this registration has been associated with a selector + /// Returns whether this binding has been associated with a selector /// yet. pub fn registered(&self) -> bool { - self.inner.is_some() + self.readiness.is_some() } /// Acquires a buffer with at least `size` capacity. @@ -165,19 +299,19 @@ impl Registration { /// that buffer pool. If not associated with a selector, this will allocate /// a fresh buffer. pub fn get_buffer(&self, size: usize) -> Vec { - match self.inner { - Some(ref i) => i.selector.buffers.lock().unwrap().get(size), + match self.binding.selector.borrow() { + Some(i) => i.buffers.lock().unwrap().get(size), None => Vec::with_capacity(size), } } - /// Returns a buffer to this registration. + /// Returns a buffer to this binding. /// /// If associated with a selector, this will push the buffer back into the /// selector's pool of buffers. Otherwise this will just drop the buffer. pub fn put_buffer(&self, buf: Vec) { - if let Some(ref i) = self.inner { - i.selector.buffers.lock().unwrap().put(buf); + if let Some(i) = self.binding.selector.borrow() { + i.buffers.lock().unwrap().put(buf); } } @@ -187,10 +321,9 @@ impl Registration { /// that this is all implemented through the `SetReadiness` structure in the /// `poll` module. pub fn set_readiness(&self, set: Ready) { - if let Some(ref i) = self.inner { + if let Some(ref i) = self.readiness { trace!("set readiness to {:?}", set); - let s = &i.set_readiness; - s.set_readiness(set).expect("event loop disappeared?"); + i.set_readiness(set).expect("event loop disappeared?"); } } @@ -198,8 +331,8 @@ impl Registration { /// /// This is what's being used to generate events returned by `poll`. pub fn readiness(&self) -> Ready { - match self.inner { - Some(ref i) => i.set_readiness.readiness(), + match self.readiness { + Some(ref i) => i.readiness(), None => Ready::none(), } } @@ -213,42 +346,41 @@ impl Registration { socket: &AsRawSocket, poll: &Poll, token: Token, - interest: Ready, + events: Ready, opts: PollOpt, registration: &Mutex>) -> io::Result<()> { - trace!("register {:?} {:?}", token, interest); - try!(self.associate(poll, token, interest, opts, registration)); - let selector = poll::selector(poll); - try!(selector.inner.port.add_socket(usize::from(token), socket)); + trace!("register {:?} {:?}", token, events); + unsafe { + try!(self.binding.register_socket(socket, token, poll)); + } + + // To keep the same semantics as epoll, if I/O objects are interested in + // being readable then they're also interested in listening for hup + let events = if events.is_readable() { + events | Ready::hup() + } else { + events + }; + let (r, s) = poll::Registration::new(poll, token, events, opts); + self.readiness = Some(s); + *registration.lock().unwrap() = Some(r); Ok(()) } /// Implementation of `Evented::reregister` function. pub fn reregister_socket(&mut self, - _socket: &AsRawSocket, + socket: &AsRawSocket, poll: &Poll, token: Token, - interest: Ready, + events: Ready, opts: PollOpt, registration: &Mutex>) -> io::Result<()> { - trace!("reregister {:?} {:?}", token, interest); - if self.inner.is_none() { - return Err(other("cannot reregister unregistered socket")) + trace!("reregister {:?} {:?}", token, events); + unsafe { + try!(self.binding.reregister_socket(socket, token, poll)); } - try!(self.associate(poll, token, interest, opts, registration)); - Ok(()) - } - - fn associate(&mut self, - poll: &Poll, - token: Token, - events: Ready, - opts: PollOpt, - registration: &Mutex>) - -> io::Result<()> { - let selector = poll::selector(poll); // To keep the same semantics as epoll, if I/O objects are interested in // being readable then they're also interested in listening for hup @@ -257,37 +389,9 @@ impl Registration { } else { events }; - - match self.inner { - // Ensure that we're only ever associated with at most one event - // loop. IOCP doesn't allow a handle to ever be associated with more - // than one event loop. - Some(ref i) if !i.selector.identical(&selector.inner) => { - return Err(other("socket already registered")); - } - - // If we're already registered, then just update the existing - // registration. - Some(_) => { - let registration = registration.lock().unwrap(); - let registration = registration.as_ref().unwrap(); - trace!("updating existing registration node"); - registration.update(poll, token, events, opts) - } - - // Create a new registration and we'll soon be added to the - // completion port for IOCP as well. - None => { - trace!("allocating new registration node"); - let (r, s) = poll::Registration::new(poll, token, events, opts); - self.inner = Some(RegistrationInner { - set_readiness: s, - selector: selector.inner.clone(), - }); - *registration.lock().unwrap() = Some(r); - Ok(()) - } - } + registration.lock().unwrap() + .as_mut().unwrap() + .update(poll, token, events, opts) } /// Implementation of the `Evented::deregister` function. @@ -295,23 +399,18 @@ impl Registration { /// Doesn't allow registration with another event loop, just shuts down /// readiness notifications and such. pub fn deregister(&mut self, + socket: &AsRawSocket, poll: &Poll, registration: &Mutex>) -> io::Result<()> { trace!("deregistering"); - let selector = poll::selector(poll); - match self.inner { - Some(ref mut i) => { - let registration = registration.lock().unwrap(); - let registration = registration.as_ref().unwrap(); - if !selector.inner.identical(&i.selector) { - return Err(other("socket already registered")); - } - try!(registration.deregister(poll)); - Ok(()) - } - None => Err(other("socket not registered")), + unsafe { + try!(self.binding.deregister_socket(socket, poll)); } + + registration.lock().unwrap() + .as_ref().unwrap() + .deregister(poll) } } @@ -361,10 +460,11 @@ impl Events { } macro_rules! overlapped2arc { - ($e:expr, $t:ty, $($field:ident).+) => ( - ::sys::windows::selector::Overlapped::cast_to_arc::<$t>($e, - offset_of!($t, $($field).+)) - ) + ($e:expr, $t:ty, $($field:ident).+) => ({ + let offset = offset_of!($t, $($field).+); + debug_assert!(offset < mem::size_of::<$t>()); + FromRawArc::from_raw(($e as usize - offset) as *mut $t) + }) } macro_rules! offset_of { @@ -373,42 +473,50 @@ macro_rules! offset_of { ) } -pub type Callback = fn(&CompletionStatus); - -/// See sys::windows module docs for why this exists. +// See sys::windows module docs for why this exists. +// +// The gist of it is that `Selector` assumes that all `OVERLAPPED` pointers are +// actually inside one of these structures so it can use the `Callback` stored +// right after it. +// +// We use repr(C) here to ensure that we can assume the overlapped pointer is +// at the start of the structure so we can just do a cast. +/// A wrapper around an internal instance over `miow::Overlapped` which is in +/// turn a wrapper around the Windows type `OVERLAPPED`. /// -/// The gist of it is that `Selector` assumes that all `OVERLAPPED` pointers are -/// actually inside one of these structures so it can use the `Callback` stored -/// right after it. -/// -/// We use repr(C) here to ensure that we can assume the overlapped pointer is -/// at the start of the structure so we can just do a cast. +/// This type is required to be used for all IOCP operations on handles that are +/// registered with an event loop. The event loop will receive notifications +/// over `OVERLAPPED` pointers that have completed, and it will cast that +/// pointer to a pointer to this structure and invoke the associated callback. #[repr(C)] pub struct Overlapped { inner: UnsafeCell, - callback: Callback, + callback: fn(&OVERLAPPED_ENTRY), } impl Overlapped { - pub fn new(cb: Callback) -> Overlapped { + /// Creates a new `Overlapped` which will invoke the provided `cb` callback + /// whenever it's triggered. + /// + /// The returned `Overlapped` must be used as the `OVERLAPPED` passed to all + /// I/O operations that are registered with mio's event loop. When the I/O + /// operation associated with an `OVERLAPPED` pointer completes the event + /// loop will invoke the function pointer provided by `cb`. + pub fn new(cb: fn(&OVERLAPPED_ENTRY)) -> Overlapped { Overlapped { inner: UnsafeCell::new(miow::Overlapped::zero()), callback: cb, } } - pub unsafe fn get_mut(&self) -> &mut miow::Overlapped { - &mut *self.inner.get() - } - - pub unsafe fn cast_to_arc(overlapped: *mut miow::Overlapped, - offset: usize) -> FromRawArc { - debug_assert!(offset < mem::size_of::()); - FromRawArc::from_raw((overlapped as usize - offset) as *mut T) - } - - pub unsafe fn callback(&self) -> &Callback { - &self.callback + /// Get the underlying `Overlapped` instance as a raw pointer. + /// + /// This can be useful when only a shared borrow is held and the overlapped + /// pointer needs to be passed down to winapi. + pub fn as_mut_ptr(&self) -> *mut OVERLAPPED { + unsafe { + (*self.inner.get()).raw() + } } } diff --git a/src/sys/windows/tcp.rs b/src/sys/windows/tcp.rs index ffdcafc3a..dea9af35f 100644 --- a/src/sys/windows/tcp.rs +++ b/src/sys/windows/tcp.rs @@ -4,7 +4,7 @@ use std::mem; use std::net::{self, SocketAddr}; use std::sync::{Mutex, MutexGuard}; -use io::would_block; +use miow; use miow::iocp::CompletionStatus; use miow::net::*; use net2::{TcpBuilder, TcpStreamExt as Net2TcpExt}; @@ -12,9 +12,10 @@ use net::tcp::Shutdown; use winapi::*; use {Evented, Ready, Poll, PollOpt, Token}; +use io::would_block; use poll; use sys::windows::from_raw_arc::FromRawArc; -use sys::windows::selector::{Overlapped, Registration}; +use sys::windows::selector::{Overlapped, ReadyBinding}; use sys::windows::{wouldblock, Family}; pub struct TcpStream { @@ -66,14 +67,14 @@ struct ListenerIo { } struct StreamInner { - iocp: Registration, + iocp: ReadyBinding, deferred_connect: Option, read: State<(), ()>, write: State<(Vec, usize), (Vec, usize)>, } struct ListenerInner { - iocp: Registration, + iocp: ReadyBinding, accept: State, accept_buf: AcceptAddrsBuf, } @@ -96,7 +97,7 @@ impl TcpStream { write: Overlapped::new(write_done), socket: socket, inner: Mutex::new(StreamInner { - iocp: Registration::new(), + iocp: ReadyBinding::new(), deferred_connect: deferred_connect, read: State::Empty, write: State::Empty, @@ -244,8 +245,8 @@ impl StreamImp { fn schedule_connect(&self, addr: &SocketAddr) -> io::Result<()> { unsafe { trace!("scheduling a connect"); - try!(self.inner.socket.connect_overlapped(addr, - self.inner.read.get_mut())); + let overlapped = miow::Overlapped::from_raw(self.inner.read.as_mut_ptr()); + try!(self.inner.socket.connect_overlapped(addr, overlapped)); } // see docs above on StreamImp.inner for rationale on forget mem::forget(self.clone()); @@ -273,8 +274,8 @@ impl StreamImp { trace!("scheduling a read"); let res = unsafe { - self.inner.socket.read_overlapped(&mut [], - self.inner.read.get_mut()) + let overlapped = miow::Overlapped::from_raw(self.inner.read.as_mut_ptr()); + self.inner.socket.read_overlapped(&mut [], overlapped) }; match res { // TODO: investigate better handling `Ok(true)` @@ -336,8 +337,8 @@ impl StreamImp { trace!("scheduling a write"); let err = unsafe { - self.inner.socket.write_overlapped(&buf[pos..], - self.inner.write.get_mut()) + let overlapped = miow::Overlapped::from_raw(self.inner.write.as_mut_ptr()); + self.inner.socket.write_overlapped(&buf[pos..], overlapped) }; match err { Ok(_) => { @@ -363,7 +364,8 @@ impl StreamImp { } } -fn read_done(status: &CompletionStatus) { +fn read_done(status: &OVERLAPPED_ENTRY) { + let status = CompletionStatus::from_entry(status); let me2 = StreamImp { inner: unsafe { overlapped2arc!(status.overlapped(), StreamIo, read) }, }; @@ -394,7 +396,8 @@ fn read_done(status: &CompletionStatus) { } } -fn write_done(status: &CompletionStatus) { +fn write_done(status: &OVERLAPPED_ENTRY) { + let status = CompletionStatus::from_entry(status); trace!("finished a write {}", status.bytes_transferred()); let me2 = StreamImp { inner: unsafe { overlapped2arc!(status.overlapped(), StreamIo, write) }, @@ -440,7 +443,8 @@ impl Evented for TcpStream { } fn deregister(&self, poll: &Poll) -> io::Result<()> { - self.inner().iocp.deregister(poll, &self.registration) + self.inner().iocp.deregister(&self.imp.inner.socket, + poll, &self.registration) } } @@ -489,7 +493,7 @@ impl TcpListener { family: family, socket: socket, inner: Mutex::new(ListenerInner { - iocp: Registration::new(), + iocp: ReadyBinding::new(), accept: State::Empty, accept_buf: AcceptAddrsBuf::new(), }), @@ -572,8 +576,9 @@ impl ListenerImp { Family::V6 => TcpBuilder::new_v6(), }.and_then(|builder| unsafe { trace!("scheduling an accept"); + let overlapped = miow::Overlapped::from_raw(self.inner.accept.as_mut_ptr()); self.inner.socket.accept_overlapped(&builder, &mut me.accept_buf, - self.inner.accept.get_mut()) + overlapped) }); match res { Ok((socket, _)) => { @@ -594,7 +599,8 @@ impl ListenerImp { } } -fn accept_done(status: &CompletionStatus) { +fn accept_done(status: &OVERLAPPED_ENTRY) { + let status = CompletionStatus::from_entry(status); let me2 = ListenerImp { inner: unsafe { overlapped2arc!(status.overlapped(), ListenerIo, accept) }, }; @@ -639,7 +645,8 @@ impl Evented for TcpListener { } fn deregister(&self, poll: &Poll) -> io::Result<()> { - self.inner().iocp.deregister(poll, &self.registration) + self.inner().iocp.deregister(&self.imp.inner.socket, + poll, &self.registration) } } diff --git a/src/sys/windows/udp.rs b/src/sys/windows/udp.rs index d90be9e26..6dd034617 100644 --- a/src/sys/windows/udp.rs +++ b/src/sys/windows/udp.rs @@ -13,6 +13,7 @@ use std::sync::{Mutex, MutexGuard}; #[allow(unused_imports)] use net2::{UdpBuilder, UdpSocketExt}; use winapi::*; +use miow; use miow::iocp::CompletionStatus; use miow::net::SocketAddrBuf; use miow::net::UdpSocketExt as MiowUdpSocketExt; @@ -20,7 +21,7 @@ use miow::net::UdpSocketExt as MiowUdpSocketExt; use {Evented, Ready, Poll, PollOpt, Token}; use poll; use sys::windows::from_raw_arc::FromRawArc; -use sys::windows::selector::{Overlapped, Registration}; +use sys::windows::selector::{Overlapped, ReadyBinding}; pub struct UdpSocket { imp: Imp, @@ -40,7 +41,7 @@ struct Io { } struct Inner { - iocp: Registration, + iocp: ReadyBinding, read: State, Vec>, write: State, (Vec, usize)>, read_buf: SocketAddrBuf, @@ -63,7 +64,7 @@ impl UdpSocket { write: Overlapped::new(send_done), socket: socket, inner: Mutex::new(Inner { - iocp: Registration::new(), + iocp: ReadyBinding::new(), read: State::Empty, write: State::Empty, read_buf: SocketAddrBuf::new(), @@ -108,8 +109,9 @@ impl UdpSocket { let amt = try!(owned_buf.write(buf)); try!(unsafe { trace!("scheduling a send"); + let overlapped = miow::Overlapped::from_raw(self.imp.inner.write.as_mut_ptr()); self.imp.inner.socket.send_to_overlapped(&owned_buf, target, - self.imp.inner.write.get_mut()) + overlapped) }); me.write = State::Pending(owned_buf); mem::forget(self.imp.clone()); @@ -252,8 +254,9 @@ impl Imp { trace!("scheduling a read"); let cap = buf.capacity(); buf.set_len(cap); + let overlapped = miow::Overlapped::from_raw(self.inner.read.as_mut_ptr()); self.inner.socket.recv_from_overlapped(&mut buf, &mut me.read_buf, - self.inner.read.get_mut()) + overlapped) }; match res { Ok(_) => { @@ -296,7 +299,8 @@ impl Evented for UdpSocket { } fn deregister(&self, poll: &Poll) -> io::Result<()> { - self.inner().iocp.deregister(poll, &self.registration) + self.inner().iocp.deregister(&self.imp.inner.socket, + poll, &self.registration) } } @@ -327,7 +331,8 @@ impl Drop for UdpSocket { } } -fn send_done(status: &CompletionStatus) { +fn send_done(status: &OVERLAPPED_ENTRY) { + let status = CompletionStatus::from_entry(status); trace!("finished a send {}", status.bytes_transferred()); let me2 = Imp { inner: unsafe { overlapped2arc!(status.overlapped(), Io, write) }, @@ -337,7 +342,8 @@ fn send_done(status: &CompletionStatus) { me2.add_readiness(&mut me, Ready::writable()); } -fn recv_done(status: &CompletionStatus) { +fn recv_done(status: &OVERLAPPED_ENTRY) { + let status = CompletionStatus::from_entry(status); trace!("finished a recv {}", status.bytes_transferred()); let me2 = Imp { inner: unsafe { overlapped2arc!(status.overlapped(), Io, read) },