[ETL-673] Upload compressed JSON as part of S3 to JSON #135

Merged · 2 commits · Aug 21, 2024
177 changes: 108 additions & 69 deletions src/glue/jobs/s3_to_json.py
@@ -6,14 +6,17 @@
(for example: json/dataset=EnrolledParticipants/) and only contains
files which share a similar schema.
"""

import datetime
import gzip
import io
import json
import logging
import os
import sys
import typing
import zipfile
from typing import Iterator, Optional

import boto3
import ecs_logging
@@ -409,9 +412,12 @@ def _transform_garmin_data_types(
return json_obj


def get_output_filename(metadata: dict, part_number: int) -> str:
def get_file_identifier(metadata: dict) -> str:
"""
Get a formatted file name.
Get an identifier for a file from the source file's metadata.

This function effectively reverse-engineers the process in `get_metadata`,
enabling us to reconstruct the source file's identifier.

The format depends on which metadata fields we have available to us.
Metadata fields we can potentially use:
@@ -428,33 +434,31 @@ def get_output_filename(metadata: dict, part_number: int) -> str:
str: A formatted file name.
"""
if metadata["type"] in DATA_TYPES_WITH_SUBTYPE:
output_fname = "{}_{}_{}-{}.part{}.ndjson".format(
identifier = "{}_{}_{}-{}".format(
metadata["type"],
metadata["subtype"],
metadata["start_date"].strftime("%Y%m%d"),
metadata["end_date"].strftime("%Y%m%d"),
part_number,
)
elif metadata["start_date"] is None:
output_fname = "{}_{}.part{}.ndjson".format(
metadata["type"], metadata["end_date"].strftime("%Y%m%d"), part_number
identifier = "{}_{}".format(
metadata["type"], metadata["end_date"].strftime("%Y%m%d")
)
else:
output_fname = "{}_{}-{}.part{}.ndjson".format(
identifier = "{}_{}-{}".format(
metadata["type"],
metadata["start_date"].strftime("%Y%m%d"),
metadata["end_date"].strftime("%Y%m%d"),
part_number,
)
return output_fname
return identifier
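
As an aside, a minimal sketch (not part of this PR) of what the identifier looks like for a hypothetical data type with no subtype and both dates present:

import datetime

# Hypothetical metadata; real type names come from the source JSON archive
metadata = {
    "type": "FitbitSleepLogs",
    "start_date": datetime.datetime(2024, 1, 1),
    "end_date": datetime.datetime(2024, 1, 31),
}
# Mirrors the final branch of get_file_identifier above
identifier = "{}_{}-{}".format(
    metadata["type"],
    metadata["start_date"].strftime("%Y%m%d"),
    metadata["end_date"].strftime("%Y%m%d"),
)
print(identifier)  # FitbitSleepLogs_20240101-20240131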


def transform_block(
input_json: typing.IO,
metadata: dict,
logger_context: dict = {},
block_size: int = 10000,
):
) -> Iterator[list]:
"""
A generator function which yields a block of transformed JSON records.

@@ -522,74 +526,91 @@ def write_file_to_json_dataset(
Returns:
list: A list of files uploaded to S3
"""
# Configuration related to where we write our part files
part_dir = os.path.join(
# Configuration related to where we write the JSON
output_dir = os.path.join(
f"dataset={metadata['type']}", f"cohort={metadata['cohort']}"
)
os.makedirs(part_dir, exist_ok=True)
os.makedirs(output_dir, exist_ok=True)
part_number = 0
output_path = get_part_path(
metadata=metadata, part_number=part_number, part_dir=part_dir, touch=True
# we update this value as we write
current_part_path = get_output_path(
metadata=metadata, part_number=part_number, output_dir=output_dir, touch=True
)
compressed_output_path = get_output_path(
metadata=metadata, part_number=None, output_dir=output_dir, touch=True
)

# We will attach file metadata to the uploaded S3 object
s3_metadata = _derive_str_metadata(metadata=metadata)
line_count = 0
uploaded_files = []
with z.open(json_path, "r") as input_json:
current_output_path = output_path
line_count = 0
# fmt: off
# Python <3.10 requires this backslash syntax; we currently run 3.7
Member:

Thanks, we do have a ticket to update to Python 3.10. Do you think we should do that sooner rather than later?

Contributor (Author):
No, we may end up abandoning Glue altogether, so it doesn't make sense to spend time upgrading Python just to avoid these minor inconveniences.
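
For reference, a minimal sketch of the two syntaxes under discussion (file names are placeholders; the parenthesized form requires Python 3.10+):

import gzip

# Before Python 3.10, splitting multiple context managers across lines
# requires backslash continuation
with open("part0.ndjson", "a") as f_out, \
        gzip.open("data.ndjson.gz", "wt", encoding="utf-8") as f_gz:
    pass

# Python 3.10+ allows grouping the same context managers in parentheses
with (
    open("part0.ndjson", "a") as f_out,
    gzip.open("data.ndjson.gz", "wt", encoding="utf-8") as f_gz,
):
    pass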

with z.open(json_path, "r") as input_json, \
open(current_part_path, "a") as f_out, \
gzip.open(compressed_output_path, "wt", encoding="utf-8") as f_compressed_out:
for transformed_block in transform_block(
input_json=input_json, metadata=metadata, logger_context=logger_context
):
current_file_size = os.path.getsize(current_output_path)
current_file_size = os.path.getsize(current_part_path)
if current_file_size > file_size_limit:
# Upload completed part file
_upload_file_to_json_dataset(
file_path=current_output_path,
file_path=current_part_path,
s3_metadata=s3_metadata,
workflow_run_properties=workflow_run_properties,
delete_upon_successful_upload=delete_upon_successful_upload,
)
uploaded_files.append(current_output_path)

uploaded_files.append(current_part_path)
# Update output path to next part
part_number += 1
current_output_path = get_part_path(
current_part_path = get_output_path(
metadata=metadata,
part_number=part_number,
part_dir=part_dir,
output_dir=output_dir,
touch=True,
)
with open(current_output_path, "a") as f_out:
# Write block data to part file
for transformed_record in transformed_block:
line_count += 1
f_out.write("{}\n".format(json.dumps(transformed_record)))
# Upload final block
_upload_file_to_json_dataset(
file_path=current_output_path,
s3_metadata=s3_metadata,
workflow_run_properties=workflow_run_properties,
delete_upon_successful_upload=delete_upon_successful_upload,
)
uploaded_files.append(current_output_path)
logger_extra = dict(
merge_dicts(
logger_context,
{
"file.LineCount": line_count,
"event.kind": "metric",
"event.category": ["file"],
"event.type": ["info", "creation"],
"event.action": "list-file-properties",
"labels": {
k: v.isoformat() if isinstance(v, datetime.datetime) else v
for k, v in metadata.items()
},
# Write block data to both part file and compressed file
for transformed_record in transformed_block:
line_count += 1
record_with_newline = "{}\n".format(json.dumps(transformed_record))
f_out.write(record_with_newline)
f_compressed_out.write(record_with_newline)
# fmt: on
# Upload final part
_upload_file_to_json_dataset(
file_path=current_part_path,
s3_metadata=s3_metadata,
workflow_run_properties=workflow_run_properties,
delete_upon_successful_upload=delete_upon_successful_upload,
)
uploaded_files.append(current_part_path)
logger_extra = dict(
merge_dicts(
logger_context,
{
"file.LineCount": line_count,
"event.kind": "metric",
"event.category": ["file"],
"event.type": ["info", "creation"],
"event.action": "list-file-properties",
"labels": {
k: v.isoformat() if isinstance(v, datetime.datetime) else v
for k, v in metadata.items()
},
)
},
)
logger.info("Output file attributes", extra=logger_extra)
)
logger.info("Output file attributes", extra=logger_extra)
# Upload compressed file
_upload_file_to_json_dataset(
file_path=compressed_output_path,
s3_metadata=s3_metadata,
workflow_run_properties=workflow_run_properties,
delete_upon_successful_upload=delete_upon_successful_upload,
upload_to_compressed_s3_prefix=True,
)
uploaded_files.append(compressed_output_path)
return uploaded_files
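
For illustration, a self-contained sketch (not from this PR; paths and records are placeholders) of the dual-write pattern above, where each NDJSON line goes to both the current part file and the gzip-compressed copy:

import gzip
import json

records = [{"ParticipantIdentifier": "p1", "Value": 42}]  # placeholder records

with open("example.part0.ndjson", "a") as f_out, \
        gzip.open("example.ndjson.gz", "wt", encoding="utf-8") as f_gz:
    for record in records:
        line = "{}\n".format(json.dumps(record))
        f_out.write(line)  # uncompressed NDJSON part file
        f_gz.write(line)   # identical line in the compressed copy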


@@ -615,6 +636,7 @@ def _upload_file_to_json_dataset(
s3_metadata: dict,
workflow_run_properties: dict,
delete_upon_successful_upload: bool,
upload_to_compressed_s3_prefix: bool = False,
) -> str:
"""
A helper function for `write_file_to_json_dataset` which handles
@@ -627,16 +649,27 @@
delete_upon_successful_upload (bool): Whether to delete the local
copy of the JSON file after uploading to S3. Set to False
during testing.
upload_to_compressed_s3_prefix (bool): Whether to upload this file
to the compressed JSON S3 prefix. Effectively, this substitutes
`workflow_run_properties['json_prefix']` with the string
"compressed_json".

Returns:
str: The S3 object key of the uploaded file.
"""
s3_client = boto3.client("s3")
s3_output_key = os.path.join(
workflow_run_properties["namespace"],
workflow_run_properties["json_prefix"],
file_path,
)
if upload_to_compressed_s3_prefix:
s3_output_key = os.path.join(
workflow_run_properties["namespace"],
"compressed_json",
file_path,
)
else:
s3_output_key = os.path.join(
workflow_run_properties["namespace"],
workflow_run_properties["json_prefix"],
file_path,
)
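# For illustration only (not part of this PR; values are hypothetical): with
# namespace "main" and json_prefix "json", a part file at
#   dataset=FitbitSleepLogs/cohort=adults_v1/FitbitSleepLogs_20240101-20240131.part0.ndjson
# is uploaded under main/json/..., while the compressed copy of the same data
# is uploaded under main/compressed_json/...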
basic_file_info = get_basic_file_info(file_path=file_path)
with open(file_path, "rb") as f_in:
response = s3_client.put_object(
Expand Down Expand Up @@ -701,23 +734,25 @@ def merge_dicts(x: dict, y: dict) -> typing.Generator:
yield (key, y[key])


def get_part_path(
def get_output_path(
metadata: dict,
part_number: int,
part_dir: str,
output_dir: str,
part_number: Optional[int],
touch: bool,
):
) -> str:
"""
A helper function for `write_file_to_json_dataset`

This function returns a part path where we can write data to. Optionally,
create empty file at path.
This function returns a file path where we can write data to. If a part
number is provided, we assume that this is a part file. Otherwise, we
assume that this is a gzip file.

Args:
metadata (dict): Metadata about the source JSON file.
part_number (int): Which part we need a file name for.
part_dir (str): The directory to which we write the part file.
touch (bool): Whether to create an empty file at the part path
output_dir (str): The directory to which we write the file.
part_number (int): Which part we need a file name for. Set to None if
this is the path to a gzip file.
touch (bool): Whether to create an empty file at the path

Returns:
str: A new part path
@@ -726,10 +761,14 @@ def get_part_path(
FileExistsError: If touch is True and a file already exists at
the part path.
"""
output_filename = get_output_filename(metadata=metadata, part_number=part_number)
output_path = os.path.join(part_dir, output_filename)
file_identifier = get_file_identifier(metadata=metadata)
if part_number is not None:
output_filename = f"{file_identifier}.part{part_number}.ndjson"
else:
output_filename = f"{file_identifier}.ndjson.gz"
output_path = os.path.join(output_dir, output_filename)
if touch:
os.makedirs(part_dir, exist_ok=True)
os.makedirs(output_dir, exist_ok=True)
with open(output_path, "x") as initial_file:
# create file
pass
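
A hedged sketch of the resulting names (not part of this PR; the metadata and cohort values are hypothetical):

output_dir = "dataset=FitbitSleepLogs/cohort=adults_v1"
# get_output_path(metadata=metadata, output_dir=output_dir, part_number=0, touch=False)
#   -> "dataset=FitbitSleepLogs/cohort=adults_v1/FitbitSleepLogs_20240101-20240131.part0.ndjson"
# get_output_path(metadata=metadata, output_dir=output_dir, part_number=None, touch=False)
#   -> "dataset=FitbitSleepLogs/cohort=adults_v1/FitbitSleepLogs_20240101-20240131.ndjson.gz"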