Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix aggregate function #232

Merged
merged 6 commits into from
Oct 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ defaults:
run:
shell: bash -l {0}

concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true

jobs:
nb-sync:
runs-on: ubuntu-latest
Expand Down
6 changes: 2 additions & 4 deletions hierarchicalforecast/_modidx.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,10 +178,10 @@
'hierarchicalforecast/utils.py'),
'hierarchicalforecast.utils.HierarchicalPlot.plot_summing_matrix': ( 'utils.html#hierarchicalplot.plot_summing_matrix',
'hierarchicalforecast/utils.py'),
'hierarchicalforecast.utils._to_summing_dataframe': ( 'utils.html#_to_summing_dataframe',
'hierarchicalforecast/utils.py'),
'hierarchicalforecast.utils._to_summing_matrix': ( 'utils.html#_to_summing_matrix',
'hierarchicalforecast/utils.py'),
'hierarchicalforecast.utils._to_upper_hierarchy': ( 'utils.html#_to_upper_hierarchy',
'hierarchicalforecast/utils.py'),
'hierarchicalforecast.utils.aggregate': ( 'utils.html#aggregate',
'hierarchicalforecast/utils.py'),
'hierarchicalforecast.utils.aggregate_before': ( 'utils.html#aggregate_before',
Expand All @@ -191,8 +191,6 @@
'hierarchicalforecast/utils.py'),
'hierarchicalforecast.utils.level_to_outputs': ( 'utils.html#level_to_outputs',
'hierarchicalforecast/utils.py'),
'hierarchicalforecast.utils.numpy_balance': ( 'utils.html#numpy_balance',
'hierarchicalforecast/utils.py'),
'hierarchicalforecast.utils.quantiles_to_outputs': ( 'utils.html#quantiles_to_outputs',
'hierarchicalforecast/utils.py'),
'hierarchicalforecast.utils.samples_to_quantiles_df': ( 'utils.html#samples_to_quantiles_df',
Expand Down
216 changes: 75 additions & 141 deletions hierarchicalforecast/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
# %% ../nbs/utils.ipynb 3
import sys
import timeit
import warnings
from itertools import chain
from typing import Callable, Dict, List, Optional, Iterable

Expand Down Expand Up @@ -100,7 +101,7 @@ def aggregate_before(df: pd.DataFrame,
sparse_s: bool = False):
"""Utils Aggregation Function.

Aggregates bottom level series contained in the pd.DataFrame `df` according
Aggregates bottom level series contained in the pd.DataFrame `df` according
to levels defined in the `spec` list applying the `agg_fn` (sum, mean).<br>

**Parameters:**<br>
Expand Down Expand Up @@ -139,88 +140,15 @@ def aggregate_before(df: pd.DataFrame,
return Y_df, S, tags

# %% ../nbs/utils.ipynb 11
def numpy_balance(*arrs):
"""
Fast NumPy implementation of balance function.
The function creates all the interactions between
the NumPy arrays provided.
**Parameters:**<br>
`arrs`: NumPy arrays.<br>
**Returns:**<br>
`out`: NumPy array.<br>
"""
N = len(arrs)
out = np.transpose(np.meshgrid(*arrs, indexing='ij'),
np.roll(np.arange(N + 1), -1)).reshape(-1, N)
return out

def _to_summing_dataframe(
df: pd.DataFrame, spec: List[List[str]], sparse_s: bool = False
):
#------------------------------- Wrangling -----------------------------#
# Keep unique levels, preserving first aparison order
all_levels = list(chain.from_iterable(spec))
all_levels = [*dict.fromkeys(all_levels)]

# Create hierarchical labels
S_df = df[all_levels].copy()
S_df = S_df.drop_duplicates()

max_len_idx = np.argmax([len(hier) for hier in spec])
bottom_comb = spec[max_len_idx]
hiers_cols = []
df = df.copy()
for hier in spec:
if hier == bottom_comb:
hier_col = 'unique_id'
bottom_col = '/'.join(hier)
df['unique_id'] = df[hier].agg('/'.join, axis=1)
else:
hier_col = '/'.join(hier)
S_df[hier_col] = S_df[hier].agg('/'.join, axis=1)
hiers_cols.append(hier_col)
S_df = S_df.sort_values(by=bottom_comb)
S_df = S_df[hiers_cols]

#------------------------------- Encoding ------------------------------#
# One hot encode only aggregate levels
bottom_ids = list(S_df.unique_id)
del S_df['unique_id']
categories = [S_df[col].unique() for col in S_df.columns]
tags = dict(zip(S_df.columns, categories))
tags[bottom_col] = bottom_ids

try:
encoder = OneHotEncoder(categories=categories, sparse_output=sparse_s, dtype=np.float32)
except TypeError: # sklearn < 1.2
encoder = OneHotEncoder(categories=categories, sparse=sparse_s, dtype=np.float32)

S = encoder.fit_transform(S_df).T

if sparse_s:
S = sparse.vstack(
[
sparse.csr_matrix(S),
sparse.identity(len(bottom_ids), dtype=np.float32, format='csr'),
]
)
S_df = pd.DataFrame.sparse.from_spmatrix(
S, columns=bottom_ids, index=list(chain(*categories)) + bottom_ids
)
else:
S = np.concatenate([S, np.eye(len(bottom_ids), dtype=np.float32)], axis=0)
S_df = pd.DataFrame(
S, columns=bottom_ids, index=list(chain(*categories)) + bottom_ids
)

# Match index ordering of S_df and collapse df to Y_bottom_df
Y_bottom_df = df.copy()
Y_bottom_df = Y_bottom_df.groupby(['unique_id', 'ds'])['y'].sum().reset_index()
Y_bottom_df.unique_id = Y_bottom_df.unique_id.astype('category')
Y_bottom_df.unique_id = Y_bottom_df.unique_id.cat.set_categories(S_df.columns)
return Y_bottom_df, S_df, tags
def _to_upper_hierarchy(bottom_split, bottom_values, upper_key):
upper_split = upper_key.split('/')
upper_idxs = [bottom_split.index(i) for i in upper_split]

def join_upper(bottom_value):
bottom_parts = bottom_value.split('/')
return '/'.join(bottom_parts[i] for i in upper_idxs)

return [join_upper(val) for val in bottom_values]

# %% ../nbs/utils.ipynb 12
def aggregate(
Expand All @@ -229,71 +157,77 @@ def aggregate(
is_balanced: bool = False,
sparse_s: bool = False,
):
""" Utils Aggregation Function.
Aggregates bottom level series contained in the pd.DataFrame `df` according
to levels defined in the `spec` list applying the `agg_fn` (sum, mean).

**Parameters:**<br>
`df`: pd.DataFrame with columns `['ds', 'y']` and columns to aggregate.<br>
`spec`: List of levels. Each element of the list contains a list of columns of `df` to aggregate.<br>
`is_balanced`: bool=False, whether `Y_bottom_df` is balanced, if not we balance.<br>
`sparse_s`: bool=False, whether the returned S_df should be a sparse DataFrame.<br>
**Returns:**<br>
`Y_df, S_df, tags`: tuple with hierarchically structured series `Y_df` ($\mathbf{y}_{[a,b]}$),
summing dataframe `S_df`, and hierarchical aggregation indexes `tags`.
"""Utils Aggregation Function.
Aggregates bottom level series contained in the pandas DataFrame `df` according
to levels defined in the `spec` list.

Parameters
----------
df : pandas DataFrame
Dataframe with columns `['ds', 'y']` and columns to aggregate.
spec : list of list of str
List of levels. Each element of the list should contain a list of columns of `df` to aggregate.
is_balanced : bool (default=False)
Deprecated.
sparse_s : bool (default=False)
Return `S_df` as a sparse dataframe.

Returns
-------
Y_df : pandas DataFrame
Hierarchically structured series.
S_df : pandas DataFrame
Summing dataframe.
tags : dict
Aggregation indices.
"""

#Ensure no null values
# Checks
if df.isnull().values.any():
raise Exception('`df` contains null values')
raise ValueError('`df` contains null values')
if is_balanced:
warnings.warn(
"`is_balanced` is deprecated and will be removed in a future version. "
"Don't set this argument to suppress this warning.",
category=DeprecationWarning,
)

#-------------------------------- Wrangling --------------------------------#
# constraints S_df and collapsed Y_bottom_df with 'unique_id'
Y_bottom_df, S_df, tags = _to_summing_dataframe(df=df, spec=spec, sparse_s=sparse_s)

# Create balanced/sorted dataset for numpy aggregation (nan=0)
# TODO: investigate potential memory speed tradeoff
if not is_balanced:
dates = Y_bottom_df['ds'].unique()
balanced_prod = numpy_balance(S_df.columns, dates)
balanced_df = pd.DataFrame(balanced_prod, columns=['unique_id', 'ds'])
balanced_df['ds'] = balanced_df['ds'].astype(Y_bottom_df['ds'].dtype)

Y_bottom_df.set_index(['unique_id', 'ds'], inplace=True)
balanced_df.set_index(['unique_id', 'ds'], inplace=True)
balanced_df = balanced_df.merge(Y_bottom_df[['y']],
how='left', left_on=['unique_id', 'ds'],
right_index=True).reset_index()
Y_bottom_df.reset_index(inplace=True)
else:
dates = Y_bottom_df['ds'].unique()
balanced_df = Y_bottom_df.copy()

#------------------------------- Aggregation -------------------------------#
n_agg = S_df.shape[0] - S_df.shape[1]
# compute aggregations and tags
spec = sorted(spec, key=len)
bottom = spec[-1]
aggs = []
tags = {}
for levels in spec:
agg = df.groupby(levels + ['ds'])['y'].sum().reset_index('ds')
group = agg.index.get_level_values(0)
for level in levels[1:]:
group = group + '/' + agg.index.get_level_values(level).str.replace('/', '_')
agg.index = group
agg.index.name = 'unique_id'
tags['/'.join(levels)] = group.unique().values
aggs.append(agg)
Y_df = pd.concat(aggs)

# construct S
bottom_key = '/'.join(bottom)
bottom_levels = tags[bottom_key]
S = np.empty((len(bottom_levels), len(spec)), dtype=object)
for j, levels in enumerate(spec[:-1]):
S[:, j] = _to_upper_hierarchy(bottom, bottom_levels, '/'.join(levels))
S[:, -1] = tags[bottom_key]
categories = list(tags.values())
try:
encoder = OneHotEncoder(categories=categories, sparse_output=sparse_s, dtype=np.float32)
except TypeError: # sklearn < 1.2
encoder = OneHotEncoder(categories=categories, sparse=sparse_s, dtype=np.float32)
S = encoder.fit_transform(S).T
if sparse_s:
Agg = S_df.sparse.to_coo().tocsr()[:n_agg, :]
df_constructor = pd.DataFrame.sparse.from_spmatrix
jmoralez marked this conversation as resolved.
Show resolved Hide resolved
else:
Agg = S_df.values[:n_agg, :]

y_bottom = balanced_df.y.values
y_bottom = y_bottom.reshape(len(S_df.columns), len(dates))
y_bottom_mask = np.isnan(y_bottom)
y_agg = Agg @ np.nan_to_num(y_bottom)
y_agg_mask = Agg @ y_bottom_mask

# Create long format hierarchical dataframe
y_agg = y_agg.flatten()
y_agg[y_agg_mask.flatten() > 1] = np.nan
y_bottom = y_bottom.flatten()
Y_df = pd.DataFrame(dict(
unique_id = np.repeat(S_df.index, len(dates)),
ds = np.tile(dates, len(S_df.index)),
y = np.concatenate([y_agg, y_bottom], axis=0)))
Y_df = Y_df.set_index('unique_id').dropna()
df_constructor = pd.DataFrame
S_df = df_constructor(S, index=np.hstack(categories), columns=bottom_levels)
return Y_df, S_df, tags

# %% ../nbs/utils.ipynb 20
# %% ../nbs/utils.ipynb 21
class HierarchicalPlot:
""" Hierarchical Plot

Expand Down Expand Up @@ -487,7 +421,7 @@ def plot_hierarchical_predictions_gap(self,
plt.grid()
plt.show()

# %% ../nbs/utils.ipynb 35
# %% ../nbs/utils.ipynb 36
# convert levels to output quantile names
def level_to_outputs(level:Iterable[int]):
""" Converts list of levels into output names matching StatsForecast and NeuralForecast methods.
Expand Down Expand Up @@ -531,7 +465,7 @@ def quantiles_to_outputs(quantiles:Iterable[float]):
output_names.append('-median')
return quantiles, output_names

# %% ../nbs/utils.ipynb 36
# %% ../nbs/utils.ipynb 37
# given input array of sample forecasts and inptut quantiles/levels,
# output a Pandas Dataframe with columns of quantile predictions
def samples_to_quantiles_df(samples:np.ndarray,
Expand Down
Loading