Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding monitoring to send clean logging emails #420

Merged
merged 4 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 0 additions & 18 deletions sds_data_manager/constructs/indexer_lambda_construct.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ def __init__(
vpc_subnets,
rds_security_group,
data_bucket,
sns_topic,
maxinelasp marked this conversation as resolved.
Show resolved Hide resolved
layers: list,
**kwargs,
) -> None:
Expand All @@ -47,9 +46,6 @@ def __init__(
The RDS security group
data_bucket : obj
The data bucket
sns_topic : aws_sns.Topic
SNS Topic for sending notifications so that external
resources can subscribe to for alerts.
layers : list
List of Lambda layers cdk.cdfnOutput names
kwargs : dict
Expand Down Expand Up @@ -134,20 +130,6 @@ def __init__(
),
)

# Uses batch job status of failure
# to trigger a sns topic
batch_job_failure_rule = events.Rule(
self,
"batchJobFailure",
rule_name="batch-job-failure",
event_pattern=events.EventPattern(
source=["aws.batch"],
detail_type=["Batch Job State Change"],
detail={"status": ["FAILED"]},
),
)

# Add the Lambda function as the target for the rules
imap_data_arrival_rule.add_target(targets.LambdaFunction(indexer_lambda))
batch_job_status_rule.add_target(targets.LambdaFunction(indexer_lambda))
batch_job_failure_rule.add_target(targets.SnsTopic(sns_topic))
80 changes: 80 additions & 0 deletions sds_data_manager/constructs/monitoring_lambda_construct.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
"""Configure monitoring formatter lambda."""

import aws_cdk as cdk
from aws_cdk import aws_events as events
from aws_cdk import aws_events_targets as targets
from aws_cdk import aws_iam as iam
from aws_cdk import aws_lambda as lambda_
from constructs import Construct


class MonitoringLambda(Construct):
"""Construct for monitoring lambda."""

def __init__(
self,
scope: Construct,
construct_id: str,
code: lambda_.Code,
sns_topic,
**kwargs,
) -> None:
"""MonitoringLambda Construct.

Parameters
----------
scope : Construct
Parent construct.
construct_id : str
A unique string identifier for this construct.
code : aws_lambda.Code
Lambda code bundle
sns_topic : aws_sns.Topic
SNS Topic for sending notifications so that external
resources can subscribe to for alerts.
kwargs : dict
Keyword arguments

"""
super().__init__(scope, construct_id, **kwargs)

monitoring_lambda = lambda_.Function(
self,
id="MonitoringLambda",
function_name="monitoring",
code=code,
handler="SDSCode.pipeline_lambdas.monitoring.lambda_handler",
runtime=lambda_.Runtime.PYTHON_3_12,
timeout=cdk.Duration.minutes(1),
memory_size=1000,
environment={
"SNS_TOPIC_ARN": sns_topic.topic_arn,
},
allow_public_subnet=True,
architecture=lambda_.Architecture.ARM_64,
)

# Uses batch job status of failure
# to trigger a sns topic
batch_job_failure_rule = events.Rule(
self,
"batchJobFailure",
rule_name="batch-job-failed",
event_pattern=events.EventPattern(
source=["aws.batch"],
detail_type=["Batch Job State Change"],
detail={"status": ["FAILED"]},
),
)

# monitoring lambda will retrieve logs and publish output to SNS
sns_topic.grant_publish(monitoring_lambda)

monitoring_lambda.add_to_role_policy(
iam.PolicyStatement(
actions=["logs:GetLogEvents"],
resources=["arn:aws:logs:*:*:log-group:/aws/batch/*"],
)
)

batch_job_failure_rule.add_target(targets.LambdaFunction(monitoring_lambda))
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
"""Lambda function to send a formatted SNS notification when a Batch job fails."""

import logging
import os

import boto3

SNS_CLIENT = boto3.client("sns")
LOGS_CLIENT = boto3.client("logs")

# Logger setup
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


