Skip to content

Commit fc51300

Browse files
authored
Check that lambda_https inner service is ready before calling it (#538)
* Check that `lambda_http`s inner service is ready before calling it This allows for service [backpressure](https://docs.rs/tower/0.4.13/tower/trait.Service.html#backpressure). Additionally: > Services are permitted to panic if `call` is invoked without obtaining `Poll::Ready(Ok(()))` from `poll_ready`. [Source](https://docs.rs/tower/0.4.13/tower/trait.Service.html#be-careful-when-cloning-inner-services) * Check that `lambda_extension`s inner service is ready before calling it This allows for service [backpressure](https://docs.rs/tower/0.4.13/tower/trait.Service.html#backpressure). Additionally: > Services are permitted to panic if `call` is invoked without obtaining `Poll::Ready(Ok(()))` from `poll_ready`. [Source](https://docs.rs/tower/0.4.13/tower/trait.Service.html#be-careful-when-cloning-inner-services) * Check that `lambda_runtime`s inner service is ready before calling it This allows for service [backpressure](https://docs.rs/tower/0.4.13/tower/trait.Service.html#backpressure). Additionally: > Services are permitted to panic if `call` is invoked without obtaining `Poll::Ready(Ok(()))` from `poll_ready`. [Source](https://docs.rs/tower/0.4.13/tower/trait.Service.html#be-careful-when-cloning-inner-services)
1 parent d7ab6ae commit fc51300

File tree

5 files changed

+153
-14
lines changed

5 files changed

+153
-14
lines changed

Diff for: lambda-extension/src/extension.rs

+21-4
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,16 @@
1-
use crate::{logs::*, requests, Error, ExtensionError, LambdaEvent, NextEvent};
2-
use hyper::{server::conn::AddrStream, Server};
3-
use lambda_runtime_api_client::Client;
41
use std::{
52
convert::Infallible, fmt, future::ready, future::Future, net::SocketAddr, path::PathBuf, pin::Pin, sync::Arc,
63
};
4+
5+
use hyper::{server::conn::AddrStream, Server};
6+
use lambda_runtime_api_client::Client;
77
use tokio::sync::Mutex;
88
use tokio_stream::StreamExt;
9-
use tower::{service_fn, MakeService, Service};
9+
use tower::{service_fn, MakeService, Service, ServiceExt};
1010
use tracing::{error, trace};
1111

12+
use crate::{logs::*, requests, Error, ExtensionError, LambdaEvent, NextEvent};
13+
1214
const DEFAULT_LOG_PORT_NUMBER: u16 = 9002;
1315

1416
/// An Extension that runs event and log processors
@@ -199,6 +201,21 @@ where
199201

200202
let event = LambdaEvent::new(extension_id, event);
201203

204+
let ep = match ep.ready().await {
205+
Ok(ep) => ep,
206+
Err(error) => {
207+
println!("Inner service is not ready: {:?}", error);
208+
let req = if is_invoke {
209+
requests::init_error(extension_id, &error.to_string(), None)?
210+
} else {
211+
requests::exit_error(extension_id, &error.to_string(), None)?
212+
};
213+
214+
client.call(req).await?;
215+
return Err(error.into());
216+
}
217+
};
218+
202219
let res = ep.call(event).await;
203220
if let Err(error) = res {
204221
println!("{:?}", error);

Diff for: lambda-http/src/lib.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -155,8 +155,8 @@ where
155155
type Error = E;
156156
type Future = TransformResponse<'a, R, Self::Error>;
157157

158-
fn poll_ready(&mut self, _cx: &mut core::task::Context<'_>) -> core::task::Poll<Result<(), Self::Error>> {
159-
core::task::Poll::Ready(Ok(()))
158+
fn poll_ready(&mut self, cx: &mut core::task::Context<'_>) -> core::task::Poll<Result<(), Self::Error>> {
159+
self.service.poll_ready(cx)
160160
}
161161

162162
fn call(&mut self, req: LambdaEvent<LambdaRequest>) -> Self::Future {

Diff for: lambda-integration-tests/src/bin/extension-trait.rs

+42-2
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,36 @@
1-
use lambda_extension::{Error, LambdaEvent, NextEvent, Service};
21
use std::{
32
future::{ready, Future},
43
pin::Pin,
4+
sync::atomic::{AtomicBool, Ordering},
55
};
6+
7+
use lambda_extension::{Error, LambdaEvent, NextEvent, Service};
68
use tracing::info;
79

8-
#[derive(Default)]
910
struct MyExtension {
1011
invoke_count: usize,
12+
ready: AtomicBool,
13+
}
14+
15+
impl Default for MyExtension {
16+
fn default() -> Self {
17+
Self {
18+
invoke_count: usize::default(),
19+
// New instances are not ready to be called until polled.
20+
ready: false.into(),
21+
}
22+
}
23+
}
24+
25+
impl Clone for MyExtension {
26+
fn clone(&self) -> Self {
27+
Self {
28+
invoke_count: self.invoke_count,
29+
// Cloned instances may not be immediately ready to be called.
30+
// https://docs.rs/tower/0.4.13/tower/trait.Service.html#be-careful-when-cloning-inner-services
31+
ready: false.into(),
32+
}
33+
}
1134
}
1235

1336
impl Service<LambdaEvent> for MyExtension {
@@ -16,6 +39,12 @@ impl Service<LambdaEvent> for MyExtension {
1639
type Response = ();
1740

1841
fn poll_ready(&mut self, _cx: &mut core::task::Context<'_>) -> core::task::Poll<Result<(), Self::Error>> {
42+
if self.ready.swap(true, Ordering::SeqCst) {
43+
info!("[extension] Service was already ready");
44+
} else {
45+
info!("[extension] Service is now ready");
46+
};
47+
1948
core::task::Poll::Ready(Ok(()))
2049
}
2150

@@ -30,6 +59,17 @@ impl Service<LambdaEvent> for MyExtension {
3059
}
3160
}
3261

62+
// After being called once, the service is no longer ready until polled again.
63+
if self.ready.swap(false, Ordering::SeqCst) {
64+
info!("[extension] The service is ready");
65+
} else {
66+
// https://docs.rs/tower/latest/tower/trait.Service.html#backpressure
67+
// https://docs.rs/tower/latest/tower/trait.Service.html#be-careful-when-cloning-inner-services
68+
// > Services are permitted to panic if `call` is invoked without obtaining
69+
// > `Poll::Ready(Ok(()))` from `poll_ready`.
70+
panic!("[extension] The service is not ready; `.poll_ready()` must be called first");
71+
}
72+
3373
Box::pin(ready(Ok(())))
3474
}
3575
}

Diff for: lambda-integration-tests/src/bin/http-trait.rs

+43-2
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,36 @@
1-
use lambda_http::{Body, Error, Request, RequestExt, Response, Service};
21
use std::{
32
future::{ready, Future},
43
pin::Pin,
4+
sync::atomic::{AtomicBool, Ordering},
55
};
6+
7+
use lambda_http::{Body, Error, Request, RequestExt, Response, Service};
68
use tracing::info;
79

8-
#[derive(Default)]
910
struct MyHandler {
1011
invoke_count: usize,
12+
ready: AtomicBool,
13+
}
14+
15+
impl Default for MyHandler {
16+
fn default() -> Self {
17+
Self {
18+
invoke_count: usize::default(),
19+
// New instances are not ready to be called until polled.
20+
ready: false.into(),
21+
}
22+
}
23+
}
24+
25+
impl Clone for MyHandler {
26+
fn clone(&self) -> Self {
27+
Self {
28+
invoke_count: self.invoke_count,
29+
// Cloned instances may not be immediately ready to be called.
30+
// https://docs.rs/tower/0.4.13/tower/trait.Service.html#be-careful-when-cloning-inner-services
31+
ready: false.into(),
32+
}
33+
}
1134
}
1235

1336
impl Service<Request> for MyHandler {
@@ -16,13 +39,31 @@ impl Service<Request> for MyHandler {
1639
type Response = Response<Body>;
1740

1841
fn poll_ready(&mut self, _cx: &mut core::task::Context<'_>) -> core::task::Poll<Result<(), Self::Error>> {
42+
if self.ready.swap(true, Ordering::SeqCst) {
43+
info!("[http-trait] Service was already ready");
44+
} else {
45+
info!("[http-trait] Service is now ready");
46+
};
47+
1948
core::task::Poll::Ready(Ok(()))
2049
}
2150

2251
fn call(&mut self, request: Request) -> Self::Future {
2352
self.invoke_count += 1;
2453
info!("[http-trait] Received event {}: {:?}", self.invoke_count, request);
2554
info!("[http-trait] Lambda context: {:?}", request.lambda_context());
55+
56+
// After being called once, the service is no longer ready until polled again.
57+
if self.ready.swap(false, Ordering::SeqCst) {
58+
info!("[http-trait] The service is ready");
59+
} else {
60+
// https://docs.rs/tower/latest/tower/trait.Service.html#backpressure
61+
// https://docs.rs/tower/latest/tower/trait.Service.html#be-careful-when-cloning-inner-services
62+
// > Services are permitted to panic if `call` is invoked without obtaining
63+
// > `Poll::Ready(Ok(()))` from `poll_ready`.
64+
panic!("[http-trait] The service is not ready; `.poll_ready()` must be called first");
65+
}
66+
2667
Box::pin(ready(Ok(Response::builder()
2768
.status(200)
2869
.body(Body::from("Hello, World!"))

Diff for: lambda-integration-tests/src/bin/runtime-trait.rs

+45-4
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
1-
use lambda_runtime::{Error, LambdaEvent, Service};
2-
use serde::{Deserialize, Serialize};
31
use std::{
42
future::{ready, Future},
53
pin::Pin,
4+
sync::atomic::{AtomicBool, Ordering},
65
};
6+
7+
use lambda_runtime::{Error, LambdaEvent, Service};
8+
use serde::{Deserialize, Serialize};
79
use tracing::info;
810

911
#[derive(Deserialize, Debug)]
@@ -16,9 +18,30 @@ struct Response {
1618
message: String,
1719
}
1820

19-
#[derive(Default)]
2021
struct MyHandler {
2122
invoke_count: usize,
23+
ready: AtomicBool,
24+
}
25+
26+
impl Default for MyHandler {
27+
fn default() -> Self {
28+
Self {
29+
invoke_count: usize::default(),
30+
// New instances are not ready to be called until polled.
31+
ready: false.into(),
32+
}
33+
}
34+
}
35+
36+
impl Clone for MyHandler {
37+
fn clone(&self) -> Self {
38+
Self {
39+
invoke_count: self.invoke_count,
40+
// Cloned instances may not be immediately ready to be called.
41+
// https://docs.rs/tower/0.4.13/tower/trait.Service.html#be-careful-when-cloning-inner-services
42+
ready: false.into(),
43+
}
44+
}
2245
}
2346

2447
impl Service<LambdaEvent<Request>> for MyHandler {
@@ -27,12 +50,30 @@ impl Service<LambdaEvent<Request>> for MyHandler {
2750
type Response = Response;
2851

2952
fn poll_ready(&mut self, _cx: &mut core::task::Context<'_>) -> core::task::Poll<Result<(), Self::Error>> {
53+
if self.ready.swap(true, Ordering::SeqCst) {
54+
info!("[runtime-trait] Service was already ready");
55+
} else {
56+
info!("[runtime-trait] Service is now ready");
57+
};
58+
3059
core::task::Poll::Ready(Ok(()))
3160
}
3261

3362
fn call(&mut self, request: LambdaEvent<Request>) -> Self::Future {
3463
self.invoke_count += 1;
35-
info!("[handler] Received event {}: {:?}", self.invoke_count, request);
64+
info!("[runtime-trait] Received event {}: {:?}", self.invoke_count, request);
65+
66+
// After being called once, the service is no longer ready until polled again.
67+
if self.ready.swap(false, Ordering::SeqCst) {
68+
info!("[runtime-trait] The service is ready");
69+
} else {
70+
// https://docs.rs/tower/latest/tower/trait.Service.html#backpressure
71+
// https://docs.rs/tower/latest/tower/trait.Service.html#be-careful-when-cloning-inner-services
72+
// > Services are permitted to panic if `call` is invoked without obtaining
73+
// > `Poll::Ready(Ok(()))` from `poll_ready`.
74+
panic!("[runtime-trait] The service is not ready; `.poll_ready()` must be called first");
75+
}
76+
3677
Box::pin(ready(Ok(Response {
3778
message: request.payload.command.to_uppercase(),
3879
})))

0 commit comments

Comments
 (0)