-
Notifications
You must be signed in to change notification settings - Fork 6
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #703 from cal-itp/error-stop-segments
Error stop segments
- Loading branch information
Showing
21 changed files
with
2,047 additions
and
121 deletions.
There are no files selected for viewing
Empty file.
1,280 changes: 1,280 additions & 0 deletions
1,280
rt_segment_speeds/09_loop_shapes_exploration.ipynb
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
2023-04-01 12:35:44.231 | INFO | __main__:<module>:134 - Analysis date: 2023-02-15 | ||
2023-04-01 12:39:00.176 | INFO | __main__:<module>:146 - pare down vp | ||
2023-04-01 12:39:00.277 | INFO | __main__:<module>:149 - execution time: 0:03:16.043023 | ||
2023-04-03 10:28:38.880 | INFO | __main__:<module>:133 - Analysis date: 2023-03-15 | ||
2023-04-03 10:30:40.291 | INFO | __main__:<module>:145 - pare down vp | ||
2023-04-03 10:30:40.293 | INFO | __main__:<module>:148 - execution time: 0:02:01.403003 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,148 @@ | ||
""" | ||
Pre-processing vehicle positions. | ||
Drop all RT trips with less than ___ min of data. | ||
Create 2 dfs of trips that are straightforward - no loops, no inlining, | ||
and ones that are more complex? | ||
""" | ||
import dask.dataframe as dd | ||
import datetime | ||
import geopandas as gpd | ||
import numpy as np | ||
import pandas as pd | ||
import sys | ||
|
||
from loguru import logger | ||
|
||
from shared_utils import utils | ||
from segment_speed_utils import helpers | ||
from segment_speed_utils.project_vars import (SEGMENT_GCS, analysis_date, | ||
CONFIG_PATH) | ||
|
||
def trip_time_elapsed( | ||
ddf: dd.DataFrame, | ||
group_cols: list, | ||
timestamp_col: str | ||
): | ||
""" | ||
Group by trip and calculate the time elapsed (max_time-min_time) | ||
for RT vp observed. | ||
""" | ||
min_time = (ddf.groupby(group_cols) | ||
[timestamp_col] | ||
.min() | ||
.reset_index() | ||
.rename(columns = {timestamp_col: "min_time"}) | ||
) | ||
|
||
|
||
max_time = (ddf.groupby(group_cols) | ||
[timestamp_col] | ||
.max() | ||
.reset_index() | ||
.rename(columns = {timestamp_col: "max_time"}) | ||
) | ||
|
||
df = dd.merge( | ||
min_time, | ||
max_time, | ||
on = group_cols, | ||
how = "outer" | ||
) | ||
|
||
df = df.assign( | ||
trip_time_sec = (df.max_time - df.min_time) / np.timedelta64(1, "s") | ||
) | ||
|
||
return df | ||
|
||
|
||
def get_valid_trips_by_time_cutoff( | ||
ddf: dd.DataFrame, | ||
timestamp_col: str, | ||
trip_time_min_cutoff: int | ||
)-> pd.DataFrame: | ||
""" | ||
Filter down trips by trip time elapsed. | ||
Set the number of minutes to do cut-off for at least x min of RT. | ||
""" | ||
trip_cols = ["gtfs_dataset_key", "trip_id"] | ||
trip_stats = trip_time_elapsed( | ||
ddf, | ||
trip_cols, | ||
timestamp_col | ||
) | ||
|
||
usable_trips = (trip_stats[ | ||
trip_stats.trip_time_sec >= trip_time_min_cutoff * 60] | ||
[trip_cols] | ||
.drop_duplicates() | ||
.reset_index(drop=True) | ||
) | ||
|
||
return usable_trips | ||
|
||
|
||
def pare_down_vp_to_valid_trips( | ||
analysis_date: str, | ||
dict_inputs: dict = {} | ||
): | ||
""" | ||
Pare down vehicle positions that have been joined to segments | ||
to keep the enter / exit timestamps. | ||
Also, exclude any bad batches of trips. | ||
""" | ||
INPUT_FILE_PREFIX = dict_inputs["stage0"] | ||
TIMESTAMP_COL = dict_inputs["timestamp_col"] | ||
TIME_CUTOFF = dict_inputs["time_min_cutoff"] | ||
EXPORT_FILE = dict_inputs["stage1"] | ||
|
||
vp = gpd.read_parquet( | ||
f"{SEGMENT_GCS}{INPUT_FILE_PREFIX}_{analysis_date}.parquet" | ||
) | ||
|
||
usable_trips = get_valid_trips_by_time_cutoff( | ||
vp, | ||
TIMESTAMP_COL, | ||
TIME_CUTOFF | ||
) | ||
|
||
usable_vp = pd.merge( | ||
vp, | ||
usable_trips, | ||
on = ["gtfs_dataset_key", "trip_id"], | ||
how = "inner" | ||
) | ||
|
||
utils.geoparquet_gcs_export( | ||
usable_vp, | ||
SEGMENT_GCS, | ||
f"{EXPORT_FILE}_{analysis_date}" | ||
) | ||
|
||
|
||
|
||
if __name__ == "__main__": | ||
|
||
LOG_FILE = "../logs/usable_rt_vp.log" | ||
logger.add(LOG_FILE, retention="3 months") | ||
logger.add(sys.stderr, | ||
format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}", | ||
level="INFO") | ||
|
||
logger.info(f"Analysis date: {analysis_date}") | ||
|
||
start = datetime.datetime.now() | ||
|
||
ROUTE_SEG_DICT = helpers.get_parameters(CONFIG_PATH, "route_segments") | ||
|
||
time1 = datetime.datetime.now() | ||
pare_down_vp_to_valid_trips( | ||
analysis_date, | ||
dict_inputs = ROUTE_SEG_DICT | ||
) | ||
|
||
logger.info(f"pare down vp") | ||
|
||
end = datetime.datetime.now() | ||
logger.info(f"execution time: {end-start}") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.