[RFC] A layering system for the runtime #747

Closed · calavera opened this issue Dec 4, 2023 · 2 comments

calavera (Contributor) commented Dec 4, 2023

The Rust Runtime for Lambda implements Lambda's interface for custom runtimes. Internally, the runtime uses a polling strategy to receive invoke events as JSON data from Lambda. Then, it transforms those JSON events into typed structures that are processed by the Rust code that our users implement.

The logic that polls for events, transforms them into structures, and calls the user-defined handler is all encapsulated in the Runtime::run function. This tight coupling makes it very hard to extend the runtime, or even to give people some extra control over what the runtime does. People need to copy parts of the runtime when they want to extend it, like we initially did with the streaming support. This overhead shows that the runtime lacks the extensibility and control that some users need.

I'd like to propose breaking the Runtime into Tower services, and adding a layer system that allows users to wrap the Runtime's internals with user-defined logic. The following example shows how users would define a layer and instruct the runtime to wrap it around our core logic:

use lambda_runtime::{service_fn, Error, Invocation, LambdaEvent, Runtime};
use futures_util::future::BoxFuture;
use std::task::{Context, Poll};
use tower::{Layer, Service};

#[derive(Clone)]
struct LambdaTelemetryLayer;

impl<S> Layer<S> for LambdaTelemetryLayer {
    type Service = LambdaTelemetryMiddleware<S>;

    fn layer(&self, inner: S) -> Self::Service {
        LambdaTelemetryMiddleware { inner }
    }
}

#[derive(Clone)]
struct LambdaTelemetryMiddleware<S> {
    inner: S,
}

impl<S> Service<Invocation> for LambdaTelemetryMiddleware<S>
where
    S: Service<Invocation, Response = ()> + Send + 'static,
    S::Future: Send + 'static,
{
    type Response = ();
    type Error = S::Error;
    // `BoxFuture` is a type alias for `Pin<Box<dyn Future + Send + 'a>>`
    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.inner.poll_ready(cx)
    }

    fn call(&mut self, invoke: Invocation) -> Self::Future {
        let future = self.inner.call(invoke);

        Box::pin(async move {
            let result = future.await;
            // Flush telemetry/metrics here, after the invocation completes.
            result
        })
    }
}

async fn my_handler(_event: LambdaEvent<serde_json::Value>) -> Result<(), Error> {
    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    let func = service_fn(my_handler);

    Runtime::initialize(func)
        .layer(LambdaTelemetryLayer)
        .run()
        .await
}

If you're familiar with Axum and Tower's middleware system, you'll notice that this is pretty much a copy of that implementation.

In the previous example, we're using a new type, Invocation, to pass along the raw information received from Lambda. This type could be an alias for the http::Response that we receive, but I think it'd be clearer as an internally defined type:

struct Invocation {
    /// The HTTP response parts received from Lambda.
    pub parts: http::response::Parts,
    /// The HTTP response body received from Lambda.
    pub body: hyper::Body,
}

This new type could include utility functions to extract some of the context information that we currently provide to Lambda functions before they're invoked, for example the Context, as sketched below.
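
As a rough sketch of what such a helper could look like (the conversion from the invocation headers is an assumption modeled on the header parsing the runtime already performs internally):

impl Invocation {
    /// Hypothetical helper: build the function `Context` from the HTTP
    /// headers that Lambda sends with each event (request id, deadline,
    /// invoked function ARN, trace id, ...).
    pub fn context(&self) -> Result<lambda_runtime::Context, Error> {
        // Assumes a `TryFrom<http::HeaderMap>` conversion equivalent to
        // what the runtime does today before invoking the handler.
        lambda_runtime::Context::try_from(self.parts.headers.clone())
    }
}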

Implementation details

To implement this layering system, we'd move the event-processing logic into an internal handler service that implements Tower's Service trait. This new service would carry the implementation that you can see in this block of code in the current Runtime.

Because we're implementing the Service trait, we could also clean up some of that code:

- We would not need to check manually whether the handler is ready; that would be the responsibility of the Service::poll_ready function.
- The panic-handling logic could be removed and provided by an additional internal layer, as sketched below.
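
As a minimal sketch of that hypothetical internal layer, assuming Error is the crate's boxed error type, a middleware could trap panics from the wrapped service and surface them as ordinary errors:

use futures_util::{future::BoxFuture, FutureExt};
use lambda_runtime::Error;
use std::panic::AssertUnwindSafe;
use std::task::{Context, Poll};
use tower::Service;

// Hypothetical internal middleware that converts a panic in the wrapped
// service into a regular invocation error.
#[derive(Clone)]
struct CatchPanicMiddleware<S> {
    inner: S,
}

impl<S, R> Service<R> for CatchPanicMiddleware<S>
where
    S: Service<R, Response = (), Error = Error>,
    S::Future: Send + 'static,
{
    type Response = ();
    type Error = Error;
    type Future = BoxFuture<'static, Result<(), Error>>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
        self.inner.poll_ready(cx)
    }

    fn call(&mut self, req: R) -> Self::Future {
        let future = self.inner.call(req);
        Box::pin(async move {
            // `catch_unwind` traps a panic raised while polling the inner
            // future and turns it into an `Err` instead of aborting the loop.
            match AssertUnwindSafe(future).catch_unwind().await {
                Ok(result) => result,
                Err(_panic) => Err("handler panicked while processing the event".into()),
            }
        })
    }
}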

The current Runtime::run function would be reduced to something like this:

impl<C> Runtime<C>
where ... {
    async fn run(&mut self) -> Result<(), Error> {
        let client = &self.client;
        let stream = incoming(client);

        tokio::pin!(stream);
        while let Some(next_event_response) = stream.next().await {
            trace!("New event arrived (run loop)");
            let event = next_event_response?;
            let (parts, body) = event.into_parts();

            let invoke = Invocation { parts, body };

            // Wait for the handler service to become ready before calling it
            // (`ready` comes from tower::ServiceExt).
            self.handler.ready().await?.call(invoke).await?;
        }
        Ok(())
    }
}
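
For context, incoming here is the runtime's existing long-polling stream over Lambda's next-invocation endpoint; roughly, and simplified from the current crate code (NextEventRequest and into_req are the crate's internal request helpers):

fn incoming<C>(client: &Client<C>) -> impl Stream<Item = Result<http::Response<hyper::Body>, Error>> + '_
where ... {
    async_stream::stream! {
        loop {
            trace!("Waiting for next event (incoming loop)");
            // Long-poll GET /2018-06-01/runtime/invocation/next; Lambda
            // holds the connection open until an event is available.
            let req = NextEventRequest.into_req().expect("Unable to construct request");
            let res = client.call(req).await;
            yield res;
        }
    }
}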

And the Service implementation would be isolated to something like this:

impl<T> Service<Invocation> for Handler<T>
where
    T: for<'de> Deserialize<'de> + Send,
{
    type Response = ();

    type Error = Error;

    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;

    #[inline]
    fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), Self::Error>> {
        self.fn_handler.poll_ready(cx)
    }

    fn call(&mut self, invoke: Invocation) -> Self::Future {
        #[cfg(debug_assertions)]
        if invoke.parts.status == http::StatusCode::NO_CONTENT {
            // Ignore the event if the status code is 204.
            // This is a way to keep the runtime alive when
            // there are no events pending to be processed.
            return Box::pin(future::ready(Ok(())));
        }

        // `Invocation` would expose the request id and the `Context`
        // extracted from the event's headers via its utility functions.
        let request_id = invoke.request_id.to_string();

        let (parts, body, ctx) = invoke.into_parts();
        let request_span = ctx.request_span();

        let fut = async move {
            let body = hyper::body::to_bytes(body).await?;
            trace!("response body - {}", std::str::from_utf8(&body)?);

            #[cfg(debug_assertions)]
            if parts.status.is_server_error() {
                error!("Lambda Runtime server returned an unexpected error");
                return Err(parts.status.to_string().into());
            }

            let lambda_event = match deserializer::deserialize(&body, ctx) {
                Ok(lambda_event) => lambda_event,
                Err(err) => {
                    let req = build_event_error_request(&request_id, err)?;
                    self.client
                        .call(req)
                        .await
                        .expect("Unable to send response to Runtime APIs");
                    return Ok(());
                }
            };

            let response = self.fn_handler.call(lambda_event).await;

            let req = match response {
                Ok(response) => {
                    trace!("Ok response from handler (run loop)");
                    EventCompletionRequest {
                        request_id: &request_id,
                        body: response,
                        _unused_b: PhantomData,
                        _unused_s: PhantomData,
                    }
                    .into_req()
                }
                Err(err) => build_event_error_request(&request_id, err),
            }?;

            self.client
                .call(req)
                .await
                .expect("Unable to send response to Runtime APIs");
            Ok(())
        }
        .instrument(request_span);
        Box::pin(fut)
    }
}

With that separation of responsibilities, the Runtime could implement a layering mechanism using Tower's Layer trait:

impl<C> Runtime<C> {
    pub fn layer<L>(self, layer: L) -> Runtime<L::Service>
    where
        L: Layer<Handler> + Clone + Send + 'static,
        L::Service: Service<Invocation> + Clone + Send + 'static,
        <L::Service as Service<Invocation>>::Response: Send + 'static,
        <L::Service as Service<Invocation>>::Error: Into<Error> + 'static,
        <L::Service as Service<Invocation>>::Future: Send + 'static,
    {
        Runtime {
            client: self.client,
            config: self.config,
            handler: layer.layer(self.handler),
        }
    }
}
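
Because each call to layer re-wraps the current handler, chained calls would nest services the same way Tower's ServiceBuilder does. As a sketch of the intended semantics (LayerA and LayerB are hypothetical user layers):

let runtime = Runtime::initialize(func)
    .layer(LayerA)  // innermost: wraps the Handler directly
    .layer(LayerB); // outermost: sees every Invocation first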

The new service handler would be initialized with the Runtime::initialize function, and it'd be the code that ends up calling the user-provided function handler:

impl<T> Runtime<T> {
    pub fn initialize<F>(fn_handler: F) -> Self
    where
        F: Service<LambdaEvent<T>, Error = BoxError> + Send + 'static,
        F::Response: IntoFunctionResponse + 'static,
        F::Future: Future<Output = Result<F::Response, F::Error>> + Send + 'static,
        F::Error: FunctionResponseError,
        T: for<'de> Deserialize<'de>,
    {
        let config = Config::from_env().expect("Unable to load the function configuration");
        let client = Client::builder().build().expect("Unable to create a runtime client");

        Runtime {
            config,
            client,
            handler: Handler { fn_handler },
        }
    }
}

Caveats

I've tried to implement this and I've bumped into a number of problems with the type system. It probably requires more knowledge about Tower than I currently have. It's not as trivial as taking the code drafted above and putting it in the project, hence this RFC.

Summary

By implementing this new layering system, the Rust runtime for Lambda would be more flexible and easier to extend. It could also help users share layers, because they could build on Tower's already established layering ecosystem. Issues like #691 would be fairly easy to implement and share with the community.
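
To illustrate the ecosystem angle: assuming the bounds on Runtime::layer line up with Tower's own services, an off-the-shelf layer such as tower's TimeoutLayer could be applied without writing any middleware, reusing my_handler and LambdaTelemetryLayer from the example above (a sketch, not verified against the final API):

use std::time::Duration;
use tower::timeout::TimeoutLayer;

#[tokio::main]
async fn main() -> Result<(), Error> {
    let func = service_fn(my_handler);

    Runtime::initialize(func)
        // Fail the invocation if processing takes longer than 60 seconds.
        .layer(TimeoutLayer::new(Duration::from_secs(60)))
        .layer(LambdaTelemetryLayer)
        .run()
        .await
}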

calavera (Contributor, Author) commented:

Implemented in #845
