Skip to content
This repository was archived by the owner on Oct 30, 2019. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions runtime-native/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! A cross-platform asynchronous [Runtime](https://github.com/rustasync/runtime). See the [Runtime
//! documentation](https://docs.rs/runtime) for more details.

#![feature(type_alias_impl_trait)]
#![deny(unsafe_code)]
#![warn(
missing_debug_implementations,
Expand Down
78 changes: 40 additions & 38 deletions runtime-native/src/not_wasm32.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,10 @@ mod tcp;
mod time;
mod udp;

use tcp::{TcpListener, TcpStream};
use time::{Delay, Interval};
use udp::UdpSocket;

lazy_static! {
static ref JULIEX_THREADPOOL: juliex::ThreadPool = {
juliex::ThreadPool::with_setup(|| {
runtime_raw::set_runtime(&Native);
runtime_raw::set_runtime(Native);
})
};
}
Expand All @@ -28,53 +24,59 @@ lazy_static! {
#[derive(Debug)]
pub struct Native;

#[derive(Debug)]
struct Compat<T>(T);

impl<T> Compat<T> {
fn new(inner: T) -> Self {
Self(inner)
}

fn get_ref(&self) -> &T {
&self.0
}

#[allow(unsafe_code)]
fn get_pin_mut<'a>(self: Pin<&'a mut Self>) -> Pin<&'a mut T> {
unsafe { Pin::new_unchecked(&mut Pin::get_unchecked_mut(self).0) }
}
}

impl runtime_raw::Runtime for Native {
type TcpStream = impl runtime_raw::TcpStream;
type TcpListener = impl runtime_raw::TcpListener<TcpStream = Self::TcpStream>;
type UdpSocket = impl runtime_raw::UdpSocket;
type Delay = impl runtime_raw::Delay;
type Interval = impl runtime_raw::Interval;

type ConnectTcpStream = impl Future<Output = io::Result<Self::TcpStream>> + Send;

fn spawn_boxed(&self, fut: BoxFuture<'static, ()>) -> Result<(), SpawnError> {
JULIEX_THREADPOOL.spawn_boxed(fut.into());
JULIEX_THREADPOOL.spawn_boxed(fut);
Ok(())
}

fn connect_tcp_stream(
&self,
addr: &SocketAddr,
) -> BoxFuture<'static, io::Result<Pin<Box<dyn runtime_raw::TcpStream>>>> {
let romio_connect = romio::TcpStream::connect(addr);
let connect = romio_connect.map(|res| {
res.map(|romio_stream| {
Box::pin(TcpStream { romio_stream }) as Pin<Box<dyn runtime_raw::TcpStream>>
})
});
connect.boxed()
fn connect_tcp_stream(&self, addr: &SocketAddr) -> Self::ConnectTcpStream {
romio::TcpStream::connect(addr).map_ok(Compat::new)
}

fn bind_tcp_listener(
&self,
addr: &SocketAddr,
) -> io::Result<Pin<Box<dyn runtime_raw::TcpListener>>> {
let romio_listener = romio::TcpListener::bind(&addr)?;
Ok(Box::pin(TcpListener { romio_listener }))
fn bind_tcp_listener(&self, addr: &SocketAddr) -> io::Result<Self::TcpListener> {
romio::TcpListener::bind(&addr).map(Compat::new)
}

fn bind_udp_socket(
&self,
addr: &SocketAddr,
) -> io::Result<Pin<Box<dyn runtime_raw::UdpSocket>>> {
let romio_socket = romio::UdpSocket::bind(&addr)?;
Ok(Box::pin(UdpSocket { romio_socket }))
fn bind_udp_socket(&self, addr: &SocketAddr) -> io::Result<Self::UdpSocket> {
romio::UdpSocket::bind(&addr).map(Compat::new)
}

fn new_delay(&self, dur: Duration) -> Pin<Box<dyn runtime_raw::Delay>> {
let async_delay = AsyncDelay::new(dur);
Box::pin(Delay { async_delay })
fn new_delay(&self, dur: Duration) -> Self::Delay {
Compat::new(AsyncDelay::new(dur))
}

fn new_delay_at(&self, at: Instant) -> Pin<Box<dyn runtime_raw::Delay>> {
let async_delay = AsyncDelay::new_at(at);
Box::pin(Delay { async_delay })
fn new_delay_at(&self, at: Instant) -> Self::Delay {
Compat::new(AsyncDelay::new_at(at))
}

fn new_interval(&self, dur: Duration) -> Pin<Box<dyn runtime_raw::Interval>> {
let async_interval = AsyncInterval::new(dur);
Box::pin(Interval { async_interval })
fn new_interval(&self, dur: Duration) -> Self::Interval {
Compat::new(AsyncInterval::new(dur))
}
}
71 changes: 29 additions & 42 deletions runtime-native/src/not_wasm32/tcp.rs
Original file line number Diff line number Diff line change
@@ -1,105 +1,92 @@
use futures::prelude::*;
use romio::raw::{AsyncReadReady, AsyncReady, AsyncWriteReady};

use super::Compat;
use std::io;
use std::net::SocketAddr;
use std::pin::Pin;
use std::task::{Context, Poll};

#[derive(Debug)]
pub(crate) struct TcpStream {
pub romio_stream: romio::tcp::TcpStream,
}

#[derive(Debug)]
pub(crate) struct TcpListener {
pub romio_listener: romio::tcp::TcpListener,
}

impl runtime_raw::TcpStream for TcpStream {
fn poll_write_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut self.romio_stream)
.poll_write_ready(cx)
.map_ok(|_| ())
impl runtime_raw::TcpStream for Compat<romio::TcpStream> {
fn poll_write_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.get_pin_mut().poll_write_ready(cx).map_ok(|_| ())
}

fn poll_read_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut self.romio_stream)
.poll_read_ready(cx)
.map_ok(|_| ())
fn poll_read_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.get_pin_mut().poll_read_ready(cx).map_ok(|_| ())
}

fn take_error(&self) -> io::Result<Option<io::Error>> {
Ok(None)
}

fn local_addr(&self) -> io::Result<SocketAddr> {
self.romio_stream.local_addr()
self.get_ref().local_addr()
}

fn peer_addr(&self) -> io::Result<SocketAddr> {
self.romio_stream.peer_addr()
self.get_ref().peer_addr()
}

fn shutdown(&self, how: std::net::Shutdown) -> std::io::Result<()> {
self.romio_stream.shutdown(how)
self.get_ref().shutdown(how)
}

#[cfg(unix)]
fn as_raw_fd(&self) -> std::os::unix::io::RawFd {
use std::os::unix::io::AsRawFd;
self.romio_stream.as_raw_fd()
self.get_ref().as_raw_fd()
}
}

impl AsyncRead for TcpStream {
impl AsyncRead for Compat<romio::TcpStream> {
fn poll_read(
mut self: Pin<&mut Self>,
self: Pin<&mut Self>,
cx: &mut Context<'_>,
mut buf: &mut [u8],
) -> Poll<io::Result<usize>> {
Pin::new(&mut self.romio_stream).poll_read(cx, &mut buf)
self.get_pin_mut().poll_read(cx, &mut buf)
}
}

impl AsyncWrite for TcpStream {
impl AsyncWrite for Compat<romio::TcpStream> {
fn poll_write(
mut self: Pin<&mut Self>,
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
Pin::new(&mut self.romio_stream).poll_write(cx, &buf)
self.get_pin_mut().poll_write(cx, &buf)
}

fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut self.romio_stream).poll_flush(cx)
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.get_pin_mut().poll_flush(cx)
}

fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut self.romio_stream).poll_close(cx)
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.get_pin_mut().poll_close(cx)
}
}

