diff --git a/fedot/core/operations/evaluation/operation_implementations/models/ts_implementations/naive.py b/fedot/core/operations/evaluation/operation_implementations/models/ts_implementations/naive.py index 7b6ae81e2e..f3600b094d 100644 --- a/fedot/core/operations/evaluation/operation_implementations/models/ts_implementations/naive.py +++ b/fedot/core/operations/evaluation/operation_implementations/models/ts_implementations/naive.py @@ -1,5 +1,4 @@ from copy import copy -from typing import Optional import numpy as np @@ -104,9 +103,9 @@ def predict(self, input_data: InputData) -> OutputData: """ Get desired part of time series for averaging and calculate mean value """ forecast_length = input_data.task.task_params.forecast_length - elements_to_take = self._how_many_elements_use_for_averaging(input_data.features) + window = self._window(input_data.features) # Prepare single forecast - mean_value = np.nanmean(input_data.features[-elements_to_take:]) + mean_value = np.nanmean(input_data.features[-window:]) forecast = np.array([mean_value] * forecast_length).reshape((1, -1)) output_data = self._convert_to_output(input_data, @@ -117,9 +116,13 @@ def predict(self, input_data: InputData) -> OutputData: def predict_for_fit(self, input_data: InputData) -> OutputData: input_data = copy(input_data) forecast_length = input_data.task.task_params.forecast_length - parts = split_rolling_slices(input_data) - mean_values_for_chunks = self.average_by_axis(parts) - forecast = np.repeat(mean_values_for_chunks.reshape((-1, 1)), forecast_length, axis=1) + features = input_data.features + shape = features.shape[0] + + window = self._window(features) + mean_values = np.array([np.mean(features[-window-shape+i:i+1]) for i in range(shape)]) + + forecast = np.repeat(mean_values.reshape((-1, 1)), forecast_length, axis=1) # Update target new_idx, transformed_target = ts_to_table(idx=input_data.idx, time_series=input_data.target, @@ -133,42 +136,5 @@ def predict_for_fit(self, input_data: InputData) -> OutputData: data_type=DataTypesEnum.table) return output_data - def average_by_axis(self, parts: np.array): - """ Perform averaging for each column using last part of it """ - mean_values_for_chunks = np.apply_along_axis(self._average, 1, parts) - return mean_values_for_chunks - - def _average(self, row: np.array): - row = row[np.logical_not(np.isnan(row))] - if len(row) == 1: - return row - - elements_to_take = self._how_many_elements_use_for_averaging(row) - return np.mean(row[-elements_to_take:]) - - def _how_many_elements_use_for_averaging(self, time_series: np.array): - elements_to_take = round(len(time_series) * self.part_for_averaging) - elements_to_take = fix_elements_number(elements_to_take) - return elements_to_take - - -def split_rolling_slices(input_data: InputData): - """ Prepare slices for features series. - Example of result for time series [0, 1, 2, 3]: - [[0, nan, nan, nan], - [0, 1, nan, nan], - [0, 1, 2, nan], - [0, 1, 2, 3]] - """ - nan_mask = np.triu(np.ones_like(input_data.features, dtype=bool), k=1) - final_matrix = np.tril(input_data.features, k=0) - final_matrix = np.array(final_matrix, dtype=float) - final_matrix[nan_mask] = np.nan - - return final_matrix - - -def fix_elements_number(elements_to_take: int): - if elements_to_take < 2: - return 2 - return elements_to_take + def _window(self, time_series: np.ndarray): + return max(2, round(time_series.shape[0] * self.part_for_averaging)) diff --git a/fedot/core/repository/data/data_operation_repository.json b/fedot/core/repository/data/data_operation_repository.json index 27dc9ef572..32ae7884c3 100644 --- a/fedot/core/repository/data/data_operation_repository.json +++ b/fedot/core/repository/data/data_operation_repository.json @@ -219,9 +219,9 @@ }, "ransac_non_lin_reg": { "meta": "regression_preprocessing", - "presets": ["fast_train", "*tree"], + "presets": ["*tree"], "tags": ["affects_target", "non_linear", "filtering", - "correct_params", "non_applicable_for_ts", "non-default"] + "correct_params", "non_applicable_for_ts"] }, "isolation_forest_reg": { "meta": "regression_preprocessing", @@ -293,7 +293,7 @@ }, "diff_filter": { "meta": "custom_time_series_transformation", - "presets": ["fast_train", "ts"], + "presets": ["ts"], "tags": [ "differential", "non_lagged", diff --git a/test/integration/models/test_model.py b/test/integration/models/test_model.py index 55af186a14..4719f9efa5 100644 --- a/test/integration/models/test_model.py +++ b/test/integration/models/test_model.py @@ -1,7 +1,8 @@ import pickle -from collections import defaultdict + from copy import deepcopy from time import perf_counter +from typing import Tuple, Optional import numpy as np import pytest @@ -10,6 +11,7 @@ from sklearn.metrics import mean_absolute_error, mean_squared_error, roc_auc_score as roc_auc from sklearn.preprocessing import MinMaxScaler +from fedot.core.constants import FAST_TRAIN_PRESET_NAME from fedot.core.data.data import InputData, OutputData from fedot.core.data.data_split import train_test_data_setup from fedot.core.data.supplementary_data import SupplementaryData @@ -26,7 +28,7 @@ from fedot.core.pipelines.node import PipelineNode from fedot.core.pipelines.pipeline import Pipeline from fedot.core.repository.dataset_types import DataTypesEnum -from fedot.core.repository.operation_types_repository import OperationTypesRepository +from fedot.core.repository.operation_types_repository import OperationMetaInfo, OperationTypesRepository from fedot.core.repository.tasks import Task, TaskTypesEnum, TsForecastingParams from test.unit.common_tests import is_predict_ignores_target from test.unit.data_operations.test_time_series_operations import synthetic_univariate_ts @@ -135,6 +137,41 @@ def get_pca_incorrect_data(): return input_data +def get_operation_perfomance(operation: OperationMetaInfo, + data_lengths: Tuple[float, ...], + times: int = 1) -> Optional[Tuple[float, ...]]: + """ + Helper function to check perfomance of only the first valid operation pair (task_type, input_type). + """ + def fit_time_for_operation(operation: OperationMetaInfo, + data: InputData): + nodes_from = [] + if task_type is TaskTypesEnum.ts_forecasting: + if 'non_lagged' not in operation.tags: + nodes_from = [PipelineNode('lagged')] + node = PipelineNode(operation.id, nodes_from=nodes_from) + pipeline = Pipeline(node) + start_time = perf_counter() + pipeline.fit(data) + return perf_counter() - start_time + + for task_type in operation.task_type: + for data_type in operation.input_types: + perfomance_values = [] + for length in data_lengths: + data = get_data_for_testing(task_type, data_type, + length=length, features_count=2, + random=True) + if data is not None: + min_evaluated_time = min(fit_time_for_operation(operation, data) for _ in range(times)) + perfomance_values.append(min_evaluated_time) + if perfomance_values: + if len(perfomance_values) != len(data_lengths): + raise ValueError('not all measurements have been proceeded') + return tuple(perfomance_values) + raise Exception(f"Fit time for operation ``{operation.id}`` cannot be measured") + + @pytest.fixture() def classification_dataset(): samples = 1000 @@ -475,31 +512,31 @@ def test_operations_are_serializable(): def test_operations_are_fast(): - # models that raise exception - to_skip = ['custom', 'decompose', 'class_decompose'] - time_limits = defaultdict(lambda *args: 0.5, {'expensive': 2, 'non-default': 100}) + """ + Test ensures that all operations with fast_train preset meet sustainability expectation. + Test defines operation complexity as polynomial function of data size. + If complexity function grows fast, then operation should not have fast_train tag. + """ + + data_lengths = tuple(map(int, np.logspace(2.2, 4, 6))) + reference_operations = ['rf', 'rfr'] + to_skip = ['custom', 'decompose', 'class_decompose', 'kmeans', + 'resample', 'one_hot_encoding'] + reference_operations + reference_time = (float('inf'), ) * len(data_lengths) + # tries for time measuring + attempt = 2 for operation in OperationTypesRepository('all')._repo: - if operation.id in to_skip: - continue - time_limit = [time_limits[tag] for tag in time_limits if tag in operation.tags] - time_limit = max(time_limit) if time_limit else time_limits.default_factory() - for task_type in operation.task_type: - for data_type in operation.input_types: - data = get_data_for_testing(task_type, data_type, - length=100, features_count=2, - random=True) - if data is not None: - try: - nodes_from = [] - if task_type is TaskTypesEnum.ts_forecasting: - if 'non_lagged' not in operation.tags: - nodes_from = [PipelineNode('lagged')] - node = PipelineNode(operation.id, nodes_from=nodes_from) - pipeline = Pipeline(node) - start_time = perf_counter() - pipeline.fit(data) - stop_time = perf_counter() - start_time - assert stop_time <= time_limit or True - except NotImplementedError: - pass + if operation.id in reference_operations: + perfomance_values = get_operation_perfomance(operation, data_lengths, attempt) + reference_time = tuple(map(min, zip(perfomance_values, reference_time))) + + for operation in OperationTypesRepository('all')._repo: + if (operation.id not in to_skip and operation.presets and FAST_TRAIN_PRESET_NAME in operation.presets): + for _ in range(attempt): + perfomance_values = get_operation_perfomance(operation, data_lengths) + # if attempt is successful then stop + if all(x >= y for x, y in zip(reference_time, perfomance_values)): + break + else: + raise Exception(f"Operation {operation.id} cannot have ``fast-train`` tag")