diff --git a/.github/workflows/ingestion-error-reporter-deploy.yml b/.github/workflows/ingestion-error-reporter-deploy.yml
deleted file mode 100644
index 680d5e5e3..000000000
--- a/.github/workflows/ingestion-error-reporter-deploy.yml
+++ /dev/null
@@ -1,62 +0,0 @@
-name: Ingestion error reporter deploy
-
-on:
-  push:
-    branches: [main, '*-stable']
-    paths:
-      - '.github/workflows/ingestion-error-reporter-deploy.yml'
-      - 'ingestion/monitoring/errorLogsToSlack.py'
-      - 'ingestion/monitoring/pyproject.toml'
-      - 'ingestion/monitoring/poetry.lock'
-    # Build whenever a new tag is created.
-    tags:
-      - "*"
-  workflow_dispatch:
-    branches: [main, '*-stable']
-    paths:
-      - '.github/workflows/ingestion-error-reporter-deploy.yml'
-      - 'ingestion/monitoring/errorLogsToSlack.py'
-      - 'ingestion/monitoring/pyproject.toml'
-      - 'ingestion/monitoring/poetry.lock'
-
-jobs:
-  deploy:
-    runs-on: ubuntu-latest
-    steps:
-      - uses: actions/checkout@v2
-      - name: Configure AWS credentials
-        uses: aws-actions/configure-aws-credentials@v1
-        with:
-          aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
-          aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
-          aws-region: eu-central-1
-
-      - name: Login to Amazon ECR
-        id: login-ecr
-        uses: aws-actions/amazon-ecr-login@v1
-
-      - name: Build, tag, and push image to Amazon ECR (latest)
-        if: ${{ github.ref == 'refs/heads/main' }}
-        working-directory: ingestion/monitoring
-        env:
-          ECR_REGISTRY: ${{ steps.login-ecr.outputs.registry }}
-          ECR_REPOSITORY: gdh-ingestor-error-reporter
-          IMAGE_TAG: ${{ github.sha }}
-        run: |
-          docker build -t $ECR_REGISTRY/$ECR_REPOSITORY:$IMAGE_TAG -t $ECR_REGISTRY/$ECR_REPOSITORY -f Dockerfile .
-          docker push $ECR_REGISTRY/$ECR_REPOSITORY:$IMAGE_TAG
-          docker push $ECR_REGISTRY/$ECR_REPOSITORY:latest
-
-      - name: Build, tag, and push image to Amazon ECR (stable)
-        if: ${{ endsWith(github.ref, '-stable') }}
-        working-directory: ingestion/monitoring
-        env:
-          ECR_REGISTRY: ${{ steps.login-ecr.outputs.registry }}
-          ECR_REPOSITORY: gdh-ingestor-error-monitor
-          IMAGE_TAG: ${{ github.sha }}
-          SLACK_WEBHOOK: ${{ secrets.SLACK_WEBHOOK_INGESTION_LOGS }}
-          AWS_REGION: eu-central-1
-        run: |
-          docker build -t $ECR_REGISTRY/$ECR_REPOSITORY:$IMAGE_TAG -t $ECR_REGISTRY/$ECR_REPOSITORY:stable -f Dockerfile .
-          docker push $ECR_REGISTRY/$ECR_REPOSITORY:$IMAGE_TAG
-          docker push $ECR_REGISTRY/$ECR_REPOSITORY:stable
diff --git a/.github/workflows/ingestion-functions-deploy.yml b/.github/workflows/ingestion-functions-deploy.yml
index 223d74b98..0aff8961e 100644
--- a/.github/workflows/ingestion-functions-deploy.yml
+++ b/.github/workflows/ingestion-functions-deploy.yml
@@ -41,6 +41,7 @@ jobs:
           ECR_REGISTRY: ${{ steps.login-ecr.outputs.registry }}
           ECR_REPOSITORY: gdh-ingestor
           IMAGE_TAG: ${{ github.sha }}
+          SLACK_LOGS_WEBHOOK: ${{ secrets.SLACK_WEBHOOK_INGESTION_LOGS }}
         run: |
           docker build --build-arg NOTIFY_WEBHOOK_URL -t $ECR_REGISTRY/$ECR_REPOSITORY:$IMAGE_TAG -t $ECR_REGISTRY/$ECR_REPOSITORY .
           docker push $ECR_REGISTRY/$ECR_REPOSITORY:$IMAGE_TAG
@@ -54,6 +55,7 @@ jobs:
           ECR_REGISTRY: ${{ steps.login-ecr.outputs.registry }}
           ECR_REPOSITORY: gdh-ingestor
           IMAGE_TAG: ${{ github.sha }}
+          SLACK_LOGS_WEBHOOK: ${{ secrets.SLACK_WEBHOOK_INGESTION_LOGS_TEST }}
         run: |
           docker build --build-arg NOTIFY_WEBHOOK_URL -t $ECR_REGISTRY/$ECR_REPOSITORY:$IMAGE_TAG -t $ECR_REGISTRY/$ECR_REPOSITORY:stable .
           docker push $ECR_REGISTRY/$ECR_REPOSITORY:$IMAGE_TAG
diff --git a/ingestion/functions/common/ingestion_logging.py b/ingestion/functions/common/ingestion_logging.py
index ec0ad6590..87dd03b59 100644
--- a/ingestion/functions/common/ingestion_logging.py
+++ b/ingestion/functions/common/ingestion_logging.py
@@ -1,8 +1,30 @@
 import logging
+import json
+import os
+import requests
 import sys
+from time import sleep
 
 handlers = set()
 
+class SlackHandler(logging.Handler):
+    def __init__(self, webhook_url, level=logging.NOTSET):
+        super().__init__(level)
+        self.slack_url = webhook_url
+
+    def emit(self, record):
+        # Post the log record to the Slack incoming webhook as a JSON payload.
+        message_header = {'Content-Type': 'application/json'}
+        message = {'text': f"[{record.levelname}] {record.getMessage()}"}
+        response = requests.post(url=self.slack_url, data=json.dumps(message), headers=message_header)
+        if response.status_code == 429:
+            # Slack signals rate limiting via the Retry-After header (in seconds).
+            sleep(int(response.headers.get('Retry-After', 1)))
+        elif response.status_code != 200:
+            raise ValueError(
+                f"Request to slack returned an error {response.status_code}, the response is:\n{response.text}"
+            )
+
 def getLogger(module_name):
     logger = logging.getLogger(module_name)
     logger.setLevel(logging.DEBUG)
@@ -11,6 +33,10 @@ def getLogger(module_name):
     handler.setLevel(logging.DEBUG)
     logger.addHandler(handler)
     handlers.add(handler)
+    if slack_webhook := os.getenv('SLACK_LOGS_WEBHOOK'):
+        slackHandler = SlackHandler(slack_webhook, logging.WARNING)
+        logger.addHandler(slackHandler)
+        handlers.add(slackHandler)
     return logger
 
 def flushAll():
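
Note: a minimal usage sketch of the logger change above (not part of the diff). It assumes the module is importable as `common.ingestion_logging`, that `SLACK_LOGS_WEBHOOK` reaches the container environment (e.g. via the deploy workflow or the Batch job definition), and the webhook URL shown is a placeholder:

```python
import os

# Hypothetical placeholder URL; in deployment SLACK_LOGS_WEBHOOK would come
# from the SLACK_WEBHOOK_INGESTION_LOGS secret wired up in the workflow above.
os.environ.setdefault('SLACK_LOGS_WEBHOOK', 'https://hooks.slack.com/services/T000/B000/XXX')

from common import ingestion_logging

logger = ingestion_logging.getLogger(__name__)
logger.info("parsing started")   # below WARNING: goes to the stream handler only
logger.error("parsing failed")   # WARNING and above: also posted to Slack
ingestion_logging.flushAll()     # flush all registered handlers before exit
```
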
diff --git a/ingestion/monitoring/Dockerfile b/ingestion/monitoring/Dockerfile
deleted file mode 100644
index 19cf86cb4..000000000
--- a/ingestion/monitoring/Dockerfile
+++ /dev/null
@@ -1,74 +0,0 @@
-# `python-base` sets up all our shared environment variables
-FROM python:3.10-slim as python-base
-
-ENV PYTHONUNBUFFERED=1 \
-    # prevents python creating .pyc files
-    PYTHONDONTWRITEBYTECODE=1 \
-    \
-    PIP_NO_CACHE_DIR=off \
-    PIP_DISABLE_PIP_VERSION_CHECK=on \
-    PIP_DEFAULT_TIMEOUT=100 \
-    \
-    # https://python-poetry.org/docs/configuration/#using-environment-variables
-    POETRY_VERSION=1.1.5 \
-    # make poetry install to this location
-    POETRY_HOME="/opt/poetry" \
-    # make poetry create the virtual environment in the project's root
-    # it gets named `.venv`
-    POETRY_VIRTUALENVS_IN_PROJECT=true \
-    # do not ask any interactive question
-    POETRY_NO_INTERACTION=1 \
-    \
-    # this is where our requirements + virtual environment will live
-    PYSETUP_PATH="/opt/pysetup" \
-    VENV_PATH="/opt/pysetup/.venv"
-
-# prepend poetry and venv to path
-ENV PATH="$POETRY_HOME/bin:$VENV_PATH/bin:$PATH"
-
-# `builder-base` stage is used to build deps + create our virtual environment
-FROM python-base as builder-base
-RUN apt-get update \
-    && apt-get install --no-install-recommends -y \
-    curl \
-    build-essential
-
-# install poetry - respects $POETRY_VERSION & $POETRY_HOME
-RUN curl -sSL https://install.python-poetry.org/ | python3 - --version 1.1.13
-
-# copy project requirement files here to ensure they will be cached.
-WORKDIR $PYSETUP_PATH
-COPY poetry.lock pyproject.toml ./
-
-ENV PATH="${PATH}:/root/.poetry/bin"
-
-# install runtime deps - uses $POETRY_VIRTUALENVS_IN_PROJECT internally
-RUN poetry install --no-dev
-
-# `development` image is used during development / testing
-FROM python-base as development
-
-RUN apt-get update && apt-get upgrade -y curl \
-    awscli
-
-WORKDIR $PYSETUP_PATH
-
-# copy in our built poetry + venv
-COPY --from=builder-base $POETRY_HOME $POETRY_HOME
-COPY --from=builder-base $PYSETUP_PATH $PYSETUP_PATH
-ENV PATH="${PATH}:/root/.poetry/bin"
-
-# will become mountpoint of our code
-WORKDIR /app
-
-COPY errorLogsToSlack.py poetry.lock pyproject.toml ./
-
-# quicker install as runtime deps are already installed
-RUN poetry install
-
-# notice I haven't set the environment variable values or args here.
-# the slack webhook and AWS region should be configured in the job definition,
-# and the args should be configured at submission time
-ENV AWS_REGION $AWS_REGION
-ENV SLACK_WEBHOOK $SLACK_WEBHOOK
-CMD ["poetry", "run", "python3", "./errorLogsToSlack.py"]
diff --git a/ingestion/monitoring/README.md b/ingestion/monitoring/README.md
index 14d34caf6..7e4f5d395 100644
--- a/ingestion/monitoring/README.md
+++ b/ingestion/monitoring/README.md
@@ -1,20 +1,3 @@
-# Error monitoring
-
-The `errorLogsToSlack.py` script reads log messages from a given Cloudwatch stream
-and posts any errors to Slack. It has three inputs, all passed via the environment:
-
- - `SLACK_WEBHOOK` is the webhook URL to post messages to Slack.
- - `INGESTION_LOG_GROUP` is the Cloudwatch log group name.
- - `INGESTION_LOG_STREAM` is the Cloudwatch log stream name.
-
-Typically, all would be set up EventBridge in AWS when it's run in Batch.
-
-## To set up for a new instance
-
-1. see https://api.slack.com/messaging/webhooks for details on creating a Slack app and enabling web hooks.
-2. change the Slack user IDs in the script to ones that represent users in your workspace (who should get notified on ingestion errors).
-3. deploy to Batch
-
 # Data monitoring
 
 Data monitoring scripts, currently there's a script to alert daily about
diff --git a/ingestion/monitoring/errorLogsToSlack.py b/ingestion/monitoring/errorLogsToSlack.py
deleted file mode 100644
index 52c7ffac0..000000000
--- a/ingestion/monitoring/errorLogsToSlack.py
+++ /dev/null
@@ -1,99 +0,0 @@
-import argparse
-import json
-import logging
-import os
-import requests
-import re
-import sys
-from time import sleep
-
-import boto3
-
-
-class SlackHandler(logging.Handler):
-    def __init__(self, webhook_url, level=logging.NOTSET):
-        super().__init__(level)
-        self.slack_url = webhook_url
-
-    def emit(self, record):
-        message_header = {'Content-Type': 'application/json'}
-        message = {'text': f"[{record.levelname}] {record.message}"}
-        response = requests.post(url=self.slack_url, data=json.dumps(message), headers=message_header)
-        if response.status_code == 429 and response['error'] == 'rate_limited':
-            sleep(response['retry_after'])
-        elif response.status_code != 200:
-            raise ValueError(
-                f"Request to slack returned an error {response.status_code}, the response is:\n{response.text}"
-            )
-
-
-def interpret(message):
-    graham = "<@U011A0TFM7X>"
-    abhishek = "<@U01F70FAJ6N>"
-    jim = "<@U01TAHDR4F7>"
-    engineers = f"{graham} {abhishek} {jim}"
-    lower = message.lower()
-    if "'dateRange': {'start':".lower() in lower:
-        return (logging.INFO, f"BACKFILL INITIATED\n{message}")
-    if "error" in lower:
-        return (logging.ERROR, f"PARSER ERROR: {engineers}\n{message}")
-    if "timed out" in lower:
-        return (logging.ERROR, f"TIME OUT: {engineers}\n{message}")
-    if lower.startswith('info:'):
-        return (logging.INFO, message)
-    return (logging.WARN, message)
-
-def setup_logger():
-    logger = logging.getLogger(__name__)
-    logger.setLevel(logging.WARN)
-    stdoutHandler = logging.StreamHandler(stream=sys.stdout)
-    stdoutHandler.setLevel(logging.DEBUG)
-    logger.addHandler(stdoutHandler)
-    slackHandler = SlackHandler(os.getenv('SLACK_WEBHOOK'), logging.DEBUG)
-    logger.addHandler(slackHandler)
-    return logger
-
-def log_messages(cloudwatch_response, logger):
-    for message in [e['message'] for e in cloudwatch_response['events']]:
-        (severity, output) = interpret(message)
-        logger.log(severity, output)
-
-
-if __name__ == '__main__':
-    logger = setup_logger()
-    parser = argparse.ArgumentParser()
-    parser.add_argument("group", help="AWS log group name for the failed parser")
-    parser.add_argument("stream", help="AWS log stream name for the failed parser")
-    args = parser.parse_args()
-    logGroup = args.group
-    logStream = args.stream
-    if logGroup is None or logStream is None:
-        logger.critical(f"Cannot get messages from log group {logGroup} and stream {logStream}")
-        sys.exit(1)
-    logger.info(f"Output from {logGroup}/{logStream}:")
-    hasMore = False
-    oldNext = ''
-    logClient = boto3.client('logs', region_name=os.getenv('AWS_REGION'))
-    response = logClient.get_log_events(
-        logGroupName=logGroup,
-        logStreamName=logStream,
-        startFromHead=True
-    )
-    log_messages(response, logger)
-    oldNext = response['nextForwardToken']
-    if oldNext and len(oldNext) > 0:
-        hasMore = True
-    while hasMore:
-        response = logClient.get_log_events(
-            logGroupName=logGroup,
-            logStreamName=logStream,
-            startFromHead=True,
-            nextToken=oldNext
-        )
-        log_messages(response, logger)
-        newNext = response['nextForwardToken']
-        if (not newNext) or (newNext == oldNext):
-            hasMore = False
-        else:
-            oldNext = newNext
-    logger.info(f"End of output from {logGroup}/{logStream}")
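
Note: a quick way to smoke-test the relocated `SlackHandler` in isolation (a sketch, not part of the diff; `common.ingestion_logging` is the assumed import path and the webhook URL is a placeholder):

```python
import logging
from common.ingestion_logging import SlackHandler

# Placeholder webhook; substitute a real Slack incoming-webhook URL to test.
handler = SlackHandler('https://hooks.slack.com/services/T000/B000/XXX', logging.WARNING)
record = logging.LogRecord(
    name='smoke', level=logging.WARNING, pathname=__file__, lineno=0,
    msg='test message from SlackHandler', args=None, exc_info=None)
handler.emit(record)  # posts "[WARNING] test message from SlackHandler"
```
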