Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gtfs funnel performance improvements #1335

Merged
Merged 11 commits (branch names not captured in this page extract)
Dec 27, 2024
3 changes: 1 addition & 2 deletions gtfs_funnel/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ preprocess_schedule_vp_dependency:

preprocess_vp:
python vp_keep_usable.py
python vp_direction.py
python cleanup.py
python vp_dwell_time.py
python vp_condenser.py
Expand All @@ -22,7 +21,7 @@ preprocess_schedule_only:
make route_typologies_data
python operator_scheduled_stats.py
python schedule_stats_by_stop.py
#python track_publish_dates.py
python track_publish_dates.py

route_typologies_data:
python route_typologies.py
Expand Down
6 changes: 4 additions & 2 deletions gtfs_funnel/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ Use `update_vars` and input one or several days to download.
1. Harmonize how route names are displayed in time-series: `make timeseries_preprocessing`
1. Download monthly aggregated schedule data: `make monthly_scheduled_data`

[![gtfs_funnel_mermaid1](https://mermaid.ink/img/pako:eNqlVX9v2jAQ_SqWJwSTgBESCk23SqUQ_to0qWiTllSRSS7EarAj22lLK777nKRAKL86DSmJ8b1393w--15xwEPANm61Wh5TVCVgo8nUuUNOxhgkqPFTQCp4AFJSNv_ssQJYq716DCHKqLJRMUSormJYQN1G9RmRUG9WZ38RQcksAVnfwLUpFXRBxPKWJ1zkvE_jntNzbtbULWIKz2qL6nQ6-5AhFyGIY6CEMjhmkxBwFu7qcJz-eFjBKBCK7kCiKKqX5lX-0a9VreYxj80FSWM0HZbG8i2zWTkd8ieWcKKVrl0jFCREyhFEKIxQRJPEfkvDAYQMBE3VGlWIzEOucTdGw11H8JWGyna6vP9s2_YbsdW6Rlt4QXEL3L3G6PD79hiSFIRs00XKhfJlEEOYJfDm_moLHlZjy5ikcD740C2BX2fi-vfkbmD9u4zSQUXHrVGRoXiZglMibt0C9j8acn5FwmgnFdroK7r4QDpG7hZ8VMWo654UUtJ3OU9UxX5IBQSKcoa-oanI4ItDEgn3Fd3jqu5HiGmQgJ9ySXPWefnjbsPVJykgCph-znrYZef-xu5j-m4frtb1DSxcDzenabteP63eUpWteKdzZDYqSfZ3E3NyhaPuSSmP6TEJ43JpOrDGPACkfibzq_B8Pq2C8jF5-c_J4aXzo-Uz7ulNSoCwLN31V6kDp-A5RhE-vxuBSRDnw3er-DDfSP2fAREglf7SeTzjIp_-cTMamJv93V6ThxLLdakTxYUfCC7lE0keKvfdARUTLXsD9ecqkn5IlG5ISid_6SvuczEnjL6QAzndL8mJW7jIqVUe2oTYL1PcxAsQC0JD3VaLZufhogl62NbDECKSJcrDumNoKMkUv1uyANtKH8omzlItF0aU6OUvsB3lh7SJIaQ6B9_LVl107CZOCcP2K37GdmvQbZuXZvfSMvsD87JrWE28xLZ50Tas3sC6sCyj1xlYxqqJXzjXXo1219BNsGP0TVOjrb5VuPtTGIuQq79773TT?type=png)](https://mermaid.live/edit#pako:eNqlVX9v2jAQ_SqWJwSTgBESCk23SqUQ_to0qWiTllSRSS7EarAj22lLK777nKRAKL86DSmJ8b1393w--15xwEPANm61Wh5TVCVgo8nUuUNOxhgkqPFTQCp4AFJSNv_ssQJYq716DCHKqLJRMUSormJYQN1G9RmRUG9WZ38RQcksAVnfwLUpFXRBxPKWJ1zkvE_jntNzbtbULWIKz2qL6nQ6-5AhFyGIY6CEMjhmkxBwFu7qcJz-eFjBKBCK7kCiKKqX5lX-0a9VreYxj80FSWM0HZbG8i2zWTkd8ieWcKKVrl0jFCREyhFEKIxQRJPEfkvDAYQMBE3VGlWIzEOucTdGw11H8JWGyna6vP9s2_YbsdW6Rlt4QXEL3L3G6PD79hiSFIRs00XKhfJlEEOYJfDm_moLHlZjy5ikcD740C2BX2fi-vfkbmD9u4zSQUXHrVGRoXiZglMibt0C9j8acn5FwmgnFdroK7r4QDpG7hZ8VMWo654UUtJ3OU9UxX5IBQSKcoa-oanI4ItDEgn3Fd3jqu5HiGmQgJ9ySXPWefnjbsPVJykgCph-znrYZef-xu5j-m4frtb1DSxcDzenabteP63eUpWteKdzZDYqSfZ3E3NyhaPuSSmP6TEJ43JpOrDGPACkfibzq_B8Pq2C8jF5-c_J4aXzo-Uz7ulNSoCwLN31V6kDp-A5RhE-vxuBSRDnw3er-DDfSP2fAREglf7SeTzjIp_-cTMamJv93V6ThxLLdakTxYUfCC7lE0keKvfdARUTLXsD9ecqkn5IlG5ISid_6SvuczEnjL6QAzndL8mJW7jIqVUe2oTYL1PcxAsQC0JD3VaLZufhogl62NbDECKSJcrDumNoKMkUv1uyANtKH8omzlI
tF0aU6OUvsB3lh7SJIaQ6B9_LVl107CZOCcP2K37GdmvQbZuXZvfSMvsD87JrWE28xLZ50Tas3sC6sCyj1xlYxqqJXzjXXo1219BNsGP0TVOjrb5VuPtTGIuQq79773TT)

[![gtfs_funnel_mermaid1](https://mermaid.ink/img/pako:eNqllW9v2jAQxr-K5QlBJWCEhJamW6VSSF9tmlS0SQtVZJILsWrsyHZKadXvPicpJLQFNu0FSYh_z92T8597xqGIALu40-nMuKaagYtupt4t8jLOgaHWDwmpFCEoRfniZMYLsNF4nnGEKKfaRcUjQk2dwBKaLmrOiYJmu_72J5GUzBmo5hY3Q6mkSyLX14IJmes-TQbewLvaSCtiCo-6onq93ntkJGQEch_EKId9YwpCwaNdH553NhnVGA1S0x0kjuNmOfyS38zlpdGY8RlfSJImaDoqB8uryubl60isOBPEON2ERihkRKkxxCiKUUwZc1_L8AGhQklTvaEKk3nKDXdltfxNhkAbVHXT9d2J67qvwk7nElV4IfEL7s4wJv378QRYClJ16TIVUgcqTCDKGLyGv6jgUT23SkgKx5OP_BL8MpeXv25uh86_2ygD1HxcWzUbWpQlOGTi2i-w__GQ62sWxjulMIOBpsu_KMfYr-C9LsZ9_6CRUr6rWVGdBBGVEGoqOPqKpjKDzx5hCu5qvid13w-Q0JBBkApFc9Vx-5N-yzc7KSQauPkdjbCrzuNN_If0zTxcbNY38GjzuN1N1fcGaf2Uqk3FG59ju1UrcrBbmINfOO4ftPKQ7rMwKT_NJDbMPUAaZCo_Co_X0ykk0QoYK_weVXg5X0YvZXtX0WRg5ooB4Vm6G7W2HLxC51mFi_yIBK5AHjfRz3kORILS5k4XyVzIfFq_X42H9nZaq9Pxo3oKs8KJFjIIpVBqRdh97Zj7IOuNsblFg4WOVRARbfqQNjVfB1oEQi4Ip0_kg5l-vxJv_CJELq3r0DbF-9WJ23gJckloZLpp0eNmuOh9M-yaxwhikjE9w6ZRGJRkWtyueYhdbfZiG0uRLRLsxvmebOMsNeZhTIkpxnL7NiX8txDVf4ioqdC3sn8XbbxgsPuMH7HbGfa79rndP3fss6F93recNl5j1z7tWs5g6Jw6jjXoDR3rpY2fiqhWt2-Zztizzmzb0M6Z8_IHVGx2Qw?type=png)](https://mermaid.live/edit#pako:eNqllW9v2jAQxr-K5QlBJWCEhJamW6VSSF9tmlS0SQtVZJILsWrsyHZKadXvPicpJLQFNu0FSYh_z92T8597xqGIALu40-nMuKaagYtupt4t8jLOgaHWDwmpFCEoRfniZMYLsNF4nnGEKKfaRcUjQk2dwBKaLmrOiYJmu_72J5GUzBmo5hY3Q6mkSyLX14IJmes-TQbewLvaSCtiCo-6onq93ntkJGQEch_EKId9YwpCwaNdH553NhnVGA1S0x0kjuNmOfyS38zlpdGY8RlfSJImaDoqB8uryubl60isOBPEON2ERihkRKkxxCiKUUwZc1_L8AGhQklTvaEKk3nKDXdltfxNhkAbVHXT9d2J67qvwk7nElV4IfEL7s4wJv378QRYClJ16TIVUgcqTCDKGLyGv6jgUT23SkgKx5OP_BL8MpeXv25uh86_2ygD1HxcWzUbWpQlOGTi2i-w__GQ62sWxjulMIOBpsu_KMfYr-C9LsZ9_6CRUr6rWVGdBBGVEGoqOPqKpjKDzx5hCu5qvid13w-Q0JBBkApFc9Vx-5N-yzc7KSQauPkdjbCrzuNN_If0zTxcbNY38GjzuN1N1fcGaf2Uqk3FG59ju1UrcrBbmINfOO4ftPKQ7rMwKT_NJDbMPUAaZCo_Co_X0ykk0QoYK_weVXg5X0YvZXtX0WRg5ooB4Vm6G7W2HLxC51mFi_yIBK5AHjfRz3kORILS5k4XyVzIfFq_X42H9nZaq9Pxo3oKs8KJFjIIpVBqRdh97Zj7IOuNsblFg4WOVRARbfqQNjVfB1oEQi4Ip0_kg5l-vxJv_CJELq3r0DbF-9WJ23gJckloZLpp0eNmuOh9M-yaxwhikjE9w6ZRGJRkWtyueYhdb
fZiG0uRLRLsxvmebOMsNeZhTIkpxnL7NiX8txDVf4ioqdC3sn8XbbxgsPuMH7HbGfa79rndP3fss6F93recNl5j1z7tWs5g6Jw6jjXoDR3rpY2fiqhWt2-Zztizzmzb0M6Z8_IHVGx2Qw)


[![speeds_stop_times_mermaid](https://mermaid.ink/img/pako:eNqlVV1v2jAU_SuWJwSTgPERCqTapEJh07buYUV7WEDITW7AmmNHttOWVvz3OU5YkkK7Lx7A-J577_Hxsf2IfREAdnGr1VpyTTUDF31doGvYRMA1uo4BAoUID-ysFjFa0AjUktuEWu1xyRGinGoX2SFCdb2FCOouqt8QBfVmefYbkZTcMFD1X3ATiiWNiNxNBRMyzXs1G8wH84tDaoFYwL0uUJ1O5xgyETIA-RyIUQ7PxRT4ggdVHvP5cDYpYTRITSuQMAzrWXif_pivfa225EseMnHnb4nUaDHJACq52UgSb5HKxKUPcCiMkM-IUpcQoiBEIWXMzUU4gVC-pLE-oCzFU6isyWIXQw6dT-f98bSJlJbiB7gp9ZToIfOi2_D8RK-V2eR1nq3a8W712nXdvGerhYqE9EMYQ9qEkM3SqTUM6N0T1IVXKboyBYPwvMBMTGsFDPy_6C44ZJ1jkEhtSQx1VWJRAJXIpmNCpXGyBGT2WVGlU38XMOJLoZQtaZdQYufZ8lVqq_OydtkIeHCYLPbaHqB1TGNIzVdOmjY8DoaP0utbw1rYBkdLfqrlZcOj3PgwFozonBSRkt4S9tvcWcMr45VtmhKsZp6jA82TK0ovgVyGfHlFl3kjj6y1Md7bimS2QcmVx_ymJU_MbPx9wyO3IMkGDmUywsfWKG3YB08KxhLTmfINg4Ds3uQTUcI0Nf9L7sj2R4rEiBlQ-YJJM0Ivi2MuyVMe_FiVReY-t6h_keVTSZYkSu-9P5Dl83_IsipRuPIK8us7qrcpwJxdKvjKsps9WcMXT_lbCBIG68rKizOUS4mbOAIZERqYF8m-D0ts340lds0wgJAYpktsLlkDJYkW1zvuY1fLBJo4iQNzIi4pMVsRYTc0BjezEFAt5FX2ytnHroljwrH7iO-x2xr12v1xvzd2-sNRf9zrOk28w27_rN11BiPnzHG6g87I6e6b-EEIU7Xb7nXNu9HpDvt9g3aGji333QZty_1PICdHvQ?type=png)](https://mermaid.live/edit#pako:eNqlVV1v2jAU_SuWJwSTgPERCqTapEJh07buYUV7WEDITW7AmmNHttOWVvz3OU5YkkK7Lx7A-J577_Hxsf2IfREAdnGr1VpyTTUDF31doGvYRMA1uo4BAoUID-ysFjFa0AjUktuEWu1xyRGinGoX2SFCdb2FCOouqt8QBfVmefYbkZTcMFD1X3ATiiWNiNxNBRMyzXs1G8wH84tDaoFYwL0uUJ1O5xgyETIA-RyIUQ7PxRT4ggdVHvP5cDYpYTRITSuQMAzrWXif_pivfa225EseMnHnb4nUaDHJACq52UgSb5HKxKUPcCiMkM-IUpcQoiBEIWXMzUU4gVC-pLE-oCzFU6isyWIXQw6dT-f98bSJlJbiB7gp9ZToIfOi2_D8RK-V2eR1nq3a8W712nXdvGerhYqE9EMYQ9qEkM3SqTUM6N0T1IVXKboyBYPwvMBMTGsFDPy_6C44ZJ1jkEhtSQx1VWJRAJXIpmNCpXGyBGT2WVGlU38XMOJLoZQtaZdQYufZ8lVqq_OydtkIeHCYLPbaHqB1TGNIzVdOmjY8DoaP0utbw1rYBkdLfqrlZcOj3PgwFozonBSRkt4S9tvcWcMr45VtmhKsZp6jA82TK0ovgVyGfHlFl3kjj6y1Md7bimS2QcmVx_ymJU_MbPx9wyO3IMkGDmUywsfWKG3YB08KxhLTmfINg4Ds3uQTUcI0Nf9L7sj2R4rEiBlQ-YJJM0Ivi2MuyVMe_FiVReY-t6h_keVTSZYkSu-9P5Dl83_IsipRuPIK8us7qrcpwJxdKvjKsps9WcMXT_lbCBIG68rKizOUS4mbOAIZERqYF8m-D0ts340lds0wgJAYpktsLlkDJYkW1zvuY1fLBJo4iQNzIi4pMVsRYTc0Bjez
EFAt5FX2ytnHroljwrH7iO-x2xr12v1xvzd2-sNRf9zrOk28w27_rN11BiPnzHG6g87I6e6b-EEIU7Xb7nXNu9HpDvt9g3aGji333QZty_1PICdHvQ)

Expand All @@ -24,5 +26,5 @@ Use `update_vars` and input one or several days to download.
| | | pipeline and workstream outputs available |
|---|---|---|
| Sampled Wednesdays Each Month for Time-Series<br>[rt_dates.py](../_shared_utils/shared_utils/rt_dates.py) | Mar 2023 - present | downloaded schedule tables (trips, shapes, stops, stop_times)<br>downloaded vehicle positions (vp)<br><br>`gtfs_funnel`: intermediate outputs for schedule and vp<br>* crosswalk<br>* schedule only metrics related to service availability<br>* operator aggregated metrics from schedule data<br>* route typologies<br><br><br>`rt_segment_speeds`: vp interpreted as speeds against <br>various segment types<br>* segment types: <br>(1) `stop segments` (shape-stop segments,<br>most common shape selected for a route-direction and all <br>trips aggregated to that shape)<br>(2) `rt_stop_times` (trip-stop segments, most granular, <br>cannot be aggregated, but used for rt_stop_times table)<br>(3) `speedmap segments`<br>(4) `road segments` (1 km road segments with all <br>transit across operators aggregated to the same physical <br>road space, currently WIP)<br>* interpolated stop arrivals <br>* speeds by trip<br>* segment and summary speeds for single day<br><br>`rt_vs_schedule`: <br>* RT vs schedule metrics<br>* rt_stop_times table (companion to scheduled stop_times)<br><br>`gtfs_digest`:<br>* downstream data product using all the outputs created in <br>gtfs_funnel, rt_segment_speeds, rt_vs_schedule. |
| Full Week for Weekly Averages<br>April / October each year | Apr 2023<br>Oct 2023<br>Apr 2024 | rt_segment_speeds:<br>* segment and summary speeds for a week<br><br>gtfs_digest<br>* service hours by hour for weekday / Saturday / Sunday |
| Full Week for Weekly Averages<br>April / October each year | Apr 2023<br>Oct 2023<br>Apr 2024<br>Oct 2024 | rt_segment_speeds:<br>* segment and summary speeds for a week<br><br>gtfs_digest<br>* service hours by hour for weekday / Saturday / Sunday |
| | | |
2 changes: 1 addition & 1 deletion gtfs_funnel/cleanup.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
INPUT_FILE = GTFS_DATA_DICT.speeds_tables.usable_vp

publish_utils.if_exists_then_delete(
f"{SEGMENT_GCS}{INPUT_FILE}_{analysis_date}_stage"
f"{SEGMENT_GCS}{INPUT_FILE}_{analysis_date}_stage.parquet"
)
publish_utils.if_exists_then_delete(
f"{SEGMENT_GCS}vp_direction_{analysis_date}.parquet"
Expand Down
8 changes: 4 additions & 4 deletions gtfs_funnel/mermaid.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,11 @@ graph TB

subgraph vp_preprocessing
E --> E3([vp_keep_usable.py]):::script -->
E4([vp_direction.py]):::script -->
F[vp_usable]:::df -->
E4([vp_dwell_time.py]):::script -->
F[vp_usable_dwell]:::df -->
E5([cleanup.py]):::script;
F --> F1([vp_condenser.py]):::script -->
F2[vp_condensed<br>vp_nearest_neighbor<br>NAD83]:::df;
F2[vp_nearest_neighbor<br>NAD83]:::df;

end

Expand Down Expand Up @@ -126,7 +126,7 @@ flowchart TB
end

subgraph RT stop_times
J(segment_type=rt_stop_times):::segmentType -->
J(segment_type=rt_stop_times<br>speedmap_segments):::segmentType -->
C;
E --> K([average_summary_speeds.py]):::script -->
L[rollup_singleday/rollup_multiday
Expand Down
210 changes: 96 additions & 114 deletions gtfs_funnel/stop_times_with_direction.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
import geopandas as gpd
import numpy as np
import pandas as pd
import sys

from loguru import logger

from calitp_data_analysis import utils
from shared_utils import rt_utils
Expand Down Expand Up @@ -56,109 +59,114 @@ def prep_scheduled_stop_times(analysis_date: str) -> gpd.GeoDataFrame:
).drop(columns = ["trip_id"])

st_with_stop = gpd.GeoDataFrame(
st_with_stop, geometry = "geometry", crs = PROJECT_CRS)
st_with_stop, geometry = "geometry", crs = PROJECT_CRS
)

return st_with_stop


def get_projected_stop_meters(
stop_times: pd.DataFrame,
shapes: gpd.GeoDataFrame
) -> pd.DataFrame:
stop_times: gpd.GeoDataFrame,
analysis_date: str,
) -> pd.Series:
"""
Project the stop's position to the shape and
get stop_meters (meters from start of the shape).
Only return stop_meters as pd.Series to use as a column later.
"""
shapes = helpers.import_scheduled_shapes(
analysis_date,
columns = ["shape_array_key", "geometry"],
crs = PROJECT_CRS,
get_pandas=True
).dropna(subset="geometry")

gdf = pd.merge(
stop_times,
shapes.rename(columns = {"geometry": "shape_geometry"}),
stop_times.to_crs(PROJECT_CRS),
shapes.to_crs(PROJECT_CRS).rename(columns = {"geometry": "shape_geometry"}),
on = "shape_array_key",
how = "inner"
)
).set_geometry("geometry")

gdf = gdf.assign(
stop_meters = gdf.shape_geometry.project(gdf.geometry)
).drop(columns = "shape_geometry").drop_duplicates()

return gdf
stop_meters = gdf.shape_geometry.project(gdf.geometry)

return stop_meters


def find_prior_subseq_stop(
stop_times: gpd.GeoDataFrame,
trip_stop_cols: list
def find_prior_subseq_stop_info(
stop_times: gpd.GeoDataFrame,
analysis_date: str,
trip_cols: list = ["trip_instance_key"],
trip_stop_cols: list = ["trip_instance_key", "stop_sequence"]
) -> gpd.GeoDataFrame:
"""
For trip-stop, find the previous stop (using stop sequence).
Attach the previous stop's geometry.
This will determine the direction for the stop (it's from prior stop).
Add in subseq stop information too.

Create columns related to comparing current to prior stop.
- stop_pair (stop_id1_stop_id2)
- stop_pair_name (stop_name1__stop_name2)
"""
prior_stop = stop_times[trip_stop_cols].sort_values(
trip_stop_cols).reset_index(drop=True)

prior_stop = prior_stop.assign(
prior_stop_sequence = (prior_stop.groupby("trip_instance_key")
stop_meters = get_projected_stop_meters(stop_times, analysis_date)

gdf = stop_times[
trip_stop_cols + ["stop_id", "stop_name", "geometry"]
].assign(
stop_meters = stop_meters
)

gdf = gdf.assign(
prior_geometry = (gdf.groupby(trip_cols)
.geometry
.shift(1)),
prior_stop_sequence = (gdf.groupby(trip_cols)
.stop_sequence
.shift(1)),
# add subseq stop info here
subseq_stop_sequence = (prior_stop.groupby("trip_instance_key")
subseq_stop_sequence = (gdf.groupby(trip_cols)
.stop_sequence
.shift(-1)),
subseq_stop_id = (prior_stop.groupby("trip_instance_key")
subseq_stop_id = (gdf.groupby(trip_cols)
.stop_id
.shift(-1)),
subseq_stop_name = (prior_stop.groupby("trip_instance_key")
subseq_stop_name = (gdf.groupby(trip_cols)
.stop_name
.shift(-1))
.shift(-1)),
).fillna({
**{c: "" for c in ["subseq_stop_id", "subseq_stop_name"]}
})


stop_direction = np.vectorize(rt_utils.primary_cardinal_direction)(
gdf.prior_geometry.fillna(gdf.geometry), gdf.geometry)

# Just keep subset of columns because we'll get other stop columns back when we merge with stop_times
keep_cols = [
"trip_instance_key", "stop_sequence",
"stop_meters",
"prior_stop_sequence", "subseq_stop_sequence"
]

# Create stop pair with underscores, since stop_id
# can contain hyphens
gdf2 = gdf[keep_cols].assign(
stop_primary_direction = stop_direction,
stop_pair = gdf.stop_id.astype(str).str.cat(
gdf.subseq_stop_id.astype(str)),
stop_pair_name = gdf.stop_name.astype(str).str.cat(
gdf.subseq_stop_name.astype(str)),
)

# Merge in prior stop geom as a separate column so we can
# calculate distance / direction
prior_stop_geom = (stop_times[["trip_instance_key",
"stop_sequence", "geometry"]]
.rename(columns = {
"stop_sequence": "prior_stop_sequence",
"geometry": "prior_geometry"
})
.set_geometry("prior_geometry")
)

stop_times_with_prior = pd.merge(
stop_times_geom_direction = pd.merge(
stop_times,
prior_stop,
gdf2,
on = trip_stop_cols,
how = "left"
how = "inner"
)

stop_times_with_prior_geom = pd.merge(
stop_times_with_prior,
prior_stop_geom,
on = ["trip_instance_key", "prior_stop_sequence"],
how = "left"
).astype({
"prior_stop_sequence": "Int64",
"subseq_stop_sequence": "Int64"
}).fillna({
"subseq_stop_id": "",
"subseq_stop_name": ""
})

# Create stop pair with underscores, since stop_id
# can contain hyphens
stop_times_with_prior_geom = stop_times_with_prior_geom.assign(
stop_pair = stop_times_with_prior_geom.apply(
lambda x:
str(x.stop_id) + "__" + str(x.subseq_stop_id),
axis=1,
),
stop_pair_name = stop_times_with_prior_geom.apply(
lambda x:
x.stop_name + "__" + x.subseq_stop_name,
axis=1,
),
).drop(columns = ["subseq_stop_id", "subseq_stop_name"])

return stop_times_with_prior_geom

return stop_times_geom_direction


def assemble_stop_times_with_direction(
Expand All @@ -179,50 +187,17 @@ def assemble_stop_times_with_direction(

scheduled_stop_times = prep_scheduled_stop_times(analysis_date)

trip_stop_cols = ["trip_instance_key", "stop_sequence",
"stop_id", "stop_name"]
trip_cols = ["trip_instance_key"]
trip_stop_cols = ["trip_instance_key", "stop_sequence"]

scheduled_stop_times2 = find_prior_subseq_stop(
scheduled_stop_times, trip_stop_cols
)

other_stops = scheduled_stop_times2[
~(scheduled_stop_times2.prior_geometry.isna())
]

first_stop = scheduled_stop_times2[
scheduled_stop_times2.prior_geometry.isna()
]

first_stop = first_stop.assign(
stop_primary_direction = "Unknown"
).drop(columns = "prior_geometry")

other_stops_no_geom = other_stops.drop(columns = ["prior_geometry"])

prior_geom = other_stops.prior_geometry
current_geom = other_stops.geometry

# Create a column with readable direction like westbound, eastbound, etc
stop_direction = np.vectorize(
rt_utils.primary_cardinal_direction)(prior_geom, current_geom)
stop_distance = prior_geom.distance(current_geom)

other_stops_no_geom = other_stops_no_geom.assign(
stop_primary_direction = stop_direction,
stop_meters = stop_distance,
)

scheduled_stop_times_with_direction = pd.concat(
[first_stop, other_stops_no_geom],
axis=0
)

df = scheduled_stop_times_with_direction.sort_values(
trip_stop_cols).reset_index(drop=True)

time1 = datetime.datetime.now()
print(f"get scheduled stop times with direction: {time1 - start}")
df = find_prior_subseq_stop_info(
scheduled_stop_times,
analysis_date,
trip_cols = trip_cols,
trip_stop_cols = trip_stop_cols
).sort_values(
trip_stop_cols
).reset_index(drop=True)

utils.geoparquet_gcs_export(
df,
Expand All @@ -231,15 +206,22 @@ def assemble_stop_times_with_direction(
)

end = datetime.datetime.now()
print(f"execution time: {end - start}")
logger.info(
f"scheduled stop times with direction {analysis_date}: {end - start}"
)

return


if __name__ == "__main__":

    from update_vars import analysis_date_list

    # Configure loguru sinks additively: one persistent file sink whose
    # entries are retained for 3 months, plus a stderr sink at INFO level
    # with an explicit timestamp/level/message format.
    LOG_FILE = "./logs/preprocessing.log"
    logger.add(LOG_FILE, retention="3 months")
    logger.add(sys.stderr,
               format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}",
               level="INFO")

    # Run the pipeline once per analysis date. GTFS_DATA_DICT is the
    # catalog of dataset paths imported at module top (outside this hunk);
    # assemble_stop_times_with_direction exports a geoparquet per date.
    for date in analysis_date_list:
        print(date)
        assemble_stop_times_with_direction(date, GTFS_DATA_DICT)
Loading
Loading