Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

time-series batch / whole-series feature calculation #77

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
197 changes: 197 additions & 0 deletions tests/test_features_feature_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -2141,3 +2141,200 @@ def test_feature_collection_various_timezones_segment_start_idxs():
)
res = fc.calculate(s_usa, segment_start_idxs=s_none.index[:3].values, n_jobs=0, return_df=True)
assert np.all(res.values == [])



# --------------------------- global_segmentation ---------------------------



# --------------------------- Start & end indices ---------------------------
def test_int_segment_idxs_time_indexed_data():
    # Construct a time-indexed dataframe with a single value column.
    values = np.random.rand(100)
    index = pd.date_range(
        start="2022-06-09 00:00:00", periods=len(values), freq="min"
    )
    data = pd.DataFrame({"Value": values}, index=index)

    # NOTE: an int window/stride on time-indexed data resolves to
    # `TimeIndexSampleStridedRolling`
    fc_sample_based = FeatureCollection(
        FeatureDescriptor(
            function=np.mean,
            series_name="Value",
            window=len(data) + 20,
            stride=100,
        )
    )

    # NOTE: a time-based window/stride resolves to `TimeStridedRolling`
    fc_time_based = FeatureCollection(
        FeatureDescriptor(
            function=np.mean,
            series_name="Value",
            window="100min",
            stride="1min",
        )
    )

    # Integer-like segment indices are not supported for time-indexed data,
    # regardless of which strided-rolling implementation gets selected
    # (selection is based on the window-stride-data dtypes).
    for fc in (fc_sample_based, fc_time_based):
        with pytest.raises((NotImplementedError, RuntimeError)):
            fc.calculate(
                data=data,
                segment_start_idxs=[0, 50, 100],
                n_jobs=0,
                return_df=True,
            )

def test_time_segment_idxs_time_indexed_data():
    # Construct a time-indexed dataframe with a single value column.
    values = np.random.rand(100)
    index = pd.date_range(
        start="2022-06-09 00:00:00", periods=len(values), freq="min"
    )
    data = pd.DataFrame({"Value": values}, index=index)

    # NOTE: an int window/stride on time-indexed data resolves to
    # `TimeIndexSampleStridedRolling`
    fc_sample_based = FeatureCollection(
        FeatureDescriptor(
            function=np.mean,
            series_name="Value",
            window=len(data) + 20,
            stride=100,
        )
    )
    # NOTE: a time-based window/stride resolves to `TimeStridedRolling`
    fc_time_based = FeatureCollection(
        FeatureDescriptor(
            function=np.mean,
            series_name="Value",
            window="100min",
            stride="1min",
        )
    )

    # Time-based segment indices ARE supported for time-indexed data.
    # NOTE: when both segment_start_idxs and segment_end_idxs are given, the
    # window/stride dtype of the FeatureDescriptors does not matter at all —
    # this must therefore succeed for both the int- and time-based
    # window-stride FeatureCollections.
    for fc in (fc_sample_based, fc_time_based):
        fc.calculate(
            data=data,
            segment_start_idxs=[data.index[0]],
            segment_end_idxs=[data.index[-1]],
            n_jobs=0,
            return_df=True,
        )

# --------------------------- calculate unsegmented ---------------------------
def test_calculate_unsegmented_time_index_data():
    values = np.random.rand(100)
    index = pd.date_range(
        start="2022-06-09 00:00:00", periods=len(values), freq="min"
    )
    data = pd.DataFrame({"Value": values}, index=index)

    def _make_fc(**ws_kwargs):
        # Helper: a FeatureCollection that counts the samples in each segment.
        return FeatureCollection(
            FeatureDescriptor(function=len, series_name="Value", **ws_kwargs)
        )

    fc_variants = [
        _make_fc(window=10, stride=10),  # int window & stride
        _make_fc(),  # no window-stride args
        _make_fc(window="5min", stride="1hour"),  # time-based window & stride
        _make_fc(window=5.6, stride=6.6),  # float window & stride
    ]

    # NOTE: the window/stride dtype of the FeatureDescriptors does not matter
    # at all when `calculate_unsegmented` is used.
    for fc in fc_variants:
        out = fc.calculate_unsegmented(
            data=data,
            window_idx="end",
            return_df=True,
            include_final_window=True,
            n_jobs=0,
        )
        # Assert that all the data was used ...
        assert out.values[0] == len(data)
        # ... and that the output index lies beyond the data index, i.e. a
        # (virtual) segment end just outside the data range of `out` was used.
        assert out.index[-1] > data.index[-1]

        out = fc.calculate_unsegmented(
            data=data,
            window_idx="begin",
            return_df=True,
            include_final_window=True,
            n_jobs=0,
        )
        # Assert that all the data was used ...
        assert out.values[0] == len(data)
        # ... and that the output index is the first index item of the data.
        assert out.index[0] == data.index[0]


