Skip to content

Commit

Permalink
Merge pull request Miserlou#290 from parroyo/master
Browse files Browse the repository at this point in the history
Add handler support for SNS, dynamodb and kinesis events
  • Loading branch information
Rich Jones authored Aug 31, 2016
2 parents b7b692a + d40826c commit 9155caa
Show file tree
Hide file tree
Showing 5 changed files with 173 additions and 19 deletions.
19 changes: 15 additions & 4 deletions test_settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,21 @@
"debug": true,
"parameter_depth": 2,
"prebuild_script": "test_settings.prebuild_me",
"events": [{
"function": "tests.test_app.schedule_me",
"expression": "rate(1 minute)"
}]
"events": [
{
"function": "tests.test_app.schedule_me",
"expression": "rate(1 minute)"
},
{
"function": "test.test_app.method",
"event_source": {
"arn": "arn:aws:sns:::1",
"events": [
"sns:Publish"
]
}
}
]
},
"devor": {
"s3_bucket": "lmbda",
Expand Down
27 changes: 25 additions & 2 deletions test_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,39 @@
#}
#

AWS_EVENT_MAPPING = {
'arn:aws:s3:1': 'test_settings.aws_s3_event',
'arn:aws:sns:1': 'test_settings.aws_sns_event',
'arn:aws:dynamodb:1': 'test_settings.aws_dynamodb_event',
'arn:aws:kinesis:1': 'test_settings.aws_kinesis_event'
}

ENVIRONMENT_VARIABLES={'testenv': 'envtest'}


def prebuild_me():
print("This is a prebuild script!")


def callback(self):
print("this is a callback")

def aws_event(event, contect):
print("AWS EVENT")

def aws_s3_event(event, content):
return "AWS S3 EVENT"


def aws_sns_event(event, content):
return "AWS SNS EVENT"


def aws_dynamodb_event(event, content):
return "AWS DYNAMODB EVENT"


def aws_kinesis_event(event, content):
return "AWS KINESIS EVENT"


def command():
print("command")
98 changes: 95 additions & 3 deletions tests/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -397,19 +397,111 @@ def test_handler(self, session):
}
lh.handler(event, None)

# Test AWS event
# Test AWS S3 event
event = {
u'account': u'72333333333',
u'region': u'us-east-1',
u'detail': {},
u'Records': [{'s3': {'configurationId': 'test_settings.aws_event'}}],
u'Records': [{'s3': {'configurationId': 'test_settings.aws_s3_event'}}],
u'source': u'aws.events',
u'version': u'0',
u'time': u'2016-05-10T21:05:39Z',
u'id': u'0d6a6db0-d5e7-4755-93a0-750a8bf49d55',
u'resources': [u'arn:aws:events:us-east-1:72333333333:rule/tests.test_app.schedule_me']
}
lh.handler(event, None)
self.assertEqual("AWS S3 EVENT", lh.handler(event, None))

# Test AWS SNS event
event = {
u'account': u'72333333333',
u'region': u'us-east-1',
u'detail': {},
u'Records': [
{
u'EventVersion': u'1.0',
u'EventSource': u'aws:sns',
u'EventSubscriptionArn': u'arn:aws:sns:EXAMPLE',
u'Sns': {
u'SignatureVersion': u'1',
u'Timestamp': u'1970-01-01T00:00:00.000Z',
u'Signature': u'EXAMPLE',
u'SigningCertUrl': u'EXAMPLE',
u'MessageId': u'95df01b4-ee98-5cb9-9903-4c221d41eb5e',
u'Message': u'Hello from SNS!',
u'Subject': u'TestInvoke',
u'Type': u'Notification',
u'UnsubscribeUrl': u'EXAMPLE',
u'TopicArn': u'arn:aws:sns:1',
u'MessageAttributes': {
u'Test': {u'Type': u'String', u'Value': u'TestString'},
u'TestBinary': {u'Type': u'Binary', u'Value': u'TestBinary'}
}
}
}
]
}
self.assertEqual("AWS SNS EVENT", lh.handler(event, None))

