
Commit

Merge pull request #802 from AIStream-Peelout/multimodal_models
Read the Docs Updates and Fixes
isaacmg authored Oct 8, 2024
2 parents 1b2c801 + ebc17d7 commit df94e25
Showing 7 changed files with 52 additions and 46 deletions.
18 changes: 6 additions & 12 deletions .idea/workspace.xml

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions docs/requirements.txt
@@ -4,8 +4,10 @@ sphinx-autodoc-typehints
pandas
numpy
torch
matplotlib
plotly
google-cloud-storage
scikit-learn
wandb
shap
einops
2 changes: 1 addition & 1 deletion docs/source/basic_utils.rst
@@ -1,5 +1,5 @@
Basic Google Cloud Platform Utilities
================
=====================================

Flow Forecast natively integrates with Google Cloud Platform.

2 changes: 1 addition & 1 deletion docs/source/crossformer.rst
@@ -1,4 +1,4 @@
Crossformer
=========================
.. automodule:: flood_forecast.transformer_xl.crossformer
.. automodule:: flood_forecast.transformer_xl.cross_former
:members:
2 changes: 1 addition & 1 deletion docs/source/custom_opt.rst
@@ -1,5 +1,5 @@
Custom Optimizers and more
====================
==========================

.. automodule:: flood_forecast.custom.custom_opt
:members:
2 changes: 1 addition & 1 deletion docs/source/explain_model_output.rst
@@ -1,5 +1,5 @@
Explain Model Output
=================
====================

.. automodule:: flood_forecast.explain_model_output
:members:
70 changes: 40 additions & 30 deletions flood_forecast/preprocessing/pytorch_loaders.py
@@ -2,7 +2,7 @@
import numpy as np
import pandas as pd
import torch
from typing import Dict, Tuple, Union, List
from typing import Dict, Tuple, Union, Optional, List
from flood_forecast.pre_dict import interpolate_dict
from flood_forecast.preprocessing.buil_dataset import get_data
from datetime import datetime
@@ -21,15 +21,17 @@ def __init__(
scaling=None,
start_stamp: int = 0,
end_stamp: int = None,
gcp_service_key: Optional[str] = None,
interpolate_param: bool = False,
sort_column=None,
scaled_cols=None,
feature_params=None,
no_scale=False,
preformatted_df=False

):
"""A data loader that takes a CSV file and properly batches for use in training/eval a PyTorch model.
"""
A data loader that takes a CSV file and properly batches for use in training/eval a PyTorch model
:param file_path: The path to the CSV file you wish to use (GCS compatible) or a Pandas dataframe.
:param forecast_history: This is the length of the historical time series data you wish to
utilize for forecasting
@@ -40,12 +42,10 @@ def __init__(
:param scaling: (highly reccomended) If provided should be a subclass of sklearn.base.BaseEstimator
and sklearn.base.TransformerMixin) i.e StandardScaler, MaxAbsScaler, MinMaxScaler, etc) Note without
a scaler the loss is likely to explode and cause infinite loss which will corrupt weights
:param start_stamp: Optional if you want to only use part of a CSV for training, validation
:param start_stamp int: Optional if you want to only use part of a CSV for training, validation
or testing supply these
:type start_stamp: int, optional
:param end_stamp: Optional if you want to only use part of a CSV for training, validation,
or testing supply these
:type end_stamp: int, optional
:param end_stamp int: Optional if you want to only use part of a CSV for training, validation,
or testing supply these
:param sort_column str: The column to sort the time series on prior to forecast.
:param scaled_cols: The columns you want scaling applied to (if left blank will default to all columns)
:param feature_params: These are the datetime features you want to create.
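Editor's note: taken together, these parameters describe a fairly conventional constructor, so a usage sketch may help. It is only an illustration — the file name, column names, and window sizes are hypothetical, and target_col / relevant_cols are assumed from the wider class rather than shown in this hunk.

# Hypothetical usage sketch, not part of this commit. target_col and
# relevant_cols are assumed parameters; check the full signature in
# flood_forecast/preprocessing/pytorch_loaders.py.
from sklearn.preprocessing import StandardScaler
from flood_forecast.preprocessing.pytorch_loaders import CSVDataLoader

loader = CSVDataLoader(
    file_path="data/river_flow.csv",          # local path or GCS URI
    forecast_history=36,                      # length of the lookback window
    forecast_length=10,                       # number of steps to predict
    target_col=["cfs"],
    relevant_cols=["cfs", "precip", "temp"],
    scaling=StandardScaler(),                 # highly recommended per the docstring
    start_stamp=0,
    end_stamp=5000,                           # use only the first 5000 rows
    sort_column="datetime",
)
x, y = loader[0]                              # one (history, target) pair of tensors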
@@ -122,13 +122,13 @@ def __len__(self) -> int:
len(self.df.index) - self.forecast_history - self.forecast_length - 1
)

def __sample_and_track_series__(self, idx: int, series_id=None):
def __sample_and_track_series__(self, idx, series_id=None):
pass

def inverse_scale(
self, result_data: Union[torch.Tensor, pd.Series, np.ndarray]
) -> torch.Tensor:
"""Un-does the scaling of the data.
"""Un-does the scaling of the data
:param result_data: The data you want to unscale can handle multiple data types.
:type result_data: Union[torch.Tensor, pd.Series, np.ndarray]
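Editor's note: because the loader keeps its fitted scaler, predictions made in scaled space can be mapped back in one call. A small, hypothetical round trip — "model" stands in for any trained forecaster and is not defined in this commit; "loader" is the sketch constructed above.

# Hypothetical round trip: the loader scaled the data, the model forecasts in
# scaled space, and inverse_scale maps the result back to original units.
import torch

scaled_history, _ = loader[0]
with torch.no_grad():
    scaled_forecast = model(scaled_history.unsqueeze(0)).squeeze(0)
unscaled_forecast = loader.inverse_scale(scaled_forecast)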
@@ -161,16 +161,16 @@ def inverse_scale(


class CSVSeriesIDLoader(CSVDataLoader):
def __init__(self, series_id_col: str, main_params: dict, return_method: str, return_all: bool = True):
def __init__(self, series_id_col: str, main_params: dict, return_method: str, return_all=True):
"""A data-loader for a CSV file that contains a series ID column.
:param series_id_col: The id column of the series you want to forecast.
:param series_id_col: The id
:type series_id_col: str
:param main_params: The central set of parameters
:type main_params: dict
:param return_method: The method of return (e.g. all series at once, one at a time, or a random sample)
:param return_method: The method of return
:type return_method: str
:param return_all: Whether to return all items if set to True then __validate_data_in_df__, defaults to True
:param return_all: Whether to return all items, defaults to True
:type return_all: bool, optional
"""
main_params1 = deepcopy(main_params)
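Editor's note: the main_params dict mirrors the keyword arguments a plain CSVDataLoader would receive, with the id column handled separately. A hedged construction sketch — the column names and the return_method string are illustrative, not taken from this diff.

# Hypothetical CSVSeriesIDLoader construction. The contents of main_params and
# the return_method value are illustrative only.
from flood_forecast.preprocessing.pytorch_loaders import CSVSeriesIDLoader

main_params = {
    "file_path": "data/multi_gauge.csv",
    "forecast_history": 36,
    "forecast_length": 10,
    "target_col": ["cfs"],
    "relevant_cols": ["cfs", "precip", "temp"],
    "sort_column": "datetime",
}
series_loader = CSVSeriesIDLoader(
    series_id_col="gauge_id",       # column identifying each individual series
    main_params=main_params,
    return_method="all_at_once",    # only returning every series per step is supported
)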
@@ -203,7 +203,8 @@ def __init__(self, series_id_col: str, main_params: dict, return_method: str, re
print("unique dict")

def __validate_data__in_df(self):
"""Makes sure the data in the data-frame is the proper length for each series."""
"""Makes sure the data in the data-frame is the proper length for each series e
"""
if self.return_all_series:
len_first = len(self.listed_vals[0])
print("Length of first series is:" + str(len_first))
@@ -230,6 +231,7 @@ def __getitem__(self, idx: int) -> Tuple[Dict, Dict]:
targ_list = {}
for va in self.listed_vals:
# We need to exclude the index column on one end and the series id column on the other

targ_start_idx = idx + self.forecast_history
idx2 = va[self.series_id_col].iloc[0]
va_returned = va[va.columns.difference([self.series_id_col], sort=False)]
@@ -239,7 +241,8 @@ def __getitem__(self, idx: int) -> Tuple[Dict, Dict]:
targ_list[self.unique_dict[idx2]] = targ
return src_list, targ_list
else:
raise NotImplementedError("Current code only supports returning all the series at once at each iteration")
raise NotImplementedError
return super().__getitem__(idx)

def __sample_series_id__(idx, series_id):
pass
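Editor's note: each item is therefore a pair of dictionaries keyed by the integer assigned to every unique series id. A short, hypothetical consumption loop (tensor shapes are an assumption, not shown in this hunk):

# Hypothetical iteration over one item from the series-id loader sketched above.
src_dict, targ_dict = series_loader[0]
for series_idx, history in src_dict.items():
    target = targ_dict[series_idx]
    # history: roughly (forecast_history, n_features); target: roughly (forecast_length, n_features)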
@@ -264,12 +267,8 @@ def __init__(
**kwargs
):
"""
A data loader for the test data and plotting code it is a subclass of CSVDataLoader.
:param str df_path: The path to the CSV file you want to use (GCS compatible) or a Pandas DataFrame.
:type df_path: str
:param int forecast_total: The total length of the forecast.
:
:type forecast_total: int
:param str df_path: The path to the CSV file you want to use (GCS compatible) or a Pandas DataFrame
A data loader for the test data.
"""
if "file_path" not in kwargs:
kwargs["file_path"] = df_path
@@ -284,8 +283,8 @@ def __init__(
print(df_path)
self.forecast_total = forecast_total
# TODO these are antiquated delete them
self.use_real_precip = use_real_precip
self.use_real_temp = use_real_temp
self.use_real_precip = use_real_precip
self.target_supplied = target_supplied
# Convert back to datetime and save index
sort_col1 = sort_column_clone if sort_column_clone else "datetime"
@@ -310,7 +309,7 @@ def __getitem__(self, idx):
historical_rows = self.df.iloc[idx: self.forecast_history + idx]
target_idx_start = self.forecast_history + idx
# Why aren't we using these
# targ_rows = self.df.ilo c[
# targ_rows = self.df.iloc[
# target_idx_start : self.forecast_total + target_idx_start
# ]
all_rows_orig = self.original_df.iloc[
@@ -320,7 +319,10 @@ def __getitem__(self, idx):
return historical_rows.float(), all_rows_orig, target_idx_start

def convert_real_batches(self, the_col: str, rows_to_convert):
"""A helper function to return properly divided precip and temp values to be stacked with t forecasted cfs."""
"""
A helper function to return properly divided precip and temp
values to be stacked with t forecasted cfs.
"""
the_column = torch.from_numpy(rows_to_convert[the_col].to_numpy())
chunks = [
the_column[
@@ -333,7 +335,8 @@ def convert_real_batches(self, the_col: str, rows_to_convert):
def convert_history_batches(
self, the_col: Union[str, List[str]], rows_to_convert: pd.DataFrame
):
"""A helper function to return dataframe in batches of size (history_len, num_features)
"""A helper function to return dataframe in batches of
size (history_len, num_features)
Args:
the_col (str): column names
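Editor's note: both helpers follow the same underlying pattern — pull a column out of a DataFrame and slice it into fixed-length tensors. A standalone illustration of that idea, not the exact slicing bounds used by these methods:

# Standalone sketch of the chunking idea behind convert_real_batches and
# convert_history_batches: split one column into fixed-length torch tensors.
import pandas as pd
import torch

def chunk_column(df: pd.DataFrame, col: str, chunk_len: int):
    values = torch.from_numpy(df[col].to_numpy())
    return [values[i:i + chunk_len] for i in range(0, len(values), chunk_len)]

frame = pd.DataFrame({"cfs": range(10)})
print([c.tolist() for c in chunk_column(frame, "cfs", 4)])
# [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]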
@@ -355,6 +358,10 @@ def __len__(self) -> int:
)


class TestLoaderABC(CSVTestLoader):
pass


class AEDataloader(CSVDataLoader):
def __init__(
self,
@@ -369,8 +376,9 @@ def __init__(
forecast_history=1,
no_scale=True,
sort_column=None):
"""A data loader class for autoencoders. Overrides __len__ and __getitem__ from generic dataloader. Also defaults
forecast_history and forecast_length to 1. Since AE will likely only use one row. Same parameters as before.
"""A data loader class for autoencoders. Overrides __len__ and __getitem__ from generic dataloader.
Also defaults forecast_history and forecast_length to 1. Since AE will likely only use one row.
Same parameters as before.
:param file_path: The path to the file
:type file_path: str
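Editor's note: since the autoencoder loader defaults both windows to a single row, construction stays small. A hedged sketch — the column names are hypothetical, and relevant_cols is an assumed parameter rather than one shown in this hunk.

# Hypothetical AEDataloader usage; check the full signature in
# pytorch_loaders.py before relying on the parameter names below.
from flood_forecast.preprocessing.pytorch_loaders import AEDataloader

ae_loader = AEDataloader(
    file_path="data/sensor_readings.csv",
    relevant_cols=["cfs", "precip", "temp"],
)
row_in, row_target = ae_loader[0]   # for an autoencoder the target mirrors the input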
@@ -589,14 +597,15 @@ def __getitem__(self, idx):
class VariableSequenceLength(CSVDataLoader):
def __init__(self, series_marker_column: str, csv_loader_params: Dict, pad_length=None, task="classification",
n_classes=9 + 90):
"""Enables eas(ier) loading of time-series with variable length data.
"""Enables eas(ier) loading of time-series with variable length data
:param series_marker_column: The column that dealinates when an example begins and ends
:type series_marker_column: str
:param pad_length: If the specified the length to truncate sequences at or pad them till that length
:type pad_length: int
:param task: The specific task (e.g. classification, forecasting, auto_encode)
:type task: str
"""
super().__init__(**csv_loader_params)
self.pad_length = pad_length
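Editor's note: a sketch of how the variable-length loader might be configured for the classification task mentioned above; the marker column, CSV parameters, and pad length are illustrative, not taken from this diff.

# Hypothetical VariableSequenceLength configuration: rows belonging to one
# example share a value in series_marker_column, and every example is padded
# or truncated to pad_length time steps.
from flood_forecast.preprocessing.pytorch_loaders import VariableSequenceLength

var_loader = VariableSequenceLength(
    series_marker_column="run_id",
    csv_loader_params={
        "file_path": "data/labelled_runs.csv",
        "forecast_history": 1,
        "forecast_length": 1,
        "target_col": ["label"],
        "relevant_cols": ["label", "cfs", "precip", "temp"],
    },
    pad_length=128,
    task="classification",
)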
@@ -636,7 +645,8 @@ def get_item_auto_encoder(self, idx):
return the_seq.float(), the_seq.float()

def pad_input_data(self, sequence: int):
"""Pads a sequence to a specified length."""
"""Pads a sequence to a specified length.
"""
if self.pad_length > sequence.shape[0]:
pad_dim = self.pad_length - sequence.shape[0]
return torch.nn.functional.pad(sequence, (0, 0, 0, pad_dim))
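Editor's note: the (0, 0, 0, pad_dim) argument pads only the second-to-last (time) dimension, leaving the feature dimension untouched; a quick standalone check of the shapes:

# Standalone check of the padding call above: for a (seq_len, n_features)
# tensor, F.pad(x, (0, 0, 0, pad_dim)) appends pad_dim zero rows.
import torch
import torch.nn.functional as F

x = torch.ones(5, 3)             # 5 time steps, 3 features
padded = F.pad(x, (0, 0, 0, 7))  # pad 7 additional time steps with zeros
print(padded.shape)              # torch.Size([12, 3])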
