New pipeline to load BlackCat API data #3129

Merged · 15 commits · Dec 7, 2023


18 changes: 18 additions & 0 deletions airflow/dags/ntd_report_from_blackcat/METADATA.yml
@@ -0,0 +1,18 @@
description: "Process raw files from a GCS bucket (NTD reports from BlackCat API) directly into BigQuery"
schedule_interval: "0 10 * * 1" #10 am every Monday
tags:
- ntd
- blackcat
default_args:
owner: airflow
depends_on_past: False
start_date: "2023-10-02"
catchup: False
email:
- "kim.engie@slalom.com"
- "christian.suyat@dot.ca.gov"
- "katrina.kaiser@dot.ca.gov"
email_on_failure: True
pool: default_pool
concurrency: 50
wait_for_defaults:
timeout: 3600
9 changes: 9 additions & 0 deletions airflow/dags/ntd_report_from_blackcat/README.md
@@ -0,0 +1,9 @@
# `ntd_report_from_blackcat`

Type: [Scheduled](https://docs.calitp.org/data-infra/airflow/dags-maintenance.html)

This DAG orchestrates the publishing and storing of NTD report submission data by pushing BlackCat API data into the Google Cloud Storage bucket `calitp-ntd-report-validation`.

A separate DAG (the existing `create_external_tables` DAG) then reads the GCS data into BigQuery in the Cal-ITP data warehouse. That job takes the most recent file of each report type (which contains all reports submitted by Caltrans 5311 subrecipients) and publishes it into BigQuery `external` tables, if it is not already there. It uses Cal-ITP's existing infrastructure for creating external tables, outlined [here](https://docs.calitp.org/data-infra/architecture/data.html).

In the event of failure, the job can be rerun without backfilling.
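
For reference, below is a minimal sketch of the hive-partitioned GCS path this DAG writes, derived from `make_hive_path` in `airflow/plugins/operators/blackcat_to_gcs.py` with the `all`/`ntdreports` configuration used in this PR; the path is illustrative only:

```python
# Sketch of the hive path layout produced by BlackCatApiToGCSOperator for this DAG.
import pendulum

extract_time = pendulum.now()
bucket = "gs://calitp-ntd-report-validation"
hive_path = (
    f"{bucket}/all_NTDReporting/"
    f"year={extract_time.format('YYYY')}/"
    f"dt={extract_time.to_date_string()}/"
    f"ts={extract_time.to_iso8601_string()}/"
    f"all_ntdreports.jsonl.gz"
)
print(hive_path)
```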
@@ -0,0 +1,7 @@
operator: operators.BlackCatApiToGCSOperator

bucket: "gs://calitp-ntd-report-validation"
api_url: "https://services.blackcattransit.com/api/APIModules/GetNTDReportsByYear/BCG_CA/"
api_tablename_suffix: "NTDReporting"
form: "all"
bq_table_name_suffix: "ntdreports"
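
For context, here is a rough hand-written equivalent of the YAML above; the `task_id` and the explicit DAG wiring are hypothetical, since in this repo the YAML is consumed by the existing DAG-generation machinery rather than written out like this:

```python
# Hypothetical direct instantiation of the operator configured by the YAML above.
from airflow import DAG
from operators.blackcat_to_gcs import BlackCatApiToGCSOperator

with DAG(dag_id="ntd_report_from_blackcat", schedule_interval="0 10 * * 1") as dag:
    load_ntd_reports = BlackCatApiToGCSOperator(
        task_id="blackcatapi_to_gcs",  # hypothetical task id
        bucket="gs://calitp-ntd-report-validation",
        api_url="https://services.blackcattransit.com/api/APIModules/GetNTDReportsByYear/BCG_CA/",
        api_tablename_suffix="NTDReporting",
        form="all",
        bq_table_name_suffix="ntdreports",
    )
```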
18 changes: 18 additions & 0 deletions airflow/dags/ntd_report_publish_validation/METADATA.yml
@@ -0,0 +1,18 @@
description: "Process BigQuery tables into Excel files and save them in GCS"
schedule_interval: "0 20 * * 1" #8 pm every Monday
tags:
- ntd
- blackcat
default_args:
owner: airflow
depends_on_past: False
start_date: "2023-10-02"
catchup: False
email:
- "kim.engie@slalom.com"
- "christian.suyat@dot.ca.gov"
- "katrina.kaiser@dot.ca.gov"
email_on_failure: True
pool: default_pool
concurrency: 50
wait_for_defaults:
timeout: 3600
7 changes: 7 additions & 0 deletions airflow/dags/ntd_report_publish_validation/README.md
@@ -0,0 +1,7 @@
# `ntd_report_publish_validation`

Type: [Scheduled](https://docs.calitp.org/data-infra/airflow/dags-maintenance.html)

This DAG orchestrates the publishing of NTD report validation checks as Excel files saved to Google Cloud Storage. The checks are run on submitted NTD reports with dbt models and stored in BigQuery; the results are then converted to Excel files and saved in the Google Cloud Storage bucket `calitp-ntd-report-validation`.

In the event of failure, the job can be rerun without backfilling.
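
As an illustration of the BigQuery-to-Excel step this DAG performs, here is a minimal pandas sketch; the table and object names are hypothetical, and the real DAG uses this repository's own operators rather than this code:

```python
# Hypothetical sketch: read a validation-check table from BigQuery and write it
# as an Excel file to the GCS bucket (requires pandas-gbq, gcsfs, and openpyxl).
import gcsfs
import pandas_gbq

df = pandas_gbq.read_gbq(
    "SELECT * FROM `my-project.my_dataset.some_validation_check`"  # hypothetical table
)

fs = gcsfs.GCSFileSystem()
with fs.open("calitp-ntd-report-validation/validation_checks.xlsx", "wb") as f:
    df.to_excel(f, index=False)
```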
1 change: 1 addition & 0 deletions airflow/plugins/operators/__init__.py
@@ -1,5 +1,6 @@
# flake8: noqa
from operators.airtable_to_gcs import AirtableToGCSOperator
from operators.blackcat_to_gcs import BlackCatApiToGCSOperator
from operators.external_table import ExternalTable
from operators.gtfs_csv_to_jsonl import GtfsGcsToJsonlOperator
from operators.gtfs_csv_to_jsonl_hourly import GtfsGcsToJsonlOperatorHourly
162 changes: 162 additions & 0 deletions airflow/plugins/operators/blackcat_to_gcs.py
@@ -0,0 +1,162 @@
import gzip
import logging
import os
from typing import Optional

import pandas as pd
import pendulum
import requests
from calitp_data_infra.storage import get_fs, make_name_bq_safe
from pydantic import BaseModel

from airflow.models import BaseOperator


def write_to_log(logfilename):
"""
Creates a logger object that outputs to a log file, to the filename specified,
and also streams to console.
"""
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
formatter = logging.Formatter(
"%(asctime)s:%(levelname)s: %(message)s", datefmt="%y-%m-%d %H:%M:%S"
)
file_handler = logging.FileHandler(logfilename)
file_handler.setFormatter(formatter)
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(formatter)

if not logger.hasHandlers():
logger.addHandler(file_handler)
logger.addHandler(stream_handler)

return logger


class BlackCatApiExtract(BaseModel):
api_url: str
form: str
api_tablename_suffix: str
bq_table_name_suffix: str
data: Optional[pd.DataFrame]
logger: Optional[logging.Logger]
extract_time: Optional[pendulum.DateTime]

logger = write_to_log("load_bc_apidata_output.log")
extract_time = pendulum.now()

# pydantic doesn't know dataframe type
# see https://stackoverflow.com/a/69200069
class Config:
arbitrary_types_allowed = True

def fetch_from_bc_api(self):
"""Download a BlackCat table as a DataFrame.

Note that BlackCat API reports have rows structured as follows:
[{'ReportId': <id>,
'Organization': <organization>,
'ReportPeriod': <year>,
'ReportStatus': <status>,
'ReportLastModifiedDate': <timestamp>,
'<table_name>': {'Data': [{colname: value, ...}, {colname: value, ...} ...]}},
{'ReportId': <id>, ...etc. to the next organization}]

After normalizing the JSON, all column names are made BigQuery-safe via
`make_name_bq_safe`.
"""

self.logger.info(
f"Downloading BlackCat data for {self.extract_time.format('YYYY')}_{self.bq_table_name_suffix}."
)
# Append the current year to the API URL so that it ends with "/YYYY".
url = self.api_url + self.extract_time.format("YYYY")
response = requests.get(url)
blob = response.json()

raw_df = pd.json_normalize(blob)
raw_df["ReportLastModifiedDate"] = raw_df["ReportLastModifiedDate"].astype(
"datetime64[ns]"
)

self.data = raw_df.rename(make_name_bq_safe, axis="columns")
self.logger.info(
f"Downloaded {self.bq_table_name_suffix} data for {self.extract_time.format('YYYY')} with {len(self.data)} rows!"
)

def make_hive_path(self, form: str, bucket: str):
if not self.extract_time:
raise ValueError(
"An extract time must be set before a hive path can be generated."
)
bq_form_name = str.lower(form).replace("-", "")
return os.path.join(
bucket,
f"{bq_form_name}_{self.api_tablename_suffix}",
f"year={self.extract_time.format('YYYY')}",
f"dt={self.extract_time.to_date_string()}",
f"ts={self.extract_time.to_iso8601_string()}",
f"{bq_form_name}_{self.bq_table_name_suffix}.jsonl.gz",
)

def save_to_gcs(self, fs, bucket):
hive_path = self.make_hive_path(self.form, bucket)
self.logger.info(f"Uploading to GCS at {hive_path}")
if len(self.data) == 0:
self.logger.info(
f"There is no data for {self.api_tablename_suffix} for {self.extract_time.format('YYYY')}, not saving anything. Pipeline exiting."
)
else:
fs.pipe(
hive_path,
gzip.compress(self.data.to_json(orient="records", lines=True).encode()),
)
return hive_path


class BlackCatApiToGCSOperator(BaseOperator):
template_fields = ("bucket",)

def __init__(
self,
bucket,
api_url,
form,
api_tablename_suffix,
bq_table_name_suffix,
**kwargs,
):
"""An operator that downloads all data from a BlackCat API
and saves it as one JSON file hive-partitioned by date in Google Cloud
Storage (GCS). Each org's data will be in one row, and for each separate table in the API,
a nested column will hold all of its data.

Args:
bucket (str): GCS bucket where the scraped BlackCat report will be saved.
api_url (str): The URL to hit that gets the data. This is dynamically appended with the current year, so that
... in 2023 it will pull data from the ".../2023" url and in 2024, ".../2024" etc.
api_tablename_suffix (str): The table that should be extracted from the BlackCat API.
MUST MATCH THE API JSON EXACTLY
bq_table_name_suffix (str): The table name that will be given in BigQuery. Appears in the GCS bucket path and the filename.
form (str): The NTD form that this report belongs to, e.g., RR-20, A-10, etc. Since this extract covers all forms, it is "all" here.
"""
self.bucket = bucket
# Instantiate the BlackCatApiExtract model with the operator's configuration.
self.extract = BlackCatApiExtract(
api_url=api_url,
form=form,
api_tablename_suffix=api_tablename_suffix,
bq_table_name_suffix=bq_table_name_suffix,
)

super().__init__(**kwargs)

def execute(self, **kwargs):
fs = get_fs()
self.extract.fetch_from_bc_api()
# The returned hive path is pushed to XCom.
return self.extract.save_to_gcs(fs, self.bucket)
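
For clarity, here is a self-contained sketch of how the nested rows described in `fetch_from_bc_api`'s docstring flatten under `pd.json_normalize`; the sample record is invented for illustration:

```python
import pandas as pd

# One report per organization; each NTD table sits under a nested "Data" key.
blob = [
    {
        "ReportId": 123,
        "Organization": "Example Transit Agency",
        "ReportPeriod": 2023,
        "ReportStatus": "Submitted",
        "ReportLastModifiedDate": "2023-10-01T12:00:00",
        "RR20": {"Data": [{"Item": "Fare Revenues", "OperationsExpended": 1000}]},
    }
]

df = pd.json_normalize(blob)
df["ReportLastModifiedDate"] = df["ReportLastModifiedDate"].astype("datetime64[ns]")
print(df.columns.tolist())
# ['ReportId', 'Organization', 'ReportPeriod', 'ReportStatus',
#  'ReportLastModifiedDate', 'RR20.Data'] -- then renamed BigQuery-safe by make_name_bq_safe
```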
@@ -0,0 +1,26 @@
--- get the # of active VINS in the inventory - DON'T HAVE
--- get the # of VOMS in the rr-20
-- get the # of vins in the A30

with voms_rr20 as (
select organization,
fiscal_year,
AVG(VOMX) as rr20_voms
FROM {{ ref('int_ntd_rr20_service_alldata') }}
GROUP BY organization, fiscal_year
),

vins_a30 as (
SELECT organization,
api_report_period as fiscal_year,
COUNT(DISTINCT VIN) as a30_vin_n
FROM {{ ref('stg_ntd_2023_a30_assetandresourceinfo') }}
GROUP BY organization, fiscal_year
)

select voms_rr20.*, vins_a30.a30_vin_n
FROM voms_rr20
FULL OUTER JOIN vins_a30
ON voms_rr20.organization = vins_a30.organization
AND voms_rr20.fiscal_year = vins_a30.fiscal_year
ORDER BY organization, fiscal_year
@@ -0,0 +1,59 @@
-- need fare rev and upt for each year.

WITH fare_rev_2023 as (
select
organization,
api_report_period as fiscal_year,
item as mode,
operations_expended + capital_expended as Fare_Revenues,
from {{ ref('stg_ntd_2023_rr20_rural') }}
WHERE type = "Fare Revenues"
),
upt_2023 as (
select
organization,
api_report_period as fiscal_year,
item as mode,
annual_unlinked_pass_trips as Annual_UPT
from {{ ref('stg_ntd_2023_rr20_rural') }}
WHERE type = "Service Data"
),
all_2023 as (
select fare_rev_2023.organization,
fare_rev_2023.fiscal_year,
fare_rev_2023.mode,
fare_rev_2023.Fare_Revenues,
upt_2023.Annual_UPT
FROM fare_rev_2023
FULL OUTER JOIN upt_2023
ON fare_rev_2023.organization = upt_2023.organization
AND fare_rev_2023.mode = upt_2023.mode
),
fare_rev_2022 as (
SELECT Organization_Legal_Name as organization,
Fiscal_Year as fiscal_year,
sum(Fare_Revenues) as Fare_Revenues
FROM {{ ref('stg_ntd_2022_rr20_financial') }}
GROUP BY organization, fiscal_year
),
upt_2022 as (
select
Organization_Legal_Name as organization,
Fiscal_Year as fiscal_year,
Mode as mode,
Annual_UPT
from {{ ref('stg_ntd_2022_rr20_service') }}
),
all_2022 as (
select fare_rev_2022.organization, fare_rev_2022.fiscal_year,
upt_2022.Mode, fare_rev_2022.Fare_Revenues, upt_2022.Annual_UPT
FROM fare_rev_2022
FULL OUTER JOIN upt_2022
ON fare_rev_2022.organization = upt_2022.organization
)

SELECT * from all_2023

UNION ALL

SELECT * from all_2022
@@ -0,0 +1,50 @@
-------
--- NTD validation errors about specific funding sources.
--- ID #s RR20F-070, RR20F-065, RR20F-068, RR20F-066, RR20F-013. Sums the operations and capital expenses across all funding sources.
--- In 2022 the data is in a different format than in 2023 and onwards.
--- Only needed for the 2023 error checking (to compare to "last year"). In 2024 you don't need 2022 data.
-------

WITH longform_2023 AS (
SELECT
organization,
api_report_period AS fiscal_year,
operations_expended + capital_expended AS total_expended,
REPLACE(
REPLACE(
REPLACE(item, 'FTA Formula Grants for Rural Areas (§5311)', 'FTA_Formula_Grants_for_Rural_Areas_5311'),
'Other Directly Generated Funds', 'Other_Directly_Generated_Funds'),
'Local Funds', 'Local_Funds') as item
FROM {{ ref('stg_ntd_2023_rr20_rural') }}
WHERE item LIKE "%Directly Generated Funds%"
OR item LIKE "%Formula Grants for Rural Areas%"
OR item LIKE "Local Funds"
),
wide_2023 AS (
SELECT * FROM
(SELECT * FROM longform_2023)
PIVOT(AVG(total_expended) FOR item IN ('FTA_Formula_Grants_for_Rural_Areas_5311', 'Other_Directly_Generated_Funds', 'Local_Funds'))
ORDER BY organization
),
data_2022 AS (
SELECT Organization_Legal_Name as organization,
Fiscal_Year as fiscal_year,
SUM(Other_Directly_Generated_Funds) as Other_Directly_Generated_Funds_2022,
SUM(FTA_Formula_Grants_for_Rural_Areas_5311) as FTA_Formula_Grants_for_Rural_Areas_5311_2022,
Null as Local_Funds_2022
FROM {{ ref('stg_ntd_2022_rr20_financial') }}
GROUP BY 1,2 -- noqa: L054
ORDER BY organization
)

select wide_2023.organization,
wide_2023.FTA_Formula_Grants_for_Rural_Areas_5311 as FTA_Formula_Grants_for_Rural_Areas_5311_2023,
wide_2023.Other_Directly_Generated_Funds as Other_Directly_Generated_Funds_2023,
wide_2023.Local_Funds as Local_Funds_2023,
data_2022.FTA_Formula_Grants_for_Rural_Areas_5311_2022,
data_2022.Other_Directly_Generated_Funds_2022,
data_2022.Local_Funds_2022
from wide_2023
FULL OUTER JOIN data_2022
ON wide_2023.organization = data_2022.organization
ORDER BY organization