diff --git a/src/glue/jobs/s3_to_json.py b/src/glue/jobs/s3_to_json.py index 7d01669b..0e3002e3 100644 --- a/src/glue/jobs/s3_to_json.py +++ b/src/glue/jobs/s3_to_json.py @@ -6,7 +6,9 @@ (for example: json/dataset=EnrolledParticipants/) and only contains files which share a similar schema. """ + import datetime +import gzip import io import json import logging @@ -14,6 +16,7 @@ import sys import typing import zipfile +from typing import Iterator, Optional import boto3 import ecs_logging @@ -409,9 +412,12 @@ def _transform_garmin_data_types( return json_obj -def get_output_filename(metadata: dict, part_number: int) -> str: +def get_file_identifier(metadata: dict) -> str: """ - Get a formatted file name. + Get an identifier for a file from the source file's metadata. + + This function effectively reverse-engineers the process in `get_metadata`, + enabling us to reconstuct the source file's identifier. The format depends on which metadata fields we have available to us. Metadata fields we can potentially use: @@ -428,25 +434,23 @@ def get_output_filename(metadata: dict, part_number: int) -> str: str: A formatted file name. """ if metadata["type"] in DATA_TYPES_WITH_SUBTYPE: - output_fname = "{}_{}_{}-{}.part{}.ndjson".format( + identifier = "{}_{}_{}-{}".format( metadata["type"], metadata["subtype"], metadata["start_date"].strftime("%Y%m%d"), metadata["end_date"].strftime("%Y%m%d"), - part_number, ) elif metadata["start_date"] is None: - output_fname = "{}_{}.part{}.ndjson".format( - metadata["type"], metadata["end_date"].strftime("%Y%m%d"), part_number + identifier = "{}_{}".format( + metadata["type"], metadata["end_date"].strftime("%Y%m%d") ) else: - output_fname = "{}_{}-{}.part{}.ndjson".format( + identifier = "{}_{}-{}".format( metadata["type"], metadata["start_date"].strftime("%Y%m%d"), metadata["end_date"].strftime("%Y%m%d"), - part_number, ) - return output_fname + return identifier def transform_block( @@ -454,7 +458,7 @@ def transform_block( metadata: dict, logger_context: dict = {}, block_size: int = 10000, -): +) -> Iterator[list]: """ A generator function which yields a block of transformed JSON records. @@ -522,74 +526,91 @@ def write_file_to_json_dataset( Returns: list: A list of files uploaded to S3 """ - # Configuration related to where we write our part files - part_dir = os.path.join( + # Configuration related to where we write the JSON + output_dir = os.path.join( f"dataset={metadata['type']}", f"cohort={metadata['cohort']}" ) - os.makedirs(part_dir, exist_ok=True) + os.makedirs(output_dir, exist_ok=True) part_number = 0 - output_path = get_part_path( - metadata=metadata, part_number=part_number, part_dir=part_dir, touch=True + # we update this value as we write + current_part_path = get_output_path( + metadata=metadata, part_number=part_number, output_dir=output_dir, touch=True + ) + compressed_output_path = get_output_path( + metadata=metadata, part_number=None, output_dir=output_dir, touch=True ) # We will attach file metadata to the uploaded S3 object s3_metadata = _derive_str_metadata(metadata=metadata) + line_count = 0 uploaded_files = [] - with z.open(json_path, "r") as input_json: - current_output_path = output_path - line_count = 0 + # fmt: off + # file_size_limit: # Upload completed part file _upload_file_to_json_dataset( - file_path=current_output_path, + file_path=current_part_path, s3_metadata=s3_metadata, workflow_run_properties=workflow_run_properties, delete_upon_successful_upload=delete_upon_successful_upload, ) - uploaded_files.append(current_output_path) - + uploaded_files.append(current_part_path) # Update output path to next part part_number += 1 - current_output_path = get_part_path( + current_part_path = get_output_path( metadata=metadata, part_number=part_number, - part_dir=part_dir, + output_dir=output_dir, touch=True, ) - with open(current_output_path, "a") as f_out: - # Write block data to part file - for transformed_record in transformed_block: - line_count += 1 - f_out.write("{}\n".format(json.dumps(transformed_record))) - # Upload final block - _upload_file_to_json_dataset( - file_path=current_output_path, - s3_metadata=s3_metadata, - workflow_run_properties=workflow_run_properties, - delete_upon_successful_upload=delete_upon_successful_upload, - ) - uploaded_files.append(current_output_path) - logger_extra = dict( - merge_dicts( - logger_context, - { - "file.LineCount": line_count, - "event.kind": "metric", - "event.category": ["file"], - "event.type": ["info", "creation"], - "event.action": "list-file-properties", - "labels": { - k: v.isoformat() if isinstance(v, datetime.datetime) else v - for k, v in metadata.items() - }, + # Write block data to both part file and compressed file + for transformed_record in transformed_block: + line_count += 1 + record_with_newline = "{}\n".format(json.dumps(transformed_record)) + f_out.write(record_with_newline) + f_compressed_out.write(record_with_newline) + # fmt: on + # Upload final part + _upload_file_to_json_dataset( + file_path=current_part_path, + s3_metadata=s3_metadata, + workflow_run_properties=workflow_run_properties, + delete_upon_successful_upload=delete_upon_successful_upload, + ) + uploaded_files.append(current_part_path) + logger_extra = dict( + merge_dicts( + logger_context, + { + "file.LineCount": line_count, + "event.kind": "metric", + "event.category": ["file"], + "event.type": ["info", "creation"], + "event.action": "list-file-properties", + "labels": { + k: v.isoformat() if isinstance(v, datetime.datetime) else v + for k, v in metadata.items() }, - ) + }, ) - logger.info("Output file attributes", extra=logger_extra) + ) + logger.info("Output file attributes", extra=logger_extra) + # Upload compressed file + _upload_file_to_json_dataset( + file_path=compressed_output_path, + s3_metadata=s3_metadata, + workflow_run_properties=workflow_run_properties, + delete_upon_successful_upload=delete_upon_successful_upload, + upload_to_compressed_s3_prefix=True, + ) + uploaded_files.append(compressed_output_path) return uploaded_files @@ -615,6 +636,7 @@ def _upload_file_to_json_dataset( s3_metadata: dict, workflow_run_properties: dict, delete_upon_successful_upload: bool, + upload_to_compressed_s3_prefix: bool = False, ) -> str: """ A helper function for `write_file_to_json_dataset` which handles @@ -627,16 +649,27 @@ def _upload_file_to_json_dataset( delete_upon_successful_upload (bool): Whether to delete the local copy of the JSON file after uploading to S3. Set to False during testing. + upload_to_compressed_s3_prefix (bool): Whether to upload this file + to the compressed JSON S3 prefix. Effectively, this substitutes + `workflow_run_properties['json_prefix']` with the string + "compressed_json". Returns: str: The S3 object key of the uploaded file. """ s3_client = boto3.client("s3") - s3_output_key = os.path.join( - workflow_run_properties["namespace"], - workflow_run_properties["json_prefix"], - file_path, - ) + if upload_to_compressed_s3_prefix: + s3_output_key = os.path.join( + workflow_run_properties["namespace"], + "compressed_json", + file_path, + ) + else: + s3_output_key = os.path.join( + workflow_run_properties["namespace"], + workflow_run_properties["json_prefix"], + file_path, + ) basic_file_info = get_basic_file_info(file_path=file_path) with open(file_path, "rb") as f_in: response = s3_client.put_object( @@ -701,23 +734,25 @@ def merge_dicts(x: dict, y: dict) -> typing.Generator: yield (key, y[key]) -def get_part_path( +def get_output_path( metadata: dict, - part_number: int, - part_dir: str, + output_dir: str, + part_number: Optional[int], touch: bool, -): +) -> str: """ A helper function for `write_file_to_json_dataset` - This function returns a part path where we can write data to. Optionally, - create empty file at path. + This function returns a file path where we can write data to. If a part + number is provided, we assume that this is a part file. Otherwise, we + assume that this is a gzip file. Args: metadata (dict): Metadata about the source JSON file. - part_number (int): Which part we need a file name for. - part_dir (str): The directory to which we write the part file. - touch (bool): Whether to create an empty file at the part path + output_dir (str): The directory to which we write the file. + part_number (int): Which part we need a file name for. Set to None if + this is the path to a gzip file. + touch (bool): Whether to create an empty file at the path Returns: str: A new part path @@ -726,10 +761,14 @@ def get_part_path( FileExistsError: If touch is True and a file already exists at the part path. """ - output_filename = get_output_filename(metadata=metadata, part_number=part_number) - output_path = os.path.join(part_dir, output_filename) + file_identifier = get_file_identifier(metadata=metadata) + if part_number is not None: + output_filename = f"{file_identifier}.part{part_number}.ndjson" + else: + output_filename = f"{file_identifier}.ndjson.gz" + output_path = os.path.join(output_dir, output_filename) if touch: - os.makedirs(part_dir, exist_ok=True) + os.makedirs(output_dir, exist_ok=True) with open(output_path, "x") as initial_file: # create file pass diff --git a/tests/test_s3_to_json.py b/tests/test_s3_to_json.py index 5cede403..d98b460c 100644 --- a/tests/test_s3_to_json.py +++ b/tests/test_s3_to_json.py @@ -306,31 +306,19 @@ def test_transform_block_non_empty_file_all_blocks(self, s3_obj, sample_metadata # Should be 12 assert counter == record_count - def test_get_output_filename_generic(self, sample_metadata): - output_filename = s3_to_json.get_output_filename( - metadata=sample_metadata, part_number=0 - ) - assert ( - output_filename - == f"{sample_metadata['type']}_20220112-20230114.part0.ndjson" - ) + def test_get_file_identifier_generic(self, sample_metadata): + file_identifier = s3_to_json.get_file_identifier(metadata=sample_metadata) + assert file_identifier == f"{sample_metadata['type']}_20220112-20230114" - def test_get_output_filename_no_start_date(self, sample_metadata): + def test_get_file_identifier_no_start_date(self, sample_metadata): sample_metadata["start_date"] = None - output_filename = s3_to_json.get_output_filename( - metadata=sample_metadata, part_number=0 - ) - assert output_filename == f"{sample_metadata['type']}_20230114.part0.ndjson" + file_identifier = s3_to_json.get_file_identifier(metadata=sample_metadata) + assert file_identifier == f"{sample_metadata['type']}_20230114" - def test_get_output_filename_subtype(self, sample_metadata): + def test_get_file_identifier_subtype(self, sample_metadata): sample_metadata["type"] = "HealthKitV2Samples" - output_filename = s3_to_json.get_output_filename( - metadata=sample_metadata, part_number=0 - ) - assert ( - output_filename - == "HealthKitV2Samples_FakeSubtype_20220112-20230114.part0.ndjson" - ) + file_identifier = s3_to_json.get_file_identifier(metadata=sample_metadata) + assert file_identifier == "HealthKitV2Samples_FakeSubtype_20220112-20230114" def test_upload_file_to_json_dataset_delete_local_copy( self, namespace, sample_metadata, monkeypatch, shared_datadir @@ -391,10 +379,46 @@ def test_upload_file_to_json_dataset_s3_key( assert s3_key == correct_s3_key shutil.rmtree(temp_dir) + def test_upload_file_to_json_dataset_compressed_s3_key( + self, namespace, monkeypatch, shared_datadir + ): + monkeypatch.setattr("boto3.client", lambda x: MockAWSClient()) + sample_metadata = { + "type": "HealthKitV2Samples", + "subtype": "Weight", + } + workflow_run_properties = { + "namespace": namespace, + "json_prefix": "raw-json", + "json_bucket": "json-bucket", + } + original_file_path = os.path.join( + shared_datadir, "2023-01-13T21--08--51Z_TESTDATA" + ) + temp_dir = f"dataset={sample_metadata['type']}" + os.makedirs(temp_dir) + new_file_path = shutil.copy(original_file_path, temp_dir) + s3_key = s3_to_json._upload_file_to_json_dataset( + file_path=new_file_path, + s3_metadata=sample_metadata, + workflow_run_properties=workflow_run_properties, + delete_upon_successful_upload=True, + upload_to_compressed_s3_prefix=True, + ) + correct_s3_key = os.path.join( + workflow_run_properties["namespace"], + "compressed_json", + new_file_path, + ) + assert s3_key == correct_s3_key + shutil.rmtree(temp_dir) + def test_write_file_to_json_dataset_delete_local_copy( self, s3_obj, sample_metadata, namespace, monkeypatch ): - sample_metadata["type"] = "HealthKitV2Samples" + sample_metadata["type"] = "FitbitDevices" + sample_metadata["subtype"] = None + sample_metadata["start_date"] = None monkeypatch.setattr("boto3.client", lambda x: MockAWSClient()) workflow_run_properties = { "namespace": namespace, @@ -404,7 +428,7 @@ def test_write_file_to_json_dataset_delete_local_copy( with zipfile.ZipFile(io.BytesIO(s3_obj["Body"])) as z: output_files = s3_to_json.write_file_to_json_dataset( z=z, - json_path="HealthKitV2Samples_Weight_20230112-20230114.json", + json_path="FitbitDevices_20230114.json", metadata=sample_metadata, workflow_run_properties=workflow_run_properties, delete_upon_successful_upload=True, @@ -474,7 +498,8 @@ def test_write_file_to_json_dataset_multiple_parts( file_size_limit=1e6, ) output_line_count = 0 - for output_file in output_files: + # We only want to examine part files, so don't include the compressed file + for output_file in output_files[:-1]: with open(output_file, "r") as f_out: for json_line in f_out: output_line_count += 1 @@ -487,24 +512,35 @@ def test_derive_str_metadata(self, sample_metadata): assert isinstance(str_metadata["start_date"], str) assert isinstance(str_metadata["end_date"], str) - def test_get_part_path_no_touch(self, sample_metadata): + def test_get_output_path_no_touch(self, sample_metadata): sample_metadata["start_date"] = None - part_path = s3_to_json.get_part_path( + output_path = s3_to_json.get_output_path( metadata=sample_metadata, part_number=0, - part_dir=sample_metadata["type"], + output_dir=sample_metadata["type"], touch=False, ) - assert part_path == "FitbitDevices/FitbitDevices_20230114.part0.ndjson" + assert output_path == "FitbitDevices/FitbitDevices_20230114.part0.ndjson" - def test_get_part_path_touch(self, sample_metadata): - part_path = s3_to_json.get_part_path( + def test_get_output_path_touch(self, sample_metadata): + output_path = s3_to_json.get_output_path( metadata=sample_metadata, part_number=0, - part_dir=sample_metadata["type"], + output_dir=sample_metadata["type"], + touch=True, + ) + assert os.path.exists(output_path) + shutil.rmtree(sample_metadata["type"], ignore_errors=True) + + def test_get_output_path_gzip(self, sample_metadata): + sample_metadata["start_date"] = None + output_path = s3_to_json.get_output_path( + metadata=sample_metadata, + part_number=None, + output_dir=sample_metadata["type"], touch=True, ) - assert os.path.exists(part_path) + assert output_path == "FitbitDevices/FitbitDevices_20230114.ndjson.gz" shutil.rmtree(sample_metadata["type"], ignore_errors=True) def test_get_metadata_startdate_enddate(self, json_file_basenames_dict):