Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding monitoring to send clean logging emails #420

Merged
merged 4 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 0 additions & 15 deletions sds_data_manager/constructs/indexer_lambda_construct.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ def __init__(
vpc_subnets,
rds_security_group,
data_bucket,
sns_topic,
maxinelasp marked this conversation as resolved.
Show resolved Hide resolved
layers: list,
**kwargs,
) -> None:
Expand Down Expand Up @@ -134,20 +133,6 @@ def __init__(
),
)

# Uses batch job status of failure
# to trigger a sns topic
batch_job_failure_rule = events.Rule(
self,
"batchJobFailure",
rule_name="batch-job-failure",
event_pattern=events.EventPattern(
source=["aws.batch"],
detail_type=["Batch Job State Change"],
detail={"status": ["FAILED"]},
),
)

# Add the Lambda function as the target for the rules
imap_data_arrival_rule.add_target(targets.LambdaFunction(indexer_lambda))
batch_job_status_rule.add_target(targets.LambdaFunction(indexer_lambda))
batch_job_failure_rule.add_target(targets.SnsTopic(sns_topic))
80 changes: 80 additions & 0 deletions sds_data_manager/constructs/monitoring_lambda_construct.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
"""Configure monitoring formatter lambda."""

import aws_cdk as cdk
from aws_cdk import aws_events as events
from aws_cdk import aws_events_targets as targets
from aws_cdk import aws_iam as iam
from aws_cdk import aws_lambda as lambda_
from constructs import Construct


class MonitoringLambda(Construct):
    """Construct for monitoring lambda."""

    def __init__(
        self,
        scope: Construct,
        construct_id: str,
        code: lambda_.Code,
        sns_topic,
        **kwargs,
    ) -> None:
        """MonitoringLambda Construct.

        Wires an EventBridge rule for failed Batch jobs to a lambda that
        formats the failure details and publishes them to an SNS topic.

        Parameters
        ----------
        scope : Construct
            Parent construct.
        construct_id : str
            A unique string identifier for this construct.
        code : aws_lambda.Code
            Lambda code bundle
        sns_topic : aws_sns.Topic
            SNS Topic for sending notifications so that external
            resources can subscribe to for alerts.
        kwargs : dict
            Keyword arguments

        """
        super().__init__(scope, construct_id, **kwargs)

        # Lambda that formats failed-job details and publishes them to SNS.
        # The topic ARN is handed to the handler through its environment.
        formatter_fn = lambda_.Function(
            self,
            id="MonitoringLambda",
            function_name="monitoring",
            code=code,
            handler="SDSCode.pipeline_lambdas.monitoring.lambda_handler",
            runtime=lambda_.Runtime.PYTHON_3_12,
            architecture=lambda_.Architecture.ARM_64,
            timeout=cdk.Duration.minutes(1),
            memory_size=1000,
            allow_public_subnet=True,
            environment={
                "SNS_TOPIC_ARN": sns_topic.topic_arn,
            },
        )

        # The lambda publishes the formatted alert to the topic and reads the
        # failed job's CloudWatch logs, so grant both permissions here.
        sns_topic.grant_publish(formatter_fn)
        formatter_fn.add_to_role_policy(
            iam.PolicyStatement(
                actions=["logs:GetLogEvents"],
                resources=["arn:aws:logs:*:*:log-group:/aws/batch/*"],
            )
        )

        # EventBridge rule: any Batch job entering the FAILED state
        # triggers the formatter lambda.
        failed_job_rule = events.Rule(
            self,
            "batchJobFailure",
            rule_name="batch-job-failed",
            event_pattern=events.EventPattern(
                source=["aws.batch"],
                detail_type=["Batch Job State Change"],
                detail={"status": ["FAILED"]},
            ),
        )
        failed_job_rule.add_target(targets.LambdaFunction(formatter_fn))
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
"""Lambda function to send a formatted SNS notification when a Batch job fails."""

