diff --git a/src/glue/jobs/s3_to_json.py b/src/glue/jobs/s3_to_json.py
index 7decb52..0e3002e 100644
--- a/src/glue/jobs/s3_to_json.py
+++ b/src/glue/jobs/s3_to_json.py
@@ -526,13 +526,14 @@ def write_file_to_json_dataset(
     Returns:
         list: A list of files uploaded to S3
     """
-    # Configuration related to where we write our part files
+    # Configuration related to where we write the JSON
     output_dir = os.path.join(
         f"dataset={metadata['type']}", f"cohort={metadata['cohort']}"
     )
     os.makedirs(output_dir, exist_ok=True)
     part_number = 0
-    part_output_path = get_output_path(
+    # we update this value as we write
+    current_part_path = get_output_path(
         metadata=metadata, part_number=part_number, output_dir=output_dir, touch=True
     )
     compressed_output_path = get_output_path(
@@ -541,75 +542,75 @@
     # We will attach file metadata to the uploaded S3 object
     s3_metadata = _derive_str_metadata(metadata=metadata)
+    line_count = 0
     uploaded_files = []
-    with z.open(json_path, "r") as input_json:
-        with gzip.open(
-            compressed_output_path, "wt", encoding="utf-8"
-        ) as f_compressed_out:
-            current_part_path = part_output_path
-            line_count = 0
-            for transformed_block in transform_block(
-                input_json=input_json, metadata=metadata, logger_context=logger_context
-            ):
-                current_file_size = os.path.getsize(current_part_path)
-                if current_file_size > file_size_limit:
-                    # Upload completed part file
-                    _upload_file_to_json_dataset(
-                        file_path=current_part_path,
-                        s3_metadata=s3_metadata,
-                        workflow_run_properties=workflow_run_properties,
-                        delete_upon_successful_upload=delete_upon_successful_upload,
-                    )
-                    uploaded_files.append(current_part_path)
-                    # Update output path to next part
-                    part_number += 1
-                    current_part_path = get_output_path(
-                        metadata=metadata,
-                        part_number=part_number,
-                        output_dir=output_dir,
-                        touch=True,
-                    )
-                # Write block data to both part file and compressed file
-                for transformed_record in transformed_block:
-                    line_count += 1
-                    record_with_newline = "{}\n".format(json.dumps(transformed_record))
-                    with open(current_part_path, "a") as f_out:
-                        f_out.write(record_with_newline)
-                    f_compressed_out.write(record_with_newline)
-            # Upload final block
-            _upload_file_to_json_dataset(
-                file_path=current_part_path,
-                s3_metadata=s3_metadata,
-                workflow_run_properties=workflow_run_properties,
-                delete_upon_successful_upload=delete_upon_successful_upload,
-            )
-            uploaded_files.append(current_part_path)
-            logger_extra = dict(
-                merge_dicts(
-                    logger_context,
-                    {
-                        "file.LineCount": line_count,
-                        "event.kind": "metric",
-                        "event.category": ["file"],
-                        "event.type": ["info", "creation"],
-                        "event.action": "list-file-properties",
-                        "labels": {
-                            k: v.isoformat() if isinstance(v, datetime.datetime) else v
-                            for k, v in metadata.items()
-                        },
-                    },
-                )
-            )
-            logger.info("Output file attributes", extra=logger_extra)
-            # Upload compressed file
-            _upload_file_to_json_dataset(
-                file_path=compressed_output_path,
-                s3_metadata=s3_metadata,
-                workflow_run_properties=workflow_run_properties,
-                delete_upon_successful_upload=delete_upon_successful_upload,
-                upload_to_compressed_s3_prefix=True,
-            )
-            uploaded_files.append(compressed_output_path)
+    # fmt: off
+    with z.open(json_path, "r") as input_json, \
+            gzip.open(compressed_output_path, "wt", encoding="utf-8") as f_compressed_out:
+        f_out = open(current_part_path, "a")
+        for transformed_block in transform_block(
+            input_json=input_json, metadata=metadata, logger_context=logger_context
+        ):
+            if os.path.getsize(current_part_path) > file_size_limit:
+                f_out.close()
+                # Upload completed part file
+                _upload_file_to_json_dataset(
+                    file_path=current_part_path,
+                    s3_metadata=s3_metadata,
+                    workflow_run_properties=workflow_run_properties,
+                    delete_upon_successful_upload=delete_upon_successful_upload,
+                )
+                uploaded_files.append(current_part_path)
+                # Update output path to next part
+                part_number += 1
+                current_part_path = get_output_path(
+                    metadata=metadata,
+                    part_number=part_number,
+                    output_dir=output_dir,
+                    touch=True,
+                )
+                f_out = open(current_part_path, "a")
+            # Write block data to both part file and compressed file
+            for transformed_record in transformed_block:
+                line_count += 1
+                record_with_newline = "{}\n".format(json.dumps(transformed_record))
+                f_out.write(record_with_newline)
+                f_compressed_out.write(record_with_newline)
+        f_out.close()
+    # fmt: on
+    # Upload final part
+    _upload_file_to_json_dataset(
+        file_path=current_part_path,
+        s3_metadata=s3_metadata,
+        workflow_run_properties=workflow_run_properties,
+        delete_upon_successful_upload=delete_upon_successful_upload,
+    )
+    uploaded_files.append(current_part_path)
+    logger_extra = dict(
+        merge_dicts(
+            logger_context,
+            {
+                "file.LineCount": line_count,
+                "event.kind": "metric",
+                "event.category": ["file"],
+                "event.type": ["info", "creation"],
+                "event.action": "list-file-properties",
+                "labels": {
+                    k: v.isoformat() if isinstance(v, datetime.datetime) else v
+                    for k, v in metadata.items()
+                },
+            },
+        )
+    )
+    logger.info("Output file attributes", extra=logger_extra)
+    # Upload compressed file
+    _upload_file_to_json_dataset(
+        file_path=compressed_output_path,
+        s3_metadata=s3_metadata,
+        workflow_run_properties=workflow_run_properties,
+        delete_upon_successful_upload=delete_upon_successful_upload,
+        upload_to_compressed_s3_prefix=True,
+    )
+    uploaded_files.append(compressed_output_path)
     return uploaded_files
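
The heart of this change is the part-rotation loop: each record is appended to the current part file and mirrored into a single gzip stream, and whenever the part file grows past `file_size_limit` it is closed, uploaded, and replaced by a freshly opened part. Below is a minimal, self-contained sketch of that pattern, assuming a generic `upload` callback and ad-hoc part naming; `write_records_in_parts` is a hypothetical stand-in, not the job's `get_output_path` / `_upload_file_to_json_dataset` helpers.

```python
import gzip
import json
import os


def write_records_in_parts(records, output_dir, upload, part_size_limit=100 * 2**20):
    """Append newline-delimited JSON to size-limited part files while
    mirroring every line into one gzip file; hand each finished file to
    ``upload``. Hypothetical sketch of the rotation pattern shown in the diff.
    """
    os.makedirs(output_dir, exist_ok=True)
    part_number = 0
    current_part_path = os.path.join(output_dir, f"part{part_number:05}.ndjson")
    compressed_path = os.path.join(output_dir, "records.ndjson.gz")
    uploaded = []
    with gzip.open(compressed_path, "wt", encoding="utf-8") as f_compressed:
        # Keep one part-file handle open instead of reopening it per record.
        f_out = open(current_part_path, "a")
        for record in records:
            # Rotate once the current part grows past the size limit.
            if os.path.getsize(current_part_path) > part_size_limit:
                f_out.close()  # flush before the file is uploaded
                upload(current_part_path)
                uploaded.append(current_part_path)
                part_number += 1
                current_part_path = os.path.join(
                    output_dir, f"part{part_number:05}.ndjson"
                )
                f_out = open(current_part_path, "a")
            line = json.dumps(record) + "\n"
            f_out.write(line)         # size-limited part file
            f_compressed.write(line)  # single compressed copy of everything
        f_out.close()
    # Upload the final part and the compressed file only after both handles
    # are closed, so their buffers are flushed.
    upload(current_part_path)
    uploaded.append(current_part_path)
    upload(compressed_path)
    uploaded.append(compressed_path)
    return uploaded
```

Here `upload` stands in for whatever pushes a finished file to S3 (the job above uses `_upload_file_to_json_dataset`). Writing through a longer-lived part handle instead of the old per-record `with open(current_part_path, "a")` is the same trade the diff makes, at the cost of having to manage that handle explicitly around each rotation.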