Skip to content

Commit

Permalink
PRMP-642: Create Bulk ODS Update Lambda (#12)
Browse files Browse the repository at this point in the history
* Add bulk ods update lambda
  • Loading branch information
abbas-khan10 authored Aug 16, 2024
1 parent 32b1d01 commit 0deccb1
Show file tree
Hide file tree
Showing 18 changed files with 406 additions and 109 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
*.tfplan
.idea

__pycache__/
*/*/__pycache__
*/*/_trial_temp

Expand Down
188 changes: 188 additions & 0 deletions lambda/bulk-ods-update/bulk_ods_update.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
import os
import tempfile
from datetime import date, timedelta
import calendar
import csv

import boto3

from utils.enums.trud import OdsDownloadType, TrudItem
from utils.models.ods_models import PracticeOds, IcbOds
from utils.services.trud_api_service import TrudApiService

import logging

from utils.trud_files import (
GP_FILE_HEADERS,
ICB_FILE_HEADERS,
ICB_MONTHLY_FILE_PATH,
ICB_QUARTERLY_FILE_PATH,
ICB_MONTHLY_FILE_NAME,
ICB_QUARTERLY_FILE_NAME,
GP_WEEKLY_FILE_NAME,
GP_WEEKLY_ZIP_FILE_PATH,
)

# Configure the root logger (getLogger() with no name) to emit INFO and above.
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Unique scratch directory under /tmp, created once when the module is imported.
# It is passed to TrudApiService.unzipping_files and used to build the path of
# the extracted GP CSV file.
TEMP_DIR = tempfile.mkdtemp(dir="/tmp")


def lambda_handler(event, context):
    """Entry point for the bulk ODS update Lambda.

    GP practice data is always processed. ICB data is processed as well when
    ``determine_ods_manifest_download_type`` reports ``OdsDownloadType.BOTH``.

    Args:
        event: Lambda invocation event (not read by this handler).
        context: Lambda runtime context (not read by this handler).

    Returns:
        dict: ``{"statusCode": 200}`` once processing has completed.
    """
    download_type = determine_ods_manifest_download_type()
    ssm = boto3.client("ssm")

    trud_api_key = ""
    trud_api_key_param = os.environ.get("TRUD_API_KEY_PARAM_NAME")
    if trud_api_key_param:
        # boto3 client operations accept keyword arguments only, and the
        # response is a dict: the parameter's value lives under
        # Parameter.Value. WithDecryption is ignored for non-SecureString
        # parameters, so requesting it unconditionally is safe.
        ssm_response = ssm.get_parameter(
            Name=trud_api_key_param, WithDecryption=True
        )
        trud_api_key = ssm_response["Parameter"]["Value"]

    # NOTE(review): the variable name suggests this holds an SSM parameter
    # *name*, yet its raw value is passed as the API URL - confirm against the
    # Lambda's environment configuration.
    trud_service = TrudApiService(
        api_key=trud_api_key,
        api_url=os.environ.get("TRUD_FHIR_API_URL_PARAM_NAME"),
    )

    extract_and_process_ods_gp_data(trud_service)

    if download_type == OdsDownloadType.BOTH:
        extract_and_process_ods_icb_data(trud_service)

    return {"statusCode": 200}


def determine_ods_manifest_download_type() -> OdsDownloadType:
    """Decide which ODS data sets should be downloaded today.

    Returns:
        OdsDownloadType.BOTH when today is the last Sunday of the current
        month, otherwise OdsDownloadType.GP.
    """
    logger.info("Determining download type")
    current_day = date.today()

    _, days_in_month = calendar.monthrange(current_day.year, current_day.month)
    month_end = date(current_day.year, current_day.month, days_in_month)

    # date.weekday() numbers Monday as 0 and Sunday as 6, so
    # (weekday + 1) % 7 is the number of days since the most recent Sunday
    # (0 when month_end is itself a Sunday).
    days_since_sunday = (month_end.weekday() + 1) % 7
    final_sunday = month_end - timedelta(days=days_since_sunday)

    if current_day != final_sunday:
        logger.info("Download type set to: GP")
        return OdsDownloadType.GP

    logger.info("Download type set to: GP and ICB")
    return OdsDownloadType.BOTH


def extract_and_process_ods_gp_data(trud_service: TrudApiService):
    """Download the latest weekly GP ODS release and apply amended records.

    Fetches the latest NHS_ODS_WEEKLY release, downloads its archive, unpacks
    the nested zip and then the GP CSV into TEMP_DIR, and passes the amended
    rows to ``compare_and_overwrite``.

    Args:
        trud_service: Client used to list releases, download the archive and
            unzip its contents.
    """
    logger.info("Extracting and processing ODS GP data")

    releases = trud_service.get_release_list(
        TrudItem.NHS_ODS_WEEKLY, is_latest=True
    )
    logger.info(releases)

    latest_release = releases[0]
    archive_bytes = trud_service.get_download_file(
        latest_release.get("archiveFileUrl")
    )

    # Two-stage unpack: the release archive contains a nested zip, which in
    # turn contains the CSV.
    # NOTE(review): the fourth positional argument is presumably the ``byte``
    # flag of unzipping_files - confirm against the service.
    nested_zip = trud_service.unzipping_files(
        archive_bytes, GP_WEEKLY_ZIP_FILE_PATH, TEMP_DIR, True
    )
    trud_service.unzipping_files(nested_zip, GP_WEEKLY_FILE_NAME, TEMP_DIR)

    csv_path = os.path.join(TEMP_DIR, GP_WEEKLY_FILE_NAME)
    all_rows = trud_csv_to_dict(csv_path, GP_FILE_HEADERS)
    amended_rows = get_amended_records(all_rows)

    if not amended_rows:
        logger.info("No amended GP data found")
        return

    amended_count = len(amended_rows)
    logger.info(f"Found {amended_count} amended GP data records to update")
    compare_and_overwrite(OdsDownloadType.GP, amended_rows)


def extract_and_process_ods_icb_data(trud_service: TrudApiService):
    """Download the latest ICB ODS release and apply amended records.

    A release whose name ends in ".0.0" is treated as quarterly and read from
    the quarterly zip/CSV locations; any other release uses the monthly ones.
    If either unzip step yields nothing, the run is reported as having no
    amended ICB data.

    Args:
        trud_service: Client used to list releases, download the archive and
            unzip its contents.
    """
    logger.info("Extracting and processing ODS ICB data")

    releases = trud_service.get_release_list(TrudItem.ORG_REF_DATA_MONTHLY, True)
    latest_release = releases[0]

    quarterly = latest_release.get("name").endswith(".0.0")
    archive = trud_service.get_download_file(latest_release.get("archiveFileUrl"))

    if quarterly:
        zip_member, csv_member = ICB_QUARTERLY_FILE_PATH, ICB_QUARTERLY_FILE_NAME
    else:
        zip_member, csv_member = ICB_MONTHLY_FILE_PATH, ICB_MONTHLY_FILE_NAME

    # Each stage only runs when the previous one produced a truthy result.
    amended_rows = []
    nested_zip = trud_service.unzipping_files(archive, zip_member, TEMP_DIR, True)
    if nested_zip:
        csv_file = trud_service.unzipping_files(nested_zip, csv_member, TEMP_DIR)
        if csv_file:
            all_rows = trud_csv_to_dict(csv_file, ICB_FILE_HEADERS)
            amended_rows = get_amended_records(all_rows)

    if not amended_rows:
        logger.info("No amended ICB data found")
        return

    amended_count = len(amended_rows)
    logger.info(f"Found {amended_count} amended ICB data records to update")
    compare_and_overwrite(OdsDownloadType.ICB, amended_rows)


def get_amended_records(data: list[dict]) -> list[dict]:
    """Return the rows flagged as amended, preserving their input order.

    A row counts as amended when its "AmendedRecordIndicator" value is the
    string "1"; rows without that key are dropped.

    NOTE(review): this relies on the header lists supplied to
    ``trud_csv_to_dict`` naming that column "AmendedRecordIndicator" -
    confirm against utils.trud_files.
    """
    amended_flag = "1"

    def is_amended(record: dict) -> bool:
        return record.get("AmendedRecordIndicator") == amended_flag

    return list(filter(is_amended, data))


def trud_csv_to_dict(file_path: str, headers: list[str]) -> list[dict]:
    """Read a header-less CSV file into a list of dicts keyed by ``headers``.

    Args:
        file_path: Path of the CSV file to read.
        headers: Column names applied positionally to every row. Because they
            are supplied explicitly, the first line of the file is treated as
            data rather than as a header row.

    Returns:
        One dict per CSV row, in file order.
    """
    # newline="" is what the csv module asks for: without it, line breaks
    # embedded in quoted fields are rewritten by universal-newline translation.
    with open(file_path, mode="r", newline="") as csv_file:
        csv_reader = csv.DictReader(csv_file, fieldnames=headers)
        return [dict(row) for row in csv_reader]


def compare_and_overwrite(download_type: OdsDownloadType, data: list[dict]):
    """Write amended ODS records to the matching model, one record at a time.

    For OdsDownloadType.GP each record updates the practice name and ICB ODS
    code of the PracticeOds item keyed by "PracticeOdsCode". For
    OdsDownloadType.ICB each record updates the ICB name of the IcbOds item
    keyed by "IcbOdsCode". Any other download type is a no-op.

    Updates are best-effort: a failure on one record is logged and the
    remaining records are still processed.

    Args:
        download_type: Selects which model the records are written to.
        data: Amended rows as produced by ``trud_csv_to_dict``.
    """
    if download_type == OdsDownloadType.GP:
        logger.info("Comparing GP Practice data")
        for amended_record in data:
            practice_ods_code = amended_record.get("PracticeOdsCode")
            try:
                practice = PracticeOds(practice_ods_code)
                practice.update(
                    actions=[
                        PracticeOds.practice_name.set(
                            amended_record.get("PracticeName")
                        ),
                        PracticeOds.icb_ods_code.set(
                            amended_record.get("IcbOdsCode")
                        ),
                    ]
                )
            except Exception as e:
                # Deliberately broad so one bad record cannot abort the batch.
                # Logged at ERROR with traceback and the offending code so the
                # failure is visible and traceable.
                logger.exception(
                    "Failed to create/update record by Practice ODS code %s: %s",
                    practice_ods_code,
                    e,
                )

    elif download_type == OdsDownloadType.ICB:
        logger.info("Comparing ICB data")
        for amended_record in data:
            icb_ods_code = amended_record.get("IcbOdsCode")
            try:
                icb = IcbOds(icb_ods_code)
                icb.update(
                    actions=[IcbOds.icb_name.set(amended_record.get("IcbName"))]
                )
            except Exception as e:
                # Same best-effort policy as the GP branch.
                logger.exception(
                    "Failed to create/update record by ICB ODS code %s: %s",
                    icb_ods_code,
                    e,
                )
1 change: 1 addition & 0 deletions lambda/mi-enrichment-requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pynamodb==6.0.1
85 changes: 15 additions & 70 deletions stacks/gp-registrations-mi/scripts/get_latest_ods_csv.py
Original file line number Diff line number Diff line change
@@ -1,73 +1,18 @@
import csv
import sys

from services.trud_api_service import TrudApiService, TrudItem

GP_FILE_HEADERS = [
"PracticeOdsCode",
"PracticeName",
"NG",
"HLHG",
"AD1",
"AD2",
"AD3",
"AD4",
"AD5",
"PostCode",
"OD",
"CD",
"Null1",
"Null2",
"IcbOdsCode",
"JPD",
"LPD",
"CTN",
"Null3",
"Null4",
"Null5",
"AM",
"Null6",
"GOR",
"Null7",
"Null8",
"Null9",
]

ICB_FILE_HEADERS = [
"IcbOdsCode",
"IcbName",
"NG",
"HLHG",
"AD1",
"AD2",
"AD3",
"AD4",
"AD5",
"PostCode",
"OD",
"CD",
"Null1",
"OSTC",
"Null2",
"Null3",
"Null4",
"Null5",
"Null6",
"Null7",
"Null8",
"AM",
"Null9",
"Null10",
"Null11",
"Null12",
"Null13",
]

ICB_MONTHLY_FILE_PATH = "eamendam.zip"
ICB_QUARTERLY_FILE_PATH = "ocsissue/data/eccg.zip"

ICB_MONTHLY_FILE_NAME = "eccgam.csv"
ICB_QUARTERLY_FILE_NAME = "eccg.csv"
from utils.enums.trud import TrudItem
from utils.services.trud_api_service import TrudApiService
from utils.trud_files import (
ICB_MONTHLY_FILE_PATH,
ICB_QUARTERLY_FILE_PATH,
ICB_MONTHLY_FILE_NAME,
ICB_QUARTERLY_FILE_NAME,
ICB_FILE_HEADERS,
GP_FILE_HEADERS,
GP_WEEKLY_FILE_NAME,
GP_WEEKLY_ZIP_FILE_PATH,
)


def create_modify_csv(
Expand Down Expand Up @@ -105,9 +50,9 @@ def get_gp_latest_ods_csv(service):
release_list_response[0].get("archiveFileUrl")
)
epraccur_zip_file = service.unzipping_files(
download_file, "Data/epraccur.zip", True
download_file, GP_WEEKLY_ZIP_FILE_PATH, byte=True
)
epraccur_csv_file = service.unzipping_files(epraccur_zip_file, "epraccur.csv")
epraccur_csv_file = service.unzipping_files(epraccur_zip_file, GP_WEEKLY_FILE_NAME)
create_modify_csv(
epraccur_csv_file,
"initial_full_gps_ods.csv",
Expand Down Expand Up @@ -144,7 +89,7 @@ def get_icb_latest_ods_csv(service):
)

if epraccur_zip_file := service.unzipping_files(
download_file, zip_file_path, True
download_file, zip_file_path, byte=True
):
if epraccur_csv_file := service.unzipping_files(
epraccur_zip_file, csv_file_name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ resource "aws_lambda_function" "event_enrichment_lambda" {
ApplicationRole = "AwsLambdaFunction"
}
)
layers = [aws_lambda_layer_version.event_enrichment_lambda.arn]
layers = [aws_lambda_layer_version.mi_enrichment_lambda_layer.arn]
environment {
variables = {
SPLUNK_CLOUD_EVENT_UPLOADER_SQS_QUEUE_URL = aws_sqs_queue.incoming_mi_events_for_splunk_cloud_event_uploader.url,
Expand Down Expand Up @@ -54,11 +54,4 @@ resource "aws_cloudwatch_log_group" "event_enrichment_lambda" {
}
)
retention_in_days = 60
}

resource "aws_lambda_layer_version" "event_enrichment_lambda" {
filename = var.event_enrichment_lambda_layer_zip
layer_name = "${var.environment}-${var.event_enrichment_lambda_name}_layer"
compatible_runtimes = ["python3.12"]
compatible_architectures = ["x86_64"]
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ resource "aws_lambda_function" "ods_bulk_update" {
source_code_hash = filebase64sha256(var.bulk_ods_update_lambda_zip)
runtime = "python3.12"
timeout = 300
layers = [aws_lambda_layer_version.ods_bulk_update_lambda.arn]
layers = [aws_lambda_layer_version.mi_enrichment_lambda_layer.arn]
environment {
variables = {
TRUD_API_KEY_PARAM_NAME = data.aws_ssm_parameter.trud_api_key.name,
Expand Down Expand Up @@ -72,10 +72,3 @@ resource "aws_lambda_permission" "bulk_upload_metadata_schedule_permission" {
aws_lambda_function.ods_bulk_update,
]
}

resource "aws_lambda_layer_version" "ods_bulk_update_lambda" {
filename = var.event_enrichment_lambda_layer_zip
layer_name = "${var.environment}-${var.ods_bulk_update_lambda_name}_layer"
compatible_runtimes = ["python3.12"]
compatible_architectures = ["x86_64"]
}
6 changes: 6 additions & 0 deletions stacks/gp-registrations-mi/terraform/lambda_layer.tf
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Lambda layer built from var.mi_enrichment_lambda_layer_zip. It is referenced
# by the `layers` argument of both the event_enrichment_lambda and
# ods_bulk_update functions.
resource "aws_lambda_layer_version" "mi_enrichment_lambda_layer" {
filename = var.mi_enrichment_lambda_layer_zip
layer_name = "${var.environment}_mi_enrichment_layer"
compatible_runtimes = ["python3.12"]
compatible_architectures = ["x86_64"]
}
20 changes: 7 additions & 13 deletions stacks/gp-registrations-mi/terraform/variables.tf
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,12 @@ variable "splunk_cloud_api_token_param_name" {
description = "SSM param containing splunk cloud api token to send MI events to"
}

# Zip archive used as the `filename` of
# aws_lambda_layer_version.mi_enrichment_lambda_layer.
variable "mi_enrichment_lambda_layer_zip" {
type = string
description = "Path to zipfile containing relevant packages for MI lambdas"
default = "../../../lambda/build/layers/mi-enrichment.zip"
}

variable "splunk_cloud_event_uploader_lambda_zip" {
type = string
description = "Path to zipfile containing lambda code for uploading events to splunk cloud"
Expand All @@ -96,22 +102,10 @@ variable "event_enrichment_lambda_zip" {
default = "../../../lambda/build/event-enrichment.zip"
}

variable "event_enrichment_lambda_layer_zip" {
type = string
description = "Path to zipfile containing lambda layer code for enriching MI events"
default = "../../../lambda/build/layer/event-enrichment.zip"
}

variable "bulk_ods_update_lambda_zip" {
type = string
description = "Path to zipfile containing lambda code for ODS update"
default = "placeholder_lambda_payload.zip"
}

variable "bulk_ods_update_lambda_layer_zip" {
type = string
description = "Path to zipfile containing lambda layer code for ODS update"
default = "placeholder_lambda_payload.zip"
default = "../../../lambda/build/bulk-ods-update.zip"
}

variable "s3_event_uploader_lambda_zip" {
Expand Down
Loading

0 comments on commit 0deccb1

Please sign in to comment.