Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Report error when we cannot deserialize the payload. #520

Merged
merged 1 commit into from
Aug 31, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 46 additions & 43 deletions lambda-runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,20 @@
//! Create a type that conforms to the [`tower::Service`] trait. This type can
//! then be passed to the the `lambda_runtime::run` function, which launches
//! and runs the Lambda runtime.
use hyper::client::{connect::Connection, HttpConnector};
use hyper::{
client::{connect::Connection, HttpConnector},
http::Request,
Body,
};
use lambda_runtime_api_client::Client;
use serde::{Deserialize, Serialize};
use std::{convert::TryFrom, env, fmt, future::Future, panic};
use std::{
convert::TryFrom,
env,
fmt::{self, Debug, Display},
future::Future,
panic,
};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_stream::{Stream, StreamExt};
pub use tower::{self, service_fn, Service};
Expand All @@ -24,7 +34,6 @@ mod simulated;
mod types;

use requests::{EventCompletionRequest, EventErrorRequest, IntoRequest, NextEventRequest};
use types::Diagnostic;
pub use types::{Context, LambdaEvent};

/// Error type that lambdas may result in
Expand Down Expand Up @@ -121,12 +130,20 @@ where

let ctx: Context = Context::try_from(parts.headers)?;
let ctx: Context = ctx.with_config(config);
let body = serde_json::from_slice(&body)?;
let request_id = &ctx.request_id.clone();

let xray_trace_id = &ctx.xray_trace_id.clone();
env::set_var("_X_AMZN_TRACE_ID", xray_trace_id);

let request_id = &ctx.request_id.clone();
let body = match serde_json::from_slice(&body) {
Ok(body) => body,
Err(err) => {
let req = build_event_error_request(request_id, err)?;
client.call(req).await.expect("Unable to send response to Runtime APIs");
return Ok(());
}
};

let req = match handler.ready().await {
Ok(handler) => {
let task =
Expand All @@ -141,48 +158,23 @@ where
}
.into_req()
}
Err(err) => {
error!("{:?}", err); // logs the error in CloudWatch
EventErrorRequest {
request_id,
diagnostic: Diagnostic {
error_type: type_name_of_val(&err).to_owned(),
error_message: format!("{}", err), // returns the error to the caller via Lambda API
},
}
.into_req()
}
Err(err) => build_event_error_request(request_id, err),
},
Err(err) => {
error!("{:?}", err);
EventErrorRequest {
request_id,
diagnostic: Diagnostic {
error_type: type_name_of_val(&err).to_owned(),
error_message: if let Some(msg) = err.downcast_ref::<&str>() {
format!("Lambda panicked: {}", msg)
} else {
"Lambda panicked".to_string()
},
},
}
.into_req()
let error_type = type_name_of_val(&err);
let msg = if let Some(msg) = err.downcast_ref::<&str>() {
format!("Lambda panicked: {}", msg)
} else {
"Lambda panicked".to_string()
};
EventErrorRequest::new(request_id, error_type, &msg).into_req()
}
}
}
Err(err) => {
error!("{:?}", err); // logs the error in CloudWatch
EventErrorRequest {
request_id,
diagnostic: Diagnostic {
error_type: type_name_of_val(&err).to_owned(),
error_message: format!("{}", err), // returns the error to the caller via Lambda API
},
}
.into_req()
}
};
let req = req?;
Err(err) => build_event_error_request(request_id, err),
}?;

client.call(req).await.expect("Unable to send response to Runtime APIs");
}
Ok(())
Expand Down Expand Up @@ -247,6 +239,17 @@ fn type_name_of_val<T>(_: T) -> &'static str {
std::any::type_name::<T>()
}

fn build_event_error_request<T>(request_id: &str, err: T) -> Result<Request<Body>, Error>
where
T: Display + Debug,
{
error!("{:?}", err); // logs the error in CloudWatch
let error_type = type_name_of_val(&err);
let msg = format!("{}", err);

EventErrorRequest::new(request_id, error_type, &msg).into_req()
}

#[cfg(test)]
mod endpoint_tests {
use crate::{
Expand Down Expand Up @@ -431,8 +434,8 @@ mod endpoint_tests {
let req = EventErrorRequest {
request_id: "156cb537-e2d4-11e8-9b34-d36013741fb9",
diagnostic: Diagnostic {
error_type: "InvalidEventDataError".to_string(),
error_message: "Error parsing event data".to_string(),
error_type: "InvalidEventDataError",
error_message: "Error parsing event data",
},
};
let req = req.into_req()?;
Expand Down
18 changes: 15 additions & 3 deletions lambda-runtime/src/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,19 @@ fn test_event_completion_request() {
// /runtime/invocation/{AwsRequestId}/error
pub(crate) struct EventErrorRequest<'a> {
pub(crate) request_id: &'a str,
pub(crate) diagnostic: Diagnostic,
pub(crate) diagnostic: Diagnostic<'a>,
}

impl<'a> EventErrorRequest<'a> {
pub(crate) fn new(request_id: &'a str, error_type: &'a str, error_message: &'a str) -> EventErrorRequest<'a> {
EventErrorRequest {
request_id,
diagnostic: Diagnostic {
error_type,
error_message,
},
}
}
}

impl<'a> IntoRequest for EventErrorRequest<'a> {
Expand All @@ -128,8 +140,8 @@ fn test_event_error_request() {
let req = EventErrorRequest {
request_id: "id",
diagnostic: Diagnostic {
error_type: "InvalidEventDataError".to_string(),
error_message: "Error parsing event data".to_string(),
error_type: "InvalidEventDataError",
error_message: "Error parsing event data",
},
};
let req = req.into_req().unwrap();
Expand Down
37 changes: 19 additions & 18 deletions lambda-runtime/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,9 @@ use std::{collections::HashMap, convert::TryFrom};

#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct Diagnostic {
pub(crate) error_type: String,
pub(crate) error_message: String,
}

#[test]
fn round_trip_lambda_error() -> Result<(), Error> {
use serde_json::{json, Value};
let expected = json!({
"errorType": "InvalidEventDataError",
"errorMessage": "Error parsing event data.",
});

let actual: Diagnostic = serde_json::from_value(expected.clone())?;
let actual: Value = serde_json::to_value(actual)?;
assert_eq!(expected, actual);

Ok(())
pub(crate) struct Diagnostic<'a> {
pub(crate) error_type: &'a str,
pub(crate) error_message: &'a str,
}

/// The request ID, which identifies the request that triggered the function invocation. This header
Expand Down Expand Up @@ -191,6 +176,22 @@ impl<T> LambdaEvent<T> {
mod test {
use super::*;

#[test]
fn round_trip_lambda_error() {
use serde_json::{json, Value};
let expected = json!({
"errorType": "InvalidEventDataError",
"errorMessage": "Error parsing event data.",
});

let actual = Diagnostic {
error_type: "InvalidEventDataError",
error_message: "Error parsing event data.",
};
let actual: Value = serde_json::to_value(actual).expect("failed to serialize diagnostic");
assert_eq!(expected, actual);
}

#[test]
fn context_with_expected_values_and_types_resolves() {
let mut headers = HeaderMap::new();
Expand Down