Feature/update scripts (#29)
* add in vpc_id and security group lookups

* removed pyc files

* /version 0.9.0-alpha.4

* update terraform deploy

* update to have default message config passed in as a configuration

* update forge py

* moved regression and memory profiler scripts

* fix pylint, add changelog, and update forge-py

* update default message config with example values

* update regression and memory profiler scripts

* update python libraries

* poetry update

* update preview message

---------

Co-authored-by: sliu008 <sliu008@users.noreply.github.com>
sliu008 and sliu008 authored Sep 5, 2024
1 parent 9d343b9 commit 6a200df
Showing 6 changed files with 490 additions and 403 deletions.
3 changes: 3 additions & 0 deletions podaac/hitide_backfill_tool/cli.py
@@ -197,6 +197,9 @@ def process_granules(self):
def print_monthly_results_table(self):
"""Function to print out monthly stats"""

if not self.message_senders:
print("** NOTE: When in preview mode, the messages sent count may not be accurate since it's only simulating sending messages. ** \n")

print("Monthly Counts Summary:\n")
header = f"{'Date':<10} {'Granules':<10} {'Need Image':<12} {'Need Footprint':<16} {'Both FP & BBox':<16} {'Need DMRPP':<12}"

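For orientation, here is a minimal standalone sketch of the behavior this hunk adds, assuming the surrounding class exposes a message_senders list (the class name below is hypothetical, not from the repo): when no senders are configured the run is effectively a preview, so the summary is prefixed with a note that the sent counts are only simulated.

class PreviewAwarePrinter:  # hypothetical stand-in for the real backfill class
    def __init__(self, message_senders=None):
        # An empty or missing list of senders means messages are only simulated (preview mode).
        self.message_senders = message_senders or []

    def print_monthly_results_table(self):
        """Print the monthly stats header, with a caveat when nothing is actually sent."""
        if not self.message_senders:
            print("** NOTE: When in preview mode, the messages sent count may not be accurate since it's only simulating sending messages. ** \n")
        print("Monthly Counts Summary:\n")
        header = f"{'Date':<10} {'Granules':<10} {'Need Image':<12} {'Need Footprint':<16} {'Both FP & BBox':<16} {'Need DMRPP':<12}"
        print(header)

PreviewAwarePrinter().print_monthly_results_table()  # preview mode: prints the note, then the header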
262 changes: 141 additions & 121 deletions podaac/hitide_backfill_tool/memory_profiler.py
@@ -1,51 +1,25 @@
# pylint: disable=redefined-outer-name, line-too-long, too-many-locals

"""Script to profile lambda performance"""

import json
import re
import time
import statistics
import csv
import argparse
from collections import defaultdict
import boto3

session = boto3.Session(profile_name="service-uat")
client = session.client('logs')

request_collection = {}
memory_collection = {}
billed_collection = {}


def query_cloudwatch(query):
"""Function to query cloudwatch"""

log_group_name = '/aws/lambda/svc-tig-podaac-services-uat-hitide-backfill-lambda'
response = client.start_query(
logGroupName=log_group_name,
startTime=int((time.time() - 1 * 3600) * 1000), # Two hours ago
endTime=int(time.time() * 1000),
queryString=query,
limit=10000
)
memory_collection = defaultdict(list)
billed_collection = defaultdict(list)

query_id = response['queryId']

while True:
query_status = client.get_query_results(queryId=query_id)
status = query_status['status']
if status == 'Complete':
break
if status in ['Failed', 'Cancelled']:
print("Query execution failed or was cancelled.")
break
time.sleep(1)

return query_id


def execute_query_with_pagination(query, start_time, end_time):
def execute_query_with_pagination(query, start_time, end_time, client, log_group):
"""Function to execute query with pagination"""

log_group_name = '/aws/lambda/svc-tig-podaac-services-uat-hitide-backfill-lambda'
log_group_name = f'/aws/lambda/{log_group}'

response = client.start_query(
logGroupName=log_group_name,
@@ -68,23 +42,19 @@ def execute_query_with_pagination(query, start_time, end_time):
time.sleep(1)

# Retrieve initial results
results = client.get_query_results(
queryId=query_id
)

results = client.get_query_results(queryId=query_id)
data = results['results']

return data


def execute_query_for_minute(query, minute_start_time, minute_end_time):
def execute_query_for_minute(query, minute_start_time, minute_end_time, client, log_group):
"""Function to execute query for a given minute"""

results = execute_query_with_pagination(query, minute_start_time, minute_end_time)
results = execute_query_with_pagination(query, minute_start_time, minute_end_time, client, log_group)
return results


def execute_query_for_time_range(query, start_time, end_time):
def execute_query_for_time_range(query, start_time, end_time, client, log_group):
"""Function to execute query for a given time range, minute by minute"""

all_results = []
@@ -93,118 +63,168 @@ def execute_query_for_time_range(query, start_time, end_time):
while current_time < end_time:
minute_start_time = current_time
minute_end_time = current_time + (300 * 1000)
results = execute_query_for_minute(query, minute_start_time, minute_end_time)
results = execute_query_for_minute(query, minute_start_time, minute_end_time, client, log_group)
all_results.extend(results)
current_time = minute_end_time

return all_results


if __name__ == "__main__":
def process_items(items):
"""Function to process a collection for stats"""
filtered_items = [x for x in items if x is not None]
if not filtered_items:
filtered_items = [0]

# Compile regex patterns
minimum = min(filtered_items)
maximum = max(filtered_items)
sampled = len(filtered_items)
average = round(sum(filtered_items) / sampled, 1)
median = statistics.median(filtered_items)

return minimum, maximum, average, median, sampled


def parse_arguments():
"""Parse command-line arguments."""
parser = argparse.ArgumentParser(description="Analyze AWS Lambda logs.")
parser.add_argument('--aws_lambda_log', type=str, help="Lambda log to profile", required=True)
parser.add_argument('--aws_profile', type=str, help="AWS profile to use", required=True)
parser.add_argument('--start_time', type=int, help="Start time (hours ago) to analyze", default=1)
return parser.parse_args()


def setup_aws_client(profile_name):
"""Set up AWS boto3 client for CloudWatch Logs."""
session = boto3.Session(profile_name=profile_name)
return session.client('logs')


def compile_patterns():
"""Compile and return regex patterns."""
request_id_pattern = re.compile(r"RequestId: (\S+)")
memory_used_pattern = re.compile(r"Max Memory Used: (\d+) MB")
billed_duration_pattern = re.compile(r"Billed Duration: (\d+) ms")
return request_id_pattern, memory_used_pattern, billed_duration_pattern


# Combined query
COMBINED_QUERY = """
def execute_combined_query(client, log_group_name, start_time, end_time):
"""Execute a combined query on CloudWatch Logs."""
combined_query = """
fields @timestamp, @message
| filter (@message like /Max Memory Used:/ or @message like /aws_request_id/)
"""
return execute_query_for_time_range(combined_query, start_time, end_time, client, log_group_name)

start = int((time.time() - 2.5 * 3600) * 1000)
end = int((time.time() - 1.5 * 3600) * 1000)
response_query = execute_query_for_time_range(COMBINED_QUERY, start, end)

# Process results
def process_results(response_query, request_id_pattern, memory_used_pattern, billed_duration_pattern):
"""Process results from the CloudWatch Logs query."""
request_collection = {}

for result in response_query:
text = result[1]['value']
if 'aws_request_id' in text:
json_message = json.loads(text).get('message')
try:
message = json.loads(json_message)
request_id = message.get('aws_request_id')
collection = message.get('collection')
if request_id in request_collection:
request_collection[request_id]["request_id"] = request_id
request_collection[request_id]["collection"] = collection
else:
request_collection[request_id] = {
"request_id": request_id,
"collection": collection
}
except Exception: # pylint: disable=broad-exception-caught
pass

if 'aws_request_id' in text:
process_aws_request_id(text, request_collection)
elif 'Max Memory Used:' in text:
request_id_match = request_id_pattern.search(text)
memory_used_match = memory_used_pattern.search(text)
billed_duration_match = billed_duration_pattern.search(text)
if request_id_match and memory_used_match:
request_id = request_id_match.group(1)
memory_used = int(memory_used_match.group(1))
billed_duration = int(billed_duration_match.group(1))
if request_id in request_collection:
request_collection[request_id]["memory_used"] = memory_used
request_collection[request_id]["billed_duration"] = billed_duration
else:
request_collection[request_id] = {
"memory_used": memory_used,
"billed_duration": billed_duration
}

for key, item in request_collection.items():

collection = item.get('collection', None)
memory = item.get('memory_used')
billed_duration = item.get('billed_duration')
process_max_memory_used(text, request_id_pattern, memory_used_pattern, billed_duration_pattern, request_collection)

return request_collection


def process_aws_request_id(text, request_collection):
"""Process and update request collection for aws_request_id."""
try:
message = json.loads(json.loads(text).get('message', '{}'))
request_id = message.get('aws_request_id')
collection = message.get('collection')

if request_id:
request_collection.setdefault(request_id, {}).update({
"request_id": request_id,
"collection": collection
})
except (json.JSONDecodeError, TypeError):
pass


def process_max_memory_used(text, request_id_pattern, memory_used_pattern, billed_duration_pattern, request_collection):
"""Process and update request collection for Max Memory Used."""
request_id_match = request_id_pattern.search(text)
memory_used_match = memory_used_pattern.search(text)
billed_duration_match = billed_duration_pattern.search(text)

if request_id_match and memory_used_match and billed_duration_match:
request_id = request_id_match.group(1)
memory_used = int(memory_used_match.group(1))
billed_duration = int(billed_duration_match.group(1))

request_collection.setdefault(request_id, {}).update({
"memory_used": memory_used,
"billed_duration": billed_duration
})


def update_memory_billed_collections(request_collection):
"""Update memory and billed collections from request_collection."""
memory_collection = defaultdict(list)
billed_collection = defaultdict(list)

for item in request_collection.values():
collection = item.get('collection')
if collection:
if collection in memory_collection:
memory_collection[collection].append(memory)
billed_collection[collection].append(billed_duration)
else:
memory_collection[collection] = [memory]
billed_collection[collection] = [billed_duration]

CSV_FILENAME = "collection_statistics.csv"
memory_collection[collection].append(item.get('memory_used'))
billed_collection[collection].append(item.get('billed_duration'))

return memory_collection, billed_collection


def write_csv(memory_collection, billed_collection):
"""Write collection statistics to a CSV file."""
csv_filename = "collection_statistics.csv"
header = [
"Collection", "Mem Max", "Mem Min", "Mem Med", "Mem Avg",
"Bill Max", "Bill Min", "Bill Med", "Bill Avg", "Sampled"
]

with open(CSV_FILENAME, mode='w', newline='') as file: # pylint: disable=unspecified-encoding

with open(csv_filename, mode='w', newline='') as file: # pylint: disable=unspecified-encoding
writer = csv.writer(file)
writer.writerow(header)

for key in sorted(memory_collection.keys()):
item = memory_collection.get(key, [])
bill_item = billed_collection.get(key, [])

item = memory_collection[key]
_item = [x for x in item if x is not None]

if len(item) == 0 or item is None:
_item = [0]

bill_item = billed_collection[key]
_bill_item = [x for x in bill_item if x is not None]
if len(bill_item) == 0 or bill_item is None:
_bill_item = [0]

collection = key
minimum = min(_item)
maximum = max(_item)
sampled = len(_item)
average = round(sum(_item) / sampled, 1)
median = statistics.median(_item)

bill_min = min(_bill_item)
bill_max = max(_bill_item)
bill_avg = round(sum(_bill_item) / sampled, 1)
bill_med = statistics.median(_bill_item)
minimum, maximum, average, median, sampled = process_items(item)
bill_min, bill_max, bill_avg, bill_med, _ = process_items(bill_item)

row = [
collection, maximum, minimum, median, average,
key, maximum, minimum, median, average,
bill_max, bill_min, bill_med, bill_avg, sampled
]
writer.writerow(row)


def main():
"""Main function for the script."""
args = parse_arguments()
client = setup_aws_client(args.aws_profile)

request_id_pattern, memory_used_pattern, billed_duration_pattern = compile_patterns()

start_time = int((time.time() - args.start_time * 3600) * 1000)
end_time = int((time.time()) * 1000)

response_query = execute_combined_query(client, args.aws_lambda_log, start_time, end_time)

request_collection = process_results(
response_query, request_id_pattern, memory_used_pattern, billed_duration_pattern
)

memory_collection, billed_collection = update_memory_billed_collections(request_collection)

write_csv(memory_collection, billed_collection)


if __name__ == "__main__":
main()
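The refactor replaces the hard-coded UAT profile and log group with CLI arguments, so the script is now run as: python memory_profiler.py --aws_profile PROFILE --aws_lambda_log LAMBDA_NAME --start_time 1. The same pipeline can also be driven from Python; a rough sketch, assuming the package is importable, valid AWS credentials exist, and the profile and lambda names below are placeholders rather than values from the repo:

import time
from podaac.hitide_backfill_tool.memory_profiler import (
    setup_aws_client, compile_patterns, execute_combined_query,
    process_results, update_memory_billed_collections, write_csv,
)

client = setup_aws_client("my-aws-profile")              # placeholder AWS profile
patterns = compile_patterns()                            # request id / memory / billed-duration regexes
end = int(time.time() * 1000)                            # CloudWatch Logs expects epoch milliseconds
start = end - 1 * 3600 * 1000                            # profile the last hour
rows = execute_combined_query(client, "my-backfill-lambda", start, end)  # placeholder lambda name
requests = process_results(rows, *patterns)
memory, billed = update_memory_billed_collections(requests)
write_csv(memory, billed)                                # writes collection_statistics.csv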
14 changes: 12 additions & 2 deletions podaac/hitide_backfill_tool/regression.py
@@ -5,6 +5,7 @@
Test TIG on all our collections.
"""
import argparse
import os
import subprocess
import requests
@@ -37,7 +38,12 @@ def download_configs(config_dir):
file.write(config_file.content)


if __name__ == "__main__":
def main():
"""main function for regression"""
parser = argparse.ArgumentParser()
parser.add_argument('--backfill_config', type=str,
help="path to backfill config", required=True)
args = parser.parse_args()

test_dir = os.path.dirname(os.path.realpath(__file__))
config_directory = f'{test_dir}/dl_configs'
@@ -48,6 +54,10 @@ def download_configs(config_dir):

for _file in files:
collection = _file.strip('.cfg')
cli_command = f'backfill --config backfill_uat.cfg --collection {collection}'
cli_command = f'backfill --config {args.backfill_config} --collection {collection}'
result = make_cli_call(cli_command)
print(result)


if __name__ == "__main__":
main()
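Analogously, regression.py now takes the backfill config on the command line instead of hard-coding backfill_uat.cfg, i.e. python regression.py --backfill_config PATH. A sketch of driving it from Python follows; the config path is a placeholder, the import assumes the package layout shown in the file header above, and a real run needs network access and a working backfill CLI on the PATH:

import sys
from podaac.hitide_backfill_tool import regression

# Hypothetical invocation; argparse reads sys.argv, so set it before calling main().
sys.argv = ["regression", "--backfill_config", "/path/to/backfill_uat.cfg"]
regression.main()  # downloads the collection configs, then runs backfill once per collection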