Skip to content

Commit

Permalink
feat(body): change Sender::send_data to an async fn.
Browse files Browse the repository at this point in the history
The previous version is renamed to `try_send_data`.

BREAKING CHANGE: Usage of `send_data` should either be changed to
  async/await or use `try_send_data`.
  • Loading branch information
seanmonstar committed Aug 30, 2019
1 parent 0331219 commit 62a96c0
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 11 deletions.
22 changes: 16 additions & 6 deletions src/body/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@ use http_body::{SizeHint, Body as HttpBody};
use http::HeaderMap;

use crate::common::{Future, Never, Pin, Poll, task};
use super::internal::{FullDataArg, FullDataRet};
use super::{Chunk, Payload};
use super::Chunk;
use crate::upgrade::OnUpgrade;

type BodySender = mpsc::Sender<Result<Chunk, crate::Error>>;
Expand Down Expand Up @@ -467,14 +466,25 @@ impl Sender {
self.tx.poll_ready(cx).map_err(|_| crate::Error::new_closed())
}

/// Sends data on this channel.
/// Send data on this channel when it is ready.
pub async fn send_data(&mut self, chunk: Chunk) -> crate::Result<()> {
futures_util::future::poll_fn(|cx| self.poll_ready(cx)).await?;
self.tx.try_send(Ok(chunk)).map_err(|_| crate::Error::new_closed())
}

/// Try to send data on this channel.
///
/// This should be called after `poll_ready` indicated the channel
/// could accept another `Chunk`.
/// # Errors
///
/// Returns `Err(Chunk)` if the channel could not (currently) accept
/// another `Chunk`.
pub fn send_data(&mut self, chunk: Chunk) -> Result<(), Chunk> {
///
/// # Note
///
/// This is mostly useful for when trying to send from some other thread
/// that doesn't have an async context. If in an async context, prefer
/// [`send_data`][] instead.
pub fn try_send_data(&mut self, chunk: Chunk) -> Result<(), Chunk> {
self.tx
.try_send(Ok(chunk))
.map_err(|err| err.into_inner().expect("just sent Ok"))
Expand Down
5 changes: 2 additions & 3 deletions src/proto/h1/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use http::{Request, Response, StatusCode};
use tokio_io::{AsyncRead, AsyncWrite};

use crate::body::{Body, Payload};
use crate::body::internal::FullDataArg;
use crate::common::{Future, Never, Poll, Pin, Unpin, task};
use crate::proto::{BodyLength, DecodedLength, Conn, Dispatched, MessageHead, RequestHead, RequestLine, ResponseHead};
use super::Http1Transaction;
Expand Down Expand Up @@ -169,7 +168,7 @@ where
}
match self.conn.poll_read_body(cx) {
Poll::Ready(Some(Ok(chunk))) => {
match body.send_data(chunk) {
match body.try_send_data(chunk) {
Ok(()) => {
self.body_tx = Some(body);
},
Expand Down Expand Up @@ -249,7 +248,7 @@ where
return Poll::Ready(Ok(()));
} else if self.body_rx.is_none() && self.conn.can_write_head() && self.dispatch.should_poll() {
if let Some(msg) = ready!(self.dispatch.poll_msg(cx)) {
let (head, mut body) = msg.map_err(crate::Error::new_user_service)?;
let (head, body) = msg.map_err(crate::Error::new_user_service)?;

let body_type = if body.is_end_stream() {
self.body_rx.set(None);
Expand Down
2 changes: 1 addition & 1 deletion tests/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1748,7 +1748,7 @@ mod conn {

let (mut sender, body) = Body::channel();
let sender = thread::spawn(move || {
sender.send_data("hello".into()).ok().unwrap();
sender.try_send_data("hello".into()).expect("try_send_data");
Runtime::new().unwrap().block_on(rx).unwrap();
sender.abort();
});
Expand Down
2 changes: 1 addition & 1 deletion tests/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ mod response_body_lengths {
},
Bd::Unknown(b) => {
let (mut tx, body) = hyper::Body::channel();
tx.send_data(b.into()).expect("send_data");
tx.try_send_data(b.into()).expect("try_send_data");
reply.body_stream(body);
b
},
Expand Down

0 comments on commit 62a96c0

Please sign in to comment.