squash bug where we upload file before buffer is closed
philerooski committed Aug 20, 2024
1 parent cd9e5e7 commit 77048b5
Showing 1 changed file with 70 additions and 69 deletions.
src/glue/jobs/s3_to_json.py (139 changes: 70 additions & 69 deletions)
@@ -526,13 +526,14 @@ def write_file_to_json_dataset(
     Returns:
         list: A list of files uploaded to S3
     """
-    # Configuration related to where we write our part files
+    # Configuration related to where we write the JSON
     output_dir = os.path.join(
         f"dataset={metadata['type']}", f"cohort={metadata['cohort']}"
     )
     os.makedirs(output_dir, exist_ok=True)
     part_number = 0
-    part_output_path = get_output_path(
+    # we update this value as we write
+    current_part_path = get_output_path(
         metadata=metadata, part_number=part_number, output_dir=output_dir, touch=True
     )
     compressed_output_path = get_output_path(
@@ -541,75 +542,75 @@

     # We will attach file metadata to the uploaded S3 object
     s3_metadata = _derive_str_metadata(metadata=metadata)
+    line_count = 0
     uploaded_files = []
-    with z.open(json_path, "r") as input_json:
-        with gzip.open(
-            compressed_output_path, "wt", encoding="utf-8"
-        ) as f_compressed_out:
-            current_part_path = part_output_path
-            line_count = 0
-            for transformed_block in transform_block(
-                input_json=input_json, metadata=metadata, logger_context=logger_context
-            ):
-                current_file_size = os.path.getsize(current_part_path)
-                if current_file_size > file_size_limit:
-                    # Upload completed part file
-                    _upload_file_to_json_dataset(
-                        file_path=current_part_path,
-                        s3_metadata=s3_metadata,
-                        workflow_run_properties=workflow_run_properties,
-                        delete_upon_successful_upload=delete_upon_successful_upload,
-                    )
-                    uploaded_files.append(current_part_path)
-                    # Update output path to next part
-                    part_number += 1
-                    current_part_path = get_output_path(
-                        metadata=metadata,
-                        part_number=part_number,
-                        output_dir=output_dir,
-                        touch=True,
-                    )
-                # Write block data to both part file and compressed file
-                for transformed_record in transformed_block:
-                    line_count += 1
-                    record_with_newline = "{}\n".format(json.dumps(transformed_record))
-                    with open(current_part_path, "a") as f_out:
-                        f_out.write(record_with_newline)
-                    f_compressed_out.write(record_with_newline)
-            # Upload final block
-            _upload_file_to_json_dataset(
-                file_path=current_part_path,
-                s3_metadata=s3_metadata,
-                workflow_run_properties=workflow_run_properties,
-                delete_upon_successful_upload=delete_upon_successful_upload,
-            )
-            uploaded_files.append(current_part_path)
-            logger_extra = dict(
-                merge_dicts(
-                    logger_context,
-                    {
-                        "file.LineCount": line_count,
-                        "event.kind": "metric",
-                        "event.category": ["file"],
-                        "event.type": ["info", "creation"],
-                        "event.action": "list-file-properties",
-                        "labels": {
-                            k: v.isoformat() if isinstance(v, datetime.datetime) else v
-                            for k, v in metadata.items()
-                        },
-                    },
-                )
-            )
-            logger.info("Output file attributes", extra=logger_extra)
-            # Upload compressed file
-            _upload_file_to_json_dataset(
-                file_path=compressed_output_path,
-                s3_metadata=s3_metadata,
-                workflow_run_properties=workflow_run_properties,
-                delete_upon_successful_upload=delete_upon_successful_upload,
-                upload_to_compressed_s3_prefix=True,
-            )
-            uploaded_files.append(compressed_output_path)
+    # fmt: off
+    # <python3.10 requires this backslash syntax, we currently run 3.7
+    with z.open(json_path, "r") as input_json, \
+        open(current_part_path, "a") as f_out, \
+        gzip.open(compressed_output_path, "wt", encoding="utf-8") as f_compressed_out:
+        for transformed_block in transform_block(
+            input_json=input_json, metadata=metadata, logger_context=logger_context
+        ):
+            current_file_size = os.path.getsize(current_part_path)
+            if current_file_size > file_size_limit:
+                # Upload completed part file
+                _upload_file_to_json_dataset(
+                    file_path=current_part_path,
+                    s3_metadata=s3_metadata,
+                    workflow_run_properties=workflow_run_properties,
+                    delete_upon_successful_upload=delete_upon_successful_upload,
+                )
+                uploaded_files.append(current_part_path)
+                # Update output path to next part
+                part_number += 1
+                current_part_path = get_output_path(
+                    metadata=metadata,
+                    part_number=part_number,
+                    output_dir=output_dir,
+                    touch=True,
+                )
+            # Write block data to both part file and compressed file
+            for transformed_record in transformed_block:
+                line_count += 1
+                record_with_newline = "{}\n".format(json.dumps(transformed_record))
+                f_out.write(record_with_newline)
+                f_compressed_out.write(record_with_newline)
+    # fmt: on
+    # Upload final part
+    _upload_file_to_json_dataset(
+        file_path=current_part_path,
+        s3_metadata=s3_metadata,
+        workflow_run_properties=workflow_run_properties,
+        delete_upon_successful_upload=delete_upon_successful_upload,
+    )
+    uploaded_files.append(current_part_path)
+    logger_extra = dict(
+        merge_dicts(
+            logger_context,
+            {
+                "file.LineCount": line_count,
+                "event.kind": "metric",
+                "event.category": ["file"],
+                "event.type": ["info", "creation"],
+                "event.action": "list-file-properties",
+                "labels": {
+                    k: v.isoformat() if isinstance(v, datetime.datetime) else v
+                    for k, v in metadata.items()
+                },
+            },
+        )
+    )
+    logger.info("Output file attributes", extra=logger_extra)
+    # Upload compressed file
+    _upload_file_to_json_dataset(
+        file_path=compressed_output_path,
+        s3_metadata=s3_metadata,
+        workflow_run_properties=workflow_run_properties,
+        delete_upon_successful_upload=delete_upon_successful_upload,
+        upload_to_compressed_s3_prefix=True,
+    )
+    uploaded_files.append(compressed_output_path)
     return uploaded_files
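
The idea behind the fix, as a minimal standalone sketch: data written through a file or gzip handle can sit in an in-memory buffer until the handle is closed, so an upload issued inside the with block may ship a truncated file. Writing inside the block and uploading only after it exits guarantees the bytes are on disk first. The names below (records, part_path, gz_path, upload) are illustrative placeholders rather than identifiers from this repository, and the backslash continuation mirrors the committed code because parenthesized multi-item with statements require Python 3.10.

import gzip

def write_then_upload(records, part_path, gz_path, upload):
    """Write newline-delimited records, then upload only after buffers are flushed.

    `records` is an iterable of already-serialized JSON strings. `upload` stands
    in for whatever pushes a local file to S3 (for example boto3's upload_file);
    it is not a helper from this repository.
    """
    # Python <3.10 needs backslash continuation to open multiple
    # context managers in a single `with` statement.
    with open(part_path, "w") as f_out, \
            gzip.open(gz_path, "wt", encoding="utf-8") as f_gz:
        for record in records:
            f_out.write(record + "\n")
            f_gz.write(record + "\n")
        # Uploading here would repeat the original bug: both handles may
        # still be buffering data in memory, so S3 could receive an
        # incomplete object.
    # The `with` block has exited, so both files are flushed and closed;
    # only now is it safe to upload them.
    upload(part_path)
    upload(gz_path)

In the committed code the same ordering holds: the final part file and the compressed file are passed to _upload_file_to_json_dataset only after the with block that writes them has exited.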

