Data versioning support (#99)
* Data versioning

* transactions/study updates, PR cleanup

* line length

* upload error handling

* bugfix bonanza

* lint

* script now executable

* touched site upload URL

* updated metadata version mocks
dogversioning authored Aug 14, 2023
1 parent 103e148 commit 69a7aaa
Showing 24 changed files with 609 additions and 292 deletions.
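
For orientation: the thread running through these changes is a three-digit data-package version that now appears both in S3 object keys and in the metadata JSON files. A minimal before/after sketch of the key layout, using hypothetical study, site, and file names (only the inserted "000" segment and the root prefix come from the diff below):

# Upload keys before this commit: root/study/data_package/site_path/filename
old_key = "site_upload/covid/covid__counts/general_hospital/counts.parquet"
# After this commit, a zero-padded version segment precedes the filename:
new_key = "site_upload/covid/covid__counts/general_hospital/000/counts.parquet"

metadata/transactions.json and metadata/study_periods.json gain a matching version level per entry; the migration script below rewrites existing keys and metadata into this shape.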
5 changes: 5 additions & 0 deletions .pre-commit-config.yaml
@@ -10,3 +10,8 @@ repos:
# pre-commit's default_language_version, see
# https://pre-commit.com/#top_level-default_language_version
language_version: python3.9
- repo: https://github.com/pycqa/isort
rev: 5.12.0
hooks:
- id: isort
args: ["--profile", "black", "--filter-files"]
7 changes: 7 additions & 0 deletions pyproject.toml
@@ -1,5 +1,6 @@
[project]
name = "aggregator"
requires-python = ">= 3.9"
version = "0.1.3"
# This project is designed to run on the AWS serverless application framework (SAM).
# The project dependencies are handled via AWS layers. These are only required for
@@ -44,7 +45,13 @@ test = [
dev = [
"bandit",
"black==22.12.0",
"isort==5.12.0",
"pre-commit",
"pylint",
"pycodestyle"
]

[tool.isort]
profile = "black"
src_paths = ["src", "tests"]
skip_glob = [".aws_sam"]
1 change: 0 additions & 1 deletion scripts/credential_management.py
@@ -6,7 +6,6 @@
import sys

import boto3

from requests.auth import _basic_auth_str


1 change: 0 additions & 1 deletion scripts/cumulus_upload_data.py
@@ -4,7 +4,6 @@
import argparse
import os
import sys

from pathlib import Path

import boto3
102 changes: 102 additions & 0 deletions scripts/migrate_versioning.py
@@ -0,0 +1,102 @@
#!/usr/bin/env python3
""" Utility for adding versioning to an existing aggregator data store
This is a one time thing for us, so the CLI/Boto creds are not robust.
"""
import argparse
import io
import json

import boto3

UPLOAD_ROOT_BUCKETS = [
"archive",
"error",
"last_valid",
"latest",
"site_upload",
"study_metadata",
]


def _get_s3_data(key: str, bucket_name: str, client) -> dict:
"""Convenience class for retrieving a dict from S3"""
try:
bytes_buffer = io.BytesIO()
client.download_fileobj(Bucket=bucket_name, Key=key, Fileobj=bytes_buffer)
return json.loads(bytes_buffer.getvalue().decode())
except Exception: # pylint: disable=broad-except
return {}


def _put_s3_data(key: str, bucket_name: str, client, data: dict) -> None:
"""Convenience class for writing a dict to S3"""
b_data = io.BytesIO(json.dumps(data).encode())
client.upload_fileobj(Bucket=bucket_name, Key=key, Fileobj=b_data)


def _get_depth(d):
if isinstance(d, dict):
return 1 + (max(map(_get_depth, d.values())) if d else 0)
return 0


def migrate_bucket_versioning(bucket: str):
client = boto3.client("s3")
res = client.list_objects_v2(Bucket=bucket)
contents = res["Contents"]
moved_files = 0
for s3_file in contents:
if s3_file["Key"].split("/")[0] in UPLOAD_ROOT_BUCKETS:
key = s3_file["Key"]
key_array = key.split("/")
if len(key_array) == 5:
key_array.insert(4, "000")
new_key = "/".join(key_array)
client.copy({"Bucket": bucket, "Key": key}, bucket, new_key)
client.delete_object(Bucket=bucket, Key=key)
moved_files += 1
print(f"Moved {moved_files} uploads")
study_periods = _get_s3_data("metadata/study_periods.json", bucket, client)

if _get_depth(study_periods) == 3:
new_sp = {}
for site in study_periods:
new_sp[site] = {}
for study in study_periods[site]:
new_sp[site][study] = {}
new_sp[site][study]["000"] = study_periods[site][study]
new_sp[site][study]["000"].pop("version")
new_sp[site][study]["000"]["study_period_format_version"] = 2
# print(json.dumps(new_sp, indent=2))
_put_s3_data("metadata/study_periods.json", bucket, client, new_sp)
print("study_periods.json updated")
else:
print("study_periods.json does not need update")

transactions = _get_s3_data("metadata/transactions.json", bucket, client)
if _get_depth(transactions) == 4:
new_t = {}
for site in transactions:
new_t[site] = {}
for study in transactions[site]:
new_t[site][study] = {}
for dp in transactions[site][study]:
new_t[site][study][dp] = {}
new_t[site][study][dp]["000"] = transactions[site][study][dp]
new_t[site][study][dp]["000"].pop("version")
new_t[site][study][dp]["000"]["transacton_format_version"] = 2
# print(json.dumps(new_t, indent=2))
_put_s3_data("metadata/transactions.json", bucket, client, new_t)
print("transactions.json updated")
else:
print("transactions.json does not need update")


if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="""Util for migrating aggregator data"""
)
parser.add_argument("-b", "--bucket", help="bucket name")
args = parser.parse_args()
migrate_bucket_versioning(args.bucket)
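
The _get_depth check above is what decides whether a metadata file still needs migrating: the pre-versioning transactions.json nests site → study → data package → fields (depth 4), and the migrated form adds a "000" version level (depth 5); study_periods.json goes from depth 3 to 4 the same way. A minimal sketch with hypothetical site/study/package names, mirroring the transformation in migrate_bucket_versioning:

# Pre-migration transactions.json entry (depth 4 per _get_depth):
old_transactions = {
    "general_hospital": {
        "covid": {
            "covid__counts": {
                "version": "1.0",
                "last_upload": "2023-08-01T00:00:00+00:00",
            }
        }
    }
}
# Post-migration: the fields move under a "000" version key, "version" is
# dropped, and "transacton_format_version" is stamped (depth 5 per _get_depth):
new_transactions = {
    "general_hospital": {
        "covid": {
            "covid__counts": {
                "000": {
                    "last_upload": "2023-08-01T00:00:00+00:00",
                    "transacton_format_version": 2,
                }
            }
        }
    }
}
# One-time invocation against a bucket (hypothetical bucket name):
#   ./scripts/migrate_versioning.py -b my-aggregator-bucket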
8 changes: 3 additions & 5 deletions src/handlers/dashboard/get_chart_data.py
@@ -3,8 +3,6 @@
"""
import os

from typing import List, Dict

import awswrangler
import boto3
import pandas
@@ -15,7 +13,7 @@
from src.handlers.shared.functions import http_response


def _get_table_cols(table_name: str) -> List:
def _get_table_cols(table_name: str) -> list:
"""Returns the columns associated with a table.
Since running an athena query takes a decent amount of time due to queueing
Expand All @@ -34,7 +32,7 @@ def _get_table_cols(table_name: str) -> List:
return next(s3_iter).decode().split(",")


def _build_query(query_params: Dict, filters: List, path_params: Dict) -> str:
def _build_query(query_params: dict, filters: list, path_params: dict) -> str:
"""Creates a query from the dashboard API spec"""
table = path_params["subscription_name"]
columns = _get_table_cols(table)
@@ -67,7 +65,7 @@ def _build_query(query_params: Dict, filters: List, path_params: Dict) -> str:
return query_str


def _format_payload(df: pandas.DataFrame, query_params: Dict, filters: List) -> Dict:
def _format_payload(df: pandas.DataFrame, query_params: dict, filters: list) -> dict:
"""Coerces query results into the return format defined by the dashboard"""
payload = {}
payload["column"] = query_params["column"]
15 changes: 10 additions & 5 deletions src/handlers/shared/awswrangler_functions.py
@@ -1,6 +1,8 @@
""" functions specifically requiring AWSWranger, which requires a lambda layer"""
import awswrangler

from src.handlers.shared.enums import BucketPath


def get_s3_data_package_list(
bucket_root: str,
@@ -9,23 +11,26 @@ def get_s3_data_package_list(
data_package: str,
extension: str = "parquet",
):
"""Retrieves a list of data packages for a given S3 path post-upload proceesing"""
"""Retrieves a list of data packages for a given S3 path post-upload processing"""
return awswrangler.s3.list_objects(
path=f"s3://{s3_bucket_name}/{bucket_root}/{study}/{data_package}",
path=f"s3://{s3_bucket_name}/{bucket_root}/{study}/{study}__{data_package}/",
suffix=extension,
)


def get_s3_study_meta_list(
bucket_root: str,
s3_bucket_name: str,
study: str,
data_package: str,
site: str,
version: str,
extension: str = "parquet",
):
"""Retrieves a list of data packages for a given S3 path post-upload proceesing"""
"""Retrieves metadata associated with a given upload"""
return awswrangler.s3.list_objects(
path=f"s3://{s3_bucket_name}/{bucket_root}/{study}/{data_package}/{site}",
path=(
f"s3://{bucket_root}/{BucketPath.STUDY_META.value}/{study}/"
f"{study}__{data_package}/{site}/{version}"
),
suffix=extension,
)
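
The rewritten get_s3_study_meta_list path is easiest to read expanded. A sketch assuming BucketPath.STUDY_META.value is "study_metadata" (the name used in the migration script's UPLOAD_ROOT_BUCKETS) and hypothetical argument values:

# f"s3://{bucket_root}/{BucketPath.STUDY_META.value}/{study}/"
# f"{study}__{data_package}/{site}/{version}"
example_meta_prefix = (
    "s3://cumulus-aggregator/study_metadata/"   # bucket_root + assumed STUDY_META prefix
    "covid/covid__counts/general_hospital/000"  # {study}/{study}__{data_package}/{site}/{version}
)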
39 changes: 23 additions & 16 deletions src/handlers/shared/functions.py
@@ -1,32 +1,31 @@
""" Functions used across different lambdas"""
import io
import logging
import json

from typing import Dict, Optional
import logging
from datetime import datetime, timezone
from typing import Optional

import boto3

from src.handlers.shared.enums import BucketPath, JsonFilename

TRANSACTION_METADATA_TEMPLATE = {
"version": "1.0",
"transacton_format_version": "2",
"last_upload": None,
"last_data_update": None,
"last_aggregation": None,
"last_error": None,
"deleted": None,
}
STUDY_PERIOD_METADATA_TEMPLATE = {
"version": "1.0",
"study_period_format_version": "2",
"earliest_date": None,
"latest_date": None,
"last_data_update": None,
}


def http_response(status: int, body: str, allow_cors: bool = False) -> Dict:
def http_response(status: int, body: str, allow_cors: bool = False) -> dict:
"""Generates the payload AWS lambda expects as a return value"""
headers = {"Content-Type": "application/json"}
if allow_cors:
@@ -57,7 +56,7 @@ def check_meta_type(meta_type: str) -> None:

def read_metadata(
s3_client, s3_bucket_name: str, meta_type: str = JsonFilename.TRANSACTIONS.value
) -> Dict:
) -> dict:
"""Reads transaction information from an s3 bucket as a dictionary"""
check_meta_type(meta_type)
s3_path = f"{BucketPath.META.value}/{meta_type}.json"
@@ -71,38 +70,46 @@ def read_metadata(


def update_metadata(
metadata: Dict,
metadata: dict,
site: str,
study: str,
data_package: str,
version: str,
target: str,
dt: Optional[datetime] = None,
meta_type: str = JsonFilename.TRANSACTIONS.value,
):
"""Safely updates items in metadata dictionary"""
"""Safely updates items in metadata dictionary
It's assumed that, other than the version field itself, every item in one
of these metadata dicts is a datetime corresponding to an S3 event timestamp
"""
check_meta_type(meta_type)
if meta_type == JsonFilename.TRANSACTIONS.value:
site_metadata = metadata.setdefault(site, {})
study_metadata = site_metadata.setdefault(study, {})
data_package_metadata = study_metadata.setdefault(
data_package, TRANSACTION_METADATA_TEMPLATE
data_package_metadata = study_metadata.setdefault(data_package, {})
data_version_metadata = data_package_metadata.setdefault(
version, TRANSACTION_METADATA_TEMPLATE
)
dt = dt or datetime.now(timezone.utc)
data_package_metadata[target] = dt.isoformat()
data_version_metadata[target] = dt.isoformat()
elif meta_type == JsonFilename.STUDY_PERIODS.value:
site_metadata = metadata.setdefault(site, {})
study_period_metadata = site_metadata.setdefault(
study, STUDY_PERIOD_METADATA_TEMPLATE
study_period_metadata = site_metadata.setdefault(study, {})
data_version_metadata = study_period_metadata.setdefault(
version, STUDY_PERIOD_METADATA_TEMPLATE
)
dt = dt or datetime.now(timezone.utc)
study_period_metadata[target] = dt.isoformat()
data_version_metadata[target] = dt.isoformat()
return metadata


def write_metadata(
s3_client,
s3_bucket_name: str,
metadata: Dict,
metadata: dict,
meta_type: str = JsonFilename.TRANSACTIONS.value,
) -> None:
"""Writes transaction info from ∏a dictionary to an s3 bucket metadata location"""
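
To make the new nesting concrete, a minimal sketch of calling the reworked update_metadata with hypothetical values; the surrounding field names come from TRANSACTION_METADATA_TEMPLATE above:

from src.handlers.shared.functions import update_metadata

metadata = update_metadata(
    metadata={},
    site="general_hospital",       # hypothetical site
    study="covid",                 # hypothetical study
    data_package="covid__counts",  # hypothetical data package
    version="000",
    target="last_upload",
)
# metadata is now roughly:
# {"general_hospital": {"covid": {"covid__counts": {"000": {
#     "transacton_format_version": "2",
#     "last_upload": "<current UTC ISO timestamp>",
#     "last_data_update": None, "last_aggregation": None,
#     "last_error": None, "deleted": None}}}}}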
14 changes: 13 additions & 1 deletion src/handlers/site_upload/fetch_upload_url.py
@@ -43,9 +43,21 @@ def upload_url_handler(event, context):
)
user = event["requestContext"]["authorizer"]["principalId"]
body = json.loads(event["body"])
for key in ["study", "data_package", "filename"]:
if body[key] is None:
return http_response(
400,
"Malformed data payload. See "
"https://docs.smarthealthit.org/cumulus/library/sharing-data.html "
"for more information about uploading data.",
)
if "data_package_version" in body:
version = body["data_package_version"]
else:
version = "0"
res = create_presigned_post(
os.environ.get("BUCKET_NAME"),
f"{BucketPath.UPLOAD.value}/{body['study']}/{body['data_package']}/"
f"{metadata_db[user]['path']}/{body['filename']}",
f"{metadata_db[user]['path']}/{int(version):03d}/{body['filename']}",
)
return res
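
A short sketch of the object key the handler now signs, assuming BucketPath.UPLOAD.value is "site_upload" (per the migration script's root list) and hypothetical body/user values; when data_package_version is absent the handler falls back to "0", which the :03d format pads to "000":

body = {"study": "covid", "data_package": "counts", "filename": "counts.parquet"}
version = body.get("data_package_version", "0")  # equivalent to the if/else above
upload_key = (
    f"site_upload/{body['study']}/{body['data_package']}/"
    # "general_hospital" stands in for metadata_db[user]["path"]
    f"general_hospital/{int(version):03d}/{body['filename']}"
)
# -> "site_upload/covid/counts/general_hospital/000/counts.parquet"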