Add options to handle larger datasets for location models #687
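The new keyword arguments in this diff are `alt_values_to_feather`, which caches the (typically very large) alternatives values table as a feather file after the first csv read, and `chunking_size`, which converts the alternatives values to the x_ca format used by larch in chunks of roughly that many rows and caches the result as a pickle. A minimal sketch of how a caller might pass them; the model name, directory, and chunk size below are illustrative assumptions, not values from this PR:

from activitysim.estimation.larch.location_choice import location_choice_model

# hypothetical example values; any estimation data bundle written by ActivitySim
# in estimation mode could be used here
model = location_choice_model(
    name="workplace_location",
    edb_directory="output/estimation_data_bundle/{name}/",
    alt_values_to_feather=True,  # keep a .fea copy of the alternatives values for faster re-reads
    chunking_size=500_000,       # build x_ca roughly 500,000 alternatives rows at a time
)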

Merged
79 changes: 77 additions & 2 deletions activitysim/estimation/larch/location_choice.py
@@ -3,6 +3,8 @@
import os
from pathlib import Path
from typing import Collection
import pickle
from datetime import datetime

import numpy as np
import pandas as pd
@@ -46,6 +48,8 @@ def location_choice_model(
settings_file="{name}_model_settings.yaml",
landuse_file="{name}_landuse.csv",
return_data=False,
alt_values_to_feather=False,  # if True, cache the alternatives values table as a feather file after the first csv read
chunking_size=None,  # if set, convert alternatives values to x_ca in chunks of roughly this many rows
):
model_selector = name.replace("_location", "")
model_selector = model_selector.replace("_destination", "")
@@ -59,12 +63,42 @@ def _read_csv(filename, **kwargs):
filename = filename.format(name=name)
return pd.read_csv(os.path.join(edb_directory, filename), **kwargs)

def _read_feather(filename, **kwargs):
filename = filename.format(name=name)
return pd.read_feather(os.path.join(edb_directory, filename), **kwargs)

def _to_feather(df, filename, **kwargs):
filename = filename.format(name=name)
return df.to_feather(os.path.join(edb_directory, filename), **kwargs)
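# note: the feather read/write helpers above rely on pandas' feather support,
# which requires the optional pyarrow dependency in the estimation environment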

def _read_pickle(filename, **kwargs):
filename = filename.format(name=name)
return pd.read_pickle(os.path.join(edb_directory, filename), **kwargs)

def _to_pickle(df, filename, **kwargs):
filename = filename.format(name=name)
return df.to_pickle(os.path.join(edb_directory, filename), **kwargs)

def _file_exists(filename):
filename = filename.format(name=name)
return os.path.exists(os.path.join(edb_directory, filename))

coefficients = _read_csv(
coefficients_file,
index_col="coefficient_name",
)
spec = _read_csv(spec_file, comment="#")

# read alternative values from the feather cache if it exists, otherwise from csv
alt_values_fea_file = alt_values_file.replace(".csv", ".fea")
if os.path.exists(
    os.path.join(edb_directory, alt_values_fea_file.format(name=name))
):
    alt_values = _read_feather(alt_values_fea_file)
else:
    alt_values = _read_csv(alt_values_file)
    if alt_values_to_feather:
        _to_feather(df=alt_values, filename=alt_values_fea_file)
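# note: once the feather file exists it is read in preference to the csv, so it
# should be deleted whenever the estimation data bundle is regenerated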
chooser_data = _read_csv(chooser_file)
landuse = _read_csv(landuse_file, index_col="zone_id")
master_size_spec = _read_csv(size_spec_file)
@@ -152,7 +186,48 @@ def _read_csv(filename, **kwargs):

chooser_index_name = chooser_data.columns[0]
x_co = chooser_data.set_index(chooser_index_name)

def split(a, n):
    # split list a into n chunks whose lengths differ by at most one
    k, m = divmod(len(a), n)
    return (a[i * k + min(i, m) : (i + 1) * k + min(i + 1, m)] for i in range(n))
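# e.g. list(split(list(range(10)), 3)) -> [[0, 1, 2, 3], [4, 5, 6], [7, 8, 9]]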

# process x_ca with cv_to_ca, with or without chunking
x_ca_pickle_file = "{name}_x_ca.pkl"
if chunking_size is None:
    x_ca = cv_to_ca(
        alt_values.set_index([chooser_index_name, alt_values.columns[1]])
    )
elif _file_exists(x_ca_pickle_file):
    # if a pickle file from previous x_ca processing exists, load it to save time
    # (delete the pickle file to force the chunked conversion to run again)
    time_start = datetime.now()
    x_ca = _read_pickle(x_ca_pickle_file)
    print(
        f"x_ca data loaded from {name}_x_ca.pkl - time elapsed {(datetime.now() - time_start).total_seconds()}"
    )
else:
    time_start = datetime.now()
    # calculate num_chunks based on chunking_size (the approximate number of rows per chunk)
    num_chunks = max(int(len(alt_values) / chunking_size), 1)
    all_person_ids = list(alt_values["person_id"].unique())
    split_ids = list(split(all_person_ids, num_chunks))
    x_ca_list = []
    for i, chunk_ids in enumerate(split_ids, start=1):
        alt_values_i = alt_values[alt_values["person_id"].isin(chunk_ids)]
        x_ca_i = cv_to_ca(
            alt_values_i.set_index([chooser_index_name, alt_values_i.columns[1]])
        )
        x_ca_list.append(x_ca_i)
        print(
            f"x_ca_i compute done for chunk {i}/{num_chunks} - time elapsed {(datetime.now() - time_start).total_seconds()}"
        )
    x_ca = pd.concat(x_ca_list, axis=0)
    # save the final x_ca result as a pickle file to speed up future data loading
    _to_pickle(df=x_ca, filename=x_ca_pickle_file)
    print(
        f"x_ca compute done - time elapsed {(datetime.now() - time_start).total_seconds()}"
    )

if CHOOSER_SEGMENT_COLUMN_NAME is not None:
# label segments with names