Feature/update scripts #29

Merged: 45 commits, Sep 5, 2024

Commits (45)
e545088
add in vpc_id and security group lookups
sliu008 Jul 30, 2024
7f2ae74
/deploy sit
sliu008 Jul 31, 2024
4cecece
/deploy sit
sliu008 Jul 31, 2024
edf4e29
/deploy sit
sliu008 Jul 31, 2024
8ab1f63
/deploy sit
sliu008 Jul 31, 2024
9ecc372
/deploy sit
sliu008 Jul 31, 2024
1f676be
/deploy sit
sliu008 Jul 31, 2024
ce9d1e4
/deploy sit
sliu008 Jul 31, 2024
f42a358
/deploy sit
sliu008 Jul 31, 2024
52d0757
/deploy sit
sliu008 Jul 31, 2024
fa920b4
/deploy sit
sliu008 Jul 31, 2024
c513dfb
/deploy sit
sliu008 Jul 31, 2024
90a25f8
/deploy sit
sliu008 Jul 31, 2024
0cca762
/deploy sit
sliu008 Jul 31, 2024
912d5f8
/deploy sit
sliu008 Jul 31, 2024
509a7f7
/deploy sit
sliu008 Jul 31, 2024
48137ac
/deploy sit
sliu008 Jul 31, 2024
7d46908
/deploy sit
sliu008 Jul 31, 2024
19c3a04
/deploy sit
sliu008 Jul 31, 2024
dd1656e
/deploy sit
sliu008 Jul 31, 2024
0e6ef05
/deploy sit
sliu008 Jul 31, 2024
b1ec93d
/deploy sit
sliu008 Jul 31, 2024
d294415
/deploy sit
sliu008 Jul 31, 2024
967db59
/deploy sit
sliu008 Aug 1, 2024
d90869c
/deploy sit
sliu008 Aug 1, 2024
55021c3
/deploy sit
sliu008 Aug 1, 2024
a0255cf
removed pyc files
sliu008 Aug 1, 2024
ccba004
/version 0.9.0-alpha.4
sliu008 Jul 30, 2024
5eedfb1
update terraform deploy
sliu008 Aug 1, 2024
d5ae1d9
update to have default message config passed in as a configuration
sliu008 Aug 2, 2024
cb074f6
/deploy uat
sliu008 Aug 12, 2024
0698a69
/deploy uat
sliu008 Aug 12, 2024
97a96fc
/deploy uat
sliu008 Aug 12, 2024
7b83cb2
/deploy uat
sliu008 Aug 12, 2024
9dce0f3
update forge py
sliu008 Aug 13, 2024
d9d2ee6
/deploy sit
sliu008 Aug 15, 2024
20372cb
moved regression and memory profiler scripts
sliu008 Aug 16, 2024
5cf8d03
fix pylint, add changelog, and update forge-py
sliu008 Aug 19, 2024
261f0fc
Merge branch 'develop' into feature/terraform-deploy
sliu008 Aug 19, 2024
8691aa6
update default message config with example values
sliu008 Aug 19, 2024
0b61d3f
update regression and memory profiler scripts
sliu008 Aug 20, 2024
8adee37
update python libraries
sliu008 Aug 21, 2024
fa5ce08
poetry update
sliu008 Aug 21, 2024
5266077
merge develop
sliu008 Aug 22, 2024
5d3d41d
update preview message
sliu008 Aug 22, 2024
3 changes: 3 additions & 0 deletions podaac/hitide_backfill_tool/cli.py
@@ -197,6 +197,9 @@ def process_granules(self):
def print_monthly_results_table(self):
"""Function to print out monthly stats"""

if not self.message_senders:
print("** NOTE: When in preview mode, the messages sent count may not be accurate since it's only simulating sending messages. ** \n")

print("Monthly Counts Summary:\n")
header = f"{'Date':<10} {'Granules':<10} {'Need Image':<12} {'Need Footprint':<16} {'Both FP & BBox':<16} {'Need DMRPP':<12}"

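To see the new guard in isolation, here is a minimal, self-contained sketch; the strings and column widths are lifted from the diff above, while the bare message_senders variable stands in for the class attribute checked by print_monthly_results_table():

    # Standalone rendition of the added preview-mode note (hypothetical scaffolding).
    message_senders = []  # empty when running in preview mode, so the note prints

    if not message_senders:
        print("** NOTE: When in preview mode, the messages sent count may not be "
              "accurate since it's only simulating sending messages. ** \n")

    print("Monthly Counts Summary:\n")
    print(f"{'Date':<10} {'Granules':<10} {'Need Image':<12} {'Need Footprint':<16} "
          f"{'Both FP & BBox':<16} {'Need DMRPP':<12}")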
262 changes: 141 additions & 121 deletions podaac/hitide_backfill_tool/memory_profiler.py
@@ -1,51 +1,25 @@
# pylint: disable=redefined-outer-name, line-too-long, too-many-locals

"""Script to profile lambda performance"""

import json
import re
import time
import statistics
import csv
import argparse
from collections import defaultdict
import boto3

session = boto3.Session(profile_name="service-uat")
client = session.client('logs')

request_collection = {}
memory_collection = {}
billed_collection = {}


def query_cloudwatch(query):
"""Function to query cloudwatch"""

log_group_name = '/aws/lambda/svc-tig-podaac-services-uat-hitide-backfill-lambda'
response = client.start_query(
logGroupName=log_group_name,
startTime=int((time.time() - 1 * 3600) * 1000), # One hour ago
endTime=int(time.time() * 1000),
queryString=query,
limit=10000
)
memory_collection = defaultdict(list)
billed_collection = defaultdict(list)

query_id = response['queryId']

while True:
query_status = client.get_query_results(queryId=query_id)
status = query_status['status']
if status == 'Complete':
break
if status in ['Failed', 'Cancelled']:
print("Query execution failed or was cancelled.")
break
time.sleep(1)

return query_id


def execute_query_with_pagination(query, start_time, end_time):
def execute_query_with_pagination(query, start_time, end_time, client, log_group):
"""Function to execute query with pagination"""

log_group_name = '/aws/lambda/svc-tig-podaac-services-uat-hitide-backfill-lambda'
log_group_name = f'/aws/lambda/{log_group}'

response = client.start_query(
logGroupName=log_group_name,
@@ -68,23 +42,19 @@ def execute_query_with_pagination(query, start_time, end_time):
time.sleep(1)

# Retrieve initial results
results = client.get_query_results(
queryId=query_id
)

results = client.get_query_results(queryId=query_id)
data = results['results']

return data


def execute_query_for_minute(query, minute_start_time, minute_end_time):
def execute_query_for_minute(query, minute_start_time, minute_end_time, client, log_group):
"""Function to execute query for a given minute"""

results = execute_query_with_pagination(query, minute_start_time, minute_end_time)
results = execute_query_with_pagination(query, minute_start_time, minute_end_time, client, log_group)
return results


def execute_query_for_time_range(query, start_time, end_time):
def execute_query_for_time_range(query, start_time, end_time, client, log_group):
"""Function to execute query for a given time range, minute by minute"""

all_results = []
@@ -93,118 +63,168 @@
while current_time < end_time:
minute_start_time = current_time
minute_end_time = current_time + (300 * 1000)
results = execute_query_for_minute(query, minute_start_time, minute_end_time)
results = execute_query_for_minute(query, minute_start_time, minute_end_time, client, log_group)
all_results.extend(results)
current_time = minute_end_time

return all_results


if __name__ == "__main__":
def process_items(items):
"""Function to process a collection for stats"""
filtered_items = [x for x in items if x is not None]
if not filtered_items:
filtered_items = [0]

# Compile regex patterns
minimum = min(filtered_items)
maximum = max(filtered_items)
sampled = len(filtered_items)
average = round(sum(filtered_items) / sampled, 1)
median = statistics.median(filtered_items)

return minimum, maximum, average, median, sampled


def parse_arguments():
"""Parse command-line arguments."""
parser = argparse.ArgumentParser(description="Analyze AWS Lambda logs.")
parser.add_argument('--aws_lambda_log', type=str, help="Lambda log to profile", required=True)
parser.add_argument('--aws_profile', type=str, help="AWS profile to use", required=True)
parser.add_argument('--start_time', type=int, help="Start time (hours ago) to analyze", default=1)
return parser.parse_args()


def setup_aws_client(profile_name):
"""Set up AWS boto3 client for CloudWatch Logs."""
session = boto3.Session(profile_name=profile_name)
return session.client('logs')


def compile_patterns():
"""Compile and return regex patterns."""
request_id_pattern = re.compile(r"RequestId: (\S+)")
memory_used_pattern = re.compile(r"Max Memory Used: (\d+) MB")
billed_duration_pattern = re.compile(r"Billed Duration: (\d+) ms")
return request_id_pattern, memory_used_pattern, billed_duration_pattern


# Combined query
COMBINED_QUERY = """
def execute_combined_query(client, log_group_name, start_time, end_time):
"""Execute a combined query on CloudWatch Logs."""
combined_query = """
fields @timestamp, @message
| filter (@message like /Max Memory Used:/ or @message like /aws_request_id/)
"""
return execute_query_for_time_range(combined_query, start_time, end_time, client, log_group_name)

start = int((time.time() - 2.5 * 3600) * 1000)
end = int((time.time() - 1.5 * 3600) * 1000)
response_query = execute_query_for_time_range(COMBINED_QUERY, start, end)

# Process results
def process_results(response_query, request_id_pattern, memory_used_pattern, billed_duration_pattern):
"""Process results from the CloudWatch Logs query."""
request_collection = {}

for result in response_query:
text = result[1]['value']
if 'aws_request_id' in text:
json_message = json.loads(text).get('message')
try:
message = json.loads(json_message)
request_id = message.get('aws_request_id')
collection = message.get('collection')
if request_id in request_collection:
request_collection[request_id]["request_id"] = request_id
request_collection[request_id]["collection"] = collection
else:
request_collection[request_id] = {
"request_id": request_id,
"collection": collection
}
except Exception: # pylint: disable=broad-exception-caught
pass

if 'aws_request_id' in text:
process_aws_request_id(text, request_collection)
elif 'Max Memory Used:' in text:
request_id_match = request_id_pattern.search(text)
memory_used_match = memory_used_pattern.search(text)
billed_duration_match = billed_duration_pattern.search(text)
if request_id_match and memory_used_match:
request_id = request_id_match.group(1)
memory_used = int(memory_used_match.group(1))
billed_duration = int(billed_duration_match.group(1))
if request_id in request_collection:
request_collection[request_id]["memory_used"] = memory_used
request_collection[request_id]["billed_duration"] = billed_duration
else:
request_collection[request_id] = {
"memory_used": memory_used,
"billed_duration": billed_duration
}

for key, item in request_collection.items():

collection = item.get('collection', None)
memory = item.get('memory_used')
billed_duration = item.get('billed_duration')
process_max_memory_used(text, request_id_pattern, memory_used_pattern, billed_duration_pattern, request_collection)

return request_collection


def process_aws_request_id(text, request_collection):
"""Process and update request collection for aws_request_id."""
try:
message = json.loads(json.loads(text).get('message', '{}'))
request_id = message.get('aws_request_id')
collection = message.get('collection')

if request_id:
request_collection.setdefault(request_id, {}).update({
"request_id": request_id,
"collection": collection
})
except (json.JSONDecodeError, TypeError):
pass


def process_max_memory_used(text, request_id_pattern, memory_used_pattern, billed_duration_pattern, request_collection):
"""Process and update request collection for Max Memory Used."""
request_id_match = request_id_pattern.search(text)
memory_used_match = memory_used_pattern.search(text)
billed_duration_match = billed_duration_pattern.search(text)

if request_id_match and memory_used_match and billed_duration_match:
request_id = request_id_match.group(1)
memory_used = int(memory_used_match.group(1))
billed_duration = int(billed_duration_match.group(1))

request_collection.setdefault(request_id, {}).update({
"memory_used": memory_used,
"billed_duration": billed_duration
})


def update_memory_billed_collections(request_collection):
"""Update memory and billed collections from request_collection."""
memory_collection = defaultdict(list)
billed_collection = defaultdict(list)

for item in request_collection.values():
collection = item.get('collection')
if collection:
if collection in memory_collection:
memory_collection[collection].append(memory)
billed_collection[collection].append(billed_duration)
else:
memory_collection[collection] = [memory]
billed_collection[collection] = [billed_duration]

CSV_FILENAME = "collection_statistics.csv"
memory_collection[collection].append(item.get('memory_used'))
billed_collection[collection].append(item.get('billed_duration'))

return memory_collection, billed_collection


def write_csv(memory_collection, billed_collection):
"""Write collection statistics to a CSV file."""
csv_filename = "collection_statistics.csv"
header = [
"Collection", "Mem Max", "Mem Min", "Mem Med", "Mem Avg",
"Bill Max", "Bill Min", "Bill Med", "Bill Avg", "Sampled"
]

with open(CSV_FILENAME, mode='w', newline='') as file: # pylint: disable=unspecified-encoding

with open(csv_filename, mode='w', newline='') as file: # pylint: disable=unspecified-encoding
writer = csv.writer(file)
writer.writerow(header)

for key in sorted(memory_collection.keys()):
item = memory_collection.get(key, [])
bill_item = billed_collection.get(key, [])

item = memory_collection[key]
_item = [x for x in item if x is not None]

if len(item) == 0 or item is None:
_item = [0]

bill_item = billed_collection[key]
_bill_item = [x for x in bill_item if x is not None]
if len(bill_item) == 0 or bill_item is None:
_bill_item = [0]

collection = key
minimum = min(_item)
maximum = max(_item)
sampled = len(_item)
average = round(sum(_item) / sampled, 1)
median = statistics.median(_item)

bill_min = min(_bill_item)
bill_max = max(_bill_item)
bill_avg = round(sum(_bill_item) / sampled, 1)
bill_med = statistics.median(_bill_item)
minimum, maximum, average, median, sampled = process_items(item)
bill_min, bill_max, bill_avg, bill_med, _ = process_items(bill_item)

row = [
collection, maximum, minimum, median, average,
key, maximum, minimum, median, average,
bill_max, bill_min, bill_med, bill_avg, sampled
]
writer.writerow(row)


def main():
"""Main function for the script."""
args = parse_arguments()
client = setup_aws_client(args.aws_profile)

request_id_pattern, memory_used_pattern, billed_duration_pattern = compile_patterns()

start_time = int((time.time() - args.start_time * 3600) * 1000)
end_time = int((time.time()) * 1000)

response_query = execute_combined_query(client, args.aws_lambda_log, start_time, end_time)

request_collection = process_results(
response_query, request_id_pattern, memory_used_pattern, billed_duration_pattern
)

memory_collection, billed_collection = update_memory_billed_collections(request_collection)

write_csv(memory_collection, billed_collection)


if __name__ == "__main__":
main()
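With the hardcoded boto3 session and log group replaced by argparse options, a plausible invocation of the refactored profiler looks like this (the log-group and profile values below are simply the ones that were previously hardcoded; substitute your own):

    python memory_profiler.py \
        --aws_lambda_log svc-tig-podaac-services-uat-hitide-backfill-lambda \
        --aws_profile service-uat \
        --start_time 2

Results land in collection_statistics.csv. Note that execute_query_for_time_range() chunks the range into 300-second windows (despite the per-minute naming), presumably to keep each CloudWatch Logs Insights query's result set under the 10,000-row limit passed to start_query. The new process_items() helper also centralizes the stats math; for instance, given a sample list containing a None:

    >>> process_items([128, None, 256, 192])
    (128, 256, 192.0, 192, 3)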
14 changes: 12 additions & 2 deletions podaac/hitide_backfill_tool/regression.py
@@ -5,6 +5,7 @@

Test TIG on all our collections.
"""
import argparse
import os
import subprocess
import requests
@@ -37,7 +38,12 @@ def download_configs(config_dir):
file.write(config_file.content)


if __name__ == "__main__":
def main():
"""main function for regression"""
parser = argparse.ArgumentParser()
parser.add_argument('--backfill_config', type=str,
help="path to backfill config", required=True)
args = parser.parse_args()

test_dir = os.path.dirname(os.path.realpath(__file__))
config_directory = f'{test_dir}/dl_configs'
@@ -48,6 +54,10 @@ def download_configs(config_dir):

for _file in files:
collection = _file.strip('.cfg')
cli_command = f'backfill --config backfill_uat.cfg --collection {collection}'
cli_command = f'backfill --config {args.backfill_config} --collection {collection}'
result = make_cli_call(cli_command)
print(result)


if __name__ == "__main__":
main()
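Since the backfill config is now passed on the command line instead of being hardcoded to backfill_uat.cfg, a plausible invocation is (the config path is only an example):

    python podaac/hitide_backfill_tool/regression.py --backfill_config backfill_uat.cfg

The script downloads the per-collection configs, then runs one backfill CLI call per collection via make_cli_call() and prints each result.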