diff --git a/.gitignore b/.gitignore
index 05a9a9f56..658cefc54 100644
--- a/.gitignore
+++ b/.gitignore
@@ -19,4 +19,4 @@ venv/
 # Ignore most CSVs, except those that are used as dbt seeds
 *.csv
 !dbt/seeds/**/*.csv
-
+*.parquet.gzip
diff --git a/dbt/dbt_project.yml b/dbt/dbt_project.yml
index f38d21355..e2ea129d1 100644
--- a/dbt/dbt_project.yml
+++ b/dbt/dbt_project.yml
@@ -73,5 +73,7 @@ seeds:
       +schema: location
     model:
       +schema: model
+    reporting:
+      +schema: reporting
     spatial:
       +schema: spatial
diff --git a/dbt/models/reporting/docs.md b/dbt/models/reporting/docs.md
index 862e260e1..80261b340 100644
--- a/dbt/models/reporting/docs.md
+++ b/dbt/models/reporting/docs.md
@@ -30,6 +30,75 @@ Materialized to speed up queries for Tableau.
 `property_group`
 {% enddocs %}
 
+# sot_assessment_roll
+
+{% docs table_sot_assessment_roll %}
+Aggregated summary stats on assessed values across a number of geographies,
+class combinations, and time. Feeds public reporting assets.
+
+**Primary Key**: `year`, `stage_name`, `geography_id`, `group_id`
+{% enddocs %}
+
+# sot_assessment_roll_input
+
+{% docs table_sot_assessment_roll_input %}
+Table to feed the Python dbt job that creates the
+`reporting.sot_assessment_roll` table. Feeds public reporting assets.
+
+**Primary Key**: `year`, `stage_name`, `geography_id`, `group_id`
+{% enddocs %}
+
+# sot_ratio_stats
+
+{% docs table_sot_ratio_stats %}
+Aggregated summary stats on sales ratios across a number of geographies,
+class combinations, and time. Feeds public reporting assets.
+
+**Primary Key**: `year`, `stage_name`, `geography_id`, `group_id`
+{% enddocs %}
+
+# sot_ratio_stats_input
+
+{% docs table_sot_ratio_stats_input %}
+Table to feed the Python dbt job that creates the
+`reporting.sot_ratio_stats` table. Feeds public reporting assets.
+
+**Primary Key**: `year`, `stage_name`, `geography_id`, `group_id`
+{% enddocs %}
+
+# sot_sales
+
+{% docs table_sot_sales %}
+Aggregated summary stats on sales across a number of geographies, class
+combinations, and time. Feeds public reporting assets.
+
+**Primary Key**: `year`, `geography_id`, `group_id`
+{% enddocs %}
+
+# sot_sales_input
+
+{% docs table_sot_sales_input %}
+Table to feed the Python dbt job that creates the
+`reporting.sot_sales` table. Feeds public reporting assets.
+
+**Primary Key**: `year`, `geography_id`, `group_id`
+{% enddocs %}
+
+# sot_taxes_exemptions
+
+{% docs table_sot_taxes_exemptions %}
+Aggregated summary stats on taxes and exemptions across a number of
+geographies, class combinations, and time. Feeds public reporting assets.
+
+**Primary Key**: `year`, `geography_id`, `group_id`
+{% enddocs %}
+
+# sot_taxes_exemptions_input
+
+{% docs table_sot_taxes_exemptions_input %}
+Table to feed the Python dbt job that creates the
+`reporting.sot_taxes_exemptions` table. Feeds public reporting assets.
+
+**Primary Key**: `year`, `geography_id`, `group_id`
+{% enddocs %}
+
 # vw_assessment_roll
 
 {% docs view_vw_assessment_roll %}
diff --git a/dbt/models/reporting/reporting.sot_assessment_roll.py b/dbt/models/reporting/reporting.sot_assessment_roll.py
new file mode 100644
index 000000000..eef930eb3
--- /dev/null
+++ b/dbt/models/reporting/reporting.sot_assessment_roll.py
@@ -0,0 +1,328 @@
+# This script generates aggregated summary stats on assessed values across a
+# number of geographies, class combinations, and time.
+
+# Import libraries
+import pandas as pd
+
+# Declare class groupings
+groups = ["no_group", "class", "major_class", "modeling_group", "res_other"]
+
+
+# Define aggregation functions. These are just wrappers for basic python
+# functions that make them easier to use with pandas.agg().
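+# For example, pandas.agg() accepts string method names and callables side by
+# side, so a grouped call can look like (illustrative only):
+#   df.groupby("township")["av_tot"].agg(["median", q10, q90])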
+def q10(x):
+    return x.quantile(0.1)
+
+
+def q25(x):
+    return x.quantile(0.25)
+
+
+def q75(x):
+    return x.quantile(0.75)
+
+
+def q90(x):
+    return x.quantile(0.9)
+
+
+def first(x):
+    if len(x) >= 1:
+        output = x.iloc[0]
+    else:
+        output = None
+
+    return output
+
+
+more_stats = [
+    "min",
+    q10,
+    q25,
+    "median",
+    q75,
+    q90,
+    "max",
+    "mean",
+    "sum",
+]
+
+stats = {
+    "av_tot": ["size", "count"] + more_stats,
+    "av_bldg": more_stats,
+    "av_land": more_stats,
+    "triad": [first],
+    "geography_data_year": [first],
+}
+
+
+def aggregate(data, geography_type, group_type):
+    """
+    Function to group a dataframe by whichever geography and group types it is
+    passed and output aggregate stats for that grouping.
+    """
+
+    print(geography_type, group_type)
+
+    group = [geography_type, group_type, "year", "stage_name"]
+    summary = data.groupby(group).agg(stats).round(2)
+    summary["geography_type"] = geography_type
+    summary["group_type"] = group_type
+    summary.index.names = ["geography_id", "group_id", "year", "stage_name"]
+    summary = summary.reset_index().set_index(
+        [
+            "geography_type",
+            "geography_id",
+            "group_type",
+            "group_id",
+            "year",
+            "stage_name",
+        ]
+    )
+
+    return summary
+
+
+def assemble(df, geos, groups):
+    """
+    Function that loops over predefined geography and class groups and passes
+    them to the aggregate function. Returns stacked output from the aggregate
+    function.
+    """
+
+    # Create an empty dataframe to fill with output
+    output = pd.DataFrame()
+
+    # Loop through group combinations and stack output
+    for key, value in geos.items():
+        df["geography_data_year"] = df[key]
+
+        for x in value:
+            for z in groups:
+                output = pd.concat([output, aggregate(df, x, z)])
+
+    # Flatten multi-index
+    output.columns = ["_".join(col) for col in output.columns]
+    output = output.reset_index()
+    output = output.rename(columns={"triad_first": "triad"})
+
+    # Create additional stat columns post-aggregation
+    output["av_tot_pct_w_value"] = (
+        output["av_tot_count"] / output["av_tot_size"]
+    )
+
+    output = output.sort_values("year")
+
+    diff_cols = [
+        "geography_id",
+        "group_id",
+        "stage_name",
+        "av_tot_median",
+        "av_tot_mean",
+        "av_tot_sum",
+        "av_bldg_median",
+        "av_bldg_mean",
+        "av_bldg_sum",
+        "av_land_median",
+        "av_land_mean",
+        "av_land_sum",
+    ]
+
+    output[
+        [
+            "av_tot_delta_median",
+            "av_tot_delta_mean",
+            "av_tot_delta_sum",
+            "av_bldg_delta_median",
+            "av_bldg_delta_mean",
+            "av_bldg_delta_sum",
+            "av_land_delta_median",
+            "av_land_delta_mean",
+            "av_land_delta_sum",
+        ]
+    ] = (
+        output[diff_cols]
+        .groupby(["geography_id", "group_id", "stage_name"])
+        .diff()
+    )
+
+    output[
+        [
+            "av_tot_delta_pct_median",
+            "av_tot_delta_pct_mean",
+            "av_tot_delta_pct_sum",
+            "av_bldg_delta_pct_median",
+            "av_bldg_delta_pct_mean",
+            "av_bldg_delta_pct_sum",
+            "av_land_delta_pct_median",
+            "av_land_delta_pct_mean",
+            "av_land_delta_pct_sum",
+        ]
+    ] = (
+        output[diff_cols]
+        .groupby(["geography_id", "group_id", "stage_name"])
+        .pct_change()
+    )
+
+    output["year"] = output["year"].astype(int)
+    output["triennial"] = output["geography_type"].isin(
+        ["triad", "township", "nbhd"]
+    )
+
+    # Reassessment year is constructed as a string rather than a boolean to
+    # avoid PySpark errors with nullable booleans; this workaround can likely
+    # be removed once those errors are resolved.
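+    # With the workaround, the column holds "" for non-triennial geographies
+    # and "No"/"Yes" for triennial ones, rather than a nullable True/False.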
+    output["reassessment_year"] = ""
+    output.loc[
+        (output["triennial"] == True), "reassessment_year"  # noqa: E712
+    ] = "No"
+    output.loc[
+        (
+            ((output["year"] % 3 == 0) & (output["triad"] == "North"))
+            | ((output["year"] % 3 == 1) & (output["triad"] == "South"))
+            | ((output["year"] % 3 == 2) & (output["triad"] == "City"))
+        )
+        & (output["triennial"] == True),  # noqa: E712
+        "reassessment_year",
+    ] = "Yes"
+    output = output.drop(["triennial", "triad"], axis=1)
+
+    output = clean_names(output)
+
+    return output
+
+
+def clean_names(x):
+    """
+    Function to rename and reorder columns.
+    """
+
+    output = x.rename(
+        columns={
+            "av_tot_size": "pin_n_tot",
+            "av_tot_count": "pin_n_w_value",
+            "av_tot_pct_w_value": "pin_pct_w_value",
+            "geography_data_year_first": "geography_data_year",
+        }
+    )
+
+    output = output[
+        [
+            "geography_type",
+            "geography_id",
+            "geography_data_year",
+            "group_type",
+            "group_id",
+            "year",
+            "reassessment_year",
+            "stage_name",
+            "pin_n_tot",
+            "pin_n_w_value",
+            "pin_pct_w_value",
+            "av_tot_min",
+            "av_tot_q10",
+            "av_tot_q25",
+            "av_tot_median",
+            "av_tot_q75",
+            "av_tot_q90",
+            "av_tot_max",
+            "av_tot_mean",
+            "av_tot_sum",
+            "av_tot_delta_median",
+            "av_tot_delta_mean",
+            "av_tot_delta_sum",
+            "av_tot_delta_pct_median",
+            "av_tot_delta_pct_mean",
+            "av_tot_delta_pct_sum",
+            "av_bldg_min",
+            "av_bldg_q10",
+            "av_bldg_q25",
+            "av_bldg_median",
+            "av_bldg_q75",
+            "av_bldg_q90",
+            "av_bldg_max",
+            "av_bldg_mean",
+            "av_bldg_sum",
+            "av_bldg_delta_median",
+            "av_bldg_delta_mean",
+            "av_bldg_delta_sum",
+            "av_bldg_delta_pct_median",
+            "av_bldg_delta_pct_mean",
+            "av_bldg_delta_pct_sum",
+            "av_land_min",
+            "av_land_q10",
+            "av_land_q25",
+            "av_land_median",
+            "av_land_q75",
+            "av_land_q90",
+            "av_land_max",
+            "av_land_mean",
+            "av_land_sum",
+            "av_land_delta_median",
+            "av_land_delta_mean",
+            "av_land_delta_sum",
+            "av_land_delta_pct_median",
+            "av_land_delta_pct_mean",
+            "av_land_delta_pct_sum",
+        ]
+    ]
+
+    return output
+
+
+def ingest_geos(geos):
+    """
+    Function to convert dbt seed into a dictionary that can be iterated over.
+    """
+
+    geos = geos.toPandas()
+    output = {
+        k: list(geos[k].unique()[pd.notnull(geos[k].unique())])
+        for k in geos.columns
+    }
+
+    return output
+
+
+def model(dbt, spark_session):
+    """
+    Function to build a dbt python model using PySpark.
+ """ + dbt.config(materialized="table") + + # Ingest geographies and their associated data years + geos = ingest_geos(dbt.ref("reporting.sot_data_years")) + + input = dbt.ref("reporting.sot_assessment_roll_input") + + # Convert the Spark input dataframe to Pandas for + # compatibility with assesspy functions + input = input.toPandas() + + df = assemble(input, geos=geos, groups=groups) + + schema = ( + "geography_type: string, geography_id: string, " + + "geography_data_year: string, group_type: string, group_id: string, " + + "year: string, reassessment_year: string, stage_name: string, " + + "pin_n_tot: bigint, pin_n_w_value: bigint, pin_pct_w_value: double, " + + "av_tot_min: double, av_tot_q10: double, av_tot_q25: double, " + + "av_tot_median: double, av_tot_q75: double, av_tot_q90: double, " + + "av_tot_max: double, av_tot_mean: double, av_tot_sum: double, " + + "av_tot_delta_median: double, av_tot_delta_mean: double, " + + "av_tot_delta_sum: double, av_tot_delta_pct_median: double, " + + "av_tot_delta_pct_mean: double, av_tot_delta_pct_sum: double, " + + "av_bldg_min: double, av_bldg_q10: double, av_bldg_q25: double, " + + "av_bldg_median: double, av_bldg_q75: double, av_bldg_q90: double, " + + "av_bldg_max: double, av_bldg_mean: double, av_bldg_sum: double, " + + "av_bldg_delta_median: double, av_bldg_delta_mean: double, " + + "av_bldg_delta_sum: double, av_bldg_delta_pct_median: double, " + + "av_bldg_delta_pct_mean: double, av_bldg_delta_pct_sum: double, " + + "av_land_min: double, av_land_q10: double, av_land_q25: double, " + + "av_land_median: double, av_land_q75: double, av_land_q90: double, " + + "av_land_max: double, av_land_mean: double, av_land_sum: double, " + + "av_land_delta_median: double, av_land_delta_mean: double, " + + "av_land_delta_sum: double, av_land_delta_pct_median: double, " + + "av_land_delta_pct_mean: double, av_land_delta_pct_sum: double" + ) + + spark_df = spark_session.createDataFrame(df, schema=schema) + + return spark_df diff --git a/dbt/models/reporting/reporting.sot_assessment_roll_input.sql b/dbt/models/reporting/reporting.sot_assessment_roll_input.sql new file mode 100644 index 000000000..89724635c --- /dev/null +++ b/dbt/models/reporting/reporting.sot_assessment_roll_input.sql @@ -0,0 +1,106 @@ +-- This script gathers parcel-level geographies and joins them to values and +-- class groupings. Its sole purpose is to feed reporting.sot_assessment_roll, +-- and should not be used otherwise. +{{ + config( + materialized='table', + partitioned_by=['year'] + ) +}} + +/* Ensure every municipality/class/year has a row for every stage through +cross-joining. This is to make sure that combinations that do not yet +exist in iasworld.asmt_all for the current year will exist in the view, but have +largely empty columns. For example: even if no class 4s in the City of Chicago +have been mailed yet for the current assessment year, we would still like an +empty City of Chicago/class 4 row to exist for the mailed stage. */ +WITH stages AS ( + + SELECT 'MAILED' AS stage_name + UNION + SELECT 'ASSESSOR CERTIFIED' AS stage_name + UNION + SELECT 'BOR CERTIFIED' AS stage_name + +), + +-- Universe of all parcels as defined by iasworld.pardat, expanded with +-- assessment stages. 
+uni AS (
+    SELECT
+        vw_pin_universe.*,
+        stages.*
+    FROM {{ ref('default.vw_pin_universe') }}
+    CROSS JOIN stages
+)
+
+SELECT
+    uni.stage_name,
+    uni.class,
+    CAST(vals.tot AS INT) AS av_tot,
+    CAST(vals.bldg AS INT) AS av_bldg,
+    CAST(vals.land AS INT) AS av_land,
+    'Cook' AS county,
+    uni.triad_name AS triad,
+    uni.township_name AS township,
+    uni.nbhd_code AS nbhd,
+    uni.tax_code,
+    uni.zip_code,
+    uni.chicago_community_area_name AS community_area,
+    uni.census_place_geoid AS census_place,
+    uni.census_tract_geoid AS census_tract,
+    uni.census_congressional_district_geoid
+        AS
+        census_congressional_district,
+    uni.census_zcta_geoid AS census_zcta,
+    uni.cook_board_of_review_district_num AS cook_board_of_review_district,
+    uni.cook_commissioner_district_num AS cook_commissioner_district,
+    uni.cook_judicial_district_num AS cook_judicial_district,
+    uni.ward_num,
+    uni.chicago_police_district_num AS police_district,
+    uni.school_elementary_district_geoid AS school_elementary_district,
+    uni.school_secondary_district_geoid AS school_secondary_district,
+    uni.school_unified_district_geoid AS school_unified_district,
+    ARRAY_JOIN(uni.tax_municipality_name, ', ') AS tax_municipality,
+    ARRAY_JOIN(uni.tax_park_district_name, ', ') AS tax_park_district,
+    ARRAY_JOIN(uni.tax_library_district_name, ', ') AS tax_library_district,
+    ARRAY_JOIN(uni.tax_fire_protection_district_name, ', ')
+        AS tax_fire_protection_district,
+    ARRAY_JOIN(uni.tax_community_college_district_name, ', ')
+        AS
+        tax_community_college_district,
+    ARRAY_JOIN(uni.tax_sanitation_district_name, ', ')
+        AS tax_sanitation_district,
+    ARRAY_JOIN(uni.tax_special_service_area_name, ', ')
+        AS tax_special_service_area,
+    ARRAY_JOIN(uni.tax_tif_district_name, ', ') AS tax_tif_district,
+    uni.econ_central_business_district_num AS central_business_district,
+    uni.census_data_year,
+    uni.cook_board_of_review_district_data_year,
+    uni.cook_commissioner_district_data_year,
+    uni.cook_judicial_district_data_year,
+    COALESCE(
+        uni.ward_chicago_data_year, uni.ward_evanston_data_year) AS
+        ward_data_year,
+    uni.chicago_community_area_data_year AS community_area_data_year,
+    uni.chicago_police_district_data_year AS police_district_data_year,
+    uni.econ_central_business_district_data_year
+        AS
+        central_business_district_data_year,
+    uni.school_data_year,
+    uni.tax_data_year,
+    'no_group' AS no_group,
+    class_dict.major_class_type AS major_class,
+    class_dict.modeling_group,
+    CASE WHEN class_dict.major_class_code = '2' THEN 'RES' ELSE 'OTHER' END
+        AS res_other,
+    uni.year
+FROM uni
+LEFT JOIN {{ ref('reporting.vw_pin_value_long') }} AS vals
+    ON uni.pin = vals.pin
+        AND uni.year = vals.year
+        AND uni.stage_name = vals.stage_name
+LEFT JOIN {{ ref('ccao.class_dict') }}
+    ON uni.class = class_dict.class_code
+-- Temporary limit on feeder table to avoid GitHub runner memory issues.
+WHERE uni.class = '278' AND uni.year IN ('2019', '2020', '2021')
diff --git a/dbt/models/reporting/reporting.sot_ratio_stats.py b/dbt/models/reporting/reporting.sot_ratio_stats.py
new file mode 100644
index 000000000..a2677b98f
--- /dev/null
+++ b/dbt/models/reporting/reporting.sot_ratio_stats.py
@@ -0,0 +1,383 @@
+# pylint: skip-file
+# type: ignore
+sc.addPyFile(  # noqa: F821
+    "s3://ccao-athena-dependencies-us-east-1/assesspy==1.1.0.zip"
+)
+
+# This script generates aggregated summary stats on sales ratios across a
+# number of geographies, class combinations, and time.
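+#
+# The assesspy functions used below compute standard IAAO ratio-study
+# statistics: COD (coefficient of dispersion), the average absolute deviation
+# from the median ratio as a percentage of the median ratio; PRD
+# (price-related differential), the mean ratio divided by the sale-weighted
+# mean ratio; PRB (price-related bias), a regression coefficient measuring
+# how ratios change with value; and MKI (modified Kakwani index), the Gini
+# coefficient of assessed values divided by the Gini coefficient of sale
+# prices.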
+
+# Import libraries
+import assesspy as ass  # noqa: E402
+import numpy as np  # noqa: E402
+import pandas as pd  # noqa: E402
+
+# Declare class groupings
+groups = ["no_group", "class", "major_class", "modeling_group", "res_other"]
+
+
+# Wrap assesspy functions to avoid GitHub runner errors for length 0 groupings
+def cod_safe(ratio):
+    if len(ratio) >= 1:
+        output = ass.cod(ratio)
+    else:
+        output = None
+
+    return output
+
+
+def prd_safe(assessed, sale_price):
+    if len(sale_price) >= 1:
+        output = ass.prd(assessed=assessed, sale_price=sale_price)
+    else:
+        output = None
+
+    return output
+
+
+def prb_safe(assessed, sale_price):
+    if len(sale_price) >= 1:
+        output = ass.prb(assessed=assessed, sale_price=sale_price, round=3)[
+            "prb"
+        ]
+    else:
+        output = None
+
+    return output
+
+
+def mki_safe(assessed, sale_price):
+    if len(sale_price) >= 1:
+        output = ass.mki(assessed=assessed, sale_price=sale_price)
+    else:
+        output = None
+
+    return output
+
+
+# Define aggregation functions
+def first(x):
+    if len(x) >= 1:
+        output = x.iloc[0]
+    else:
+        output = None
+
+    return output
+
+
+def met(x, lower_limit, upper_limit):
+    return np.logical_and(lower_limit <= x, x <= upper_limit)
+
+
+def within(x, limit):
+    return np.logical_and(1 - limit < x, x < 1 + limit)
+
+
+def aggregate(data, geography_type, group_type):
+    """
+    Function to group a dataframe by whichever geography and group types it is
+    passed and output aggregate stats for that grouping. Works
+    differently than in other SoT scripts since assesspy functions need
+    multiple inputs.
+    """
+
+    print(geography_type, group_type)
+
+    group = [geography_type, group_type, "year", "stage_name"]
+    data["pin_n_tot"] = data.groupby(group)["tot_mv"].transform("size")
+    data["sale_n_tot"] = data.groupby(group)["sale_price"].transform("count")
+    data["pin_n_w_value"] = data.groupby(group)["tot_mv"].transform("count")
+
+    # Remove parcels with MVs of 0 since they screw up ratios
+    data = data[data["tot_mv"] > 0]
+
+    # Drop rows without a sale, then remove groups with fewer than 20 sales
+    # since we can't calculate reliable stats on them
+    data = data.dropna(subset=["sale_price"])
+    data = data[data["sale_n_tot"] >= 20]
+
+    summary = data.groupby(group).apply(
+        lambda x: pd.Series(
+            {
+                "triad": first(x["triad"]),
+                "pin_n_tot": np.size(x["ratio"]),
+                "pin_n_w_value": x["pin_n_w_value"].min(),
+                "sale_n_tot": x["sale_n_tot"].min(),
+                "mv_min": x["tot_mv"].min(),
+                "mv_q10": x["tot_mv"].quantile(0.1),
+                "mv_q25": x["tot_mv"].quantile(0.25),
+                "mv_median": x["tot_mv"].median(),
+                "mv_q75": x["tot_mv"].quantile(0.75),
+                "mv_q90": x["tot_mv"].quantile(0.90),
+                "mv_max": x["tot_mv"].max(),
+                "mv_mean": x["tot_mv"].mean(),
+                "mv_sum": x["tot_mv"].sum(),
+                "ratio_min": x["ratio"].min(),
+                "ratio_q10": x["ratio"].quantile(0.1),
+                "ratio_q25": x["ratio"].quantile(0.25),
+                "ratio_median": x["ratio"].median(),
+                "ratio_q75": x["ratio"].quantile(0.75),
+                "ratio_q90": x["ratio"].quantile(0.90),
+                "ratio_max": x["ratio"].max(),
+                "ratio_mean": x["ratio"].mean(),
+                "cod": cod_safe(ratio=x["ratio"]),
+                "prd": prd_safe(
+                    assessed=x["tot_mv"], sale_price=x["sale_price"]
+                ),
+                "prb": prb_safe(
+                    assessed=x["tot_mv"], sale_price=x["sale_price"]
+                ),
+                "mki": mki_safe(
+                    assessed=x["tot_mv"], sale_price=x["sale_price"]
+                ),
+                "geography_data_year": first(x["data_year"]),
+            }
+        )
+    )
+    summary["geography_type"] = geography_type
+    summary["group_type"] = group_type
+
+    return summary
+
+
+def assemble(df, geos, groups):
+    """
+    Function that loops over predefined geography and class groups and passes
+    them to the aggregate function. Returns stacked output from the aggregate
+    function.
+    """
+
+    # Create an empty dataframe to fill with output
+    output = pd.DataFrame()
+
+    # Loop through group combinations and stack output
+    for key, value in geos.items():
+        df["data_year"] = df[key]
+
+        for x in value:
+            for z in groups:
+                output = pd.concat([output, aggregate(df, x, z)])
+
+    output.dropna(how="all", axis=1, inplace=True)
+
+    output.index.names = ["geography_id", "group_id", "year", "stage_name"]
+    output = output.reset_index().set_index(
+        [
+            "geography_type",
+            "geography_id",
+            "group_type",
+            "group_id",
+            "year",
+            "stage_name",
+        ]
+    )
+
+    output = output.reset_index()
+
+    # Create additional stat columns post-aggregation
+    output["pin_pct_w_value"] = output["pin_n_w_value"] / output["pin_n_tot"]
+
+    output = output.sort_values("year")
+
+    diff_cols = [
+        "geography_id",
+        "group_id",
+        "stage_name",
+        "mv_median",
+        "mv_mean",
+        "mv_sum",
+    ]
+
+    output[
+        [
+            "mv_delta_median",
+            "mv_delta_mean",
+            "mv_delta_sum",
+        ]
+    ] = (
+        output[diff_cols]
+        .groupby(["geography_id", "group_id", "stage_name"])
+        .diff()
+    )
+
+    output[
+        [
+            "mv_delta_pct_median",
+            "mv_delta_pct_mean",
+            "mv_delta_pct_sum",
+        ]
+    ] = (
+        output[diff_cols]
+        .groupby(["geography_id", "group_id", "stage_name"])
+        .pct_change()
+    )
+
+    output["year"] = output["year"].astype(int)
+    output["triennial"] = output["geography_type"].isin(
+        ["triad", "township", "nbhd"]
+    )
+    output["reassessment_year"] = ""
+    output.loc[
+        (output["triennial"] == True), "reassessment_year"  # noqa: E712
+    ] = "No"
+    output.loc[
+        (
+            ((output["year"] % 3 == 0) & (output["triad"] == "North"))
+            | ((output["year"] % 3 == 1) & (output["triad"] == "South"))
+            | ((output["year"] % 3 == 2) & (output["triad"] == "City"))
+        )
+        & (output["triennial"] == True),  # noqa: E712
+        "reassessment_year",
+    ] = "Yes"
+    output = output.drop(["triennial", "triad"], axis=1)
+
+    output["cod_met"] = met(output["cod"], 5, 15)
+    output["prd_met"] = met(output["prd"], 0.98, 1.03)
+    output["prb_met"] = met(output["prb"], -0.05, 0.05)
+    output["mki_met"] = met(output["mki"], 0.95, 1.05)
+
+    output["within_05_pct"] = within(output["ratio_mean"], 0.05)
+    output["within_10_pct"] = within(output["ratio_mean"], 0.1)
+    output["within_15_pct"] = within(output["ratio_mean"], 0.15)
+    output["within_20_pct"] = within(output["ratio_mean"], 0.2)
+
+    # PySpark rejects NaN, so convert them to None
+    output = output.replace(np.nan, None)
+
+    output = clean(output)
+
+    return output
+
+
+def clean(dirty):
+    """
+    Function to change column types and reorder them.
+ """ + + dirty = dirty.astype( + { + "group_id": "str", + "year": "str", + "stage_name": "str", + "reassessment_year": "str", + "pin_n_w_value": np.int64, + "pin_n_tot": np.int64, + "sale_n_tot": np.int64, + "mv_min": np.int64, + "mv_q10": np.int64, + "mv_q25": np.int64, + "mv_median": np.int64, + "mv_q75": np.int64, + "mv_q90": np.int64, + "mv_max": np.int64, + "mv_mean": np.int64, + "mv_sum": np.int64, + } + ) + + dirty = dirty[ + [ + "geography_type", + "geography_id", + "geography_data_year", + "group_type", + "group_id", + "year", + "reassessment_year", + "stage_name", + "pin_n_tot", + "pin_n_w_value", + "pin_pct_w_value", + "sale_n_tot", + "mv_min", + "mv_q10", + "mv_q25", + "mv_median", + "mv_q75", + "mv_q90", + "mv_max", + "mv_mean", + "mv_sum", + "mv_delta_median", + "mv_delta_mean", + "mv_delta_sum", + "mv_delta_pct_median", + "mv_delta_pct_mean", + "mv_delta_pct_sum", + "ratio_min", + "ratio_q10", + "ratio_q25", + "ratio_median", + "ratio_q75", + "ratio_q90", + "ratio_max", + "ratio_mean", + "cod", + "prd", + "prb", + "mki", + "cod_met", + "prd_met", + "prb_met", + "mki_met", + "within_05_pct", + "within_10_pct", + "within_15_pct", + "within_20_pct", + ] + ] + + return dirty + + +def ingest_geos(geos): + """ + Function to convert dbt seed into a dictionary that can be iterated over. + """ + + geos = geos.toPandas() + output = { + k: list(geos[k].unique()[pd.notnull(geos[k].unique())]) + for k in geos.columns + } + + return output + + +def model(dbt, spark_session): + """ + Function to build a dbt python model using PySpark. + """ + dbt.config(materialized="table") + + # Ingest geographies and their associated data years + geos = ingest_geos(dbt.ref("reporting.sot_data_years")) + + input = dbt.ref("reporting.sot_ratio_stats_input") + + # Convert the Spark input dataframe to Pandas for + # compatibility with assesspy functions + input = input.toPandas() + + df = assemble(input, geos=geos, groups=groups) + + schema = ( + "geography_type: string, geography_id: string, " + + "geography_data_year: string, group_type: string, group_id: string, " + + "year: string, reassessment_year: string, stage_name: string, " + + "pin_n_tot: int, pin_n_w_value: bigint, pin_pct_w_value: double, " + + "sale_n_tot: bigint, mv_min: bigint, mv_q10: bigint, " + + "mv_q25: bigint, mv_median: bigint, mv_q75: bigint, " + + "mv_q90: bigint, mv_max: bigint, mv_mean: bigint, mv_sum: bigint, " + + "mv_delta_median: double, mv_delta_mean: double, " + + "mv_delta_sum: double, mv_delta_pct_median: double, " + + "mv_delta_pct_mean: double, mv_delta_pct_sum: double, " + + "ratio_min: double, ratio_q10: double, ratio_q25: double, " + + "ratio_median: double, ratio_q75: double, ratio_q90: double, " + + "ratio_max: double, ratio_mean: double, cod: double, prd: double, " + + "prb: double, mki: double, cod_met: boolean, prd_met: boolean, " + + "prb_met: boolean, mki_met: boolean, within_05_pct: boolean, " + + "within_10_pct: boolean, within_15_pct: boolean, " + + "within_20_pct: boolean" + ) + + spark_df = spark_session.createDataFrame(df, schema=schema) + + return spark_df diff --git a/dbt/models/reporting/reporting.sot_ratio_stats_input.sql b/dbt/models/reporting/reporting.sot_ratio_stats_input.sql new file mode 100644 index 000000000..17ed6ef83 --- /dev/null +++ b/dbt/models/reporting/reporting.sot_ratio_stats_input.sql @@ -0,0 +1,116 @@ +/* This script gathers parcel-level geographies and joins them to values and +sale prices, and class groupings in order to construct sales ratios. 
+Its sole purpose is to feed reporting.sot_ratio_stats, and should not be used
+otherwise. */
+{{
+    config(
+        materialized='table',
+        partitioned_by=['year']
+    )
+}}
+
+/* Ensure every municipality/class/year has a row for every stage through
+cross-joining. This is to make sure that combinations that do not yet
+exist in iasworld.asmt_all for the current year will exist in this table, but
+have largely empty columns. For example: even if no class 4s in the City of
+Chicago have been mailed yet for the current assessment year, we would still
+like an empty City of Chicago/class 4 row to exist for the mailed stage. */
+WITH stages AS (
+
+    SELECT 'MAILED' AS stage_name
+    UNION
+    SELECT 'ASSESSOR CERTIFIED' AS stage_name
+    UNION
+    SELECT 'BOR CERTIFIED' AS stage_name
+
+),
+
+-- Universe of all parcels as defined by iasworld.pardat, expanded with
+-- assessment stages.
+uni AS (
+    SELECT
+        vw_pin_universe.*,
+        stages.*
+    FROM {{ ref('default.vw_pin_universe') }}
+    CROSS JOIN stages
+)
+
+SELECT
+    CAST(sales.sale_price AS DOUBLE) AS sale_price,
+    uni.stage_name,
+    uni.class,
+    CAST(vals.tot_mv AS DOUBLE) AS tot_mv,
+    CAST(vals.tot_mv AS DOUBLE) / CAST(sales.sale_price AS DOUBLE) AS ratio,
+    'Cook' AS county,
+    uni.triad_name AS triad,
+    uni.township_name AS township,
+    uni.nbhd_code AS nbhd,
+    uni.tax_code,
+    uni.zip_code,
+    uni.chicago_community_area_name AS community_area,
+    uni.census_place_geoid AS census_place,
+    uni.census_tract_geoid AS census_tract,
+    uni.census_congressional_district_geoid
+        AS
+        census_congressional_district,
+    uni.census_zcta_geoid AS census_zcta,
+    uni.cook_board_of_review_district_num AS cook_board_of_review_district,
+    uni.cook_commissioner_district_num AS cook_commissioner_district,
+    uni.cook_judicial_district_num AS cook_judicial_district,
+    uni.ward_num,
+    uni.chicago_police_district_num AS police_district,
+    uni.school_elementary_district_geoid AS school_elementary_district,
+    uni.school_secondary_district_geoid AS school_secondary_district,
+    uni.school_unified_district_geoid AS school_unified_district,
+    ARRAY_JOIN(uni.tax_municipality_name, ', ') AS tax_municipality,
+    ARRAY_JOIN(uni.tax_park_district_name, ', ') AS tax_park_district,
+    ARRAY_JOIN(uni.tax_library_district_name, ', ') AS tax_library_district,
+    ARRAY_JOIN(uni.tax_fire_protection_district_name, ', ')
+        AS tax_fire_protection_district,
+    ARRAY_JOIN(uni.tax_community_college_district_name, ', ')
+        AS
+        tax_community_college_district,
+    ARRAY_JOIN(uni.tax_sanitation_district_name, ', ')
+        AS tax_sanitation_district,
+    ARRAY_JOIN(uni.tax_special_service_area_name, ', ')
+        AS tax_special_service_area,
+    ARRAY_JOIN(uni.tax_tif_district_name, ', ') AS tax_tif_district,
+    uni.econ_central_business_district_num AS central_business_district,
+    uni.census_data_year,
+    uni.cook_board_of_review_district_data_year,
+    uni.cook_commissioner_district_data_year,
+    uni.cook_judicial_district_data_year,
+    COALESCE(
+        uni.ward_chicago_data_year, uni.ward_evanston_data_year) AS
+        ward_data_year,
+    uni.chicago_community_area_data_year AS community_area_data_year,
+    uni.chicago_police_district_data_year AS police_district_data_year,
+    uni.econ_central_business_district_data_year
+        AS
+        central_business_district_data_year,
+    uni.school_data_year,
+    uni.tax_data_year,
+    'no_group' AS no_group,
+    class_dict.major_class_type AS major_class,
+    class_dict.modeling_group,
+    CASE WHEN class_dict.major_class_code = '2' THEN 'RES' ELSE 'OTHER' END
+        AS res_other,
+    uni.year
+FROM uni
+LEFT JOIN
+    {{ ref('reporting.vw_pin_value_long') }} AS vals
+    ON uni.pin = vals.pin
+        AND uni.year = vals.year
+        AND uni.stage_name = vals.stage_name
+LEFT JOIN {{ ref('ccao.class_dict') }}
+    ON uni.class = class_dict.class_code
+LEFT JOIN {{ ref('default.vw_pin_sale') }} AS sales
+    ON uni.pin = sales.pin
+        AND uni.year = sales.year
+        AND NOT sales.is_multisale
+        AND NOT sales.sale_filter_deed_type
+        AND NOT sales.sale_filter_less_than_10k
+        AND NOT sales.sale_filter_same_sale_within_365
+-- Temporary limit on feeder table to avoid GitHub runner memory issues.
+WHERE uni.year IN ('2022', '2023') AND uni.class IN ('278', '597')
diff --git a/dbt/models/reporting/reporting.sot_sales.py b/dbt/models/reporting/reporting.sot_sales.py
new file mode 100644
index 000000000..b3d496172
--- /dev/null
+++ b/dbt/models/reporting/reporting.sot_sales.py
@@ -0,0 +1,258 @@
+# This script generates aggregated summary stats on sales across a number of
+# geographies, class combinations, and time.
+
+import statistics as stats
+
+# Import libraries
+import numpy as np
+import pandas as pd
+
+# Declare class groupings
+groups = ["no_group", "class", "major_class", "modeling_group", "res_other"]
+
+
+# Define aggregation functions. These are just wrappers for basic python
+# functions that make them easier to use with pandas.agg().
+def q10(x):
+    return x.quantile(0.1)
+
+
+def q25(x):
+    return x.quantile(0.25)
+
+
+def q75(x):
+    return x.quantile(0.75)
+
+
+def q90(x):
+    return x.quantile(0.9)
+
+
+def first(x):
+    return x.iloc[0]
+
+
+more_stats = [
+    "min",
+    q10,
+    q25,
+    "median",
+    q75,
+    q90,
+    "max",
+    "mean",
+    "sum",
+]
+
+agg_func_math = {
+    "sale_price": ["size", "count"] + more_stats,
+    "sale_price_per_sf": more_stats,
+    "sale_char_bldg_sf": ["median"],
+    "sale_char_land_sf": ["median"],
+    "sale_char_yrblt": ["median"],
+    "class": [stats.multimode],
+    "geography_data_year": [first],
+}
+
+
+def aggregate(data, geography_type, group_type):
+    """
+    Function to group a dataframe by whichever geography and group types it is
+    passed and output aggregate stats for that grouping.
+    """
+    print(geography_type, group_type)
+
+    group = [geography_type, group_type, "year"]
+    summary = data.groupby(group).agg(agg_func_math).round(2)
+    summary["geography_type"] = geography_type
+    summary["group_type"] = group_type
+    summary.index.names = ["geography_id", "group_id", "year"]
+    summary = summary.reset_index().set_index(
+        [
+            "geography_type",
+            "geography_id",
+            "group_type",
+            "group_id",
+            "year",
+        ]
+    )
+
+    return summary
+
+
+def assemble(df, geos, groups):
+    """
+    Function that loops over predefined geography and class groups and passes
+    them to the aggregate function. Returns stacked output from the aggregate
+    function.
+ """ + + # Create an empty dataframe to fill with output + output = pd.DataFrame() + + # Loop through group combinations and stack output + for key, value in geos.items(): + df["geography_data_year"] = df[key] + + for x in value: + for z in groups: + output = pd.concat([output, aggregrate(df, x, z)]) + + # Flatten multi-index + output.columns = ["_".join(col) for col in output.columns] + output = output.reset_index() + + # Create additional stat columns post-aggregation + output["sale_price_sum"] = output["sale_price_sum"].replace(0, np.NaN) + output["sale_price_per_sf_sum"] = output["sale_price_per_sf_sum"].replace( + 0, np.NaN + ) + + output = output.sort_values("year") + + diff_cols = [ + "geography_id", + "group_id", + "sale_price_median", + "sale_price_mean", + "sale_price_sum", + "sale_price_per_sf_median", + "sale_price_per_sf_mean", + "sale_price_per_sf_sum", + ] + + output[ + [ + "sale_price_delta_median", + "sale_price_delta_mean", + "sale_price_delta_sum", + "sale_price_per_sf_delta_median", + "sale_price_per_sf_delta_mean", + "sale_price_per_sf_delta_sum", + ] + ] = ( + output[diff_cols].groupby(["geography_id", "group_id"]).diff() + ) + + output = clean_names(output) + + return output + + +def clean_names(x): + """ + Function to rename and reorder columns. + """ + + output = x.rename( + columns={ + "sale_price_size": "pin_n_tot", + "year": "sale_year", + "sale_price_count": "sale_n_tot", + "class_multimode": "sale_class_mode", + "geography_data_year_first": "geography_data_year", + } + ) + + output = output[ + [ + "geography_type", + "geography_id", + "geography_data_year", + "group_type", + "group_id", + "sale_year", + "pin_n_tot", + "sale_n_tot", + "sale_price_min", + "sale_price_q10", + "sale_price_q25", + "sale_price_median", + "sale_price_q75", + "sale_price_q90", + "sale_price_max", + "sale_price_mean", + "sale_price_sum", + "sale_price_delta_median", + "sale_price_delta_mean", + "sale_price_delta_sum", + "sale_price_per_sf_min", + "sale_price_per_sf_q10", + "sale_price_per_sf_q25", + "sale_price_per_sf_median", + "sale_price_per_sf_q75", + "sale_price_per_sf_q90", + "sale_price_per_sf_max", + "sale_price_per_sf_mean", + "sale_price_per_sf_sum", + "sale_price_per_sf_delta_median", + "sale_price_per_sf_delta_mean", + "sale_price_per_sf_delta_sum", + "sale_char_bldg_sf_median", + "sale_char_land_sf_median", + "sale_char_yrblt_median", + "sale_class_mode", + ] + ] + + return output + + +def ingest_geos(geos): + """ + Function to convert dbt seed into a dictionary that can be iterated over. + """ + + geos = geos.toPandas() + output = { + k: list(geos[k].unique()[pd.notnull(geos[k].unique())]) + for k in geos.columns + } + + return output + + +def model(dbt, spark_session): + """ + Function to build a dbt python model using PySpark. 
+ """ + dbt.config(materialized="table") + + # Ingest geographies and their associated data years + geos = ingest_geos(dbt.ref("reporting.sot_data_years")) + + input = dbt.ref("reporting.sot_sales_input") + + # Convert the Spark input dataframe to Pandas for + # compatibility with assesspy functions + input = input.toPandas() + + df = assemble(input, geos=geos, groups=groups) + + schema = ( + "geography_type: string, geography_id: string, " + + "geography_data_year: string, group_type: string, " + + "group_id: string, sale_year: string, pin_n_tot: bigint, " + + "sale_n_tot: int, sale_price_min: double, sale_price_q10: double, " + + "sale_price_q25: double, sale_price_median: double, " + + "sale_price_q75: double, sale_price_q90: double, " + + "sale_price_max: double, sale_price_mean: double, " + + "sale_price_sum: double, sale_price_delta_median: double, " + + "sale_price_delta_mean: double, sale_price_delta_sum: double, " + + "sale_price_per_sf_min: double, sale_price_per_sf_q10: double, " + + "sale_price_per_sf_q25: double, sale_price_per_sf_median: double, " + + "sale_price_per_sf_q75: double, sale_price_per_sf_q90: double, " + + "sale_price_per_sf_max: double, sale_price_per_sf_mean: double, " + + "sale_price_per_sf_sum: double, " + + "sale_price_per_sf_delta_median: double, " + + "sale_price_per_sf_delta_mean: double, " + + "sale_price_per_sf_delta_sum: double, " + + "sale_char_bldg_sf_median: double, " + + "sale_char_land_sf_median: double, " + + "sale_char_yrblt_median: double, sale_class_mode: array" + ) + + spark_df = spark_session.createDataFrame(df, schema=schema) + + return spark_df diff --git a/dbt/models/reporting/reporting.sot_sales_input.sql b/dbt/models/reporting/reporting.sot_sales_input.sql new file mode 100644 index 000000000..18d0960aa --- /dev/null +++ b/dbt/models/reporting/reporting.sot_sales_input.sql @@ -0,0 +1,104 @@ +-- This script gathers parcel-level geographies and joins them to sales and +-- class groupings. Its sole purpose is to feed reporting.sot_sales, +-- and should not be used otherwise. 
+
+{{
+    config(
+        materialized='table',
+        partitioned_by=['year']
+    )
+}}
+
+-- Gather parcel-level building/land square footage and year built
+WITH sf AS (
+    SELECT
+        pin,
+        year,
+        SUM(char_bldg_sf) AS char_bldg_sf,
+        SUM(char_land_sf) AS char_land_sf,
+        ARBITRARY(char_yrblt) AS char_yrblt
+    FROM {{ ref('default.vw_card_res_char') }}
+    GROUP BY pin, year
+)
+
+SELECT
+    sales.doc_no,
+    sales.sale_price,
+    CASE WHEN sf.char_bldg_sf > 0
+            THEN
+            CAST(sales.sale_price / sf.char_bldg_sf AS DOUBLE)
+    END AS sale_price_per_sf,
+    CAST(sf.char_bldg_sf AS INT) AS sale_char_bldg_sf,
+    CAST(sf.char_land_sf AS INT) AS sale_char_land_sf,
+    CAST(sf.char_yrblt AS INT) AS sale_char_yrblt,
+    uni.class,
+    'Cook' AS county,
+    uni.triad_name AS triad,
+    uni.township_name AS township,
+    uni.nbhd_code AS nbhd,
+    uni.tax_code,
+    uni.zip_code,
+    uni.chicago_community_area_name AS community_area,
+    uni.census_place_geoid AS census_place,
+    uni.census_tract_geoid AS census_tract,
+    uni.census_congressional_district_geoid
+        AS
+        census_congressional_district,
+    uni.census_zcta_geoid AS census_zcta,
+    uni.cook_board_of_review_district_num AS cook_board_of_review_district,
+    uni.cook_commissioner_district_num AS cook_commissioner_district,
+    uni.cook_judicial_district_num AS cook_judicial_district,
+    uni.ward_num,
+    uni.chicago_police_district_num AS police_district,
+    uni.school_elementary_district_geoid AS school_elementary_district,
+    uni.school_secondary_district_geoid AS school_secondary_district,
+    uni.school_unified_district_geoid AS school_unified_district,
+    ARRAY_JOIN(uni.tax_municipality_name, ', ') AS tax_municipality,
+    ARRAY_JOIN(uni.tax_park_district_name, ', ') AS tax_park_district,
+    ARRAY_JOIN(uni.tax_library_district_name, ', ') AS tax_library_district,
+    ARRAY_JOIN(uni.tax_fire_protection_district_name, ', ')
+        AS tax_fire_protection_district,
+    ARRAY_JOIN(uni.tax_community_college_district_name, ', ')
+        AS
+        tax_community_college_district,
+    ARRAY_JOIN(uni.tax_sanitation_district_name, ', ')
+        AS tax_sanitation_district,
+    ARRAY_JOIN(uni.tax_special_service_area_name, ', ')
+        AS tax_special_service_area,
+    ARRAY_JOIN(uni.tax_tif_district_name, ', ') AS tax_tif_district,
+    uni.econ_central_business_district_num AS central_business_district,
+    uni.census_data_year,
+    uni.cook_board_of_review_district_data_year,
+    uni.cook_commissioner_district_data_year,
+    uni.cook_judicial_district_data_year,
+    COALESCE(
+        uni.ward_chicago_data_year, uni.ward_evanston_data_year) AS
+        ward_data_year,
+    uni.chicago_community_area_data_year AS community_area_data_year,
+    uni.chicago_police_district_data_year AS police_district_data_year,
+    uni.econ_central_business_district_data_year
+        AS
+        central_business_district_data_year,
+    uni.school_data_year,
+    uni.tax_data_year,
+    'no_group' AS no_group,
+    class_dict.major_class_type AS major_class,
+    class_dict.modeling_group,
+    CASE WHEN class_dict.major_class_code = '2' THEN 'RES' ELSE 'OTHER' END
+        AS res_other,
+    uni.year
+FROM {{ ref('default.vw_pin_universe') }} AS uni
+LEFT JOIN sf
+    ON uni.pin = sf.pin
+        AND uni.year = sf.year
+LEFT JOIN {{ ref('ccao.class_dict') }}
+    ON uni.class = class_dict.class_code
+LEFT JOIN {{ ref('default.vw_pin_sale') }} AS sales
+    ON uni.pin = sales.pin
+        AND uni.year = sales.year
+        AND NOT sales.is_multisale
+        AND NOT sales.sale_filter_deed_type
+        AND NOT sales.sale_filter_less_than_10k
+        AND NOT sales.sale_filter_same_sale_within_365
+-- Temporary limit on feeder table to avoid GitHub runner memory issues.
+WHERE uni.year = '2023'
diff --git a/dbt/models/reporting/reporting.sot_taxes_exemptions.py b/dbt/models/reporting/reporting.sot_taxes_exemptions.py
new file mode 100644
index 000000000..2aa0cef5a
--- /dev/null
+++ b/dbt/models/reporting/reporting.sot_taxes_exemptions.py
@@ -0,0 +1,302 @@
+# This script generates aggregated summary stats on taxes and exemptions data
+# across a number of geographies, class combinations, and time.
+
+# Import libraries
+import pandas as pd
+
+# Declare class groupings
+groups = ["no_group", "class", "major_class", "modeling_group", "res_other"]
+
+
+# Define aggregation functions. These are just wrappers for basic python
+# functions that make them easier to use with pandas.agg().
+def q10(x):
+    return x.quantile(0.1)
+
+
+def q25(x):
+    return x.quantile(0.25)
+
+
+def q75(x):
+    return x.quantile(0.75)
+
+
+def q90(x):
+    return x.quantile(0.9)
+
+
+def first(x):
+    return x.iloc[0]
+
+
+more_stats = [
+    "min",
+    q10,
+    q25,
+    "median",
+    q75,
+    q90,
+    "max",
+    "mean",
+    "sum",
+]
+
+less_stats = ["count", "sum"]
+
+agg_func_math = {
+    "tax_eq_factor_final": ["size", first],
+    "tax_eq_factor_tentative": [first],
+    "tax_bill_total": more_stats,
+    "tax_rate": more_stats,
+    "tax_av": more_stats,
+    "tax_exe_homeowner": less_stats,
+    "tax_exe_senior": less_stats,
+    "tax_exe_freeze": less_stats,
+    "tax_exe_longtime_homeowner": less_stats,
+    "tax_exe_disabled": less_stats,
+    "tax_exe_vet_returning": less_stats,
+    "tax_exe_vet_dis_lt50": less_stats,
+    "tax_exe_vet_dis_50_69": less_stats,
+    "tax_exe_vet_dis_ge70": less_stats,
+    "tax_exe_abate": less_stats,
+    "tax_exe_total": less_stats,
+    "geography_data_year": [first],
+}
+
+
+def aggregate(data, geography_type, group_type):
+    """
+    Function to group a dataframe by whichever geography and group types it is
+    passed and output aggregate stats for that grouping.
+    """
+
+    print(geography_type, group_type)
+
+    group = [geography_type, group_type, "year"]
+    summary = data.groupby(group).agg(agg_func_math).round(2)
+    summary["geography_type"] = geography_type
+    summary["group_type"] = group_type
+    summary.index.names = ["geography_id", "group_id", "year"]
+    summary = summary.reset_index().set_index(
+        [
+            "geography_type",
+            "geography_id",
+            "group_type",
+            "group_id",
+            "year",
+        ]
+    )
+
+    return summary
+
+
+def assemble(df, geos, groups):
+    """
+    Function that loops over predefined geography and class groups and passes
+    them to the aggregate function. Returns stacked output from the aggregate
+    function.
+    """
+    # Create an empty dataframe to fill with output
+    output = pd.DataFrame()
+
+    # Loop through group combinations and stack output
+    for key, value in geos.items():
+        df["geography_data_year"] = df[key]
+
+        for x in value:
+            for z in groups:
+                output = pd.concat([output, aggregate(df, x, z)])
+
+    # Flatten multi-index
+    output.columns = ["_".join(col) for col in output.columns]
+    output = output.reset_index()
+
+    # Create additional stat columns post-aggregation
+    output = output.sort_values("year")
+
+    diff_cols = [
+        "geography_id",
+        "group_id",
+        "tax_bill_total_median",
+        "tax_bill_total_mean",
+        "tax_bill_total_sum",
+    ]
+
+    output[
+        [
+            "tax_bill_total_delta_median",
+            "tax_bill_total_delta_mean",
+            "tax_bill_total_delta_sum",
+        ]
+    ] = (
+        output[diff_cols].groupby(["geography_id", "group_id"]).diff()
+    )
+
+    output = clean_names(output)
+
+    return output
+
+
+def clean_names(x):
+    """
+    Function to rename and reorder columns.
+ """ + + output = x.rename( + columns={ + "tax_eq_factor_final_size": "pin_n_tot", + "year": "tax_year", + "tax_exe_homeowner_count": "tax_exe_n_homeowner", + "tax_exe_senior_count": "tax_exe_n_senior", + "tax_exe_freeze_count": "tax_exe_n_freeze", + "tax_exe_longtime_homeowner_count": "tax_exe_n_longtime_homeowner", + "tax_exe_disabled_count": "tax_exe_n_disabled", + "tax_exe_vet_returning_count": "tax_exe_n_vet_returning", + "tax_exe_vet_dis_lt50_count": "tax_exe_n_vet_dis_lt50", + "tax_exe_vet_dis_50_69_count": "tax_exe_n_vet_dis_50_69", + "tax_exe_vet_dis_ge70_count": "tax_exe_n_vet_dis_ge70", + "tax_exe_abate_count": "tax_exe_n_abate", + "tax_exe_total_count": "tax_exe_n_total", + "tax_eq_factor_final_first": "tax_eq_factor_final", + "tax_eq_factor_tentative_first": "tax_eq_factor_tentative", + "geography_data_year_first": "geography_data_year", + } + ) + + output = output[ + [ + "geography_type", + "geography_id", + "geography_data_year", + "group_type", + "group_id", + "tax_year", + "pin_n_tot", + "tax_eq_factor_final", + "tax_eq_factor_tentative", + "tax_bill_total_min", + "tax_bill_total_q10", + "tax_bill_total_q25", + "tax_bill_total_median", + "tax_bill_total_q75", + "tax_bill_total_q90", + "tax_bill_total_max", + "tax_bill_total_mean", + "tax_bill_total_sum", + "tax_bill_total_delta_median", + "tax_bill_total_delta_mean", + "tax_bill_total_delta_sum", + "tax_rate_min", + "tax_rate_q10", + "tax_rate_q25", + "tax_rate_median", + "tax_rate_q75", + "tax_rate_q90", + "tax_rate_max", + "tax_rate_mean", + "tax_rate_sum", + "tax_av_min", + "tax_av_q10", + "tax_av_q25", + "tax_av_median", + "tax_av_q75", + "tax_av_q90", + "tax_av_max", + "tax_av_mean", + "tax_av_sum", + "tax_exe_n_homeowner", + "tax_exe_homeowner_sum", + "tax_exe_n_senior", + "tax_exe_senior_sum", + "tax_exe_n_freeze", + "tax_exe_freeze_sum", + "tax_exe_n_longtime_homeowner", + "tax_exe_longtime_homeowner_sum", + "tax_exe_n_disabled", + "tax_exe_disabled_sum", + "tax_exe_n_vet_returning", + "tax_exe_vet_returning_sum", + "tax_exe_n_vet_dis_lt50", + "tax_exe_vet_dis_lt50_sum", + "tax_exe_n_vet_dis_50_69", + "tax_exe_vet_dis_50_69_sum", + "tax_exe_n_vet_dis_ge70", + "tax_exe_vet_dis_ge70_sum", + "tax_exe_n_abate", + "tax_exe_abate_sum", + "tax_exe_n_total", + "tax_exe_total_sum", + ] + ] + return output + + +def ingest_geos(geos): + """ + Function to convert dbt seed into a dictionary that can be iterated over. + """ + + geos = geos.toPandas() + output = { + k: list(geos[k].unique()[pd.notnull(geos[k].unique())]) + for k in geos.columns + } + + return output + + +def model(dbt, spark_session): + """ + Function to build a dbt python model using PySpark. 
+ """ + dbt.config(materialized="table") + + # Ingest geographies and their associated data years + geos = ingest_geos(dbt.ref("reporting.sot_data_years")) + + input = dbt.ref("reporting.sot_taxes_exemptions_input") + + # Convert the Spark input dataframe to Pandas for + # compatibility with assesspy functions + input = input.toPandas() + + df = assemble(input, geos=geos, groups=groups) + + schema = ( + "geography_type: string, geography_id: string, " + + "geography_data_year: string, group_type: string, " + + "group_id: string, tax_year: string, pin_n_tot: bigint, " + + "tax_eq_factor_final: double, tax_eq_factor_tentative: double, " + + "tax_bill_total_min: double, tax_bill_total_q10: double, " + + "tax_bill_total_q25: double, tax_bill_total_median: double, " + + "tax_bill_total_q75: double, tax_bill_total_q90: double, " + + "tax_bill_total_max: double, tax_bill_total_mean: double, " + + "tax_bill_total_sum: double, tax_bill_total_delta_median: double, " + + "tax_bill_total_delta_mean: double, " + + "tax_bill_total_delta_sum: double , tax_rate_min: double, " + + "tax_rate_q10: double, tax_rate_q25: double, " + + "tax_rate_median: double, tax_rate_q75: double, " + + "tax_rate_q90: double, tax_rate_max: double, " + + "tax_rate_mean: double, tax_rate_sum: double, " + + "tax_av_min: int, tax_av_q10: double, tax_av_q25: double, " + + "tax_av_median: double, tax_av_q75: double, " + + "tax_av_q90: double, tax_av_max: int, tax_av_mean: double, " + + "tax_av_sum: double, tax_exe_n_homeowner: bigint, " + + "tax_exe_homeowner_sum: double, tax_exe_n_senior: bigint, " + + "tax_exe_senior_sum: double, tax_exe_n_freeze: bigint, " + + "tax_exe_freeze_sum: double, tax_exe_n_longtime_homeowner: bigint, " + + "tax_exe_longtime_homeowner_sum: double, " + + "tax_exe_n_disabled: bigint, tax_exe_disabled_sum: double, " + + "tax_exe_n_vet_returning: bigint, " + + "tax_exe_vet_returning_sum: double, tax_exe_n_vet_dis_lt50: bigint, " + + "tax_exe_vet_dis_lt50_sum: double, tax_exe_n_vet_dis_50_69: bigint, " + + "tax_exe_vet_dis_50_69_sum: double, tax_exe_n_vet_dis_ge70: bigint, " + + "tax_exe_vet_dis_ge70_sum: double, tax_exe_n_abate: bigint, " + + "tax_exe_abate_sum: double, tax_exe_n_total: bigint, " + + "tax_exe_total_sum: double" + ) + + spark_df = spark_session.createDataFrame(df, schema=schema) + + return spark_df diff --git a/dbt/models/reporting/reporting.sot_taxes_exemptions_input.sql b/dbt/models/reporting/reporting.sot_taxes_exemptions_input.sql new file mode 100644 index 000000000..95718dca9 --- /dev/null +++ b/dbt/models/reporting/reporting.sot_taxes_exemptions_input.sql @@ -0,0 +1,132 @@ +-- This script gathers parcel-level geographies and joins them to values, tax +-- amounts, exemptions and class groupings. Its sole purpose is to feed +-- reporting.sot_taxes_and_exemptions, and should not be used otherwise. +{{ + config( + materialized='table', + partitioned_by=['year'] + ) +}} + +-- Gather unique tax codes and rates +WITH tcd AS ( + SELECT DISTINCT + tax_code_num, + tax_code_rate, + year + FROM {{ source('tax', 'tax_code') }} +) + +SELECT + uni.pin, + tax.av_clerk AS tax_av, + tax.tax_bill_total, + -- Setting exemptions with values of 0 allows us to count the number of + -- exemptions more easily and doesn't skew stats. 
+    CASE WHEN tax.exe_homeowner = 0 THEN NULL ELSE tax.exe_homeowner END
+        AS tax_exe_homeowner,
+    CASE WHEN tax.exe_senior = 0 THEN NULL ELSE tax.exe_senior END
+        AS tax_exe_senior,
+    CASE WHEN tax.exe_freeze = 0 THEN NULL ELSE tax.exe_freeze END
+        AS tax_exe_freeze,
+    CASE
+        WHEN tax.exe_longtime_homeowner = 0 THEN NULL ELSE
+            tax.exe_longtime_homeowner
+    END AS tax_exe_longtime_homeowner,
+    CASE WHEN tax.exe_disabled = 0 THEN NULL ELSE tax.exe_disabled END
+        AS tax_exe_disabled,
+    CASE
+        WHEN tax.exe_vet_returning = 0 THEN NULL ELSE tax.exe_vet_returning
+    END AS tax_exe_vet_returning,
+    CASE WHEN tax.exe_vet_dis_lt50 = 0 THEN NULL ELSE tax.exe_vet_dis_lt50 END
+        AS tax_exe_vet_dis_lt50,
+    CASE
+        WHEN tax.exe_vet_dis_50_69 = 0 THEN NULL ELSE tax.exe_vet_dis_50_69
+    END AS tax_exe_vet_dis_50_69,
+    CASE WHEN tax.exe_vet_dis_ge70 = 0 THEN NULL ELSE tax.exe_vet_dis_ge70 END
+        AS tax_exe_vet_dis_ge70,
+    CASE WHEN tax.exe_abate = 0 THEN NULL ELSE tax.exe_abate END
+        AS tax_exe_abate,
+    CASE
+        WHEN tax.exe_homeowner + tax.exe_senior + tax.exe_freeze
+            + tax.exe_longtime_homeowner + tax.exe_disabled
+            + tax.exe_vet_returning + tax.exe_vet_dis_lt50
+            + tax.exe_vet_dis_50_69 + tax.exe_vet_dis_ge70 + tax.exe_abate = 0
+            THEN NULL ELSE
+            tax.exe_homeowner + tax.exe_senior + tax.exe_freeze
+            + tax.exe_longtime_homeowner + tax.exe_disabled
+            + tax.exe_vet_returning + tax.exe_vet_dis_lt50
+            + tax.exe_vet_dis_50_69 + tax.exe_vet_dis_ge70 + tax.exe_abate
+    END AS tax_exe_total,
+    tcd.tax_code_rate AS tax_rate,
+    eqf.eq_factor_tentative AS tax_eq_factor_tentative,
+    eqf.eq_factor_final AS tax_eq_factor_final,
+    uni.class,
+    'Cook' AS county,
+    uni.triad_name AS triad,
+    uni.township_name AS township,
+    uni.nbhd_code AS nbhd,
+    uni.tax_code,
+    uni.zip_code,
+    uni.chicago_community_area_name AS community_area,
+    uni.census_place_geoid AS census_place,
+    uni.census_tract_geoid AS census_tract,
+    uni.census_congressional_district_geoid
+        AS
+        census_congressional_district,
+    uni.census_zcta_geoid AS census_zcta,
+    uni.cook_board_of_review_district_num AS cook_board_of_review_district,
+    uni.cook_commissioner_district_num AS cook_commissioner_district,
+    uni.cook_judicial_district_num AS cook_judicial_district,
+    uni.ward_num,
+    uni.chicago_police_district_num AS police_district,
+    uni.school_elementary_district_geoid AS school_elementary_district,
+    uni.school_secondary_district_geoid AS school_secondary_district,
+    uni.school_unified_district_geoid AS school_unified_district,
+    ARRAY_JOIN(uni.tax_municipality_name, ', ') AS tax_municipality,
+    ARRAY_JOIN(uni.tax_park_district_name, ', ') AS tax_park_district,
+    ARRAY_JOIN(uni.tax_library_district_name, ', ') AS tax_library_district,
+    ARRAY_JOIN(uni.tax_fire_protection_district_name, ', ')
+        AS tax_fire_protection_district,
+    ARRAY_JOIN(uni.tax_community_college_district_name, ', ')
+        AS
+        tax_community_college_district,
+    ARRAY_JOIN(uni.tax_sanitation_district_name, ', ')
+        AS tax_sanitation_district,
+    ARRAY_JOIN(uni.tax_special_service_area_name, ', ')
+        AS tax_special_service_area,
+    ARRAY_JOIN(uni.tax_tif_district_name, ', ') AS tax_tif_district,
+    uni.econ_central_business_district_num AS central_business_district,
+    uni.census_data_year,
+    uni.cook_board_of_review_district_data_year,
+    uni.cook_commissioner_district_data_year,
+    uni.cook_judicial_district_data_year,
+    COALESCE(
+        uni.ward_chicago_data_year, uni.ward_evanston_data_year) AS
+        ward_data_year,
+    uni.chicago_community_area_data_year AS community_area_data_year,
+    uni.chicago_police_district_data_year AS police_district_data_year,
+    uni.econ_central_business_district_data_year
+        AS
+        central_business_district_data_year,
+    uni.school_data_year,
+    uni.tax_data_year,
+    'no_group' AS no_group,
+    class_dict.major_class_type AS major_class,
+    class_dict.modeling_group,
+    CASE WHEN class_dict.major_class_code = '2' THEN 'RES' ELSE 'OTHER' END
+        AS res_other,
+    tax.year
+FROM {{ ref('default.vw_pin_universe') }} AS uni
+INNER JOIN {{ source('tax', 'pin') }} AS tax
+    ON uni.pin = tax.pin
+        AND uni.year = tax.year
+INNER JOIN {{ source('tax', 'eq_factor') }} AS eqf
+    ON uni.year = eqf.year
+INNER JOIN tcd
+    ON tax.tax_code_num = tcd.tax_code_num
+        AND tax.year = tcd.year
+INNER JOIN {{ ref('ccao.class_dict') }}
+    ON uni.class = class_dict.class_code
+-- Temporary limit on feeder table to avoid GitHub runner memory issues.
+WHERE uni.class = '206'
diff --git a/dbt/models/reporting/schema.yml b/dbt/models/reporting/schema.yml
index 2168f9b99..18c0e148d 100644
--- a/dbt/models/reporting/schema.yml
+++ b/dbt/models/reporting/schema.yml
@@ -34,6 +34,54 @@ models:
           within_20_pct >= within_10_pct
           AND within_10_pct >= within_05_pct
 
+  - name: reporting.sot_assessment_roll
+    description: '{{ doc("table_sot_assessment_roll") }}'
+    config:
+      tags:
+        - daily
+
+  - name: reporting.sot_assessment_roll_input
+    description: '{{ doc("table_sot_assessment_roll_input") }}'
+    config:
+      tags:
+        - daily
+
+  - name: reporting.sot_ratio_stats
+    description: '{{ doc("table_sot_ratio_stats") }}'
+    config:
+      tags:
+        - daily
+
+  - name: reporting.sot_ratio_stats_input
+    description: '{{ doc("table_sot_ratio_stats_input") }}'
+    config:
+      tags:
+        - daily
+
+  - name: reporting.sot_sales
+    description: '{{ doc("table_sot_sales") }}'
+    config:
+      tags:
+        - daily
+
+  - name: reporting.sot_sales_input
+    description: '{{ doc("table_sot_sales_input") }}'
+    config:
+      tags:
+        - daily
+
+  - name: reporting.sot_taxes_exemptions
+    description: '{{ doc("table_sot_taxes_exemptions") }}'
+    config:
+      tags:
+        - daily
+
+  - name: reporting.sot_taxes_exemptions_input
+    description: '{{ doc("table_sot_taxes_exemptions_input") }}'
+    config:
+      tags:
+        - daily
+
   - name: reporting.ratio_stats_input
     description: '{{ doc("table_ratio_stats_input") }}'
     config:
diff --git a/dbt/seeds/reporting/docs.md b/dbt/seeds/reporting/docs.md
new file mode 100644
index 000000000..ac26fa1a4
--- /dev/null
+++ b/dbt/seeds/reporting/docs.md
@@ -0,0 +1,6 @@
+# sot_data_years
+
+{% docs seed_sot_data_years %}
+A table containing reporting geographies and their associated data year identifiers.
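+Each column header corresponds to a data year column in
+`default.vw_pin_universe` (for example `census_data_year` or
+`tax_data_year`), and the values listed beneath it are the geographies whose
+boundaries come from that data year. The Python SoT models convert this seed
+to a dictionary and iterate over it to attach a `geography_data_year` to
+each geography.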
+
+{% enddocs %}
diff --git a/dbt/seeds/reporting/reporting.sot_data_years.csv b/dbt/seeds/reporting/reporting.sot_data_years.csv
new file mode 100644
index 000000000..eed8df432
--- /dev/null
+++ b/dbt/seeds/reporting/reporting.sot_data_years.csv
@@ -0,0 +1,9 @@
+year,census_data_year,cook_board_of_review_district_data_year,cook_commissioner_district_data_year,cook_judicial_district_data_year,ward_data_year,community_area_data_year,police_district_data_year,central_business_district_data_year,school_data_year,tax_data_year
+county,census_place,cook_board_of_review_district,cook_commissioner_district,cook_judicial_district,ward_num,community_area,police_district,central_business_district,school_elementary_district,tax_municipality
+triad,census_tract,,,,,,,,school_secondary_district,tax_park_district
+township,census_congressional_district,,,,,,,,school_unified_district,tax_library_district
+nbhd,census_zcta,,,,,,,,,tax_fire_protection_district
+tax_code,,,,,,,,,,tax_community_college_district
+zip_code,,,,,,,,,,tax_sanitation_district
+,,,,,,,,,,tax_special_service_area
+,,,,,,,,,,tax_tif_district
diff --git a/dbt/seeds/reporting/schema.yml b/dbt/seeds/reporting/schema.yml
new file mode 100644
index 000000000..3dca39299
--- /dev/null
+++ b/dbt/seeds/reporting/schema.yml
@@ -0,0 +1,6 @@
+seeds:
+  - name: reporting.sot_data_years
+    description: '{{ doc("seed_sot_data_years") }}'
+    config:
+      column_types:
+        year: string