Stop segments refactor 2 #765

Merged: 11 commits, May 23, 2023
2 changes: 1 addition & 1 deletion _shared_utils/setup.py
@@ -3,7 +3,7 @@
setup(
name="shared_utils",
packages=find_packages(),
version="1.2.0",
version="2.0.0",
description="Shared utility functions for data analyses",
author="Cal-ITP",
license="Apache",
2 changes: 1 addition & 1 deletion _shared_utils/shared_utils/geography_utils.py
@@ -220,7 +220,7 @@ def cut_segments(

segmented = (
segmented.assign(
segment_sequence=(segmented.groupby(group_cols)["temp_index"].transform("rank") - 1).astype(int)
segment_sequence=(segmented.groupby(group_cols)["temp_index"].transform("rank") - 1).astype("int16")
)
.sort_values(group_cols)
.reset_index(drop=True)
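The `segment_sequence` change above keeps the same groupby-rank logic but downcasts to `int16` to shrink the column. A minimal sketch of that pattern; the column names and values here are illustrative, not the real schema:

```python
import pandas as pd

# Illustrative data; the real function operates on segmented shape geometries.
segmented = pd.DataFrame({
    "shape_id": ["a", "a", "a", "b", "b"],
    "temp_index": [10, 11, 12, 3, 4],
})
group_cols = ["shape_id"]

# Rank temp_index within each group, shift to zero-based,
# and downcast to int16 instead of the default int64.
segmented = segmented.assign(
    segment_sequence=(
        segmented.groupby(group_cols)["temp_index"].transform("rank") - 1
    ).astype("int16")
)
# segment_sequence is now 0, 1, 2 for shape "a" and 0, 1 for shape "b".
```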
11 changes: 2 additions & 9 deletions _shared_utils/shared_utils/gtfs_utils_v2.py
@@ -104,33 +104,26 @@ def filter_feed_options(
"use_subfeeds",
"current_feeds",
"include_precursor",
"include_precursor_and_future",
]
) -> siuba.dply.verbs.Pipeable:
exclude_future = filter(_["is_future"] == False)
exclude_precursor = filter(_.regional_feed_type != "Regional Precursor Feed")

if feed_option == "customer_facing":
return filter(_.regional_feed_type != "Regional Subfeed") >> exclude_future >> exclude_precursor
return filter(_.regional_feed_type != "Regional Subfeed") >> exclude_precursor

elif feed_option == "use_subfeeds":
return (
filter(
_["name"] != "Bay Area 511 Regional Schedule"
) # keep VCTC combined because the combined feed is the only feed
>> exclude_future
>> exclude_precursor
)

elif feed_option == "current_feeds":
return exclude_future >> exclude_precursor
return exclude_precursor

elif feed_option == "include_precursor":
return exclude_future

elif feed_option == "include_precursor_and_future":
return filter()

else:
return filter()
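Since `filter_feed_options` returns a siuba `Pipeable`, each branch above composes smaller filters with `>>` and the caller pipes a feeds dataframe through the result. A hedged usage sketch; the dataframe and column values are made up for illustration:

```python
import pandas as pd
from siuba import _, filter

# Toy feeds table; real data comes from the warehouse.
feeds = pd.DataFrame({
    "name": ["Bay Area 511 Regional Schedule", "Big Blue Bus Schedule"],
    "regional_feed_type": ["Regional Subfeed", None],
    "is_future": [False, False],
})

# Compose two filters into one Pipeable, mirroring the "customer_facing" branch.
exclude_precursor = filter(_.regional_feed_type != "Regional Precursor Feed")
customer_facing = filter(_.regional_feed_type != "Regional Subfeed") >> exclude_precursor

# Apply the composed filter by piping the dataframe through it.
current = feeds >> customer_facing
# Only "Big Blue Bus Schedule" survives.
```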

94 changes: 52 additions & 42 deletions _shared_utils/shared_utils/portfolio_utils.py
Expand Up @@ -71,7 +71,7 @@ def decode_base64_url(row):
return decoded


def add_agency_identifiers(df: pd.DataFrame) -> pd.DataFrame:
def add_agency_identifiers(df: pd.DataFrame, date: str) -> pd.DataFrame:
"""
Find the current base64_url for the organization name and
decode it as ASCII (Chad Baker request for CKAN data).
@@ -80,16 +80,21 @@ def add_agency_identifiers(df: pd.DataFrame) -> pd.DataFrame:
dim_gtfs_datasets = gtfs_utils_v2.get_transit_organizations_gtfs_dataset_keys(keep_cols=None, get_df=True)

current_feeds = (
dim_gtfs_datasets[(dim_gtfs_datasets.data_quality_pipeline == True) & (dim_gtfs_datasets._is_current == True)]
.drop_duplicates(subset="name")
.reset_index(drop=True)
dim_gtfs_datasets[
(dim_gtfs_datasets.data_quality_pipeline == True)
& (dim_gtfs_datasets._is_current == True)
& (dim_gtfs_datasets._valid_from <= pd.to_datetime(date))
& (dim_gtfs_datasets._valid_to >= pd.to_datetime(date))
]
.sort_values(["name", "gtfs_dataset_key"])
.drop_duplicates("name")
)

current_feeds2 = current_feeds.assign(feed_url=current_feeds.apply(lambda x: decode_base64_url(x), axis=1))

df2 = pd.merge(
df,
current_feeds2[["name", "base64_url", "feed_url", "uri"]],
current_feeds2[["gtfs_dataset_key", "name", "base64_url", "feed_url", "uri"]],
on="name",
how="inner",
validate="m:1",
@@ -98,53 +103,58 @@
return df2
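The main change to `add_agency_identifiers` is filtering the GTFS datasets dimension to rows whose validity window contains the analysis date before deduplicating on `name`. A small sketch of that pattern, with made-up rows:

```python
import pandas as pd

# Two versions of the same dataset; only one is valid on the analysis date.
dim_gtfs_datasets = pd.DataFrame({
    "name": ["Big Blue Bus Schedule", "Big Blue Bus Schedule"],
    "gtfs_dataset_key": ["key_old", "key_new"],
    "_valid_from": pd.to_datetime(["2022-01-01", "2023-01-01"]),
    "_valid_to": pd.to_datetime(["2022-12-31", "2099-01-01"]),
})
date = "2023-05-17"  # hypothetical analysis date

current_feeds = (
    dim_gtfs_datasets[
        (dim_gtfs_datasets._valid_from <= pd.to_datetime(date))
        & (dim_gtfs_datasets._valid_to >= pd.to_datetime(date))
    ]
    .sort_values(["name", "gtfs_dataset_key"])
    .drop_duplicates("name")
)
# Only the "key_new" row remains, one row per name.
```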


def standardize_gtfs_dataset_names(df: pd.DataFrame, name_col: str = "name") -> pd.DataFrame:
"""
Have gtfs_dataset_name reflect the operator.
Remove the distinction between LA Metro Bus and LA Metro Rail (which
show up as 2 feeds, 2 feed_keys, 2 gtfs_dataset_keys in our warehouse).
def add_organization_name(
df: pd.DataFrame,
date: str,
merge_cols: list = [],
) -> pd.DataFrame:
"""
df[name_col] = df[name_col].str.replace("LA Metro Bus", "LA Metro").str.replace("LA Metro Rail", "LA Metro")

return df

Instead of using the GTFS dataset name of the quartet (usually
LA Metro Schedule, LA Metro Trip Updates, etc.), always
publish with the organization name, LA Metro.

# https://github.com/cal-itp/data-analyses/blob/main/bus_service_increase/E5_make_stripplot_data.py
def add_caltrans_district() -> pd.DataFrame:
"""
Returns a dataframe with calitp_itp_id and the caltrans district
Input a date to filter down to the feeds that were valid on that date.
Merge columns must be defined and hold prefixes indicating the quartet.
Ex: schedule_gtfs_dataset_key, vehicle_positions_gtfs_dataset_name
"""
df = (
(
tbls.airtable.california_transit_organizations()
>> select(_.itp_id, _.caltrans_district)
quartet = ["schedule", "vehicle_positions", "service_alerts", "trip_updates"]

datasets = ["gtfs_dataset_key", "gtfs_dataset_name", "source_record_id"]

# https://stackoverflow.com/questions/2541401/pairwise-crossproduct-in-python
quartet_cols = [f"{q}_{d}" for q in quartet for d in datasets]

# All the merge cols must be found in quartet_cols
# This is flexible enough so we can take just gtfs_dataset_key or name to merge too
if not all(c in quartet_cols for c in merge_cols):
raise KeyError(
"Unable to detect which GTFS quartet "
f"these columns {df.columns}. "
"Rename to [quartet]_gtfs_dataset_key, "
"[quartet]_gtfs_dataset_name. "
"Valid quartet values: schedule, vehicle_positions, "
"trip_updates, or service_alerts."
)
else:
dim_provider_gtfs_data = (
tbls.mart_transit_database.dim_provider_gtfs_data()
>> filter(_._valid_from <= pd.to_datetime(date), _._valid_to >= pd.to_datetime(date))
>> filter(_._is_current == True)
>> select(_.organization_source_record_id, _.organization_name, _.regional_feed_type, *merge_cols)
>> distinct()
>> collect()
>> filter(_.itp_id.notna())
)
.sort_values(["itp_id", "caltrans_district"])
.drop_duplicates(subset="itp_id")
.rename(columns={"itp_id": "calitp_itp_id"})
.astype({"calitp_itp_id": int})
.reset_index(drop=True)
)

# Some ITP IDs are missing district info
missing_itp_id_district = {
48: "03 - Marysville", # B-Line in Butte County
70: "04 - Oakland", # Cloverdale Transit in Sonoma County
82: "02 - Redding", # Burney Express in Redding
142: "12 - Irvine", # Irvine Shuttle
171: "07 - Los Angeles", # Avocado Heights/Bassett/West Valinda Shuttle
473: "05 - San Luis Obispo", # Clean Air Express in Santa Barbara
}
df2 = pd.merge(df, dim_provider_gtfs_data, on=merge_cols, how="inner")
return df2
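`add_organization_name` validates `merge_cols` against every `[quartet]_[dataset]` combination before querying `dim_provider_gtfs_data`. A minimal sketch of that cross-product check:

```python
# Build every allowed merge column name as a quartet x dataset cross product.
quartet = ["schedule", "vehicle_positions", "service_alerts", "trip_updates"]
datasets = ["gtfs_dataset_key", "gtfs_dataset_name", "source_record_id"]
quartet_cols = [f"{q}_{d}" for q in quartet for d in datasets]

merge_cols = ["schedule_gtfs_dataset_key"]          # valid: passes the check
bad_cols = ["feed_key"]                             # not quartet-prefixed: fails

print(all(c in quartet_cols for c in merge_cols))   # True
print(all(c in quartet_cols for c in bad_cols))     # False -> would raise KeyError
```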

missing_ones = pd.DataFrame(missing_itp_id_district.items(), columns=["calitp_itp_id", "caltrans_district"])

# If there are any missing district info for ITP IDs, fill it in now
df2 = pd.concat([df, missing_ones], axis=0).sort_values("calitp_itp_id").reset_index(drop=True)
def add_caltrans_district(df: pd.DataFrame, date: str):
"""
Caltrans districts are defined at the organization-level.
"""

return df2
return


# https://github.com/cal-itp/data-analyses/blob/main/rt_delay/utils.py
3 changes: 3 additions & 0 deletions rt_segment_speeds/dask_notes.md
@@ -11,3 +11,6 @@
* `dd.compute(*results)` ([docs](https://docs.dask.org/en/stable/delayed-collections.html)) - this doesn't work when the resulting pandas df still needs `df.compute()`.
* How can `delayed` objects be carried through the workflow so the compute only happens once? See the sketch below.
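
A hedged sketch of computing a list of `delayed` results in one pass, assuming the delayed functions return plain pandas objects (using `dask.compute`, which behaves like `dd.compute` here):

```python
import dask
import pandas as pd

@dask.delayed
def make_df(i: int) -> pd.DataFrame:
    # Stand-in for a per-partition computation.
    return pd.DataFrame({"segment": [i], "speed_mph": [12.5 * i]})

results = [make_df(i) for i in range(3)]

# One compute call evaluates every delayed object together.
dfs = dask.compute(*results)            # tuple of pandas DataFrames
combined = pd.concat(dfs).reset_index(drop=True)
```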


## Client
* [GH: Dask workers don't reload modified modules](https://github.com/dask/distributed/issues/4245)
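
Two common workarounds discussed around that issue, sketched under the assumption of a local `Client`; the module path below is hypothetical:

```python
from dask.distributed import Client

client = Client()  # or connect to an existing scheduler address

# Option 1: restart workers so they re-import modules from scratch.
client.restart()

# Option 2: push the edited module file out to every worker.
client.upload_file("_shared_utils/shared_utils/geography_utils.py")
```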