-
Notifications
You must be signed in to change notification settings - Fork 408
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: RFC: Validate incoming and outgoing events utility #95
- Loading branch information
Ran Isenberg
committed
Aug 22, 2020
1 parent
81539a0
commit 7da981f
Showing
7 changed files
with
422 additions
and
6 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
"""Validation utility | ||
""" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
from datetime import date, datetime | ||
from typing import Any, Dict, List, Optional | ||
|
||
from pydantic import BaseModel, root_validator | ||
from typing_extensions import Literal | ||
|
||
|
||
class DynamoScheme(BaseModel): | ||
ApproximateCreationDateTime: date | ||
Keys: Dict[Literal["id"], Dict[Literal["S"], str]] | ||
NewImage: Optional[Dict[str, Any]] = {} | ||
OldImage: Optional[Dict[str, Any]] = {} | ||
SequenceNumber: str | ||
SizeBytes: int | ||
StreamViewType: Literal["NEW_AND_OLD_IMAGES", "KEYS_ONLY", "NEW_IMAGE", "OLD_IMAGE"] | ||
|
||
@root_validator | ||
def check_one_image_exists(cls, values): | ||
newimg, oldimg = values.get("NewImage"), values.get("OldImage") | ||
stream_type = values.get("StreamViewType") | ||
if stream_type == "NEW_AND_OLD_IMAGES" and not newimg and not oldimg: | ||
raise TypeError("DynamoDB streams schema failed validation, missing both new & old stream images") | ||
return values | ||
|
||
|
||
class DynamoRecordSchema(BaseModel): | ||
eventID: str | ||
eventName: Literal["INSERT", "MODIFY", "REMOVE"] | ||
eventVersion: float | ||
eventSource: Literal["aws:dynamodb"] | ||
awsRegion: str | ||
eventSourceARN: str | ||
dynamodb: DynamoScheme | ||
|
||
|
||
class DynamoDBSchema(BaseModel): | ||
Records: List[DynamoRecordSchema] | ||
|
||
|
||
class EventBridgeSchema(BaseModel): | ||
version: str | ||
id: str # noqa: A003,VNE003 | ||
source: str | ||
account: int | ||
time: datetime | ||
region: str | ||
resources: List[str] | ||
detail: Dict[str, Any] | ||
|
||
|
||
class SqsSchema(BaseModel): | ||
todo: str | ||
|
||
|
||
class SnsSchema(BaseModel): | ||
todo: str |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,126 @@ | ||
import logging | ||
from abc import ABC, abstractmethod | ||
from typing import Any, Callable, Dict | ||
|
||
from pydantic import BaseModel, ValidationError | ||
|
||
from aws_lambda_powertools.middleware_factory import lambda_handler_decorator | ||
from aws_lambda_powertools.validation.schemas import DynamoDBSchema, EventBridgeSchema | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class Envelope(ABC): | ||
def _parse_user_dict_schema(self, user_event: Dict[str, Any], inbound_schema_model: BaseModel) -> Any: | ||
logger.debug("parsing user dictionary schema") | ||
try: | ||
return inbound_schema_model(**user_event) | ||
except (ValidationError, TypeError): | ||
logger.exception("Valdation exception while extracting user custom schema") | ||
raise | ||
|
||
def _parse_user_json_string_schema(self, user_event: str, inbound_schema_model: BaseModel) -> Any: | ||
logger.debug("parsing user dictionary schema") | ||
try: | ||
return inbound_schema_model.parse_raw(user_event) | ||
except (ValidationError, TypeError): | ||
logger.exception("Valdation exception while extracting user custom schema") | ||
raise | ||
|
||
@abstractmethod | ||
def parse(self, event: Dict[str, Any], inbound_schema_model: BaseModel) -> Any: | ||
return NotImplemented | ||
|
||
|
||
class UserEnvelope(Envelope): | ||
def parse(self, event: Dict[str, Any], inbound_schema_model: BaseModel) -> Any: | ||
try: | ||
return inbound_schema_model(**event) | ||
except (ValidationError, TypeError): | ||
logger.exception("Valdation exception received from input user custom envelope event") | ||
raise | ||
|
||
|
||
class EventBridgeEnvelope(Envelope): | ||
def parse(self, event: Dict[str, Any], inbound_schema_model: BaseModel) -> Any: | ||
try: | ||
parsed_envelope = EventBridgeSchema(**event) | ||
except (ValidationError, TypeError): | ||
logger.exception("Valdation exception received from input eventbridge event") | ||
raise | ||
return self._parse_user_dict_schema(parsed_envelope.detail, inbound_schema_model) | ||
|
||
|
||
class DynamoDBEnvelope(Envelope): | ||
def parse(self, event: Dict[str, Any], inbound_schema_model: BaseModel) -> Any: | ||
try: | ||
parsed_envelope = DynamoDBSchema(**event) | ||
except (ValidationError, TypeError): | ||
logger.exception("Valdation exception received from input dynamodb stream event") | ||
raise | ||
output = [] | ||
for record in parsed_envelope.Records: | ||
parsed_new_image = ( | ||
{} | ||
if not record.dynamodb.NewImage | ||
else self._parse_user_dict_schema(record.dynamodb.NewImage, inbound_schema_model) | ||
) # noqa: E501 | ||
parsed_old_image = ( | ||
{} | ||
if not record.dynamodb.OldImage | ||
else self._parse_user_dict_schema(record.dynamodb.OldImage, inbound_schema_model) | ||
) # noqa: E501 | ||
output.append({"new": parsed_new_image, "old": parsed_old_image}) | ||
return output | ||
|
||
|
||
@lambda_handler_decorator | ||
def validator( | ||
handler: Callable[[Dict, Any], Any], | ||
event: Dict[str, Any], | ||
context: Dict[str, Any], | ||
inbound_schema_model: BaseModel, | ||
outbound_schema_model: BaseModel, | ||
envelope: Envelope, | ||
) -> Any: | ||
"""Decorator to create validation for lambda handlers events - both inbound and outbound | ||
As Lambda follows (event, context) signature we can remove some of the boilerplate | ||
and also capture any exception any Lambda function throws or its response as metadata | ||
Example | ||
------- | ||
**Lambda function using validation decorator** | ||
@validator(inbound=inbound_schema_model, outbound=outbound_schema_model) | ||
def handler(parsed_event_model, context): | ||
... | ||
Parameters | ||
---------- | ||
todo add | ||
Raises | ||
------ | ||
err | ||
TypeError or pydantic.ValidationError or any exception raised by the lambda handler itself | ||
""" | ||
lambda_handler_name = handler.__name__ | ||
logger.debug("Validating inbound schema") | ||
parsed_event_model = envelope.parse(event, inbound_schema_model) | ||
try: | ||
logger.debug(f"Calling handler {lambda_handler_name}") | ||
response = handler({"orig": event, "custom": parsed_event_model}, context) | ||
logger.debug("Received lambda handler response successfully") | ||
logger.debug(response) | ||
except Exception: | ||
logger.exception(f"Exception received from {lambda_handler_name}") | ||
raise | ||
|
||
try: | ||
logger.debug("Validating outbound response schema") | ||
outbound_schema_model(**response) | ||
except (ValidationError, TypeError): | ||
logger.exception(f"Validation exception received from {lambda_handler_name} response event") | ||
raise | ||
return response |
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.