Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ENH: multiprocessing for generate_tours #588

Merged
merged 1 commit into from
Dec 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 25 additions & 12 deletions tests/preprocessing/test_trips.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,8 @@ class TestGenerate_tours:

def test_generate_tours(self, example_trip_data):
"""Test general functionality of generate tours function"""
trips, sp_locs = example_trip_data
trips_out, tours = ti.preprocessing.trips.generate_tours(trips)
trips, _ = example_trip_data
trips_out, _ = ti.preprocessing.trips.generate_tours(trips)
# check that nothing else than the new column has changed in trips df
assert all(trips_out.iloc[:, :6] == trips)
# check that the two tours were found
Expand All @@ -161,9 +161,22 @@ def test_generate_tours(self, example_trip_data):
user_1_df = trips_out[trips_out["user_id"] == 1]
assert all(pd.isna(user_1_df["tour_id"]))

def test_parallel_computing(self, example_trip_data):
"""The result obtained with parallel computing should be identical."""
trips, _ = example_trip_data

# without parallel computing code
trips_ori, tours_ori = ti.preprocessing.trips.generate_tours(trips, n_jobs=1)
# using two cores
trips_para, tours_para = ti.preprocessing.trips.generate_tours(trips, n_jobs=2)

# the result of parallel computing should be identical
assert_geodataframe_equal(trips_ori, trips_para)
pd.testing.assert_frame_equal(tours_ori, tours_para)

def test_tours_with_gap(self, example_trip_data):
"""Test functionality of max_nr_gaps parameter in tour generation"""
trips, sp_locs = example_trip_data
trips, _ = example_trip_data
trips_out, tours = ti.preprocessing.trips.generate_tours(trips, max_nr_gaps=1)
# new tour was found for user 1
assert len(tours) == 3
Expand All @@ -172,8 +185,8 @@ def test_tours_with_gap(self, example_trip_data):

def test_tour_times(self, example_trip_data):
"""Check whether the start and end times of generated tours are correct"""
trips, sp_locs = example_trip_data
trips_out, tours = ti.preprocessing.trips.generate_tours(trips, max_nr_gaps=1, max_time="1d")
trips, _ = example_trip_data
_, tours = ti.preprocessing.trips.generate_tours(trips, max_nr_gaps=1, max_time="1d")
# check that all times are below the max time
for i, row in tours.iterrows():
time_diff = row["finished_at"] - row["started_at"]
Expand All @@ -189,16 +202,16 @@ def test_tour_times(self, example_trip_data):

def test_tour_geom(self, example_trip_data):
"""Test whether tour generation is invariant to the name of the geometry column"""
trips, sp_locs = example_trip_data
trips, _ = example_trip_data
trips.rename(columns={"geom": "other_geom_name"}, inplace=True)
trips = trips.set_geometry("other_geom_name")
trips_out, tours = ti.preprocessing.trips.generate_tours(trips)
trips_out, _ = ti.preprocessing.trips.generate_tours(trips)
# check that nothing else than the new column has changed in trips df
assert all(trips_out.iloc[:, :6] == trips)

def test_tour_max_time(self, example_trip_data):
"""Test functionality of max time argument in tour generation"""
trips, sp_locs = example_trip_data
trips, _ = example_trip_data
with pytest.warns(UserWarning, match="No tours can be generated, return empty tours"):
_, tours = ti.preprocessing.trips.generate_tours(trips, max_time="2h") # only 2 hours allowed
assert len(tours) == 0
Expand All @@ -208,7 +221,7 @@ def test_tour_max_time(self, example_trip_data):
def test_tours_locations(self, example_trip_data):
"""Test whether tour generation with locations as input yields correct results as well"""
trips, sp_locs = example_trip_data
trips_out, tours = ti.preprocessing.trips.generate_tours(trips, staypoints=sp_locs, max_nr_gaps=1)
_, tours = ti.preprocessing.trips.generate_tours(trips, staypoints=sp_locs, max_nr_gaps=1)
assert all(tours["location_id"] == pd.Series([1, 2, 2]))

# group trips by tour and check that the locations of start and end of each tour are correct
Expand Down Expand Up @@ -263,12 +276,12 @@ def test_accessor(self, example_trip_data):

def test_print_progress_flag(self, example_trip_data, capsys):
"""Test if the print_progress bar controls the printing behavior."""
trips, sp_locs = example_trip_data
trips_out, tours = ti.preprocessing.trips.generate_tours(trips, print_progress=True)
trips, _ = example_trip_data
ti.preprocessing.trips.generate_tours(trips, print_progress=True)
captured_print = capsys.readouterr()
assert captured_print.err != ""

trips_out, tours = ti.preprocessing.trips.generate_tours(trips, print_progress=False)
ti.preprocessing.trips.generate_tours(trips, print_progress=False)
captured_noprint = capsys.readouterr()
assert captured_noprint.err == ""

Expand Down
29 changes: 16 additions & 13 deletions trackintel/preprocessing/trips.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import trackintel as ti
from trackintel import Tours, Trips
from trackintel.preprocessing.util import applyParallel


def get_trips_grouped(trips, tours):
Expand Down Expand Up @@ -59,6 +60,7 @@ def generate_tours(
max_time="1d",
max_nr_gaps=0,
print_progress=False,
n_jobs=1,
):
"""
Generate trackintel-tours from trips
Expand All @@ -85,6 +87,12 @@ def generate_tours(
print_progress : bool, default False
If print_progress is True, the progress bar is displayed

n_jobs: int, default 1
The maximum number of concurrently running jobs. If -1 all CPUs are used. If 1 is given, no parallel
computing code is used at all, which is useful for debugging. See
https://joblib.readthedocs.io/en/latest/parallel.html#parallel-reference-documentation
for a detailed description

Returns
-------
trips_with_tours: Trips
Expand Down Expand Up @@ -150,19 +158,14 @@ def generate_tours(
"geom_col": geom_col,
"crs_is_projected": crs_is_projected,
}
if print_progress:
tqdm.pandas(desc="User tour generation")
tours = (
trips_input.groupby(["user_id"], group_keys=False, as_index=False)
.progress_apply(_generate_tours_user, **kwargs)
.reset_index(drop=True)
)
else:
tours = (
trips_input.groupby(["user_id"], group_keys=False, as_index=False)
.apply(_generate_tours_user, **kwargs)
.reset_index(drop=True)
)

tours = applyParallel(
trips_input.groupby("user_id", group_keys=False, as_index=False),
_generate_tours_user,
print_progress=print_progress,
n_jobs=n_jobs,
**kwargs
)

# No tours found
if len(tours) == 0:
Expand Down
Loading