diff --git a/darts/models/__init__.py b/darts/models/__init__.py index 17640b195d..1556ebfb91 100644 --- a/darts/models/__init__.py +++ b/darts/models/__init__.py @@ -28,6 +28,9 @@ from darts.models.forecasting.fft import FFT from darts.models.forecasting.kalman_forecaster import KalmanForecaster from darts.models.forecasting.linear_regression_model import LinearRegressionModel +from darts.models.forecasting.multivariate_forecasting_model_wrapper import ( + MultivariateForecastingModelWrapper, +) from darts.models.forecasting.random_forest import RandomForest from darts.models.forecasting.regression_ensemble_model import RegressionEnsembleModel from darts.models.forecasting.regression_model import RegressionModel @@ -143,6 +146,7 @@ "GlobalNaiveDrift", "GlobalNaiveDrift", "GlobalNaiveSeasonal", + "MultivariateForecastingModelWrapper", "NBEATSModel", "NHiTSModel", "NLinearModel", diff --git a/darts/models/forecasting/multivariate_forecasting_model_wrapper.py b/darts/models/forecasting/multivariate_forecasting_model_wrapper.py new file mode 100644 index 0000000000..fa996298cd --- /dev/null +++ b/darts/models/forecasting/multivariate_forecasting_model_wrapper.py @@ -0,0 +1,145 @@ +""" +Multivariate forecasting model wrapper +------------------------- + +A wrapper around local forecasting models to enable multivariate series training and forecasting. One model is trained +for each component of the target series, independently of the others hence ignoring the potential interactions between +its components. +""" + +from typing import List, Optional, Tuple + +from darts.logging import get_logger, raise_if_not +from darts.models.forecasting.forecasting_model import ( + FutureCovariatesLocalForecastingModel, + LocalForecastingModel, + TransferableFutureCovariatesLocalForecastingModel, +) +from darts.timeseries import TimeSeries, concatenate +from darts.utils.ts_utils import seq2series + +logger = get_logger(__name__) + + +class MultivariateForecastingModelWrapper(FutureCovariatesLocalForecastingModel): + def __init__(self, model: LocalForecastingModel): + """ + Wrapper for univariate LocalForecastingModel to enable multivariate series training and forecasting. + + A copy of the provided model will be trained independently on each component of the target series, ignoring the + potential interactions. + ---------- + model + Model used to predict individual components + """ + super().__init__() + + self.model: LocalForecastingModel = model + self._trained_models: List[LocalForecastingModel] = [] + + def _fit(self, series: TimeSeries, future_covariates: Optional[TimeSeries] = None): + super()._fit(series, future_covariates) + self._trained_models = [] + + series = seq2series(series) + for comp in series.components: + comp = series.univariate_component(comp) + component_model = ( + self.model.untrained_model().fit( + series=comp, future_covariates=future_covariates + ) + if self.supports_future_covariates + else self.model.untrained_model().fit(series=comp) + ) + self._trained_models.append(component_model) + + return self + + def predict( + self, + n: int, + series: Optional[TimeSeries] = None, + future_covariates: Optional[TimeSeries] = None, + num_samples: int = 1, + **kwargs, + ) -> TimeSeries: + return self._predict(n, future_covariates, num_samples, **kwargs) + + def _predict( + self, + n: int, + future_covariates: Optional[TimeSeries] = None, + num_samples: int = 1, + verbose: bool = False, + **kwargs, + ) -> TimeSeries: + predictions = [ + model.predict(n=n, future_covariates=future_covariates) + if self.supports_future_covariates + else model.predict(n=n) + for model in self._trained_models + ] + + raise_if_not( + len(predictions) == len(self._trained_models), + f"Prediction contains {len(predictions)} components but {len(self._trained_models)} models were fitted", + ) + + return concatenate(predictions, axis=1) + + @property + def extreme_lags( + self, + ) -> Tuple[ + Optional[int], + Optional[int], + Optional[int], + Optional[int], + Optional[int], + Optional[int], + ]: + return self.model.extreme_lags + + @property + def _model_encoder_settings( + self, + ) -> Tuple[ + Optional[int], + Optional[int], + bool, + bool, + Optional[List[int]], + Optional[List[int]], + ]: + return None, None, False, self.supports_future_covariates, None, None + + @property + def supports_multivariate(self) -> bool: + return True + + @property + def supports_past_covariates(self) -> bool: + return self.model.supports_past_covariates + + @property + def supports_future_covariates(self) -> bool: + return self.model.supports_future_covariates + + @property + def supports_static_covariates(self) -> bool: + return self.model.supports_static_covariates + + @property + def _is_probabilistic(self) -> bool: + """ + A MultivariateForecastingModelWrapper is probabilistic if the base_model + is probabilistic + """ + return self.model._is_probabilistic + + def _supports_non_retrainable_historical_forecasts(self) -> bool: + return isinstance(self.model, TransferableFutureCovariatesLocalForecastingModel) + + @property + def _supress_generate_predict_encoding(self) -> bool: + return isinstance(self.model, TransferableFutureCovariatesLocalForecastingModel) diff --git a/darts/tests/models/forecasting/test_multivariate_forecasting_model_wrapper.py b/darts/tests/models/forecasting/test_multivariate_forecasting_model_wrapper.py new file mode 100644 index 0000000000..d9721b7abc --- /dev/null +++ b/darts/tests/models/forecasting/test_multivariate_forecasting_model_wrapper.py @@ -0,0 +1,130 @@ +import copy + +import pytest + +from darts import TimeSeries +from darts.logging import get_logger +from darts.models import ( + ARIMA, + BATS, + FFT, + TBATS, + AutoARIMA, + Croston, + ExponentialSmoothing, + FourTheta, + KalmanForecaster, + MultivariateForecastingModelWrapper, + NaiveMean, + NaiveMovingAverage, + NaiveSeasonal, + Prophet, + StatsForecastAutoCES, + StatsForecastAutoTheta, + Theta, +) +from darts.utils import timeseries_generation as tg + +logger = get_logger(__name__) + +local_models = [ + NaiveMean(), + NaiveMovingAverage(5), + NaiveSeasonal(), + ExponentialSmoothing(), + StatsForecastAutoTheta(season_length=12), + StatsForecastAutoCES(season_length=12, model="Z"), + Theta(1), + FourTheta(1), + FFT(trend="poly"), + TBATS(use_trend=True, use_arma_errors=True, use_box_cox=True), + BATS(use_trend=True, use_arma_errors=True, use_box_cox=True), +] + +future_covariates_models = [ + Prophet(), + Croston(), + AutoARIMA(), + ARIMA(12, 1, 1), + KalmanForecaster(), +] + + +class TestMultivariateForecastingModelWrapper: + RANDOM_SEED = 42 + + ts_length = 50 + n_pred = 5 + + univariate = tg.gaussian_timeseries(length=ts_length, mean=50) + multivariate = univariate.stack(tg.gaussian_timeseries(length=ts_length, mean=20)) + + future_covariates = tg.gaussian_timeseries(length=ts_length + n_pred, mean=50) + + @pytest.mark.parametrize("model", local_models) + @pytest.mark.parametrize("series", [univariate, multivariate]) + def test_fit_predict_local_models(self, model, series): + self._test_predict_with_base_model(model, series) + + @pytest.mark.parametrize("model", future_covariates_models) + @pytest.mark.parametrize("series", [univariate, multivariate]) + def test_fit_predict_local_future_covariates_models(self, model, series): + self._test_predict_with_base_model(model, series, self.future_covariates) + + @pytest.mark.parametrize("model_object", future_covariates_models) + @pytest.mark.parametrize("series", [univariate, multivariate]) + @pytest.mark.parametrize("future_covariates", [future_covariates, None]) + def test_encoders_support(self, model_object, series, future_covariates): + add_encoders = { + "position": {"future": ["relative"]}, + } + + model_params = { + k: vals for k, vals in copy.deepcopy(model_object.model_params).items() + } + model_params["add_encoders"] = add_encoders + model = model_object.__class__(**model_params) + + self._test_predict_with_base_model(model, series, future_covariates) + + def _test_predict_with_base_model( + self, model, series: TimeSeries, future_covariates=None + ): + print(type(series), isinstance(series, TimeSeries)) + preds = self.trained_model_predictions( + model, self.n_pred, series, future_covariates + ) + assert isinstance(preds, TimeSeries) + assert preds.n_components == series.n_components + + # Make sure that the compound prediction is the same as the individual predictions + individual_preds = self.trained_individual_model_predictions( + model, self.n_pred, series, future_covariates + ) + for component in range(series.n_components): + assert preds.univariate_component(component) == individual_preds[component] + + def trained_model_predictions(self, base_model, n, series, future_covariates): + model = MultivariateForecastingModelWrapper(base_model) + print(series) + model.fit(series, future_covariates=future_covariates) + return model.predict(n=n, series=series, future_covariates=future_covariates) + + def trained_individual_model_predictions( + self, base_model, n, series, future_covariates + ): + predictions = [] + for component in range(series.n_components): + single_series = series.univariate_component(component) + + model = base_model.untrained_model() + if model.supports_future_covariates: + model.fit(single_series, future_covariates=future_covariates) + predictions.append( + model.predict(n=n, future_covariates=future_covariates) + ) + else: + model.fit(single_series) + predictions.append(model.predict(n=n)) + + return predictions