From 719e02338595ac4e450d518ad936d6531f632a91 Mon Sep 17 00:00:00 2001 From: Wim Looman Date: Tue, 6 Aug 2019 20:11:26 +0200 Subject: [PATCH] Remove dynamic dispatch from main Runtime trait --- runtime-native/src/lib.rs | 1 + runtime-native/src/not_wasm32.rs | 78 +++++++-------- runtime-native/src/not_wasm32/tcp.rs | 71 ++++++-------- runtime-native/src/not_wasm32/time.rs | 27 ++---- runtime-native/src/not_wasm32/udp.rs | 48 +++++----- runtime-native/src/wasm32.rs | 63 +++++++----- runtime-native/src/wasm32/tcp.rs | 88 +++++++++++++++++ runtime-native/src/wasm32/time.rs | 29 ++++++ runtime-native/src/wasm32/udp.rs | 105 ++++++++++++++++++++ runtime-raw/src/into_dyn.rs | 88 +++++++++++++++++ runtime-raw/src/lib.rs | 53 +++++++--- runtime-raw/src/tcp.rs | 73 +++++++++++++- runtime-raw/src/time.rs | 22 +++++ runtime-raw/src/udp.rs | 92 ++++++++++++++++++ runtime-tokio/src/lib.rs | 133 +++++++++++++------------- runtime-tokio/src/tcp.rs | 52 +++++----- runtime-tokio/src/time.rs | 32 +++---- runtime-tokio/src/udp.rs | 45 ++++----- src/net/tcp.rs | 8 +- src/net/udp.rs | 2 +- src/time/delay.rs | 2 +- src/time/interval.rs | 2 +- 22 files changed, 802 insertions(+), 312 deletions(-) create mode 100644 runtime-native/src/wasm32/tcp.rs create mode 100644 runtime-native/src/wasm32/time.rs create mode 100644 runtime-native/src/wasm32/udp.rs create mode 100644 runtime-raw/src/into_dyn.rs diff --git a/runtime-native/src/lib.rs b/runtime-native/src/lib.rs index 8e163d9c..00d7e46f 100644 --- a/runtime-native/src/lib.rs +++ b/runtime-native/src/lib.rs @@ -1,6 +1,7 @@ //! A cross-platform asynchronous [Runtime](https://github.com/rustasync/runtime). See the [Runtime //! documentation](https://docs.rs/runtime) for more details. +#![feature(type_alias_impl_trait)] #![deny(unsafe_code)] #![warn( missing_debug_implementations, diff --git a/runtime-native/src/not_wasm32.rs b/runtime-native/src/not_wasm32.rs index 8fe41c17..6dc7a393 100644 --- a/runtime-native/src/not_wasm32.rs +++ b/runtime-native/src/not_wasm32.rs @@ -12,14 +12,10 @@ mod tcp; mod time; mod udp; -use tcp::{TcpListener, TcpStream}; -use time::{Delay, Interval}; -use udp::UdpSocket; - lazy_static! { static ref JULIEX_THREADPOOL: juliex::ThreadPool = { juliex::ThreadPool::with_setup(|| { - runtime_raw::set_runtime(&Native); + runtime_raw::set_runtime(Native); }) }; } @@ -28,53 +24,59 @@ lazy_static! { #[derive(Debug)] pub struct Native; +#[derive(Debug)] +struct Compat(T); + +impl Compat { + fn new(inner: T) -> Self { + Self(inner) + } + + fn get_ref(&self) -> &T { + &self.0 + } + + #[allow(unsafe_code)] + fn get_pin_mut<'a>(self: Pin<&'a mut Self>) -> Pin<&'a mut T> { + unsafe { Pin::new_unchecked(&mut Pin::get_unchecked_mut(self).0) } + } +} + impl runtime_raw::Runtime for Native { + type TcpStream = impl runtime_raw::TcpStream; + type TcpListener = impl runtime_raw::TcpListener; + type UdpSocket = impl runtime_raw::UdpSocket; + type Delay = impl runtime_raw::Delay; + type Interval = impl runtime_raw::Interval; + + type ConnectTcpStream = impl Future> + Send; + fn spawn_boxed(&self, fut: BoxFuture<'static, ()>) -> Result<(), SpawnError> { - JULIEX_THREADPOOL.spawn_boxed(fut.into()); + JULIEX_THREADPOOL.spawn_boxed(fut); Ok(()) } - fn connect_tcp_stream( - &self, - addr: &SocketAddr, - ) -> BoxFuture<'static, io::Result>>> { - let romio_connect = romio::TcpStream::connect(addr); - let connect = romio_connect.map(|res| { - res.map(|romio_stream| { - Box::pin(TcpStream { romio_stream }) as Pin> - }) - }); - connect.boxed() + fn connect_tcp_stream(&self, addr: &SocketAddr) -> Self::ConnectTcpStream { + romio::TcpStream::connect(addr).map_ok(Compat::new) } - fn bind_tcp_listener( - &self, - addr: &SocketAddr, - ) -> io::Result>> { - let romio_listener = romio::TcpListener::bind(&addr)?; - Ok(Box::pin(TcpListener { romio_listener })) + fn bind_tcp_listener(&self, addr: &SocketAddr) -> io::Result { + romio::TcpListener::bind(&addr).map(Compat::new) } - fn bind_udp_socket( - &self, - addr: &SocketAddr, - ) -> io::Result>> { - let romio_socket = romio::UdpSocket::bind(&addr)?; - Ok(Box::pin(UdpSocket { romio_socket })) + fn bind_udp_socket(&self, addr: &SocketAddr) -> io::Result { + romio::UdpSocket::bind(&addr).map(Compat::new) } - fn new_delay(&self, dur: Duration) -> Pin> { - let async_delay = AsyncDelay::new(dur); - Box::pin(Delay { async_delay }) + fn new_delay(&self, dur: Duration) -> Self::Delay { + Compat::new(AsyncDelay::new(dur)) } - fn new_delay_at(&self, at: Instant) -> Pin> { - let async_delay = AsyncDelay::new_at(at); - Box::pin(Delay { async_delay }) + fn new_delay_at(&self, at: Instant) -> Self::Delay { + Compat::new(AsyncDelay::new_at(at)) } - fn new_interval(&self, dur: Duration) -> Pin> { - let async_interval = AsyncInterval::new(dur); - Box::pin(Interval { async_interval }) + fn new_interval(&self, dur: Duration) -> Self::Interval { + Compat::new(AsyncInterval::new(dur)) } } diff --git a/runtime-native/src/not_wasm32/tcp.rs b/runtime-native/src/not_wasm32/tcp.rs index 29dc6f51..7dc16dc6 100644 --- a/runtime-native/src/not_wasm32/tcp.rs +++ b/runtime-native/src/not_wasm32/tcp.rs @@ -1,32 +1,19 @@ use futures::prelude::*; use romio::raw::{AsyncReadReady, AsyncReady, AsyncWriteReady}; +use super::Compat; use std::io; use std::net::SocketAddr; use std::pin::Pin; use std::task::{Context, Poll}; -#[derive(Debug)] -pub(crate) struct TcpStream { - pub romio_stream: romio::tcp::TcpStream, -} - -#[derive(Debug)] -pub(crate) struct TcpListener { - pub romio_listener: romio::tcp::TcpListener, -} - -impl runtime_raw::TcpStream for TcpStream { - fn poll_write_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.romio_stream) - .poll_write_ready(cx) - .map_ok(|_| ()) +impl runtime_raw::TcpStream for Compat { + fn poll_write_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.get_pin_mut().poll_write_ready(cx).map_ok(|_| ()) } - fn poll_read_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.romio_stream) - .poll_read_ready(cx) - .map_ok(|_| ()) + fn poll_read_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.get_pin_mut().poll_read_ready(cx).map_ok(|_| ()) } fn take_error(&self) -> io::Result> { @@ -34,72 +21,72 @@ impl runtime_raw::TcpStream for TcpStream { } fn local_addr(&self) -> io::Result { - self.romio_stream.local_addr() + self.get_ref().local_addr() } fn peer_addr(&self) -> io::Result { - self.romio_stream.peer_addr() + self.get_ref().peer_addr() } fn shutdown(&self, how: std::net::Shutdown) -> std::io::Result<()> { - self.romio_stream.shutdown(how) + self.get_ref().shutdown(how) } #[cfg(unix)] fn as_raw_fd(&self) -> std::os::unix::io::RawFd { use std::os::unix::io::AsRawFd; - self.romio_stream.as_raw_fd() + self.get_ref().as_raw_fd() } } -impl AsyncRead for TcpStream { +impl AsyncRead for Compat { fn poll_read( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut Context<'_>, mut buf: &mut [u8], ) -> Poll> { - Pin::new(&mut self.romio_stream).poll_read(cx, &mut buf) + self.get_pin_mut().poll_read(cx, &mut buf) } } -impl AsyncWrite for TcpStream { +impl AsyncWrite for Compat { fn poll_write( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll> { - Pin::new(&mut self.romio_stream).poll_write(cx, &buf) + self.get_pin_mut().poll_write(cx, &buf) } - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.romio_stream).poll_flush(cx) + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.get_pin_mut().poll_flush(cx) } - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.romio_stream).poll_close(cx) + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.get_pin_mut().poll_close(cx) } } -impl runtime_raw::TcpListener for TcpListener { +impl runtime_raw::TcpListener for Compat { + type TcpStream = Compat; + fn local_addr(&self) -> io::Result { - self.romio_listener.local_addr() + self.get_ref().local_addr() } fn poll_accept( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll>>> { - Pin::new(&mut self.romio_listener) + ) -> Poll> { + self.get_pin_mut() .poll_ready(cx) - .map_ok(|(romio_stream, _)| { - Box::pin(TcpStream { romio_stream }) as Pin> - }) + .map_ok(|(romio_stream, _)| Compat::new(romio_stream)) } /// Extracts the raw file descriptor. #[cfg(unix)] fn as_raw_fd(&self) -> std::os::unix::io::RawFd { use std::os::unix::io::AsRawFd; - self.romio_listener.as_raw_fd() + self.get_ref().as_raw_fd() } } diff --git a/runtime-native/src/not_wasm32/time.rs b/runtime-native/src/not_wasm32/time.rs index c0daec0f..281ab2e8 100644 --- a/runtime-native/src/not_wasm32/time.rs +++ b/runtime-native/src/not_wasm32/time.rs @@ -3,38 +3,31 @@ use std::task::{Context, Poll}; use std::time::Instant; use futures::prelude::*; +use futures::ready; use futures_timer::{Delay as AsyncDelay, Interval as AsyncInterval}; -#[derive(Debug)] -pub(crate) struct Delay { - pub(crate) async_delay: AsyncDelay, -} +use super::Compat; -impl runtime_raw::Delay for Delay {} +impl runtime_raw::Delay for Compat {} -impl Future for Delay { +impl Future for Compat { type Output = Instant; #[inline] - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - futures::ready!(Pin::new(&mut self.async_delay).poll(cx)).unwrap(); + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + ready!(self.get_pin_mut().poll(cx)).unwrap(); Poll::Ready(Instant::now()) } } -#[derive(Debug)] -pub(crate) struct Interval { - pub(crate) async_interval: AsyncInterval, -} - -impl runtime_raw::Interval for Interval {} +impl runtime_raw::Interval for Compat {} -impl Stream for Interval { +impl Stream for Compat { type Item = Instant; #[inline] - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - futures::ready!(Pin::new(&mut self.async_interval).poll_next(cx)).unwrap(); + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + ready!(self.get_pin_mut().poll_next(cx)).unwrap(); Poll::Ready(Some(Instant::now())) } } diff --git a/runtime-native/src/not_wasm32/udp.rs b/runtime-native/src/not_wasm32/udp.rs index 0cbb7443..28673738 100644 --- a/runtime-native/src/not_wasm32/udp.rs +++ b/runtime-native/src/not_wasm32/udp.rs @@ -1,111 +1,107 @@ use romio::raw::AsyncDatagram; +use super::Compat; use std::io; use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr}; use std::pin::Pin; use std::task::{Context, Poll}; -#[derive(Debug)] -pub(crate) struct UdpSocket { - pub romio_socket: romio::udp::UdpSocket, -} - -impl runtime_raw::UdpSocket for UdpSocket { +impl runtime_raw::UdpSocket for Compat { fn local_addr(&self) -> io::Result { - self.romio_socket.local_addr() + self.get_ref().local_addr() } fn poll_send_to( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], receiver: &SocketAddr, ) -> Poll> { - Pin::new(&mut self.romio_socket).poll_send_to(cx, buf, receiver) + self.get_pin_mut().poll_send_to(cx, buf, receiver) } fn poll_recv_from( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll> { - Pin::new(&mut self.romio_socket).poll_recv_from(cx, buf) + self.get_pin_mut().poll_recv_from(cx, buf) } /// Gets the value of the `SO_BROADCAST` option for this socket. fn broadcast(&self) -> io::Result { - self.romio_socket.broadcast() + self.get_ref().broadcast() } /// Sets the value of the `SO_BROADCAST` option for this socket. fn set_broadcast(&self, on: bool) -> io::Result<()> { - self.romio_socket.set_broadcast(on) + self.get_ref().set_broadcast(on) } /// Gets the value of the `IP_MULTICAST_LOOP` option for this socket. fn multicast_loop_v4(&self) -> io::Result { - self.romio_socket.multicast_loop_v4() + self.get_ref().multicast_loop_v4() } /// Sets the value of the `IP_MULTICAST_LOOP` option for this socket. fn set_multicast_loop_v4(&self, on: bool) -> io::Result<()> { - self.romio_socket.set_multicast_loop_v4(on) + self.get_ref().set_multicast_loop_v4(on) } /// Gets the value of the `IP_MULTICAST_TTL` option for this socket. fn multicast_ttl_v4(&self) -> io::Result { - self.romio_socket.multicast_ttl_v4() + self.get_ref().multicast_ttl_v4() } /// Sets the value of the `IP_MULTICAST_TTL` option for this socket. fn set_multicast_ttl_v4(&self, ttl: u32) -> io::Result<()> { - self.romio_socket.set_multicast_ttl_v4(ttl) + self.get_ref().set_multicast_ttl_v4(ttl) } /// Gets the value of the `IPV6_MULTICAST_LOOP` option for this socket. fn multicast_loop_v6(&self) -> io::Result { - self.romio_socket.multicast_loop_v6() + self.get_ref().multicast_loop_v6() } /// Sets the value of the `IPV6_MULTICAST_LOOP` option for this socket. fn set_multicast_loop_v6(&self, on: bool) -> io::Result<()> { - self.romio_socket.set_multicast_loop_v6(on) + self.get_ref().set_multicast_loop_v6(on) } /// Gets the value of the `IP_TTL` option for this socket. fn ttl(&self) -> io::Result { - self.romio_socket.ttl() + self.get_ref().ttl() } /// Sets the value for the `IP_TTL` option on this socket. fn set_ttl(&self, ttl: u32) -> io::Result<()> { - self.romio_socket.set_ttl(ttl) + self.get_ref().set_ttl(ttl) } /// Executes an operation of the `IP_ADD_MEMBERSHIP` type. fn join_multicast_v4(&self, multiaddr: &Ipv4Addr, interface: &Ipv4Addr) -> io::Result<()> { - self.romio_socket.join_multicast_v4(multiaddr, interface) + self.get_ref().join_multicast_v4(multiaddr, interface) } /// Executes an operation of the `IPV6_ADD_MEMBERSHIP` type. fn join_multicast_v6(&self, multiaddr: &Ipv6Addr, interface: u32) -> io::Result<()> { - self.romio_socket.join_multicast_v6(multiaddr, interface) + self.get_ref().join_multicast_v6(multiaddr, interface) } /// Executes an operation of the `IP_DROP_MEMBERSHIP` type. fn leave_multicast_v4(&self, multiaddr: &Ipv4Addr, interface: &Ipv4Addr) -> io::Result<()> { - self.romio_socket.leave_multicast_v4(multiaddr, interface) + self.get_ref().leave_multicast_v4(multiaddr, interface) } /// Executes an operation of the `IPV6_DROP_MEMBERSHIP` type. fn leave_multicast_v6(&self, multiaddr: &Ipv6Addr, interface: u32) -> io::Result<()> { - self.romio_socket.leave_multicast_v6(multiaddr, interface) + self.get_ref().leave_multicast_v6(multiaddr, interface) } /// Extracts the raw file descriptor. #[cfg(unix)] fn as_raw_fd(&self) -> std::os::unix::io::RawFd { use std::os::unix::io::AsRawFd; - self.romio_socket.as_raw_fd() + self.get_ref().as_raw_fd() } } diff --git a/runtime-native/src/wasm32.rs b/runtime-native/src/wasm32.rs index a5c2746f..9bf4aec8 100644 --- a/runtime-native/src/wasm32.rs +++ b/runtime-native/src/wasm32.rs @@ -4,50 +4,69 @@ use futures::{future::BoxFuture, task::SpawnError}; use std::io; use std::net::SocketAddr; -use std::pin::Pin; use std::time::{Duration, Instant}; +mod tcp; +mod time; +mod udp; + /// The Native runtime. #[derive(Debug)] pub struct Native; +#[derive(Debug)] +struct Unimplemented; + +impl Unimplemented { + pub fn new(msg: &'static str) -> Self { + panic!(msg) + } +} + impl runtime_raw::Runtime for Native { + type TcpStream = impl runtime_raw::TcpStream; + type TcpListener = impl runtime_raw::TcpListener; + type UdpSocket = impl runtime_raw::UdpSocket; + type Delay = impl runtime_raw::Delay; + type Interval = impl runtime_raw::Interval; + + type ConnectTcpStream = impl Future> + Send; + fn spawn_boxed(&self, fut: BoxFuture<'static, ()>) -> Result<(), SpawnError> { let fut = fut.unit_error().compat(); wasm_bindgen_futures::spawn_local(fut); Ok(()) } - fn connect_tcp_stream( - &self, - _addr: &SocketAddr, - ) -> BoxFuture<'static, io::Result>>> { - panic!("Connecting TCP streams is currently not supported in wasm"); + fn connect_tcp_stream(&self, _addr: &SocketAddr) -> Self::ConnectTcpStream { + async { + Ok(Unimplemented::new( + "Connecting TCP streams is currently not supported in wasm", + )) + } } - fn bind_tcp_listener( - &self, - _addr: &SocketAddr, - ) -> io::Result>> { - panic!("Binding TCP listeners is currently not supported in wasm"); + fn bind_tcp_listener(&self, _addr: &SocketAddr) -> io::Result { + Ok(Unimplemented::new( + "Binding TCP listeners is currently not supported in wasm", + )) } - fn bind_udp_socket( - &self, - _addr: &SocketAddr, - ) -> io::Result>> { - panic!("Binding UDP sockets is currently not supported in wasm"); + fn bind_udp_socket(&self, _addr: &SocketAddr) -> io::Result { + Ok(Unimplemented::new( + "Binding UDP sockets is currently not supported in wasm", + )) } - fn new_delay(&self, _dur: Duration) -> Pin> { - panic!("Timers are currently not supported in wasm"); + fn new_delay(&self, _dur: Duration) -> Self::Delay { + Unimplemented::new("Timers are currently not supported in wasm") } - fn new_delay_at(&self, _at: Instant) -> Pin> { - panic!("Timers are currently not supported in wasm"); + fn new_delay_at(&self, _at: Instant) -> Self::Delay { + Unimplemented::new("Timers are currently not supported in wasm") } - fn new_interval(&self, _dur: Duration) -> Pin> { - panic!("Timers are currently not supported in wasm"); + fn new_interval(&self, _dur: Duration) -> Self::Interval { + Unimplemented::new("Timers are currently not supported in wasm") } } diff --git a/runtime-native/src/wasm32/tcp.rs b/runtime-native/src/wasm32/tcp.rs new file mode 100644 index 00000000..387c0550 --- /dev/null +++ b/runtime-native/src/wasm32/tcp.rs @@ -0,0 +1,88 @@ +use futures::io::{AsyncRead, AsyncWrite}; + +use std::io; +use std::net::SocketAddr; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use super::Unimplemented; + +impl runtime_raw::TcpStream for Unimplemented { + fn poll_write_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + unimplemented!() + } + + fn poll_read_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + unimplemented!() + } + + fn take_error(&self) -> io::Result> { + unimplemented!() + } + + fn local_addr(&self) -> io::Result { + unimplemented!() + } + + fn peer_addr(&self) -> io::Result { + unimplemented!() + } + + fn shutdown(&self, _how: std::net::Shutdown) -> std::io::Result<()> { + unimplemented!() + } + + #[cfg(unix)] + fn as_raw_fd(&self) -> std::os::unix::io::RawFd { + unimplemented!() + } +} + +impl AsyncRead for Unimplemented { + fn poll_read( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + _buf: &mut [u8], + ) -> Poll> { + unimplemented!() + } +} + +impl AsyncWrite for Unimplemented { + fn poll_write( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + _buf: &[u8], + ) -> Poll> { + unimplemented!() + } + + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + unimplemented!() + } + + fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + unimplemented!() + } +} + +impl runtime_raw::TcpListener for Unimplemented { + type TcpStream = Unimplemented; + + fn local_addr(&self) -> io::Result { + unimplemented!() + } + + fn poll_accept( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll> { + unimplemented!() + } + + /// Extracts the raw file descriptor. + #[cfg(unix)] + fn as_raw_fd(&self) -> std::os::unix::io::RawFd { + unimplemented!() + } +} diff --git a/runtime-native/src/wasm32/time.rs b/runtime-native/src/wasm32/time.rs new file mode 100644 index 00000000..37ec1d86 --- /dev/null +++ b/runtime-native/src/wasm32/time.rs @@ -0,0 +1,29 @@ +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::time::Instant; + +use futures::{future::Future, stream::Stream}; + +use super::Unimplemented; + +impl runtime_raw::Delay for Unimplemented {} + +impl Future for Unimplemented { + type Output = Instant; + + #[inline] + fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll { + unimplemented!() + } +} + +impl runtime_raw::Interval for Unimplemented {} + +impl Stream for Unimplemented { + type Item = Instant; + + #[inline] + fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + unimplemented!() + } +} diff --git a/runtime-native/src/wasm32/udp.rs b/runtime-native/src/wasm32/udp.rs new file mode 100644 index 00000000..d5826a0e --- /dev/null +++ b/runtime-native/src/wasm32/udp.rs @@ -0,0 +1,105 @@ +use std::io; +use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr}; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use super::Unimplemented; + +impl runtime_raw::UdpSocket for Unimplemented { + fn local_addr(&self) -> io::Result { + unimplemented!() + } + + fn poll_send_to( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + _buf: &[u8], + _receiver: &SocketAddr, + ) -> Poll> { + unimplemented!() + } + + fn poll_recv_from( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + _buf: &mut [u8], + ) -> Poll> { + unimplemented!() + } + + /// Gets the value of the `SO_BROADCAST` option for this socket. + fn broadcast(&self) -> io::Result { + unimplemented!() + } + + /// Sets the value of the `SO_BROADCAST` option for this socket. + fn set_broadcast(&self, _on: bool) -> io::Result<()> { + unimplemented!() + } + + /// Gets the value of the `IP_MULTICAST_LOOP` option for this socket. + fn multicast_loop_v4(&self) -> io::Result { + unimplemented!() + } + + /// Sets the value of the `IP_MULTICAST_LOOP` option for this socket. + fn set_multicast_loop_v4(&self, _on: bool) -> io::Result<()> { + unimplemented!() + } + + /// Gets the value of the `IP_MULTICAST_TTL` option for this socket. + fn multicast_ttl_v4(&self) -> io::Result { + unimplemented!() + } + + /// Sets the value of the `IP_MULTICAST_TTL` option for this socket. + fn set_multicast_ttl_v4(&self, _ttl: u32) -> io::Result<()> { + unimplemented!() + } + + /// Gets the value of the `IPV6_MULTICAST_LOOP` option for this socket. + fn multicast_loop_v6(&self) -> io::Result { + unimplemented!() + } + + /// Sets the value of the `IPV6_MULTICAST_LOOP` option for this socket. + fn set_multicast_loop_v6(&self, _on: bool) -> io::Result<()> { + unimplemented!() + } + + /// Gets the value of the `IP_TTL` option for this socket. + fn ttl(&self) -> io::Result { + unimplemented!() + } + + /// Sets the value for the `IP_TTL` option on this socket. + fn set_ttl(&self, _ttl: u32) -> io::Result<()> { + unimplemented!() + } + + /// Executes an operation of the `IP_ADD_MEMBERSHIP` type. + fn join_multicast_v4(&self, _multiaddr: &Ipv4Addr, _interface: &Ipv4Addr) -> io::Result<()> { + unimplemented!() + } + + /// Executes an operation of the `IPV6_ADD_MEMBERSHIP` type. + fn join_multicast_v6(&self, _multiaddr: &Ipv6Addr, _interface: u32) -> io::Result<()> { + unimplemented!() + } + + /// Executes an operation of the `IP_DROP_MEMBERSHIP` type. + fn leave_multicast_v4(&self, _multiaddr: &Ipv4Addr, _interface: &Ipv4Addr) -> io::Result<()> { + unimplemented!() + } + + /// Executes an operation of the `IPV6_DROP_MEMBERSHIP` type. + fn leave_multicast_v6(&self, _multiaddr: &Ipv6Addr, _interface: u32) -> io::Result<()> { + unimplemented!() + } + + /// Extracts the raw file descriptor. + #[cfg(unix)] + fn as_raw_fd(&self) -> std::os::unix::io::RawFd { + unimplemented!() + } +} diff --git a/runtime-raw/src/into_dyn.rs b/runtime-raw/src/into_dyn.rs new file mode 100644 index 00000000..16febf41 --- /dev/null +++ b/runtime-raw/src/into_dyn.rs @@ -0,0 +1,88 @@ +use crate::{ + BoxDelay, BoxInterval, BoxTcpListener, BoxTcpStream, BoxUdpSocket, Runtime, TcpListener, +}; +use futures::{future::BoxFuture, ready, task::SpawnError}; +use std::{ + io, + net::SocketAddr, + pin::Pin, + task::{Context, Poll}, + time::{Duration, Instant}, +}; + +/// Maps all the associated types on [`Runtime`] related traits into boxed trait objects to fully +/// type erase a `Runtime`. +#[derive(Debug)] +pub(crate) struct Dyn(T); + +impl Dyn { + /// Create a new [`Dyn`]. + pub const fn new(t: T) -> Self { + Self(t) + } + + /// Project into a pinned [`Dyn`]. + #[allow(unsafe_code)] + pub fn get_pin_mut<'a>(self: Pin<&'a mut Self>) -> Pin<&'a mut T> { + unsafe { Pin::new_unchecked(&mut Pin::get_unchecked_mut(self).0) } + } +} + +impl Runtime for Dyn { + type TcpStream = BoxTcpStream; + type TcpListener = BoxTcpListener; + type UdpSocket = BoxUdpSocket; + type Delay = BoxDelay; + type Interval = BoxInterval; + type ConnectTcpStream = BoxFuture<'static, io::Result>; + + fn spawn_boxed(&self, fut: BoxFuture<'static, ()>) -> Result<(), SpawnError> { + self.0.spawn_boxed(fut) + } + + fn connect_tcp_stream(&self, addr: &SocketAddr) -> Self::ConnectTcpStream { + let fut = self.0.connect_tcp_stream(addr); + Box::pin(async { Ok(Box::pin(fut.await?) as _) }) + } + + fn bind_tcp_listener(&self, addr: &SocketAddr) -> io::Result { + Ok(Box::pin(Dyn::new(self.0.bind_tcp_listener(addr)?))) + } + + fn bind_udp_socket(&self, addr: &SocketAddr) -> io::Result { + Ok(Box::pin(self.0.bind_udp_socket(addr)?)) + } + + fn new_delay(&self, dur: Duration) -> Self::Delay { + Box::pin(self.0.new_delay(dur)) + } + + fn new_delay_at(&self, at: Instant) -> Self::Delay { + Box::pin(self.0.new_delay_at(at)) + } + + fn new_interval(&self, dur: Duration) -> Self::Interval { + Box::pin(self.0.new_interval(dur)) + } +} + +impl TcpListener for Dyn { + type TcpStream = BoxTcpStream; + + fn local_addr(&self) -> io::Result { + self.0.local_addr() + } + + fn poll_accept( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let stream = ready!(self.get_pin_mut().poll_accept(cx))?; + Poll::Ready(Ok(Box::pin(stream) as _)) + } + + #[cfg(unix)] + fn as_raw_fd(&self) -> std::os::unix::io::RawFd { + self.0.as_raw_fd() + } +} diff --git a/runtime-raw/src/lib.rs b/runtime-raw/src/lib.rs index b15fd767..1a0e79e3 100644 --- a/runtime-raw/src/lib.rs +++ b/runtime-raw/src/lib.rs @@ -21,9 +21,9 @@ use futures::task::SpawnError; use std::cell::Cell; use std::io; use std::net::SocketAddr; -use std::pin::Pin; use std::time::{Duration, Instant}; +mod into_dyn; mod tcp; mod time; mod udp; @@ -32,13 +32,23 @@ pub use tcp::*; pub use time::*; pub use udp::*; +/// A dynamic runtime that returns type-erased implementations. +pub type DynRuntime = dyn Runtime< + TcpStream = BoxTcpStream, + TcpListener = BoxTcpListener, + UdpSocket = BoxUdpSocket, + Delay = BoxDelay, + Interval = BoxInterval, + ConnectTcpStream = BoxFuture<'static, io::Result>, +>; + thread_local! { - static RUNTIME: Cell> = Cell::new(None); + static RUNTIME: Cell> = Cell::new(None); } /// Get the current runtime. #[inline] -pub fn current_runtime() -> &'static dyn Runtime { +pub fn current_runtime() -> &'static DynRuntime { RUNTIME.with(|r| r.get().expect("the runtime has not been set")) } @@ -46,10 +56,10 @@ pub fn current_runtime() -> &'static dyn Runtime { /// /// This function must be called at the beginning of runtime's threads before they start polling /// any futures. -pub fn set_runtime(runtime: &'static dyn Runtime) { +pub fn set_runtime(runtime: impl Runtime) { RUNTIME.with(|r| { assert!(r.get().is_none(), "the runtime has already been set"); - r.set(Some(runtime)) + r.set(Some(Box::leak(Box::new(into_dyn::Dyn::new(runtime))))) }); } @@ -74,6 +84,24 @@ where /// The runtime trait. pub trait Runtime: Send + Sync + 'static { + /// The [`TcpStream`] implementation for this runtime. + type TcpStream: TcpStream; + + /// The [`TcpListener`] implementation for this runtime. + type TcpListener: TcpListener; + + /// The [`UdpSocket`] implementation for this runtime. + type UdpSocket: UdpSocket; + + /// The [`Delay`] implementation for this runtime. + type Delay: Delay; + + /// The [`Interval`] implementation for this runtime. + type Interval: Interval; + + /// The [`Future`] implementation for this runtime's [`connect_tcp_stream`](Runtime::connect_tcp_stream) function. + type ConnectTcpStream: Future> + Send; + /// Spawn a new future. fn spawn_boxed(&self, fut: BoxFuture<'static, ()>) -> Result<(), SpawnError>; @@ -81,38 +109,35 @@ pub trait Runtime: Send + Sync + 'static { /// /// This method is defined on the `Runtime` trait because defining it on /// `TcpStream` would prevent it from being a trait object. - fn connect_tcp_stream( - &self, - addr: &SocketAddr, - ) -> BoxFuture<'static, io::Result>>>; + fn connect_tcp_stream(&self, addr: &SocketAddr) -> Self::ConnectTcpStream; /// Create a new `TcpListener`. /// /// This method is defined on the `Runtime` trait because defining it on /// `TcpListener` would prevent it from being a trait object. - fn bind_tcp_listener(&self, addr: &SocketAddr) -> io::Result>>; + fn bind_tcp_listener(&self, addr: &SocketAddr) -> io::Result; /// Create a new `UdpSocket`. /// /// This method is defined on the `Runtime` trait because defining it on /// `UdpSocket` would prevent it from being a trait object. - fn bind_udp_socket(&self, addr: &SocketAddr) -> io::Result>>; + fn bind_udp_socket(&self, addr: &SocketAddr) -> io::Result; /// Create a new Future that wakes up after the given duration /// /// This method is defined on the `Runtime` trait because defining it on /// `Delay` would prevent it from being a trait object. - fn new_delay(&self, dur: Duration) -> Pin>; + fn new_delay(&self, dur: Duration) -> Self::Delay; /// Create a new Future that wakes up at the given time. /// /// This method is defined on the `Runtime` trait because defining it on /// `Delay` would prevent it from being a trait object. - fn new_delay_at(&self, at: Instant) -> Pin>; + fn new_delay_at(&self, at: Instant) -> Self::Delay; /// A stream representing notifications at a fixed interval. /// /// This method is defined on the `Runtime` trait because defining it on /// `Interval` would prevent it from being a trait object. - fn new_interval(&self, dur: Duration) -> Pin>; + fn new_interval(&self, dur: Duration) -> Self::Interval; } diff --git a/runtime-raw/src/tcp.rs b/runtime-raw/src/tcp.rs index aeb9a627..bb4ceae5 100644 --- a/runtime-raw/src/tcp.rs +++ b/runtime-raw/src/tcp.rs @@ -4,8 +4,15 @@ use futures::task::{Context, Poll}; use std::fmt::Debug; use std::io; use std::net::SocketAddr; +use std::ops::DerefMut; use std::pin::Pin; +/// A boxed type-erased [`TcpStream`]. +pub type BoxTcpStream = Pin>; + +/// A boxed type-erased [`TcpListener`] that returns boxed type-erased streams. +pub type BoxTcpListener = Pin>>; + /// A TcpStream for this Runtime pub trait TcpStream: AsyncRead + AsyncWrite + Debug + Send { /// Check if the stream can be written to. @@ -36,16 +43,76 @@ pub trait TcpStream: AsyncRead + AsyncWrite + Debug + Send { /// A TcpListener for this Runtime pub trait TcpListener: Debug + Send { + /// The [`TcpStream`] implementation for this [`TcpListener`]. + type TcpStream: TcpStream + 'static; + /// Get the address the listener is listening on. fn local_addr(&self) -> io::Result; /// Check if the listener is ready to accept connections. + fn poll_accept(self: Pin<&mut Self>, cx: &mut Context<'_>) + -> Poll>; + + /// Extracts the raw file descriptor. + #[cfg(unix)] + fn as_raw_fd(&self) -> std::os::unix::io::RawFd; +} + +impl

