-
Notifications
You must be signed in to change notification settings - Fork 31
/
handler.py
58 lines (49 loc) · 1.61 KB
/
handler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
""" Kinesis Firehose Processing Logic """
from __future__ import print_function
import json
from base64 import b64decode, b64encode
# Status strings that process_record writes into a record's 'result' field.
STATUS_OK = 'Ok'  # the record was transformed and re-encoded without error
STATUS_DROPPED = 'Dropped'  # DroppedRecordException was raised for the record
STATUS_FAIL = 'ProcessingFailed'  # any other exception was raised for the record
class DroppedRecordException(Exception):
    """
    Signal that the current record must be skipped (dropped).

    Raise it while handling a record to drop that record instead of
    treating it as a processing failure.
    """
    pass
def lambda_handler(event, context):
    """
    Lambda entrypoint.

    Logs the incoming event, runs process_record on every entry of
    event['records'] and returns the processed entries.

    Args:
        event: invocation payload; must provide an iterable under 'records'.
        context: Lambda context object (not used here).

    Returns:
        A dict with a single 'records' key holding the list of processed
        records, in the same order as the input.

    Raises:
        KeyError: if the event has no 'records' key.
    """
    print("Event: %s" % event)
    # Build a concrete list: on Python 3 map() yields a lazy iterator, which
    # cannot be JSON-serialized when it is returned from the handler.
    return {
        'records': [process_record(record) for record in event['records']],
    }
def process_record(record):
    """
    Invoked once for each record (raw base64-encoded data).

    Decodes record['data'] (base64-encoded JSON), passes the decoded value
    through transform_data and stores the re-encoded result back into the
    record.

    Note: each processed record should contain a 'result' field
    with the corresponding status (ok, fail, dropped)

    Args:
        record: dict carrying a base64-encoded JSON payload under 'data'.

    Returns:
        The same record dict, mutated in place. 'result' is always set;
        'data' is only replaced when decoding, transformation and
        re-encoding all succeed.
    """
    print("Processing record: %s" % record)
    try:
        # Decode inside the try block so that a single malformed payload is
        # flagged as failed instead of raising and aborting the whole batch.
        data = json.loads(b64decode(record['data']).decode('utf-8'))
        # eventually manipulate record
        new_data = transform_data(data)
        # re-encode and add newline (for Athena)
        payload = (json.dumps(new_data) + "\n").encode('utf-8')
        # b64encode needs bytes on Python 3 and returns bytes; store text so
        # the returned record stays JSON-serializable.
        record['data'] = b64encode(payload).decode('ascii')
    except DroppedRecordException:
        record['result'] = STATUS_DROPPED
    except Exception as error:
        # Per-record boundary: keep the batch going, but log why it failed
        # rather than swallowing the error silently.
        print("Failed to process record: %r" % (error,))
        record['result'] = STATUS_FAIL
    else:
        record['result'] = STATUS_OK
    return record
def transform_data(data):
    """
    Per-record transformation hook.

    Receives the decoded payload of one record and returns the payload that
    should replace it. As written, it logs the payload and hands the very
    same object back unchanged.
    """
    print("Processing data: %s" % data)

    # To skip a record, raise DroppedRecordException, for example:
    #     if 'nsfw' in data:
    #         raise DroppedRecordException()

    # To enrich a record, add new fields, for example:
    #     data['new_value'] = True

    transformed = data
    return transformed