Skip to content

Commit

Permalink
Report error when we cannot deserialize the payload. (#520)
Browse files Browse the repository at this point in the history
Instead of panic, capture the error, and report it to the Lambda api. This is more friendly to operate.

Signed-off-by: David Calavera <david.calavera@gmail.com>

Signed-off-by: David Calavera <david.calavera@gmail.com>
  • Loading branch information
calavera authored Aug 31, 2022
1 parent fd2ea23 commit 756dfc4
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 64 deletions.
89 changes: 46 additions & 43 deletions lambda-runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,20 @@
//! Create a type that conforms to the [`tower::Service`] trait. This type can
//! then be passed to the the `lambda_runtime::run` function, which launches
//! and runs the Lambda runtime.
use hyper::client::{connect::Connection, HttpConnector};
use hyper::{
client::{connect::Connection, HttpConnector},
http::Request,
Body,
};
use lambda_runtime_api_client::Client;
use serde::{Deserialize, Serialize};
use std::{convert::TryFrom, env, fmt, future::Future, panic};
use std::{
convert::TryFrom,
env,
fmt::{self, Debug, Display},
future::Future,
panic,
};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_stream::{Stream, StreamExt};
pub use tower::{self, service_fn, Service};
Expand All @@ -24,7 +34,6 @@ mod simulated;
mod types;

use requests::{EventCompletionRequest, EventErrorRequest, IntoRequest, NextEventRequest};
use types::Diagnostic;
pub use types::{Context, LambdaEvent};

/// Error type that lambdas may result in
Expand Down Expand Up @@ -121,12 +130,20 @@ where

let ctx: Context = Context::try_from(parts.headers)?;
let ctx: Context = ctx.with_config(config);
let body = serde_json::from_slice(&body)?;
let request_id = &ctx.request_id.clone();

let xray_trace_id = &ctx.xray_trace_id.clone();
env::set_var("_X_AMZN_TRACE_ID", xray_trace_id);

let request_id = &ctx.request_id.clone();
let body = match serde_json::from_slice(&body) {
Ok(body) => body,
Err(err) => {
let req = build_event_error_request(request_id, err)?;
client.call(req).await.expect("Unable to send response to Runtime APIs");
return Ok(());
}
};

let req = match handler.ready().await {
Ok(handler) => {
let task =
Expand All @@ -141,48 +158,23 @@ where
}
.into_req()
}
Err(err) => {
error!("{:?}", err); // logs the error in CloudWatch
EventErrorRequest {
request_id,
diagnostic: Diagnostic {
error_type: type_name_of_val(&err).to_owned(),
error_message: format!("{}", err), // returns the error to the caller via Lambda API
},
}
.into_req()
}
Err(err) => build_event_error_request(request_id, err),
},
Err(err) => {
error!("{:?}", err);
EventErrorRequest {
request_id,
diagnostic: Diagnostic {
error_type: type_name_of_val(&err).to_owned(),
error_message: if let Some(msg) = err.downcast_ref::<&str>() {
format!("Lambda panicked: {}", msg)
} else {
"Lambda panicked".to_string()
},
},
}
.into_req()
let error_type = type_name_of_val(&err);
let msg = if let Some(msg) = err.downcast_ref::<&str>() {
format!("Lambda panicked: {}", msg)
} else {
"Lambda panicked".to_string()
};
EventErrorRequest::new(request_id, error_type, &msg).into_req()
}
}
}
Err(err) => {
error!("{:?}", err); // logs the error in CloudWatch
EventErrorRequest {
request_id,
diagnostic: Diagnostic {
error_type: type_name_of_val(&err).to_owned(),
error_message: format!("{}", err), // returns the error to the caller via Lambda API
},
}
.into_req()
}
};
let req = req?;
Err(err) => build_event_error_request(request_id, err),
}?;

client.call(req).await.expect("Unable to send response to Runtime APIs");
}
Ok(())
Expand Down Expand Up @@ -247,6 +239,17 @@ fn type_name_of_val<T>(_: T) -> &'static str {
std::any::type_name::<T>()
}

fn build_event_error_request<T>(request_id: &str, err: T) -> Result<Request<Body>, Error>
where
T: Display + Debug,
{
error!("{:?}", err); // logs the error in CloudWatch
let error_type = type_name_of_val(&err);
let msg = format!("{}", err);

EventErrorRequest::new(request_id, error_type, &msg).into_req()
}

#[cfg(test)]
mod endpoint_tests {
use crate::{
Expand Down Expand Up @@ -431,8 +434,8 @@ mod endpoint_tests {
let req = EventErrorRequest {
request_id: "156cb537-e2d4-11e8-9b34-d36013741fb9",
diagnostic: Diagnostic {
error_type: "InvalidEventDataError".to_string(),
error_message: "Error parsing event data".to_string(),
error_type: "InvalidEventDataError",
error_message: "Error parsing event data",
},
};
let req = req.into_req()?;
Expand Down
18 changes: 15 additions & 3 deletions lambda-runtime/src/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,19 @@ fn test_event_completion_request() {
// /runtime/invocation/{AwsRequestId}/error
pub(crate) struct EventErrorRequest<'a> {
pub(crate) request_id: &'a str,
pub(crate) diagnostic: Diagnostic,
pub(crate) diagnostic: Diagnostic<'a>,
}

impl<'a> EventErrorRequest<'a> {
pub(crate) fn new(request_id: &'a str, error_type: &'a str, error_message: &'a str) -> EventErrorRequest<'a> {
EventErrorRequest {
request_id,
diagnostic: Diagnostic {
error_type,
error_message,
},
}
}
}

impl<'a> IntoRequest for EventErrorRequest<'a> {
Expand All @@ -128,8 +140,8 @@ fn test_event_error_request() {
let req = EventErrorRequest {
request_id: "id",
diagnostic: Diagnostic {
error_type: "InvalidEventDataError".to_string(),
error_message: "Error parsing event data".to_string(),
error_type: "InvalidEventDataError",
error_message: "Error parsing event data",
},
};
let req = req.into_req().unwrap();
Expand Down
37 changes: 19 additions & 18 deletions lambda-runtime/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,9 @@ use std::{collections::HashMap, convert::TryFrom};

#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct Diagnostic {
pub(crate) error_type: String,
pub(crate) error_message: String,
}

#[test]
fn round_trip_lambda_error() -> Result<(), Error> {
use serde_json::{json, Value};
let expected = json!({
"errorType": "InvalidEventDataError",
"errorMessage": "Error parsing event data.",
});

let actual: Diagnostic = serde_json::from_value(expected.clone())?;
let actual: Value = serde_json::to_value(actual)?;
assert_eq!(expected, actual);

Ok(())
pub(crate) struct Diagnostic<'a> {
pub(crate) error_type: &'a str,
pub(crate) error_message: &'a str,
}

/// The request ID, which identifies the request that triggered the function invocation. This header
Expand Down Expand Up @@ -191,6 +176,22 @@ impl<T> LambdaEvent<T> {
mod test {
use super::*;

#[test]
fn round_trip_lambda_error() {
use serde_json::{json, Value};
let expected = json!({
"errorType": "InvalidEventDataError",
"errorMessage": "Error parsing event data.",
});

let actual = Diagnostic {
error_type: "InvalidEventDataError",
error_message: "Error parsing event data.",
};
let actual: Value = serde_json::to_value(actual).expect("failed to serialize diagnostic");
assert_eq!(expected, actual);
}

#[test]
fn context_with_expected_values_and_types_resolves() {
let mut headers = HeaderMap::new();
Expand Down

0 comments on commit 756dfc4

Please sign in to comment.