diff --git a/_shared_utils/shared_utils/gtfs_utils_v2.py b/_shared_utils/shared_utils/gtfs_utils_v2.py index f114bf3c1..61f669af4 100644 --- a/_shared_utils/shared_utils/gtfs_utils_v2.py +++ b/_shared_utils/shared_utils/gtfs_utils_v2.py @@ -233,8 +233,8 @@ def fill_in_metrolink_trips_df_with_shape_id( def get_transit_organizations_gtfs_dataset_keys( - keep_cols: list[str], custom_filtering: dict = None -): + keep_cols: list[str], custom_filtering: dict = None, get_df: bool = False +) -> Union[pd.DataFrame, siuba.sql.verbs.LazyTbl]: """ From Airtable GTFS datasets, get the datasets (and gtfs_dataset_key) for usable feeds. @@ -249,6 +249,9 @@ def get_transit_organizations_gtfs_dataset_keys( >> rename(gtfs_dataset_key="key") ) + if get_df: + dim_gtfs_datasets = dim_gtfs_datasets >> collect() + return dim_gtfs_datasets @@ -293,6 +296,7 @@ def schedule_daily_feed_to_organization( dim_gtfs_datasets = get_transit_organizations_gtfs_dataset_keys( keep_cols=["key", "name", "type", "regional_feed_type"], custom_filtering={"type": ["schedule"]}, + get_df=False, ) # Merge on gtfs_dataset_key to get organization name diff --git a/_shared_utils/shared_utils/portfolio_utils.py b/_shared_utils/shared_utils/portfolio_utils.py index 2201c2adb..8dee96a9a 100644 --- a/_shared_utils/shared_utils/portfolio_utils.py +++ b/_shared_utils/shared_utils/portfolio_utils.py @@ -10,11 +10,15 @@ need to import different pandas to add type hint for styler object """ +import base64 + +import dask.dataframe as dd +import dask_geopandas as dg import pandas as pd import pandas.io.formats.style # for type hint: https://github.com/pandas-dev/pandas/issues/24884 from calitp.tables import tbls from IPython.display import HTML -from shared_utils import rt_utils +from shared_utils import gtfs_utils_v2, rt_utils from siuba import * @@ -36,6 +40,54 @@ def clean_organization_name(df: pd.DataFrame) -> pd.DataFrame: return df +def decode_base64_url(row): + """ + Provide decoded version of URL as ASCII. + WeHo gets an incorrect padding, but urlsafe_b64decode works. + Just in case, return uri truncated. + """ + try: + decoded = base64.urlsafe_b64decode(row.base64_url).decode("ascii") + except base64.binascii.Error: + decoded = row.uri.split("?")[0] + + return decoded + + +def add_agency_identifiers(df: pd.DataFrame) -> pd.DataFrame: + """ + Find the current base64_url for the organization name and + decode it as ASCII (Chad Baker request for CKAN data). + The encoded version might not be as usable for users. 
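As background for `decode_base64_url` above: `base64.urlsafe_b64decode` can raise `binascii.Error` when the input is not padded out to a multiple of 4 characters. A minimal standalone sketch of that behavior (not part of this patch, using a made-up URL rather than any real feed):

```python
import base64

def decode_padded(b64_string: str) -> str:
    # Re-pad to a multiple of 4 before decoding; harmless if padding is already correct
    padded = b64_string + "=" * (-len(b64_string) % 4)
    return base64.urlsafe_b64decode(padded).decode("ascii")

# Encode a hypothetical URL, strip its padding, and decode it anyway
encoded = base64.urlsafe_b64encode(b"https://example.com/gtfs.zip").decode("ascii")
print(decode_padded(encoded.rstrip("=")))
```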
+ """ + dim_gtfs_datasets = gtfs_utils_v2.get_transit_organizations_gtfs_dataset_keys( + keep_cols=None, get_df=True + ) + + current_feeds = ( + dim_gtfs_datasets[ + (dim_gtfs_datasets.data_quality_pipeline == True) + & (dim_gtfs_datasets._is_current == True) + ] + .drop_duplicates(subset="name") + .reset_index(drop=True) + ) + + current_feeds2 = current_feeds.assign( + feed_url=current_feeds.apply(lambda x: decode_base64_url(x), axis=1) + ) + + df2 = pd.merge( + df, + current_feeds2[["name", "base64_url", "feed_url"]], + on="name", + how="inner", + validate="m:1", + ) + + return df2 + + # https://github.com/cal-itp/data-analyses/blob/main/bus_service_increase/E5_make_stripplot_data.py def add_caltrans_district() -> pd.DataFrame: """ @@ -87,16 +139,25 @@ def add_route_name(df: pd.DataFrame) -> pd.DataFrame: """ route_cols = ["route_id", "route_short_name", "route_long_name", "route_desc"] - if route_cols not in list(df.columns): + if not (set(route_cols).issubset(set(list(df.columns)))): raise ValueError(f"Input a df that contains {route_cols}") - df = df.assign(route_name_used=df.apply(lambda x: rt_utils.which_desc(x), axis=1)) + if isinstance(df, pd.DataFrame): + ddf = dd.from_pandas(df, npartitions=2) + elif isinstance(df, gpd.GeoDataFrame): + ddf = dg.from_geopandas(df, npartitions=2) - # If route names show up with leading comma - df = df.assign( - route_name_used=route_names.route_name_used.str.lstrip(",").str.strip() + ddf = ddf.assign( + route_name_used=ddf.apply( + lambda x: rt_utils.which_desc(x), axis=1, meta=("route_name_used", "str") + ) ) + df = ddf.compute() + + # If route names show up with leading comma + df = df.assign(route_name_used=df.route_name_used.str.lstrip(",").str.strip()) + return df diff --git a/_shared_utils/shared_utils/shared_data.py b/_shared_utils/shared_utils/shared_data.py index cfe74889b..8df3a46dd 100644 --- a/_shared_utils/shared_utils/shared_data.py +++ b/_shared_utils/shared_utils/shared_data.py @@ -4,11 +4,17 @@ import geopandas as gpd import pandas as pd +GCS_FILE_PATH = "gs://calitp-analytics-data/data-analyses/" + -# Function to set the county centroids and zoom levels -# used in folium and ipyleaflet maps def make_county_centroids(): - URL = "https://opendata.arcgis.com/datasets/8713ced9b78a4abb97dc130a691a8695_0.geojson" + """ + Find a county's centroids from county polygons. + """ + URL = ( + "https://opendata.arcgis.com/datasets/" + "8713ced9b78a4abb97dc130a691a8695_0.geojson" + ) gdf = gpd.read_file(URL).to_crs(geography_utils.CA_StatePlane) gdf.columns = gdf.columns.str.lower() @@ -46,17 +52,44 @@ def make_county_centroids(): print("County centroids dataset created") # Save as parquet, because lat/lon held in list, not point geometry anymore - gdf2.to_parquet( - "gs://calitp-analytics-data/data-analyses/ca_county_centroids.parquet" - ) + gdf2.to_parquet(f"{GCS_FILE_PATH}ca_county_centroids.parquet") print("County centroids exported to GCS") +def make_clean_state_highway_network(): + """ + Create State Highway Network dataset. + """ + HIGHWAY_URL = ( + "https://opendata.arcgis.com/datasets/" + "77f2d7ba94e040a78bfbe36feb6279da_0.geojson" + ) + gdf = gpd.read_file(HIGHWAY_URL) + + keep_cols = ["Route", "County", "District", "RouteType", "Direction", "geometry"] + + gdf = gdf[keep_cols] + print(f"# rows before dissolve: {len(gdf)}") + + # See if we can dissolve further - use all cols except geometry + # Should we dissolve further and use even longer lines? 
+ dissolve_cols = [c for c in list(gdf.columns) if c != "geometry"] + + gdf2 = gdf.dissolve(by=dissolve_cols).reset_index() + print(f"# rows after dissolve: {len(gdf2)}") + + # Export to GCS + utils.geoparquet_gcs_export(gdf2, GCS_FILE_PATH, "state_highway_network") + + # Run functions to create these datasets...store in GCS if __name__ == "__main__": # Don't use from shared_utils import geography_utils # Those have other dependencies...like map_utils imports from geography_utils import geography_utils + import utils make_county_centroids() + + make_clean_state_highway_network() diff --git a/_shared_utils/shared_utils/shared_data_catalog.yml b/_shared_utils/shared_utils/shared_data_catalog.yml index d7b59388b..ddb698a2b 100644 --- a/_shared_utils/shared_utils/shared_data_catalog.yml +++ b/_shared_utils/shared_utils/shared_data_catalog.yml @@ -6,29 +6,34 @@ sources: description: CA county polygons args: urlpath: https://opendata.arcgis.com/datasets/8713ced9b78a4abb97dc130a691a8695_0.geojson + caltrans_districts: + driver: geojson + description: Caltrans district polygons + args: + urlpath: https://caltrans-gis.dot.ca.gov/arcgis/rest/services/CHboundary/District_Tiger_Lines/FeatureServer/0/query?outFields=*&where=1%3D1&f=geojson ca_county_centroids: - driver: geoparquet + driver: parquet description: CA county centroids args: # source: shared_utils/shared_data.py urlpath: gs://calitp-analytics-data/data-analyses/ca_county_centroids.parquet + state_highway_network: + driver: geoparquet + description: Cleaned State Highway Network + args: + # source: shared_utils/shared_data.py + urlpath: gs://calitp-analytics-data/data-analyses/state_highway_network.parquet ca_transit_routes: driver: geoparquet description: CA transit routes with line geometry at the operator-level (open data) args: - # source: traffic_ops/make_routes_stops_shapefiles.py.py + # source: traffic_ops/create_routes_data.py urlpath: gs://calitp-analytics-data/data-analyses/traffic_ops/ca_transit_routes.parquet - ca_transit_routes_feed: - driver: geoparquet - description: CA transit routes with line geometry at the feed-level (not on open data) - args: - # source: traffic_ops/make_routes_stops_shapefiles.py.py - urlpath: gs://calitp-analytics-data/data-analyses/traffic_ops/ca_transit_routes_feed.parquet ca_transit_stops: driver: geoparquet description: CA transit stops with point geometry (open data) args: - # source: traffic_ops/make_routes_stops_shapefiles.py.py + # source: traffic_ops/create_stops_data.py urlpath: gs://calitp-analytics-data/data-analyses/traffic_ops/ca_transit_stops.parquet hqta_stops: driver: geoparquet diff --git a/bus_service_increase/bus_service_utils/__init__.py b/bus_service_increase/bus_service_utils/__init__.py index 2a1926d47..0c38700e9 100644 --- a/bus_service_increase/bus_service_utils/__init__.py +++ b/bus_service_increase/bus_service_utils/__init__.py @@ -5,12 +5,9 @@ create_parallel_corridors, gtfs_build, #publish_single_report, - report_utils, utils, ) -__version__ = "0.1.1" - __all__ = [ "better_bus_utils", "calenviroscreen_lehd_utils", @@ -18,6 +15,5 @@ "create_parallel_corridors", "gtfs_build", #"publish_single_report", - "report_utils", "utils", ] \ No newline at end of file diff --git a/bus_service_increase/bus_service_utils/create_parallel_corridors.py b/bus_service_increase/bus_service_utils/create_parallel_corridors.py index adb764616..1c2071f2d 100644 --- a/bus_service_increase/bus_service_utils/create_parallel_corridors.py +++ 
b/bus_service_increase/bus_service_utils/create_parallel_corridors.py @@ -6,86 +6,64 @@ Processed data: run raw data through `create_parallel_corridors.py` export to GCS, put in catalog. -Transit Routes: Use `traffic_ops/export_shapefiles.py` that -creates `routes_assembled.parquet` in GCS, put in catalog. - -Instead of catalog[file_name].read(), put in GCS path -because it's easier to import once these utility functions need to -be importable across directories. +Transit Routes: Need a shapes level table with route info. +Usually, this can be achieved by merging `trips` and `shapes`. """ import geopandas as gpd import pandas as pd -import shared_utils -from bus_service_utils import utils +from typing import Literal + +from shared_utils import geography_utils, utils DATA_PATH = "./data/" -#--------------------------------------------------------# -### State Highway Network -#--------------------------------------------------------# -def clean_highways(): - HIGHWAY_URL = ("https://opendata.arcgis.com/datasets/" - "77f2d7ba94e040a78bfbe36feb6279da_0.geojson") - gdf = gpd.read_file(HIGHWAY_URL) - - keep_cols = ['Route', 'County', 'District', 'RouteType', - 'Direction', 'geometry'] - - gdf = gdf[keep_cols] - print(f"# rows before dissolve: {len(gdf)}") - - # See if we can dissolve further - use all cols except geometry - # Should we dissolve further and use even longer lines? - dissolve_cols = list(gdf.columns) - dissolve_cols.remove('geometry') - - gdf2 = gdf.dissolve(by=dissolve_cols).reset_index() - print(f"# rows after dissolve: {len(gdf2)}") - - # Export to GCS - shared_utils.utils.geoparquet_gcs_export(gdf2, - utils.GCS_FILE_PATH, - "state_highway_network") - - #--------------------------------------------------------# ### Functions to create overlay dataset + further cleaning #--------------------------------------------------------# -# Can this function be reworked to take a df? -def process_transit_routes(alternate_df: - gpd.GeoDataFrame = None) -> gpd.GeoDataFrame: - if alternate_df is None: - df = gpd.read_parquet( - f"{utils.GCS_FILE_PATH}2022_Jan/shapes_processed.parquet") - else: - df = alternate_df +def process_transit_routes( + df: gpd.GeoDataFrame, + warehouse_version: Literal["v1", "v2"] +) -> gpd.GeoDataFrame: + """ + At operator level, pick the route with the longest length + to overlay with SHN. + At operator level, sum up how many unique routes there are. 
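To illustrate the route-counting step described in this docstring: `transform("nunique")` broadcasts the per-operator count of unique routes back onto every row, unlike `agg`, which would collapse to one row per operator. A sketch with hypothetical feed keys and route ids:

```python
import pandas as pd

toy = pd.DataFrame({
    "feed_key": ["a", "a", "a", "b"],
    "route_id": ["1", "1", "2", "9"],
})

# Every row for feed "a" gets total_routes = 2; feed "b" gets 1
toy["total_routes"] = (
    toy.groupby("feed_key")["route_id"].transform("nunique").astype("Int64")
)
print(toy)
```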
+ """ + + if warehouse_version == "v1": + operator_cols = ["calitp_itp_id"] + + elif warehouse_version == "v2": + operator_cols = ["feed_key"] ## Clean transit routes - df = df.to_crs(shared_utils.geography_utils.CA_StatePlane) - - # Transit routes included some extra info about - # route_long_name, route_short_name, agency_id - # Get it down to route_id instead of shape_id, pick longest shape - df2 = df.assign(route_length = df.geometry.length) - df3 = (df2.sort_values(["itp_id", "route_id", "route_length"], - ascending=[True, True, False]) - .drop_duplicates(subset=["itp_id", "route_id"]) - .reset_index(drop=True) - ) + df = df.assign( + route_length = df.to_crs( + geography_utils.CA_StatePlane).geometry.length + ).to_crs(geography_utils.CA_StatePlane) + + # Get it down to route_id and pick longest shape + df2 = (df.sort_values(operator_cols + ["route_id", "route_length"], + ascending = [True, True, False]) + .drop_duplicates(subset=operator_cols + ["route_id"]) + .reset_index(drop=True) + ) # Then count how many unique route_ids appear for operator (that's denominator) - keep_cols = [ - "itp_id", "route_id", "total_routes", - "route_length", "geometry" - ] + route_cols = ["route_id", "total_routes", "route_length", "geometry"] - df4 = df3.assign( - total_routes = df3.groupby("itp_id")["route_id"].transform( + if warehouse_version == "v2": + keep_cols = operator_cols + ["name"] + route_cols + else: + keep_cols = operator_cols + route_cols + + df3 = df2.assign( + total_routes = df2.groupby(operator_cols)["route_id"].transform( "nunique").astype("Int64") )[keep_cols] - return df4 + return df3 def prep_highway_directions_for_dissolve( @@ -95,8 +73,9 @@ def prep_highway_directions_for_dissolve( Put in a list of group_cols, and aggregate highway segments with the direction info up to the group_col level. ''' - df = (gpd.read_parquet(f"{utils.GCS_FILE_PATH}state_highway_network.parquet") - .to_crs(shared_utils.geography_utils.CA_StatePlane)) + df = (gpd.read_parquet("gs://calitp-analytics-data/data-analyses/" + "state_highway_network.parquet") + .to_crs(geography_utils.CA_StatePlane)) # Get dummies for direction # Can make data wide instead of long @@ -110,11 +89,17 @@ def prep_highway_directions_for_dissolve( df[c] = df.groupby(group_cols)[c].transform('max').astype(int) return df - - -def process_highways(buffer_feet: int = - shared_utils.geography_utils.FEET_PER_MI - ) -> gpd.GeoDataFrame: + + +def process_highways( + buffer_feet: int = geography_utils.FEET_PER_MI +) -> gpd.GeoDataFrame: + """ + For each highway, store what directions it runs in + as dummy variables. This will allow us to dissolve + the geometry and get fewer rows for highways + without losing direction info. 
+ """ group_cols = ["Route", "County", "District", "RouteType"] df = prep_highway_directions_for_dissolve(group_cols) @@ -133,11 +118,12 @@ def process_highways(buffer_feet: int = df[direction_cols] = df[direction_cols].astype(int) return df - - + + def overlay_transit_to_highways( - hwy_buffer_feet: int = shared_utils.geography_utils.FEET_PER_MI, - alternate_df = None + hwy_buffer_feet: int = geography_utils.FEET_PER_MI, + transit_routes_df: gpd.GeoDataFrame = None, + warehouse_version: Literal["v1", "v2"] = "v2" ) -> gpd.GeoDataFrame: """ Function to find areas of intersection between @@ -149,17 +135,17 @@ def overlay_transit_to_highways( # Can pass a different buffer zone to determine parallel corridors highways = process_highways(buffer_feet = hwy_buffer_feet) - transit_routes = process_transit_routes(alternate_df) + transit_routes = process_transit_routes(transit_routes_df, warehouse_version) # Overlay # Note: an overlay based on intersection changes the geometry column # The new geometry column will reflect that area of intersection - gdf = gpd.overlay(transit_routes, - highways, - how = "intersection", - keep_geom_type = False - ) - + gdf = gpd.overlay( + transit_routes, + highways, + how = "intersection", + keep_geom_type = False + ) # Using new geometry column, calculate what % that intersection # is of the route and hwy @@ -185,12 +171,20 @@ def overlay_transit_to_highways( # Fix geometry - don't want the overlay geometry, which is intersection # Want to use a transit route's line geometry + if warehouse_version == "v1": + operator_cols = ["calitp_itp_id"] + + elif warehouse_version == "v2": + operator_cols = ["feed_key", "name"] + gdf2 = pd.merge( - transit_routes[["itp_id", "route_id", "geometry"]].drop_duplicates(), + transit_routes[operator_cols + ["route_id", "geometry"] + ].drop_duplicates(), gdf.drop(columns = "geometry"), - on = ["itp_id", "route_id"], + on = operator_cols + ["route_id"], how = "left", - # Allow 1:m merge because the same transit route can overlap with various highways + # Allow 1:m merge because the same transit route + # can overlap with various highways validate = "1:m", indicator=True ) @@ -198,9 +192,11 @@ def overlay_transit_to_highways( return gdf2 -def parallel_or_intersecting(df: gpd.GeoDataFrame, - pct_route_threshold: float = 0.5, - pct_highway_threshold: float = 0.1) -> gpd.GeoDataFrame: +def parallel_or_intersecting( + df: gpd.GeoDataFrame, + pct_route_threshold: float = 0.5, + pct_highway_threshold: float = 0.1 +) -> gpd.GeoDataFrame: # Play with various thresholds to decide how to designate parallel df = df.assign( @@ -212,45 +208,55 @@ def parallel_or_intersecting(df: gpd.GeoDataFrame, ) return df - + # Use this in notebook # Can pass different parameters if buffer or thresholds need adjusting -def make_analysis_data(hwy_buffer_feet= - shared_utils.geography_utils.FEET_PER_MI, - alternate_df = None, - pct_route_threshold = 0.5, - pct_highway_threshold = 0.1, - DATA_PATH = "", FILE_NAME = "parallel_or_intersecting" - ): +def make_analysis_data( + hwy_buffer_feet:int = geography_utils.FEET_PER_MI, + transit_routes_df: gpd.GeoDataFrame = None, + pct_route_threshold: float = 0.5, + pct_highway_threshold: float = 0.1, + data_path: str = "", + file_name: str = "parallel_or_intersecting", + warehouse_version: Literal["v1", "v2"] = "v2" +): ''' hwy_buffer_feet: int, number of feet to draw hwy buffers, defaults to 1 mile - alternate_df: None or pandas.DataFrame - can input an alternate df for transit routes - otherwise, use the `shapes_processed` 
from other analysis + transit_routes_df: a table, typically shapes, with route_id too pct_route_threshold: float, between 0-1 pct_highway_threshold: float, between 0-1 - DATA_PATH: str, file path for saving parallel transit routes dataset - FILE_NAME: str, file name for saving parallel transit routes dataset + data_path: str, file path for saving parallel transit routes dataset + file_name: str, file name for saving parallel transit routes dataset ''' # Get overlay between highway and transit routes # Draw buffer around highways - gdf = overlay_transit_to_highways(hwy_buffer_feet, alternate_df) + gdf = overlay_transit_to_highways( + hwy_buffer_feet, transit_routes_df, warehouse_version) # Remove transit routes that do not overlap with highways - gdf = gdf[gdf._merge!="left_only"].drop(columns = "_merge").reset_index(drop=True) + gdf = (gdf[gdf._merge!="left_only"] + .drop(columns = "_merge") + .reset_index(drop=True) + ) # Categorize whether route is parallel or intersecting based on threshold - gdf2 = parallel_or_intersecting(gdf, - pct_route_threshold, - pct_highway_threshold) + gdf2 = parallel_or_intersecting( + gdf, + pct_route_threshold, + pct_highway_threshold + ) - if "gs://" in DATA_PATH: - shared_utils.utils.geoparquet_gcs_export(gdf2, DATA_PATH, FILE_NAME) + if "gs://" in data_path: + utils.geoparquet_gcs_export( + gdf2, + data_path, + file_name + ) else: - gdf2.to_parquet(f"{DATA_PATH}{FILE_NAME}.parquet") - + gdf2.to_parquet(f"{data_path}{file_name}.parquet") + # For map, need highway to be 250 ft buffer #highways = process_highways(buffer_feet=250) #highways.to_parquet(f"{DATA_PATH}highways.parquet") \ No newline at end of file diff --git a/bus_service_increase/bus_service_utils/gtfs_build.py b/bus_service_increase/bus_service_utils/gtfs_build.py index 2195f2b34..c08e1ac5e 100644 --- a/bus_service_increase/bus_service_utils/gtfs_build.py +++ b/bus_service_increase/bus_service_utils/gtfs_build.py @@ -3,52 +3,9 @@ from `gtfs_utils` queries. 
""" import dask.dataframe as dd -import dask_geopandas as dg -import geopandas as gpd import pandas as pd from calitp.sql import to_snakecase -from typing import Literal, Union - -from shared_utils import geography_utils - -def merge_routes_trips( - routelines: Union[gpd.GeoDataFrame, dg.GeoDataFrame], - trips: Union[pd.DataFrame, dd.DataFrame], - merge_cols: list = ["calitp_itp_id", "calitp_url_number", "shape_id"], - crs: str = geography_utils.WGS84, - join: Literal["left", "inner", "outer", "right", "cross"] = "left", -) -> gpd.GeoDataFrame: - """ - Merge routes (which has shape_id, geometry) with trips - and returns a trips table with line geometry - """ - - routes = (routelines.drop_duplicates(subset=merge_cols) - .reset_index(drop=True) - [merge_cols + ["geometry"]] - ) - - if isinstance(routes, dg.GeoDataFrame): - trips_with_geom = dd.merge( - routes, - trips, - on = merge_cols, - how = join, - indicator=True - ).to_crs(crs).compute() - - else: - trips_with_geom = pd.merge( - routes, - trips, - on = merge_cols, - how = join, - indicator=True - ).to_crs(crs) - - return trips_with_geom - def aggregate_stat_by_group( df: dd.DataFrame, diff --git a/bus_service_increase/setup.py b/bus_service_increase/setup.py index e789ba9f1..a08dc6c23 100644 --- a/bus_service_increase/setup.py +++ b/bus_service_increase/setup.py @@ -3,7 +3,7 @@ setup( name="bus_service_utils", packages=find_packages(), - version="0.1.1", + version="1.0", description="Shared utility functions for bus service data analyses", author="Cal-ITP", license="Apache", diff --git a/daskify_rt/B2_avg_speeds_by_segment.py b/daskify_rt/B2_avg_speeds_by_segment.py new file mode 100644 index 000000000..456d8c2bc --- /dev/null +++ b/daskify_rt/B2_avg_speeds_by_segment.py @@ -0,0 +1,110 @@ +""" +Quick aggregation for avg speeds by segment +""" +import dask.dataframe as dd +import geopandas as gpd +import pandas as pd + +from shared_utils import utils + +GCS_FILE_PATH = "gs://calitp-analytics-data/data-analyses/" +DASK_TEST = f"{GCS_FILE_PATH}dask_test/" + +analysis_date = "2022-10-12" + +def avg_speeds_with_segment_geom( + analysis_date: str, + max_speed_cutoff: int = 70 +) -> gpd.GeoDataFrame: + """ + Import the segment-trip table. + Average the speed_mph across all trips present in the segment. + """ + df = dd.read_parquet( + f"{DASK_TEST}speeds_{analysis_date}/") + + # Take the average after dropping unusually high speeds + segment_cols = ["calitp_itp_id", "route_dir_identifier", + "segment_sequence"] + + avg_speeds = (df[(df.speed_mph <= max_speed_cutoff)].compute() + .groupby(segment_cols) + .agg({ + "speed_mph": "mean", + "trip_id": "nunique" + }).reset_index() + ) + + # Clean up for map + avg_speeds = avg_speeds.assign( + speed_mph = avg_speeds.speed_mph.round(2), + ).rename(columns = {"trip_id": "n_trips"}) + + # Merge in segment geometry + segments = gpd.read_parquet( + f"{DASK_TEST}longest_shape_segments.parquet", + columns = segment_cols + ["geometry", "geometry_arrowized"] + ).drop_duplicates().reset_index(drop=True) + + segments2 = segments.set_geometry("geometry_arrowized").drop( + columns = "geometry") + segments2.crs = segments.crs + + gdf = pd.merge( + segments2[~segments2.geometry_arrowized.is_empty], + avg_speeds, + on = segment_cols, + how = "inner" + ) + + return gdf + + + +if __name__ == "__main__": + URL = ("https://caltrans-gis.dot.ca.gov/arcgis/rest/services/CHboundary/" + "District_Tiger_Lines/FeatureServer/0/query?" 
+ "outFields=*&where=1%3D1&f=geojson" + ) + + caltrans_districts = gpd.read_file(URL)[["DISTRICT", "geometry"]] + + district_name_dict = { + 1: "District 1 - Eureka", + 2: "District 2 - Redding", + 3: "District 3 - Marysville", + 4: "District 4 - Oakland", + 5: "District 5 - San Luis Obispo", + 6: "District 6 - Fresno", + 7: "District 7 - Los Angeles", + 8: "District 8 - San Bernardino", + 9: "District 9 - Bishop", + 10: "District 10 - Stockton", + 11: "District 11 - San Diego", + 12: "District 12 - Irvine", + } + + caltrans_districts = caltrans_districts.assign( + district_name = caltrans_districts.DISTRICT.map(district_name_dict) + ).rename(columns = {"DISTRICT": "district"}) + + # Average the speeds for segment for entire day + # Drop speeds above our max cutoff + gdf = avg_speeds_with_segment_geom( + analysis_date, + max_speed_cutoff = 70 + ) + + # Spatial join to Caltrans district + gdf2 = gpd.sjoin( + gdf, + caltrans_districts.to_crs(gdf.crs), + how = "inner", + predicate = "intersects" + ).drop(columns = "index_right") + + utils.geoparquet_gcs_export( + gdf2, + DASK_TEST, + f"avg_speeds_{analysis_date}" + ) diff --git a/daskify_rt/ca_segment_speeds.ipynb b/daskify_rt/ca_segment_speeds.ipynb index e3d463e67..5c3732d26 100644 --- a/daskify_rt/ca_segment_speeds.ipynb +++ b/daskify_rt/ca_segment_speeds.ipynb @@ -2,14 +2,9 @@ "cells": [ { "cell_type": "markdown", - "id": "8e469eec-03e7-4f34-8c3e-e3934735e1c7", + "id": "2ac2d210-7493-49c0-9fc4-baddc4978814", "metadata": {}, - "source": [ - "# Goal: Statewide Daily Speeds by Segments\n", - "\n", - "* v1 warehouse (10/12/22)\n", - "* calculate time-of-day averages" - ] + "source": [] }, { "cell_type": "code", @@ -18,122 +13,78 @@ "metadata": {}, "outputs": [], "source": [ + "%%capture\n", "import warnings\n", "warnings.filterwarnings(\"ignore\")\n", "\n", "import branca\n", - "import dask.dataframe as dd\n", + "import calitp.magics\n", "import geopandas as gpd\n", "import pandas as pd\n", "\n", - "#from shared_utils import rt_utils\n", - "\n", "GCS_FILE_PATH = \"gs://calitp-analytics-data/data-analyses/\"\n", "DASK_TEST = f\"{GCS_FILE_PATH}dask_test/\"\n", "\n", "analysis_date = \"2022-10-12\"\n", "\n", - "\n", "ZERO_FIFTY_COLORSCALE = branca.colormap.step.RdYlGn_10.scale(\n", " vmin=0, \n", " vmax=50\n", - ")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "96b9b53d-6bbc-408d-813f-1306fb6d04fe", - "metadata": {}, - "outputs": [], - "source": [ - "def avg_speeds_with_segment_geom(\n", - " analysis_date: str, \n", - " max_speed_cutoff: int = 70\n", - ") -> gpd.GeoDataFrame: \n", - " \"\"\"\n", - " Import the segment-trip table. 
\n", - " Average the speed_mph across all trips present in the segment.\n", - " \"\"\"\n", - " # Test on BBB for now and get arrowizing right - double check \n", - " # where it's running in both directions that you can see both lines\n", - " df = dd.read_parquet(\n", - " f\"{DASK_TEST}speeds_{analysis_date}/\")\n", - " \n", - " # Take the average after dropping unusually high speeds\n", - " segment_cols = [\"calitp_itp_id\", \"route_dir_identifier\", \n", - " \"segment_sequence\"]\n", - " \n", - " avg_speeds = (df[(df.speed_mph <= max_speed_cutoff)].compute()\n", - " .groupby(segment_cols)\n", - " .agg({\n", - " \"speed_mph\": \"mean\",\n", - " \"trip_id\": \"nunique\"\n", - " }).reset_index()\n", - " )\n", - " \n", - " # Clean up for map\n", - " avg_speeds = avg_speeds.assign(\n", - " speed_mph = avg_speeds.speed_mph.round(2),\n", - " ).rename(columns = {\"trip_id\": \"n_trips\"})\n", - " \n", - " # Merge in segment geometry\n", - " segments = gpd.read_parquet(\n", - " f\"{DASK_TEST}longest_shape_segments.parquet\",\n", - " columns = segment_cols + [\"geometry\", \"geometry_arrowized\"]\n", - " ).drop_duplicates().reset_index(drop=True)\n", - " \n", - " segments2 = segments.set_geometry(\"geometry_arrowized\")\n", - " segments2.crs = segments.crs\n", - " \n", - " gdf = pd.merge(\n", - " segments2[~segments2.geometry_arrowized.is_empty], \n", - " avg_speeds,\n", - " on = segment_cols,\n", - " how = \"inner\"\n", - " )\n", - " \n", - " return gdf" + ")\n", + "\n", + "district_name_dict = {\n", + " 1: \"District 1 - Eureka\", \n", + " 2: \"District 2 - Redding\", \n", + " 3: \"District 3 - Marysville\", \n", + " 4: \"District 4 - Oakland\", \n", + " 5: \"District 5 - San Luis Obispo\", \n", + " 6: \"District 6 - Fresno\", \n", + " 7: \"District 7 - Los Angeles\", \n", + " 8: \"District 8 - San Bernardino\", \n", + " 9: \"District 9 - Bishop\", \n", + " 10: \"District 10 - Stockton\", \n", + " 11: \"District 11 - San Diego\", \n", + " 12: \"District 12 - Irvine\", \n", + "}" ] }, { "cell_type": "code", "execution_count": null, - "id": "eb940c3b-cdc8-40a9-885d-bc7e6013bd9a", + "id": "4e37258d-b752-467f-948c-6a2d5391e8b0", "metadata": {}, "outputs": [], "source": [ - "URL = \"https://caltrans-gis.dot.ca.gov/arcgis/rest/services/CHboundary/District_Tiger_Lines/FeatureServer/0/query?outFields=*&where=1%3D1&f=geojson\"\n", - "caltrans_districts = gpd.read_file(URL)[\n", - " [\"DISTRICT\", \"geometry\"]]" + "# Read in data\n", + "\n", + "gdf = gpd.read_parquet(f\"{DASK_TEST}avg_speeds_{analysis_date}.parquet\")\n", + "gdf = gdf[gdf.district==district].reset_index(drop=True)\n", + "\n", + "# Use dict to key into district name because D9 is missing data\n", + "district_name = district_name_dict[district]" ] }, { "cell_type": "code", "execution_count": null, - "id": "356fa070-ed53-4740-a0cd-06abc624d93f", + "id": "7a222bbd-4fb0-43f4-b189-63edc35ce1f8", "metadata": {}, "outputs": [], "source": [ - "gdf = avg_speeds_with_segment_geom(\n", - " analysis_date, \n", - " max_speed_cutoff = 70\n", - ")" + "%%capture_parameters\n", + "district, district_name" ] }, { - "cell_type": "code", - "execution_count": null, - "id": "1a209889-0e75-4f05-8393-2a2fbfba17c4", + "cell_type": "markdown", + "id": "0dda7c36-29f0-457f-9932-f2277d2c8ea1", "metadata": {}, - "outputs": [], "source": [ - "gdf2 = gpd.sjoin(\n", - " gdf, \n", - " caltrans_districts.to_crs(gdf.crs),\n", - " how = \"inner\",\n", - " predicate = \"intersects\"\n", - " ).drop(columns = \"index_right\")" + "# {district_name}\n", + "\n", + "* Goal: 
statewide map\n", + "* v1 warehouse (10/12/22)\n", + "* calculate time-of-day averages" ] }, { @@ -155,11 +106,8 @@ "outputs": [], "source": [ "def make_map(gdf: gpd.GeoDataFrame, district: int):\n", - " subset = gdf[gdf.DISTRICT == district\n", - " ].drop(columns = \"geometry\")\n", - " \n", - " if len(subset) > 0:\n", - " m = subset.explore(\n", + " if len(gdf) > 0:\n", + " m = gdf.explore(\n", " \"speed_mph\",\n", " tiles = \"CartoDB Positron\",\n", " cmap = ZERO_FIFTY_COLORSCALE\n", @@ -170,14 +118,6 @@ " print(f\"No RT trip info available in district {district}.\")" ] }, - { - "cell_type": "markdown", - "id": "97fba8c6-f1e2-45b3-a7d3-dc336a1eabad", - "metadata": {}, - "source": [ - "## District 1" - ] - }, { "cell_type": "code", "execution_count": null, @@ -185,226 +125,8 @@ "metadata": {}, "outputs": [], "source": [ - "d = 1\n", - "make_map(gdf2, d)" - ] - }, - { - "cell_type": "markdown", - "id": "4dfadcf3-305c-41d2-b2db-497ff3f6b889", - "metadata": {}, - "source": [ - "## District 2" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "012b32c8-7db4-4a65-9d99-a85b50a83799", - "metadata": {}, - "outputs": [], - "source": [ - "d = 2\n", - "make_map(gdf2, d)" + "make_map(gdf, district)" ] - }, - { - "cell_type": "markdown", - "id": "ea6a12af-954b-4f6d-818d-ff29731ada8f", - "metadata": {}, - "source": [ - "## District 3" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "d69dac93-5fed-4cbd-89a7-a6092048b49d", - "metadata": {}, - "outputs": [], - "source": [ - "d = 3\n", - "make_map(gdf2, d)" - ] - }, - { - "cell_type": "markdown", - "id": "63ab9f01-c8df-4356-a625-381061541d03", - "metadata": {}, - "source": [ - "## District 4" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "86d945b6-dfbc-43ea-9ab8-70b4233d251d", - "metadata": {}, - "outputs": [], - "source": [ - "d = 4\n", - "make_map(gdf2, d)" - ] - }, - { - "cell_type": "markdown", - "id": "b0cdd64d-0fcf-49fb-9074-baf47c1f752c", - "metadata": {}, - "source": [ - "## District 5" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "adf84bde-ba07-4747-8bf6-e3ac19e014d3", - "metadata": {}, - "outputs": [], - "source": [ - "d = 5\n", - "make_map(gdf2, d)" - ] - }, - { - "cell_type": "markdown", - "id": "f3cdbcd8-85a0-4268-844c-85a2f82bb3c0", - "metadata": {}, - "source": [ - "## District 6" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "6d077236-cc7a-4a57-b0a2-18669ee4b0ef", - "metadata": {}, - "outputs": [], - "source": [ - "d = 6\n", - "make_map(gdf2, d)" - ] - }, - { - "cell_type": "markdown", - "id": "09296410-a025-4ce5-864d-12877e59208e", - "metadata": {}, - "source": [ - "## District 7" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "ac0b094d-2384-4c6c-afba-3a4585c02500", - "metadata": {}, - "outputs": [], - "source": [ - "d = 7\n", - "make_map(gdf2, d)" - ] - }, - { - "cell_type": "markdown", - "id": "315096cf-adb5-4619-96ec-21c194a02607", - "metadata": {}, - "source": [ - "## District 8" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "64d3dd7f-be89-4126-b3cc-9e7638891abc", - "metadata": {}, - "outputs": [], - "source": [ - "d = 8\n", - "make_map(gdf2, d)" - ] - }, - { - "cell_type": "markdown", - "id": "a1e497c3-e702-4e98-9a89-9ab59644c20a", - "metadata": {}, - "source": [ - "## District 9" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "e88fc57e-a775-42fa-b45d-7ba206c46940", - "metadata": {}, - "outputs": [], - "source": [ - "d = 9\n", - "make_map(gdf2, 
d)" - ] - }, - { - "cell_type": "markdown", - "id": "861acaea-61e8-4ac7-9a1d-f91e640a6cab", - "metadata": {}, - "source": [ - "## District 10" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "e643e533-f9cf-40da-af43-2cc46afe73ce", - "metadata": {}, - "outputs": [], - "source": [ - "d = 10\n", - "make_map(gdf2, d)" - ] - }, - { - "cell_type": "markdown", - "id": "ab4433e7-4445-4f69-95a4-40a9dabd64d6", - "metadata": {}, - "source": [ - "## District 11" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "62aa1344-5c27-48ba-b755-a6f7d2927762", - "metadata": {}, - "outputs": [], - "source": [ - "d = 11\n", - "make_map(gdf2, d)" - ] - }, - { - "cell_type": "markdown", - "id": "8f7bc19f-a199-4efe-a62b-dfe09d5bd90d", - "metadata": {}, - "source": [ - "## District 12" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "a9d477c8-d364-41c9-9f48-d1dd59996782", - "metadata": {}, - "outputs": [], - "source": [ - "d = 12\n", - "make_map(gdf2, d)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "29397c96-d823-484b-9e58-98e308eeef0a", - "metadata": {}, - "outputs": [], - "source": [] } ], "metadata": { diff --git a/high_quality_transit_areas/download_shapes.py b/high_quality_transit_areas/download_shapes.py index 69132fa2c..d9d356253 100644 --- a/high_quality_transit_areas/download_shapes.py +++ b/high_quality_transit_areas/download_shapes.py @@ -15,9 +15,6 @@ from update_vars import analysis_date, COMPILED_CACHED_VIEWS if __name__ == "__main__": - #from dask.distributed import Client - - #client = Client("dask-scheduler.dask.svc.cluster.local:8786") logger.add("./logs/download_data.log", retention="3 months") logger.add(sys.stderr, diff --git a/high_quality_transit_areas/download_stop_times.py b/high_quality_transit_areas/download_stop_times.py index 2bbd00aa7..2d30022ec 100644 --- a/high_quality_transit_areas/download_stop_times.py +++ b/high_quality_transit_areas/download_stop_times.py @@ -15,43 +15,7 @@ from update_vars import analysis_date, COMPILED_CACHED_VIEWS -def feeds_across_trips_shapes_stops(analysis_date: str) -> list: - """ - Return only the feeds that are in common - across the trips, shapes, and stops tables. - - Use this to subset the trips table down when - inputting that for stop_times. - - Bug: shapes doesn't have Metrolink feeds, because shape_array_keys are None. 
- """ - keep_cols = ["feed_key"] - - trip_feeds = pd.read_parquet( - f"{COMPILED_CACHED_VIEWS}trips_{analysis_date}.parquet", - columns = keep_cols - ).drop_duplicates().feed_key.unique().tolist() - - stop_feeds = gpd.read_parquet( - f"{COMPILED_CACHED_VIEWS}stops_{analysis_date}.parquet", - columns = keep_cols - ).drop_duplicates().feed_key.unique().tolist() - - shape_feeds = gpd.read_parquet( - f"{COMPILED_CACHED_VIEWS}routelines_{analysis_date}.parquet", - columns = keep_cols - ).drop_duplicates().feed_key.unique().tolist() - - feeds_in_common = list(set(trip_feeds) & set(shape_feeds) & set(stop_feeds)) - - return feeds_in_common - - -if __name__=="__main__": - # Connect to dask distributed client, put here so it only runs for this script - #from dask.distributed import Client - - #client = Client("dask-scheduler.dask.svc.cluster.local:8786") +if __name__=="__main__": logger.add("./logs/download_data.log", retention="3 months") logger.add(sys.stderr, @@ -65,16 +29,9 @@ def feeds_across_trips_shapes_stops(analysis_date: str) -> list: f"{COMPILED_CACHED_VIEWS}trips_{analysis_date}.parquet") FEEDS_TO_RUN = full_trips.feed_key.unique().tolist() - #FEEDS_TO_RUN = feeds_across_trips_shapes_stops(analysis_date) logger.info(f"# operators to run: {len(FEEDS_TO_RUN)}") - # There may be some feeds missing with complete info - # subset it now -- should we drop Metrolink knowingly? - #trips_on_day = full_trips[ - # full_trips.feed_key.isin(FEEDS_TO_RUN) - #].reset_index(drop=True) - # st already used, keep for continuity dataset = "st" logger.info(f"*********** Download {dataset} data ***********") @@ -98,6 +55,4 @@ def feeds_across_trips_shapes_stops(analysis_date: str) -> list: f"{COMPILED_CACHED_VIEWS}{dataset}_{analysis_date}.parquet") end = dt.datetime.now() - logger.info(f"execution time: {end-start}") - - #client.close() \ No newline at end of file + logger.info(f"execution time: {end-start}") \ No newline at end of file diff --git a/high_quality_transit_areas/download_stops.py b/high_quality_transit_areas/download_stops.py index 487e1d161..c8d248898 100644 --- a/high_quality_transit_areas/download_stops.py +++ b/high_quality_transit_areas/download_stops.py @@ -14,10 +14,7 @@ from shared_utils import gtfs_utils_v2, geography_utils, utils from update_vars import analysis_date, COMPILED_CACHED_VIEWS -if __name__ == "__main__": - #from dask.distributed import Client - - #client = Client("dask-scheduler.dask.svc.cluster.local:8786") +if __name__ == "__main__": logger.add("./logs/download_data.log", retention="3 months") logger.add(sys.stderr, diff --git a/high_quality_transit_areas/download_trips.py b/high_quality_transit_areas/download_trips.py index 86ad9f491..65bcd4f89 100644 --- a/high_quality_transit_areas/download_trips.py +++ b/high_quality_transit_areas/download_trips.py @@ -16,11 +16,6 @@ if __name__=="__main__": - # Connect to dask distributed client, put here so it only runs - # for this script - #from dask.distributed import Client - - #client = Client("dask-scheduler.dask.svc.cluster.local:8786") logger.add("./logs/download_data.log", retention="3 months") logger.add(sys.stderr, @@ -61,6 +56,4 @@ trips.to_parquet(f"{COMPILED_CACHED_VIEWS}{dataset}_{analysis_date}.parquet") end = dt.datetime.now() - logger.info(f"execution time: {end-start}") - - #client.close() + logger.info(f"execution time: {end-start}") \ No newline at end of file diff --git a/high_quality_transit_areas/logs/download_data.log b/high_quality_transit_areas/logs/download_data.log index 11af8e4c4..606d149f9 100644 --- 
a/high_quality_transit_areas/logs/download_data.log +++ b/high_quality_transit_areas/logs/download_data.log @@ -6083,3 +6083,38 @@ 2023-01-19 14:06:35.243 | INFO | __main__::35 - # operators to run: 184 2023-01-19 14:06:35.243 | INFO | __main__::38 - *********** Download stops data *********** 2023-01-19 14:06:46.436 | INFO | __main__::66 - execution time: 0:00:11.868638 +2023-01-25 11:48:05.568 | INFO | __main__::30 - Analysis date: 2023-01-18 +2023-01-25 11:48:06.269 | INFO | __main__::38 - # operators to run: 184 +2023-01-25 11:48:06.270 | INFO | __main__::41 - *********** Download trips data *********** +2023-01-25 11:48:21.115 | INFO | __main__::64 - execution time: 0:00:15.520630 +2023-01-25 11:48:34.668 | INFO | __main__::27 - Analysis date: 2023-01-18 +2023-01-25 11:48:35.332 | INFO | __main__::35 - # operators to run: 184 +2023-01-25 11:48:35.332 | INFO | __main__::38 - *********** Download stops data *********** +2023-01-25 11:48:46.859 | INFO | __main__::66 - execution time: 0:00:12.190804 +2023-01-25 11:49:00.076 | INFO | __main__::27 - Analysis date: 2023-01-18 +2023-01-25 11:49:00.754 | INFO | __main__::35 - # operators to run: 184 +2023-01-25 11:49:00.755 | INFO | __main__::39 - *********** Download routelines data *********** +2023-01-25 11:51:19.247 | INFO | __main__::66 - execution time: 0:02:19.170782 +2023-01-25 11:51:33.130 | INFO | __main__::61 - Analysis date: 2023-01-18 +2023-01-25 11:51:34.279 | INFO | __main__::70 - # operators to run: 140 +2023-01-25 11:51:34.279 | INFO | __main__::80 - *********** Download st data *********** +2023-01-25 11:53:14.879 | INFO | __main__::101 - execution time: 0:01:41.748688 +2023-01-25 12:03:18.625 | INFO | __main__::30 - Analysis date: 2023-01-18 +2023-01-25 12:03:19.338 | INFO | __main__::38 - # operators to run: 186 +2023-01-25 12:03:19.340 | INFO | __main__::41 - *********** Download trips data *********** +2023-01-25 12:03:38.467 | INFO | __main__::64 - execution time: 0:00:19.835970 +2023-01-25 12:05:23.912 | INFO | __main__::27 - Analysis date: 2023-01-18 +2023-01-25 12:05:24.584 | INFO | __main__::35 - # operators to run: 186 +2023-01-25 12:05:24.584 | INFO | __main__::38 - *********** Download stops data *********** +2023-01-25 12:05:36.637 | INFO | __main__::66 - execution time: 0:00:12.723930 +2023-01-25 12:05:49.876 | INFO | __main__::27 - Analysis date: 2023-01-18 +2023-01-25 12:05:50.544 | INFO | __main__::35 - # operators to run: 186 +2023-01-25 12:05:50.545 | INFO | __main__::39 - *********** Download routelines data *********** +2023-01-25 12:10:12.929 | INFO | __main__::66 - execution time: 0:04:23.052629 +2023-01-25 12:10:25.776 | INFO | __main__::61 - Analysis date: 2023-01-18 +2023-01-25 12:10:26.990 | INFO | __main__::70 - # operators to run: 142 +2023-01-25 12:10:26.991 | INFO | __main__::80 - *********** Download st data *********** +2023-01-25 12:40:58.427 | INFO | __main__::25 - Analysis date: 2023-01-18 +2023-01-25 12:40:59.702 | INFO | __main__::33 - # operators to run: 142 +2023-01-25 12:40:59.702 | INFO | __main__::37 - *********** Download st data *********** +2023-01-25 12:42:28.283 | INFO | __main__::58 - execution time: 0:01:29.855088 diff --git a/high_quality_transit_areas/logs/operators_for_hqta.log b/high_quality_transit_areas/logs/operators_for_hqta.log index 1adda76a8..c2b5f26a4 100644 --- a/high_quality_transit_areas/logs/operators_for_hqta.log +++ b/high_quality_transit_areas/logs/operators_for_hqta.log @@ -7526,3 +7526,7 @@ 2023-01-19 13:36:10.594 | INFO | __main__::161 - get list of 
cached ITP IDs: 0:00:00.560371 2023-01-19 13:36:12.533 | INFO | __main__::177 - check files for completeness, save as json: 0:00:01.938918 2023-01-19 13:36:12.534 | INFO | __main__::180 - execution time: 0:00:02.499858 +2023-01-25 11:53:50.213 | INFO | __main__::123 - Analysis date: 2023-01-18 +2023-01-25 11:53:50.831 | INFO | __main__::133 - get list of cached ITP IDs: 0:00:00.578116 +2023-01-25 11:53:52.752 | INFO | __main__::149 - check files for completeness, save as json: 0:00:01.921301 +2023-01-25 11:53:52.753 | INFO | __main__::152 - execution time: 0:00:02.499976 diff --git a/high_quality_transit_areas/operators_for_hqta.py b/high_quality_transit_areas/operators_for_hqta.py index 6fff23d3b..288a20d00 100644 --- a/high_quality_transit_areas/operators_for_hqta.py +++ b/high_quality_transit_areas/operators_for_hqta.py @@ -47,12 +47,9 @@ def scheduled_operators_for_hqta(analysis_date: str): feed_option = "use_subfeeds" ) - exclude = ["Amtrak Schedule"] - keep_cols = ["feed_key", "name"] - operators_to_include = all_operators[ - ~all_operators.name.isin(exclude)][keep_cols] + operators_to_include = all_operators[keep_cols] # There shouldn't be any duplicates by name, since we got rid # of precursor feeds. But, just in case, don't allow dup names. @@ -66,40 +63,15 @@ def scheduled_operators_for_hqta(analysis_date: str): return operators_to_include - -def feed_keys_ran(analysis_date: str, - gcs_folder: str = COMPILED_CACHED_VIEWS) -> list: - """ - Find the feed_keys that already have cached files in GCS - """ - all_files = fs.ls(gcs_folder) - files_for_date = [f for f in all_files if str(analysis_date) in all_files] - - def feeds_from_dataset( - dataset: Literal["trips", "routelines", "stops"], - *args, **kwargs - ) -> list: - # Can use pd because we only keep feed_key...so there's no geometry col - df = pd.read_parquet( - f"{gcs_folder}{dataset}_{analysis_date}.parquet", - columns = ["feed_key"] - ).drop_duplicates() - - return df.feed_key.tolist() - trip_feeds = feeds_from_dataset("trips") - shape_feeds = feeds_from_dataset("routelines") - stop_feeds = feeds_from_dataset("stops") - - # The set of all the feeds are the feeds in common - # BUG / TODO: shape_feeds has 1 less feed...missing Metrolink - already_ran = list(set(trip_feeds) & set(shape_feeds) & set(stop_feeds)) - - return already_ran - - def name_feed_key_dict_to_json(operators_df: pd.DataFrame, file: str): + # Exclude Amtrak from making it into JSON + # (we want the data downloaded, but not continue on in the analysis) + exclude = ["Amtrak Schedule"] + operators_df = operators_df[~operators_df.name.isin(exclude) + ].reset_index(drop=True) + # Put name as the key, in case feed_key for operator changes # over time, we still have a way to catalog this name_feed_key_dict = dict(zip(operators_df.name, operators_df.feed_key)) diff --git a/open_data/cleanup.py b/open_data/cleanup.py new file mode 100644 index 000000000..752245e8b --- /dev/null +++ b/open_data/cleanup.py @@ -0,0 +1,36 @@ +""" +Clean up and remove local files created in the process. +""" +import os + +import open_data +from gcs_to_esri import remove_zipped_shapefiles + +def delete_old_xml_check_in_new_xml(open_data_dict: dict): + """ + After the XML file has been moved locally to sync with + ESRI, no longer need 2 versions of the XML: + + (1) metadata_xml/ + (2) metadata_xml/run_in_esri/ + + Just move the one from metadata_xml/run_in_esri/ out and + check that one into GitHub. 
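For context on the move below: `os.replace` moves a file and silently overwrites the destination if it already exists, which is what lets the updated XML take the place of the older copy. A throwaway sketch using temporary directories (hypothetical `example.xml`, not the real metadata paths):

```python
import os
import tempfile

with tempfile.TemporaryDirectory() as tmp:
    outer = os.path.join(tmp, "metadata_xml")
    inner = os.path.join(outer, "run_in_esri")
    os.makedirs(inner)

    src = os.path.join(inner, "example.xml")
    dst = os.path.join(outer, "example.xml")

    with open(src, "w") as f:
        f.write("<metadata/>")

    # Move the file out of run_in_esri/, overwriting any older copy
    os.replace(src, dst)
    print(os.path.exists(dst), os.path.exists(src))  # True False
```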
+ """ + + for key, value in open_data_dict.items(): + file_path = value["path"] + file_name = os.path.basename(file_path) + os.replace(f"./metadata_xml/run_in_esri/{file_name}", + f"./metadata_xml/{file_name}") + + + +if __name__=="__main__": + # Delete the XML before it was updated, and move the + # updated XML out of the run_in_esri/ directory + delete_old_xml_check_in_new_xml(open_data.OPEN_DATA) + + # Clean up local files + remove_zipped_shapefiles() + print("Remove local zipped shapefiles") \ No newline at end of file diff --git a/portfolio/segment_speeds/0__ca_segment_speeds__.ipynb b/portfolio/segment_speeds/0__ca_segment_speeds__.ipynb deleted file mode 100644 index 56b297f1a..000000000 --- a/portfolio/segment_speeds/0__ca_segment_speeds__.ipynb +++ /dev/null @@ -1,3 +0,0 @@ -version https://git-lfs.github.com/spec/v1 -oid sha256:6b22e63dae84416ad4f94b0569596b8f7ea0279ae48636fdd57571a42c6e553f -size 92569056 diff --git a/portfolio/segment_speeds/_toc.yml b/portfolio/segment_speeds/_toc.yml index 7b3d0dc32..8f4324e3b 100644 --- a/portfolio/segment_speeds/_toc.yml +++ b/portfolio/segment_speeds/_toc.yml @@ -2,5 +2,16 @@ format: jb-book parts: - caption: null chapters: - - file: 0__ca_segment_speeds__.ipynb + - file: district_1/0__ca_segment_speeds__district_1.ipynb + - file: district_2/0__ca_segment_speeds__district_2.ipynb + - file: district_3/0__ca_segment_speeds__district_3.ipynb + - file: district_4/0__ca_segment_speeds__district_4.ipynb + - file: district_5/0__ca_segment_speeds__district_5.ipynb + - file: district_6/0__ca_segment_speeds__district_6.ipynb + - file: district_7/0__ca_segment_speeds__district_7.ipynb + - file: district_8/0__ca_segment_speeds__district_8.ipynb + - file: district_9/0__ca_segment_speeds__district_9.ipynb + - file: district_10/0__ca_segment_speeds__district_10.ipynb + - file: district_11/0__ca_segment_speeds__district_11.ipynb + - file: district_12/0__ca_segment_speeds__district_12.ipynb root: README diff --git a/portfolio/segment_speeds/district_1/0__ca_segment_speeds__district_1.ipynb b/portfolio/segment_speeds/district_1/0__ca_segment_speeds__district_1.ipynb new file mode 100644 index 000000000..4d66e6c63 --- /dev/null +++ b/portfolio/segment_speeds/district_1/0__ca_segment_speeds__district_1.ipynb @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:1c0fc16cc45c71465f9b8bde5e91120dea913e94f37043fd51ec3956c1484f95 +size 2926462 diff --git a/portfolio/segment_speeds/district_10/0__ca_segment_speeds__district_10.ipynb b/portfolio/segment_speeds/district_10/0__ca_segment_speeds__district_10.ipynb new file mode 100644 index 000000000..4c27c2aa0 --- /dev/null +++ b/portfolio/segment_speeds/district_10/0__ca_segment_speeds__district_10.ipynb @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:5b36b6c4a8fd0769d5b2e3cd159f8280d1161142e06fad2b7ed3f16f554ecbca +size 2932265 diff --git a/portfolio/segment_speeds/district_11/0__ca_segment_speeds__district_11.ipynb b/portfolio/segment_speeds/district_11/0__ca_segment_speeds__district_11.ipynb new file mode 100644 index 000000000..3707c352f --- /dev/null +++ b/portfolio/segment_speeds/district_11/0__ca_segment_speeds__district_11.ipynb @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:283b125fd560a7d92c0099924c3c008907b2736f5425484f4a312f3002ccde0c +size 4253394 diff --git a/portfolio/segment_speeds/district_12/0__ca_segment_speeds__district_12.ipynb b/portfolio/segment_speeds/district_12/0__ca_segment_speeds__district_12.ipynb new file mode 100644 
index 000000000..5fcb08c4b --- /dev/null +++ b/portfolio/segment_speeds/district_12/0__ca_segment_speeds__district_12.ipynb @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:66748da7a6e7181db5ecba0171db95f8d4d764465e60f58c52df651d04409de1 +size 6053008 diff --git a/portfolio/segment_speeds/district_2/0__ca_segment_speeds__district_2.ipynb b/portfolio/segment_speeds/district_2/0__ca_segment_speeds__district_2.ipynb new file mode 100644 index 000000000..a1b67397f --- /dev/null +++ b/portfolio/segment_speeds/district_2/0__ca_segment_speeds__district_2.ipynb @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:ecf7ca0cc854e0b3a2822202bc8bc90f6d5a3441e91fde8d63919e7fbe26b730 +size 841255 diff --git a/portfolio/segment_speeds/district_3/0__ca_segment_speeds__district_3.ipynb b/portfolio/segment_speeds/district_3/0__ca_segment_speeds__district_3.ipynb new file mode 100644 index 000000000..6bb8e302d --- /dev/null +++ b/portfolio/segment_speeds/district_3/0__ca_segment_speeds__district_3.ipynb @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:1e7441efc54bde4e72d968c0e24c69ee93a01426a4d7346ec9255992ebc22265 +size 7445433 diff --git a/portfolio/segment_speeds/district_4/0__ca_segment_speeds__district_4.ipynb b/portfolio/segment_speeds/district_4/0__ca_segment_speeds__district_4.ipynb new file mode 100644 index 000000000..e260c4dae --- /dev/null +++ b/portfolio/segment_speeds/district_4/0__ca_segment_speeds__district_4.ipynb @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:7998ec1031c6005338a81acfe783e92e2d4e927f9671a26d71be9406bb51ef9b +size 38514500 diff --git a/portfolio/segment_speeds/district_5/0__ca_segment_speeds__district_5.ipynb b/portfolio/segment_speeds/district_5/0__ca_segment_speeds__district_5.ipynb new file mode 100644 index 000000000..8c68c0d52 --- /dev/null +++ b/portfolio/segment_speeds/district_5/0__ca_segment_speeds__district_5.ipynb @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:4432d4cac079a42535258365ac3e30ad67d37296e53252092ac3138aa3c0e49c +size 2387628 diff --git a/portfolio/segment_speeds/district_6/0__ca_segment_speeds__district_6.ipynb b/portfolio/segment_speeds/district_6/0__ca_segment_speeds__district_6.ipynb new file mode 100644 index 000000000..bc82ad9c0 --- /dev/null +++ b/portfolio/segment_speeds/district_6/0__ca_segment_speeds__district_6.ipynb @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:20eb74e29dae37ee0d48eb56bab083756e5f514ea387452b8a93292965118f98 +size 4921063 diff --git a/portfolio/segment_speeds/district_7/0__ca_segment_speeds__district_7.ipynb b/portfolio/segment_speeds/district_7/0__ca_segment_speeds__district_7.ipynb new file mode 100644 index 000000000..d1db2458d --- /dev/null +++ b/portfolio/segment_speeds/district_7/0__ca_segment_speeds__district_7.ipynb @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:73b88e474ee8934eec832cdf5fcdc9caf62e354b6ab00859e56d65a75c4d6e8a +size 16938272 diff --git a/portfolio/segment_speeds/district_8/0__ca_segment_speeds__district_8.ipynb b/portfolio/segment_speeds/district_8/0__ca_segment_speeds__district_8.ipynb new file mode 100644 index 000000000..e06830bd9 --- /dev/null +++ b/portfolio/segment_speeds/district_8/0__ca_segment_speeds__district_8.ipynb @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:938365c41884f0344864ada9c2318f4c59e64bceb1dc0d0fc062c3ee7dead87c +size 7762113 diff --git 
a/portfolio/segment_speeds/district_9/0__ca_segment_speeds__district_9.ipynb b/portfolio/segment_speeds/district_9/0__ca_segment_speeds__district_9.ipynb new file mode 100644 index 000000000..e8d03953a --- /dev/null +++ b/portfolio/segment_speeds/district_9/0__ca_segment_speeds__district_9.ipynb @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:a52933ac312a47765dc51a927975bf91632d0b0a3b371c023470f1414b3bac93 +size 8988 diff --git a/portfolio/sites/segment_speeds.yml b/portfolio/sites/segment_speeds.yml index a5cc6aac8..31be274a6 100644 --- a/portfolio/sites/segment_speeds.yml +++ b/portfolio/sites/segment_speeds.yml @@ -1,7 +1,31 @@ title: Real-Time Speeds by Segment directory: ./daskify_rt/ readme: ./daskify_rt/README.md +notebook: ./daskify_rt/ca_segment_speeds.ipynb parts: - caption: Introduction - chapters: - - notebook: ./daskify_rt/ca_segment_speeds.ipynb + - params: + district: 1 + - params: + district: 2 + - params: + district: 3 + - params: + district: 4 + - params: + district: 5 + - params: + district: 6 + - params: + district: 7 + - params: + district: 8 + - params: + district: 9 + - params: + district: 10 + - params: + district: 11 + - params: + district: 12 \ No newline at end of file diff --git a/quarterly_performance_objective/A1_generate_routes_on_shn_data.py b/quarterly_performance_objective/A1_generate_routes_on_shn_data.py index 3719b4077..f6720db7e 100644 --- a/quarterly_performance_objective/A1_generate_routes_on_shn_data.py +++ b/quarterly_performance_objective/A1_generate_routes_on_shn_data.py @@ -3,87 +3,20 @@ """ import datetime import geopandas as gpd -import os import pandas as pd import sys -os.environ["CALITP_BQ_MAX_BYTES"] = str(130_000_000_000) - -from siuba import * from loguru import logger +from typing import Literal -from bus_service_utils import create_parallel_corridors, gtfs_build -from shared_utils import gtfs_utils, geography_utils, portfolio_utils +from bus_service_utils import create_parallel_corridors +from shared_utils import geography_utils from update_vars import ANALYSIS_DATE, BUS_SERVICE_GCS, COMPILED_CACHED_GCS -def merge_routelines_with_trips(selected_date: str) -> gpd.GeoDataFrame: - """ - Merge routes and trips to get line geometry. - """ - routelines = gpd.read_parquet( - f"{COMPILED_CACHED_GCS}routelines_{selected_date}_all.parquet") - trips = pd.read_parquet( - f"{COMPILED_CACHED_GCS}trips_{selected_date}_all.parquet") - - EPSG_CODE = routelines.crs.to_epsg() - - df = gtfs_build.merge_routes_trips( - routelines, trips, - merge_cols = ["calitp_itp_id", "calitp_url_number", "shape_id"], - crs = f"EPSG: {EPSG_CODE}", - join = "inner" - ) - - df2 = (df.rename(columns = {"calitp_itp_id": "itp_id"}) - .reset_index(drop=True) - ) - - return df2 - - -def get_total_service_hours(selected_date): - # Run a query that aggregates service hours to shape_id level - trip_cols = ["calitp_itp_id", "calitp_url_number", - "route_id", "shape_id"] - - # exclude 200! 
- itp_ids_on_day = (portfolio_utils.add_agency_name(selected_date) - >> filter(_.calitp_itp_id != 200) - >> select(_.calitp_itp_id) - >> distinct() - ) - - ITP_IDS = itp_ids_on_day.calitp_itp_id.tolist() - - trips_with_hrs = gtfs_utils.get_trips( - selected_date = selected_date, - itp_id_list = ITP_IDS, - trip_cols = trip_cols + ["service_hours", "direction_id"], - get_df = True # only when it's True can the Metrolink fix get applied - ) - - trips_with_hrs.to_parquet( - f"./data/trips_with_hrs_staging_{selected_date}.parquet") - - aggregated_hrs = (trips_with_hrs.groupby(trip_cols) - .agg({"service_hours": "sum"}) - .reset_index() - .rename(columns = { - "service_hours": "total_service_hours"}) - .drop_duplicates() - ) - - aggregated_hrs.to_parquet( - f"{BUS_SERVICE_GCS}trips_with_hrs_{selected_date}.parquet") - - # Once aggregated dataset written to GCS, remove local cache - os.remove(f"./data/trips_with_hrs_staging_{selected_date}.parquet") - - if __name__ == "__main__": - logger.add("./logs/A1_generate_routes_on_shn_data.log") + logger.add("./logs/A1_generate_routes_on_shn_data.log", retention="6 months") logger.add(sys.stderr, format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}", level="INFO") @@ -91,18 +24,19 @@ def get_total_service_hours(selected_date): logger.info(f"Analysis date: {ANALYSIS_DATE}") start = datetime.datetime.now() - # Use concatenated routelines and trips from traffic_ops work - # Use the datasets with Amtrak added back in (rt_delay always excludes Amtrak) - trips_with_geom = merge_routelines_with_trips(ANALYSIS_DATE) + VERSION = "v2" + shapes_with_route_info = merge_routelines_with_trips( + ANALYSIS_DATE, warehouse_version = VERSION) # 50 ft buffers, get routes that are on SHN create_parallel_corridors.make_analysis_data( - hwy_buffer_feet=50, - alternate_df = trips_with_geom, + hwy_buffer_feet = 50, + transit_routes_df = shapes_with_route_info, pct_route_threshold = 0.2, pct_highway_threshold = 0, - DATA_PATH = BUS_SERVICE_GCS, - FILE_NAME = f"routes_on_shn_{ANALYSIS_DATE}" + data_path = BUS_SERVICE_GCS, + file_name = f"routes_on_shn_{ANALYSIS_DATE}", + warehouse_version = VERSION ) time1 = datetime.datetime.now() @@ -110,22 +44,17 @@ def get_total_service_hours(selected_date): # Grab other routes where at least 35% of route is within 0.5 mile of SHN create_parallel_corridors.make_analysis_data( - hwy_buffer_feet= geography_utils.FEET_PER_MI * 0.5, - alternate_df = trips_with_geom, + hwy_buffer_feet = geography_utils.FEET_PER_MI * 0.5, + transit_routes_df = shapes_with_route_info, pct_route_threshold = 0.2, pct_highway_threshold = 0, - DATA_PATH = BUS_SERVICE_GCS, - FILE_NAME = f"parallel_or_intersecting_{ANALYSIS_DATE}" + data_path = BUS_SERVICE_GCS, + file_name = f"parallel_or_intersecting_{ANALYSIS_DATE}", + warehouse_version = VERSION ) time2 = datetime.datetime.now() logger.info(f"routes within half mile buffer created: {time2 - time1}") - - # Get aggregated service hours by shape_id - get_total_service_hours(ANALYSIS_DATE) - - time3 = datetime.datetime.now() - logger.info(f"downloaded service hours at shape_id level: {time3 - time2}") - + end = datetime.datetime.now() logger.info(f"execution time: {end - start}") \ No newline at end of file diff --git a/quarterly_performance_objective/A2_categorize_routes.py b/quarterly_performance_objective/A2_categorize_routes.py index c469cd1a2..c49e4e8cb 100644 --- a/quarterly_performance_objective/A2_categorize_routes.py +++ b/quarterly_performance_objective/A2_categorize_routes.py @@ -4,75 +4,97 @@ """ import 
datetime import geopandas as gpd +import intake import pandas as pd import sys from loguru import logger +from typing import Literal from A1_generate_routes_on_shn_data import merge_routelines_with_trips from shared_utils import geography_utils, utils from bus_service_utils import create_parallel_corridors from update_vars import ANALYSIS_DATE, BUS_SERVICE_GCS, COMPILED_CACHED_GCS -route_cols = ["itp_id", "route_id"] +catalog = intake.open_catalog( + "../_shared_utils/shared_utils/shared_data_catalog.yml") #---------------------------------------------------------------# # Data processing - merge trips dfs, tag a route as parallel/on shn/other #---------------------------------------------------------------# - -def calculate_route_level_service_hours(analysis_date: str) -> pd.DataFrame: +def merge_trips_with_shape_service_hours(analysis_date: str): """ Merge trips df with trips_with_service_hrs (aggregated to shape_id). Aggregate this to route_level, since shape_id is confusing and leads to double-counting, or merges not going through in full. """ - trips_with_hrs = pd.read_parquet( - f"{BUS_SERVICE_GCS}trips_with_hrs_{analysis_date}.parquet").rename( - columns = {"calitp_itp_id": "itp_id"}) - + shape_service_hours = (pd.read_parquet( + f"{BUS_SERVICE_GCS}trips_with_hrs_{analysis_date}.parquet") + .rename(columns = {"total_service_hours": "service_hours", + "itp_id": "calitp_itp_id"}) + ) + trips = pd.read_parquet( - f"{COMPILED_CACHED_GCS}trips_{analysis_date}_all.parquet").rename( - columns = {"calitp_itp_id": "itp_id"}) + f"{COMPILED_CACHED_GCS}trips_{analysis_date}_all.parquet") + + trips2 = pd.merge( + trips, + shape_service_hours[ + ["calitp_itp_id", "shape_id", "service_hours"]].drop_duplicates(), + on = ["calitp_itp_id", "shape_id"], + how = "inner" + ) - # Aggregate trips with service hours (at shape_id level) up to route_id - route_service_hours = geography_utils.aggregate_by_geography( - trips_with_hrs, - group_cols = route_cols, - sum_cols = ["total_service_hours"] - ) + trips3 = trips2[["calitp_itp_id", "route_id", + "shape_id", "service_hours"] + ].drop_duplicates().reset_index(drop=True) - # Aggregate trips (at trip_id level) to route_id - routes = trips[route_cols + ["route_type"]].drop_duplicates().reset_index(drop=True) + return trips3 - # there are multiple trips sharing same shape_id - # that's fine, but since trips_with_hrs is already aggregated up to - # the route_id level, aggregate for trips too - route_full_info = pd.merge( - routes, - route_service_hours, - on = route_cols, - how = "outer", - validate = "1:1", - indicator=True - ).rename(columns = {"calitp_itp_id": "itp_id"}) +def calculate_route_level_service_hours( + analysis_date: str, + warehouse_version: Literal["v1", "v2"] +) -> pd.DataFrame: + + route_cols = ["route_id", "route_type"] + + if warehouse_version == "v1": + operator_cols = ["calitp_itp_id"] + + trips = merge_trips_with_shape_service_hours(analysis_date) + + elif warehouse_version == "v2": + operator_cols = ["feed_key", "name"] + + trips = pd.read_parquet( + f"{COMPILED_CACHED_GCS}trips_{analysis_date}.parquet") + + # Aggregate trips with service hours (at shape_id level) up to route_id + route_service_hours = geography_utils.aggregate_by_geography( + trips, + group_cols = operator_cols + route_cols, + sum_cols = ["service_hours"] + ) - return route_full_info + return route_service_hours -def get_unique_routes(df: pd.DataFrame) -> pd.DataFrame: +def get_unique_routes(df: pd.DataFrame, route_cols: list) -> pd.DataFrame: """ + route_cols: list 
that uniquely identifies an operator-route_id. Get it down to unique route for each row. As is, a row is route_id-hwy, the segment of overlap """ # If there are multiple shape_ids for route_id, # Keep the one where it has higher overlap with SHN # If it was ever tagged as parallel, let's keep that obs - df2 = (df.sort_values(route_cols + ["pct_route", "pct_highway"], - ascending=[True, True, False, False] - ) - .drop_duplicates(subset=route_cols) + df2 = (df.sort_values(route_cols + ["pct_route", "pct_highway"]) + .drop_duplicates(subset=route_cols, keep="last") + # keep last because route_cols can + # be 2 cols (feed_key, name) or 1 col (calitp_itp_id) and can't control + # sort order without knowing list length )[route_cols] return df2 @@ -98,59 +120,41 @@ def make_mutually_exclusive(row: str) -> str: return df2 -def add_district(df: pd.DataFrame, date_str: str) -> pd.DataFrame: - """ - Merge in district info (only 1 district per route_id) - """ - - on_shn = gpd.read_parquet( - f"{BUS_SERVICE_GCS}routes_on_shn_{date_str}.parquet") - - district = on_shn[route_cols + ["pct_route", "District"]].drop_duplicates() - - # If there's multiple districts, keep the one associated with the highest % route - # since that's the obs we keep anyway - district2 = (district.sort_values(route_cols + ["pct_route", "District"], - ascending=[True, True, False, True]) - .drop_duplicates(subset=route_cols) - .reset_index(drop=True) - [route_cols + ["District"]] - ) - - df2 = pd.merge( - df, district2, - on = route_cols, - how = "left", - validate = "1:1" - ).astype({"District": "Int64"}) - - return df2 + -def flag_shn_intersecting_routes(analysis_date: str) -> pd.DataFrame: +def flag_shn_intersecting_routes( + analysis_date: str, + route_cols: list +) -> pd.DataFrame: """ Take the trips df (each indiv trip) and aggregated trip_service_hrs df (aggregated to shape_id), merge together, and flag whether a transit route is on SHN, intersects SHN, or other.
""" + on_shn = gpd.read_parquet( f"{BUS_SERVICE_GCS}routes_on_shn_{analysis_date}.parquet") - - on_shn_routes = get_unique_routes(on_shn[on_shn.parallel==1]) + + all_routes = on_shn[route_cols].drop_duplicates().reset_index(drop=True) + + on_shn_routes = get_unique_routes( + on_shn[on_shn.parallel==1], + route_cols + ) intersecting = gpd.read_parquet( f"{BUS_SERVICE_GCS}parallel_or_intersecting_{analysis_date}.parquet" ) - intersecting_routes = get_unique_routes(intersecting[intersecting.parallel==1]) - - # Create route-level df - # Merge trips and trips_with_hrs dfs, and aggregate to route_level - route_service_hours = calculate_route_level_service_hours(analysis_date) + intersecting_routes = get_unique_routes( + intersecting[intersecting.parallel==1], + route_cols + ) # Merge dummy variables in with_on_shn_intersecting = pd.merge( - route_service_hours, + all_routes, on_shn_routes, on = route_cols, how = "left", @@ -164,17 +168,15 @@ def flag_shn_intersecting_routes(analysis_date: str) -> pd.DataFrame: indicator = "in_intersecting" ) - # Make sure on_shn, intersects_shn, and other are mutually exclusive categories # A route can only fall into 1 of these groups with_categories = mutually_exclusive_groups(with_on_shn_intersecting) - with_categories = add_district(with_categories, analysis_date) return with_categories if __name__=="__main__": - logger.add("./logs/A2_categorize_routes.log") + logger.add("./logs/A2_categorize_routes.log", retention = "6 months") logger.add(sys.stderr, format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}", level="INFO") @@ -182,36 +184,61 @@ def flag_shn_intersecting_routes(analysis_date: str) -> pd.DataFrame: logger.info(f"Analysis date: {ANALYSIS_DATE}") start = datetime.datetime.now() - # (1) Categorize into each group - df = flag_shn_intersecting_routes(ANALYSIS_DATE) + VERSION = "v1" + + if VERSION == "v1": + route_cols = ["calitp_itp_id", "route_id"] + + elif VERSION == "v2": + route_cols = ["feed_key", "name", "route_id"] + + # (1) Get route's line geom + shapes = merge_routelines_with_trips(ANALYSIS_DATE, VERSION) + + routes = (create_parallel_corridors.process_transit_routes( + shapes, VERSION + )[route_cols + ["route_length", "geometry"]] + .to_crs(geography_utils.CA_StatePlane) + # make sure units are in feet + ) + + routes_with_district = add_district(routes, route_cols) + + # (2) Categorize into each group + routes_flagged = flag_shn_intersecting_routes(ANALYSIS_DATE, route_cols) time1 = datetime.datetime.now() logger.info(f"flag shn or intersecting: {time1 - start}") - # (2) Add in the route's line geom - trips_with_geom = merge_routelines_with_trips(ANALYSIS_DATE) - - bus_route_with_geom = create_parallel_corridors.process_transit_routes( - alternate_df = trips_with_geom) + # (3) Aggregate service hours up to route-level + route_service_hours = calculate_route_level_service_hours( + ANALYSIS_DATE, VERSION) - gdf = pd.merge( - bus_route_with_geom[["itp_id", "route_id", "route_length", "geometry"]], - df, - on = ["itp_id", "route_id"], + # (4) Merge all routes (with line geom) with categories and service hours + df = pd.merge( + routes_with_district, + route_service_hours, + on = route_cols, + how = "left", + validate = "1:1" + ).merge( + routes_flagged, + on = route_cols, how = "left", - ).to_crs(geography_utils.CA_StatePlane) # make sure units are in feet + validate = "1:1" + ) time2 = datetime.datetime.now() - logger.info(f"merge in line geom: {time2 - time1}") - - # (3) Some cleanup for visualization, as well as simplifying geom for 
faster mapping - gdf = gdf.assign( - service_hours = gdf.total_service_hours.round(2), - # simplify geom to make it faster to plot? - geometry = gdf.geometry.simplify(tolerance = 20), - route_length_mi = round(gdf.route_length / geography_utils.FEET_PER_MI, 2), - ).drop(columns = ["route_length", "total_service_hours"]).to_crs(geography_utils.WGS84) - + logger.info(f"create route-level df: {time2 - time1}") + + # (5) Some cleanup for visualization + gdf = (df.assign( + service_hours = df.service_hours.round(2), + route_length_mi = round(df.route_length / geography_utils.FEET_PER_MI, 2), + ).drop(columns = ["route_length", "service_hours"]) + .to_crs(geography_utils.WGS84) + ) + ''' # Export to GCS (use date suffix because we will want historical comparisons) utils.geoparquet_gcs_export( gdf, @@ -220,6 +247,7 @@ def flag_shn_intersecting_routes(analysis_date: str) -> pd.DataFrame: ) logger.info("exported dataset to GCS") - + ''' + logger.info("test oct 2022 run, don't save") end = datetime.datetime.now() logger.info(f"execution time: {end - start}") \ No newline at end of file diff --git a/quarterly_performance_objective/download_trips_v2_backfill.py b/quarterly_performance_objective/download_trips_v2_backfill.py new file mode 100644 index 000000000..a6e3c4f82 --- /dev/null +++ b/quarterly_performance_objective/download_trips_v2_backfill.py @@ -0,0 +1,172 @@ +""" +Download all trips and shapes for a day for v2. + +Use this to figure out why v1 and v2 aggregate service hours are so different. +""" +import os +os.environ["CALITP_BQ_MAX_BYTES"] = str(300_000_000_000) + +import datetime as dt +import pandas as pd +import sys + +from calitp.tables import tbls +from siuba import * +from loguru import logger + +from shared_utils import (gtfs_utils_v2, gtfs_utils, + rt_dates, utils, geography_utils) +from update_vars import COMPILED_CACHED_GCS + +def scheduled_operators(analysis_date: str): + """ + This is how HQTA data is downloaded...so do the same here for backfill. + """ + all_operators = gtfs_utils_v2.schedule_daily_feed_to_organization( + selected_date = analysis_date, + keep_cols = None, + get_df = True, + feed_option = "use_subfeeds" + ) + + keep_cols = ["feed_key", "name"] + + operators_to_include = all_operators[keep_cols] + + # There shouldn't be any duplicates by name, since we got rid + # of precursor feeds. But, just in case, don't allow dup names.
+ operators_to_include = (operators_to_include + .drop_duplicates(subset="name") + .reset_index(drop=True) + ) + + return operators_to_include + + +if __name__=="__main__": + logger.add("./logs/download_trips_v2_backfill.log") + logger.add(sys.stderr, + format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}", + level="INFO") + + analysis_date = rt_dates.PMAC["Q3_2022"] + VERSION = "v1" + logger.info(f"Analysis date: {analysis_date} warehouse {VERSION}") + + start = dt.datetime.now() + + if VERSION == "v1": + ITP_IDS = (tbls.gtfs_schedule.agency() + >> distinct(_.calitp_itp_id) + >> filter(_.calitp_itp_id != 200) + >> collect() + ).calitp_itp_id.tolist() + + IDS_TO_RUN = sorted(ITP_IDS) + + logger.info(f"# operators to run: {len(IDS_TO_RUN)}") + + dataset = "trips" + logger.info(f"*********** Download {dataset} data ***********") + + + keep_trip_cols = [ + "calitp_itp_id", "calitp_url_number", + "service_date", "trip_key", "trip_id", + "route_id", "direction_id", "shape_id", + "calitp_extracted_at", "calitp_deleted_at", + "trip_first_departure_ts", "trip_last_arrival_ts", + "service_hours" + ] + + trips = gtfs_utils.get_trips( + selected_date = analysis_date, + itp_id_list = IDS_TO_RUN, + trip_cols = keep_trip_cols, + get_df = True, + ) + + trips.to_parquet( + f"{COMPILED_CACHED_GCS}{dataset}_{analysis_date}_{VERSION}.parquet") + ''' + trips = pd.read_parquet( + f"{COMPILED_CACHED_GCS}trips_{analysis_date}_{VERSION}.parquet") + + dataset = "routelines" + logger.info(f"*********** Download {dataset} data ***********") + + routelines = gtfs_utils.get_route_shapes( + selected_date = analysis_date, + itp_id_list = IDS_TO_RUN, + get_df = True, + crs = geography_utils.CA_NAD83Albers, + trip_df = trips + )[["calitp_itp_id", "calitp_url_number", "shape_id", "geometry"]] + + utils.geoparquet_gcs_export( + routelines, + COMPILED_CACHED_GCS, + f"{dataset}_{analysis_date}_{VERSION}.parquet" + ) + ''' + + ''' + elif VERSION == "v2": + operators_df = scheduled_operators(analysis_date) + + FEEDS_TO_RUN = sorted(operators_df.feed_key.unique().tolist()) + + logger.info(f"# operators to run: {len(FEEDS_TO_RUN)}") + + dataset = "trips" + logger.info(f"*********** Download {dataset} data ***********") + + keep_trip_cols = [ + "feed_key", "name", "regional_feed_type", + "service_date", "trip_key", "trip_id", + "route_key", "route_id", "route_type", + "route_short_name", "route_long_name", "route_desc", + "direction_id", + "shape_array_key", "shape_id", + "trip_first_departure_sec", "trip_last_arrival_sec", + "service_hours" + ] + + trips = gtfs_utils_v2.get_trips( + selected_date = analysis_date, + operator_feeds = FEEDS_TO_RUN, + trip_cols = keep_trip_cols, + get_df = True, + ) + + trips.to_parquet( + f"{COMPILED_CACHED_GCS}{dataset}_{analysis_date}_{VERSION}.parquet") + + dataset = "routelines" + logger.info(f"*********** Download {dataset} data ***********") + + keep_shape_cols = [ + "feed_key", + "shape_id", "shape_array_key", + "n_trips", + # n_trips is new column...can help if we want + # to choose between shape_ids + # geometry already returned when get_df is True + ] + + routelines = gtfs_utils_v2.get_shapes( + selected_date = analysis_date, + operator_feeds = FEEDS_TO_RUN, + shape_cols = keep_shape_cols, + get_df = True, + crs = geography_utils.CA_NAD83Albers, + ) + + utils.geoparquet_gcs_export( + routelines, + COMPILED_CACHED_GCS, + f"{dataset}_{analysis_date}_{VERSION}.parquet" + ) + ''' + end = dt.datetime.now() + logger.info(f"execution time: {end - start}") diff --git 
a/quarterly_performance_objective/route_df.py b/quarterly_performance_objective/route_df.py new file mode 100644 index 000000000..b3cd7f7d4 --- /dev/null +++ b/quarterly_performance_objective/route_df.py @@ -0,0 +1,172 @@ +""" +Make a route-level df from trips and shapes. + +Must include the route's line geom and service hours and +district. +""" +import datetime +import geopandas as gpd +import intake +import pandas as pd +import sys + +from loguru import logger +from typing import Literal + +from shared_utils import geography_utils, utils, rt_dates +from bus_service_utils import create_parallel_corridors +from update_vars import BUS_SERVICE_GCS, COMPILED_CACHED_GCS #,ANALYSIS_DATE + +catalog = intake.open_catalog( + "../_shared_utils/shared_utils/shared_data_catalog.yml") + + +def shape_geom_to_route_geom( + shapes: gpd.GeoDataFrame, + trips: pd.DataFrame, + warehouse_version: Literal["v1", "v2"] +) -> gpd.GeoDataFrame: + """ + Merge routes and trips to get line geometry. + """ + cols_to_keep = ["route_id", "route_type", "shape_id", "geometry"] + + if warehouse_version == "v1": + operator_cols = ["calitp_itp_id"] + keep_cols = operator_cols + cols_to_keep + + elif warehouse_version == "v2": + operator_cols = ["feed_key"] + keep_cols = operator_cols + ["name"] + cols_to_keep + + # Merge trips with the shape's line geom, and get it to shape_id level + df = (pd.merge( + shapes, + trips, + on = operator_cols + ["shape_id"], + how = "inner", + )[keep_cols] + .drop_duplicates(subset=operator_cols + ["shape_id"]) + .reset_index(drop=True) + ) + + # Now, get it from shape_id level to route_id level + routes = (create_parallel_corridors.process_transit_routes( + df, warehouse_version + ).to_crs(geography_utils.CA_StatePlane) + # make sure units are in feet + ) + + return routes + + +def aggregate_trip_service_to_route_level( + trips: pd.DataFrame, + route_cols: list) -> pd.DataFrame: + + route_service_hours = geography_utils.aggregate_by_geography( + trips, + group_cols = route_cols, + sum_cols = ["service_hours"] + + ) + + return route_service_hours + + +def add_district(route_df: gpd.GeoDataFrame, + route_cols: list) -> gpd.GeoDataFrame: + """ + Merge in district info (only 1 district per route_id) + """ + districts = (catalog.caltrans_districts.read() + [["DISTRICT", "geometry"]] + .rename(columns = {"DISTRICT": "district"}) + ).to_crs(route_df.crs) + + # Find overlay and calculate overlay length + # Keep the district with longest overlay for that route_id + overlay_results = gpd.overlay( + route_df[route_cols + ["geometry"]].drop_duplicates(), + districts, + how = "intersection", + keep_geom_type = False + ) + + overlay_results = overlay_results.assign( + overlay_length = overlay_results.geometry.to_crs( + geography_utils.CA_StatePlane).length + ) + + longest_overlay = (overlay_results + .sort_values(route_cols + ["overlay_length"]) + .drop_duplicates(subset=route_cols, keep="last") + .reset_index(drop=True) + ) + + + # Only keep 1 district per route + gdf = pd.merge( + route_df, + longest_overlay[route_cols + ["district"]], + on = route_cols, + how = "left" + ) + + return gdf + + +if __name__ == "__main__": + + ANALYSIS_DATE = rt_dates.PMAC["Q4_2022"] + VERSION = "v2" + + logger.add("./logs/assemble_route_df.log", retention="6 months") + logger.add(sys.stderr, + format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}", + level="INFO") + + logger.info(f"Analysis date: {ANALYSIS_DATE} warehouse {VERSION}") + start = datetime.datetime.now() + + if VERSION == "v1": + route_cols = 
["calitp_itp_id", "route_id"] + + elif VERSION == "v2": + route_cols = ["feed_key", "name", "route_id"] + + # Import data + trips = pd.read_parquet( + f"{COMPILED_CACHED_GCS}trips_{ANALYSIS_DATE}_all.parquet") + routelines = gpd.read_parquet( + f"{COMPILED_CACHED_GCS}routelines_{ANALYSIS_DATE}_all.parquet") + + # Merge to get shape_level geometry, then pare down to route-level geometry + route_geom = shape_geom_to_route_geom( + routelines, trips, VERSION) + + # Add district + route_geom_with_district = add_district(route_geom, route_cols) + logger.info("get route-level geometry and district") + + # Get route-level service + route_service = aggregate_trip_service_to_route_level(trips, route_cols) + logger.info("get route-level service") + + # Merge service hours with route-level geometry and district + gdf = pd.merge( + route_geom_with_district, + route_service, + on = route_cols, + how = "left", + validate = "1:1" + ) + + utils.geoparquet_gcs_export( + gdf, + BUS_SERVICE_GCS, + f"routes_{ANALYSIS_DATE}_{VERSION}" + ) + + end = datetime.datetime.now() + logger.info(f"execution time: {end - start}") \ No newline at end of file diff --git a/quarterly_performance_objective/update_vars.py b/quarterly_performance_objective/update_vars.py index 668867817..509168c74 100644 --- a/quarterly_performance_objective/update_vars.py +++ b/quarterly_performance_objective/update_vars.py @@ -3,5 +3,6 @@ BUS_SERVICE_GCS = f"{utils.GCS_FILE_PATH}" COMPILED_CACHED_GCS = f"{rt_utils.GCS_FILE_PATH}compiled_cached_views/" -CURRENT_QUARTER = "Q4_2022" -ANALYSIS_DATE = rt_dates.PMAC[CURRENT_QUARTER] +#CURRENT_QUARTER = "Q4_2022" +#ANALYSIS_DATE = rt_dates.PMAC[CURRENT_QUARTER] +ANALYSIS_DATE = rt_dates.DATES["oct2022"] diff --git a/quarterly_performance_objective/v1_v2_parity.ipynb b/quarterly_performance_objective/v1_v2_parity.ipynb new file mode 100644 index 000000000..e01d863a1 --- /dev/null +++ b/quarterly_performance_objective/v1_v2_parity.ipynb @@ -0,0 +1,137 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 1, + "id": "ac1ae78f-ac19-4bb4-99bb-09ae5fad0ac5", + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "/opt/conda/lib/python3.9/site-packages/geopandas/_compat.py:123: UserWarning: The Shapely GEOS version (3.10.3-CAPI-1.16.1) is incompatible with the GEOS version PyGEOS was compiled with (3.10.1-CAPI-1.16.0). 
Conversions between both will be slow.\n", + " warnings.warn(\n" + ] + } + ], + "source": [ + "import geopandas as gpd\n", + "import pandas as pd\n", + "\n", + "from shared_utils import rt_dates\n", + "from update_vars import BUS_SERVICE_GCS, COMPILED_CACHED_GCS " + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "2d0e3275-5eeb-4a4f-8da0-3116143500f5", + "metadata": {}, + "outputs": [], + "source": [ + "ANALYSIS_DATE = rt_dates.PMAC[\"Q4_2022\"]\n", + "\n", + "df1 = pd.read_parquet(\n", + " f\"{BUS_SERVICE_GCS}trips_with_hrs_{ANALYSIS_DATE}.parquet\")\n", + "\n", + "df2 = gpd.read_parquet(\n", + " f\"{BUS_SERVICE_GCS}routes_{ANALYSIS_DATE}_v2.parquet\")" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "id": "28c4219b-9b4c-4b12-bc46-f7a6de0e222a", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "28353.39944444444" + ] + }, + "execution_count": 7, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "df1[[\"calitp_itp_id\", \"route_id\", \n", + " \"shape_id\", \"total_service_hours\"]\n", + " ].drop_duplicates().total_service_hours.sum()" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "id": "36f1a8c2-e84c-4ecc-aa3c-cc23342c4134", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "31448.98333333333" + ] + }, + "execution_count": 4, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "df1.total_service_hours.sum()" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "5e1aabaf-a50e-4189-ae02-0a8809fa2ce8", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "102686.6461111111" + ] + }, + "execution_count": 5, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "df2.service_hours.sum()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "238f44b4-8365-476f-b0de-8bde5f1cf86a", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.13" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/traffic_ops/Makefile b/traffic_ops/Makefile index 5b6c3a62a..4b990f311 100644 --- a/traffic_ops/Makefile +++ b/traffic_ops/Makefile @@ -1,5 +1,3 @@ gtfs_schedule_geospatial_open_data: - #cd ../bus_service_increase/ && make setup_bus_service_utils && cd .. - python prep_data.py python create_routes_data.py python create_stops_data.py \ No newline at end of file diff --git a/traffic_ops/README.md b/traffic_ops/README.md index 607c7e67b..a73652589 100644 --- a/traffic_ops/README.md +++ b/traffic_ops/README.md @@ -3,7 +3,7 @@ Traffic Ops had a request for all transit routes and transit stops to be published in the open data portal. ## Scripts -1. [prep_data](./prep_data.py): prep warehouse queries for GTFS schedule tables +1. [prep_data](./prep_data.py): helper functions for creating `routes` and `stops` datasets 1. [create_routes_data](./create_routes_data.py): functions to assemble routes that appear in `shapes` 1. 
[create_stops_data](./create_stops_data.py): functions to assemble stop data diff --git a/traffic_ops/check-exported-data.ipynb b/traffic_ops/check-exported-data.ipynb index 2b42ff2e2..397a329cd 100644 --- a/traffic_ops/check-exported-data.ipynb +++ b/traffic_ops/check-exported-data.ipynb @@ -59,7 +59,7 @@ "metadata": {}, "outputs": [], "source": [ - "cols = ['itp_id', 'route_id', 'shape_id']\n", + "cols = ['agency', 'route_id', 'shape_id']\n", "print(f\"# unique combos: {len(gdf[cols].drop_duplicates())}\")" ] }, @@ -124,7 +124,7 @@ "metadata": {}, "outputs": [], "source": [ - "cols = ['itp_id', 'route_id', 'stop_id']\n", + "cols = ['agency', 'route_id', 'stop_id']\n", "print(f\"# unique combos: {len(gdf[cols].drop_duplicates())}\")" ] }, diff --git a/traffic_ops/create_routes_data.py b/traffic_ops/create_routes_data.py index 2bd8a1534..5b8602af9 100644 --- a/traffic_ops/create_routes_data.py +++ b/traffic_ops/create_routes_data.py @@ -2,53 +2,20 @@ Create routes file with identifiers including route_id, route_name, operator name. """ -import dask.dataframe as dd -import dask_geopandas as dg import geopandas as gpd import pandas as pd from datetime import datetime import prep_data -from shared_utils import utils, geography_utils, portfolio_utils +from shared_utils import utils, portfolio_utils -def import_trips(analysis_date: str) -> pd.DataFrame: - keep_cols = ["feed_key", "name", - "trip_id", - "route_id", "shape_id", - "route_long_name", "route_short_name", "route_desc" - ] - - trips = pd.read_parquet( - f"{prep_data.COMPILED_CACHED_GCS}" - f"trips_{analysis_date}_all.parquet", - columns = keep_cols - ) - - # Clean organization name - trips2 = portfolio_utils.clean_organization_name(trips) - - return trips2 - - -def import_shapes(analysis_date: str) -> gpd.GeodataFrame: - keep_cols = ["feed_key", "shape_id", "n_trips", "geometry"] - - shapes = gpd.read_parquet( - f"{prep_data.COMPILED_CACHED_GCS}" - f"routelines_{analysis_date}_all.parquet", - columns = keep_cols - ).to_crs(geography_utils.WGS84) - - return shapes - - def create_routes_file_for_export(analysis_date: str) -> gpd.GeoDataFrame: # Read in local parquets - trips = import_trips(analysis_date) - shapes = import_shapes(analysis_date) + trips = prep_data.import_trips(analysis_date) + shapes = prep_data.import_shapes(analysis_date) shape_cols = ["feed_key", "shape_id"] @@ -61,20 +28,29 @@ def create_routes_file_for_export(analysis_date: str) -> gpd.GeoDataFrame: ) drop_cols = ["route_short_name", "route_long_name", - "route_desc", "service_date", - "feed_key" + "route_desc", "feed_key", "trip_id" ] routes_assembled = (portfolio_utils.add_route_name(df) - .drop(columns = drop_cols) - .sort_values(["name", "route_id"]) - .drop_duplicates(subset=["name", - "route_id", "shape_id"]) + .drop(columns = drop_cols) + .sort_values(["name", "route_id"]) + .drop_duplicates(subset=[ + "name", "route_id", "shape_id"]) .reset_index(drop=True) - .rename(columns = prep_data.RENAME_COLS) ) - return routes_assembled + + # Change column order + col_order = [ + 'agency', 'route_id', 'route_type', 'route_name', + 'shape_id', 'n_trips', + 'feed_url', 'geometry' + ] + + routes_assembled2 = prep_data.standardize_operator_info_for_exports( + routes_assembled)[col_order].reindex(columns = col_order) + + return routes_assembled2 if __name__ == "__main__": diff --git a/traffic_ops/create_stops_data.py b/traffic_ops/create_stops_data.py index 63b8bdb51..34535c07d 100644 --- a/traffic_ops/create_stops_data.py +++ b/traffic_ops/create_stops_data.py @@ -3,48 
+3,19 @@ route_id, route_name, agency_id, agency_name. """ import dask.dataframe as dd -import dask_geopandas as dg import geopandas as gpd import pandas as pd from datetime import datetime import prep_data -from shared_utils import utils, geography_utils, portfolio_utils -from create_routes_data import import_trips - -def import_stops(analysis_date: str) -> gpd.GeoDataFrame: - # Instead of keeping route_type_0, route_type_1, etc - # keep stops table long, instead of wide - # attach route_id, route_type as before - keep_cols = [ - "feed_key", - "stop_id", "stop_name", - ] - - stops = gpd.read_parquet( - f"{prep_data.COMPILED_CACHED_GCS}" - f"stops_{analysis_date}_all.parquet", - columns = keep_cols - ) - - -def import_stop_times(analysis_date: str) -> pd.DataFrame: - keep_cols = ["feed_key", "trip_id", "stop_id"] - - stop_times = pd.read_parquet( - f"{prep_data.COMPILED_CACHED_GCS}" - f"st_{analysis_date}_all.parquet", - columns = keep_cols - ).drop_duplicates().reset_index(drop=True) - - return stop_times +from shared_utils import utils def attach_route_info_to_stops( stops: gpd.GeoDataFrame, trips: pd.DataFrame, - stop_times: pd.DataFrame + stop_times: dd.DataFrame ) -> gpd.GeoDataFrame: """ Attach all the various route information (route_id, route_type) @@ -58,7 +29,7 @@ def attach_route_info_to_stops( ] stops_with_route_info = ( - pd.merge( + dd.merge( stop_times, trips[trip_cols], on = ["feed_key", "trip_id"] @@ -68,12 +39,12 @@ def attach_route_info_to_stops( .reset_index(drop=True) ) - stops_with_geom = pd.merge( + stops_with_geom = dd.merge( stops, stops_with_route_info, on = ["feed_key", "stop_id"], how = "inner", - ) + ).compute() # Drop feed_key and just sort on name stops_assembled = (stops_with_geom.drop(columns = "feed_key") @@ -81,24 +52,31 @@ def attach_route_info_to_stops( .reset_index(drop=True) ) - return stops_assembled + # Change column order + col_order = [ + 'agency', 'route_id', 'route_type', + 'stop_id', 'stop_name', + 'feed_url', 'geometry' + ] + + stops_assembled2 = prep_data.standardize_operator_info_for_exports( + stops_assembled)[col_order].reindex(columns = col_order) + + return stops_assembled2 def create_stops_file_for_export(analysis_date: str) -> gpd.GeoDataFrame: time0 = datetime.now() - # Read in local parquets - stops = import_stops(analysis_date) - trips = import_trips(analysis_date) - stop_times = import_stop_times(analysis_date) - - time1 = datetime.now() - print(f"Get rid of duplicates: {time1-time0}") - + # Read in parquets + stops = prep_data.import_stops(analysis_date) + trips = prep_data.import_trips(analysis_date) + stop_times = prep_data.import_stop_times(analysis_date) + stops_assembled = attach_route_info_to_stops(stops, trips, stop_times) - time2 = datetime.now() - print(f"Attach route and operator info to stops: {time2-time1}") + time1 = datetime.now() + print(f"Attach route and operator info to stops: {time1-time0}") return stops_assembled diff --git a/traffic_ops/prep_data.py b/traffic_ops/prep_data.py index 733ac9737..ec78fc806 100644 --- a/traffic_ops/prep_data.py +++ b/traffic_ops/prep_data.py @@ -1,76 +1,88 @@ """ -Functions to query GTFS schedule data, -save locally as parquets, -then clean up at the end of the script. +Import trips, shapes, stops, stop_times files +and get it ready for GTFS schedule routes / stops datasets. 
""" -import os -os.environ["CALITP_BQ_MAX_BYTES"] = str(130_000_000_000) - import dask.dataframe as dd -import dask_geopandas as dg -import datetime import geopandas as gpd import pandas as pd -from siuba import * -from typing import Union, Literal - -from shared_utils import geography_utils, gtfs_utils_v2, utils, rt_dates +from shared_utils import (utils, rt_dates, rt_utils, + geography_utils, portfolio_utils) -ANALYSIS_DATE = gtfs_utils.format_date(rt_dates.DATES["dec2022"]) +ANALYSIS_DATE = rt_utils.format_date(rt_dates.DATES["jan2023"]) GCS = "gs://calitp-analytics-data/data-analyses/" TRAFFIC_OPS_GCS = f"{GCS}traffic_ops/" COMPILED_CACHED_GCS = f"{GCS}rt_delay/compiled_cached_views/" -DATA_PATH = "./data/" - -def concatenate_dataset_and_export( - dataset: Literal["trips", "routelines", "stops", "st"], - date: Union[str, datetime.date], - export_path: str = COMPILED_CACHED_GCS, -) -> Union[pd.DataFrame, gpd.GeoDataFrame]: - """ - Grab the cached file on selected date for trips, stops, routelines, st. - Concatenate Amtrak. - Save a new cached file in GCS. - """ +def import_trips(analysis_date: str) -> pd.DataFrame: + keep_cols = ["feed_key", "name", + "trip_id", + "route_id", "route_type", "shape_id", + "route_long_name", "route_short_name", "route_desc" + ] - if dataset in ["trips", "st"]: - amtrak = pd.read_parquet(f"amtrak_{dataset}.parquet") - df = pd.read_parquet(f"{export_path}{dataset}_{date}.parquet") - - elif dataset in ["routelines", "stops"]: - amtrak = gpd.read_parquet( - f"amtrak_{dataset}.parquet" - ).to_crs(geography_utils.WGS84) - - df = gpd.read_parquet( - f"{export_path}{dataset}_{date}.parquet" - ).to_crs(geography_utils.WGS84) - - full_df = (pd.concat([df, amtrak], axis=0) - .drop_duplicates() - .reset_index(drop=True) - ) + trips = pd.read_parquet( + f"{COMPILED_CACHED_GCS}trips_{analysis_date}.parquet", + columns = keep_cols + ) + + return trips - if isinstance(full_df, pd.DataFrame): - full_df.to_parquet(f"{export_path}{dataset}_{date}_all.parquet") - - elif isinstance(full_df, gpd.GeoDataFrame): - utils.geoparquet_gcs_export( - full_df, - export_path, - f"{dataset}_{date}_all" - ) - -def remove_local_parquets(): - # Remove Amtrak now that full dataset made - for dataset in ["trips", "routelines", "stops", "st"]: - os.remove(f"amtrak_{dataset}.parquet") +def import_shapes(analysis_date: str) -> gpd.GeoDataFrame: + keep_cols = ["feed_key", "shape_id", "n_trips", "geometry"] + + shapes = gpd.read_parquet( + f"{COMPILED_CACHED_GCS}routelines_{analysis_date}.parquet", + columns = keep_cols + ).to_crs(geography_utils.WGS84) + + return shapes + +def import_stops(analysis_date: str) -> gpd.GeoDataFrame: + # Instead of keeping route_type_0, route_type_1, etc + # keep stops table long, instead of wide + # attach route_id, route_type as before + keep_cols = [ + "feed_key", + "stop_id", "stop_name", + "geometry" + ] + + stops = gpd.read_parquet( + f"{COMPILED_CACHED_GCS}stops_{analysis_date}.parquet", + columns = keep_cols + ).to_crs(geography_utils.WGS84) + + return stops + + +def import_stop_times(analysis_date: str) -> pd.DataFrame: + keep_cols = ["feed_key", "trip_id", "stop_id"] + + stop_times = dd.read_parquet( + f"{COMPILED_CACHED_GCS}st_{analysis_date}.parquet", + columns = keep_cols + ).drop_duplicates().reset_index(drop=True) + + return stop_times + +def standardize_operator_info_for_exports(df: pd.DataFrame) -> pd.DataFrame: + + # Add a decoded version for base64_url + df2 = portfolio_utils.add_agency_identifiers(df) + + # Clean organization name + df3 = 
(portfolio_utils.clean_organization_name(df2) + .rename(columns = RENAME_COLS) + ) + + return df3 + + def export_to_subfolder(file_name: str, analysis_date: str): """ We always overwrite the same geoparquets each month, and point our @@ -89,82 +101,8 @@ def export_to_subfolder(file_name: str, analysis_date: str): ) -#----------------------------------------------------# -# Functions are used in -# `create_routes_data.py` and `create_stops_data.py` -#----------------------------------------------------# # Define column names, must fit ESRI 10 character limits RENAME_COLS = { "name": "agency", "route_name_used": "route_name", -} - - -if __name__ == "__main__": - """ - Amtrak is always excluded from queries for hqta and rt_delay - - Add back in now for our open data portal dataset - """ - - amtrak = "Amtrak Schedule" - - keep_stop_cols = [ - 'feed_key', 'stop_id', 'stop_name', - 'stop_key' - ] - - amtrak_stops = gtfs_utils_v2.get_stops( - selected_date = selected_date, - operator_feeds = [amtrak], - stop_cols = keep_stop_cols, - get_df = True, - crs = geography_utils.CA_NAD83Albers, # this is the CRS used for rt_delay - ) - - amtrak_stops.to_parquet("amtrak_stops.parquet") - - keep_trip_cols = [ - 'feed_key', 'service_date', - 'trip_key','trip_id', - 'route_id', 'route_type', - 'direction_id', 'shape_id', - 'route_short_name', 'route_long_name', 'route_desc' - ] - - amtrak_trips = gtfs_utils_v2.get_trips( - selected_date = selected_date, - operator_feeds = [amtrak], - trip_cols = keep_trip_cols, - get_df = True, - ) - - amtrak_trips.to_parquet("amtrak_trips.parquet") - - amtrak_routelines = gtfs_utils_v2.get_shapes( - selected_date = selected_date, - operator_feeds = [amtrak], - get_df = True, - crs = geography_utils.CA_NAD83Albers, - ) - - amtrak_routelines.to_parquet("amtrak_routelines.parquet") - - amtrak_st = gtfs_utils_v2.get_stop_times( - selected_date = selected_date, - operator_feeds = [amtrak], - get_df = True, - trip_df = amtrak_trips - ) - - amtrak_st.to_parquet("amtrak_st.parquet") - - # Concatenate Amtrak - for d in ["trips", "routelines", "stops", "st"]: - concatenate_dataset_and_export( - dataset = d, - date = ANALYSIS_DATE, - export_path = COMPILED_CACHED_GCS, - ) - - remove_local_parquets() \ No newline at end of file +} \ No newline at end of file
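Note on the district assignment introduced in route_df.py above: each route keeps the Caltrans district whose boundary overlaps the longest stretch of the route (overlay, measure overlap length, sort ascending, keep the last row per route). A minimal, self-contained sketch of that pattern follows; the toy geometries, CRS choice, and district polygons here are illustrative assumptions, not the repo's actual catalog data.

import geopandas as gpd
from shapely.geometry import LineString, box

# Toy inputs in a projected CRS (EPSG:2229, CA State Plane Zone 5, US feet)
# so that .length returns a planar length in linear units.
routes = gpd.GeoDataFrame(
    {"route_id": ["A", "B"]},
    geometry=[LineString([(0, 0), (10, 0)]), LineString([(0, 5), (2, 5)])],
    crs="EPSG:2229",
)
districts = gpd.GeoDataFrame(
    {"district": [1, 2]},
    geometry=[box(-1, -1, 4, 8), box(4, -1, 12, 8)],
    crs="EPSG:2229",
)

# Intersect each route with every district and measure each overlapping piece
overlay = gpd.overlay(routes, districts, how="intersection", keep_geom_type=False)
overlay["overlay_length"] = overlay.geometry.length

# Sort ascending and keep the last (longest) overlap per route,
# mirroring the sort_values + drop_duplicates(keep="last") pattern
longest = (
    overlay.sort_values(["route_id", "overlay_length"])
    .drop_duplicates(subset="route_id", keep="last")
)

# Attach the winning district back onto the full route geometry
routes_with_district = routes.merge(
    longest[["route_id", "district"]], on="route_id", how="left"
)
print(routes_with_district[["route_id", "district"]])
# Route A crosses both boxes but overlaps district 2 more, so it is tagged district 2.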
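Similarly, the reworked create_stops_data.py attaches route info to stops by merging stop_times to trips first, then joining onto stop geometry with dask before computing back to an in-memory frame. A rough, self-contained illustration of that join order is below; the feed/stop/trip values and operator name are made up, not the cached GCS data, and the final GeoDataFrame rebuild is a defensive step rather than repo code.

import dask.dataframe as dd
import geopandas as gpd
import pandas as pd
from shapely.geometry import Point

# Toy stand-ins for the stops, trips, and stop_times tables
stops = gpd.GeoDataFrame(
    {"feed_key": ["f1", "f1"], "stop_id": ["s1", "s2"],
     "stop_name": ["1st & Main", "2nd & Oak"]},
    geometry=[Point(0, 0), Point(1, 1)],
    crs="EPSG:4326",
)
trips = pd.DataFrame({
    "feed_key": ["f1", "f1"], "trip_id": ["t1", "t2"],
    "route_id": ["r1", "r2"], "route_type": ["3", "3"],
    "name": ["Operator A", "Operator A"],
})
stop_times = dd.from_pandas(
    pd.DataFrame({"feed_key": ["f1", "f1"], "trip_id": ["t1", "t2"],
                  "stop_id": ["s1", "s2"]}),
    npartitions=1,
)

# stop_times -> trips gives each stop its route_id / route_type / operator name;
# drop trip_id via column selection and de-duplicate so a stop appears once per route
stops_with_route_info = (
    dd.merge(stop_times, trips, on=["feed_key", "trip_id"])
    [["feed_key", "stop_id", "route_id", "route_type", "name"]]
    .drop_duplicates()
)

# Attach point geometry last, then materialize the lazy result
stops_assembled = dd.merge(
    stops, stops_with_route_info, on=["feed_key", "stop_id"], how="inner"
).compute()

# Depending on library versions the computed result can come back as a plain
# DataFrame, so rebuild the GeoDataFrame explicitly before any export step
stops_assembled = gpd.GeoDataFrame(stops_assembled, geometry="geometry", crs=stops.crs)
print(stops_assembled.sort_values(["name", "route_id", "stop_id"]))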