impl runtime_raw::TcpListener for TcpListener {
impl runtime_raw::TcpListener for Compat<romio::TcpListener> {
type TcpStream = Compat<romio::TcpStream>;

fn local_addr(&self) -> io::Result<SocketAddr> {
self.romio_listener.local_addr()
self.get_ref().local_addr()
}

fn poll_accept(
mut self: Pin<&mut Self>,
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<io::Result<Pin<Box<dyn runtime_raw::TcpStream>>>> {
Pin::new(&mut self.romio_listener)
) -> Poll<io::Result<Self::TcpStream>> {
self.get_pin_mut()
.poll_ready(cx)
.map_ok(|(romio_stream, _)| {
Box::pin(TcpStream { romio_stream }) as Pin<Box<dyn runtime_raw::TcpStream>>
})
.map_ok(|(romio_stream, _)| Compat::new(romio_stream))
}

/// Extracts the raw file descriptor.
#[cfg(unix)]
fn as_raw_fd(&self) -> std::os::unix::io::RawFd {
use std::os::unix::io::AsRawFd;
self.romio_listener.as_raw_fd()
self.get_ref().as_raw_fd()
}
}
27 changes: 10 additions & 17 deletions runtime-native/src/not_wasm32/time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,38 +3,31 @@ use std::task::{Context, Poll};
use std::time::Instant;

use futures::prelude::*;
use futures::ready;
use futures_timer::{Delay as AsyncDelay, Interval as AsyncInterval};

#[derive(Debug)]
pub(crate) struct Delay {
pub(crate) async_delay: AsyncDelay,
}
use super::Compat;

impl runtime_raw::Delay for Delay {}
impl runtime_raw::Delay for Compat<AsyncDelay> {}

impl Future for Delay {
impl Future for Compat<AsyncDelay> {
type Output = Instant;

#[inline]
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
futures::ready!(Pin::new(&mut self.async_delay).poll(cx)).unwrap();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
ready!(self.get_pin_mut().poll(cx)).unwrap();
Poll::Ready(Instant::now())
}
}

#[derive(Debug)]
pub(crate) struct Interval {
pub(crate) async_interval: AsyncInterval,
}

impl runtime_raw::Interval for Interval {}
impl runtime_raw::Interval for Compat<AsyncInterval> {}

impl Stream for Interval {
impl Stream for Compat<AsyncInterval> {
type Item = Instant;

#[inline]
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
futures::ready!(Pin::new(&mut self.async_interval).poll_next(cx)).unwrap();
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
ready!(self.get_pin_mut().poll_next(cx)).unwrap();
Poll::Ready(Some(Instant::now()))
}
}
Loading