def test_calculate_unsegmented_numeric_index_data():
    # Default RangeIndex -> numeric-indexed data.
    data = pd.DataFrame({"Value": np.random.rand(100)})

    def _make_fc(**ws_kwargs):
        # Helper: a FeatureCollection that counts the samples in each segment.
        return FeatureCollection(
            FeatureDescriptor(function=len, series_name="Value", **ws_kwargs)
        )

    fc_variants = [
        _make_fc(window=10, stride=10),  # int window & stride
        _make_fc(),  # no window-stride args
        _make_fc(window="5min", stride="1hour"),  # time-based window & stride
        _make_fc(window=5.6, stride=6.6),  # float window & stride
    ]

    # NOTE: the window/stride dtype of the FeatureDescriptors does not matter
    # at all when `calculate_unsegmented` is used.
    for fc in fc_variants:
        out = fc.calculate_unsegmented(
            data=data,
            window_idx="end",
            return_df=True,
            include_final_window=True,
            n_jobs=0,
        )
        # Assert that all the data was used ...
        assert out.values[0] == len(data)
        # ... and that the output index lies beyond the data index, i.e. a
        # (virtual) segment end just outside the data range of `out` was used.
        assert out.index[-1] > data.index[-1]

        out = fc.calculate_unsegmented(
            data=data,
            window_idx="begin",
            return_df=True,
            include_final_window=True,
            n_jobs=0,
        )
        # Assert that all the data was used ...
        assert out.values[0] == len(data)
        # ... and that the output index is the first index item of the data.
        assert out.index[0] == data.index[0]
