Upload compressed JSON in S3 to JSON
philerooski committed Aug 20, 2024
1 parent 1f47dfc commit 355ea44
Showing 2 changed files with 186 additions and 112 deletions.
src/glue/jobs/s3_to_json.py: 198 changes (118 additions, 80 deletions)
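In short, the job keeps writing size-limited NDJSON part files under the workflow's JSON prefix, and now also writes a single gzip-compressed copy of each dataset that it uploads under a separate compressed_json prefix. With made-up namespace, data type, cohort, and date values (and assuming the JSON prefix is "json"), the resulting S3 keys would look roughly like:

main/json/dataset=FitbitSleepLogs/cohort=adults_v1/FitbitSleepLogs_20240801-20240814.part0.ndjson
main/json/dataset=FitbitSleepLogs/cohort=adults_v1/FitbitSleepLogs_20240801-20240814.part1.ndjson
main/compressed_json/dataset=FitbitSleepLogs/cohort=adults_v1/FitbitSleepLogs_20240801-20240814.ndjson.gz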
@@ -6,14 +6,17 @@
(for example: json/dataset=EnrolledParticipants/) and only contains
files which share a similar schema.
"""

import datetime
import gzip
import io
import json
import logging
import os
import sys
import typing
import zipfile
from typing import Iterator, Optional

import boto3
import ecs_logging
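The newly imported gzip module is used further down to stream the compressed copy in text mode. As a quick reference (a standalone sketch, not taken from the diff, with a made-up filename and record), opening a gzip file with mode "wt" and an encoding lets plain str lines be written and compressed on the fly:

import gzip

with gzip.open("example.ndjson.gz", "wt", encoding="utf-8") as f_out:
    # Each line is encoded as UTF-8 text and gzip-compressed as it is written.
    f_out.write('{"example_field": 1}\n')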
@@ -409,9 +412,12 @@ def _transform_garmin_data_types(
return json_obj


def get_output_filename(metadata: dict, part_number: int) -> str:
def get_file_identifier(metadata: dict) -> str:
"""
Get a formatted file name.
Get an identifier for a file from the source file's metadata.
This function effectively reverse-engineers the process in `get_metadata`,
enabling us to reconstruct the source file's identifier.
The format depends on which metadata fields we have available to us.
Metadata fields we can potentially use:
@@ -428,33 +434,31 @@ def get_output_filename(metadata: dict, part_number: int) -> str:
str: A formatted file name.
"""
if metadata["type"] in DATA_TYPES_WITH_SUBTYPE:
output_fname = "{}_{}_{}-{}.part{}.ndjson".format(
identifier = "{}_{}_{}-{}".format(
metadata["type"],
metadata["subtype"],
metadata["start_date"].strftime("%Y%m%d"),
metadata["end_date"].strftime("%Y%m%d"),
part_number,
)
elif metadata["start_date"] is None:
output_fname = "{}_{}.part{}.ndjson".format(
metadata["type"], metadata["end_date"].strftime("%Y%m%d"), part_number
identifier = "{}_{}".format(
metadata["type"], metadata["end_date"].strftime("%Y%m%d")
)
else:
output_fname = "{}_{}-{}.part{}.ndjson".format(
identifier = "{}_{}-{}".format(
metadata["type"],
metadata["start_date"].strftime("%Y%m%d"),
metadata["end_date"].strftime("%Y%m%d"),
part_number,
)
return output_fname
return identifier


def transform_block(
input_json: typing.IO,
metadata: dict,
logger_context: dict = {},
block_size: int = 10000,
):
) -> Iterator[list]:
"""
A generator function which yields a block of transformed JSON records.
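To make the renamed helper concrete, here is a rough usage sketch. The type name and its metadata shape are assumptions for illustration (the real set of types and DATA_TYPES_WITH_SUBTYPE is defined elsewhere in the module), and the import path assumes the repo root is on PYTHONPATH:

import datetime
from src.glue.jobs.s3_to_json import get_file_identifier  # hypothetical import path

# Assume, purely for illustration, a type with no subtype and no start date.
metadata = {
    "type": "EnrolledParticipants",
    "start_date": None,
    "end_date": datetime.datetime(2024, 8, 15),
}
print(get_file_identifier(metadata=metadata))
# EnrolledParticipants_20240815

get_output_path() later appends ".part{n}.ndjson" or ".ndjson.gz" to this identifier.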
@@ -523,73 +527,89 @@ def write_file_to_json_dataset(
list: A list of files uploaded to S3
"""
# Configuration related to where we write our part files
part_dir = os.path.join(
output_dir = os.path.join(
f"dataset={metadata['type']}", f"cohort={metadata['cohort']}"
)
os.makedirs(part_dir, exist_ok=True)
os.makedirs(output_dir, exist_ok=True)
part_number = 0
output_path = get_part_path(
metadata=metadata, part_number=part_number, part_dir=part_dir, touch=True
part_output_path = get_output_path(
metadata=metadata, part_number=part_number, output_dir=output_dir, touch=True
)
compressed_output_path = get_output_path(
metadata=metadata, part_number=None, output_dir=output_dir, touch=True
)

# We will attach file metadata to the uploaded S3 object
s3_metadata = _derive_str_metadata(metadata=metadata)
uploaded_files = []
with z.open(json_path, "r") as input_json:
current_output_path = output_path
line_count = 0
for transformed_block in transform_block(
input_json=input_json, metadata=metadata, logger_context=logger_context
):
current_file_size = os.path.getsize(current_output_path)
if current_file_size > file_size_limit:
# Upload completed part file
_upload_file_to_json_dataset(
file_path=current_output_path,
s3_metadata=s3_metadata,
workflow_run_properties=workflow_run_properties,
delete_upon_successful_upload=delete_upon_successful_upload,
)
uploaded_files.append(current_output_path)

# Update output path to next part
part_number += 1
current_output_path = get_part_path(
metadata=metadata,
part_number=part_number,
part_dir=part_dir,
touch=True,
)
with open(current_output_path, "a") as f_out:
# Write block data to part file
with gzip.open(
compressed_output_path, "wt", encoding="utf-8"
) as f_compressed_out:
current_part_path = part_output_path
line_count = 0
for transformed_block in transform_block(
input_json=input_json, metadata=metadata, logger_context=logger_context
):
current_file_size = os.path.getsize(current_part_path)
if current_file_size > file_size_limit:
# Upload completed part file
_upload_file_to_json_dataset(
file_path=current_part_path,
s3_metadata=s3_metadata,
workflow_run_properties=workflow_run_properties,
delete_upon_successful_upload=delete_upon_successful_upload,
)
uploaded_files.append(current_part_path)
# Update output path to next part
part_number += 1
current_part_path = get_output_path(
metadata=metadata,
part_number=part_number,
output_dir=output_dir,
touch=True,
)
# Write block data to both part file and compressed file
for transformed_record in transformed_block:
line_count += 1
f_out.write("{}\n".format(json.dumps(transformed_record)))
# Upload final block
_upload_file_to_json_dataset(
file_path=current_output_path,
s3_metadata=s3_metadata,
workflow_run_properties=workflow_run_properties,
delete_upon_successful_upload=delete_upon_successful_upload,
)
uploaded_files.append(current_output_path)
logger_extra = dict(
merge_dicts(
logger_context,
{
"file.LineCount": line_count,
"event.kind": "metric",
"event.category": ["file"],
"event.type": ["info", "creation"],
"event.action": "list-file-properties",
"labels": {
k: v.isoformat() if isinstance(v, datetime.datetime) else v
for k, v in metadata.items()
record_with_newline = "{}\n".format(json.dumps(transformed_record))
with open(current_part_path, "a") as f_out:
f_out.write(record_with_newline)
f_compressed_out.write(record_with_newline)
# Upload final block
_upload_file_to_json_dataset(
file_path=current_part_path,
s3_metadata=s3_metadata,
workflow_run_properties=workflow_run_properties,
delete_upon_successful_upload=delete_upon_successful_upload,
)
uploaded_files.append(current_part_path)
logger_extra = dict(
merge_dicts(
logger_context,
{
"file.LineCount": line_count,
"event.kind": "metric",
"event.category": ["file"],
"event.type": ["info", "creation"],
"event.action": "list-file-properties",
"labels": {
k: v.isoformat() if isinstance(v, datetime.datetime) else v
for k, v in metadata.items()
},
},
},
)
)
)
logger.info("Output file attributes", extra=logger_extra)
logger.info("Output file attributes", extra=logger_extra)
# Upload compressed file
_upload_file_to_json_dataset(
file_path=compressed_output_path,
s3_metadata=s3_metadata,
workflow_run_properties=workflow_run_properties,
delete_upon_successful_upload=delete_upon_successful_upload,
upload_to_compressed_s3_prefix=True,
)
uploaded_files.append(compressed_output_path)
return uploaded_files
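
The rewritten loop boils down to a dual-write: every transformed record goes both to the current size-limited part file and to one gzip stream that spans the whole dataset, and part files are rotated (and uploaded) once they exceed the size limit. Below is a minimal standalone sketch of that pattern; the function and variable names are made up, and the real job also attaches S3 metadata, tracks a line count for logging, and checks the size once per block of transformed records rather than per record as done here.

import gzip
import json
import os

def write_parts_and_gzip(records, output_dir, identifier, file_size_limit=100_000_000):
    """Write records to size-limited .ndjson part files plus one .ndjson.gz copy."""
    os.makedirs(output_dir, exist_ok=True)
    part_number = 0
    part_path = os.path.join(output_dir, f"{identifier}.part{part_number}.ndjson")
    gzip_path = os.path.join(output_dir, f"{identifier}.ndjson.gz")
    completed_parts = []
    with gzip.open(gzip_path, "wt", encoding="utf-8") as f_gzip:
        for record in records:
            # Rotate to a new part file once the current one exceeds the size limit.
            if os.path.exists(part_path) and os.path.getsize(part_path) > file_size_limit:
                completed_parts.append(part_path)  # the real job uploads the part to S3 here
                part_number += 1
                part_path = os.path.join(output_dir, f"{identifier}.part{part_number}.ndjson")
            line = "{}\n".format(json.dumps(record))
            with open(part_path, "a") as f_part:
                f_part.write(line)
            f_gzip.write(line)
    completed_parts.append(part_path)
    return completed_parts, gzip_path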


Expand All @@ -615,6 +635,7 @@ def _upload_file_to_json_dataset(
s3_metadata: dict,
workflow_run_properties: dict,
delete_upon_successful_upload: bool,
upload_to_compressed_s3_prefix: bool = False,
) -> str:
"""
A helper function for `write_file_to_json_dataset` which handles
@@ -627,16 +648,27 @@ def _upload_file_to_json_dataset(
delete_upon_successful_upload (bool): Whether to delete the local
copy of the JSON file after uploading to S3. Set to False
during testing.
upload_to_compressed_s3_prefix (bool): Whether to upload this file
to the compressed JSON S3 prefix. Effectively, this substitutes
`workflow_run_properties['json_prefix']` with the string
"compressed_json".
Returns:
str: The S3 object key of the uploaded file.
"""
s3_client = boto3.client("s3")
s3_output_key = os.path.join(
workflow_run_properties["namespace"],
workflow_run_properties["json_prefix"],
file_path,
)
if upload_to_compressed_s3_prefix:
s3_output_key = os.path.join(
workflow_run_properties["namespace"],
"compressed_json",
file_path,
)
else:
s3_output_key = os.path.join(
workflow_run_properties["namespace"],
workflow_run_properties["json_prefix"],
file_path,
)
basic_file_info = get_basic_file_info(file_path=file_path)
with open(file_path, "rb") as f_in:
response = s3_client.put_object(
@@ -701,23 +733,25 @@ def merge_dicts(x: dict, y: dict) -> typing.Generator:
yield (key, y[key])


def get_part_path(
def get_output_path(
metadata: dict,
part_number: int,
part_dir: str,
output_dir: str,
part_number: Optional[int],
touch: bool,
):
"""
A helper function for `write_file_to_json_dataset`
This function returns a part path where we can write data to. Optionally,
create empty file at path.
This function returns a file path where we can write data to. If a part
number is provided, we assume that this is a part file. Otherwise, we
assume that this is a gzip file.
Args:
metadata (dict): Metadata about the source JSON file.
part_number (int): Which part we need a file name for.
part_dir (str): The directory to which we write the part file.
touch (bool): Whether to create an empty file at the part path
output_dir (str): The directory to which we write the file.
part_number (int): Which part we need a file name for. Set to None if
this is the path to a gzip file.
touch (bool): Whether to create an empty file at the path
Returns:
str: A new part path
@@ -726,10 +760,14 @@ def get_part_path(
FileExistsError: If touch is True and a file already exists at
the part path.
"""
output_filename = get_output_filename(metadata=metadata, part_number=part_number)
output_path = os.path.join(part_dir, output_filename)
file_identifier = get_file_identifier(metadata=metadata)
if part_number is not None:
output_filename = f"{file_identifier}.part{part_number}.ndjson"
else:
output_filename = f"{file_identifier}.ndjson.gz"
output_path = os.path.join(output_dir, output_filename)
if touch:
os.makedirs(part_dir, exist_ok=True)
os.makedirs(output_dir, exist_ok=True)
with open(output_path, "x") as initial_file:
# create file
pass
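
Putting the two filename flavors side by side, here is a rough usage sketch of the generalized helper. The metadata values, the cohort directory, and the import path are all made up for illustration:

import datetime
from src.glue.jobs.s3_to_json import get_output_path  # hypothetical import path

metadata = {
    "type": "FitbitSleepLogs",  # assumed to have no subtype
    "start_date": datetime.datetime(2024, 8, 1),
    "end_date": datetime.datetime(2024, 8, 14),
}
output_dir = "dataset=FitbitSleepLogs/cohort=adults_v1"

# Size-limited part file: <identifier>.part<N>.ndjson
get_output_path(metadata=metadata, output_dir=output_dir, part_number=0, touch=False)
# -> "dataset=FitbitSleepLogs/cohort=adults_v1/FitbitSleepLogs_20240801-20240814.part0.ndjson"

# Compressed copy of the whole dataset: <identifier>.ndjson.gz
get_output_path(metadata=metadata, output_dir=output_dir, part_number=None, touch=False)
# -> "dataset=FitbitSleepLogs/cohort=adults_v1/FitbitSleepLogs_20240801-20240814.ndjson.gz"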
