Include cohort partition in JSON dataset key prefix
philerooski committed Sep 1, 2023
1 parent 03abdda commit 5cd8b76
Showing 3 changed files with 52 additions and 23 deletions.
44 changes: 34 additions & 10 deletions src/glue/jobs/s3_to_json.py
@@ -308,6 +308,7 @@ def write_file_to_json_dataset(
z: zipfile.ZipFile,
json_path: str,
dataset_identifier: str,
cohort: str,
metadata: dict,
workflow_run_properties: dict,
delete_upon_successful_upload: bool=True,
@@ -325,6 +326,7 @@ def write_file_to_json_dataset(
z (zipfile.ZipFile): The zip archive as provided by the data provider.
json_path (str): A JSON path relative to the root of `z`.
dataset_identifier (str): The data type of `json_path`.
cohort (str): The cohort this data is associated with.
metadata (dict): Metadata derived from the file basename.
workflow_run_properties (dict): The workflow arguments
delete_upon_successful_upload (bool): Whether to delete the local
@@ -337,7 +339,9 @@ def write_file_to_json_dataset(
list: A list of files uploaded to S3
"""
s3_client = boto3.client("s3")
os.makedirs(dataset_identifier, exist_ok=True)
part_dir = os.path.join(
f"dataset={dataset_identifier}", f"cohort={cohort}")
os.makedirs(part_dir, exist_ok=True)
s3_metadata = metadata.copy()
if s3_metadata["start_date"] is None:
s3_metadata.pop("start_date")
@@ -348,7 +352,7 @@ def write_file_to_json_dataset(
output_path = get_part_path(
metadata=metadata,
part_number=part_number,
dataset_identifier=dataset_identifier,
part_dir=part_dir,
touch=True
)
with z.open(json_path, "r") as input_json:
@@ -365,20 +369,19 @@ def write_file_to_json_dataset(
current_output_path = get_part_path(
metadata=metadata,
part_number=part_number,
dataset_identifier=dataset_identifier,
part_dir=part_dir,
touch=True
)
with open(current_output_path, "a") as f_out:
for transformed_record in transformed_block:
f_out.write("{}\n".format(json.dumps(transformed_record)))
uploaded_files = []
for part_file in os.listdir(dataset_identifier):
output_path = os.path.join(dataset_identifier, part_file)
for part_file in os.listdir(part_dir):
output_path = os.path.join(part_dir, part_file)
s3_output_key = os.path.join(
workflow_run_properties["namespace"],
workflow_run_properties["json_prefix"],
f"dataset={dataset_identifier}",
part_file
output_path
)
logger.debug(
"Uploading %s to %s",
@@ -398,7 +401,11 @@ def write_file_to_json_dataset(
os.remove(output_path)
return uploaded_files

def get_part_path(metadata: dict, part_number: int, dataset_identifier: str, touch: bool):
def get_part_path(
metadata: dict,
part_number: int,
part_dir: str,
touch: bool,
):
"""
A helper function for `write_file_to_json_dataset`
@@ -408,7 +415,7 @@ def get_part_path(metadata: dict, part_number: int, dataset_identifier: str, tou
Args:
metadata (dict): Metadata derived from the file basename.
part_number (int): Which part we need a file name for.
dataset_identifier (str): The data type of `json_path`.
part_dir (str): The directory to which we write the part file.
touch (bool): Whether to create an empty file at the part path
Returns:
@@ -422,8 +429,9 @@ def get_part_path(metadata: dict, part_number: int, dataset_identifier: str, tou
metadata=metadata,
part_number=part_number
)
output_path = os.path.join(dataset_identifier, output_filename)
output_path = os.path.join(part_dir, output_filename)
if touch:
os.makedirs(part_dir, exist_ok=True)
with open(output_path, "x") as initial_file:
# create file
pass
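
For orientation, here is a minimal sketch (not part of the commit) of what `get_part_path` produces now that it receives a `part_dir` that already carries both partitions; the dataset, cohort, and filename below are illustrative.

import os

# Hypothetical inputs: part_dir now encodes both the dataset and cohort partitions.
part_dir = os.path.join("dataset=FitbitDevices", "cohort=adults_v1")
output_filename = "FitbitDevices_20230114.part0.ndjson"  # as produced by get_output_filename

# get_part_path joins them (and, with touch=True, also creates the directory and an empty file).
output_path = os.path.join(part_dir, output_filename)
print(output_path)
# dataset=FitbitDevices/cohort=adults_v1/FitbitDevices_20230114.part0.ndjson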
@@ -478,6 +486,7 @@ def get_metadata(basename: str) -> dict:

def process_record(
s3_obj: dict,
cohort: str,
workflow_run_properties: dict) -> None:
"""
Write the contents of a .zip archive stored on S3 to their respective
@@ -488,6 +497,7 @@ def process_record(
Args:
s3_obj (dict): An S3 object as returned by `boto3.get_object`.
cohort (str): The cohort this data is associated with.
workflow_run_properties (dict): The workflow arguments
Returns:
@@ -511,6 +521,7 @@ def process_record(
z=z,
json_path=json_path,
dataset_identifier=dataset_identifier,
cohort=cohort,
metadata=metadata,
workflow_run_properties=workflow_run_properties)

@@ -544,8 +555,21 @@ def main() -> None:
Key = message["source_key"]
)
s3_obj["Body"] = s3_obj["Body"].read()
cohort = None
if "adults_v1" in message["source_key"]:
cohort = "adults_v1"
elif "pediatric_v1" in message["source_key"]:
cohort = "pediatric_v1"
else:
logger.warning(
"Could not determine the cohort of the object at %s. "
"This file will not be written to a JSON dataset.",
f"s3://{message['source_bucket']}/{message['source_key']}"
)
continue
process_record(
s3_obj=s3_obj,
cohort=cohort,
workflow_run_properties=workflow_run_properties
)

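Taken together, the changes to `main` and `write_file_to_json_dataset` infer the cohort from the export's S3 key and add it as a second partition in the JSON dataset key prefix. The sketch below is illustrative only; the namespace, prefix, and key names are assumptions, not values from the commit.

# Assumed example key; real exports follow the data provider's naming.
source_key = "exports/adults_v1/2023-01-14.zip"

cohort = None
if "adults_v1" in source_key:
    cohort = "adults_v1"
elif "pediatric_v1" in source_key:
    cohort = "pediatric_v1"

# Each part file is then uploaded under a key of the form
#   {namespace}/{json_prefix}/dataset={dataset_identifier}/cohort={cohort}/{part_file}
s3_output_key = "/".join([
    "main",                        # workflow_run_properties["namespace"] (assumed)
    "json",                        # workflow_run_properties["json_prefix"] (assumed)
    "dataset=HealthKitV2Samples",  # dataset_identifier
    f"cohort={cohort}",            # new cohort partition
    "HealthKitV2Samples_Weight_20230112-20230114.part0.ndjson",
])
print(s3_output_key)
# main/json/dataset=HealthKitV2Samples/cohort=adults_v1/HealthKitV2Samples_Weight_20230112-20230114.part0.ndjson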
5 changes: 4 additions & 1 deletion src/lambda_function/s3_to_glue/events/generate_test_event.py
@@ -88,7 +88,10 @@ def create_event(bucket: str, key: str, key_prefix: str, key_file: str) -> dict:
Bucket=bucket,
Prefix=key_prefix
)
test_data = [obj["Key"] for obj in test_objects["Contents"]]
test_data = [
obj["Key"] for obj in test_objects["Contents"]
if not obj["Key"].endswith("/")
]
else:
test_data = [key]
s3_event = {
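The test-event change guards against S3 "folder" placeholder objects: `list_objects_v2` may include zero-byte keys ending in `/`, and the updated comprehension skips them so the generated event only references real files. A small illustration with made-up keys:

# Made-up listing; only the trailing "/" convention matters here.
contents = [
    {"Key": "pilot-data/adults_v1/"},  # folder placeholder, now skipped
    {"Key": "pilot-data/adults_v1/HealthKitV2Samples_Weight_20230112-20230114.zip"},
]
test_data = [obj["Key"] for obj in contents if not obj["Key"].endswith("/")]
print(test_data)
# ['pilot-data/adults_v1/HealthKitV2Samples_Weight_20230112-20230114.zip']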
26 changes: 14 additions & 12 deletions tests/test_s3_to_json.py
@@ -1,6 +1,7 @@
import os
import io
import json
import shutil
import zipfile
import datetime
from dateutil.tz import tzutc
@@ -453,12 +454,10 @@ def test_get_output_filename_subtype(self):
def test_write_file_to_json_dataset_delete_local_copy(self, s3_obj, namespace, monkeypatch):
monkeypatch.setattr("boto3.client", lambda x: MockAWSClient())
sample_metadata = {
"Metadata": {
"type": "HealthKitV2Samples",
"start_date": datetime.datetime(2022, 1, 12, 0, 0),
"end_date": datetime.datetime(2023, 1, 14, 0, 0),
"subtype": "Weight",
}
}
workflow_run_properties = {
"namespace": namespace,
@@ -470,22 +469,22 @@ def test_write_file_to_json_dataset_delete_local_copy(self, s3_obj, namespace, m
z=z,
json_path="HealthKitV2Samples_Weight_20230112-20230114.json",
dataset_identifier="HealthKitV2Samples",
metadata=sample_metadata["Metadata"],
cohort="adults_v1",
metadata=sample_metadata,
workflow_run_properties=workflow_run_properties,
delete_upon_successful_upload=True,
)
output_file = output_files[0]

assert not os.path.exists(output_file)
shutil.rmtree(f"dataset={sample_metadata['type']}")

def test_write_file_to_json_dataset_record_consistency(self, s3_obj, namespace, monkeypatch):
monkeypatch.setattr("boto3.client", lambda x: MockAWSClient())
sample_metadata = {
"Metadata": {
"type": "FitbitDevices",
"start_date": None,
"end_date": datetime.datetime(2023, 1, 14, 0, 0)
}
}
workflow_run_properties = {
"namespace": namespace,
@@ -499,8 +498,9 @@ def test_write_file_to_json_dataset_record_consistency(self, s3_obj, namespace,
output_files = s3_to_json.write_file_to_json_dataset(
z=z,
json_path="FitbitDevices_20230114.json",
dataset_identifier="FitbitDevices",
metadata=sample_metadata["Metadata"],
dataset_identifier=sample_metadata["type"],
cohort="adults_v1",
metadata=sample_metadata,
workflow_run_properties=workflow_run_properties,
delete_upon_successful_upload=False,
)
@@ -512,13 +512,13 @@ def test_write_file_to_json_dataset_record_consistency(self, s3_obj, namespace,
metadata = json.loads(json_line)
assert metadata["export_start_date"] is None
assert (
sample_metadata["Metadata"]["end_date"].isoformat()
sample_metadata["end_date"].isoformat()
== metadata["export_end_date"]
)
output_line_cnt += 1
# gets line count of input json and exported json and checks the two
assert input_line_cnt == output_line_cnt
os.remove(output_file)
shutil.rmtree(f"dataset={sample_metadata['type']}", ignore_errors=True)

def test_write_file_to_json_dataset_multiple_parts(self, s3_obj, namespace, monkeypatch):
monkeypatch.setattr("boto3.client", lambda x: MockAWSClient())
@@ -540,6 +540,7 @@ def test_write_file_to_json_dataset_multiple_parts(self, s3_obj, namespace, monk
z=z,
json_path=json_path,
dataset_identifier=sample_metadata["type"],
cohort="adults_v1",
metadata=sample_metadata,
workflow_run_properties=workflow_run_properties,
delete_upon_successful_upload=False,
@@ -552,6 +553,7 @@ def test_write_file_to_json_dataset_multiple_parts(self, s3_obj, namespace, monk
output_line_count += 1
os.remove(output_file)
assert input_line_cnt == output_line_count
shutil.rmtree(f"dataset={sample_metadata['type']}", ignore_errors=True)

def test_get_part_path_no_touch(self):
sample_metadata = {
@@ -562,7 +564,7 @@ def test_get_part_path_no_touch(self):
part_path = s3_to_json.get_part_path(
metadata=sample_metadata,
part_number=0,
dataset_identifier=sample_metadata["type"],
part_dir=sample_metadata["type"],
touch=False
)
assert part_path == "FitbitDevices/FitbitDevices_20230114.part0.ndjson"
@@ -576,11 +578,11 @@ def test_get_part_path_touch(self):
part_path = s3_to_json.get_part_path(
metadata=sample_metadata,
part_number=0,
dataset_identifier=sample_metadata["type"],
part_dir=sample_metadata["type"],
touch=True
)
assert os.path.exists(part_path)
os.remove(part_path)
shutil.rmtree(sample_metadata["type"], ignore_errors=True)

def test_get_metadata_startdate_enddate(self, json_file_basenames_dict):
basename = json_file_basenames_dict["HealthKitV2Samples_Deleted"]
