Skip to content

Commit

Permalink
fix(transports): fix rebase errors
Browse files Browse the repository at this point in the history
  • Loading branch information
iskorotkov committed Jan 4, 2025
1 parent f8d1ac9 commit e08f69c
Showing 1 changed file with 76 additions and 63 deletions.
139 changes: 76 additions & 63 deletions src/transports/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,45 +305,49 @@ fn id_of_output(output: &Output) -> Result<RequestId> {

#[cfg(test)]
mod tests {
use std::{future::Future, pin::Pin};

use super::*;
use crate::Error::Rpc;
use core::pin::Pin;
use futures::{lock::Mutex, Future};
use futures::lock::Mutex;
use http_body_util::{BodyExt, Full};
use hyper::{
body::HttpBody,
service::{make_service_fn, service_fn},
Body, Error, Method, Request, Response, Server,
body::{Bytes, Incoming},
server::conn::http1,
service::service_fn,
Method, Request, Response,
};
use hyper_util::rt::TokioIo;
use jsonrpc_core::ErrorCode;
use std::net::TcpListener;
use tokio::{task::JoinHandle, time::Instant};
use tokio::{net::TcpListener, task::JoinHandle, time::Instant};

type HyperResponse = Pin<Box<dyn Future<Output = hyper::Result<Response<Body>>> + Send>>;
type HyperResponse =
Pin<Box<dyn Future<Output = std::result::Result<Response<Full<Bytes>>, hyper::http::Error>> + Send + Sync>>;

type HyperHandler = Box<dyn Fn(Request<Body>) -> HyperResponse + Send + Sync>;
type HyperHandler = Box<dyn Fn(Request<Incoming>) -> HyperResponse + Send + Sync>;

fn get_available_port() -> Option<u16> {
Some(TcpListener::bind(("127.0.0.1", 0)).ok()?.local_addr().ok()?.port())
async fn get_available_port() -> Option<u16> {
Some(
TcpListener::bind(("127.0.0.1", 0))
.await
.ok()?
.local_addr()
.ok()?
.port(),
)
}

fn create_server(port: u16, handler: HyperHandler) -> JoinHandle<()> {
async fn create_server(port: u16, handler: HyperHandler) -> JoinHandle<()> {
let addr = format!("127.0.0.1:{}", port);
let listener = TcpListener::bind(addr).await.unwrap();
let handler = Arc::new(handler);
let service = make_service_fn(move |_| {
let handler = handler.clone();
async move {
let handler = handler.clone();
Ok::<_, Error>(service_fn(move |req| {
let handler = handler.clone();
async move { handler(req).await }
}))
tokio::task::spawn(async move {
loop {
let (stream, _) = listener.accept().await.unwrap();
let service = service_fn(handler.as_ref());
let io = TokioIo::new(stream);
http1::Builder::new().serve_connection(io, service).await.unwrap();
}
});

let server = Server::bind(&addr.parse().unwrap()).serve(service);
tokio::spawn(async move {
println!("Listening on http://{}", addr);
server.await.unwrap();
})
}

Expand All @@ -356,9 +360,9 @@ mod tests {
}

fn return_429(retry_after_value: Option<String>) -> HyperHandler {
Box::new(move |_req: Request<Body>| -> HyperResponse {
Box::new(move |_req: Request<Incoming>| -> HyperResponse {
let retry_after_value = retry_after_value.clone();
let response_body = Body::from(
let response_body = Bytes::from(
r#"{
"jsonrpc": "2.0",
"error": {
Expand All @@ -373,14 +377,14 @@ mod tests {
response = response.header("Retry-After", value)
}

let response = response.body(response_body).unwrap();
let response = response.body(Full::new(response_body)).unwrap();
Box::pin(async move { Ok(response) })
})
}

fn return_5xx(code: u16) -> HyperHandler {
Box::new(move |_req: Request<Body>| -> HyperResponse {
let response_body = Body::from(
Box::new(move |_req: Request<Incoming>| -> HyperResponse {
let response_body = Bytes::from(
r#"{
"jsonrpc": "2.0",
"error": {
Expand All @@ -390,12 +394,12 @@ mod tests {
}"#,
);

let response = Response::builder().status(code).body(response_body).unwrap();
let response = Response::builder().status(code).body(Full::new(response_body)).unwrap();
Box::pin(async move { Ok(response) })
})
}

fn check_and_return_mock_response(req: Request<Body>) -> HyperResponse {
fn check_and_return_mock_response(req: Request<Incoming>) -> HyperResponse {
let expected = r#"{"jsonrpc":"2.0","method":"eth_getAccounts","params":[],"id":0}"#;
let response = r#"{"jsonrpc":"2.0","id":0,"result":"x"}"#;

Expand All @@ -405,16 +409,15 @@ mod tests {
let mut body = req.into_body();

Box::pin(async move {
while let Some(Ok(chunk)) = body.data().await {
content.extend(&*chunk);
while let Some(Ok(chunk)) = body.frame().await {
content.extend(chunk.into_data().unwrap());
}
assert_eq!(std::str::from_utf8(&*content), Ok(expected));

Ok(Response::new(response.into()))
assert_eq!(std::str::from_utf8(&content), Ok(expected));
Response::builder().status(200).body(Full::new(response.into()))
})
}

fn return_error_response(_req: Request<Body>) -> HyperResponse {
fn return_error_response(_req: Request<Incoming>) -> HyperResponse {
let response = r#"{
"jsonrpc":"2.0",
"error":{
Expand All @@ -423,12 +426,16 @@ mod tests {
},
"id":null
}"#;
Box::pin(async move { Ok(Response::new(response.into())) })
let response = Response::builder()
.status(200)
.body(Full::new(response.into()))
.unwrap();
Box::pin(async move { Ok(response) })
}

fn return_sequence(handlers: Vec<HyperHandler>) -> HyperHandler {
let handlers = Arc::new(Mutex::new(handlers));
Box::new(move |_req: Request<Body>| -> HyperResponse {
Box::new(move |_req: Request<Incoming>| -> HyperResponse {
let handlers = handlers.clone();
Box::pin(async move {
let mut handlers = handlers.lock().await;
Expand All @@ -441,8 +448,8 @@ mod tests {
#[tokio::test]
async fn should_make_a_request() {
// given
let port = get_available_port().unwrap();
let _ = create_server(port, Box::new(check_and_return_mock_response));
let port = get_available_port().await.unwrap();
let _ = create_server(port, Box::new(check_and_return_mock_response)).await;
let client = create_client(port, Retries::default());

// when
Expand All @@ -457,8 +464,8 @@ mod tests {
#[tokio::test]
async fn catch_generic_json_error_for_batched_request() {
// given
let port = get_available_port().unwrap();
let _ = create_server(port, Box::new(return_error_response));
let port = get_available_port().await.unwrap();
let _ = create_server(port, Box::new(return_error_response)).await;
let client = create_client(port, Retries::default());

// when
Expand Down Expand Up @@ -505,14 +512,15 @@ mod tests {
#[tokio::test]
async fn status_code_429_with_retry_after_as_seconds() {
// given
let port = get_available_port().unwrap();
let port = get_available_port().await.unwrap();
let _ = create_server(
port,
return_sequence(vec![
return_429(Some("3".into())),
Box::new(check_and_return_mock_response),
]),
);
)
.await;
let client = create_client(
port,
Retries {
Expand All @@ -537,7 +545,7 @@ mod tests {
#[tokio::test]
async fn status_code_429_with_retry_after_as_date() {
// given
let port = get_available_port().unwrap();
let port = get_available_port().await.unwrap();
let started = Instant::now();
let retry_after_value: DateTime<Utc> = DateTime::from(Utc::now() + Duration::from_secs(3));
let _ = create_server(
Expand All @@ -546,7 +554,8 @@ mod tests {
return_429(Some(retry_after_value.to_rfc2822())),
Box::new(check_and_return_mock_response),
]),
);
)
.await;
let client = create_client(
port,
Retries {
Expand All @@ -570,11 +579,12 @@ mod tests {
#[tokio::test]
async fn status_code_429_with_invalid_retry_after() {
// given
let port = get_available_port().unwrap();
let port = get_available_port().await.unwrap();
let _ = create_server(
port,
return_sequence(vec![return_429(Some("retry some time later, idc".into()))]),
);
)
.await;
let client = create_client(
port,
Retries {
Expand All @@ -596,8 +606,8 @@ mod tests {
#[tokio::test]
async fn status_code_429_without_retry_after() {
// given
let port = get_available_port().unwrap();
let _ = create_server(port, return_sequence(vec![return_429(None)]));
let port = get_available_port().await.unwrap();
let _ = create_server(port, return_sequence(vec![return_429(None)])).await;
let client = create_client(
port,
Retries {
Expand All @@ -619,8 +629,8 @@ mod tests {
#[tokio::test]
async fn status_code_429_retry_after_disabled() {
// given
let port = get_available_port().unwrap();
let _ = create_server(port, return_sequence(vec![return_429(Some("3".into()))]));
let port = get_available_port().await.unwrap();
let _ = create_server(port, return_sequence(vec![return_429(Some("3".into()))])).await;
let client = create_client(
port,
Retries {
Expand All @@ -642,15 +652,16 @@ mod tests {
#[tokio::test]
async fn status_code_429_with_retries() {
// given
let port = get_available_port().unwrap();
let port = get_available_port().await.unwrap();
let _ = create_server(
port,
return_sequence(vec![
return_429(Some("3".into())), // sleep for 1 second as configured below
return_429(Some("3".into())), // sleep for 2 seconds (2x 1sec)
Box::new(check_and_return_mock_response),
]),
);
)
.await;
let client = create_client(
port,
Retries {
Expand All @@ -675,15 +686,16 @@ mod tests {
#[tokio::test]
async fn status_code_5xx_with_retries() {
// given
let port = get_available_port().unwrap();
let port = get_available_port().await.unwrap();
let _ = create_server(
port,
return_sequence(vec![
return_5xx(500), // sleep for 1 second as configured below
return_5xx(502), // sleep for 2 seconds (2x 1sec)
Box::new(check_and_return_mock_response),
]),
);
)
.await;
let client = create_client(
port,
Retries {
Expand All @@ -708,7 +720,7 @@ mod tests {
#[tokio::test]
async fn status_code_5xx_retries_exhausted() {
// given
let port = get_available_port().unwrap();
let port = get_available_port().await.unwrap();
let _ = create_server(
port,
return_sequence(vec![
Expand All @@ -717,7 +729,8 @@ mod tests {
return_5xx(503),
Box::new(check_and_return_mock_response),
]),
);
)
.await;
let client = create_client(
port,
Retries {
Expand All @@ -739,8 +752,8 @@ mod tests {
#[tokio::test]
async fn status_code_5xx_without_retries() {
// given
let port = get_available_port().unwrap();
let _ = create_server(port, return_sequence(vec![return_5xx(500)]));
let port = get_available_port().await.unwrap();
let _ = create_server(port, return_sequence(vec![return_5xx(500)])).await;
let client = create_client(
port,
Retries {
Expand Down

0 comments on commit e08f69c

Please sign in to comment.