-
Notifications
You must be signed in to change notification settings - Fork 14
/
Copy pathprocessor.py
55 lines (48 loc) · 1.54 KB
/
processor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
#!/usr/bin/env python
import boto3
import json
import logging
import exceptions
import sys
# Send INFO-and-above log records to stdout.
logging.basicConfig(level=logging.INFO, stream=sys.stdout)

# SQS queue the worker polls for incoming votes (resolved by name at import).
queue = boto3.resource(
    'sqs', region_name='eu-central-1'
).get_queue_by_name(QueueName="my-vote")

# DynamoDB table that receives one item per voter plus the running tallies.
table = boto3.resource(
    'dynamodb', region_name='eu-central-1'
).Table('Votes')
def process_message(message):
    """Handle one queue message: record the vote, bump its tally, delete it.

    ``message.body`` must be a JSON document carrying
    ``MessageAttributes.voter.Value`` and ``MessageAttributes.vote.Value``
    (presumably an SNS notification envelope -- confirm against the
    publisher).

    The message is deleted only after both writes succeed.  Any failure is
    logged with its traceback and swallowed, leaving the message undeleted;
    nothing is raised to the caller.
    """
    try:
        payload = json.loads(message.body)
        attributes = payload['MessageAttributes']
        voter = attributes['voter']['Value']
        vote = attributes['vote']['Value']
        logging.info("Voter: %s, Vote: %s", voter, vote)
        store_vote(voter, vote)
        update_count(vote)
        # Delete last: a failure above must not drop the message.
        message.delete()
    except Exception:
        # logging.exception records the message at ERROR level together
        # with the full traceback, instead of only str(e).
        logging.exception("Failed to process message")
def store_vote(voter, vote):
    """Write a ``{'voter': voter, 'vote': vote}`` item to the votes table.

    Args:
        voter: identifier of the voter, stored under the ``voter`` attribute.
        vote: the option chosen, stored under the ``vote`` attribute.

    Raises:
        Exception: whatever ``table.put_item`` raised; it is logged and then
            re-raised unchanged so the caller can decide what to do with the
            message.
    """
    try:
        table.put_item(Item={'voter': voter, 'vote': vote})
    except Exception:
        logging.error("Failed to store message")
        raise
def update_count(vote):
    """Increment the tally for ``vote`` by one.

    Tallies are kept on the item whose ``voter`` key is ``'count'``, in an
    attribute named after the vote option itself.

    Args:
        vote: the option whose counter should be incremented; used as the
            attribute name via the ``#vote`` placeholder.
    """
    table.update_item(
        Key={'voter': 'count'},
        # if_not_exists seeds the counter with zero, so the first vote for an
        # option (or the very first vote overall) succeeds instead of being
        # rejected because the attribute does not exist yet.
        UpdateExpression="set #vote = if_not_exists(#vote, :zero) + :incr",
        ExpressionAttributeNames={'#vote': vote},
        ExpressionAttributeValues={':incr': 1, ':zero': 0}
    )
if __name__ == "__main__":
    # Worker loop: fetch a batch of messages, process each, repeat until the
    # process is interrupted while waiting on the queue.
    while True:
        try:
            # Long polling (20 s is the SQS maximum) avoids hammering the
            # queue with empty short-poll requests while it is idle.
            messages = queue.receive_messages(WaitTimeSeconds=20)
        except KeyboardInterrupt:
            # Builtin exception; no need for the Python 2-only `exceptions`
            # module to reference it.
            logging.info("Stopping...")
            break
        except Exception:
            # Keep the worker alive on transient receive failures, but log
            # the full traceback rather than just the exception type.
            logging.exception("Failed to receive messages")
            continue
        for message in messages:
            process_message(message)