Include cohort property in all JSON and Parquet
philerooski committed Sep 5, 2023
1 parent f186062 commit 0a04e3b
Showing 2 changed files with 14 additions and 3 deletions.
9 changes: 6 additions & 3 deletions src/glue/jobs/json_to_parquet.py
@@ -125,6 +125,7 @@ def get_table(
         glue_context.create_dynamic_frame.from_catalog(
             database=database_name,
             table_name=table_name,
+            additional_options={"groupFiles": "inPartition"},
             transformation_ctx="create_dynamic_frame"
         )
         .resolveChoice(
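
For context: `groupFiles` is an AWS Glue read option that coalesces many small input files into larger in-memory groups, which reduces task overhead when a table consists of thousands of small JSON files. A minimal sketch of the option in use; the database/table names and the optional `groupSize` value are illustrative assumptions, not part of this commit:

    from awsglue.context import GlueContext
    from pyspark.context import SparkContext

    glue_context = GlueContext(SparkContext.getOrCreate())

    # "inPartition" groups files within each S3 partition; "groupSize"
    # (bytes, passed as a string) is an optional target group size and is
    # an assumption here -- the commit itself only sets "groupFiles".
    dynamic_frame = glue_context.create_dynamic_frame.from_catalog(
        database="my_database",    # hypothetical catalog database
        table_name="my_table",     # hypothetical catalog table
        additional_options={
            "groupFiles": "inPartition",
            "groupSize": "134217728",  # hypothetical 128 MB target
        },
        transformation_ctx="create_dynamic_frame",
    )
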
@@ -322,7 +323,8 @@ def write_table_to_s3(
             frame = dynamic_frame,
             connection_type = "s3",
             connection_options = {
-                "path": s3_write_path
+                "path": s3_write_path,
+                "partitionKeys": ["cohort"]
             },
             format = "parquet",
             transformation_ctx="write_dynamic_frame")
@@ -390,8 +392,9 @@ def add_index_to_table(
     logger.info(f"Adding index to {original_field_name}")
     parent_index = (parent_table
             .select(
-                [selectable_original_field_name] + INDEX_FIELD_MAP[table_data_type])
-            .distinct())
+                ([selectable_original_field_name, "cohort"]
+                 + INDEX_FIELD_MAP[table_data_type])
+            ).distinct())
     this_index = parent_index.withColumnRenamed(original_field_name, "id")
     df_with_index = this_table.join(
             this_index,
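
Restated in plain PySpark: keeping "cohort" among the selected fields means it survives the `distinct()` and is carried onto the child table by the join, so child tables can be written with the same partition key. The table names, column names, and join key below are all assumptions:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # Stand-ins for parent_table / this_table; real field names come from
    # INDEX_FIELD_MAP and the Glue catalog.
    parent_table = spark.createDataFrame(
        [("rec-1", "participant-1", "adults_v1"),
         ("rec-2", "participant-2", "pediatric_v1")],
        ["ParentTable_id", "ParticipantIdentifier", "cohort"],
    )
    this_table = spark.createDataFrame(
        [("rec-1", 61.0), ("rec-2", 47.5)],
        ["id", "Value"],
    )

    index_fields = ["ParticipantIdentifier"]  # assumed INDEX_FIELD_MAP lookup
    parent_index = (parent_table
        .select(["ParentTable_id", "cohort"] + index_fields)
        .distinct())
    this_index = parent_index.withColumnRenamed("ParentTable_id", "id")

    # "cohort" rides along on the join and lands on the child rows.
    df_with_index = this_table.join(this_index, on="id", how="inner")
    df_with_index.show()
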
8 changes: 8 additions & 0 deletions src/glue/jobs/s3_to_json.py
@@ -96,13 +96,15 @@ def transform_object_to_array_of_objects(
 def transform_json(
         json_obj: dict,
         dataset_identifier: str,
+        cohort: str,
         metadata: dict,) -> dict:
     """
     Perform the following transformations:
 
     For every JSON:
         - Add an export_start_date property (may be None)
         - Add an export_end_date property (may be None)
+        - Add a cohort property
 
     For JSON whose data types have a subtype:
         - Add subtype as "Type" property
@@ -120,6 +122,7 @@ def transform_json(
     Args:
         json_obj (dict): A JSON object sourced from the JSON file of this data type.
         dataset_identifier (str): The data type of `json_obj`.
+        cohort (str): The cohort this data is associated with.
         metadata (dict): Metadata derived from the file basename.
 
     Returns:
@@ -130,6 +133,7 @@ def transform_json(
     else:
         json_obj["export_start_date"] = None
     json_obj["export_end_date"] = metadata.get("end_date").isoformat()
+    json_obj["cohort"] = cohort
     if dataset_identifier in DATA_TYPES_WITH_SUBTYPE:
         # This puts the `Type` property back where Apple intended it to be
         json_obj["Type"] = metadata["subtype"]
@@ -269,6 +273,7 @@ def get_output_filename(metadata: dict, part_number: int) -> str:
 def transform_block(
         input_json: typing.IO,
         dataset_identifier: str,
+        cohort: str,
         metadata: dict,
         block_size: int=10000):
     """
@@ -283,6 +288,7 @@ def transform_block(
         input_json (typing.IO): A file-like object of the JSON to be transformed.
         dataset_identifier (str): The data type of `input_json`.
         metadata (dict): Metadata derived from the file basename. See `get_metadata`.
+        cohort (str): The cohort this data is associated with.
         block_size (int, optional): The number of records to process in each block.
             Default is 10000.
@@ -295,6 +301,7 @@ def transform_block(
             json_obj = transform_json(
                     json_obj=json_obj,
                     dataset_identifier=dataset_identifier,
+                    cohort=cohort,
                     metadata=metadata
             )
             block.append(json_obj)
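
The hunks above show `transform_block` buffering transformed records and yielding them in lists of `block_size`. A self-contained sketch of that pattern, assuming newline-delimited JSON input (the actual parsing of `input_json` is not visible in this diff):

    import json
    import typing

    from s3_to_json import transform_json  # assumption: module importable

    def transform_block_sketch(
            input_json: typing.IO,
            dataset_identifier: str,
            cohort: str,
            metadata: dict,
            block_size: int = 10000):
        """Yield lists of up to `block_size` transformed records (sketch)."""
        block = []
        for line in input_json:  # assumes one JSON object per line
            block.append(transform_json(
                json_obj=json.loads(line),
                dataset_identifier=dataset_identifier,
                cohort=cohort,
                metadata=metadata,
            ))
            if len(block) == block_size:
                yield block
                block = []
        if block:  # flush the final partial block
            yield block
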
Expand Down Expand Up @@ -360,6 +367,7 @@ def write_file_to_json_dataset(
for transformed_block in transform_block(
input_json=input_json,
dataset_identifier=dataset_identifier,
cohort=cohort,
metadata=metadata
):
current_file_size = os.path.getsize(current_output_path)
