From 73da768fd15d59b902b553d01f9b338281619de0 Mon Sep 17 00:00:00 2001 From: jonasvdd Date: Thu, 13 Oct 2022 11:25:42 +0200 Subject: [PATCH 1/4] :sparkles: support for np.ndarray parsing (segment_indices) --- tsflex/utils/attribute_parsing.py | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/tsflex/utils/attribute_parsing.py b/tsflex/utils/attribute_parsing.py index f543d3e9..ff58a86b 100644 --- a/tsflex/utils/attribute_parsing.py +++ b/tsflex/utils/attribute_parsing.py @@ -7,6 +7,7 @@ from typing import Any import pandas as pd +import numpy as np from tsflex.utils.time import parse_time_arg @@ -31,16 +32,26 @@ def determine_type(data: Any) -> DataType: if data is None: return DataType.UNDEFINED - elif isinstance(data, (pd.Series, pd.DataFrame)): - dtype_str = str(data.index.dtype) + elif isinstance(data, (pd.Series, pd.DataFrame, np.ndarray)): + if isinstance(data, np.ndarray): + if not len(data): + return DataType.UNDEFINED + dtype_str = str(data.dtype) + else: + dtype_str = str(data.index.dtype) if AttributeParser._datetime_regex.match(dtype_str) is not None: return DataType.TIME + elif dtype_str == 'object': + # we make the assumption that the first element is the same type as the + # rest + return AttributeParser.determine_type(data[0]) elif any(r.match(dtype_str) for r in AttributeParser._numeric_regexes): return DataType.SEQUENCE elif isinstance(data, (int, float)): return DataType.SEQUENCE - + elif isinstance(data, pd.Timestamp): + return DataType.TIME elif isinstance(data, (str, pd.Timedelta)): # parse_time_arg already raises an error when an invalid datatype is passed parse_time_arg(data) @@ -54,7 +65,7 @@ def determine_type(data: Any) -> DataType: ) return dtype_list[0] - raise ValueError(f"Unsupported data type {type(data)}") + raise ValueError(f"Unsupported data type {type(data)} {str(data.dtype)} {data[:10]}") @staticmethod def check_expected_type(data: Any, expected: DataType) -> bool: From 
c23b512b10f6a00df65350e06b715ffe655ce47a Mon Sep 17 00:00:00 2001 From: jonasvdd Date: Thu, 13 Oct 2022 11:26:04 +0200 Subject: [PATCH 2/4] :muscle: first draft of `calculate_unsegmented` --- tsflex/features/feature_collection.py | 53 +++++++++++++++- .../segmenter/strided_rolling_factory.py | 61 ++++++++++++++++--- 2 files changed, 103 insertions(+), 11 deletions(-) diff --git a/tsflex/features/feature_collection.py b/tsflex/features/feature_collection.py index 28621ec1..46605302 100644 --- a/tsflex/features/feature_collection.py +++ b/tsflex/features/feature_collection.py @@ -341,6 +341,55 @@ def _process_segment_idxs( segment_idxs = segment_idxs.squeeze() # remove singleton dimensions return segment_idxs + def calculate_unsegmented( + self, + data: Union[pd.Series, pd.DataFrame, List[Union[pd.Series, pd.DataFrame]]], + **kwargs, + ) -> Union[List[pd.DataFrame], pd.DataFrame]: + """Calculate features over the whole series (`data`). + + This implies that all FeatureDescriptors will use the whole, unsegmented `data`. + + Parameters + ---------- + data : Union[pd.Series, pd.DataFrame, List[Union[pd.Series, pd.DataFrame]]] + The data over which the features will be calculated. + **kwargs + Additional keyword arguments passed to the `calculate` method. + + Returns + ------- + Union[List[pd.DataFrame], pd.DataFrame] + The calculated features. + + """ + # Make sure that kwargs does not contain segment indices or a stride + for k in ["segment_start_idxs", "segment_end_idxs", "stride"]: + assert k not in kwargs, f"`{k}` is not allowed in `calculate_unsegmented`" + + data = to_list(data) + min_idx = min([s.index[0] for s in data]) + max_idx = max([s.index[-1] for s in data]) + + # Add a small offset to max_idx to ensure that the last index is included + # TODO: can this be made any cleaner? 
+ if isinstance(max_idx, int): + max_idx += 1 + elif isinstance(max_idx, float): + max_idx += 1e-6 + elif isinstance(max_idx, pd.Timestamp): + max_idx += pd.Timedelta("1us") + else: + raise ValueError(f"invalid index dtype {type(max_idx)}") + + with warnings.catch_warnings(): + warnings.simplefilter("ignore", category=RuntimeWarning) + out = self.calculate( + data, segment_start_idxs=[min_idx], segment_end_idxs=[max_idx], **kwargs + ) + + return out + def calculate( self, data: Union[pd.Series, pd.DataFrame, List[Union[pd.Series, pd.DataFrame]]], @@ -526,7 +575,9 @@ def calculate( # Convert to numpy array (if necessary) if segment_start_idxs is not None: - segment_start_idxs = FeatureCollection._process_segment_idxs(segment_start_idxs) + segment_start_idxs = FeatureCollection._process_segment_idxs( + segment_start_idxs + ) if segment_end_idxs is not None: segment_end_idxs = FeatureCollection._process_segment_idxs(segment_end_idxs) diff --git a/tsflex/features/segmenter/strided_rolling_factory.py b/tsflex/features/segmenter/strided_rolling_factory.py index 6ca0467c..8c58f642 100644 --- a/tsflex/features/segmenter/strided_rolling_factory.py +++ b/tsflex/features/segmenter/strided_rolling_factory.py @@ -9,13 +9,17 @@ __author__ = "Jonas Van Der Donckt" +from tracemalloc import start + +import numpy as np + +from ...utils.attribute_parsing import AttributeParser, DataType from .strided_rolling import ( - StridedRolling, - TimeStridedRolling, SequenceStridedRolling, + StridedRolling, TimeIndexSampleStridedRolling, + TimeStridedRolling, ) -from ...utils.attribute_parsing import AttributeParser, DataType class StridedRollingFactory: @@ -54,8 +58,8 @@ def get_segmenter(data, window, strides, **kwargs) -> StridedRolling: Raises ------ ValueError - When incompatible data & window-stride data types are passed (e.g. time - window-stride args on sequence data-index). + When incompatible segment_indices, data & window-stride data types are + passed (e.g. 
time window-stride args on sequence data-index). Returns ------- @@ -64,17 +68,54 @@ def get_segmenter(data, window, strides, **kwargs) -> StridedRolling: """ data_dtype = AttributeParser.determine_type(data) + + # Get the start and end indices of the data and replace them with [] when None + start_indices = kwargs.get("segment_start_idxs") + # start_indices = [] if start_indices is None else start_indices + end_indices = kwargs.get("segment_end_idxs") + # end_indices = [] if end_indices is None else end_indices + if strides is None: - args_dtype = AttributeParser.determine_type(window) + ws_dtype = AttributeParser.determine_type(window) else: - args_dtype = AttributeParser.determine_type([window] + strides) + ws_dtype = AttributeParser.determine_type([window] + strides) + + if isinstance(start_indices, np.ndarray) and isinstance( + end_indices, np.ndarray + ): + # When both segment_indices are passed, this must match the data dtype + segment_dtype = AttributeParser.determine_type(start_indices) + assert segment_dtype == AttributeParser.determine_type(end_indices) + if segment_dtype != DataType.UNDEFINED: + assert segment_dtype == data_dtype, ( + "Currently, only TimeStridedRolling and SequenceStridedRolling are " + + "supported, as such, the segment and data dtype must match;" + + f"Got seg_dtype={segment_dtype} and data_dtype={data_dtype}." 
+ ) + window = None + return StridedRollingFactory._datatype_to_stroll[segment_dtype]( + data, window, strides, **kwargs + ) + elif isinstance(start_indices, np.ndarray) or isinstance( + end_indices, np.ndarray + ): + # if only one of the start and end-indices are passed, we must check + # if these are compatible with the window and stride params + segment_dtype = AttributeParser.determine_type( + start_indices if start_indices is not None else end_indices + ) + assert segment_dtype == ws_dtype, ( + f"Segment start/end indices must be of the same type as the window " + + "and stride params when only one of the two segment indices is given." + + f"Got seg_dtype={segment_dtype} and ws_dtype={ws_dtype}." + ) - if window is None or data_dtype.value == args_dtype.value: + if window is None or data_dtype.value == ws_dtype.value: return StridedRollingFactory._datatype_to_stroll[data_dtype]( data, window, strides, **kwargs ) - elif data_dtype == DataType.TIME and args_dtype == DataType.SEQUENCE: + elif data_dtype == DataType.TIME and ws_dtype == DataType.SEQUENCE: # Note: this is very niche and thus requires advanced knowledge return TimeIndexSampleStridedRolling(data, window, strides, **kwargs) - elif data_dtype == DataType.SEQUENCE and args_dtype == DataType.TIME: + elif data_dtype == DataType.SEQUENCE and ws_dtype == DataType.TIME: raise ValueError("Cannot segment a sequence-series with a time window") From 2771c0b43c354c5bd24589fcb568dfd5de5387c3 Mon Sep 17 00:00:00 2001 From: jonasvdd Date: Thu, 13 Oct 2022 11:26:13 +0200 Subject: [PATCH 3/4] :hammer: tests --- tests/test_features_feature_collection.py | 152 ++++++++++++++++++++++ 1 file changed, 152 insertions(+) diff --git a/tests/test_features_feature_collection.py b/tests/test_features_feature_collection.py index e468e302..65d9f01d 100644 --- a/tests/test_features_feature_collection.py +++ b/tests/test_features_feature_collection.py @@ -2141,3 +2141,155 @@ def 
test_feature_collection_various_timezones_segment_start_idxs(): ) res = fc.calculate(s_usa, segment_start_idxs=s_none.index[:3].values, n_jobs=0, return_df=True) assert np.all(res.values == []) + + + +# --------------------------- global_segmentation --------------------------- + + + +# --------------------------- Start & end indices --------------------------- +def test_int_segment_idxs_time_indexed_data(): + # Create some time-indexed data + series = np.random.rand(100) + ts_index = pd.date_range(start="2022-06-09 00:00:00", periods=len(series), freq="min") + df = pd.DataFrame({"Value": series}, index=ts_index) + + # NOTE the window is of int dtype -> `TimeIndexSampleStridedRolling` + fc_tis_stroll = FeatureCollection( + FeatureDescriptor( + function = np.mean, + series_name="Value", + window=len(df)+20, + stride=100 + ) + ) + + # NOTE: The window and stride are of time dtype -> TimeStridedRolling + fc_t_stroll = FeatureCollection( + FeatureDescriptor( + function = np.mean, + series_name="Value", + window="100min", + stride="1min" + ) + ) + + # Integer-like segment indices are not supported for time-indexed data + # -> `TimeIndexSampleStridedRolling` is used (based on win-stride-data dtype) + with pytest.raises((NotImplementedError, RuntimeError)): + fc_tis_stroll.calculate(data=df, segment_start_idxs=[0, 50, 100], n_jobs=0, return_df=True) + + # Integer-like segment indices are not supported for time-indexed data + # -> `TimeStridedRolling` is used (based on win-stride-data dtype) + with pytest.raises((NotImplementedError, RuntimeError)): + fc_t_stroll.calculate(data=df, segment_start_idxs=[0, 50, 100], n_jobs=0, return_df=True) + +def test_time_segment_idxs_time_indexed_data(): + # Create some time-indexed data + series = np.random.rand(100) + ts_index = pd.date_range(start="2022-06-09 00:00:00", periods=len(series), freq="min") + df = pd.DataFrame({"Value": series}, index=ts_index) + + # NOTE the window is of int dtype -> `TimeIndexSampleStridedRolling` + 
fc_tis_stroll = FeatureCollection( + FeatureDescriptor( + function = np.mean, + series_name="Value", + window=len(df)+20, + stride=100 + ) + ) + # NOTE: The window and stride are of time dtype -> TimeStridedRolling + fc_t_stroll = FeatureCollection( + FeatureDescriptor( + function = np.mean, + series_name="Value", + window="100min", + stride="1min" + ) + ) + + # Time-based segment indices are supported for time-indexed data + # NOTE: it does not matter whether the window and stride are of int or time dtype + # within the FeatureDescriptors when both the segment_start_idxs and + # segment_end_idxs are set + fc_tis_stroll.calculate( + data=df, + segment_start_idxs=[df.index[0]], + segment_end_idxs=[df.index[-1]], + n_jobs=0, + return_df=True + ) + + # And this must most certainly work for a FeatureCollection containing time-based + # window-stride FeatureDescriptors + fc_t_stroll.calculate( + data=df, + segment_start_idxs=[df.index[0]], + segment_end_idxs=[df.index[-1]], + n_jobs=0, + return_df=True + ) + +# --------------------------- calculate unsegmented --------------------------- +def test_calculate_unsegmented_time_index_data(): + series = np.random.rand(100) + ts_index = pd.date_range(start="2022-06-09 00:00:00", periods=len(series), freq="min") + df = pd.DataFrame({"Value": series}, index=ts_index) + + fc_no_ws_args = FeatureCollection( + FeatureDescriptor( + function = len, #np.mean, + series_name="Value", + ) + ) + fc_ws_int = FeatureCollection( + FeatureDescriptor( + function = len, #np.mean, + series_name="Value", + window=10, + stride=10 + ) + ) + fc_ws_float = FeatureCollection( + FeatureDescriptor( + function = len, #np.mean, + series_name="Value", + window=5.6, + stride=6.6 + ) + ) + fc_ws_time = FeatureCollection( + FeatureDescriptor( + function = len, #np.mean, + series_name="Value", + window="5min", + stride="1hour" + ) + ) + + for fc in [fc_ws_int, fc_no_ws_args, fc_ws_time]: + out = fc.calculate_unsegmented(data=df, window_idx='end', 
return_df=True, include_final_window=True, n_jobs=0) + # assert that all the data was used + assert out.values[0] == len(df) + assert out.index[-1] > df.index[-1] + + +def test_calculate_unsegmented_time_index_data(): + series = np.random.rand(100) + ts_index = pd.date_range(start="2022-06-09 00:00:00", periods=len(series), freq="min") + df = pd.DataFrame({"Value": series}, index=ts_index) + + fc = FeatureCollection( + FeatureDescriptor( + function = len, #np.mean, + series_name="Value", + window=len(df)+20, + # stride=100 + ) + ) + out = fc.calculate_unsegmented(data=df, window_idx='end', return_df=True, include_final_window=True, n_jobs=0) + # assert that all the data was used + assert out.values[0] == len(df) + assert out.index[-1] > df.index[-1] From 0f715180f1a1973b64068ca6b5f46e3b7de6ff3f Mon Sep 17 00:00:00 2001 From: jonasvdd Date: Fri, 14 Oct 2022 07:55:32 +0200 Subject: [PATCH 4/4] :see_no_evil: extending test and fixing small :bug: --- tests/test_features_feature_collection.py | 67 +++++++++++++++++++---- tsflex/utils/attribute_parsing.py | 2 +- 2 files changed, 57 insertions(+), 12 deletions(-) diff --git a/tests/test_features_feature_collection.py b/tests/test_features_feature_collection.py index 65d9f01d..9d5e86c5 100644 --- a/tests/test_features_feature_collection.py +++ b/tests/test_features_feature_collection.py @@ -2269,27 +2269,72 @@ def test_calculate_unsegmented_time_index_data(): ) ) - for fc in [fc_ws_int, fc_no_ws_args, fc_ws_time]: + # NOTE: the datatype of the FeatureDescriptors does not matter + # at all when the calculate unsegmented method is used + for fc in [fc_ws_int, fc_no_ws_args, fc_ws_time, fc_ws_float]: out = fc.calculate_unsegmented(data=df, window_idx='end', return_df=True, include_final_window=True, n_jobs=0) # assert that all the data was used assert out.values[0] == len(df) + # assert that the output index is greater than the data index + # NOTE: this means that a datapoint is used, which is just outside the + # datarange 
of out assert out.index[-1] > df.index[-1] + out = fc.calculate_unsegmented(data=df, window_idx='begin', return_df=True, include_final_window=True, n_jobs=0) + # assert that all the data was used + assert out.values[0] == len(df) + # Assert that the output index is the first index item of data + assert out.index[0] == df.index[0] -def test_calculate_unsegmented_time_index_data(): + +def test_calculate_unsegmented_numeric_index_data(): series = np.random.rand(100) - ts_index = pd.date_range(start="2022-06-09 00:00:00", periods=len(series), freq="min") - df = pd.DataFrame({"Value": series}, index=ts_index) + df = pd.DataFrame({"Value": series}) - fc = FeatureCollection( + fc_no_ws_args = FeatureCollection( FeatureDescriptor( function = len, #np.mean, series_name="Value", - window=len(df)+20, - # stride=100 ) ) - out = fc.calculate_unsegmented(data=df, window_idx='end', return_df=True, include_final_window=True, n_jobs=0) - # assert that all the data was used - assert out.values[0] == len(df) - assert out.index[-1] > df.index[-1] + fc_ws_int = FeatureCollection( + FeatureDescriptor( + function = len, #np.mean, + series_name="Value", + window=10, + stride=10 + ) + ) + fc_ws_float = FeatureCollection( + FeatureDescriptor( + function = len, #np.mean, + series_name="Value", + window=5.6, + stride=6.6 + ) + ) + fc_ws_time = FeatureCollection( + FeatureDescriptor( + function = len, #np.mean, + series_name="Value", + window="5min", + stride="1hour" + ) + ) + + # NOTE: the datatype of the FeatureDescriptors does not matter + # at all when the calculate unsegmented method is used + for fc in [fc_ws_int, fc_no_ws_args, fc_ws_time, fc_ws_float]: + out = fc.calculate_unsegmented(data=df, window_idx='end', return_df=True, include_final_window=True, n_jobs=0) + # assert that all the data was used + assert out.values[0] == len(df) + # assert that the output index is greater than the data index + # NOTE: this means that a datapoint is used, which is just outside the + # datarange 
of out + assert out.index[-1] > df.index[-1] + + out = fc.calculate_unsegmented(data=df, window_idx='begin', return_df=True, include_final_window=True, n_jobs=0) + # assert that all the data was used + assert out.values[0] == len(df) + # Assert that the output index is the first index item of data + assert out.index[0] == df.index[0] \ No newline at end of file diff --git a/tsflex/utils/attribute_parsing.py b/tsflex/utils/attribute_parsing.py index ff58a86b..1171016e 100644 --- a/tsflex/utils/attribute_parsing.py +++ b/tsflex/utils/attribute_parsing.py @@ -65,7 +65,7 @@ def determine_type(data: Any) -> DataType: ) return dtype_list[0] - raise ValueError(f"Unsupported data type {type(data)} {str(data.dtype)} {data[:10]}") + raise ValueError(f"Unsupported data type {type(data)}") @staticmethod def check_expected_type(data: Any, expected: DataType) -> bool: