Skip to content

Commit 765e24b

Browse files
committed
Rewrite script to avoid dependency on lambda #1564
1 parent 5d11956 commit 765e24b

File tree

3 files changed

+200
-55
lines changed

3 files changed

+200
-55
lines changed
+64-55
Original file line numberDiff line numberDiff line change
@@ -1,64 +1,73 @@
11
import json
2+
import logging
23
import os
34
import requests
45
import re
5-
from base64 import b64decode
6-
from gzip import decompress
7-
from io import StringIO
6+
import sys
87

9-
def decode(event):
10-
event_data = event["awslogs"]["data"]
11-
compressed = b64decode(event_data)
12-
uncompressed = decompress(compressed)
13-
decoded = json.loads(uncompressed)
8+
import boto3
149

15-
return decoded
1610

17-
def communicate(event):
18-
slack_url = os.getenv("SLACK_WEBHOOK")
19-
message_header = {'Content-Type': 'application/json'}
20-
message = {'text': event}
21-
response = requests.post(url=slack_url, data=json.dumps(message), headers=message_header)
22-
if response.status_code != 200:
23-
raise ValueError(
24-
'Request to slack returned an error %s, the response is:\n%s'
25-
% (response.status_code, response.text)
26-
)
11+
class SlackHandler(logging.Handler):
    """Logging handler that POSTs each record to a Slack incoming webhook.

    Raises ValueError when Slack responds with a non-200 status.
    """

    def __init__(self, webhook_url, level=logging.NOTSET):
        """webhook_url: Slack incoming-webhook URL messages are posted to."""
        super().__init__(level)
        self.slack_url = webhook_url

    def emit(self, record):
        # record.message only exists after record.getMessage() has been
        # called (normally by a Formatter). No formatter is attached here,
        # so call getMessage() directly instead of reading record.message.
        message_header = {'Content-Type': 'application/json'}
        message = {'text': f"[{record.levelname}] {record.getMessage()}"}
        response = requests.post(url=self.slack_url, data=json.dumps(message), headers=message_header)
        if response.status_code != 200:
            raise ValueError(
                f"Request to slack returned an error {response.status_code}, the response is:\n{response.text}"
            )
24+
25+
26+
def interpret(message):
    """Classify a log message and return a (severity, text) pair.

    Known patterns are tagged (and Slack users pinged in the text);
    anything unrecognised passes through at WARNING level.
    """
    lower = message.lower()
    # Severity constants live on the logging module, not on a Logger
    # instance — the original `logger.INFO` etc. raised AttributeError.
    if "'dateRange': {'start':".lower() in lower:
        return (logging.INFO, f"BACKFILL <@U01F70GPXNW>\n{message}")
    if "error" in lower:
        return (logging.ERROR, f"ERROR TYPE: Parser <@U011A0TFM7X> <@U017KLSPEM7>\n{message}")
    if "timed out" in lower:
        return (logging.ERROR, f"ERROR TYPE: Time out\n{message}")
    # logging.WARNING is the canonical name for the deprecated WARN alias
    # used previously; both are the same numeric level (30).
    return (logging.WARNING, message)
2735

28-
def define_errortype(decoded):
29-
max_memory = False
30-
for e in decoded['logEvents']:
31-
if "'dateRange': {'start':".lower() in e["message"].lower():
32-
communicate("BACKFILL <@U01F70GPXNW>")
33-
for e in decoded['logEvents']:
34-
communicate(e["message"])
35-
if "error" in e["message"].lower():
36-
communicate("ERROR TYPE: Parser <@U011A0TFM7X> <@U017KLSPEM7>")
37-
print(e["message"])
38-
communicate(e["message"])
39-
return None
40-
if "filtering cases" in e["message"].lower():
41-
filter_message = e["message"]
42-
if "memory size" in e["message"].lower():
43-
memory_use = re.findall(r'\d* MB', e["message"])
44-
if len(set(memory_use)) == 1:
45-
max_memory = True
46-
if "timed out" in e["message"].lower():
47-
if max_memory == True:
48-
communicate("ERROR TYPE: Time out, max memory reached <@U011A0TFM7X> <@U017KLSPEM7>")
49-
print(e["message"])
50-
communicate(e["message"])
51-
return None
52-
else:
53-
communicate("ERROR TYPE: Time out, max memory NOT reached <@U01F70GPXNW>")
54-
print(filter_message)
55-
communicate(filter_message)
56-
print(e["message"])
57-
communicate(e["message"])
58-
return None
36+
def setup_logger():
    """Build the module logger: DEBUG-level output to stdout and to Slack.

    The Slack webhook URL is read from the SLACK_WEBHOOK environment
    variable. Returns the configured logger.
    """
    log = logging.getLogger(__name__)
    log.setLevel(logging.DEBUG)

    console = logging.StreamHandler(stream=sys.stdout)
    console.setLevel(logging.DEBUG)

    slack = SlackHandler(os.getenv('SLACK_WEBHOOK'), logging.DEBUG)

    for handler in (console, slack):
        log.addHandler(handler)
    return log
5945

60-
def lambda_handler(event, context):
61-
decoded = decode(event)
62-
print(decoded['logGroup'])
63-
communicate(decoded['logGroup'])
64-
define_errortype(decoded)
46+
47+
if __name__ == '__main__':
    # Stream all events from the configured CloudWatch log stream, classify
    # each message, and emit it to stdout and Slack via the logger.
    logger = setup_logger()
    logGroup = os.getenv('INGESTION_LOG_GROUP')
    logStream = os.getenv('INGESTION_LOG_STREAM')
    if logGroup is None or logStream is None:
        logger.critical(f"Cannot get messages from log group {logGroup} and stream {logStream}")
        sys.exit(1)
    logger.info(f"Output from {logGroup}/{logStream}:")
    hasMore = True
    oldNext = None
    logClient = boto3.client('logs')
    while hasMore:
        # boto3 validates parameter types and rejects nextToken=None, so
        # only pass the token once a previous response has supplied one.
        kwargs = {
            'logGroupName': logGroup,
            'logStreamName': logStream,
            'startFromHead': True,
        }
        if oldNext is not None:
            kwargs['nextToken'] = oldNext
        response = logClient.get_log_events(**kwargs)
        # get_log_events repeats the same forward token once the end of
        # the stream is reached — that is the pagination stop condition.
        newNext = response['nextForwardToken']
        if (not newNext) or (newNext == oldNext):
            hasMore = False
        else:
            oldNext = newNext
        for message in [e['message'] for e in response['events']]:
            (severity, output) = interpret(message)
            logger.log(severity, output)
    logger.info(f"End of output from {logGroup}/{logStream}")

ingestion/monitoring/poetry.lock

+120
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

ingestion/monitoring/pyproject.toml

+16
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
[tool.poetry]
2+
name = "monitoring"
3+
version = "0.1.0"
4+
description = "Monitoring functions for Global.health ingestion"
5+
authors = ["Global.health <info@global.health>"]
6+
license = "MIT"
7+
8+
[tool.poetry.dependencies]
9+
python = "^3.9"
10+
boto3 = "^1.21.8"
11+
12+
[tool.poetry.dev-dependencies]
13+
14+
[build-system]
15+
requires = ["poetry-core>=1.0.0"]
16+
build-backend = "poetry.core.masonry.api"

0 commit comments

Comments
 (0)