TcpStream for Pin

+where + P: DerefMut + Debug + Send + Unpin, + P::Target: TcpStream, +{ + fn poll_write_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.get_mut().as_mut().poll_write_ready(cx) + } + + fn poll_read_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.get_mut().as_mut().poll_read_ready(cx) + } + + fn take_error(&self) -> io::Result> { + self.as_ref().take_error() + } + + fn local_addr(&self) -> io::Result { + self.as_ref().local_addr() + } + + fn peer_addr(&self) -> io::Result { + self.as_ref().peer_addr() + } + + fn shutdown(&self, how: std::net::Shutdown) -> std::io::Result<()> { + self.as_ref().shutdown(how) + } + + #[cfg(unix)] + fn as_raw_fd(&self) -> std::os::unix::io::RawFd { + self.as_ref().as_raw_fd() + } +} + +impl

TcpListener for Pin

+where + P: DerefMut + Debug + Send + Unpin, + P::Target: TcpListener, +{ + type TcpStream = ::TcpStream; + + fn local_addr(&self) -> io::Result { + self.as_ref().local_addr() + } + fn poll_accept( self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll>>>; + ) -> Poll> { + self.get_mut().as_mut().poll_accept(cx) + } - /// Extracts the raw file descriptor. #[cfg(unix)] - fn as_raw_fd(&self) -> std::os::unix::io::RawFd; + fn as_raw_fd(&self) -> std::os::unix::io::RawFd { + self.as_ref().as_raw_fd() + } } diff --git a/runtime-raw/src/time.rs b/runtime-raw/src/time.rs index 1e2d53b9..b3d66187 100644 --- a/runtime-raw/src/time.rs +++ b/runtime-raw/src/time.rs @@ -1,11 +1,33 @@ use std::fmt::Debug; use std::future::Future; +use std::ops::DerefMut; +use std::pin::Pin; use std::time::Instant; use futures::Stream; +/// A boxed type-erased [`Delay`]. +pub type BoxDelay = Pin>; + +/// A boxed type-erased [`Interval`]. +pub type BoxInterval = Pin>; + /// A future representing the notification that an elapsed duration has occurred. pub trait Delay: Future + Debug + Send {} +impl

Delay for Pin

+where + P: DerefMut + Debug + Send + Unpin, + P::Target: Delay, +{ +} + /// A stream representing notifications at a fixed interval. pub trait Interval: Stream + Debug + Send {} + +impl

Interval for Pin

+where + P: DerefMut + Debug + Send + Unpin, + P::Target: Interval, +{ +} diff --git a/runtime-raw/src/udp.rs b/runtime-raw/src/udp.rs index 8e1dd149..da6d61e4 100644 --- a/runtime-raw/src/udp.rs +++ b/runtime-raw/src/udp.rs @@ -1,9 +1,13 @@ use std::fmt::Debug; use std::io; use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr}; +use std::ops::DerefMut; use std::pin::Pin; use std::task::{Context, Poll}; +/// A boxed type-erased [`UdpSocket`]. +pub type BoxUdpSocket = Pin>; + /// A UDP socket. pub trait UdpSocket: Debug + Send + Sync { /// Returns the local address that this listener is bound to. @@ -78,3 +82,91 @@ pub trait UdpSocket: Debug + Send + Sync { #[cfg(unix)] fn as_raw_fd(&self) -> std::os::unix::io::RawFd; } + +impl

