Skip to content

Commit

Permalink
Fix redirection and errors not being propagated before future is ready
Browse files Browse the repository at this point in the history
  • Loading branch information
sagebind committed Jun 8, 2019
1 parent 31ba23e commit f9cfc09
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 13 deletions.
6 changes: 4 additions & 2 deletions src/internal/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,17 +351,19 @@ impl AgentThread {

let handle = self.requests.remove(token);
let mut handle = self.multi.remove2(handle)?;
// handle.get_mut().complete();

// TODO
handle.get_mut().finish_response_and_complete();

Ok(())
}

fn fail_request(&mut self, token: usize, error: curl::Error) -> Result<(), Error> {
let handle = self.requests.remove(token);
let mut handle = self.multi.remove2(handle)?;
// handle.get_mut().fail(error);

// TODO
handle.get_mut().complete_with_error(error);

Ok(())
}
Expand Down
21 changes: 17 additions & 4 deletions src/internal/handler.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::body::Body;
use crate::{Body, Error};
use super::parse;
use super::response::ResponseProducer;
use curl::easy::{ReadError, InfoType, WriteError, SeekResult};
Expand Down Expand Up @@ -72,14 +72,25 @@ impl CurlHandler {
self.response_body_waker = Some(response_waker);
}

fn finish_response_and_complete(&mut self) {
// /// Guess if curl is about to perform a redirect.
// fn is_about_to_redirect(&self) -> bool {
// self.state.options.redirect_policy != RedirectPolicy::None
// && self.status_code.filter(http::StatusCode::is_redirection).is_some()
// && self.headers.contains_key("Location")
// }

pub(crate) fn finish_response_and_complete(&mut self) {
if let Some(body) = self.response_body_reader.take() {
// TODO: Extract and include Content-Length here.
self.producer.finish(Body::reader(body));
} else {
log::debug!("response already finished!");
}
}

pub fn complete_with_error(&mut self, error: impl Into<Error>) {
self.producer.complete_with_error(error);
}
}

impl curl::easy::Handler for CurlHandler {
Expand All @@ -100,13 +111,13 @@ impl curl::easy::Handler for CurlHandler {

// Is this a header line?
if let Some((name, value)) = parse::parse_header(data) {
// self.producer.headers.insert(name, value);
self.producer.headers.insert(name, value);
return true;
}

// Is this the end of the response header?
if data == b"\r\n" {
self.finish_response_and_complete();
// self.finish_response_and_complete();
// self.finalize_headers();
return true;
}
Expand Down Expand Up @@ -166,6 +177,8 @@ impl curl::easy::Handler for CurlHandler {
fn write(&mut self, data: &[u8]) -> Result<usize, WriteError> {
log::trace!("received {} bytes of data", data.len());

self.finish_response_and_complete();

// if self.producer.is_closed() {
// log::debug!("aborting write, request is already closed");
// return Ok(0);
Expand Down
30 changes: 23 additions & 7 deletions src/internal/response.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
use crate::body::Body;
use crate::error::Error;
use futures::channel::oneshot::*;
use futures::channel::oneshot;
use futures::prelude::*;
use http::Response;
use std::pin::Pin;
use std::task::*;

// A future for a response.
pub struct ResponseFuture {
receiver: Receiver<Response<Body>>,
receiver: oneshot::Receiver<Result<Response<Body>, Error>>,
}

impl ResponseFuture {
pub fn new() -> (Self, ResponseProducer) {
let (sender, receiver) = channel();
let (sender, receiver) = oneshot::channel();

let future = Self {
receiver,
Expand All @@ -38,8 +38,8 @@ impl Future for ResponseFuture {

match inner.poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(Ok(response)) => Poll::Ready(Ok(response)),
Poll::Ready(Err(e)) => Poll::Ready(Err(Error::Canceled)),
Poll::Ready(Ok(result)) => Poll::Ready(result),
Poll::Ready(Err(oneshot::Canceled)) => Poll::Ready(Err(Error::Canceled)),
}
}
}
Expand All @@ -50,7 +50,7 @@ impl Future for ResponseFuture {
/// If dropped before the response is finished, the associated future will be
/// completed with a `Canceled` error.
pub struct ResponseProducer {
sender: Option<Sender<Response<Body>>>,
sender: Option<oneshot::Sender<Result<Response<Body>, Error>>>,

/// Status code of the response.
pub(crate) status_code: Option<http::StatusCode>,
Expand Down Expand Up @@ -98,7 +98,23 @@ impl ResponseProducer {
.unwrap();

match self.sender.take() {
Some(sender) => match sender.send(response) {
Some(sender) => match sender.send(Ok(response)) {
Ok(()) => true,
Err(_) => {
log::info!("response future cancelled");
false
},
}
None => {
log::warn!("response future already completed!");
false
},
}
}

pub fn complete_with_error(&mut self, error: impl Into<Error>) -> bool {
match self.sender.take() {
Some(sender) => match sender.send(Err(error.into())) {
Ok(()) => true,
Err(_) => {
log::info!("response future cancelled");
Expand Down

0 comments on commit f9cfc09

Please sign in to comment.