Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(http server): handle post and option HTTP requests properly. #637

Merged
merged 19 commits into from
Jan 5, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions http-server/src/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,3 +107,12 @@ fn from_template<S: Into<hyper::Body>>(
pub fn ok_response(body: String) -> hyper::Response<hyper::Body> {
from_template(hyper::StatusCode::OK, body, JSON)
}

/// Create a response for unsupported content type.
pub fn unsupported_content_type() -> hyper::Response<hyper::Body> {
from_template(
hyper::StatusCode::UNSUPPORTED_MEDIA_TYPE,
"Supplied content type is not allowed. Content-Type: application/json is required\n".to_owned(),
TEXT,
)
}
237 changes: 125 additions & 112 deletions http-server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use futures_channel::mpsc;
use futures_util::{future::join_all, stream::StreamExt, FutureExt};
use hyper::server::{conn::AddrIncoming, Builder as HyperBuilder};
use hyper::service::{make_service_fn, service_fn};
use hyper::Error as HyperError;
use hyper::{Error as HyperError, Method};
use jsonrpsee_core::error::{Error, GenericTransportError};
use jsonrpsee_core::http_helpers::read_body;
use jsonrpsee_core::id_providers::NoopIdProvider;
Expand Down Expand Up @@ -302,116 +302,25 @@ impl<M: Middleware> Server<M> {
// two cases: a single RPC request or a batch of RPC requests.
async move {
if let Err(e) = access_control_is_valid(&access_control, &request) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this became hard to review, however validate origins and headers before checking the method kind.

return Ok::<_, HyperError>(e);
return Ok(e);
}

if let Err(e) = content_type_is_valid(&request) {
return Ok::<_, HyperError>(e);
}

let (parts, body) = request.into_parts();

let (body, mut is_single) = match read_body(&parts.headers, body, max_request_body_size).await {
Ok(r) => r,
Err(GenericTransportError::TooLarge) => return Ok::<_, HyperError>(response::too_large()),
Err(GenericTransportError::Malformed) => return Ok::<_, HyperError>(response::malformed()),
Err(GenericTransportError::Inner(e)) => {
tracing::error!("Internal error reading request body: {}", e);
return Ok::<_, HyperError>(response::internal_error());
}
};

let request_start = middleware.on_request();

// NOTE(niklasad1): it's a channel because it's needed for batch requests.
let (tx, mut rx) = mpsc::unbounded::<String>();
let sink = MethodSink::new_with_limit(tx, max_request_body_size);

type Notif<'a> = Notification<'a, Option<&'a RawValue>>;

// Single request or notification
if is_single {
if let Ok(req) = serde_json::from_slice::<Request>(&body) {
middleware.on_call(req.method.as_ref());

// NOTE: we don't need to track connection id on HTTP, so using hardcoded 0 here.
match methods.execute_with_resources(&sink, req, 0, &resources, &NoopIdProvider) {
Ok((name, MethodResult::Sync(success))) => {
middleware.on_result(name, success, request_start);
}
Ok((name, MethodResult::Async(fut))) => {
let success = fut.await;

middleware.on_result(name, success, request_start);
}
Err(name) => {
middleware.on_result(name.as_ref(), false, request_start);
}
}
} else if let Ok(_req) = serde_json::from_slice::<Notif>(&body) {
return Ok::<_, HyperError>(response::ok_response("".into()));
} else {
let (id, code) = prepare_error(&body);
sink.send_error(id, code.into());
// Only `POST` and `OPTIONS` methods are allowed.
match *request.method() {
Method::POST if content_type_is_json(&request) => {
process_validated_request(
request,
middleware,
methods,
resources,
max_request_body_size,
)
.await
}

// Batch of requests or notifications
} else if let Ok(batch) = serde_json::from_slice::<Vec<Request>>(&body) {
if !batch.is_empty() {
let middleware = &middleware;

join_all(batch.into_iter().filter_map(
move |req| match methods.execute_with_resources(
&sink,
req,
0,
&resources,
&NoopIdProvider,
) {
Ok((name, MethodResult::Sync(success))) => {
middleware.on_result(name, success, request_start);
None
}
Ok((name, MethodResult::Async(fut))) => Some(async move {
let success = fut.await;
middleware.on_result(name, success, request_start);
}),
Err(name) => {
middleware.on_result(name.as_ref(), false, request_start);
None
}
},
))
.await;
} else {
// "If the batch rpc call itself fails to be recognized as an valid JSON or as an
// Array with at least one value, the response from the Server MUST be a single
// Response object." – The Spec.
is_single = true;
sink.send_error(Id::Null, ErrorCode::InvalidRequest.into());
}
} else if let Ok(_batch) = serde_json::from_slice::<Vec<Notif>>(&body) {
return Ok::<_, HyperError>(response::ok_response("".into()));
} else {
// "If the batch rpc call itself fails to be recognized as an valid JSON or as an
// Array with at least one value, the response from the Server MUST be a single
// Response object." – The Spec.
is_single = true;
let (id, code) = prepare_error(&body);
sink.send_error(id, code.into());
Method::POST => Ok(response::unsupported_content_type()),
Method::OPTIONS => Ok(response::ok_response("".into())),
_ => Ok(response::method_not_allowed()),
}

// Closes the receiving half of a channel without dropping it. This prevents any further
// messages from being sent on the channel.
rx.close();
let response = if is_single {
rx.next().await.expect("Sender is still alive managed by us above; qed")
} else {
collect_batch_response(rx).await
};
tracing::debug!("[service_fn] sending back: {:?}", &response[..cmp::min(response.len(), 1024)]);
middleware.on_response(request_start);
Ok::<_, HyperError>(response::ok_response(response))
}
}))
}
Expand Down Expand Up @@ -449,11 +358,8 @@ fn access_control_is_valid(
}