UdpSocket for Pin

+where + P: DerefMut + Debug + Send + Sync + Unpin, + P::Target: UdpSocket, +{ + fn local_addr(&self) -> io::Result { + self.as_ref().local_addr() + } + + fn poll_send_to( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + receiver: &SocketAddr, + ) -> Poll> { + self.get_mut().as_mut().poll_send_to(cx, buf, receiver) + } + + fn poll_recv_from( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + self.get_mut().as_mut().poll_recv_from(cx, buf) + } + + fn broadcast(&self) -> io::Result { + self.as_ref().broadcast() + } + + fn set_broadcast(&self, on: bool) -> io::Result<()> { + self.as_ref().set_broadcast(on) + } + + fn multicast_loop_v4(&self) -> io::Result { + self.as_ref().multicast_loop_v4() + } + + fn set_multicast_loop_v4(&self, on: bool) -> io::Result<()> { + self.as_ref().set_multicast_loop_v4(on) + } + + fn multicast_ttl_v4(&self) -> io::Result { + self.as_ref().multicast_ttl_v4() + } + + fn set_multicast_ttl_v4(&self, ttl: u32) -> io::Result<()> { + self.as_ref().set_multicast_ttl_v4(ttl) + } + + fn multicast_loop_v6(&self) -> io::Result { + self.as_ref().multicast_loop_v6() + } + + fn set_multicast_loop_v6(&self, on: bool) -> io::Result<()> { + self.as_ref().set_multicast_loop_v6(on) + } + + fn ttl(&self) -> io::Result { + self.as_ref().ttl() + } + + fn set_ttl(&self, ttl: u32) -> io::Result<()> { + self.as_ref().set_ttl(ttl) + } + + fn join_multicast_v4(&self, multiaddr: &Ipv4Addr, interface: &Ipv4Addr) -> io::Result<()> { + self.as_ref().join_multicast_v4(multiaddr, interface) + } + + fn join_multicast_v6(&self, multiaddr: &Ipv6Addr, interface: u32) -> io::Result<()> { + self.as_ref().join_multicast_v6(multiaddr, interface) + } + + fn leave_multicast_v4(&self, multiaddr: &Ipv4Addr, interface: &Ipv4Addr) -> io::Result<()> { + self.as_ref().leave_multicast_v4(multiaddr, interface) + } + + fn leave_multicast_v6(&self, multiaddr: &Ipv6Addr, interface: u32) -> io::Result<()> { + self.as_ref().leave_multicast_v6(multiaddr, interface) + } + + #[cfg(unix)] + fn as_raw_fd(&self) -> std::os::unix::io::RawFd { + self.as_ref().as_raw_fd() + } +} diff --git a/runtime-tokio/src/lib.rs b/runtime-tokio/src/lib.rs index 0b996719..c9757dae 100644 --- a/runtime-tokio/src/lib.rs +++ b/runtime-tokio/src/lib.rs @@ -2,6 +2,7 @@ //! [Runtime](https://github.com/rustasync/runtime). See the [Runtime //! documentation](https://docs.rs/runtime) for more details. +#![feature(type_alias_impl_trait)] #![warn( missing_debug_implementations, missing_docs, @@ -11,7 +12,7 @@ use futures::{ compat::Future01CompatExt, - future::{BoxFuture, FutureExt, TryFutureExt}, + future::{BoxFuture, Future, FutureExt, TryFutureExt}, task::SpawnError, }; use lazy_static::lazy_static; @@ -19,7 +20,6 @@ use tokio::timer::{Delay as TokioDelay, Interval as TokioInterval}; use std::io; use std::net::SocketAddr; -use std::pin::Pin; use std::sync::{mpsc, Mutex}; use std::thread; use std::time::{Duration, Instant}; @@ -28,21 +28,42 @@ mod tcp; mod time; mod udp; -use tcp::{TcpListener, TcpStream}; -use time::{Delay, Interval}; -use udp::UdpSocket; - /// The default Tokio runtime. #[derive(Debug)] pub struct Tokio; +#[derive(Debug)] +struct Compat(T); + +impl Compat { + fn new(inner: T) -> Self { + Self(inner) + } + + fn get_ref(&self) -> &T { + &self.0 + } + + fn get_mut(&mut self) -> &mut T { + &mut self.0 + } +} + impl runtime_raw::Runtime for Tokio { + type TcpStream = impl runtime_raw::TcpStream; + type TcpListener = impl runtime_raw::TcpListener; + type UdpSocket = impl runtime_raw::UdpSocket; + type Delay = impl runtime_raw::Delay; + type Interval = impl runtime_raw::Interval; + + type ConnectTcpStream = impl Future> + Send; + fn spawn_boxed(&self, fut: BoxFuture<'static, ()>) -> Result<(), SpawnError> { lazy_static! { static ref TOKIO_RUNTIME: tokio::runtime::Runtime = { tokio::runtime::Builder::new() .after_start(|| { - runtime_raw::set_runtime(&Tokio); + runtime_raw::set_runtime(Tokio); }) .build() .unwrap() @@ -53,48 +74,32 @@ impl runtime_raw::Runtime for Tokio { Ok(()) } - fn connect_tcp_stream( - &self, - addr: &SocketAddr, - ) -> BoxFuture<'static, io::Result>>> { + fn connect_tcp_stream(&self, addr: &SocketAddr) -> Self::ConnectTcpStream { use futures01::Future; - let tokio_connect = tokio::net::TcpStream::connect(addr); - let connect = tokio_connect.map(|tokio_stream| { - Box::pin(TcpStream { tokio_stream }) as Pin> - }); - connect.compat().boxed() + tokio::net::TcpStream::connect(addr) + .map(Compat::new) + .compat() } - fn bind_tcp_listener( - &self, - addr: &SocketAddr, - ) -> io::Result>> { - let tokio_listener = tokio::net::TcpListener::bind(&addr)?; - Ok(Box::pin(TcpListener { tokio_listener })) + fn bind_tcp_listener(&self, addr: &SocketAddr) -> io::Result { + tokio::net::TcpListener::bind(&addr).map(Compat::new) } - fn bind_udp_socket( - &self, - addr: &SocketAddr, - ) -> io::Result>> { - let tokio_socket = tokio::net::UdpSocket::bind(&addr)?; - Ok(Box::pin(UdpSocket { tokio_socket })) + fn bind_udp_socket(&self, addr: &SocketAddr) -> io::Result { + tokio::net::UdpSocket::bind(&addr).map(Compat::new) } - fn new_delay(&self, dur: Duration) -> Pin> { - let tokio_delay = TokioDelay::new(Instant::now() + dur); - Box::pin(Delay { tokio_delay }) + fn new_delay(&self, dur: Duration) -> Self::Delay { + Compat::new(TokioDelay::new(Instant::now() + dur)) } - fn new_delay_at(&self, at: Instant) -> Pin> { - let tokio_delay = TokioDelay::new(at); - Box::pin(Delay { tokio_delay }) + fn new_delay_at(&self, at: Instant) -> Self::Delay { + Compat::new(TokioDelay::new(at)) } - fn new_interval(&self, dur: Duration) -> Pin> { - let tokio_interval = TokioInterval::new(Instant::now(), dur); - Box::pin(Interval { tokio_interval }) + fn new_interval(&self, dur: Duration) -> Self::Interval { + Compat::new(TokioInterval::new(Instant::now(), dur)) } } @@ -103,6 +108,14 @@ impl runtime_raw::Runtime for Tokio { pub struct TokioCurrentThread; impl runtime_raw::Runtime for TokioCurrentThread { + type TcpStream = impl runtime_raw::TcpStream; + type TcpListener = impl runtime_raw::TcpListener; + type UdpSocket = impl runtime_raw::UdpSocket; + type Delay = impl runtime_raw::Delay; + type Interval = impl runtime_raw::Interval; + + type ConnectTcpStream = impl Future> + Send; + fn spawn_boxed(&self, fut: BoxFuture<'static, ()>) -> Result<(), SpawnError> { lazy_static! { static ref TOKIO_RUNTIME: Mutex = { @@ -113,7 +126,7 @@ impl runtime_raw::Runtime for TokioCurrentThread { let handle = rt.handle(); tx.send(handle).unwrap(); - runtime_raw::set_runtime(&TokioCurrentThread); + runtime_raw::set_runtime(TokioCurrentThread); let forever = futures01::future::poll_fn(|| { Ok::, ()>(futures01::Async::NotReady) }); @@ -133,47 +146,31 @@ impl runtime_raw::Runtime for TokioCurrentThread { Ok(()) } - fn connect_tcp_stream( - &self, - addr: &SocketAddr, - ) -> BoxFuture<'static, io::Result>>> { + fn connect_tcp_stream(&self, addr: &SocketAddr) -> Self::ConnectTcpStream { use futures01::Future; - let tokio_connect = tokio::net::TcpStream::connect(addr); - let connect = tokio_connect.map(|tokio_stream| { - Box::pin(TcpStream { tokio_stream }) as Pin> - }); - connect.compat().boxed() + tokio::net::TcpStream::connect(addr) + .map(Compat::new) + .compat() } - fn bind_tcp_listener( - &self, - addr: &SocketAddr, - ) -> io::Result>> { - let tokio_listener = tokio::net::TcpListener::bind(&addr)?; - Ok(Box::pin(TcpListener { tokio_listener })) + fn bind_tcp_listener(&self, addr: &SocketAddr) -> io::Result { + tokio::net::TcpListener::bind(&addr).map(Compat::new) } - fn bind_udp_socket( - &self, - addr: &SocketAddr, - ) -> io::Result>> { - let tokio_socket = tokio::net::UdpSocket::bind(&addr)?; - Ok(Box::pin(UdpSocket { tokio_socket })) + fn bind_udp_socket(&self, addr: &SocketAddr) -> io::Result { + tokio::net::UdpSocket::bind(&addr).map(Compat::new) } - fn new_delay(&self, dur: Duration) -> Pin> { - let tokio_delay = TokioDelay::new(Instant::now() + dur); - Box::pin(Delay { tokio_delay }) + fn new_delay(&self, dur: Duration) -> Self::Delay { + Compat::new(TokioDelay::new(Instant::now() + dur)) } - fn new_delay_at(&self, at: Instant) -> Pin> { - let tokio_delay = TokioDelay::new(at); - Box::pin(Delay { tokio_delay }) + fn new_delay_at(&self, at: Instant) -> Self::Delay { + Compat::new(TokioDelay::new(at)) } - fn new_interval(&self, dur: Duration) -> Pin> { - let tokio_interval = TokioInterval::new(Instant::now(), dur); - Box::pin(Interval { tokio_interval }) + fn new_interval(&self, dur: Duration) -> Self::Interval { + Compat::new(TokioInterval::new(Instant::now(), dur)) } } diff --git a/runtime-tokio/src/tcp.rs b/runtime-tokio/src/tcp.rs index 4a34b1f9..c62542cb 100644 --- a/runtime-tokio/src/tcp.rs +++ b/runtime-tokio/src/tcp.rs @@ -8,19 +8,11 @@ use std::net::SocketAddr; use std::pin::Pin; use std::task::{Context, Poll}; -#[derive(Debug)] -pub(crate) struct TcpStream { - pub tokio_stream: tokio::net::tcp::TcpStream, -} - -#[derive(Debug)] -pub(crate) struct TcpListener { - pub tokio_listener: tokio::net::tcp::TcpListener, -} +use crate::Compat; -impl runtime_raw::TcpStream for TcpStream { +impl runtime_raw::TcpStream for Compat { fn poll_write_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - match self.tokio_stream.poll_write_ready()? { + match self.get_ref().poll_write_ready()? { futures01::Async::Ready(_) => Poll::Ready(Ok(())), futures01::Async::NotReady => Poll::Pending, } @@ -28,7 +20,7 @@ impl runtime_raw::TcpStream for TcpStream { fn poll_read_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { let mask = mio::Ready::readable(); - match self.tokio_stream.poll_read_ready(mask)? { + match self.get_ref().poll_read_ready(mask)? { futures01::Async::Ready(_) => Poll::Ready(Ok(())), futures01::Async::NotReady => Poll::Pending, } @@ -39,70 +31,70 @@ impl runtime_raw::TcpStream for TcpStream { } fn local_addr(&self) -> io::Result { - self.tokio_stream.local_addr() + self.get_ref().local_addr() } fn peer_addr(&self) -> io::Result { - self.tokio_stream.peer_addr() + self.get_ref().peer_addr() } fn shutdown(&self, how: std::net::Shutdown) -> std::io::Result<()> { - self.tokio_stream.shutdown(how) + self.get_ref().shutdown(how) } #[cfg(unix)] fn as_raw_fd(&self) -> std::os::unix::io::RawFd { use std::os::unix::io::AsRawFd; - self.tokio_stream.as_raw_fd() + self.get_ref().as_raw_fd() } } -impl AsyncRead for TcpStream { +impl AsyncRead for Compat { fn poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, mut buf: &mut [u8], ) -> Poll> { - let mut stream = Compat01As03::new(&self.tokio_stream); + let mut stream = Compat01As03::new(self.get_ref()); Pin::new(&mut stream).poll_read(cx, &mut buf) } } -impl AsyncWrite for TcpStream { +impl AsyncWrite for Compat { fn poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll> { - let mut stream = Compat01As03::new(&self.tokio_stream); + let mut stream = Compat01As03::new(self.get_ref()); Pin::new(&mut stream).poll_write(cx, &buf) } fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mut stream = Compat01As03::new(&self.tokio_stream); + let mut stream = Compat01As03::new(self.get_ref()); Pin::new(&mut stream).poll_flush(cx) } fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mut stream = Compat01As03::new(&self.tokio_stream); + let mut stream = Compat01As03::new(self.get_ref()); Pin::new(&mut stream).poll_close(cx) } } -impl runtime_raw::TcpListener for TcpListener { +impl runtime_raw::TcpListener for Compat { + type TcpStream = Compat; + fn local_addr(&self) -> io::Result { - self.tokio_listener.local_addr() + self.get_ref().local_addr() } fn poll_accept( self: Pin<&mut Self>, _cx: &mut Context<'_>, - ) -> Poll>>> { - let listener = unsafe { &mut self.get_unchecked_mut().tokio_listener }; - match listener.poll_accept()? { + ) -> Poll> { + match self.get_mut().get_mut().poll_accept()? { futures01::Async::Ready((tokio_stream, _)) => { - let stream = Box::pin(TcpStream { tokio_stream }); - Poll::Ready(Ok(stream)) + Poll::Ready(Ok(Compat::new(tokio_stream))) } futures01::Async::NotReady => Poll::Pending, } @@ -111,6 +103,6 @@ impl runtime_raw::TcpListener for TcpListener { #[cfg(unix)] fn as_raw_fd(&self) -> std::os::unix::io::RawFd { use std::os::unix::io::AsRawFd; - self.tokio_listener.as_raw_fd() + self.get_ref().as_raw_fd() } } diff --git a/runtime-tokio/src/time.rs b/runtime-tokio/src/time.rs index 0931a5d4..1af11fc7 100644 --- a/runtime-tokio/src/time.rs +++ b/runtime-tokio/src/time.rs @@ -4,41 +4,33 @@ use std::time::Instant; use futures::compat::Compat01As03; use futures::prelude::*; -use tokio::timer::{Delay as TokioDelay, Interval as TokioInterval}; +use futures::ready; -#[derive(Debug)] -pub(crate) struct Delay { - pub(crate) tokio_delay: TokioDelay, -} +use crate::Compat; -impl runtime_raw::Delay for Delay {} +impl runtime_raw::Delay for Compat {} -impl Future for Delay { +impl Future for Compat { type Output = Instant; #[inline] - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut fut = Compat01As03::new(&mut self.tokio_delay); - futures::ready!(Pin::new(&mut fut).poll(cx)).unwrap(); + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut fut = Compat01As03::new(self.get_mut().get_mut()); + ready!(Pin::new(&mut fut).poll(cx)).unwrap(); Poll::Ready(Instant::now()) } } -#[derive(Debug)] -pub(crate) struct Interval { - pub(crate) tokio_interval: TokioInterval, -} - -impl runtime_raw::Interval for Interval {} +impl runtime_raw::Interval for Compat {} -impl Stream for Interval { +impl Stream for Compat { type Item = Instant; #[inline] - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mut stream = Compat01As03::new(&mut self.tokio_interval); + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut stream = Compat01As03::new(self.get_mut().get_mut()); // https://docs.rs/tokio/0.1.20/tokio/timer/struct.Error.html - futures::ready!(Pin::new(&mut stream).poll_next(cx)) + ready!(Pin::new(&mut stream).poll_next(cx)) .unwrap() .unwrap(); Poll::Ready(Some(Instant::now())) diff --git a/runtime-tokio/src/udp.rs b/runtime-tokio/src/udp.rs index 69fd1c9c..860669e1 100644 --- a/runtime-tokio/src/udp.rs +++ b/runtime-tokio/src/udp.rs @@ -5,14 +5,11 @@ use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr}; use std::pin::Pin; use std::task::{Context, Poll}; -#[derive(Debug)] -pub(crate) struct UdpSocket { - pub tokio_socket: tokio::net::udp::UdpSocket, -} +use crate::Compat; -impl runtime_raw::UdpSocket for UdpSocket { +impl runtime_raw::UdpSocket for Compat { fn local_addr(&self) -> io::Result { - self.tokio_socket.local_addr() + self.get_ref().local_addr() } fn poll_send_to( @@ -21,8 +18,7 @@ impl runtime_raw::UdpSocket for UdpSocket { buf: &[u8], receiver: &SocketAddr, ) -> Poll> { - let socket = unsafe { &mut self.get_unchecked_mut().tokio_socket }; - match socket.poll_send_to(&buf, &receiver)? { + match self.get_mut().get_mut().poll_send_to(&buf, &receiver)? { futures01::Async::Ready(size) => Poll::Ready(Ok(size)), futures01::Async::NotReady => Poll::Pending, } @@ -33,8 +29,7 @@ impl runtime_raw::UdpSocket for UdpSocket { _cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll> { - let socket = unsafe { &mut self.get_unchecked_mut().tokio_socket }; - match socket.poll_recv_from(buf)? { + match self.get_mut().get_mut().poll_recv_from(buf)? { futures01::Async::Ready((size, addr)) => Poll::Ready(Ok((size, addr))), futures01::Async::NotReady => Poll::Pending, } @@ -42,78 +37,78 @@ impl runtime_raw::UdpSocket for UdpSocket { /// Gets the value of the `SO_BROADCAST` option for this socket. fn broadcast(&self) -> io::Result { - self.tokio_socket.broadcast() + self.get_ref().broadcast() } /// Sets the value of the `SO_BROADCAST` option for this socket. fn set_broadcast(&self, on: bool) -> io::Result<()> { - self.tokio_socket.set_broadcast(on) + self.get_ref().set_broadcast(on) } /// Gets the value of the `IP_MULTICAST_LOOP` option for this socket. fn multicast_loop_v4(&self) -> io::Result { - self.tokio_socket.multicast_loop_v4() + self.get_ref().multicast_loop_v4() } /// Sets the value of the `IP_MULTICAST_LOOP` option for this socket. fn set_multicast_loop_v4(&self, on: bool) -> io::Result<()> { - self.tokio_socket.set_multicast_loop_v4(on) + self.get_ref().set_multicast_loop_v4(on) } /// Gets the value of the `IP_MULTICAST_TTL` option for this socket. fn multicast_ttl_v4(&self) -> io::Result { - self.tokio_socket.multicast_ttl_v4() + self.get_ref().multicast_ttl_v4() } /// Sets the value of the `IP_MULTICAST_TTL` option for this socket. fn set_multicast_ttl_v4(&self, ttl: u32) -> io::Result<()> { - self.tokio_socket.set_multicast_ttl_v4(ttl) + self.get_ref().set_multicast_ttl_v4(ttl) } /// Gets the value of the `IPV6_MULTICAST_LOOP` option for this socket. fn multicast_loop_v6(&self) -> io::Result { - self.tokio_socket.multicast_loop_v6() + self.get_ref().multicast_loop_v6() } /// Sets the value of the `IPV6_MULTICAST_LOOP` option for this socket. fn set_multicast_loop_v6(&self, on: bool) -> io::Result<()> { - self.tokio_socket.set_multicast_loop_v6(on) + self.get_ref().set_multicast_loop_v6(on) } /// Gets the value of the `IP_TTL` option for this socket. fn ttl(&self) -> io::Result { - self.tokio_socket.ttl() + self.get_ref().ttl() } /// Sets the value for the `IP_TTL` option on this socket. fn set_ttl(&self, ttl: u32) -> io::Result<()> { - self.tokio_socket.set_ttl(ttl) + self.get_ref().set_ttl(ttl) } /// Executes an operation of the `IP_ADD_MEMBERSHIP` type. fn join_multicast_v4(&self, multiaddr: &Ipv4Addr, interface: &Ipv4Addr) -> io::Result<()> { - self.tokio_socket.join_multicast_v4(multiaddr, interface) + self.get_ref().join_multicast_v4(multiaddr, interface) } /// Executes an operation of the `IPV6_ADD_MEMBERSHIP` type. fn join_multicast_v6(&self, multiaddr: &Ipv6Addr, interface: u32) -> io::Result<()> { - self.tokio_socket.join_multicast_v6(multiaddr, interface) + self.get_ref().join_multicast_v6(multiaddr, interface) } /// Executes an operation of the `IP_DROP_MEMBERSHIP` type. fn leave_multicast_v4(&self, multiaddr: &Ipv4Addr, interface: &Ipv4Addr) -> io::Result<()> { - self.tokio_socket.leave_multicast_v4(multiaddr, interface) + self.get_ref().leave_multicast_v4(multiaddr, interface) } /// Executes an operation of the `IPV6_DROP_MEMBERSHIP` type. fn leave_multicast_v6(&self, multiaddr: &Ipv6Addr, interface: u32) -> io::Result<()> { - self.tokio_socket.leave_multicast_v6(multiaddr, interface) + self.get_ref().leave_multicast_v6(multiaddr, interface) } /// Extracts the raw file descriptor. #[cfg(unix)] fn as_raw_fd(&self) -> std::os::unix::io::RawFd { use std::os::unix::io::AsRawFd; - self.tokio_socket.as_raw_fd() + self.get_ref().as_raw_fd() } } diff --git a/src/net/tcp.rs b/src/net/tcp.rs index 26524eb8..58eb9c01 100644 --- a/src/net/tcp.rs +++ b/src/net/tcp.rs @@ -67,7 +67,7 @@ use futures::task::{Context, Poll}; /// ``` #[derive(Debug)] pub struct TcpStream { - inner: Pin>, + inner: runtime_raw::BoxTcpStream, } impl TcpStream { @@ -214,8 +214,8 @@ impl AsyncWrite for TcpStream { pub struct ConnectFuture { addrs: Option>>, last_err: Option, - future: Option>>>>, - runtime: &'static dyn runtime_raw::Runtime, + future: Option>>, + runtime: &'static runtime_raw::DynRuntime, } impl Future for ConnectFuture { @@ -309,7 +309,7 @@ impl fmt::Debug for ConnectFuture { /// ``` #[derive(Debug)] pub struct TcpListener { - inner: Pin>, + inner: runtime_raw::BoxTcpListener, } impl TcpListener { diff --git a/src/net/udp.rs b/src/net/udp.rs index 00b4f22f..a6450648 100644 --- a/src/net/udp.rs +++ b/src/net/udp.rs @@ -54,7 +54,7 @@ use std::task::{Context, Poll}; /// ``` #[derive(Debug)] pub struct UdpSocket { - inner: Pin>, + inner: runtime_raw::BoxUdpSocket, } impl UdpSocket { diff --git a/src/time/delay.rs b/src/time/delay.rs index 03d162c3..c43fc022 100644 --- a/src/time/delay.rs +++ b/src/time/delay.rs @@ -8,7 +8,7 @@ use std::time::{Duration, Instant}; /// A future representing the notification that an elapsed duration has occurred. #[must_use = "futures do nothing unless awaited"] pub struct Delay { - inner: Pin>, + inner: runtime_raw::BoxDelay, } impl Delay { diff --git a/src/time/interval.rs b/src/time/interval.rs index 0bc397c3..118caf43 100644 --- a/src/time/interval.rs +++ b/src/time/interval.rs @@ -8,7 +8,7 @@ use std::time::{Duration, Instant}; /// A stream representing notifications at a fixed interval. #[must_use = "streams do nothing unless polled"] pub struct Interval { - inner: Pin>, + inner: runtime_raw::BoxInterval, } impl Interval {