-
Notifications
You must be signed in to change notification settings - Fork 1.2k
feat: add lambda streaming support for remote invoke #5307
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
7da252f
1d98a0a
1fbbfd8
fac2a92
c97c6d5
73af887
bd05b4c
c51e08c
d8c11f6
9991b9c
a27b0fb
5f739ef
d99d0ee
d3bbcbc
976794f
da342d3
a375ab0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,9 +4,11 @@ | |
| import base64 | ||
| import json | ||
| import logging | ||
| from abc import ABC, abstractmethod | ||
| from json import JSONDecodeError | ||
| from typing import Any, Dict, cast | ||
| from typing import Any, cast | ||
|
|
||
| from botocore.eventstream import EventStream | ||
| from botocore.exceptions import ClientError, ParamValidationError | ||
| from botocore.response import StreamingBody | ||
|
|
||
|
|
@@ -26,12 +28,19 @@ | |
| LOG = logging.getLogger(__name__) | ||
| FUNCTION_NAME = "FunctionName" | ||
| PAYLOAD = "Payload" | ||
| EVENT_STREAM = "EventStream" | ||
| PAYLOAD_CHUNK = "PayloadChunk" | ||
| INVOKE_COMPLETE = "InvokeComplete" | ||
| LOG_RESULT = "LogResult" | ||
|
|
||
| INVOKE_MODE = "InvokeMode" | ||
| RESPONSE_STREAM = "RESPONSE_STREAM" | ||
|
|
||
| class LambdaInvokeExecutor(BotoActionExecutor): | ||
|
|
||
| class AbstractLambdaInvokeExecutor(BotoActionExecutor, ABC): | ||
| """ | ||
| Calls "invoke" method of "lambda" service with given input. | ||
| If a file location provided, the file handle will be passed as Payload object | ||
| Abstract class for different lambda invocation executors, see implementation for details. | ||
| For Payload parameter, if a file location provided, the file handle will be passed as Payload object | ||
| """ | ||
|
|
||
| _lambda_client: Any | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. probably out of scope of this PR - I'm not a big fan of using
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've tried setting its type to
We will have a refactoring task next week, we will try to re-introduce typing there. |
||
|
|
@@ -59,14 +68,9 @@ def validate_action_parameters(self, parameters: dict) -> None: | |
| def _execute_action(self, payload: str): | ||
| self.request_parameters[FUNCTION_NAME] = self._function_name | ||
| self.request_parameters[PAYLOAD] = payload | ||
| LOG.debug( | ||
| "Calling lambda_client.invoke with FunctionName:%s, Payload:%s, parameters:%s", | ||
| self._function_name, | ||
| payload, | ||
| self.request_parameters, | ||
| ) | ||
|
|
||
| try: | ||
| response = self._lambda_client.invoke(**self.request_parameters) | ||
| return self._execute_lambda_invoke(payload) | ||
| except ParamValidationError as param_val_ex: | ||
| raise InvalidResourceBotoParameterException( | ||
| f"Invalid parameter key provided." | ||
|
|
@@ -80,7 +84,40 @@ def _execute_action(self, payload: str): | |
| elif boto_utils.get_client_error_code(client_ex) == "InvalidRequestContentException": | ||
| raise InvalidResourceBotoParameterException(client_ex) from client_ex | ||
| raise ErrorBotoApiCallException(client_ex) from client_ex | ||
| return response | ||
|
|
||
| @abstractmethod | ||
| def _execute_lambda_invoke(self, payload: str): | ||
| pass | ||
|
|
||
|
|
||
| class LambdaInvokeExecutor(AbstractLambdaInvokeExecutor): | ||
| """ | ||
| Calls "invoke" method of "lambda" service with given input. | ||
| """ | ||
|
|
||
| def _execute_lambda_invoke(self, payload: str) -> dict: | ||
| LOG.debug( | ||
| "Calling lambda_client.invoke with FunctionName:%s, Payload:%s, parameters:%s", | ||
| self._function_name, | ||
| payload, | ||
| self.request_parameters, | ||
| ) | ||
| return cast(dict, self._lambda_client.invoke(**self.request_parameters)) | ||
mndeveci marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
|
|
||
| class LambdaInvokeWithResponseStreamExecutor(AbstractLambdaInvokeExecutor): | ||
| """ | ||
| Calls "invoke_with_response_stream" method of "lambda" service with given input. | ||
| """ | ||
|
|
||
| def _execute_lambda_invoke(self, payload: str) -> dict: | ||
| LOG.debug( | ||
| "Calling lambda_client.invoke_with_response_stream with FunctionName:%s, Payload:%s, parameters:%s", | ||
| self._function_name, | ||
| payload, | ||
| self.request_parameters, | ||
| ) | ||
| return cast(dict, self._lambda_client.invoke_with_response_stream(**self.request_parameters)) | ||
mndeveci marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
|
|
||
| class DefaultConvertToJSON(RemoteInvokeRequestResponseMapper): | ||
|
|
@@ -124,6 +161,31 @@ def map(self, remote_invoke_input: RemoteInvokeExecutionInfo) -> RemoteInvokeExe | |
| return remote_invoke_input | ||
|
|
||
|
|
||
| class LambdaStreamResponseConverter(RemoteInvokeRequestResponseMapper): | ||
| """ | ||
| This class helps to convert response from lambda invoke_with_response_stream API call. | ||
| That API call returns 'EventStream' which yields 'PayloadChunk's and 'InvokeComplete' as they become available. | ||
| This mapper, gets all 'PayloadChunk's and 'InvokeComplete' events and decodes them for next mapper. | ||
| """ | ||
|
|
||
| def map(self, remote_invoke_input: RemoteInvokeExecutionInfo) -> RemoteInvokeExecutionInfo: | ||
| LOG.debug("Mapping Lambda response to string object") | ||
| if not isinstance(remote_invoke_input.response, dict): | ||
| raise InvalideBotoResponseException("Invalid response type received from Lambda service, expecting dict") | ||
|
|
||
| event_stream: EventStream = remote_invoke_input.response.get(EVENT_STREAM, []) | ||
mndeveci marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| decoded_event_stream = [] | ||
| for event in event_stream: | ||
| if PAYLOAD_CHUNK in event: | ||
| decoded_payload_chunk = event.get(PAYLOAD_CHUNK).get(PAYLOAD).decode("utf-8") | ||
| decoded_event_stream.append({PAYLOAD_CHUNK: {PAYLOAD: decoded_payload_chunk}}) | ||
| if INVOKE_COMPLETE in event: | ||
| log_output = event.get(INVOKE_COMPLETE).get(LOG_RESULT, b"") | ||
| decoded_event_stream.append({INVOKE_COMPLETE: {LOG_RESULT: log_output}}) | ||
| remote_invoke_input.response[EVENT_STREAM] = decoded_event_stream | ||
| return remote_invoke_input | ||
|
|
||
|
|
||
| class LambdaResponseOutputFormatter(RemoteInvokeRequestResponseMapper): | ||
| """ | ||
| This class helps to format output response for lambda service that will be printed on the CLI. | ||
|
|
@@ -139,8 +201,8 @@ def map(self, remote_invoke_input: RemoteInvokeExecutionInfo) -> RemoteInvokeExe | |
| """ | ||
| if remote_invoke_input.output_format == RemoteInvokeOutputFormat.DEFAULT: | ||
| LOG.debug("Formatting Lambda output response") | ||
| boto_response = cast(Dict, remote_invoke_input.response) | ||
| log_field = boto_response.get("LogResult") | ||
| boto_response = cast(dict, remote_invoke_input.response) | ||
| log_field = boto_response.get(LOG_RESULT) | ||
| if log_field: | ||
| log_result = base64.b64decode(log_field).decode("utf-8") | ||
| remote_invoke_input.log_output = log_result | ||
|
|
@@ -152,3 +214,45 @@ def map(self, remote_invoke_input: RemoteInvokeExecutionInfo) -> RemoteInvokeExe | |
| remote_invoke_input.response = boto_response.get(PAYLOAD) | ||
|
|
||
| return remote_invoke_input | ||
|
|
||
|
|
||
| class LambdaStreamResponseOutputFormatter(RemoteInvokeRequestResponseMapper): | ||
| """ | ||
| This class helps to format streaming output response for lambda service that will be printed on the CLI. | ||
| It loops through EventStream elements and adds them to response, and once InvokeComplete is reached, it updates | ||
| log_output and response objects in remote_invoke_input. | ||
| """ | ||
|
|
||
| def map(self, remote_invoke_input: RemoteInvokeExecutionInfo) -> RemoteInvokeExecutionInfo: | ||
| """ | ||
| Maps the lambda response output to the type of output format specified as user input. | ||
| If output_format is original-boto-response, write the original boto API response | ||
| to stdout. | ||
| """ | ||
| if remote_invoke_input.output_format == RemoteInvokeOutputFormat.DEFAULT: | ||
| LOG.debug("Formatting Lambda output response") | ||
| boto_response = cast(dict, remote_invoke_input.response) | ||
| combined_response = "" | ||
| for event in boto_response.get(EVENT_STREAM, []): | ||
| if PAYLOAD_CHUNK in event: | ||
| payload_chunk = event.get(PAYLOAD_CHUNK).get(PAYLOAD) | ||
| combined_response = f"{combined_response}{payload_chunk}" | ||
| if INVOKE_COMPLETE in event: | ||
| log_result = base64.b64decode(event.get(INVOKE_COMPLETE).get(LOG_RESULT)).decode("utf-8") | ||
| remote_invoke_input.log_output = log_result | ||
| remote_invoke_input.response = combined_response | ||
| return remote_invoke_input | ||
|
|
||
|
|
||
| def _is_function_invoke_mode_response_stream(lambda_client: Any, function_name: str): | ||
| """ | ||
| Returns True if given function has RESPONSE_STREAM as InvokeMode, False otherwise | ||
| """ | ||
| try: | ||
| function_url_config = lambda_client.get_function_url_config(FunctionName=function_name) | ||
| function_invoke_mode = function_url_config.get(INVOKE_MODE) | ||
| LOG.debug("InvokeMode of function %s: %s", function_name, function_invoke_mode) | ||
| return function_invoke_mode == RESPONSE_STREAM | ||
| except ClientError as ex: | ||
| LOG.debug("Function %s, doesn't have Function URL configured, using regular invoke", function_name, exc_info=ex) | ||
| return False | ||
Uh oh!
There was an error while loading. Please reload this page.