diff --git a/functime/feature_extraction/tsfresh.py b/functime/feature_extraction/tsfresh.py index 2897d747..9e0f5dcd 100644 --- a/functime/feature_extraction/tsfresh.py +++ b/functime/feature_extraction/tsfresh.py @@ -41,8 +41,8 @@ def absolute_energy(x: TIME_SERIES_T) -> FLOAT_INT_EXPR: float | Expr """ if isinstance(x, pl.Series): - if x.len() > 300_000: # Would be different for different machines - return np.dot(x,x) + if x.len() > 300_000: # Would be different for different machines + return np.dot(x, x) return x.dot(x) @@ -244,7 +244,7 @@ def autoregressive_coefficients(x: TIME_SERIES_T, n_lags: int) -> List[float]: """ if isinstance(x, pl.Series): length = len(x) - n_lags - y = x.fill_null(0.) + y = x.fill_null(0.0) data_x = ( y.to_frame() .select( @@ -252,12 +252,12 @@ def autoregressive_coefficients(x: TIME_SERIES_T, n_lags: int) -> List[float]: pl.col(y.name).slice(n_lags - i, length=length).alias(str(i)) for i in range(1, n_lags + 1) ), - pl.lit(1) + pl.lit(1), ) .to_numpy() ) - y_ = y.tail(length).to_numpy(zero_copy_only=True).reshape((-1,1)) - out:np.ndarray = rs_faer_lstsq(data_x, y_) + y_ = y.tail(length).to_numpy(zero_copy_only=True).reshape((-1, 1)) + out: np.ndarray = rs_faer_lstsq(data_x, y_) return out.ravel() else: logger.info("Expression version of autoregressive_coefficients is not yet implemented due to " @@ -445,6 +445,7 @@ def change_quantiles( return expr.implode() + # Revisit later. def cid_ce(x: TIME_SERIES_T, normalize: bool = False) -> FLOAT_EXPR: """ @@ -495,6 +496,7 @@ def count_above(x: TIME_SERIES_T, threshold: float = 0.0) -> FLOAT_EXPR: """ return 100 * (x >= threshold).sum() / x.len() + # Should this be percentage? def count_above_mean(x: TIME_SERIES_T) -> INT_EXPR: """ @@ -787,15 +789,15 @@ def index_mass_quantile(x: TIME_SERIES_T, q: float) -> FLOAT_EXPR: x_cumsum = x.abs().cumsum().set_sorted() if isinstance(x, pl.Series): if x.is_integer(): - idx = x_cumsum.search_sorted(int(q*x_cumsum[-1]) + 1, "left") - else: # Search sorted is sensitive to dtype. - idx = x_cumsum.search_sorted(q*x_cumsum[-1], "left") + idx = x_cumsum.search_sorted(int(q * x_cumsum[-1]) + 1, "left") + else: # Search sorted is sensitive to dtype. + idx = x_cumsum.search_sorted(q * x_cumsum[-1], "left") return (idx + 1) / x.len() else: # In lazy case we have to cast, which kind of wasts some time, but this is on # par with the old implementation. Use max here because... believe it or not # max (with fast track because I did set_sorted()) is faster than .last() ??? - idx = x_cumsum.cast(pl.Float64).search_sorted(q*x_cumsum.max(), "left") + idx = x_cumsum.cast(pl.Float64).search_sorted(q * x_cumsum.max(), "left") return (idx + 1) / x.len() @@ -864,7 +866,7 @@ def last_location_of_minimum(x: TIME_SERIES_T) -> FLOAT_EXPR: def lempel_ziv_complexity( - x: TIME_SERIES_T, threshold: Union[float, pl.Expr], as_ratio:bool=True + x: TIME_SERIES_T, threshold: Union[float, pl.Expr], as_ratio: bool = True ) -> FLOAT_EXPR: """ Calculate a complexity estimate based on the Lempel-Ziv compression algorithm. The @@ -880,7 +882,7 @@ def lempel_ziv_complexity( Either a number, or an expression representing a comparable quantity. If x > value, then it will be binarized as 1 and 0 otherwise. If x is eager, then value must also be eager as well. - as_ratio: bool + as_ratio: bool If true, return the complexity / length of sequence Returns @@ -920,13 +922,15 @@ def linear_trend(x: TIME_SERIES_T) -> MAP_EXPR: if isinstance(x, pl.Series): n = x.len() if n == 1: - return {"slope": np.nan, "intercept":np.nan, "rss": np.nan} + return {"slope": np.nan, "intercept": np.nan, "rss": np.nan} m = n - 1 x_range_mean = m / 2 x_range = np.arange(start=0, stop=n) x_mean = x.mean() - beta = np.dot(x - x_mean, x_range - x_range_mean) / (m * np.var(x_range, ddof=1)) + beta = np.dot(x - x_mean, x_range - x_range_mean) / ( + m * np.var(x_range, ddof=1) + ) alpha = x_mean - beta * x_range_mean resid = x - (beta * x_range + alpha) return {"slope": beta, "intercept": alpha, "rss": np.dot(resid, resid)} @@ -964,7 +968,12 @@ def longest_streak_above_mean(x: TIME_SERIES_T) -> INT_EXPR: result = y.filter(y.struct.field("values")).struct.field("lengths").max() return 0 if result is None else result else: - return y.filter(y.struct.field("values")).struct.field("lengths").max().fill_null(0) + return ( + y.filter(y.struct.field("values")) + .struct.field("lengths") + .max() + .fill_null(0) + ) def longest_streak_below_mean(x: TIME_SERIES_T) -> INT_EXPR: @@ -988,7 +997,13 @@ def longest_streak_below_mean(x: TIME_SERIES_T) -> INT_EXPR: result = y.filter(y.struct.field("values")).struct.field("lengths").max() return 0 if result is None else result else: - return y.filter(y.struct.field("values")).struct.field("lengths").max().fill_null(0) + return ( + y.filter(y.struct.field("values")) + .struct.field("lengths") + .max() + .fill_null(0) + ) + def mean_abs_change(x: TIME_SERIES_T) -> FLOAT_EXPR: """ @@ -1042,7 +1057,11 @@ def mean_change(x: TIME_SERIES_T) -> FLOAT_EXPR: return 0 return (x[-1] - x[0]) / (x.len() - 1) else: - return pl.when(x.len() - 1 > 0).then((x.last() - x.first()) / (x.len() - 1)).otherwise(0) + return ( + pl.when(x.len() - 1 > 0) + .then((x.last() - x.first()) / (x.len() - 1)) + .otherwise(0) + ) def mean_n_absolute_max(x: TIME_SERIES_T, n_maxima: int) -> FLOAT_EXPR: @@ -1083,10 +1102,10 @@ def mean_second_derivative_central(x: TIME_SERIES_T) -> FLOAT_EXPR: return np.nan return (x[-1] - x[-2] - x[1] + x[0]) / (2 * (x.len() - 2)) else: - return pl.when(x.len() < 3).then( - np.nan - ).otherwise( - x.tail(2).sub(x.head(2)).diff().last() / (2 * (x.len() - 2)) + return ( + pl.when(x.len() < 3) + .then(np.nan) + .otherwise(x.tail(2).sub(x.head(2)).diff().last() / (2 * (x.len() - 2))) ) @@ -1112,6 +1131,7 @@ def number_crossings(x: TIME_SERIES_T, crossing_value: float = 0.0) -> FLOAT_EXP return y.eq(y.shift(1)).not_().sum() + def number_cwt_peaks(x: pl.Series, max_width: int = 5) -> float: """ Number of different peaks in x. @@ -1266,16 +1286,13 @@ def permutation_entropy( expr = permutation_entropy(pl.col(x.name), tau=tau, n_dims=n_dims, base=base) return frame.select(expr).item(0, 0) else: - if tau == 1: # Fast track the most common use case + if tau == 1: # Fast track the most common use case return ( - pl.concat_list( - x, - *(x.shift(-i) for i in range(1, n_dims)) - ).head(x.len() - n_dims + 1) - .list.eval( - pl.element().arg_sort() - ).value_counts() # groupby and count, but returns a struct - .struct.field("counts") # extract the field named "counts" + pl.concat_list(x, *(x.shift(-i) for i in range(1, n_dims))) + .head(x.len() - n_dims + 1) + .list.eval(pl.element().arg_sort()) + .value_counts() # groupby and count, but returns a struct + .struct.field("counts") # extract the field named "counts" .entropy(base=base, normalize=True) ) else: @@ -1283,11 +1300,14 @@ def permutation_entropy( return ( pl.concat_list( x.take_every(tau), - *(x.shift(-i).take_every(tau) for i in range(1, n_dims)) - ).filter( # This is the correct way to filter (with respect to tau) in this case. - pl.repeat(True, x.len()).shift(max_shift, fill_value=False) + *(x.shift(-i).take_every(tau) for i in range(1, n_dims)), + ) + .filter( # This is the correct way to filter (with respect to tau) in this case. + pl.repeat(True, x.len()) + .shift(max_shift, fill_value=False) .take_every(tau) - ).list.eval( + ) + .list.eval( pl.element().arg_sort() ) # for each inner list, do an argsort .value_counts() # groupby and count, but returns a struct @@ -1398,7 +1418,7 @@ def _into_sequential_chunks(x: pl.Series, m: int) -> np.ndarray: # Only works on series -def sample_entropy(x: TIME_SERIES_T, ratio: float = 0.2, m:int = 2) -> FLOAT_EXPR: +def sample_entropy(x: TIME_SERIES_T, ratio: float = 0.2, m: int = 2) -> FLOAT_EXPR: """ Calculate the sample entropy of a time series. This only works for Series input right now. @@ -1633,7 +1653,7 @@ def harmonic_mean(x: TIME_SERIES_T) -> FLOAT_EXPR: ------- float | Expr """ - return x.len() / (1.0/x).sum() + return x.len() / (1.0 / x).sum() def range_over_mean(x: TIME_SERIES_T) -> FLOAT_EXPR: @@ -1705,25 +1725,25 @@ def streak_length_stats(x: TIME_SERIES_T, above: bool, threshold: float) -> MAP_ y = y.filter(y.struct.field("values")).struct.field("lengths") if isinstance(x, pl.Series): return { - "min": y.min(), + "min": y.min() or 0, "max": y.max(), "mean": y.mean(), "std": y.std(), "10-percentile": y.quantile(0.1), "median": y.median(), "90-percentile": y.quantile(0.9), - "mode": y.mode()[0], + "mode": y.mode().sort()[0] if not y.is_empty() else None, } else: return pl.struct( - y.min().alias("min"), + y.min().clip_min(0).alias("min"), y.max().alias("max"), y.mean().alias("mean"), y.std().alias("std"), y.quantile(0.1).alias("10-percentile"), y.median().alias("median"), y.quantile(0.9).alias("90-percentile"), - y.mode().first().alias("mode"), + y.mode().sort(nulls_last=True).first().alias("mode"), ) @@ -1750,7 +1770,12 @@ def longest_streak_above(x: TIME_SERIES_T, threshold: float) -> TIME_SERIES_T: streak_max = y.filter(y.struct.field("values")).struct.field("lengths").max() return 0 if streak_max is None else streak_max else: - return y.filter(y.struct.field("values")).struct.field("lengths").max().fill_null(0) + return ( + y.filter(y.struct.field("values")) + .struct.field("lengths") + .max() + .fill_null(0) + ) def longest_streak_below(x: TIME_SERIES_T, threshold: float) -> TIME_SERIES_T: @@ -1775,7 +1800,12 @@ def longest_streak_below(x: TIME_SERIES_T, threshold: float) -> TIME_SERIES_T: streak_max = y.filter(y.struct.field("values")).struct.field("lengths").max() return 0 if streak_max is None else streak_max else: - return y.filter(y.struct.field("values")).struct.field("lengths").max().fill_null(0) + return ( + y.filter(y.struct.field("values")) + .struct.field("lengths") + .max() + .fill_null(0) + ) def longest_winning_streak(x: TIME_SERIES_T) -> TIME_SERIES_T: diff --git a/tests/test_tsfresh.py b/tests/test_tsfresh.py index ff6790e4..d86a8532 100644 --- a/tests/test_tsfresh.py +++ b/tests/test_tsfresh.py @@ -54,6 +54,7 @@ ratio_n_unique_to_length, root_mean_square, sample_entropy, + streak_length_stats, sum_reoccurring_points, sum_reoccurring_values, symmetry_looking, @@ -65,6 +66,99 @@ np.random.seed(42) +@pytest.mark.parametrize( + "S, params, res, k", + [ + ( + [0, 0, 0], + [True, 0], + [2, 2, 2.0, None, 2.0, 2.0, 2.0, 2], + {"check_dtype": False}, + ), + ( + [0, 0, 0], + [False, 0], + [2, 2, 2.0, None, 2.0, 2.0, 2.0, 2], + {"check_dtype": False}, + ), + ( + [0, 0, 0], + [False, 1], + [2, 2, 2.0, None, 2.0, 2.0, 2.0, 2], + {"check_dtype": False}, + ), + # won't work with no matches - error + # add error handling for this + ( + [0, 0, 0], + [True, 1], + [0, None, None, None, None, None, None, None], + {"check_dtype": False}, + ), + ( + [0, 1, 1, 0, 2, 2, 2], + [True, 0], + [2, 3, 2.5, 0.707107, 2.0, 2.5, 3.0, 2], + {"check_dtype": False}, + ), + # # floats + ( + [0, 1.5, 1.5, 0, 2.5, 2.5, 2.5], + [True, 0], + [2, 3, 2.5, 0.707107, 2.0, 2.5, 3.0, 2], + {"check_dtype": False}, + ), + # # negative floats + ( + [0, -1.5, -1.5, 0, -2.5, -2.5, -2.5], + [False, 0], + [2, 3, 2.5, 0.707107, 2.0, 2.5, 3.0, 2], + {"check_dtype": False}, + ), + ], +) +def test_streak_length_stats(S, params, res, k): + above = params[0] + threshold = params[1] + keys = [ + "min", + "max", + "mean", + "std", + "10-percentile", + "median", + "90-percentile", + "mode", + ] + res = pl.DataFrame(dict(zip(keys, res))) + + assert_frame_equal( + pl.DataFrame(streak_length_stats(pl.Series("a", S), above, threshold)), res, **k + ) + + # For no streaks, this returns an empty DataFrame. + # But, will return 0s and Nulls for Series. + # Hence, ad hoc way of making the expected result empty. + res = res.filter(pl.col("max").is_not_null()) + + assert_frame_equal( + pl.DataFrame({"a": S}) + .select(streak_length_stats(pl.col("a"), above, threshold)) + .unnest("min"), + res, + **k, + ) + + assert_frame_equal( + pl.LazyFrame({"a": S}) + .select(streak_length_stats(pl.col("a"), above, threshold)) + .unnest("min") + .collect(), + res, + **k, + ) + + @pytest.mark.parametrize( "S, res, k", [ @@ -128,7 +222,7 @@ def test_mean_change(S, res, k): ) assert_frame_equal( pl.LazyFrame({"a": S}).lazy().select(mean_change(pl.col("a"))).collect(), - pl.DataFrame({"a":pl.Series("a", res, dtype=pl.Float64)}), + pl.DataFrame({"a": pl.Series("a", res, dtype=pl.Float64)}), **k, ) @@ -1229,28 +1323,21 @@ def test_augmented_dickey_fuller(x, param, res): [ (pl.Series(range(10)), 0.0), (pl.Series([1, 3, 5]), 0.0), - (pl.Series([1, 3, 7, -3]),-3.0), + (pl.Series([1, 3, 7, -3]), -3.0), ], ) def test_mean_second_derivative_central(x, res): - assert mean_second_derivative_central(x) == res df = x.to_frame() assert_frame_equal( - df.select( - mean_second_derivative_central(pl.col(x.name)).alias("1") - ), - pl.DataFrame({ - "1": pl.Series([res]) - }) + df.select(mean_second_derivative_central(pl.col(x.name)).alias("1")), + pl.DataFrame({"1": pl.Series([res])}), ) assert_frame_equal( - df.lazy().select( - mean_second_derivative_central(pl.col(x.name)).alias("1") - ).collect(), - pl.DataFrame({ - "1": pl.Series([res]) - }) + df.lazy() + .select(mean_second_derivative_central(pl.col(x.name)).alias("1")) + .collect(), + pl.DataFrame({"1": pl.Series([res])}), ) @@ -1413,6 +1500,8 @@ def test_range_over_mean_and_range(S, res): df.lazy().select(range_over_mean(pl.col(x.name))).collect(), pl.DataFrame({x.name: [res]}), ) + + @pytest.mark.parametrize( "S, res, m", [ @@ -1422,12 +1511,12 @@ def test_range_over_mean_and_range(S, res): ([10, 20, 20, 30], [1], 15), ([10, -10, 10, -10], [3], 0), ([-10, 10.1, -10, 10.1, -10], [4], 10), - ([10,11,12,10,11], [3], 10.5) + ([10, 11, 12, 10, 11], [3], 10.5), ], ) def test_number_crossing(S, res, m): assert number_crossings(pl.Series(S), m) == res[0] - + assert_frame_equal( pl.DataFrame({"a": S}).select(number_crossings(pl.col("a"), m)), pl.DataFrame(pl.Series("a", res, pl.UInt32)), @@ -1438,11 +1527,14 @@ def test_number_crossing(S, res, m): ) -@pytest.mark.parametrize("S, t, d, b, res", [ - ([4, 7, 9, 10, 6, 11, 3], 1, 3, 2, 1.5219281), - (list(range(10)), 1, 3, math.e, 0.0), - ([10]*10, 1, 3, math.e, 0.0) -]) +@pytest.mark.parametrize( + "S, t, d, b, res", + [ + ([4, 7, 9, 10, 6, 11, 3], 1, 3, 2, 1.5219281), + (list(range(10)), 1, 3, math.e, 0.0), + ([10] * 10, 1, 3, math.e, 0.0), + ], +) def test_permutation_entropy(S, t, d, b, res): # Test 1 comes from: https://www.aptech.com/blog/permutation-entropy/ # Linear case. Should be 0 because there is no randomness in the permutations @@ -1450,25 +1542,31 @@ def test_permutation_entropy(S, t, d, b, res): x = pl.Series(S) - res_series = permutation_entropy(x, tau = t, n_dims = d, base = b) + res_series = permutation_entropy(x, tau=t, n_dims=d, base=b) assert np.isclose(res, res_series) df = x.to_frame() res_eager = df.select( - permutation_entropy(pl.col(x.name), tau = t, n_dims = d, base = b) + permutation_entropy(pl.col(x.name), tau=t, n_dims=d, base=b) ).item(0, 0) assert np.isclose(res, res_eager) - res_lazy = df.lazy().select( - permutation_entropy(pl.col(x.name), tau = t, n_dims = d, base = b) - ).collect().item(0, 0) + res_lazy = ( + df.lazy() + .select(permutation_entropy(pl.col(x.name), tau=t, n_dims=d, base=b)) + .collect() + .item(0, 0) + ) assert np.isclose(res, res_lazy) -@pytest.mark.parametrize("S, res", [ - (list(range(100)), 0.010471299867295437), - (np.sin(2 * np.pi * np.arange(3000)/100), 0.16367903754688098), - ([1], np.nan) -]) +@pytest.mark.parametrize( + "S, res", + [ + (list(range(100)), 0.010471299867295437), + (np.sin(2 * np.pi * np.arange(3000) / 100), 0.16367903754688098), + ([1], np.nan), + ], +) def test_sample_entropy(S, res): # Test 1's answer comes from comparing result with Tsfresh # Thest 2's answer comes from running this using the Python code on Wikipedia @@ -1477,12 +1575,16 @@ def test_sample_entropy(S, res): res_series = sample_entropy(x) assert np.isclose(res, res_series, atol=1e-12, equal_nan=True) -@pytest.mark.parametrize("S, res", [ - (list(range(300)), 0.04539477814685819), - (np.sin(2 * np.pi * np.arange(300)/100), 0.09072899366212879), - ([1, 2], 0.), - ([1], np.nan) -]) + +@pytest.mark.parametrize( + "S, res", + [ + (list(range(300)), 0.04539477814685819), + (np.sin(2 * np.pi * np.arange(300) / 100), 0.09072899366212879), + ([1, 2], 0.0), + ([1], np.nan), + ], +) def test_fourier_entropy(S, res): # All tests' answers come from comparing with tsfresh. x = pl.Series(S)