Finish speeds #784

Merged
merged 14 commits into from
Jun 15, 2023
2 changes: 1 addition & 1 deletion _shared_utils/setup.py
@@ -3,7 +3,7 @@
 setup(
     name="shared_utils",
     packages=find_packages(),
-    version="2.0.0",
+    version="2.1.0",
     description="Shared utility functions for data analyses",
     author="Cal-ITP",
     license="Apache",
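If shared_utils is consumed as an installed package elsewhere in the repo, the bump above only takes effect after a reinstall. A quick, hypothetical sanity check, assuming the package was reinstalled from _shared_utils/ (e.g. pip install -e _shared_utils):

from importlib.metadata import version  # stdlib, Python 3.8+

installed = version("shared_utils")  # reads the version declared in setup()
assert installed == "2.1.0", f"expected 2.1.0, found {installed}"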
2 changes: 2 additions & 0 deletions _shared_utils/shared_utils/__init__.py
@@ -7,6 +7,7 @@
     portfolio_utils,
     rt_dates,
     rt_utils,
+    schedule_rt_utils,
     styleguide,
     utils,
 )
@@ -18,6 +19,7 @@
     "gtfs_utils",
     "gtfs_utils_v2",
     "portfolio_utils",
+    "schedule_rt_utils",
     "rt_dates",
     "rt_utils",
     "styleguide",
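With schedule_rt_utils added to both the import block and __all__, it can be pulled in the same way as the other submodules. A minimal sketch, assuming the updated package is installed; nothing here depends on what the new module actually defines, since its contents are not part of this diff:

from shared_utils import schedule_rt_utils  # explicit import

from shared_utils import *  # noqa: F403 -- the wildcard now includes schedule_rt_utils via __all__

print(schedule_rt_utils.__name__)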
83 changes: 3 additions & 80 deletions _shared_utils/shared_utils/portfolio_utils.py
@@ -16,7 +16,6 @@
 import dask_geopandas as dg
 import pandas as pd
 import pandas.io.formats.style  # for type hint: https://github.com/pandas-dev/pandas/issues/24884
-from calitp_data_analysis.tables import tbls
 from IPython.display import HTML
 from shared_utils import gtfs_utils_v2, rt_utils
 from siuba import *
@@ -37,26 +36,6 @@
 }


-def clean_organization_name(df: pd.DataFrame) -> pd.DataFrame:
-    """
-    Clean up organization name used in portfolio.
-    """
-    df = df.assign(
-        name=(
-            df.name.str.replace("Schedule", "")
-            .str.replace("Vehicle Positions", "")
-            .str.replace("VehiclePositions", "")
-            .str.replace("Trip Updates", "")
-            .str.replace("TripUpdates", "")
-            .str.replace("Service Alerts", "")
-            .str.replace("Bay Area 511", "")
-            .str.strip()
-        )
-    )
-
-    return df
-
-
 def decode_base64_url(row):
     """
     Provide decoded version of URL as ASCII.
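The body of decode_base64_url is collapsed in this view. For readers unfamiliar with the base64_url column, the idea is simply to turn the warehouse's base64-encoded feed URL back into a readable string. A hypothetical stand-alone sketch (the real helper takes a dataframe row, and its padding handling may differ):

import base64

def decode_base64_url_sketch(base64_url: str) -> str:
    # urlsafe alphabet, since the encoded value doubles as a URL-safe identifier
    return base64.urlsafe_b64decode(base64_url).decode("ascii")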
@@ -83,18 +62,16 @@ def add_agency_identifiers(df: pd.DataFrame, date: str) -> pd.DataFrame:
         dim_gtfs_datasets[
             (dim_gtfs_datasets.data_quality_pipeline == True)
             & (dim_gtfs_datasets._is_current == True)
-            & (dim_gtfs_datasets._valid_from <= pd.to_datetime(analysis_date))
-            & (dim_gtfs_datasets._valid_to >= pd.to_datetime(analysis_date))
+            & (dim_gtfs_datasets._valid_from <= pd.to_datetime(date))
+            & (dim_gtfs_datasets._valid_to >= pd.to_datetime(date))
         ]
         .sort_values(["name", "gtfs_dataset_key"])
         .drop_duplicates("name")
     )

-    current_feeds2 = current_feeds.assign(feed_url=current_feeds.apply(lambda x: decode_base64_url(x), axis=1))
-
     df2 = pd.merge(
         df,
-        current_feeds2[["gtfs_dataset_key", "name", "base64_url", "feed_url", "uri"]],
+        current_feeds[["gtfs_dataset_key", "name", "base64_url", "uri"]],
         on="name",
         how="inner",
         validate="m:1",
@@ -103,60 +80,6 @@ def add_agency_identifiers(df: pd.DataFrame, date: str) -> pd.DataFrame:
     return df2


-def add_organization_name(
-    df: pd.DataFrame,
-    date: str,
-    merge_cols: list = [],
-) -> pd.DataFrame:
-    """
-    Instead of using the GTFS dataset name (of the quartet), usually
-    LA Metro Schedule, LA Metro Trip Updates, etc, always
-    publish with the organization name, LA Metro.
-
-    Input a date to filter down what feeds were valid.
-    Merge columns must be defined and hold prefixes indicating the quartet.
-    Ex: schedule_gtfs_dataset_key, vehicle_positions_gtfs_dataset_name
-    """
-    quartet = ["schedule", "vehicle_positions", "service_alerts", "trip_updates"]
-
-    datasets = ["gtfs_dataset_key", "gtfs_dataset_name", "source_record_id"]
-
-    # https://stackoverflow.com/questions/2541401/pairwise-crossproduct-in-python
-    quartet_cols = [f"{q}_{d}" for q in quartet for d in datasets]
-
-    # All the merge cols must be found in quartet_cols
-    # This is flexible enough so we can take just gtfs_dataset_key or name to merge too
-    if not all(c in quartet_cols for c in merge_cols):
-        raise KeyError(
-            "Unable to detect which GTFS quartet "
-            f"these columns {df.columns}. "
-            "Rename to [quartet]_gtfs_dataset_key, "
-            "[quartet]_gtfs_dataset_name. "
-            "Valid quartet values: schedule, vehicle_positions, "
-            "trip_updates, or service_alerts."
-        )
-    else:
-        dim_provider_gtfs_data = (
-            tbls.mart_transit_database.dim_provider_gtfs_data()
-            >> filter(_._valid_from <= pd.to_datetime(date), _._valid_to >= pd.to_datetime(date))
-            >> filter(_._is_current == True)
-            >> select(_.organization_source_record_id, _.organization_name, _.regional_feed_type, *merge_cols)
-            >> distinct()
-            >> collect()
-        )
-
-    df2 = pd.merge(df, dim_provider_gtfs_data, on=merge_cols, how="inner")
-    return df2
-
-
-def add_caltrans_district(df: pd.DataFrame, date: str):
-    """
-    Caltrans districts are defined at the organization-level.
-    """
-
-    return
-
-
 # https://github.com/cal-itp/data-analyses/blob/main/rt_delay/utils.py
 def add_route_name(df: pd.DataFrame) -> pd.DataFrame:
     """
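One detail worth calling out in the add_agency_identifiers hunk above: the merge is validated as many-to-one, which is why the query drops duplicate names before merging. A toy, self-contained illustration of that pandas behavior (the feed names and keys below are made up; the real right-hand table comes from the warehouse query shown above):

import pandas as pd

# Left side: many rows can share a feed name.
routes = pd.DataFrame(
    {"name": ["Feed A", "Feed A", "Feed B"], "route_id": ["1", "2", "9"]}
)
# Right side: one row per feed name, as enforced by .drop_duplicates("name").
feeds = pd.DataFrame(
    {"name": ["Feed A", "Feed B"], "gtfs_dataset_key": ["key_a", "key_b"]}
)

# validate="m:1" raises pandas.errors.MergeError if `feeds` ever carried duplicate names.
merged = pd.merge(routes, feeds, on="name", how="inner", validate="m:1")
print(merged)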
21 changes: 0 additions & 21 deletions _shared_utils/shared_utils/rt_utils.py
@@ -13,7 +13,6 @@
 import numpy as np
 import pandas as pd
 import shapely
-import siuba  # need for type hints
 from calitp_data_analysis.tables import tbls
 from numba import jit
 from shared_utils import geography_utils, gtfs_utils_v2, rt_dates, utils
@@ -796,23 +795,3 @@ def get_operators(analysis_date, operator_list, verbose=False):
             print(f"not yet run: {itp_id}")
             op_list_runstatus[itp_id] = "not_yet_run"
     return op_list_runstatus
-
-
-def get_rt_schedule_feeds_crosswalk(
-    date: str, keep_cols: list, get_df: bool = True, custom_filtering: dict = None
-) -> Union[pd.DataFrame, siuba.sql.verbs.LazyTbl]:
-    """
-    Get fct_daily_rt_feeds, which provides the schedule_feed_key
-    to use when merging with schedule data.
-    """
-    fct_rt_feeds = tbls.mart_gtfs.fct_daily_rt_feed_files() >> filter(_.date == date)
-
-    if get_df:
-        fct_rt_feeds = (
-            fct_rt_feeds
-            >> collect()
-            >> gtfs_utils_v2.filter_custom_col(custom_filtering)
-            >> gtfs_utils_v2.subset_cols(keep_cols)
-        )
-
-    return fct_rt_feeds >> gtfs_utils_v2.subset_cols(keep_cols)
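get_rt_schedule_feeds_crosswalk was the only remaining user of the siuba import removed at the top of the file (its return annotation needed siuba.sql.verbs.LazyTbl); presumably the crosswalk logic now lives in the new schedule_rt_utils module added elsewhere in this PR. For reference, the siuba pipe style it used also works on plain DataFrames, so a toy, warehouse-free illustration looks like this (column names are made up):

import pandas as pd
from siuba import _, filter

feeds = pd.DataFrame(
    {"date": ["2023-06-14", "2023-06-15"], "feed_key": ["abc", "def"]}
)

# Same >> filter(_.date == date) pattern as the removed helper, minus the SQL backend.
on_date = feeds >> filter(_.date == "2023-06-14")
print(on_date)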