[ETL-580] JSON to Parquet - Write record count of each export to S3 #93
Conversation
Enables us to merge nested structures in `logger_context` with local logging info, particularly the `labels` object in `logger_context`
src/glue/jobs/json_to_parquet.py
```
@@ -332,13 +414,133 @@ def write_table_to_s3(
        format = "parquet",
        transformation_ctx="write_dynamic_frame")


def count_records_for_event(
        table: "pyspark.sql.dataframe.DataFrame",
        event: str,
```
Since `event` is restricted to a set of strings, would it make sense to convert this to a string Enum?
Technically I can't. StrEnum was introduced in 3.11 and this runs in 3.10.
I've never used StrEnum as a type hint before. From the Enum tutorial it seems like it was designed to provide syntactic sugar for working with classes, rather than as a type hint. Is there a benefit to adding the type hint in addition to the docstring and the check we do inside the function itself?
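(For reference, the usual pre-3.11 workaround is to mix `str` into a plain `Enum`, which behaves much like `StrEnum`. A minimal sketch, not code from this PR:)

```python
from enum import Enum

# Mixing `str` into Enum makes each member *be* a string, which is
# essentially what StrEnum formalizes in Python 3.11+.
class EventType(str, Enum):
    READ = "READ"
    WRITE = "WRITE"

assert EventType.READ == "READ"               # members compare equal to plain strings
assert EventType("WRITE") is EventType.WRITE  # lookup by value still works
```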
I was thinking more of Enum in general, not specifically a StrEnum (I didn't even know that was a thing).
```python
from enum import Enum

class EventType(Enum):
    """The event associated with a count."""

    READ = "READ"
    """This table has just now been read from the Glue table catalog
    and has not yet had any transformations done to it."""

    DROP_DUPLICATES = "DROP_DUPLICATES"
    """This table has just now had duplicate records dropped
    (see the function `drop_table_duplicates`)."""

    DROP_DELETED_SAMPLES = "DROP_DELETED_SAMPLES"
    """This table has just now had records which are
    present in its respective "Deleted" table dropped (see the function
    `drop_deleted_healthkit_data`)."""

    WRITE = "WRITE"
    """This table has just now been written to S3."""


def do_thing(event_type: EventType):
    print(event_type)


do_thing(EventType.READ)
do_thing(EventType.DROP_DUPLICATES)
do_thing(EventType.DROP_DELETED_SAMPLES)
do_thing(EventType.WRITE)
```
The point is that I don't need to know the string constant to use. I could check the docstring, but I can also just pick one of the available Enum values (e.g., `EventType.READ`).
That makes sense. It has some nifty features. I'll try it out.
Excellent work!
LGTM! Just a few comments.
src/glue/jobs/s3_to_json.py
```
@@ -643,6 +646,41 @@ def _upload_file_to_json_dataset(
    os.remove(file_path)
    return s3_output_key


def merge_dicts(x: dict, y: dict) -> Generator:
```
Nit: just out of curiosity, do you know if this feature already exists in some package?
Not in the standard library afaik. IMO, it's not worth adding another dependency.
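(For illustration: the standard-library options only merge shallowly, which is exactly the overwriting problem `merge_dicts` avoids. A minimal example, not code from this PR; the dict contents are made up:)

```python
logger_context = {"labels": {"job": "s3_to_json", "process": "count"}}
extra = {"labels": {"process": "merge"}}

# Both `{**a, **b}` and the 3.9+ union operator replace nested values wholesale:
merged = logger_context | extra
print(merged)  # {'labels': {'process': 'merge'}} -- the 'job' label is lost
```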
💯 Awesome work!
Quality Gate passed: The SonarCloud Quality Gate passed, but some issues were introduced. 3 new issues.
Summary of changes:
src/glue/jobs/json_to_parquet.py
tests/test_json_to_parquet.py
Whenever a read, write, or transform operation is done with the data, we record the number of records for each `export_end_date` (which acts as an identifier for the original export). To support this functionality, functions which did any reading/writing/transforming were split up to be more atomic. Each of these functions requires a `record_counts` and a `logger_context` object, which is passed to the function responsible for doing the counting: `count_records_for_event`. Since there are typically multiple read/write/transform events per data type, we accumulate these within `record_counts` throughout the job. As the very last step in the job, we concatenate each of these event-specific counts within each data type and write them as a CSV file to S3. Oftentimes there is only one data type which needs counts in a job (after all, the jobs are data-type specific), but for data types which have an associated "deleted" table containing deleted samples, we do counts for those under a separate data type. The docstrings for `count_records_for_event` and `store_record_counts` go into the nitty-gritty details. A sketch of the flow is shown below.

Originally I had tried logging the count information in the ECS format, hence there being some ancillary changes related to ECS logging. This didn't work out (see the Jira ticket), but the ECS work will support future work on https://sagebionetworks.jira.com/browse/ETL-573 .
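To make the flow above concrete, here is a hedged sketch of what `count_records_for_event` and `store_record_counts` might look like; the exact signatures, the `data_type` parameter, and the S3 key layout are assumptions for illustration, not the PR's actual code:

```python
# Hypothetical sketch of the counting/storing flow described above.
import boto3
import pandas
from pyspark.sql import DataFrame

def count_records_for_event(
    table: DataFrame,
    event: str,
    data_type: str,
    record_counts: dict[str, list],
) -> dict[str, list]:
    """Record the number of records per export_end_date for one event."""
    counts = table.groupBy("export_end_date").count().toPandas()
    counts["event"] = event
    # Accumulate this event's counts under its data type
    record_counts.setdefault(data_type, []).append(counts)
    return record_counts

def store_record_counts(record_counts: dict[str, list], bucket: str, prefix: str) -> None:
    """Concatenate each data type's event counts and write a CSV to S3."""
    s3 = boto3.client("s3")
    for data_type, counts in record_counts.items():
        body = pandas.concat(counts).to_csv(index=False)
        s3.put_object(Bucket=bucket, Key=f"{prefix}/{data_type}.csv", Body=body)
```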
Also, a note on tests: the JSON to Parquet tests were originally written as a blend of unit and integration tests. These will eventually need to be rewritten as pure unit tests, so any additional tests added here have been written as pure unit tests. Fixing the rest of the tests was outside the scope of this ticket (which already took me ~3 weeks to complete), so those are mostly unchanged.
tests/test_json_to_parquet/TestFlatInsertedDateDataType_20230512.ndjson
tests/test_json_to_parquet/TestFlatInsertedDateDataType_20230612.ndjson
We expect our JSON to always contain an `export_end_date` field since we use that field while counting records. This adds that field to these test data.

config/develop/namespaced/glue-job-JSONToParquet.yaml
config/prod/namespaced/glue-job-JSONToParquet.yaml
templates/glue-job-JSONToParquet.j2
tests/Dockerfile.aws_glue_4
Add support for additional Python modules in the JSON to Parquet job (specifically, ECS logging).
src/glue/jobs/s3_to_json.py
Added a `merge_dicts` function which resolves the potential issue where, if a dict is passed to the logger, it could overwrite the same dict in the `logger_context`. Merging dictionaries in a smart way is not trivial, but I think this handles the most obvious case pretty well. Specifically: when passing additional `labels` (a dict), we merge this with the `labels` in the `logger_context`. Previously, the locally defined `labels` would overwrite the `labels` in the `logger_context`.
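A minimal sketch of what such a recursive merge can look like (an illustration under assumptions, matching the `dict, dict -> Generator` signature shown in the diff but not necessarily the PR's implementation; the example dicts are made up):

```python
from typing import Generator

def merge_dicts(x: dict, y: dict) -> Generator:
    """Yield (key, value) pairs of a recursive merge of `y` into `x`.

    Where both sides hold a dict, merge recursively; otherwise `y` wins.
    """
    for key in x.keys() | y.keys():
        if isinstance(x.get(key), dict) and isinstance(y.get(key), dict):
            yield key, dict(merge_dicts(x[key], y[key]))
        elif key in y:
            yield key, y[key]
        else:
            yield key, x[key]

logger_context = {"labels": {"job": "s3_to_json", "process": "count"}}
extra = {"labels": {"process": "merge"}}
print(dict(merge_dicts(logger_context, extra)))
# {'labels': {'job': 's3_to_json', 'process': 'merge'}} -- 'job' is preserved
```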