Skip to content

Commit

Permalink
Removed all unsafe code
Browse files Browse the repository at this point in the history
  • Loading branch information
tylerhawkes committed Jun 18, 2021
1 parent 90bd5cc commit 949ce69
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 60 deletions.
57 changes: 27 additions & 30 deletions futures-util/src/future/poll_immediate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,17 @@ use crate::FutureExt;
use core::pin::Pin;
use futures_core::task::{Context, Poll};
use futures_core::{FusedFuture, Future, Stream};
use pin_project_lite::pin_project;

/// Future for the [`poll_immediate`](poll_immediate()) function.
#[derive(Debug, Clone)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct PollImmediate<T>(Option<T>);
pin_project! {
/// Future for the [`poll_immediate`](poll_immediate()) function.
#[derive(Debug, Clone)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct PollImmediate<T> {
#[pin]
future: Option<T>
}
}

impl<T, F> Future for PollImmediate<F>
where
Expand All @@ -17,21 +23,22 @@ where

#[inline]
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
// # Safety
// This is the only time that this future will ever be polled.
let mut this = self.project();
let inner =
unsafe { self.get_unchecked_mut().0.take().expect("PollOnce polled after completion") };
crate::pin_mut!(inner);
this.future.as_mut().as_pin_mut().expect("PollImmediate polled after completion");
match inner.poll(cx) {
Poll::Ready(t) => Poll::Ready(Some(t)),
Poll::Ready(t) => {
this.future.set(None);
Poll::Ready(Some(t))
}
Poll::Pending => Poll::Ready(None),
}
}
}

impl<T: Future> FusedFuture for PollImmediate<T> {
fn is_terminated(&self) -> bool {
self.0.is_none()
self.future.is_none()
}
}

Expand Down Expand Up @@ -64,25 +71,14 @@ where
type Item = Poll<T>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
unsafe {
// # Safety
// We never move the inner value until it is done. We only get a reference to it.
let inner = &mut self.get_unchecked_mut().0;
let fut = match inner.as_mut() {
// inner is gone, so we can signal that the stream is closed.
None => return Poll::Ready(None),
Some(inner) => inner,
};
let fut = Pin::new_unchecked(fut);
Poll::Ready(Some(fut.poll(cx).map(|t| {
// # Safety
// The inner option value is done, so we need to drop it. We do it without moving it
// by using drop in place. We then write over the value without trying to drop it first
// This should uphold all the safety requirements of `Pin`
std::ptr::drop_in_place(inner);
std::ptr::write(inner, None);
let mut this = self.project();
match this.future.as_mut().as_pin_mut() {
// inner is gone, so we can signal that the stream is closed.
None => return Poll::Ready(None),
Some(fut) => Poll::Ready(Some(fut.poll(cx).map(|t| {
this.future.set(None);
t
})))
}))),
}
}
}
Expand All @@ -103,7 +99,7 @@ where
/// # });
/// ```
pub fn poll_immediate<F: Future>(f: F) -> PollImmediate<F> {
assert_future::<Option<F::Output>, PollImmediate<F>>(PollImmediate(Some(f)))
assert_future::<Option<F::Output>, PollImmediate<F>>(PollImmediate { future: Some(f) })
}

/// Future for the [`poll_immediate_reuse`](poll_immediate_reuse()) function.
Expand All @@ -119,7 +115,8 @@ where

#[inline]
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<T, F>> {
let mut inner = self.get_mut().0.take().expect("PollOnceReuse polled after completion");
let mut inner =
self.get_mut().0.take().expect("PollImmediateReuse polled after completion");
match inner.poll_unpin(cx) {
Poll::Ready(t) => Poll::Ready(Ok(t)),
Poll::Pending => Poll::Ready(Err(inner)),
Expand Down
57 changes: 27 additions & 30 deletions futures-util/src/stream/poll_immediate.rs
Original file line number Diff line number Diff line change
@@ -1,53 +1,50 @@
use futures_core::task::{Context, Poll};
use futures_core::Stream;
use pin_project_lite::pin_project;
use std::pin::Pin;

/// Stream for the [`poll_immediate`](poll_immediate()) function.
#[derive(Debug, Clone)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct PollImmediate<S>(Option<S>);
pin_project! {
/// Stream for the [`poll_immediate`](poll_immediate()) function.
#[derive(Debug, Clone)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct PollImmediate<S> {
#[pin]
stream: Option<S>
}
}

impl<T, S> Stream for PollImmediate<S>
where
S: Stream<Item = T>,
S: Stream<Item = T>,
{
type Item = Poll<T>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
unsafe {
// # Safety
// We never move the inner value until it is done. We only get a reference to it.
let inner = &mut self.get_unchecked_mut().0;
let fut = match inner.as_mut() {
// inner is gone, so we can continue to signal that the stream is closed.
None => return Poll::Ready(None),
Some(inner) => inner,
};
let stream = Pin::new_unchecked(fut);
match stream.poll_next(cx) {
Poll::Ready(Some(t)) => Poll::Ready(Some(Poll::Ready(t))),
Poll::Ready(None) => {
// # Safety
// The inner stream is done, so we need to drop it. We do it without moving it
// by using drop in place. We then write over the value without trying to drop it first
// This should uphold all the safety requirements of `Pin`
std::ptr::drop_in_place(inner);
std::ptr::write(inner, None);
Poll::Ready(None)
}
Poll::Pending => Poll::Ready(Some(Poll::Pending)),
let mut this = self.project();
let stream = match this.stream.as_mut().as_pin_mut() {
// inner is gone, so we can continue to signal that the stream is closed.
None => return Poll::Ready(None),
Some(inner) => inner,
};

match stream.poll_next(cx) {
Poll::Ready(Some(t)) => Poll::Ready(Some(Poll::Ready(t))),
Poll::Ready(None) => {
this.stream.set(None);
Poll::Ready(None)
}
Poll::Pending => Poll::Ready(Some(Poll::Pending)),
}
}

fn size_hint(&self) -> (usize, Option<usize>) {
self.0.as_ref().map_or((0, Some(0)), Stream::size_hint)
self.stream.as_ref().map_or((0, Some(0)), Stream::size_hint)
}
}

impl<S: Stream> super::FusedStream for PollImmediate<S> {
fn is_terminated(&self) -> bool {
self.0.is_none()
self.stream.is_none()
}
}

Expand Down Expand Up @@ -76,5 +73,5 @@ impl<S: Stream> super::FusedStream for PollImmediate<S> {
/// # });
/// ```
pub fn poll_immediate<S: Stream>(s: S) -> PollImmediate<S> {
super::assert_stream::<Poll<S::Item>, PollImmediate<S>>(PollImmediate(Some(s)))
super::assert_stream::<Poll<S::Item>, PollImmediate<S>>(PollImmediate { stream: Some(s) })
}

0 comments on commit 949ce69

Please sign in to comment.