New pipeline to load BlackCat API data #3129
Conversation
Thanks Kim! Sorry for the delayed review since I was out of office. I agree with Evan's comments above and have a few more -- I mostly just want to clarify the intention for using Python models. While we do technically support them, we don't actually use them anywhere in the project yet, and it seems like most of the operations here can be performed in SQL in ways that might be easier to maintain (since that would be more in keeping with the rest of the warehouse).
Edit: I do defer to the folks who will maintain this workflow long term on whether the Python vs. SQL consideration holds weight; it's just my instinct from a historical perspective to stick with SQL unless there's a specific reason otherwise.
return logger

def make_ratio_cols(df, numerator, denominator, col_name, logger, operation="sum"):
I am a little confused about why this is a Python model; it seems that these ratio operations can be performed relatively straightforwardly in SQL (generally, a Python model is only used when the relevant operations cannot be performed in SQL or would be very verbose).
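For illustration, something along these lines in a SQL model might cover the same ground -- this is only a sketch, and the column names (`annual_vrm`, `annual_vrh`) are placeholders rather than the actual schema:

```sql
-- Hypothetical sketch: ratio columns computed in a dbt SQL model instead of Python.
-- Column names are placeholders, not the real schema.
with service as (
    select * from {{ ref('int_ntd_rr20_service_alldata') }}
)

select
    organization,
    fiscal_year,
    -- SAFE_DIVIDE returns NULL rather than erroring when the denominator is zero
    safe_divide(sum(annual_vrm), sum(annual_vrh)) as vrm_per_vrh
from service
group by 1, 2
```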
Confirming that @csuyat-dot and I talked to Kim about translating the ratio calculation step to SQL next year; Kim will prioritize automating the other checks.
ignore_index=True).sort_values(by="Organization")

## Part 1: save Excel file to GCS
GCS_FILE_PATH_VALIDATED = f"gs://calitp-ntd-report-validation/validation_reports_{this_year}"
As noted in Slack, the intention for a Python model in dbt is not to output files to other locations but simply to perform operations on data models (totally akin to SQL models). Exports should be orchestrated using Airflow.
Moving over to a different script; will try to deploy with Airflow as suggested.
def check_rr20_ratios(df, variable, threshold, this_year, last_year, logger):
    '''Validation checks where a ratio must be within a certain threshold limit
Is the goal to create columns with these ratio checks, or to treat this as a validation that the rows are valid? (I.e., do you want to have a column with a boolean value for pass/fail, or do you want to just alert if some fail? If the latter, could leverage dbt tests.)
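For example, a singular dbt test is just a SQL file under `tests/` that selects the failing rows, and the test fails if any rows come back. A rough sketch, where the 0.3 threshold and the column names are hypothetical:

```sql
-- Hypothetical singular dbt test, e.g. tests/assert_rr20_ratios_within_threshold.sql.
-- Select the rows that violate the check; dbt fails the test if any rows are returned.
-- Threshold and column names are placeholders, not the real schema.
select
    organization,
    ratio_this_year,
    ratio_last_year
from {{ ref('int_ntd_rr20_service_ratios') }}
where abs(ratio_this_year - ratio_last_year) > 0.3 * ratio_last_year
```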
Warehouse report 📦
Checks/potential follow-ups: Checks indicate the following action items may be necessary.
New models 🌱
- calitp_warehouse.mart.ntd_validation.fct_ntd_rr20_service_checks
- calitp_warehouse.intermediate.ntd_validation.int_ntd_rr20_service_alldata
- calitp_warehouse.intermediate.ntd_validation.int_ntd_rr20_service_ratios
- calitp_warehouse.staging.ntd_validation.stg_ntd_2022_rr20_exp_by_mode
- calitp_warehouse.staging.ntd_validation.stg_ntd_2022_rr20_financial
- calitp_warehouse.staging.ntd_validation.stg_ntd_2022_rr20_service
- calitp_warehouse.staging.ntd_validation.stg_ntd_2023_a10
- calitp_warehouse.staging.ntd_validation.stg_ntd_2023_rr20_rural
- calitp_warehouse.staging.ntd_validation.stg_ntd_2023_rr20_urban_tribal
- calitp_warehouse.staging.ntd_validation.stg_ntd_subrecipients
One stray comment -- I see that the
Thanks @lauriemerrell, @evansiroky. I updated the code to pull everything from the API (which is indeed just one URL) and then save it as one external table. In this table, each NTD report is a nested column in BigQuery. Then, dbt models parse out the reports into separate tables. In terms of Python vs. SQL, at this point it's partly a pragmatic choice to start with, since the Python is already written and understood by @csuyat-dot, who will be taking over, and I have about one week of dev time left on this project. My intention is to transfer all of the checks into dbt first, then, as time allows, convert as much Python as possible into SQL. Imperfect, but it ensures coverage.
It looks like there are other eyes on this. I like the refactor of downloading the raw data. Also, I generally agree with @lauriemerrell that dbt should be leveraged as much as possible to keep the coding style similar to the rest of the warehouse scripts.
Description
This code creates a new data pipeline with two DAGs to ingest NTD submitted forms from BlackCat, a third-party vendor, into the Cal-ITP data warehouse.

First, a DAG hits the API and saves all of its contents into one zipped JSON file in Google Cloud Storage, using the new `blackcat_to_gcs.py` operator. This code follows the same structure as the `airtable_to_gcs.py` operator. Hive partitions are created by date and timestamp, and date and timestamp also make up part of the file path to each report's zipped JSONL file. Even though this is a small amount of data and the pipeline will usually not run more than once per day, we included hive-partitioning by timestamp so that, if it does run more than once per day, the file paths differ for each run; otherwise the Airflow job would fail because it cannot overwrite a JSONL file of the same name.

Secondly, we create an external table so the data can be viewed in BigQuery. This is done just by adding a new folder in the `create_external_tables` DAG with YAMLs for each table, which call upon the existing `external_table.py` operator.

Thirdly, we have three levels of dbt models to transform this data and conduct validation checks on the submitted answers:
- The `staging/ntd_validation` folder has SQL models that take the API data from the external table and unnest the columns into individual tables (in the external table, each NTD table from the API is a nested column); a rough sketch of this step is included below.
- The `intermediate/ntd_validation` folder has dbt models (either SQL or Python) that further massage the data into the form needed for the final validation checks.
- The `mart/ntd_validation` folder has the validation checks for RR-20. These are all of the checks from the `rr20_service_check.py` and `rr20_financials_check.py` scripts, except for the one that checks against the vehicle inventory, since the API doesn't have that information yet.

I also added columns for the date checked, and added the NTD error ID into the validation tables.
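The staging sketch referenced above: each staging model unnests one report's column from the external table into its own table. This is only an illustrative sketch; the source and column names (`external_blackcat`, `all_ntd_reports`, `rr20_rural`, `dt`, `ts`) are placeholders, not the actual schema.

```sql
-- Rough sketch of a staging model (placeholder names, not the actual schema):
-- unnest one NTD report's nested column from the external table into its own table.
with external_data as (
    select * from {{ source('external_blackcat', 'all_ntd_reports') }}
),

unnested as (
    select
        dt,        -- hive partition: date
        ts,        -- hive partition: timestamp
        report.*   -- flatten the struct fields for this report type
    from external_data,
        unnest(rr20_rural) as report
)

select * from unnested
```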
Resolves #[issue]
Type of change
How has this been tested?
The new `blackcat_to_gcs.py` operator has been tested using local Airflow, successfully writing JSONL files to GCS. One can view the API data saved in test runs in the test GCS folder `test-calitp-ntd-report-validation`.

The dbt models have been tested using local dbt and exist in BigQuery on the `cal-itp-data-infra-staging` project in the `staging_staging` database.

Post-merge follow-ups
This Airflow DAG will remain turned off. The NTD reporting season officially closed on October 31, so there is no purpose in keeping it running, since it only pulls NTD-related data from BlackCat. @csuyat-dot on the Cal-ITP team will own this pipeline in 2024 and turn it on when sub-recipients start submitting 2024 data, around September 2024.