Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(transport-http): layer client #1227

Merged
merged 33 commits into from
Sep 18, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
71a14b5
feat(transport): bare minimal reqwest-tower integration
yash-atreya Aug 30, 2024
7eea874
feat(transport-http): reqwest-tower layer client
yash-atreya Sep 2, 2024
98110e6
feat(transport-http): LayerClient
yash-atreya Sep 2, 2024
f4dbb7e
fix: feature gate layer transport
yash-atreya Sep 3, 2024
0d77863
rm logging layer
yash-atreya Sep 3, 2024
386d8cd
feat(transport-http): hyper layer transport
yash-atreya Sep 4, 2024
849a7ff
hyper layer transport test
yash-atreya Sep 9, 2024
f44eacd
test with tower-http layers
yash-atreya Sep 9, 2024
69641a6
rm reqwest layer transport
yash-atreya Sep 9, 2024
1c437de
Merge branch 'main' into yash/reqwest-tower-integration
yash-atreya Sep 10, 2024
1821332
rm trait bounds for new
yash-atreya Sep 11, 2024
5b42d91
nit
yash-atreya Sep 11, 2024
5f65b93
unify hyper transports
yash-atreya Sep 16, 2024
814bd60
rm TransportConnect for HyperLayerTransport
yash-atreya Sep 16, 2024
d9e14c0
make request generic
yash-atreya Sep 16, 2024
0b201f2
unify hyper transports
yash-atreya Sep 16, 2024
3c3d593
Merge branch 'main' into yash/reqwest-tower-integration
yash-atreya Sep 16, 2024
8e65d0f
nit
yash-atreya Sep 16, 2024
3310d05
nit
yash-atreya Sep 16, 2024
1f0c99b
nit
yash-atreya Sep 16, 2024
1971f60
rm unintended reqwest default
yash-atreya Sep 16, 2024
a029906
rename HyperLayerTransport to HyperTransport
yash-atreya Sep 16, 2024
d18e0bd
rename file
yash-atreya Sep 16, 2024
4d12209
Merge branch 'main' into yash/reqwest-tower-integration
yash-atreya Sep 17, 2024
c2f1432
nit
yash-atreya Sep 17, 2024
9f880db
fix: rm transport from HttpConnect
yash-atreya Sep 18, 2024
939ac7e
fix: rm url from HyperTransport, infer it from Http
yash-atreya Sep 18, 2024
563ecbb
clippy
yash-atreya Sep 18, 2024
cdfcd10
fix
yash-atreya Sep 18, 2024
edc580e
impl Http<Hyper>
yash-atreya Sep 18, 2024
8a38603
fix
yash-atreya Sep 18, 2024
9af4cae
rename
yash-atreya Sep 18, 2024
e16447f
nit
yash-atreya Sep 18, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/provider/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ reqwest.workspace = true
tokio = { workspace = true, features = ["macros"] }
tracing-subscriber = { workspace = true, features = ["fmt"] }
tempfile.workspace = true
tower.workspace = true

[features]
default = ["reqwest", "reqwest-default-tls"]
Expand Down
16 changes: 16 additions & 0 deletions crates/provider/src/provider/trait.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1013,6 +1013,7 @@ mod tests {
use alloy_node_bindings::Anvil;
use alloy_primitives::{address, b256, bytes, keccak256};
use alloy_rpc_types_eth::{request::TransactionRequest, Block};
use alloy_transport_http::LoggingLayer;

fn init_tracing() {
let _ = tracing_subscriber::fmt::try_init();
Expand All @@ -1035,6 +1036,21 @@ mod tests {
assert_eq!(0, num);
}

#[tokio::test]
async fn test_layer_transport() {
init_tracing();
let anvil = Anvil::new().spawn();
let service =
tower::ServiceBuilder::new().layer(LoggingLayer).service(reqwest::Client::new());
let layer_transport = alloy_transport_http::LayerClient::new(anvil.endpoint_url(), service);

let rpc_client = alloy_rpc_client::RpcClient::new(layer_transport, true);

let provider = RootProvider::<_, Ethereum>::new(rpc_client);
let num = provider.get_block_number().await.unwrap();
assert_eq!(0, num);
}

#[tokio::test]
async fn test_builder_helper_fn_any_network() {
init_tracing();
Expand Down
140 changes: 140 additions & 0 deletions crates/transport-http/src/layer_transport.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
use alloy_json_rpc::{RequestPacket, ResponsePacket};
use alloy_transport::{
utils::guess_local_url, TransportConnect, TransportError, TransportErrorKind, TransportFut,
};
use std::{future::Future, pin::Pin, task};
use tower::Service;
use tracing::{debug, debug_span, trace, Instrument};
use url::Url;

/// A [reqwest] client that can be used with tower layers.
#[derive(Debug, Clone)]
pub struct LayerClient<S> {
url: Url,
service: S,
}

impl<S> LayerClient<S>
where
S: Service<reqwest::Request, Response = reqwest::Response, Error = reqwest::Error>
+ Clone
+ Send
+ 'static,
S::Future: Send,
{
/// Create a new [LayerClient] with the given URL.
pub fn new(url: Url, service: S) -> Self {
Self { url, service }
}

/// Make a request using the tower service with layers.
pub fn request(&self, req: RequestPacket) -> TransportFut<'static> {
let this = self.clone();
let span = debug_span!("LayerClient", url = %self.url);
Box::pin(
async move {
let mut service = this.service.clone();

let raw_req = reqwest::Client::new()
.post(this.url.to_owned())
.json(&req)
.build()
.map_err(TransportErrorKind::custom)?;

let resp = service.call(raw_req).await.map_err(TransportErrorKind::custom)?;

let status = resp.status();

debug!(%status, "received response from server");

let body = resp.bytes().await.map_err(TransportErrorKind::custom)?;

debug!(bytes = body.len(), "retrieved response body. Use `trace` for full body");
trace!(body = %String::from_utf8_lossy(&body), "response body");

if status != reqwest::StatusCode::OK {
return Err(TransportErrorKind::http_error(
status.as_u16(),
String::from_utf8_lossy(&body).into_owned(),
));
}

serde_json::from_slice(&body)
.map_err(|err| TransportError::deser_err(err, String::from_utf8_lossy(&body)))
}
.instrument(span),
)
}
}

impl<S> TransportConnect for LayerClient<S>
where
S: Service<reqwest::Request, Response = reqwest::Response, Error = reqwest::Error>
+ Clone
+ Send
+ 'static
+ Sync,
S::Future: Send,
{
type Transport = LayerClient<S>;

fn is_local(&self) -> bool {
guess_local_url(self.url.as_str())
}

fn get_transport<'a: 'b, 'b>(
&'a self,
) -> alloy_transport::Pbf<'b, Self::Transport, TransportError> {
Box::pin(async move { Ok(LayerClient::new(self.url.clone(), self.service.clone())) })
}
}

