Add SinkWriter in analogy to StreamReader to tokio_util #5070
You asked me how to avoid the Vec<u8> allocation. The changes below should do it.
tokio-util/src/io/sink_writer.rs
Outdated
impl<S> SinkWriter<S>
where
    S: Sink<Vec<u8>, Error = io::Error>,
{
- impl<S> SinkWriter<S>
- where
-     S: Sink<Vec<u8>, Error = io::Error>,
- {
+ impl<S> SinkWriter<S> {
tokio-util/src/io/sink_writer.rs
Outdated
impl<S, E> AsyncWrite for SinkWriter<S>
where
    S: Sink<Vec<u8>, Error = E>,
-     S: Sink<Vec<u8>, Error = E>,
+     for<'a> S: Sink<&'a [u8], Error = E>,
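To illustrate what the for<'a> bound asks of a sink, here is a minimal sketch using only the standard library. MiniSink, ByteCounter and write_twice are hypothetical names made up for this example; MiniSink is a simplified, synchronous stand-in for futures_sink::Sink and not the real trait. The point is only that the writer, not the sink, picks how long each buffer lives, so the sink has to accept a slice of any lifetime.

```rust
// Hypothetical, simplified stand-in for `futures_sink::Sink`: synchronous and
// without pinning, so that only the lifetime behaviour is illustrated.
trait MiniSink<Item> {
    fn send(&mut self, item: Item);
}

// A sink that only counts the bytes it is given; it never stores the slice.
struct ByteCounter {
    total: usize,
}

// Implemented for every lifetime 'a, so it satisfies a `for<'a>` bound.
impl<'a> MiniSink<&'a [u8]> for ByteCounter {
    fn send(&mut self, item: &'a [u8]) {
        self.total += item.len();
    }
}

// The writer decides how long each buffer lives, so the bound must hold for
// any lifetime it picks, including that of a short-lived local buffer.
fn write_twice<S>(sink: &mut S, buf: &[u8])
where
    for<'a> S: MiniSink<&'a [u8]>,
{
    sink.send(buf);
    let local = [buf.len() as u8]; // dropped at the end of this function
    sink.send(&local[..]);
}

fn main() {
    let mut sink = ByteCounter { total: 0 };
    write_twice(&mut sink, b"hello");
    assert_eq!(sink.total, 6); // 5 bytes from `buf` plus 1 from `local`
}
```

A sink that accepts only one specific lifetime could not be called with the local buffer above, which is roughly the situation a poll_write implementation is in with the buf it receives.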
tokio-util/src/io/sink_writer.rs
Outdated
) -> Poll<Result<usize, io::Error>> {
    match self.as_mut().project().inner.poll_ready(cx) {
        Poll::Ready(Ok(())) => {
            if let Err(e) = self.as_mut().project().inner.start_send(buf.to_vec()) {
-             if let Err(e) = self.as_mut().project().inner.start_send(buf.to_vec()) {
+             if let Err(e) = self.as_mut().project().inner.start_send(buf) {
@Darksonn, as mentioned, this doesn't work (or is at least not that simple), see commit 3d7097b together with an additional trait bound - adding that leads essentially to case 2) described above. Removing these trait bounds is your proposal and results in no error if I comment out line 21 in |
That's because your Sink in the test doesn't implement |
The core problem here is that the mpsc channel you're using doesn't implement
You'll need to implement the |
Hi both, that is clear (I can at least read :) ). But by using So how would the implementation need to change so that it can be used as in the test here? As pushed initially, copying the |
Using When I call For your test case, the simplest way to implement a |
We're getting there, that is exactly what I was looking at. I also understand the requirement for the With that in mind, this quote here is not yet fully clear to me implication-wise:
Is a specific lifetime not especially covered by a Sink that expects 'any' lifetime? I'm more than happy to admit that I used that initially to make the linter happy and thought I roughly understand the consequences. |
You can't make
A specific lifetime With |
FWIW, my inclination would be to have a |
I think that if we add a wrapper like this:
impl<'a, S> Sink<&'a [u8]> for CopyToBytes<S>
where
    S: Sink<Bytes>,
{
    type Error = S::Error;
    ...
}
Then that's good enough. You would be able to do |
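For reference, the shape of such a wrapper can be sketched with the standard library alone. This is not the CopyToBytes code from the PR: MiniSink, CopyToOwned and send_through_wrapper are hypothetical names, MiniSink is a simplified synchronous stand-in for Sink, Vec<u8> stands in for Bytes, and a std mpsc channel plays the sink of owned buffers.

```rust
use std::sync::mpsc;

// Hypothetical, simplified stand-in for `futures_sink::Sink` (synchronous,
// no pinning, no error type).
trait MiniSink<Item> {
    fn feed(&mut self, item: Item);
}

// A channel of owned buffers: it accepts `Vec<u8>`, but not borrowed slices.
impl MiniSink<Vec<u8>> for mpsc::Sender<Vec<u8>> {
    fn feed(&mut self, item: Vec<u8>) {
        // A closed channel is ignored to keep the sketch short.
        let _ = self.send(item);
    }
}

// Wrapper that turns a sink of owned buffers into a sink of slices by
// copying each slice into a fresh owned buffer.
struct CopyToOwned<S> {
    inner: S,
}

impl<'a, S> MiniSink<&'a [u8]> for CopyToOwned<S>
where
    S: MiniSink<Vec<u8>>,
{
    fn feed(&mut self, item: &'a [u8]) {
        self.inner.feed(item.to_vec());
    }
}

// Sends every chunk through the wrapper and returns what the channel received.
fn send_through_wrapper(chunks: &[&[u8]]) -> Vec<Vec<u8>> {
    let (tx, rx) = mpsc::channel::<Vec<u8>>();
    let mut sink = CopyToOwned { inner: tx };
    for &chunk in chunks {
        sink.feed(chunk);
    }
    drop(sink); // closes the channel so the iterator below terminates
    rx.iter().collect()
}

fn main() {
    let received = send_through_wrapper(&[&b"abc"[..], &b"de"[..]]);
    assert_eq!(received, vec![b"abc".to_vec(), b"de".to_vec()]);
}
```

The copy happens in exactly one place, inside the wrapper, so a writer built on top of it can stay free of allocations for sinks that accept slices directly.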
Thank you two for those considerations. From what I understand, I agree. Can you think of a construct fit for a separate test which would work with a |
Three non-toy
All of these are plausible things to want, and all of them do their own internal copies that don't benefit from being passed |
If you just need the sink for a test, you can write a sink that just appends the given bytes to a |
And note that CopyToBytes has the same "inadvisable" comment that needs thinking about.
StreamReader has this comment because the unit of poll_read is bytes, but the unit of the Stream is Bufs, and thus StreamReader can have a partially read item buffered inside it, making it hard (but not impossible) to avoid getting buffers out of order if you mix StreamReader::poll_read with Stream::next.
SinkWriter doesn't have any internal buffering, and thus doesn't have this issue - there's no way to get writes out of order with code that appears to handle it all in-order.
The code looks good, but the documentation needs work.
Co-authored-by: Alice Ryhl <aliceryhl@google.com>
Code looks great to me - just documentation to tweak now.
Co-authored-by: Simon Farnsworth <simon@farnz.org.uk>
I'd also suggest looking at the final docs generated by rustdoc - I find I'm better at spotting errors in the docs when I do that, as compared to looking in the source or at rust-analyzer overlays.
Ah you're right, I was a bit careless there. I'll take some time and check the finalised documentation.
Otherwise, now LGTM. @Darksonn and the rest of the Tokio team have the final say, though (I'm just a contributor).
tokio-util/src/io/sink_writer.rs
Outdated
) -> Poll<Result<usize, io::Error>> {
    match self.as_mut().project().inner.poll_ready(cx) {
        Poll::Ready(Ok(())) => {
            if let Err(e) = self.as_mut().project().inner.start_send(buf) {
And I think this as_mut is also not needed.
Both of those calls need a pinned mutable reference here.
You are passed a pinned mutable reference - indeed, looking at this code, I think there's room to refactor with let this = self.project() before the match, and then using this instead of self.as_mut().project().
Hmm, that doesn't work I think - if I use this "this" approach I will have an ownership problem, since poll_ready takes self and doesn't borrow. So I have to project twice there.
You should only need to project once, and then use the projection throughout - inner should have type Pin<&mut S> after projection, so you can put the as_mut() after this.inner instead.
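A small standard-library sketch of that reborrowing pattern, without pin-project: Inner and write are hypothetical names, and Inner is a plain Unpin struct standing in for the projected inner sink. Pin::as_mut hands out a shorter-lived Pin<&mut Inner> each time, so a method that takes self: Pin<&mut Self> by value can be called more than once on the same pinned reference.

```rust
use std::pin::Pin;

// Hypothetical stand-in for the inner sink. It is `Unpin`, so `get_mut` is
// allowed inside the methods below.
struct Inner {
    ready_polls: u32,
    sent: Vec<u8>,
}

impl Inner {
    // Takes the pinned reference by value, like `Sink::poll_ready` does.
    fn poll_ready(self: Pin<&mut Self>) -> bool {
        self.get_mut().ready_polls += 1;
        true
    }

    // Also takes the pinned reference by value, like `Sink::start_send`.
    fn start_send(self: Pin<&mut Self>, item: &[u8]) {
        self.get_mut().sent.extend_from_slice(item);
    }
}

// `inner` plays the role of `this.inner` after a single projection.
fn write(mut inner: Pin<&mut Inner>, buf: &[u8]) -> usize {
    // Each `as_mut()` reborrows, so `inner` is still usable afterwards.
    if inner.as_mut().poll_ready() {
        inner.as_mut().start_send(buf);
        buf.len()
    } else {
        0
    }
}

fn main() {
    let mut inner = Inner { ready_polls: 0, sent: Vec::new() };
    let written = write(Pin::new(&mut inner), b"hi");
    assert_eq!(written, 2);
    assert_eq!(inner.ready_polls, 1);
    assert_eq!(inner.sent, b"hi".to_vec());
}
```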
Oh that does indeed work - meaning I also don't need to restrict poll_write to be mut. Out of interest, what would (besides that) be the practical implication for this? Does it add a cost if I project twice versus pinning a reference twice?
Projection is free, so it doesn't really have any impact how you do it. Also, the part of the arguments that affects the signature is the type written after the colon, and adding mut on the argument name is before the colon, so it doesn't actually affect the signature of the function.
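The point about mut can be checked with a tiny example. Countdown and Stepper are hypothetical names used only here: the trait declares the parameter without mut, the impl binds it with mut, and the compiler still accepts the impl because the type after the colon is identical.

```rust
// The trait declares the parameter as plain `n: u32`.
trait Countdown {
    fn run(&self, n: u32) -> u32;
}

struct Stepper;

impl Countdown for Stepper {
    // `mut n` only makes the local binding mutable. The type after the colon
    // is still `u32`, so this impl matches the trait signature.
    fn run(&self, mut n: u32) -> u32 {
        let mut steps = 0;
        while n > 0 {
            n -= 1;
            steps += 1;
        }
        steps
    }
}

fn main() {
    assert_eq!(Stepper.run(3), 3);
}
```

The same reasoning applies to writing mut self: Pin<&mut Self> in an AsyncWrite impl: callers see the same signature either way.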
tokio-util/src/io/copy_to_bytes.rs
Outdated
/// A helper which wraps a `Sink<Bytes>` and converts it into
/// a `Sink<&'a [u8]>` by copying each byte slice into an owned [`Bytes`].
- /// A helper which wraps a `Sink<Bytes>` and converts it into
- /// a `Sink<&'a [u8]>` by copying each byte slice into an owned [`Bytes`].
+ /// A helper that wraps a `Sink<Bytes>` and converts it into
+ /// a `Sink<&'a [u8]>` by copying each byte slice into an owned [`Bytes`].
Poll::Pending => {
    cx.waker().wake_by_ref();
    Poll::Pending
}
Adding a wake call here will be a busy loop that consumes 100% CPU waiting for it to become ready. We shouldn't do that.
The call to poll_ready has returned Pending, so it promises to wake the waker for us when poll_ready can make progress.
- Poll::Pending => {
-     cx.waker().wake_by_ref();
-     Poll::Pending
- }
+ Poll::Pending => Poll::Pending,
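To make the difference visible, here is a rough standard-library sketch that counts wake-ups. All names are hypothetical: inner_poll_ready stands in for a sink that is never ready (a real sink would store the waker and call it once it can make progress), and CountingWaker only records how often it is woken. Each wake-up is a request to be polled again, so one wake per Pending poll means an executor would re-poll immediately, forever.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Waker that only counts how many times it was woken.
struct CountingWaker {
    wakes: AtomicUsize,
}

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.wakes.fetch_add(1, Ordering::SeqCst);
    }
}

// Hypothetical inner operation that is never ready in this sketch.
fn inner_poll_ready(_cx: &mut Context<'_>) -> Poll<()> {
    Poll::Pending
}

// Forwards `Pending` unchanged and relies on the inner operation to wake us.
fn poll_forwarding(cx: &mut Context<'_>) -> Poll<()> {
    match inner_poll_ready(cx) {
        Poll::Ready(()) => Poll::Ready(()),
        Poll::Pending => Poll::Pending,
    }
}

// Wakes itself on every `Pending`, asking to be polled again right away.
fn poll_self_waking(cx: &mut Context<'_>) -> Poll<()> {
    match inner_poll_ready(cx) {
        Poll::Ready(()) => Poll::Ready(()),
        Poll::Pending => {
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

// Polls `poll` a fixed number of times and reports how many wake-ups it caused.
fn wakes_after_polls(poll: fn(&mut Context<'_>) -> Poll<()>, polls: usize) -> usize {
    let counter = Arc::new(CountingWaker { wakes: AtomicUsize::new(0) });
    let waker = Waker::from(counter.clone());
    let mut cx = Context::from_waker(&waker);
    for _ in 0..polls {
        let _ = poll(&mut cx);
    }
    counter.wakes.load(Ordering::SeqCst)
}

fn main() {
    assert_eq!(wakes_after_polls(poll_forwarding, 5), 0);
    assert_eq!(wakes_after_polls(poll_self_waking, 5), 5);
}
```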
tokio-util/src/io/sink_writer.rs
Outdated
/// This adapter implements [`AsyncWrite`] for `Sink<&[u8]>`. If you want to
/// implement `Sink<_>` for [`AsyncWrite`], see the [`codec`] module; if you need to implement
/// [`AsyncWrite`] for `Sink<Bytes>`, see [`CopyToBytes`].
- /// This adapter implements [`AsyncWrite`] for `Sink<&[u8]>`. If you want to
- /// implement `Sink<_>` for [`AsyncWrite`], see the [`codec`] module; if you need to implement
- /// [`AsyncWrite`] for `Sink<Bytes>`, see [`CopyToBytes`].
+ /// This adapter implements [`AsyncWrite`] for `Sink<&[u8]>`. Because of the
+ /// lifetime, this trait is relatively rarely implemented. The main ways to
+ /// get a `Sink<&[u8]>` that you can use with this type are:
+ ///
+ /// * With the codec module by implementing the [`Encoder<&[u8]>`] trait.
+ /// * By wrapping a `Sink<Bytes>` in a [`CopyToBytes`].
+ /// * Manually implementing `Sink<&[u8]>` directly.
+ ///
+ /// The opposite conversion of implementing `Sink<_>` for an [`AsyncWrite`]
+ /// is done using the [`codec`] module.
+ ///
+ /// [`Encoder<&[u8]>`]: crate::codec::Encoder
I think the only missing changes here are documentation. Would you be willing to improve the documentation further? Otherwise I can also merge this and have it fixed in a follow-up PR.
I've added a commit that improves the documentation. Thanks for the PR.
Per review comment tokio-rs#5070 (comment): > Adding a wake call here will be a busy loop that consumes 100% CPU > waiting for it to become ready. We shouldn't do that. Furthermore, according to https://docs.rs/futures-sink/latest/futures_sink/trait.Sink.html#tymethod.poll_ready, poll_ready will register current task to be notified. Discussion: https://discord.com/channels/500028886025895936/500336346770964480/1072534504981418024
Per review comment tokio-rs#5070 (comment): > Adding a wake call here will be a busy loop that consumes 100% CPU > waiting for it to become ready. We shouldn't do that. Furthermore, according to https://docs.rs/futures-sink/latest/futures_sink/trait.Sink.html#tymethod.poll_ready, poll_ready will make sure that the current task is notified. Discussion: https://discord.com/channels/500028886025895936/500336346770964480/1072534504981418024
Motivation
Tokio provides StreamReader, enabling us to treat streams as file-like interfaces through AsyncRead. However, this only exists for streams, not for general sinks, for which this PR provides an implementation. This is a new attempt at #5019.
Solution
A sink is wrapped which takes owned buffers in the form of a Vec<u8>. Each buffer is copied to a new vector, which is sent to the underlying sink.
Attempted alternatives
I have tried three other designs:
This led to non-trivial errors in the test case, mixing multiple lifetimes which I ultimately could not resolve:
This led to the problem that the buffer lifetime could not be restricted by the lifetime of the Sink.
The current proposal relies on copying the slice to a Vec<u8> and resolves the lifetime problems in that way.