AsyncWrite::poll_write_buf is not invoked when using write_buf on an object that needs dereferencing #2610

Closed
eaufavor opened this issue Jun 11, 2020 · 5 comments
Labels
A-tokio Area: The main tokio crate C-bug Category: This is a bug. M-io Module: tokio/io

Comments

@eaufavor

Version
tokio v0.2.21

Platform
Linux

Description
I noticed that poll_write is called instead of poll_write_buf when I call write_buf on a Boxed object.

I tried this code:

use std::pin::Pin;
use std::task::{Context, Poll};
use bytes::{Buf, Bytes};
use tokio::io::{self, AsyncWrite, AsyncWriteExt};

struct MockWrite {}

impl AsyncWrite for MockWrite {
    fn poll_write(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
        _buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        println!("poll_write");
        Poll::Ready(Ok(0))
    }

    fn poll_write_buf<B: Buf>(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
        _buf: &mut B,
    ) -> Poll<io::Result<usize>> {
        println!("poll_write_buf");
        Poll::Ready(Ok(0))
    }

    #[inline]
    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
        Poll::Ready(Ok(()))
    }

    fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
        Poll::Ready(Ok(()))
    }
}


#[tokio::main(core_threads = 1)]
async fn main() {
    let mut buf = Bytes::from("1");
    let mut io = MockWrite{};
    io.write_buf(&mut buf).await.unwrap();

    let mut buf2 = Bytes::from("1");
    let mut io2 = Box::new(MockWrite{});
    io2.write_buf(&mut buf2).await.unwrap();

    let mut buf3 = Bytes::from("1");
    let mut io3 = MockWrite{};
    let mut io3 = &mut io3;
    (&mut io3).write_buf(&mut buf3).await.unwrap();
}

I expected to see this happen: (output of the code)

poll_write_buf
poll_write_buf
poll_write_buf

Instead, this happened:

poll_write_buf
poll_write
poll_write

The same issue applies to the read_buf in AsyncReadExt as well.

This issue may not affect the correctness of write_buf/read_buf, but it may hurt their performance, because falling back to poll_write/poll_read disables vectored IO.

I believe I have a fix for it.
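
To illustrate the performance concern above, here is a minimal sketch using only the standard library's synchronous `Write` trait rather than tokio. `PlainWriter` and `write_two_chunks` are hypothetical names introduced for illustration. A writer that implements only `write` inherits the default `write_vectored`, which forwards just the first non-empty slice, so one call accepts one chunk. That is analogous to, though not the same code as, `write_buf` falling back to `poll_write`.

```rust
use std::io::{self, IoSlice, Write};

/// Hypothetical writer that implements only `write`, so it inherits the
/// default (non-vectored) `write_vectored` from the `Write` trait.
struct PlainWriter(Vec<u8>);

impl Write for PlainWriter {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.0.extend_from_slice(buf);
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

/// Submits two separate chunks in a single `write_vectored` call and
/// returns how many bytes that one call accepted.
fn write_two_chunks<W: Write>(writer: &mut W) -> io::Result<usize> {
    let chunks = [IoSlice::new(b"abc"), IoSlice::new(b"def")];
    writer.write_vectored(&chunks)
}

fn main() {
    // Vec<u8> overrides write_vectored and consumes every slice at once.
    let mut vectored: Vec<u8> = Vec::new();
    let n = write_two_chunks(&mut vectored).unwrap();
    println!("Vec<u8> accepted {} bytes in one call", n);

    // PlainWriter uses the default, which only forwards the first slice.
    let mut plain = PlainWriter(Vec::new());
    let n = write_two_chunks(&mut plain).unwrap();
    println!("PlainWriter accepted {} bytes in one call", n);
}
```

With the fallback, the caller needs one call per chunk to drain the same data, which is where the extra cost would come from.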

@eaufavor eaufavor added A-tokio Area: The main tokio crate C-bug Category: This is a bug. labels Jun 11, 2020
@Darksonn Darksonn added the M-io Module: tokio/io label Jun 11, 2020
@Darksonn
Contributor

Thanks! To fix it, you should modify this part of the code. Do you want to open a PR with that too?

eaufavor added a commit to eaufavor/tokio that referenced this issue Jun 11, 2020
Add the implementation of poll_{read,write}_buf in the
deref_async_{read,write} macros so that the underlying
poll_{read,write}_buf functions are correctly called.

Fixes: tokio-rs#2610
@Darksonn
Contributor

The compilation failure in #2611 explains why the buf version is not implemented: It is a generic function, so it cannot be called on a trait object. Sorry, this is not possible to fix due to how the Rust programming language works.
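
The limitation described above can be reproduced without tokio. The following is a simplified model, not tokio's actual code: `Sink`, `Mock`, `write`, and `write_buf` are hypothetical stand-ins for `AsyncWrite`, `MockWrite`, `poll_write`, and `poll_write_buf`. It assumes the generic method carries a `Self: Sized` bound so the trait stays usable as a trait object, and that the forwarding impl for `Box<T>` accepts `T: ?Sized`.

```rust
// Hypothetical stand-in for AsyncWrite: one plain method, one generic method.
trait Sink {
    fn write(&mut self, buf: &[u8]) -> &'static str;

    // A generic method has no vtable entry, so it is bounded by `Self: Sized`
    // to keep `dyn Sink` usable. The default falls back to `write`.
    fn write_buf<B: AsRef<[u8]>>(&mut self, buf: &B) -> &'static str
    where
        Self: Sized,
    {
        self.write(buf.as_ref())
    }
}

struct Mock;

impl Sink for Mock {
    fn write(&mut self, _buf: &[u8]) -> &'static str {
        "write"
    }

    fn write_buf<B: AsRef<[u8]>>(&mut self, _buf: &B) -> &'static str
    where
        Self: Sized,
    {
        "write_buf"
    }
}

// Forwarding impl that also covers `Box<dyn Sink>` because of `?Sized`.
impl<T: Sink + ?Sized> Sink for Box<T> {
    fn write(&mut self, buf: &[u8]) -> &'static str {
        (**self).write(buf)
    }
    // `write_buf` cannot be forwarded here: `(**self).write_buf(buf)` would
    // require `T: Sized`, which this impl does not guarantee. The default
    // body is used instead, so calls end up in `write`.
}

fn main() {
    let mut direct = Mock;
    println!("direct: {}", direct.write_buf(&[b'1']));

    let mut boxed = Box::new(Mock);
    println!("boxed:  {}", boxed.write_buf(&[b'1']));

    let mut dynamic: Box<dyn Sink> = Box::new(Mock);
    println!("dyn:    {}", dynamic.write_buf(&[b'1']));
}
```

In this model only the direct call reaches the specialized `write_buf`; both boxed calls resolve to the `Box<T>` impl first and fall back to `write`, mirroring the output reported in the issue.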

@eaufavor
Author

eaufavor commented Jun 11, 2020

Sad!

I realized that I can have a workaround in my code by creating a minimal clone of AsyncWrite.
That solves my problem without requiring the real issue inside tokio to be fixed.

#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct WriteVec<'a, W, B> {
    writer: &'a mut W,
    buf: &'a mut B,
}

pub trait AsyncWriteVec {
    fn poll_write_vec<B: Buf>(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
        _buf: &mut B,
    ) -> Poll<io::Result<usize>>;

    fn write_vec <'a, B>(&'a mut self, src: &'a mut B) -> WriteVec<'a, Self, B>
    where
        Self: Sized,
        B: Buf,
    {
        WriteVec { writer: self, buf: src }
    }
}

impl<W, B> Future for WriteVec<'_, W, B>
where
    W: AsyncWriteVec,
    B: Buf,
{
    type Output = io::Result<usize>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
        // safety: no data is moved from self
        unsafe {
            let me = self.get_unchecked_mut();
            Pin::new_unchecked(&mut *me.writer).poll_write_vec(cx, &mut me.buf)
        }
    }
}

and

impl AsyncWriteVec for MockWrite {
    fn poll_write_vec<B: Buf>(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut B,
    ) -> Poll<io::Result<usize>> {
        self.poll_write_buf(cx, buf)
    }
}

Hopefully the snippet above could help others if they encounter the same issue.

@taiki-e
Member

taiki-e commented Jun 12, 2020

@eaufavor

        // safety: no data is moved from self
        unsafe {
            let me = self.get_unchecked_mut();
            Pin::new_unchecked(&mut *me.writer).poll_write_vec(cx, &mut me.buf)
        }
 

BTW, this is a Pin<&mut Type> to Pin<Field> projection and is unsound if W is not Unpin (you can move W after WriteVec is dropped):
https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=2145ea8ea52a73e63f65b1d5b34850c2

The correct projection is Pin<&mut Type> to Pin<&mut Field>. In this case, that is Pin<&mut WriteVec<'_, W, B>> to Pin<&mut &mut W>, and a W: Unpin bound is needed to convert Pin<&mut &mut W> to Pin<&mut W>.
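
One way the suggestion above could look, as a sketch under stated assumptions rather than the fix that landed in tokio: the trait is simplified to take a byte slice instead of a generic `Buf`, and `CountingWriter`, `NoopWake`, and `poll_once` are hypothetical helpers added only so the example runs on the standard library alone. With `W: Unpin`, the `unsafe` block is no longer needed.

```rust
use std::future::Future;
use std::io;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Simplified stand-in for the AsyncWriteVec trait from the earlier comment.
pub trait AsyncWriteVec {
    fn poll_write_vec(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>>;
}

pub struct WriteVec<'a, W> {
    writer: &'a mut W,
    buf: &'a [u8],
}

impl<W> Future for WriteVec<'_, W>
where
    W: AsyncWriteVec + Unpin, // the bound that makes re-pinning sound
{
    type Output = io::Result<usize>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // WriteVec holds only references, so it is Unpin and get_mut is safe.
        let me = self.get_mut();
        // Pin::new is a safe constructor that requires W: Unpin.
        Pin::new(&mut *me.writer).poll_write_vec(cx, me.buf)
    }
}

// Hypothetical writer that records how many bytes it was given.
struct CountingWriter {
    written: usize,
}

impl AsyncWriteVec for CountingWriter {
    fn poll_write_vec(
        mut self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        self.written += buf.len();
        Poll::Ready(Ok(buf.len()))
    }
}

// Waker that does nothing, used to poll a future once without a runtime.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn poll_once<F: Future + Unpin>(fut: &mut F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    Pin::new(fut).poll(&mut cx)
}

fn main() {
    let mut writer = CountingWriter { written: 0 };
    let mut fut = WriteVec { writer: &mut writer, buf: b"abc" };
    let ready = matches!(poll_once(&mut fut), Poll::Ready(Ok(3)));
    println!("ready with 3 bytes: {}, recorded: {}", ready, writer.written);
}
```

The trade-off is that writers which are not `Unpin` can no longer be used through `&mut W` directly; they would have to be pinned first (for example with `Box::pin`) and passed as a pinned handle.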

@taiki-e
Member

taiki-e commented Jun 12, 2020

Filed #2612 as I confirmed that read_buf and write_buf have the same unsafe code.