impl<S> Service<RequestPacket> for LayerClient<S>
where
S: Service<reqwest::Request, Response = reqwest::Response, Error = reqwest::Error>
+ Clone
+ Send
+ 'static,
S::Future: Send,
{
type Response = ResponsePacket;
type Error = TransportError;
type Future = TransportFut<'static>;

fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> task::Poll<Result<(), Self::Error>> {
task::Poll::Ready(Ok(()))
}

fn call(&mut self, req: RequestPacket) -> Self::Future {
self.request(req)
}
}

/// Future for reqwest responses.
pub type ReqwestResponseFut<T = reqwest::Response, E = reqwest::Error> =
Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'static>>;

// impl<S> Service<reqwest::Request> for LayerClient<S>
// where
// S: Service<reqwest::Request, Response = reqwest::Response, Error = reqwest::Error>
// + Clone
// + Send
// + 'static,
// S::Future: Send,
// {
// type Response = reqwest::Response;
// type Error = reqwest::Error;
// type Future = ReqwestResponseFut<reqwest::Response, reqwest::Error>;

// fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> task::Poll<Result<(), Self::Error>>
// { task::Poll::Ready(Ok(()))
// }

// fn call(&mut self, req: reqwest::Request) -> Self::Future {
// let fut = self.service.call(req);
// Box::pin(async move {
// let resp = fut.await?;
// Ok(resp)
// })
// }
// }
45 changes: 45 additions & 0 deletions crates/transport-http/src/layers/logging.rs
yash-atreya marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
use reqwest::{Error, Request, Response};
use std::task::{Context, Poll};
use tower::{Layer, Service};

/// A logging layer for the HTTP transport.
#[derive(Debug, Clone)]
pub struct LoggingLayer;

impl<S> Layer<S> for LoggingLayer {
type Service = LoggingService<S>;

fn layer(&self, inner: S) -> Self::Service {
LoggingService { inner }
}
}

/// A service that logs requests and responses.
#[derive(Debug, Clone)]
pub struct LoggingService<S> {
inner: S,
}

impl<S> Service<Request> for LoggingService<S>
where
S: Service<Request, Response = Response, Error = Error>,
S::Future: Send + 'static,
{
type Response = Response;
type Error = Error;
type Future = crate::ReqwestResponseFut<Response, Error>;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}

fn call(&mut self, req: Request) -> Self::Future {
tracing::info!("LoggingLayer(Request) {:?}", req);

let future = self.inner.call(req);
Box::pin(async move {
let resp = future.await?;
Ok(resp)
})
}
}
3 changes: 3 additions & 0 deletions crates/transport-http/src/layers/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
mod logging;

pub use logging::*;
8 changes: 8 additions & 0 deletions crates/transport-http/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,14 @@ mod reqwest_transport;
#[doc(inline)]
pub use reqwest_transport::*;

mod layer_transport;

pub use layer_transport::*;

mod layers;

pub use layers::{LoggingLayer, LoggingService};

#[cfg(feature = "reqwest")]
pub use reqwest;

Expand Down
Loading