Skip to content

Commit

Permalink
Upload and delete part files immediately after final write
Browse files Browse the repository at this point in the history
  • Loading branch information
philerooski committed Sep 19, 2023
1 parent faa1350 commit beead94
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 24 deletions.
78 changes: 54 additions & 24 deletions src/glue/jobs/s3_to_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,6 @@ def write_file_to_json_dataset(
Returns:
list: A list of files uploaded to S3
"""
s3_client = boto3.client("s3")
part_dir = os.path.join(
f"dataset={dataset_identifier}", f"cohort={cohort}")
os.makedirs(part_dir, exist_ok=True)
Expand All @@ -361,6 +360,7 @@ def write_file_to_json_dataset(
s3_metadata["start_date"] = metadata["start_date"].isoformat()
s3_metadata["end_date"] = metadata["end_date"].isoformat()
part_number = 0
uploaded_files = []
output_path = get_part_path(
metadata=metadata,
part_number=part_number,
Expand All @@ -377,6 +377,13 @@ def write_file_to_json_dataset(
):
current_file_size = os.path.getsize(current_output_path)
if current_file_size > file_size_limit:
_upload_file_to_json_dataset(
file_path=current_output_path,
s3_metadata=s3_metadata,
workflow_run_properties=workflow_run_properties,
delete_upon_successful_upload=delete_upon_successful_upload
)
uploaded_files.append(current_output_path)
part_number += 1
current_output_path = get_part_path(
metadata=metadata,
Expand All @@ -387,32 +394,55 @@ def write_file_to_json_dataset(
with open(current_output_path, "a") as f_out:
for transformed_record in transformed_block:
f_out.write("{}\n".format(json.dumps(transformed_record)))
uploaded_files = []
for part_file in os.listdir(part_dir):
output_path = os.path.join(part_dir, part_file)
s3_output_key = os.path.join(
workflow_run_properties["namespace"],
workflow_run_properties["json_prefix"],
output_path
)
logger.debug(
"Uploading %s to %s",
output_path,
s3_output_key
# Upload final block
_upload_file_to_json_dataset(
file_path=current_output_path,
s3_metadata=s3_metadata,
workflow_run_properties=workflow_run_properties,
delete_upon_successful_upload=delete_upon_successful_upload
)
with open(output_path, "rb") as f_in:
response = s3_client.put_object(
Body = f_in,
Bucket = workflow_run_properties["json_bucket"],
Key = s3_output_key,
Metadata = s3_metadata
)
uploaded_files.append(output_path)
logger.debug("S3 Put object response: %s", json.dumps(response))
if delete_upon_successful_upload:
os.remove(output_path)
uploaded_files.append(current_output_path)
return uploaded_files

def _upload_file_to_json_dataset(
        file_path: str,
        s3_metadata: dict,
        workflow_run_properties: dict,
        delete_upon_successful_upload: bool,) -> str:
    """
    Upload one local JSON part file to S3 on behalf of
    `write_file_to_json_dataset`.

    The object key is formed by joining the workflow's `namespace` and
    `json_prefix` with `file_path`, and the object is written to the
    workflow's `json_bucket` with `s3_metadata` attached.

    Args:
        file_path (str): The path of the JSON file to upload. It is also
            used as the trailing component of the S3 object key.
        s3_metadata (dict): S3 Metadata to include on the object.
        workflow_run_properties (dict): The workflow arguments. The keys
            `namespace`, `json_prefix` and `json_bucket` are read.
        delete_upon_successful_upload (bool): Whether to delete the local
            copy of the JSON file after uploading to S3. Set to False
            during testing.

    Returns:
        str: The S3 object key of the uploaded file.
    """
    client = boto3.client("s3")
    key_components = (
        workflow_run_properties["namespace"],
        workflow_run_properties["json_prefix"],
        file_path,
    )
    object_key = os.path.join(*key_components)
    logger.debug("Uploading %s to %s", file_path, object_key)
    with open(file_path, "rb") as part_file:
        put_response = client.put_object(
            Body=part_file,
            Bucket=workflow_run_properties["json_bucket"],
            Key=object_key,
            Metadata=s3_metadata,
        )
        serialized_response = json.dumps(put_response)
        logger.debug("S3 Put object response: %s", serialized_response)
    # The local copy is only removed once the upload call has returned
    # without raising.
    if delete_upon_successful_upload:
        os.remove(file_path)
    return object_key

def get_part_path(
metadata: dict,
part_number: int,
Expand Down
55 changes: 55 additions & 0 deletions tests/test_s3_to_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,61 @@ def test_get_output_filename_subtype(self):
)
assert output_filename == "HealthKitV2Samples_Weight_20220112-20230114.part0.ndjson"

def test_upload_file_to_json_dataset_delete_local_copy(self, namespace, monkeypatch, shared_datadir):
    """
    The local file must be removed after upload when
    `delete_upon_successful_upload=True`.
    """
    monkeypatch.setattr("boto3.client", lambda x: MockAWSClient())
    sample_metadata = {
        "type": "HealthKitV2Samples",
        "subtype": "Weight",
    }
    workflow_run_properties = {
        "namespace": namespace,
        "json_prefix": "raw-json",
        "json_bucket": "json-bucket",
    }
    original_file_path = os.path.join(shared_datadir, "2023-01-13T21--08--51Z_TESTDATA")
    temp_dir = f"dataset={sample_metadata['type']}"
    os.makedirs(temp_dir)
    try:
        new_file_path = shutil.copy(original_file_path, temp_dir)
        s3_to_json._upload_file_to_json_dataset(
            file_path=new_file_path,
            s3_metadata=sample_metadata,
            workflow_run_properties=workflow_run_properties,
            delete_upon_successful_upload=True,
        )

        assert not os.path.exists(new_file_path)
    finally:
        # Always clean up: a leftover directory would make any later
        # `os.makedirs(temp_dir)` fail with FileExistsError.
        shutil.rmtree(temp_dir)

def test_upload_file_to_json_dataset_s3_key(self, namespace, monkeypatch, shared_datadir):
    """
    The returned S3 key must be the workflow namespace and JSON prefix
    joined with the local file path.
    """
    monkeypatch.setattr("boto3.client", lambda x: MockAWSClient())
    sample_metadata = {
        "type": "HealthKitV2Samples",
        "subtype": "Weight",
    }
    workflow_run_properties = {
        "namespace": namespace,
        "json_prefix": "raw-json",
        "json_bucket": "json-bucket",
    }
    original_file_path = os.path.join(shared_datadir, "2023-01-13T21--08--51Z_TESTDATA")
    temp_dir = f"dataset={sample_metadata['type']}"
    os.makedirs(temp_dir)
    try:
        new_file_path = shutil.copy(original_file_path, temp_dir)
        s3_key = s3_to_json._upload_file_to_json_dataset(
            file_path=new_file_path,
            s3_metadata=sample_metadata,
            workflow_run_properties=workflow_run_properties,
            delete_upon_successful_upload=True,
        )

        correct_s3_key = os.path.join(
            workflow_run_properties["namespace"],
            workflow_run_properties["json_prefix"],
            new_file_path,
        )
        assert s3_key == correct_s3_key
    finally:
        # Always clean up: a leftover directory would make any later
        # `os.makedirs(temp_dir)` fail with FileExistsError.
        shutil.rmtree(temp_dir)

def test_write_file_to_json_dataset_delete_local_copy(self, s3_obj, namespace, monkeypatch):
monkeypatch.setattr("boto3.client", lambda x: MockAWSClient())
sample_metadata = {
Expand Down

0 comments on commit beead94

Please sign in to comment.