# Test AWS DynamoDB event
event = {
u'Records': [
{
u'eventID': u'1',
u'eventVersion': u'1.0',
u'dynamodb': {
u'Keys': {u'Id': {u'N': u'101'}},
u'NewImage': {u'Message': {u'S': u'New item!'}, u'Id': {u'N': u'101'}},
u'StreamViewType': u'NEW_AND_OLD_IMAGES',
u'SequenceNumber': u'111', u'SizeBytes': 26
},
u'awsRegion': u'us-west-2',
u'eventName': u'INSERT',
u'eventSourceARN': u'arn:aws:dynamodb:1',
u'eventSource': u'aws:dynamodb'
}
]
}
self.assertEqual("AWS DYNAMODB EVENT", lh.handler(event, None))

# Test AWS kinesis event
event = {
u'Records': [
{
u'eventID': u'shardId-000000000000:49545115243490985018280067714973144582180062593244200961',
u'eventVersion': u'1.0',
u'kinesis': {
u'partitionKey': u'partitionKey-3',
u'data': u'SGVsbG8sIHRoaXMgaXMgYSB0ZXN0IDEyMy4=',
u'kinesisSchemaVersion': u'1.0',
u'sequenceNumber': u'49545115243490985018280067714973144582180062593244200961'
},
u'invokeIdentityArn': u'arn:aws:iam::EXAMPLE',
u'eventName': u'aws:kinesis:record',
u'eventSourceARN': u'arn:aws:kinesis:1',
u'eventSource': u'aws:kinesis',
u'awsRegion': u'us-east-1'
}
]
}
self.assertEqual("AWS KINESIS EVENT", lh.handler(event, None))

# Unhandled event
event = {
u'Records': [
{
u'eventID': u'shardId-000000000000:49545115243490985018280067714973144582180062593244200961',
u'eventVersion': u'1.0',
u'kinesis': {
u'partitionKey': u'partitionKey-3',
u'data': u'SGVsbG8sIHRoaXMgaXMgYSB0ZXN0IDEyMy4=',
u'kinesisSchemaVersion': u'1.0',
u'sequenceNumber': u'49545115243490985018280067714973144582180062593244200961'
},
u'eventSourceARN': u'bad:arn:1',
}
]
}
self.assertIsNone(lh.handler(event, None))

##
# CLI
Expand Down
10 changes: 10 additions & 0 deletions zappa/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -1077,6 +1077,16 @@ def create_package(self):
else:
settings_s = settings_s + "DJANGO_SETTINGS=None\n"

# AWS Events function mapping
event_mapping = {}
events = self.stage_config.get('events', [])
for event in events:
arn = event.get('event_source', {}).get('arn')
function = event.get('function')
if arn and function:
event_mapping[arn] = function
settings_s = settings_s + "AWS_EVENT_MAPPING={0!s}\n".format(event_mapping)

# Copy our Django app into root of our package.
# It doesn't work otherwise.
if self.django_settings:
Expand Down
38 changes: 28 additions & 10 deletions zappa/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,26 @@ def update_certificate(self):

return

def get_function_for_aws_event(self, record):
"""
Get the associated function to execute for a triggered AWS event
Support S3, SNS, DynamoDB and kinesis events
"""
if 's3' in record:
return record['s3']['configurationId']

arn = None
if 'Sns' in record:
arn = record['Sns'].get('TopicArn')
elif 'dynamodb' in record or 'kinesis' in record:
arn = record.get('eventSourceARN')

if arn:
return self.settings.AWS_EVENT_MAPPING.get(arn)

return None

def handler(self, event, context):
"""
An AWS Lambda function which parses specific API Gateway input into a
Expand Down Expand Up @@ -254,17 +274,15 @@ def handler(self, event, context):
elif event.get('Records', None):

records = event.get('Records')
event_types = ['dynamodb', 'kinesis', 's3', 'sns', 'events']

result = None
for record in records:
for event_type in event_types:
if record.has_key(event_type):

whole_function = record[event_type]['configurationId']
app_function = self.import_module_and_get_function(whole_function)
result = self.run_function(app_function, event, context)
print(result)

whole_function = self.get_function_for_aws_event(record)
if whole_function:
app_function = self.import_module_and_get_function(whole_function)
result = self.run_function(app_function, event, context)
logger.debug(result)
else:
logger.error("Cannot find a function to process the triggered event.")
return result

try:
Expand Down

0 comments on commit 9155caa

Please sign in to comment.