53 changes: 52 additions & 1 deletion tsflex/features/feature_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,55 @@ def _process_segment_idxs(
segment_idxs = segment_idxs.squeeze() # remove singleton dimensions
return segment_idxs

def calculate_unsegmented(
    self,
    data: Union[pd.Series, pd.DataFrame, List[Union[pd.Series, pd.DataFrame]]],
    **kwargs,
) -> Union[List[pd.DataFrame], pd.DataFrame]:
    """Calculate features over the whole series (`data`).

    This implies that all FeatureDescriptors will use the whole, unsegmented
    `data`; i.e., each feature function is evaluated exactly once on a single
    segment that spans the entire series.

    Parameters
    ----------
    data : Union[pd.Series, pd.DataFrame, List[Union[pd.Series, pd.DataFrame]]]
        The data for which the features will be calculated over.
    **kwargs
        Additional keyword arguments passed to the `calculate` method.
        Must not contain `segment_start_idxs`, `segment_end_idxs`, or
        `stride`, as these are determined by this method itself.

    Returns
    -------
    Union[List[pd.DataFrame], pd.DataFrame]
        The calculated features.

    Raises
    ------
    ValueError
        When a segmentation-related keyword argument is passed, or when the
        data-index values are not integer-, float-, or timestamp-like.

    """
    import numbers  # local import; only used for the index-dtype dispatch below

    # Segmentation kwargs conflict with the "whole series" semantics.
    # NOTE: an explicit raise (instead of `assert`) so validation survives -O.
    for k in ("segment_start_idxs", "segment_end_idxs", "stride"):
        if k in kwargs:
            raise ValueError(f"`{k}` is not allowed in `calculate_unsegmented`")

    data = to_list(data)
    min_idx = min(s.index[0] for s in data)
    max_idx = max(s.index[-1] for s in data)

    # Nudge max_idx slightly past the last index value so that the final data
    # point falls within the (end-exclusive) segment.
    # NOTE: `numbers.Integral` / `numbers.Real` also cover numpy scalars
    # (e.g. np.int64 returned by integer-dtype pandas indices), which are NOT
    # instances of the builtin `int` / `float`.
    if isinstance(max_idx, pd.Timestamp):
        max_idx += pd.Timedelta("1us")
    elif isinstance(max_idx, numbers.Integral):
        max_idx += 1
    elif isinstance(max_idx, numbers.Real):
        max_idx += 1e-6
    else:
        raise ValueError(f"invalid index dtype {type(max_idx)}")

    with warnings.catch_warnings():
        # The single all-spanning segment may trigger RuntimeWarnings inside
        # `calculate`; these are expected for this configuration — suppress.
        warnings.simplefilter("ignore", category=RuntimeWarning)
        out = self.calculate(
            data, segment_start_idxs=[min_idx], segment_end_idxs=[max_idx], **kwargs
        )

    return out

def calculate(
self,
data: Union[pd.Series, pd.DataFrame, List[Union[pd.Series, pd.DataFrame]]],
Expand Down Expand Up @@ -526,7 +575,9 @@ def calculate(

# Convert to numpy array (if necessary)
if segment_start_idxs is not None:
segment_start_idxs = FeatureCollection._process_segment_idxs(segment_start_idxs)
segment_start_idxs = FeatureCollection._process_segment_idxs(
segment_start_idxs
)
if segment_end_idxs is not None:
segment_end_idxs = FeatureCollection._process_segment_idxs(segment_end_idxs)

Expand Down
61 changes: 51 additions & 10 deletions tsflex/features/segmenter/strided_rolling_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,17 @@

__author__ = "Jonas Van Der Donckt"

from tracemalloc import start

import numpy as np

from ...utils.attribute_parsing import AttributeParser, DataType
from .strided_rolling import (
StridedRolling,
TimeStridedRolling,
SequenceStridedRolling,
StridedRolling,
TimeIndexSampleStridedRolling,
TimeStridedRolling,
)
from ...utils.attribute_parsing import AttributeParser, DataType


class StridedRollingFactory:
Expand Down Expand Up @@ -54,8 +58,8 @@ def get_segmenter(data, window, strides, **kwargs) -> StridedRolling:
Raises
------
ValueError
When incompatible data & window-stride data types are passed (e.g. time
window-stride args on sequence data-index).
When incompatible segment_indices, data & window-stride data types are
passed (e.g. time window-stride args on sequence data-index).

Returns
-------
Expand All @@ -64,17 +68,54 @@ def get_segmenter(data, window, strides, **kwargs) -> StridedRolling:

"""
data_dtype = AttributeParser.determine_type(data)

# Get the start and end indices of the data and replace them with [] when None
start_indices = kwargs.get("segment_start_idxs")
# start_indices = [] if start_indices is None else start_indices
end_indices = kwargs.get("segment_end_idxs")
# end_indices = [] if end_indices is None else end_indices

if strides is None:
args_dtype = AttributeParser.determine_type(window)
ws_dtype = AttributeParser.determine_type(window)
else:
args_dtype = AttributeParser.determine_type([window] + strides)
ws_dtype = AttributeParser.determine_type([window] + strides)

if isinstance(start_indices, np.ndarray) and isinstance(
end_indices, np.ndarray
):
# When both segment_indices are passed, this must match the data dtype
segment_dtype = AttributeParser.determine_type(start_indices)
assert segment_dtype == AttributeParser.determine_type(end_indices)
if segment_dtype != DataType.UNDEFINED:
assert segment_dtype == data_dtype, (
"Currently, only TimeStridedRolling and SequenceStridedRolling are "
+ "supported, as such, the segment and data dtype must match;"
+ f"Got seg_dtype={segment_dtype} and data_dtype={data_dtype}."
)
window = None
return StridedRollingFactory._datatype_to_stroll[segment_dtype](
data, window, strides, **kwargs
)
elif isinstance(start_indices, np.ndarray) or isinstance(
end_indices, np.ndarray
):
# if only one of the start and end-indices are passed, we must check
# if these are compatible with the window and stride params
segment_dtype = AttributeParser.determine_type(
start_indices if start_indices is not None else end_indices
)
assert segment_dtype == ws_dtype, (
f"Segment start/end indices must be of the same type as the window "
+ "and stride params when only one of the two segment indices is given."
+ f"Got seg_dtype={segment_dtype} and ws_dtype={ws_dtype}."
)

if window is None or data_dtype.value == args_dtype.value:
if window is None or data_dtype.value == ws_dtype.value:
return StridedRollingFactory._datatype_to_stroll[data_dtype](
data, window, strides, **kwargs
)
elif data_dtype == DataType.TIME and args_dtype == DataType.SEQUENCE:
elif data_dtype == DataType.TIME and ws_dtype == DataType.SEQUENCE:
# Note: this is very niche and thus requires advanced knowledge
return TimeIndexSampleStridedRolling(data, window, strides, **kwargs)
elif data_dtype == DataType.SEQUENCE and args_dtype == DataType.TIME:
elif data_dtype == DataType.SEQUENCE and ws_dtype == DataType.TIME:
raise ValueError("Cannot segment a sequence-series with a time window")
17 changes: 14 additions & 3 deletions tsflex/utils/attribute_parsing.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from typing import Any

import pandas as pd
import numpy as np

from tsflex.utils.time import parse_time_arg

Expand All @@ -31,16 +32,26 @@ def determine_type(data: Any) -> DataType:
if data is None:
return DataType.UNDEFINED

elif isinstance(data, (pd.Series, pd.DataFrame)):
dtype_str = str(data.index.dtype)
elif isinstance(data, (pd.Series, pd.DataFrame, np.ndarray)):
if isinstance(data, np.ndarray):
if not len(data):
return DataType.UNDEFINED
dtype_str = str(data.dtype)
else:
dtype_str = str(data.index.dtype)
if AttributeParser._datetime_regex.match(dtype_str) is not None:
return DataType.TIME
elif dtype_str == 'object':
# we make the assumption that the fist element is the same type as the
# rest
return AttributeParser.determine_type(data[0])
elif any(r.match(dtype_str) for r in AttributeParser._numeric_regexes):
return DataType.SEQUENCE

elif isinstance(data, (int, float)):
return DataType.SEQUENCE

elif isinstance(data, pd.Timestamp):
return DataType.TIME
elif isinstance(data, (str, pd.Timedelta)):
# parse_time_arg already raises an error when an invalid datatype is passed
parse_time_arg(data)
Expand Down