/// Checks that content type of received request is valid for JSON-RPC.
fn content_type_is_valid(request: &hyper::Request<hyper::Body>) -> Result<(), hyper::Response<hyper::Body>> {
match *request.method() {
hyper::Method::POST if is_json(request.headers().get("content-type")) => Ok(()),
_ => Err(response::method_not_allowed()),
}
fn content_type_is_json(request: &hyper::Request<hyper::Body>) -> bool {
is_json(request.headers().get("content-type"))
}

/// Returns true if the `content_type` header indicates a valid JSON message.
Expand All @@ -469,3 +375,110 @@ fn is_json(content_type: Option<&hyper::header::HeaderValue>) -> bool {
_ => false,
}
}

/// Process a verified request, it implies a POST request with content type JSON.
async fn process_validated_request(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

move to a separate function to decrease the indentation level and become more readable...

request: hyper::Request<hyper::Body>,
middleware: impl Middleware,
methods: Methods,
resources: Resources,
max_request_body_size: u32,
) -> Result<hyper::Response<hyper::Body>, HyperError> {
let (parts, body) = request.into_parts();

let (body, mut is_single) = match read_body(&parts.headers, body, max_request_body_size).await {
Ok(r) => r,
Err(GenericTransportError::TooLarge) => return Ok(response::too_large()),
Err(GenericTransportError::Malformed) => return Ok(response::malformed()),
Err(GenericTransportError::Inner(e)) => {
tracing::error!("Internal error reading request body: {}", e);
return Ok(response::internal_error());
}
};

let request_start = middleware.on_request();

// NOTE(niklasad1): it's a channel because it's needed for batch requests.
let (tx, mut rx) = mpsc::unbounded::<String>();
let sink = MethodSink::new_with_limit(tx, max_request_body_size);

type Notif<'a> = Notification<'a, Option<&'a RawValue>>;

// Single request or notification
if is_single {
if let Ok(req) = serde_json::from_slice::<Request>(&body) {
middleware.on_call(req.method.as_ref());

// NOTE: we don't need to track connection id on HTTP, so using hardcoded 0 here.
match methods.execute_with_resources(&sink, req, 0, &resources, &NoopIdProvider) {
Ok((name, MethodResult::Sync(success))) => {
middleware.on_result(name, success, request_start);
}
Ok((name, MethodResult::Async(fut))) => {
let success = fut.await;

middleware.on_result(name, success, request_start);
}
Err(name) => {
middleware.on_result(name.as_ref(), false, request_start);
}
}
} else if let Ok(_req) = serde_json::from_slice::<Notif>(&body) {
return Ok::<_, HyperError>(response::ok_response("".into()));
} else {
let (id, code) = prepare_error(&body);
sink.send_error(id, code.into());
}

// Batch of requests or notifications
} else if let Ok(batch) = serde_json::from_slice::<Vec<Request>>(&body) {
if !batch.is_empty() {
let middleware = &middleware;

join_all(batch.into_iter().filter_map(move |req| {
match methods.execute_with_resources(&sink, req, 0, &resources, &NoopIdProvider) {
Ok((name, MethodResult::Sync(success))) => {
middleware.on_result(name, success, request_start);
None
}
Ok((name, MethodResult::Async(fut))) => Some(async move {
let success = fut.await;
middleware.on_result(name, success, request_start);
}),
Err(name) => {
middleware.on_result(name.as_ref(), false, request_start);
None
}
}
}))
.await;
} else {
// "If the batch rpc call itself fails to be recognized as an valid JSON or as an
// Array with at least one value, the response from the Server MUST be a single
// Response object." – The Spec.
is_single = true;
sink.send_error(Id::Null, ErrorCode::InvalidRequest.into());
}
} else if let Ok(_batch) = serde_json::from_slice::<Vec<Notif>>(&body) {
return Ok(response::ok_response("".into()));
} else {
// "If the batch rpc call itself fails to be recognized as an valid JSON or as an
// Array with at least one value, the response from the Server MUST be a single
// Response object." – The Spec.
is_single = true;
let (id, code) = prepare_error(&body);
sink.send_error(id, code.into());
}

// Closes the receiving half of a channel without dropping it. This prevents any further
// messages from being sent on the channel.
rx.close();
let response = if is_single {
rx.next().await.expect("Sender is still alive managed by us above; qed")
} else {
collect_batch_response(rx).await
};
tracing::debug!("[service_fn] sending back: {:?}", &response[..cmp::min(response.len(), 1024)]);
middleware.on_response(request_start);
Ok(response::ok_response(response))
}