import os

import boto3

sns_client = boto3.client("sns")
logs_client = boto3.client("logs")
maxinelasp marked this conversation as resolved.
Show resolved Hide resolved


def lambda_handler(event, context):
    """Lambda handler to send an SNS notification when a Batch job fails.

    Lambda will format the message and retrieve logging from the failed job,
    before sending a message to the notification service (SNS topic defined by
    the environment variable "SNS_TOPIC_ARN").

    Parameters
    ----------
    event : dict
        The JSON formatted document with the data required for the
        lambda function to process. Source event is from AWS Batch.
    context : obj
        The context object for the lambda function

    Returns
    -------
    dict
        Status code and confirmation body once the notification is sent.

    Raises
    ------
    ValueError
        If the event did not originate from AWS Batch. Any other source
        means the trigger is misconfigured, so we fail loudly rather than
        email a malformed notification.
    """
    # Guard against misconfigured triggers: only AWS Batch state-change
    # events are valid inputs to this handler.
    if event.get("source") != "aws.batch":
        raise ValueError(
            f"Unexpected event source: {event.get('source')!r}; expected 'aws.batch'"
        )

    # Extract relevant details from the event
    detail = event.get("detail", {})

    job_name = detail.get("jobName", "Unknown")
    job_id = detail.get("jobId", "Unknown")

    # The log stream name may be reported per-attempt (under "attempts") or
    # directly under "container", depending on how the job failed; check both.
    log_stream_name = (
        detail.get("attempts", [{}])[0].get("container", {}).get("logStreamName")
        or detail.get("container", {}).get("logStreamName")
    )

    status_reason = detail.get("statusReason", "No reason provided")

    # Fetch logs if logStreamName is available. A failed log fetch is
    # reported inside the email body instead of aborting the alert.
    logs = []
    if log_stream_name:
        log_group_name = "/aws/batch/job"
        try:
            response = logs_client.get_log_events(
                logGroupName=log_group_name, logStreamName=log_stream_name, limit=10
            )
            logs = [log_event["message"] for log_event in response.get("events", [])]
        except Exception as e:
            logs.append(f"Could not fetch logs: {e!s}")

    # Join with newlines so each CloudWatch event lands on its own line
    # (a plain "".join would run the log lines together).
    log_text = "\n".join(logs) if logs else "No logs available"

    # Format email message
    formatted_message = f"""
Batch Job Failed!

Job Name: {job_name}
Job ID: {job_id}
Status Reason: {status_reason}

Logs (Last 10 lines):
{log_text}
"""

    print(f"Formatted Message: {formatted_message}")

    # Send the formatted message to the SNS topic
    sns_client.publish(
        TopicArn=os.environ["SNS_TOPIC_ARN"],
        Subject=f"Batch Job Failure: {job_name}",
        Message=formatted_message,
    )

    return {"statusCode": 200, "body": "Notification sent"}
tech3371 marked this conversation as resolved.
Show resolved Hide resolved
9 changes: 8 additions & 1 deletion sds_data_manager/utils/stackbuilder.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
instrument_lambdas,
lambda_layer_construct,
monitoring_construct,
monitoring_lambda_construct,
networking_construct,
processing_construct,
route53_hosted_zone,
Expand Down Expand Up @@ -185,10 +186,16 @@ def build_sds(
vpc_subnets=rds_construct.rds_subnet_selection,
rds_security_group=rds_construct.rds_security_group,
data_bucket=data_bucket.data_bucket,
sns_topic=monitoring.sns_topic_notifications,
layers=[db_lambda_layer],
)

monitoring_lambda_construct.MonitoringLambda(
scope=sdc_stack,
construct_id="MonitoringLambda",
code=lambda_code,
sns_topic=monitoring.sns_topic_notifications,
)

sds_api_manager_construct.SdsApiManager(
scope=sdc_stack,
construct_id="SdsApiManager",
Expand Down
Loading