diff --git a/hierarchicalforecast/core.py b/hierarchicalforecast/core.py index 13667954..2c1c8fda 100644 --- a/hierarchicalforecast/core.py +++ b/hierarchicalforecast/core.py @@ -23,9 +23,9 @@ def _build_fn_name(fn) -> str: func_params = fn.__dict__ # Take default parameter out of names - args_to_remove = ['insample'] + args_to_remove = ['insample', 'num_threads'] if not func_params.get('nonnegative', False): - args_to_remove += ['nonnegative'] + args_to_remove.append('nonnegative') if fn_name == 'MinTrace' and \ func_params['method']=='mint_shrink': diff --git a/hierarchicalforecast/methods.py b/hierarchicalforecast/methods.py index 82b801c5..27cdcc8c 100644 --- a/hierarchicalforecast/methods.py +++ b/hierarchicalforecast/methods.py @@ -6,6 +6,7 @@ # %% ../nbs/methods.ipynb 3 import warnings from collections import OrderedDict +from concurrent.futures import ThreadPoolExecutor from copy import deepcopy from typing import Callable, Dict, List, Optional, Union @@ -136,7 +137,7 @@ class BottomUp(HReconciler): **References:**
- [Orcutt, G.H., Watts, H.W., & Edwards, J.B.(1968). \"Data aggregation and information loss\". The American - Economic Review, 58 , 773{787)](http://www.jstor.org/stable/1815532). + Economic Review, 58 , 773-787](http://www.jstor.org/stable/1815532). """ insample = False @@ -565,13 +566,16 @@ class MinTrace(HReconciler): minimizes the squared errors for the coherent forecasts under an unbiasedness assumption; the solution has a closed form.
- $$\mathbf{P}_{\\text{MinT}}=\\left(\mathbf{S}^{\intercal}\mathbf{W}_{h}\mathbf{S}\\right)^{-1} - \mathbf{S}^{\intercal}\mathbf{W}^{-1}_{h}$$ + $$ + \mathbf{P}_{\\text{MinT}}=\\left(\mathbf{S}^{\intercal}\mathbf{W}_{h}\mathbf{S}\\right)^{-1} + \mathbf{S}^{\intercal}\mathbf{W}^{-1}_{h} + $$ **Parameters:**
`method`: str, one of `ols`, `wls_struct`, `wls_var`, `mint_shrink`, `mint_cov`.
`nonnegative`: bool, reconciled forecasts should be nonnegative?
`mint_shr_ridge`: float=2e-8, ridge numeric protection to MinTrace-shr covariance estimator.
+ `num_threads`: int=1, number of threads to use for solving the optimization problems. **References:**
- [Wickramasuriya, S. L., Athanasopoulos, G., & Hyndman, R. J. (2019). \"Optimal forecast reconciliation for @@ -584,12 +588,14 @@ class MinTrace(HReconciler): def __init__(self, method: str, nonnegative: bool = False, - mint_shr_ridge: Optional[float] = 2e-8): + mint_shr_ridge: Optional[float] = 2e-8, + num_threads: int = 1): self.method = method self.nonnegative = nonnegative self.insample = method in ['wls_var', 'mint_cov', 'mint_shrink'] if method == 'mint_shrink': self.mint_shr_ridge = mint_shr_ridge + self.num_threads = num_threads def _get_PW_matrices(self, S: np.ndarray, @@ -708,9 +714,11 @@ def fit(self, if self.nonnegative: _, n_bottom = S.shape W_inv = np.linalg.pinv(self.W) - warnings.warn('Replacing negative forecasts with zero.') - y_hat = np.copy(y_hat) - y_hat[y_hat < 0] = 0. + negatives = y_hat < 0 + if negatives.any(): + warnings.warn('Replacing negative forecasts with zero.') + y_hat = np.copy(y_hat) + y_hat[negatives] = 0. # Quadratic progamming formulation # here we are solving the quadratic programming problem # formulated in the origial paper @@ -724,8 +732,16 @@ def fit(self, b = np.zeros(n_bottom) # the quadratic programming problem # returns the forecasts of the bottom series - bottom_fcts = np.apply_along_axis(lambda y_hat: solve_qp(G=G, a=a @ y_hat, C=C, b=b)[0], - axis=0, arr=y_hat) + if self.num_threads == 1: + bottom_fcts = np.apply_along_axis(lambda y_hat: solve_qp(G=G, a=a @ y_hat, C=C, b=b)[0], + axis=0, arr=y_hat) + else: + futures = [] + with ThreadPoolExecutor(self.num_threads) as executor: + for j in range(y_hat.shape[1]): + future = executor.submit(solve_qp, G=G, a=a @ y_hat[:, j], C=C, b=b) + futures.append(future) + bottom_fcts = np.hstack([f.result()[0][:, None] for f in futures]) if not np.all(bottom_fcts > -1e-8): raise Exception('nonnegative optimization failed') # remove negative values close to zero @@ -933,13 +949,12 @@ class OptimalCombination(MinTrace): """ def __init__(self, method: str, - nonnegative: bool = 
False): + nonnegative: bool = False, + num_threads: int = 1): comb_methods = ['ols', 'wls_struct'] if method not in comb_methods: raise ValueError(f"Optimal Combination class does not support method: \"{method}\"") - - self.method = method - self.nonnegative = nonnegative + super().__init__(method=method, nonnegative=nonnegative, num_threads=num_threads) self.insample = False # %% ../nbs/methods.ipynb 58 @@ -996,7 +1011,7 @@ class ERM(HReconciler): **References:**
- [Ben Taieb, S., & Koo, B. (2019). Regularized regression for hierarchical forecasting without unbiasedness conditions. In Proceedings of the 25th ACM SIGKDD International Conference on Knowledge - Discovery & Data Mining KDD '19 (p. 1337{1347). New York, NY, USA: Association for Computing Machinery.](https://doi.org/10.1145/3292500.3330976).
+ Discovery & Data Mining KDD '19 (p. 1337-1347). New York, NY, USA: Association for Computing Machinery.](https://doi.org/10.1145/3292500.3330976).
""" def __init__(self, method: str, diff --git a/hierarchicalforecast/probabilistic_methods.py b/hierarchicalforecast/probabilistic_methods.py index a7f8451f..8cf0b703 100644 --- a/hierarchicalforecast/probabilistic_methods.py +++ b/hierarchicalforecast/probabilistic_methods.py @@ -378,7 +378,10 @@ def get_samples(self, num_samples: int=None): # Initialize PERMBU utility rec_samples = base_samples.copy() - encoder = OneHotEncoder(sparse=False, dtype=np.float32) + try: + encoder = OneHotEncoder(sparse_output=False, dtype=np.float32) + except TypeError: + encoder = OneHotEncoder(sparse=False, dtype=np.float32) hier_links = np.vstack(self._nonzero_indexes_by_row(self.S.T)) # BottomUp hierarchy traversing diff --git a/nbs/core.ipynb b/nbs/core.ipynb index 03b018b3..cee170ac 100644 --- a/nbs/core.ipynb +++ b/nbs/core.ipynb @@ -69,9 +69,9 @@ " func_params = fn.__dict__\n", "\n", " # Take default parameter out of names\n", - " args_to_remove = ['insample']\n", + " args_to_remove = ['insample', 'num_threads']\n", " if not func_params.get('nonnegative', False):\n", - " args_to_remove += ['nonnegative']\n", + " args_to_remove.append('nonnegative')\n", "\n", " if fn_name == 'MinTrace' and \\\n", " func_params['method']=='mint_shrink':\n", @@ -921,8 +921,7 @@ " ]\n", ")\n", "\n", - "fcst.fit(df=train_df)\n", - "fcst_df = fcst.forecast(h=12, fitted=True)\n", + "fcst_df = fcst.forecast(df=train_df, h=12, fitted=True)\n", "fitted_df = fcst.forecast_fitted_values()\n", "\n", "fcst_df = hrec.reconcile(\n", @@ -1156,11 +1155,8 @@ "\n", "# Compute base auto-ETS predictions\n", "# Careful identifying correct data freq, this data quarterly 'Q'\n", - "fcst = StatsForecast(df=Y_train_df,\n", - " #models=[ETS(season_length=12), Naive()],\n", - " models=[Naive()],\n", - " freq='Q', n_jobs=-1)\n", - "Y_hat_df = fcst.forecast(h=4, fitted=True)\n", + "fcst = StatsForecast(models=[Naive()], freq='Q', n_jobs=-1)\n", + "Y_hat_df = fcst.forecast(df=Y_train_df, h=4, fitted=True)\n", "Y_fitted_df = 
fcst.forecast_fitted_values()\n", "\n", "# Reconcile the base predictions\n", @@ -1172,7 +1168,7 @@ "Y_rec_df = hrec.reconcile(Y_hat_df=Y_hat_df, \n", " Y_df=Y_fitted_df,\n", " S=S_df, tags=tags)\n", - "Y_rec_df.groupby('unique_id').head(2)" + "Y_rec_df.groupby('unique_id', observed=True).head(2)" ] } ], diff --git a/nbs/methods.ipynb b/nbs/methods.ipynb index 65d095ee..e728e9a0 100644 --- a/nbs/methods.ipynb +++ b/nbs/methods.ipynb @@ -39,6 +39,7 @@ "#| export\n", "import warnings\n", "from collections import OrderedDict\n", + "from concurrent.futures import ThreadPoolExecutor\n", "from copy import deepcopy\n", "from typing import Callable, Dict, List, Optional, Union\n", "\n", @@ -1028,6 +1029,7 @@ " `method`: str, one of `ols`, `wls_struct`, `wls_var`, `mint_shrink`, `mint_cov`.
\n", " `nonnegative`: bool, reconciled forecasts should be nonnegative?
\n", " `mint_shr_ridge`: float=2e-8, ridge numeric protection to MinTrace-shr covariance estimator.
\n", + " `num_threads`: int=1, number of threads to use for solving the optimization problems.\n", "\n", " **References:**
\n", " - [Wickramasuriya, S. L., Athanasopoulos, G., & Hyndman, R. J. (2019). \\\"Optimal forecast reconciliation for\n", @@ -1040,12 +1042,14 @@ " def __init__(self, \n", " method: str,\n", " nonnegative: bool = False,\n", - " mint_shr_ridge: Optional[float] = 2e-8):\n", + " mint_shr_ridge: Optional[float] = 2e-8,\n", + " num_threads: int = 1):\n", " self.method = method\n", " self.nonnegative = nonnegative\n", " self.insample = method in ['wls_var', 'mint_cov', 'mint_shrink']\n", " if method == 'mint_shrink':\n", " self.mint_shr_ridge = mint_shr_ridge\n", + " self.num_threads = num_threads\n", "\n", " def _get_PW_matrices(self, \n", " S: np.ndarray,\n", @@ -1164,9 +1168,11 @@ " if self.nonnegative:\n", " _, n_bottom = S.shape\n", " W_inv = np.linalg.pinv(self.W)\n", - " warnings.warn('Replacing negative forecasts with zero.')\n", - " y_hat = np.copy(y_hat)\n", - " y_hat[y_hat < 0] = 0.\n", + " negatives = y_hat < 0\n", + " if negatives.any():\n", + " warnings.warn('Replacing negative forecasts with zero.')\n", + " y_hat = np.copy(y_hat)\n", + " y_hat[negatives] = 0.\n", " # Quadratic progamming formulation\n", " # here we are solving the quadratic programming problem\n", " # formulated in the origial paper\n", @@ -1180,8 +1186,16 @@ " b = np.zeros(n_bottom)\n", " # the quadratic programming problem\n", " # returns the forecasts of the bottom series\n", - " bottom_fcts = np.apply_along_axis(lambda y_hat: solve_qp(G=G, a=a @ y_hat, C=C, b=b)[0], \n", - " axis=0, arr=y_hat)\n", + " if self.num_threads == 1:\n", + " bottom_fcts = np.apply_along_axis(lambda y_hat: solve_qp(G=G, a=a @ y_hat, C=C, b=b)[0], \n", + " axis=0, arr=y_hat)\n", + " else:\n", + " futures = []\n", + " with ThreadPoolExecutor(self.num_threads) as executor:\n", + " for j in range(y_hat.shape[1]):\n", + " future = executor.submit(solve_qp, G=G, a=a @ y_hat[:, j], C=C, b=b)\n", + " futures.append(future)\n", + " bottom_fcts = np.hstack([f.result()[0][:, None] for f in futures])\n", " if not 
np.all(bottom_fcts > -1e-8):\n", " raise Exception('nonnegative optimization failed')\n", " # remove negative values close to zero\n", @@ -1451,6 +1465,9 @@ " )['mean'],\n", " S @ y_hat_bottom\n", " )\n", + "mintrace_1thr = MinTrace(method='ols', nonnegative=False, num_threads=1).fit(S=S, y_hat=S @ y_hat_bottom)\n", + "mintrace_2thr = MinTrace(method='ols', nonnegative=False, num_threads=2).fit(S=S, y_hat=S @ y_hat_bottom)\n", + "np.testing.assert_allclose(mintrace_1thr.y_hat, mintrace_2thr.y_hat)\n", "with ExceptionExpected(regex='min_trace (mint_cov)*'):\n", " for nonnegative in [False, True]:\n", " cls_min_trace = MinTrace(method='mint_cov', nonnegative=nonnegative)\n", @@ -1548,13 +1565,12 @@ " \"\"\"\n", " def __init__(self,\n", " method: str,\n", - " nonnegative: bool = False):\n", + " nonnegative: bool = False,\n", + " num_threads: int = 1):\n", " comb_methods = ['ols', 'wls_struct']\n", " if method not in comb_methods:\n", " raise ValueError(f\"Optimal Combination class does not support method: \\\"{method}\\\"\")\n", - "\n", - " self.method = method\n", - " self.nonnegative = nonnegative\n", + " super().__init__(method=method, nonnegative=nonnegative, num_threads=num_threads)\n", " self.insample = False" ] }, diff --git a/nbs/probabilistic_methods.ipynb b/nbs/probabilistic_methods.ipynb index 634871e6..fc50cfb1 100644 --- a/nbs/probabilistic_methods.ipynb +++ b/nbs/probabilistic_methods.ipynb @@ -506,7 +506,10 @@ "\n", " # Initialize PERMBU utility\n", " rec_samples = base_samples.copy()\n", - " encoder = OneHotEncoder(sparse=False, dtype=np.float32)\n", + " try:\n", + " encoder = OneHotEncoder(sparse_output=False, dtype=np.float32)\n", + " except TypeError:\n", + " encoder = OneHotEncoder(sparse=False, dtype=np.float32)\n", " hier_links = np.vstack(self._nonzero_indexes_by_row(self.S.T))\n", "\n", " # BottomUp hierarchy traversing\n",