def lambda_handler(event, context):
"""Lambda handler to send an SNS notification when a Batch job fails.

Lambda will format the message and retrieve logging from the failed job, before
sending a message to the notification service (SNS topic defined by the environment
variable "SNS_ARN").

Parameters
----------
event : dict
The JSON formatted document with the data required for the
lambda function to process. Source event is from AWS Batch.
context : obj
The context object for the lambda function
"""
logger.info("Received event: %s", event)

if event.get("source") != "aws.batch":
logger.error("Function only supports AWS Batch events")
return {"statusCode": 400, "body": "Bad Request"}

# Extract relevant details from the event
maxinelasp marked this conversation as resolved.
Show resolved Hide resolved
detail = event.get("detail", {})

job_name = detail.get("jobName")
job_id = detail.get("jobId")

log_stream_name = (
detail.get("attempts", [{}])[0].get("container", {}).get("logStreamName", None)
)

status_reason = detail.get("statusReason", "No reason provided")

# Fetch logs if logStreamName is available
logs = []
if log_stream_name:
log_group_name = "/aws/batch/job"
try:
response = LOGS_CLIENT.get_log_events(
logGroupName=log_group_name, logStreamName=log_stream_name, limit=10
)
logs = [event["message"] for event in response.get("events", [])]
except Exception as e:
logs.append(f"Could not fetch logs: {e!s}")

# Format email message
formatted_message = f"""
Batch Job Failed!

Job Name: {job_name}
Job ID: {job_id}
Status Reason: {status_reason}

Logs (Last 10 lines):
{''.join(logs) if logs else 'No logs available'}
"""
maxinelasp marked this conversation as resolved.
Show resolved Hide resolved

logger.info(f"Formatted Message: {formatted_message}")

# Send the formatted message to the SNS topic
SNS_CLIENT.publish(
TopicArn=os.environ["SNS_TOPIC_ARN"],
Subject=f"Batch Job Failure: {job_name}",
Message=formatted_message,
)

return {"statusCode": 200, "body": "Notification sent"}
tech3371 marked this conversation as resolved.
Show resolved Hide resolved
9 changes: 8 additions & 1 deletion sds_data_manager/utils/stackbuilder.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
instrument_lambdas,
lambda_layer_construct,
monitoring_construct,
monitoring_lambda_construct,
networking_construct,
processing_construct,
route53_hosted_zone,
Expand Down Expand Up @@ -185,10 +186,16 @@ def build_sds(
vpc_subnets=rds_construct.rds_subnet_selection,
rds_security_group=rds_construct.rds_security_group,
data_bucket=data_bucket.data_bucket,
sns_topic=monitoring.sns_topic_notifications,
layers=[db_lambda_layer],
)

monitoring_lambda_construct.MonitoringLambda(
scope=sdc_stack,
construct_id="MonitoringLambda",
code=lambda_code,
sns_topic=monitoring.sns_topic_notifications,
)

sds_api_manager_construct.SdsApiManager(
scope=sdc_stack,
construct_id="SdsApiManager",
Expand Down
5 changes: 0 additions & 5 deletions tests/infrastructure/test_indexer_lambda_construct.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from sds_data_manager.constructs.data_bucket_construct import DataBucketConstruct
from sds_data_manager.constructs.database_construct import SdpDatabase
from sds_data_manager.constructs.indexer_lambda_construct import IndexerLambda
from sds_data_manager.constructs.monitoring_construct import MonitoringConstruct
from sds_data_manager.constructs.networking_construct import NetworkingConstruct


Expand All @@ -34,9 +33,6 @@ def template(stack, env, code):
code=code,
layers=[],
)
monitoring_construct = MonitoringConstruct(
stack, construct_id="MonitoringConstruct"
)
IndexerLambda(
stack,
"indexer-lambda",
Expand All @@ -46,7 +42,6 @@ def template(stack, env, code):
vpc_subnets=database_construct.rds_subnet_selection,
rds_security_group=database_construct.rds_security_group,
data_bucket=data_bucket.data_bucket,
sns_topic=monitoring_construct.sns_topic_notifications,
layers=[],
)

Expand Down
Loading