Skip to content
This repository was archived by the owner on Aug 16, 2022. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions provider/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ readme = "README.md"
[dependencies]
env_logger = "0.7.1"
log = "0.4.8"
anyhow = "1.0.28"
wascc-codec = "0.5.2"
reqwest = { version = "0.10.3", features = ["blocking", "json"] }
serde = "1.0.104"
serde_json = "1.0.48"
reqwest = { version = "0.10.4", features = ["blocking", "json"] }
serde = "1.0.105"
serde_json = "1.0.50"
codec = { path = "../codec" }
59 changes: 36 additions & 23 deletions provider/src/lambda.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,51 +16,58 @@
// waSCC AWS Lambda Runtime Provider
//

use reqwest::header::USER_AGENT;
use serde_json;
use std::error::Error;

// Represents an invocation event.
pub struct InvocationEvent {
/// Represents an invocation event.
pub(crate) struct InvocationEvent {
body: Vec<u8>,
request_id: Option<String>,
trace_id: Option<String>,
}

// Represents an invocation response.
pub struct InvocationResponse {
/// Represents an invocation response.
pub(crate) struct InvocationResponse {
body: Vec<u8>,
request_id: String,
}

// Represents an invocation error.
pub struct InvocationError {
/// Represents an invocation error.
pub(crate) struct InvocationError {
error: Box<dyn Error>,
request_id: String,
}

// Represents an AWS Lambda runtime HTTP client.
pub struct RuntimeClient {
/// Represents an AWS Lambda runtime client.
pub(crate) struct Client {
endpoint: String,
http_client: reqwest::blocking::Client,
user_agent: String,
}

impl RuntimeClient {
// Creates a new `RuntimeClient` with the specified AWS Lambda runtime API endpoint.
impl Client {
/// Creates a new `RuntimeClient` with the specified AWS Lambda runtime API endpoint.
pub fn new(endpoint: &str) -> Self {
RuntimeClient {
Client {
endpoint: endpoint.into(),
http_client: reqwest::blocking::Client::new(),
user_agent: format!("AWS_Lambda_waSCC/{}", env!("CARGO_PKG_VERSION")),
}
}

// Returns the next AWS Lambda invocation event.
pub fn next_invocation_event(&self) -> Result<Option<InvocationEvent>, Box<dyn Error>> {
/// Returns the next AWS Lambda invocation event.
pub fn next_invocation_event(&self) -> anyhow::Result<Option<InvocationEvent>> {
// https://docs.aws.amazon.com/lambda/latest/dg/runtimes-api.html#runtimes-api-next
let url = format!(
"http://{}/2018-06-01/runtime/invocation/next",
self.endpoint
);
let mut resp = self.http_client.get(&url).send()?;
let mut resp = self
.http_client
.get(&url)
.header(USER_AGENT, self.user_agent.clone())
.send()?;
let status = resp.status();
info!(
"GET {} {} {}",
Expand All @@ -85,8 +92,8 @@ impl RuntimeClient {
Ok(Some(event))
}

// Sends an invocation error to the AWS Lambda runtime.
pub fn send_invocation_error(&self, error: InvocationError) -> Result<(), Box<dyn Error>> {
/// Sends an invocation error to the AWS Lambda runtime.
pub fn send_invocation_error(&self, error: InvocationError) -> anyhow::Result<()> {
// https://docs.aws.amazon.com/lambda/latest/dg/runtimes-api.html#runtimes-api-invokeerror
let url = format!(
"http://{}/2018-06-01/runtime/invocation/{}/error",
Expand All @@ -95,6 +102,7 @@ impl RuntimeClient {
let resp = self
.http_client
.post(&url)
.header(USER_AGENT, self.user_agent.clone())
.json(&serde_json::json!({
"errorMessage": error.error.to_string(),
}))
Expand All @@ -110,14 +118,19 @@ impl RuntimeClient {
Ok(())
}

// Sends an invocation response to the AWS Lambda runtime.
pub fn send_invocation_response(&self, resp: InvocationResponse) -> Result<(), Box<dyn Error>> {
/// Sends an invocation response to the AWS Lambda runtime.
pub fn send_invocation_response(&self, resp: InvocationResponse) -> anyhow::Result<()> {
// https://docs.aws.amazon.com/lambda/latest/dg/runtimes-api.html#runtimes-api-response
let url = format!(
"http://{}/2018-06-01/runtime/invocation/{}/response",
self.endpoint, resp.request_id
);
let resp = self.http_client.post(&url).body(resp.body).send()?;
let resp = self
.http_client
.post(&url)
.header(USER_AGENT, self.user_agent.clone())
.body(resp.body)
.send()?;
let status = resp.status();
info!(
"POST {} {} {}",
Expand All @@ -131,7 +144,7 @@ impl RuntimeClient {
}

impl InvocationEvent {
// Creates a new `InvocationEvent` with the specified body.
/// Creates a new `InvocationEvent` with the specified body.
pub fn new(body: Vec<u8>) -> Self {
InvocationEvent {
body: body,
Expand All @@ -140,17 +153,17 @@ impl InvocationEvent {
}
}

// Returns the event body.
/// Returns the event body.
pub fn body(&self) -> &Vec<u8> {
self.body.as_ref()
}

// Returns any request ID.
/// Returns any request ID.
pub fn request_id(&self) -> Option<&str> {
self.request_id.as_deref()
}

// Returns any trace ID.
/// Returns any trace ID.
pub fn trace_id(&self) -> Option<&str> {
self.trace_id.as_deref()
}
Expand Down
16 changes: 11 additions & 5 deletions provider/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
// waSCC AWS Lambda Runtime Provider
//

#[macro_use]
extern crate anyhow;
#[macro_use]
extern crate log;
#[macro_use]
Expand Down Expand Up @@ -46,7 +48,7 @@ pub struct AwsLambdaRuntimeProvider {

/// Polls the Lambda event machinery.
struct Poller {
client: lambda::RuntimeClient,
client: lambda::Client,
dispatcher: Arc<RwLock<Box<dyn Dispatcher>>>,
module_id: String,
shutdown: Arc<RwLock<HashMap<String, bool>>>,
Expand All @@ -73,13 +75,17 @@ impl AwsLambdaRuntimeProvider {
}

/// Starts polling the Lambda event machinery.
fn start_polling(&self, config: CapabilityConfiguration) -> Result<(), Box<dyn Error>> {
fn start_polling(&self, config: CapabilityConfiguration) -> anyhow::Result<()> {
info!("awslambda:runtime start_polling");

let dispatcher = Arc::clone(&self.dispatcher);
let endpoint = match config.values.get("AWS_LAMBDA_RUNTIME_API") {
Some(ep) => String::from(ep),
None => return Err("Missing configuration value: AWS_LAMBDA_RUNTIME_API".into()),
None => {
return Err(anyhow!(
"Missing configuration value: AWS_LAMBDA_RUNTIME_API"
))
}
};
let module_id = config.module;
let shutdown = Arc::clone(&self.shutdown);
Expand All @@ -97,7 +103,7 @@ impl AwsLambdaRuntimeProvider {
}

/// Stops any running Lambda poller.
fn stop_polling(&self, config: CapabilityConfiguration) -> Result<(), Box<dyn Error>> {
fn stop_polling(&self, config: CapabilityConfiguration) -> anyhow::Result<()> {
info!("awslambda:runtime stop_polling");

let module_id = &config.module;
Expand Down Expand Up @@ -165,7 +171,7 @@ impl Poller {
shutdown: Arc<RwLock<HashMap<String, bool>>>,
) -> Self {
Poller {
client: lambda::RuntimeClient::new(endpoint),
client: lambda::Client::new(endpoint),
dispatcher,
module_id: module_id.into(),
shutdown,
Expand Down
1 change: 1 addition & 0 deletions runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ readme = "README.md"
[dependencies]
env_logger = "0.7.1"
log = "0.4.8"
anyhow = "1.0.28"
provider = { path = "../provider" }
wascc-host = { version = "0.5.3", features = ["manifest", "gantry"] }

Expand Down
20 changes: 13 additions & 7 deletions runtime/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
// waSCC AWS Lambda Runtime
//

#[macro_use]
extern crate anyhow;

extern crate provider;

use env_logger;
Expand All @@ -28,7 +31,7 @@ use wascc_host::{host, HostManifest, NativeCapability};
const MANIFEST_FILE: &str = "manifest.yaml";

/// Entry point.
fn main() -> Result<(), Box<dyn Error>> {
fn main() -> anyhow::Result<()> {
// No timestamp in the log format as CloudWatch already adds it.
if env_logger::builder()
.format_timestamp(None)
Expand All @@ -41,15 +44,18 @@ fn main() -> Result<(), Box<dyn Error>> {
info!("aws-lambda-wascc-runtime starting");

let rt = provider::AwsLambdaRuntimeProvider::new();
host::add_native_capability(NativeCapability::from_instance(rt)?)?;
let cap = NativeCapability::from_instance(rt)
.map_err(|e| anyhow!("Failed to create Lambda runtime provider: {}", e))?;
host::add_native_capability(cap)
.map_err(|e| anyhow!("Failed to load Lambda runtime provider: {}", e))?;

// Load from well-known manifest file and expand any environment variables.
if let Some(cwd) = std::env::current_dir()?.to_str() {
info!("Loading {} from {}", MANIFEST_FILE, cwd);
}

// Load from well-known manifest file and expand any environment variables.
let manifest = HostManifest::from_yaml(MANIFEST_FILE, true)?;
host::apply_manifest(manifest)?;
let manifest = HostManifest::from_yaml(MANIFEST_FILE, true)
.map_err(|e| anyhow!("Failed to load manifest file: {}", e))?;
host::apply_manifest(manifest).map_err(|e| anyhow!("Failed to apply manifest: {}", e))?;

autoconfigure_runtime()?;

Expand All @@ -62,7 +68,7 @@ fn main() -> Result<(), Box<dyn Error>> {
}

/// Autoconfigures any actors that have the awslambda:runtime capability.
fn autoconfigure_runtime() -> Result<(), Box<dyn Error>> {
fn autoconfigure_runtime() -> anyhow::Result<()> {
let mut values = HashMap::new();
// https://docs.aws.amazon.com/lambda/latest/dg/configuration-envvars.html#configuration-envvars-runtime
let keys = vec![
Expand Down