Skip to content

Commit

Permalink
Body reading
Browse files Browse the repository at this point in the history
  • Loading branch information
algesten committed Jul 7, 2024
1 parent 6ade769 commit 47370ff
Show file tree
Hide file tree
Showing 11 changed files with 146 additions and 66 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ features = []
[features]

[dependencies]
hoot = { git = "https://github.com/algesten/hoot", rev = "3e52ff1" }
hoot = { git = "https://github.com/algesten/hoot", rev = "2e2eac5" }
http = "1.1.0"
log = "0.4.22"
once_cell = "1.19.0"
Expand Down
46 changes: 30 additions & 16 deletions src/agent.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
use std::fmt::Debug;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;

use hoot::client::flow::RedirectAuthHeaders;
use http::{Request, Response, Uri};

use crate::body::RecvBody;
use crate::body::AsBody;
use crate::pool::{Connection, ConnectionPool};
use crate::recv::RecvBody;
use crate::resolver::{DefaultResolver, Resolver};
use crate::time::Instant;
use crate::transport::{Buffers, Socket, Transport};
Expand All @@ -15,9 +17,9 @@ use crate::{Body, Error};

#[derive(Debug)]
pub struct Agent {
config: AgentConfig,
pool: ConnectionPool,
resolver: Box<dyn Resolver>,
config: Arc<AgentConfig>,
pool: Arc<ConnectionPool>,
resolver: Arc<Box<dyn Resolver>>,
}

