Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

1182-fix #1215

Merged
merged 14 commits into from
Dec 12, 2023
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from copy import copy
from typing import Optional

import numpy as np

Expand Down Expand Up @@ -104,9 +103,9 @@ def predict(self, input_data: InputData) -> OutputData:
""" Get desired part of time series for averaging and calculate mean value """
forecast_length = input_data.task.task_params.forecast_length

elements_to_take = self._how_many_elements_use_for_averaging(input_data.features)
window = self._window(input_data.features)
# Prepare single forecast
mean_value = np.nanmean(input_data.features[-elements_to_take:])
mean_value = np.nanmean(input_data.features[-window:])
forecast = np.array([mean_value] * forecast_length).reshape((1, -1))

output_data = self._convert_to_output(input_data,
Expand All @@ -117,9 +116,13 @@ def predict(self, input_data: InputData) -> OutputData:
def predict_for_fit(self, input_data: InputData) -> OutputData:
input_data = copy(input_data)
forecast_length = input_data.task.task_params.forecast_length
parts = split_rolling_slices(input_data)
mean_values_for_chunks = self.average_by_axis(parts)
forecast = np.repeat(mean_values_for_chunks.reshape((-1, 1)), forecast_length, axis=1)
features = input_data.features
shape = features.shape[0]

window = self._window(features)
mean_values = np.array([np.mean(features[-window-shape+i:i+1]) for i in range(shape)])

forecast = np.repeat(mean_values.reshape((-1, 1)), forecast_length, axis=1)

# Update target
new_idx, transformed_target = ts_to_table(idx=input_data.idx, time_series=input_data.target,
Expand All @@ -133,42 +136,5 @@ def predict_for_fit(self, input_data: InputData) -> OutputData:
data_type=DataTypesEnum.table)
return output_data

def average_by_axis(self, parts: np.array):
    """ Reduce ``parts`` by applying ``self._average`` to every 1-D slice taken along axis 1 """
    return np.apply_along_axis(self._average, axis=1, arr=parts)

def _average(self, row: np.array):
    """ Drop NaN entries from ``row`` and return the mean of its trailing elements.

    The number of trailing elements is obtained from
    ``self._how_many_elements_use_for_averaging``. When exactly one non-NaN
    value remains, the filtered one-element array is returned unchanged.
    """
    valid_values = row[~np.isnan(row)]
    if len(valid_values) == 1:
        # Nothing to average: hand back the filtered array as it is
        return valid_values

    tail_size = self._how_many_elements_use_for_averaging(valid_values)
    return np.mean(valid_values[-tail_size:])

def _how_many_elements_use_for_averaging(self, time_series: np.array):
    """ Return the number of trailing elements of ``time_series`` to average.

    The length of the series is scaled by ``self.part_for_averaging``, rounded,
    and then passed through ``fix_elements_number``.
    """
    rounded_share = round(len(time_series) * self.part_for_averaging)
    return fix_elements_number(rounded_share)


def split_rolling_slices(input_data: InputData):
    """ Build a float matrix of growing prefixes of ``input_data.features``.

    Every element strictly above the main diagonal is replaced with NaN.
    Example of result for time series [0, 1, 2, 3]:
    [[0, nan, nan, nan],
     [0, 1, nan, nan],
     [0, 1, 2, nan],
     [0, 1, 2, 3]]
    """
    series = input_data.features
    # Lower triangle (diagonal included) keeps the values; cast to float so NaN can be stored
    rolling_matrix = np.tril(series, k=0).astype(float)
    above_diagonal = np.triu(np.ones_like(series, dtype=bool), k=1)
    rolling_matrix[above_diagonal] = np.nan

    return rolling_matrix


def fix_elements_number(elements_to_take: int):
    """ Return ``elements_to_take``, raised to 2 when it is smaller than 2 """
    return max(elements_to_take, 2)
def _window(self, time_series: np.ndarray):
    """ Return the averaging window size: the first-axis length of ``time_series``
    scaled by ``self.part_for_averaging`` and rounded, but never less than 2 """
    scaled_length = round(time_series.shape[0] * self.part_for_averaging)
    return max(2, scaled_length)
6 changes: 3 additions & 3 deletions fedot/core/repository/data/data_operation_repository.json
Original file line number Diff line number Diff line change
Expand Up @@ -219,9 +219,9 @@
},
"ransac_non_lin_reg": {
"meta": "regression_preprocessing",
"presets": ["fast_train", "*tree"],
"presets": ["*tree"],
"tags": ["affects_target", "non_linear", "filtering",
"correct_params", "non_applicable_for_ts", "non-default"]
"correct_params", "non_applicable_for_ts"]
},
"isolation_forest_reg": {
"meta": "regression_preprocessing",
Expand Down Expand Up @@ -293,7 +293,7 @@
},
"diff_filter": {
"meta": "custom_time_series_transformation",
"presets": ["fast_train", "ts"],
"presets": ["ts"],
"tags": [
"differential",
"non_lagged",
Expand Down
93 changes: 65 additions & 28 deletions test/integration/models/test_model.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import pickle
from collections import defaultdict

from copy import deepcopy
from time import perf_counter
from typing import Tuple, Optional

import numpy as np
import pytest
Expand All @@ -10,6 +11,7 @@
from sklearn.metrics import mean_absolute_error, mean_squared_error, roc_auc_score as roc_auc
from sklearn.preprocessing import MinMaxScaler

from fedot.core.constants import FAST_TRAIN_PRESET_NAME
from fedot.core.data.data import InputData, OutputData
from fedot.core.data.data_split import train_test_data_setup
from fedot.core.data.supplementary_data import SupplementaryData
Expand All @@ -26,7 +28,7 @@
from fedot.core.pipelines.node import PipelineNode
from fedot.core.pipelines.pipeline import Pipeline
from fedot.core.repository.dataset_types import DataTypesEnum
from fedot.core.repository.operation_types_repository import OperationTypesRepository
from fedot.core.repository.operation_types_repository import OperationMetaInfo, OperationTypesRepository
from fedot.core.repository.tasks import Task, TaskTypesEnum, TsForecastingParams
from test.unit.common_tests import is_predict_ignores_target
from test.unit.data_operations.test_time_series_operations import synthetic_univariate_ts
Expand Down Expand Up @@ -135,6 +137,41 @@ def get_pca_incorrect_data():
return input_data


def get_operation_perfomance(operation: OperationMetaInfo,
                             data_lengths: Tuple[float, ...],
                             times: int = 1) -> Optional[Tuple[float, ...]]:
    """
    Helper function that measures fit time of ``operation`` for each length in ``data_lengths``.

    Only the first (task_type, input_type) pair for which test data could be generated is measured.
    For every length the pipeline is fitted ``times`` times and the smallest elapsed time is kept.

    :param operation: operation whose ``id``, ``tags``, ``task_type`` and ``input_types`` are used
    :param data_lengths: lengths passed to ``get_data_for_testing`` one by one
    :param times: number of repeated fits per length
    :return: tuple with one fit time per length, in the order of ``data_lengths``
    :raises ValueError: if data was generated only for a part of ``data_lengths``
    :raises Exception: if no (task_type, input_type) pair produced any measurement
    """
    def timed_fit(task, sample) -> float:
        # Operations without the 'non_lagged' tag get a 'lagged' parent node in ts forecasting
        parents = []
        if task is TaskTypesEnum.ts_forecasting and 'non_lagged' not in operation.tags:
            parents = [PipelineNode('lagged')]
        pipeline = Pipeline(PipelineNode(operation.id, nodes_from=parents))
        started_at = perf_counter()
        pipeline.fit(sample)
        return perf_counter() - started_at

    for task_type in operation.task_type:
        for data_type in operation.input_types:
            timings = []
            for length in data_lengths:
                data = get_data_for_testing(task_type, data_type,
                                            length=length, features_count=2,
                                            random=True)
                if data is None:
                    continue
                # Keep the best of several runs
                timings.append(min(timed_fit(task_type, data) for _ in range(times)))
            if not timings:
                # Nothing could be generated for this pair, try the next one
                continue
            if len(timings) != len(data_lengths):
                raise ValueError('not all measurements have been proceeded')
            return tuple(timings)
    raise Exception(f"Fit time for operation ``{operation.id}`` cannot be measured")


@pytest.fixture()
def classification_dataset():
samples = 1000
Expand Down Expand Up @@ -475,31 +512,31 @@ def test_operations_are_serializable():


def test_operations_are_fast():
# models that raise exception
to_skip = ['custom', 'decompose', 'class_decompose']
time_limits = defaultdict(lambda *args: 0.5, {'expensive': 2, 'non-default': 100})
"""
Test ensures that all operations with fast_train preset meet sustainability expectation.
Test defines operation complexity as polynomial function of data size.
If complexity function grows fast, then operation should not have fast_train tag.
"""

data_lengths = tuple(map(int, np.logspace(2.2, 4, 6)))
reference_operations = ['rf', 'rfr']
to_skip = ['custom', 'decompose', 'class_decompose', 'kmeans',
'resample', 'one_hot_encoding'] + reference_operations
reference_time = (float('inf'), ) * len(data_lengths)
# tries for time measuring
attempt = 2

for operation in OperationTypesRepository('all')._repo:
if operation.id in to_skip:
continue
time_limit = [time_limits[tag] for tag in time_limits if tag in operation.tags]
time_limit = max(time_limit) if time_limit else time_limits.default_factory()
for task_type in operation.task_type:
for data_type in operation.input_types:
data = get_data_for_testing(task_type, data_type,
length=100, features_count=2,
random=True)
if data is not None:
try:
nodes_from = []
if task_type is TaskTypesEnum.ts_forecasting:
if 'non_lagged' not in operation.tags:
nodes_from = [PipelineNode('lagged')]
node = PipelineNode(operation.id, nodes_from=nodes_from)
pipeline = Pipeline(node)
start_time = perf_counter()
pipeline.fit(data)
stop_time = perf_counter() - start_time
assert stop_time <= time_limit or True
except NotImplementedError:
pass
if operation.id in reference_operations:
perfomance_values = get_operation_perfomance(operation, data_lengths, attempt)
reference_time = tuple(map(min, zip(perfomance_values, reference_time)))

for operation in OperationTypesRepository('all')._repo:
if (operation.id not in to_skip and operation.presets and FAST_TRAIN_PRESET_NAME in operation.presets):
for _ in range(attempt):
perfomance_values = get_operation_perfomance(operation, data_lengths)
# if attempt is successful then stop
if all(x >= y for x, y in zip(reference_time, perfomance_values)):
break
else:
raise Exception(f"Operation {operation.id} cannot have ``fast-train`` tag")
Loading