From d40826c0049def9e0c83d01f3ad3d397ca026c2a Mon Sep 17 00:00:00 2001 From: Pablo Arroyo Loma Date: Mon, 29 Aug 2016 15:17:24 +0200 Subject: [PATCH] Add handler support for SNS, dynamodb and kinesis events --- test_settings.json | 19 +++++++-- test_settings.py | 27 ++++++++++++- tests/tests.py | 98 ++++++++++++++++++++++++++++++++++++++++++++-- zappa/cli.py | 10 +++++ zappa/handler.py | 38 +++++++++++++----- 5 files changed, 173 insertions(+), 19 deletions(-) diff --git a/test_settings.json b/test_settings.json index 4a655151c..54cfc25a1 100644 --- a/test_settings.json +++ b/test_settings.json @@ -11,10 +11,21 @@ "debug": true, "parameter_depth": 2, "prebuild_script": "test_settings.prebuild_me", - "events": [{ - "function": "tests.test_app.schedule_me", - "expression": "rate(1 minute)" - }] + "events": [ + { + "function": "tests.test_app.schedule_me", + "expression": "rate(1 minute)" + }, + { + "function": "test.test_app.method", + "event_source": { + "arn": "arn:aws:sns:::1", + "events": [ + "sns:Publish" + ] + } + } + ] }, "devor": { "s3_bucket": "lmbda", diff --git a/test_settings.py b/test_settings.py index 82b642969..458922b65 100644 --- a/test_settings.py +++ b/test_settings.py @@ -16,16 +16,39 @@ #} # +AWS_EVENT_MAPPING = { + 'arn:aws:s3:1': 'test_settings.aws_s3_event', + 'arn:aws:sns:1': 'test_settings.aws_sns_event', + 'arn:aws:dynamodb:1': 'test_settings.aws_dynamodb_event', + 'arn:aws:kinesis:1': 'test_settings.aws_kinesis_event' +} + ENVIRONMENT_VARIABLES={'testenv': 'envtest'} + def prebuild_me(): print("This is a prebuild script!") + def callback(self): print("this is a callback") -def aws_event(event, contect): - print("AWS EVENT") + +def aws_s3_event(event, content): + return "AWS S3 EVENT" + + +def aws_sns_event(event, content): + return "AWS SNS EVENT" + + +def aws_dynamodb_event(event, content): + return "AWS DYNAMODB EVENT" + + +def aws_kinesis_event(event, content): + return "AWS KINESIS EVENT" + def command(): print("command") diff --git a/tests/tests.py b/tests/tests.py index 133ab82e0..5d6fb1c60 100644 --- a/tests/tests.py +++ b/tests/tests.py @@ -397,19 +397,111 @@ def test_handler(self, session): } lh.handler(event, None) - # Test AWS event + # Test AWS S3 event event = { u'account': u'72333333333', u'region': u'us-east-1', u'detail': {}, - u'Records': [{'s3': {'configurationId': 'test_settings.aws_event'}}], + u'Records': [{'s3': {'configurationId': 'test_settings.aws_s3_event'}}], u'source': u'aws.events', u'version': u'0', u'time': u'2016-05-10T21:05:39Z', u'id': u'0d6a6db0-d5e7-4755-93a0-750a8bf49d55', u'resources': [u'arn:aws:events:us-east-1:72333333333:rule/tests.test_app.schedule_me'] } - lh.handler(event, None) + self.assertEqual("AWS S3 EVENT", lh.handler(event, None)) + + # Test AWS SNS event + event = { + u'account': u'72333333333', + u'region': u'us-east-1', + u'detail': {}, + u'Records': [ + { + u'EventVersion': u'1.0', + u'EventSource': u'aws:sns', + u'EventSubscriptionArn': u'arn:aws:sns:EXAMPLE', + u'Sns': { + u'SignatureVersion': u'1', + u'Timestamp': u'1970-01-01T00:00:00.000Z', + u'Signature': u'EXAMPLE', + u'SigningCertUrl': u'EXAMPLE', + u'MessageId': u'95df01b4-ee98-5cb9-9903-4c221d41eb5e', + u'Message': u'Hello from SNS!', + u'Subject': u'TestInvoke', + u'Type': u'Notification', + u'UnsubscribeUrl': u'EXAMPLE', + u'TopicArn': u'arn:aws:sns:1', + u'MessageAttributes': { + u'Test': {u'Type': u'String', u'Value': u'TestString'}, + u'TestBinary': {u'Type': u'Binary', u'Value': u'TestBinary'} + } + } + } + ] + } + self.assertEqual("AWS SNS EVENT", lh.handler(event, None)) + + # Test AWS DynamoDB event + event = { + u'Records': [ + { + u'eventID': u'1', + u'eventVersion': u'1.0', + u'dynamodb': { + u'Keys': {u'Id': {u'N': u'101'}}, + u'NewImage': {u'Message': {u'S': u'New item!'}, u'Id': {u'N': u'101'}}, + u'StreamViewType': u'NEW_AND_OLD_IMAGES', + u'SequenceNumber': u'111', u'SizeBytes': 26 + }, + u'awsRegion': u'us-west-2', + u'eventName': u'INSERT', + u'eventSourceARN': u'arn:aws:dynamodb:1', + u'eventSource': u'aws:dynamodb' + } + ] + } + self.assertEqual("AWS DYNAMODB EVENT", lh.handler(event, None)) + + # Test AWS kinesis event + event = { + u'Records': [ + { + u'eventID': u'shardId-000000000000:49545115243490985018280067714973144582180062593244200961', + u'eventVersion': u'1.0', + u'kinesis': { + u'partitionKey': u'partitionKey-3', + u'data': u'SGVsbG8sIHRoaXMgaXMgYSB0ZXN0IDEyMy4=', + u'kinesisSchemaVersion': u'1.0', + u'sequenceNumber': u'49545115243490985018280067714973144582180062593244200961' + }, + u'invokeIdentityArn': u'arn:aws:iam::EXAMPLE', + u'eventName': u'aws:kinesis:record', + u'eventSourceARN': u'arn:aws:kinesis:1', + u'eventSource': u'aws:kinesis', + u'awsRegion': u'us-east-1' + } + ] + } + self.assertEqual("AWS KINESIS EVENT", lh.handler(event, None)) + + # Unhandled event + event = { + u'Records': [ + { + u'eventID': u'shardId-000000000000:49545115243490985018280067714973144582180062593244200961', + u'eventVersion': u'1.0', + u'kinesis': { + u'partitionKey': u'partitionKey-3', + u'data': u'SGVsbG8sIHRoaXMgaXMgYSB0ZXN0IDEyMy4=', + u'kinesisSchemaVersion': u'1.0', + u'sequenceNumber': u'49545115243490985018280067714973144582180062593244200961' + }, + u'eventSourceARN': u'bad:arn:1', + } + ] + } + self.assertIsNone(lh.handler(event, None)) ## # CLI diff --git a/zappa/cli.py b/zappa/cli.py index ca5d759d0..7e043552b 100644 --- a/zappa/cli.py +++ b/zappa/cli.py @@ -1065,6 +1065,16 @@ def create_package(self): else: settings_s = settings_s + "DJANGO_SETTINGS=None\n" + # AWS Events function mapping + event_mapping = {} + events = self.stage_config.get('events', []) + for event in events: + arn = event.get('event_source', {}).get('arn') + function = event.get('function') + if arn and function: + event_mapping[arn] = function + settings_s = settings_s + "AWS_EVENT_MAPPING={0!s}\n".format(event_mapping) + # Copy our Django app into root of our package. # It doesn't work otherwise. base = __file__.rsplit(os.sep, 1)[0] diff --git a/zappa/handler.py b/zappa/handler.py index 39c4e9c17..fa4a813a3 100644 --- a/zappa/handler.py +++ b/zappa/handler.py @@ -186,6 +186,26 @@ def update_certificate(self): return + def get_function_for_aws_event(self, record): + """ + Get the associated function to execute for a triggered AWS event + + Support S3, SNS, DynamoDB and kinesis events + """ + if 's3' in record: + return record['s3']['configurationId'] + + arn = None + if 'Sns' in record: + arn = record['Sns'].get('TopicArn') + elif 'dynamodb' in record or 'kinesis' in record: + arn = record.get('eventSourceARN') + + if arn: + return self.settings.AWS_EVENT_MAPPING.get(arn) + + return None + def handler(self, event, context): """ An AWS Lambda function which parses specific API Gateway input into a @@ -254,17 +274,15 @@ def handler(self, event, context): elif event.get('Records', None): records = event.get('Records') - event_types = ['dynamodb', 'kinesis', 's3', 'sns', 'events'] - + result = None for record in records: - for event_type in event_types: - if record.has_key(event_type): - - whole_function = record[event_type]['configurationId'] - app_function = self.import_module_and_get_function(whole_function) - result = self.run_function(app_function, event, context) - print(result) - + whole_function = self.get_function_for_aws_event(record) + if whole_function: + app_function = self.import_module_and_get_function(whole_function) + result = self.run_function(app_function, event, context) + logger.debug(result) + else: + logger.error("Cannot find a function to process the triggered event.") return result try: