Skip to content

Commit

Permalink
Improve formatting of messages #1564
Browse files Browse the repository at this point in the history
Handle next token correctly

Handle slack rate limiting us
  • Loading branch information
iamleeg committed Mar 2, 2022
1 parent 3311ffc commit 98e138a
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 12 deletions.
43 changes: 32 additions & 11 deletions ingestion/monitoring/errorLogsToSlack.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import requests
import re
import sys
from time import sleep

import boto3

Expand All @@ -17,32 +18,45 @@ def emit(self, record):
message_header = {'Content-Type': 'application/json'}
message = {'text': f"[{record.levelname}] {record.message}"}
response = requests.post(url=self.slack_url, data=json.dumps(message), headers=message_header)
if response.status_code != 200:
if response.status_code == 429 and response['error'] == 'rate_limited':
sleep(response['retry_after'])
elif response.status_code != 200:
raise ValueError(
f"Request to slack returned an error {response.status_code}, the response is:\n{response.text}"
)


def interpret(message):
graham = "<@U011A0TFM7X>"
abhishek = "<@U01F70FAJ6N>"
jim = "<@U01TAHDR4F7>"
engineers = f"{graham} {abhishek} {jim}"
lower = message.lower()
if "'dateRange': {'start':".lower() in lower:
return (logger.INFO, f"BACKFILL <@U01F70GPXNW>\n{message}")
return (logging.INFO, f"BACKFILL INITIATED\n{message}")
if "error" in lower:
return (logger.ERROR, f"ERROR TYPE: Parser <@U011A0TFM7X> <@U017KLSPEM7>\n{message}")
return (logging.ERROR, f"PARSER ERROR: {engineers}\n{message}")
if "timed out" in lower:
return (logger.ERROR, f"ERROR TYPE: Time out\n{message}")
return (logger.WARN, message)
return (logging.ERROR, f"TIME OUT: {engineers}\n{message}")
if lower.startswith('info:'):
return (logging.INFO, message)
return (logging.WARN, message)

def setup_logger():
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
logger.setLevel(logging.WARN)
stdoutHandler = logging.StreamHandler(stream=sys.stdout)
stdoutHandler.setLevel(logging.DEBUG)
logger.addHandler(stdoutHandler)
slackHandler = SlackHandler(os.getenv('SLACK_WEBHOOK'), logging.DEBUG)
logger.addHandler(slackHandler)
return logger

def log_messages(cloudwatch_response, logger):
for message in [e['message'] for e in cloudwatch_response['events']]:
(severity, output) = interpret(message)
logger.log(severity, output)


if __name__ == '__main__':
logger = setup_logger()
Expand All @@ -52,22 +66,29 @@ def setup_logger():
logger.critical(f"Cannot get messages from log group {logGroup} and stream {logStream}")
sys.exit(1)
logger.info(f"Output from {logGroup}/{logStream}:")
hasMore = True
oldNext = None
hasMore = False
oldNext = ''
logClient = boto3.client('logs')
response = logClient.get_log_events(
logGroupName=logGroup,
logStreamName=logStream,
startFromHead=True
)
log_messages(response, logger)
oldNext = response['nextForwardToken']
if oldNext and len(oldNext) > 0:
hasMore = true
while hasMore:
response = logClient.get_log_events(
logGroupName=logGroup,
logStreamName=logStream,
startFromHead=True,
nextToken=oldNext
)
log_messages(response, logger)
newNext = response['nextForwardToken']
if (not newNext) or (newNext == oldNext):
hasMore = False
else:
oldNext = newNext
for message in [e['message'] for e in response['events']]:
(severity, output) = interpret(message)
logger.log(severity, output)
logger.info(f"End of output from {logGroup}/{logStream}")
63 changes: 62 additions & 1 deletion ingestion/monitoring/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions ingestion/monitoring/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ license = "MIT"
[tool.poetry.dependencies]
python = "^3.9"
boto3 = "^1.21.8"
requests = "^2.27.1"

[tool.poetry.dev-dependencies]

Expand Down

0 comments on commit 98e138a

Please sign in to comment.