/// Config as built by AgentBuilder and then static for the lifetime of the Agent.
Expand Down Expand Up @@ -118,9 +120,9 @@ impl Default for AgentConfig {
impl Agent {
pub fn new(config: AgentConfig, pool: impl Transport, resolver: impl Resolver) -> Self {
Agent {
config,
pool: ConnectionPool::new(pool),
resolver: Box::new(resolver),
config: Arc::new(config),
pool: Arc::new(ConnectionPool::new(pool)),

Check failure on line 124 in src/agent.rs

View workflow job for this annotation

GitHub Actions / Lint

usage of an `Arc` that is not `Send` and `Sync`

Check failure on line 124 in src/agent.rs

View workflow job for this annotation

GitHub Actions / Lint

usage of an `Arc` that is not `Send` and `Sync`
resolver: Arc::new(Box::new(resolver)),
}
}

Expand All @@ -132,23 +134,31 @@ impl Agent {
)
}

pub fn run(&self, request: Request<impl AsBody>) -> Result<Response<RecvBody>, Error> {
let (parts, mut body) = request.into_parts();
let body = body.as_body();
let request = Request::from_parts(parts, ());

self.do_run(request, body, Instant::now)
}

// TODO(martin): Can we improve this signature? The ideal would be:
// fn run(&self, request: Request<impl Body>) -> Result<Response<impl Body>, Error>

// TODO(martin): One design idea is to be able to create requests in one thread, then
// actually run them to completion in another. &mut self here makes it impossible to use
// Agent in such a design. Is that a concern?
pub(crate) fn run(
&mut self,
request: &Request<()>,
pub(crate) fn do_run(
&self,
request: Request<()>,
body: Body,
current_time: impl Fn() -> Instant,
current_time: impl Fn() -> Instant + 'static,
) -> Result<Response<RecvBody>, Error> {
let mut unit = Unit::new(&self.config, current_time(), request, body)?;
let mut unit = Unit::new(self.config.clone(), current_time(), request, body)?;

let mut addr = None;
let mut connection: Option<Connection> = None;
let mut response = None;
let mut response;

loop {
// The buffer is owned by the connection. Before we have an open connection,
Expand All @@ -161,7 +171,6 @@ impl Agent {
match unit.poll_event(current_time(), buffers)? {
Event::Reset { must_close } => {
addr = None;
response = None;

if let Some(c) = connection.take() {
if must_close {
Expand Down Expand Up @@ -236,9 +245,14 @@ impl Agent {
}

let response = response.expect("above loop to exit when there is a response");
let connection = connection.expect("connection to be open");
let unit = unit.release_body();

todo!()
let (parts, _) = response.into_parts();
let recv_body = RecvBody::new(unit, connection, current_time);
let response = Response::from_parts(parts, recv_body);

Ok(response)
}
}

Expand All @@ -247,7 +261,7 @@ pub struct RustlConnectionPool;

impl Transport for RustlConnectionPool {
fn connect(
&mut self,
&self,
_uri: &Uri,
addr: SocketAddr,

Check failure on line 266 in src/agent.rs

View workflow job for this annotation

GitHub Actions / Test

unused variable: `addr`

Check failure on line 266 in src/agent.rs

View workflow job for this annotation

GitHub Actions / Lint

unused variable: `addr`

Check failure on line 266 in src/agent.rs

View workflow job for this annotation

GitHub Actions / Lint

unused variable: `addr`

Check warning on line 266 in src/agent.rs

View workflow job for this annotation

GitHub Actions / build_versions (beta)

unused variable: `addr`

Check warning on line 266 in src/agent.rs

View workflow job for this annotation

GitHub Actions / build_versions (beta)

unused variable: `addr`

Check warning on line 266 in src/agent.rs

View workflow job for this annotation

GitHub Actions / build_versions (stable)

unused variable: `addr`

Check warning on line 266 in src/agent.rs

View workflow job for this annotation

GitHub Actions / build_versions (stable)

unused variable: `addr`

Check failure on line 266 in src/agent.rs

View workflow job for this annotation

GitHub Actions / Test

unused variable: `addr`

Check warning on line 266 in src/agent.rs

View workflow job for this annotation

GitHub Actions / build_versions (stable)

unused variable: `addr`

Check warning on line 266 in src/agent.rs

View workflow job for this annotation

GitHub Actions / build_versions (stable)

unused variable: `addr`

Check warning on line 266 in src/agent.rs

View workflow job for this annotation

GitHub Actions / build_versions (beta)

unused variable: `addr`

Check warning on line 266 in src/agent.rs

View workflow job for this annotation

GitHub Actions / build_versions (beta)

unused variable: `addr`
timeout: Duration,

Check failure on line 267 in src/agent.rs

View workflow job for this annotation

GitHub Actions / Test

unused variable: `timeout`

Check failure on line 267 in src/agent.rs

View workflow job for this annotation

GitHub Actions / Lint

unused variable: `timeout`

Check failure on line 267 in src/agent.rs

View workflow job for this annotation

GitHub Actions / Lint

unused variable: `timeout`

Check warning on line 267 in src/agent.rs

View workflow job for this annotation

GitHub Actions / build_versions (beta)

unused variable: `timeout`

Check warning on line 267 in src/agent.rs

View workflow job for this annotation

GitHub Actions / build_versions (beta)

unused variable: `timeout`

Check warning on line 267 in src/agent.rs

View workflow job for this annotation

GitHub Actions / build_versions (stable)

unused variable: `timeout`

Check warning on line 267 in src/agent.rs

View workflow job for this annotation

GitHub Actions / build_versions (stable)

unused variable: `timeout`

Check failure on line 267 in src/agent.rs

View workflow job for this annotation

GitHub Actions / Test

unused variable: `timeout`

Check warning on line 267 in src/agent.rs

View workflow job for this annotation

GitHub Actions / build_versions (stable)

unused variable: `timeout`

Check warning on line 267 in src/agent.rs

View workflow job for this annotation

GitHub Actions / build_versions (stable)

unused variable: `timeout`

Check warning on line 267 in src/agent.rs

View workflow job for this annotation

GitHub Actions / build_versions (beta)

unused variable: `timeout`

Check warning on line 267 in src/agent.rs

View workflow job for this annotation

GitHub Actions / build_versions (beta)

unused variable: `timeout`
Expand Down
17 changes: 8 additions & 9 deletions src/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ use std::fs::File;
use std::io::{self, Read, Stdin};
use std::net::TcpStream;

use crate::recv::RecvBody;

pub struct Body<'a> {
inner: BodyInner<'a>,
ended: bool,
Expand Down Expand Up @@ -105,14 +107,6 @@ use std::os::unix::net::UnixStream;
#[cfg(target_family = "unix")]
impl_into_body!(UnixStream, Reader);

pub struct RecvBody;

impl Read for RecvBody {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
todo!()
}
}

impl<'a> From<BodyInner<'a>> for Body<'a> {
fn from(inner: BodyInner<'a>) -> Self {
Body {
Expand All @@ -122,7 +116,12 @@ impl<'a> From<BodyInner<'a>> for Body<'a> {
}
}

impl_into_body!(RecvBody, Reader);
impl Private for RecvBody {}
impl AsBody for RecvBody {
fn as_body(&mut self) -> Body {
BodyInner::Reader(self).into()
}
}

impl Private for Response<RecvBody> {}
impl AsBody for Response<RecvBody> {
Expand Down
10 changes: 10 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,16 @@ pub enum Error {
RedirectFailed,
}

impl Error {
pub(crate) fn into_io(self) -> io::Error {
if let Self::Io(e) = self {
e
} else {
io::Error::new(io::ErrorKind::Other, self)
}
}
}

#[derive(Debug)]
pub enum TimeoutReason {
Resolver,
Expand Down
14 changes: 5 additions & 9 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,20 @@
use std::convert::TryFrom;
use std::sync::atomic::{AtomicBool, Ordering};

use body::{AsBody, RecvBody};
use body::AsBody;
/// Re-exported http-crate.
pub use http;

use http::{Method, Request, Response, Uri};
use once_cell::sync::Lazy;
use recv::RecvBody;
use request::RequestBuilder;

mod agent;
mod body;
mod error;
mod pool;
mod recv;
mod request;
pub mod resolver;
mod time;
Expand All @@ -26,16 +28,10 @@ mod unit;
pub use agent::{Agent, AgentConfig};
pub use body::Body;
pub use error::Error;
use time::Instant;

pub fn run(request: Request<impl AsBody>) -> Result<Response<RecvBody>, Error> {
let mut agent = Agent::new_default();

let (parts, mut body) = request.into_parts();
let body = body.as_body();
let request = Request::from_parts(parts, ());

agent.run(&request, body, Instant::now)
let agent = Agent::new_default();
agent.run(request)
}

fn builder<T>(method: Method, uri: T) -> RequestBuilder
Expand Down
2 changes: 1 addition & 1 deletion src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ impl ConnectionPool {
}

pub fn connect(
&mut self,
&self,
uri: &Uri,
addr: SocketAddr,
timeout: Duration,
Expand Down
63 changes: 63 additions & 0 deletions src/recv.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
use std::io::{self, Read};

use crate::pool::Connection;
use crate::time::Instant;
use crate::transport::Buffers;
use crate::unit::{Event, Input, Unit};
use crate::Error;

pub struct RecvBody {
unit: Unit<'static>,
connection: Connection,
current_time: Box<dyn Fn() -> Instant>,
}
impl RecvBody {
pub(crate) fn new(
unit: Unit<'static>,
connection: Connection,
current_time: impl Fn() -> Instant + 'static,
) -> Self {
RecvBody {
unit,
connection,
current_time: Box::new(current_time),
}
}

fn do_read(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
let buffers = self.connection.borrow_buffers();
let event = self.unit.poll_event((self.current_time)(), buffers)?;

let timeout = match event {
Event::AwaitInput { timeout, is_body } => {
assert!(is_body);
timeout
}
_ => unreachable!("expected event AwaitInput"),
};

let Buffers { input, .. } = self.connection.await_input(timeout, true)?;
let input_used =
self.unit
.handle_input((self.current_time)(), Input::Input { input }, buf)?;
self.connection.consume_input(input_used);

let buffers = self.connection.borrow_buffers();
let event = self.unit.poll_event((self.current_time)(), buffers)?;

let output_used = match event {
Event::ResponseBody { amount } => amount,
_ => unreachable!("expected event ResponseBody"),
};

Ok(output_used)
}
}

impl Read for RecvBody {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.do_read(buf).map_err(|e| e.into_io())?;

Ok(0)
}
}
11 changes: 4 additions & 7 deletions src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ use std::convert::TryFrom;

use http::{HeaderName, HeaderValue, Method, Request, Response, Uri};

use crate::body::{AsBody, RecvBody};
use crate::body::AsBody;
use crate::recv::RecvBody;
use crate::time::Instant;
use crate::{Agent, Body, Error};

Expand Down Expand Up @@ -86,12 +87,8 @@ impl RequestBuilder {
}
}

fn do_call(
mut agent: Agent,
request: Request<()>,
body: Body,
) -> Result<Response<RecvBody>, Error> {
let response = agent.run(&request, body, Instant::now)?;
fn do_call(agent: Agent, request: Request<()>, body: Body) -> Result<Response<RecvBody>, Error> {
let response = agent.do_run(request, body, Instant::now)?;
Ok(response)
}

Expand Down
2 changes: 1 addition & 1 deletion src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::Error;

pub trait Transport: Debug + 'static {
fn connect(
&mut self,
&self,
uri: &Uri,
addr: SocketAddr,
timeout: Duration,
Expand Down
Loading

0 comments on commit 47370ff

Please sign in to comment.