diff --git a/functime/feature_extractors.py b/functime/feature_extractors.py index 6e3fc610..8e5b1c3e 100644 --- a/functime/feature_extractors.py +++ b/functime/feature_extractors.py @@ -16,6 +16,8 @@ from functime._functime_rust import rs_faer_lstsq1 from functime._utils import UseAtOwnRisk +from .type_alias import DetrendMethod + # from functime.feature_extractor import FeatureExtractor # noqa: F401 logger = logging.getLogger(__name__) @@ -29,6 +31,7 @@ MAP_EXPR = Union[Mapping[str, float], pl.Expr] MAP_LIST_EXPR = Union[Mapping[str, List[float]], pl.Expr] + # from polars.type_aliases import IntoExpr lib = _get_shared_lib_location(__file__) @@ -1076,14 +1079,12 @@ def mean_change(x: TIME_SERIES_T) -> FLOAT_EXPR: float | Expr """ if isinstance(x, pl.Series): - if len(x) < 1: - return None - elif len(x) == 1: + if len(x) <= 1: return 0 return (x[-1] - x[0]) / (x.len() - 1) else: return ( - pl.when(x.len() - 1 > 0) + pl.when(x.len() > 1) .then((x.last() - x.first()) / (x.len() - 1)) .otherwise(0) ) @@ -1765,7 +1766,7 @@ def streak_length_stats(x: TIME_SERIES_T, above: bool, threshold: float) -> MAP_ } else: return pl.struct( - y.min().clip_min(0).alias("min"), + pl.max_horizontal(y.min(), 0).alias("min"), y.max().alias("max"), y.mean().alias("mean"), y.std().alias("std"), @@ -2274,6 +2275,32 @@ def linear_trend(self) -> pl.Expr: """ return linear_trend(self._expr) + def detrend(self, method: DetrendMethod = "linear") -> pl.Expr: + """ + Detrends the time series by either removing a fitted linear regression or by + removing the mean. This assumes that data is in order. + + Parameters + ---------- + method + Either `linear` or `mean` + + Returns + ------- + An expression representing detrend-ed column + """ + + if method == "linear": + N = self._expr.count() + x = pl.int_range(0, N, dtype=pl.Float64, eager=False) + coeff = pl.cov(self._expr, x) / x.var() + const = self._expr.mean() - coeff * (N - 1) / 2 + return self._expr - x * coeff - const + elif method == "mean": + return self._expr - self._expr.mean() + else: + raise ValueError(f"Unknown detrend method: {method}") + def longest_streak_above_mean(self) -> pl.Expr: """ Returns the length of the longest consecutive subsequence in x that is greater than the mean of x. diff --git a/functime/preprocessing.py b/functime/preprocessing.py index 3240dd35..3b7be9ec 100644 --- a/functime/preprocessing.py +++ b/functime/preprocessing.py @@ -300,11 +300,14 @@ def transform(X: pl.LazyFrame) -> pl.LazyFrame: .group_by_dynamic( index_column=time_col, by=entity_col, - every=freq, + offset=f"{w}{offset_alias}", + every=f"1{offset_alias}", period=f"{offset_n * w}{offset_alias}", + start_by="datapoint", ) .agg([stat_exprs[stat](w) for stat in stats]) - # NOTE: Must lag by 1 to avoid data leakage + # NOTE: Must lag by 1 to avoid data leakage. + # But given the current configuration, shift by w is what we want to do .select([entity_col, time_col, values.shift(w).over(entity_col)]) ) for w in window_sizes @@ -312,14 +315,14 @@ def transform(X: pl.LazyFrame) -> pl.LazyFrame: # Join all window lazy operations X_rolling = X_all[0] for X_window in X_all[1:]: - X_rolling = X_rolling.join(X_window, on=[entity_col, time_col], how="outer") + X_rolling = X_rolling.join(X_window, on=[entity_col, time_col], how="inner") # Defensive join to match original X index - X_new = X.join(X_rolling, on=[entity_col, time_col], how="left").select( - X_rolling.columns - ) + # X_new = X.join(X_rolling, on=[entity_col, time_col], how="left").select( + # X_rolling.columns + # ) if fill_strategy: - X_new = X_new.fill_null(strategy=fill_strategy) - artifacts = {"X_new": X_new} + X_rolling = X_X_rolling.fill_null(strategy=fill_strategy) + artifacts = {"X_new": X_rolling} return artifacts return transform @@ -734,41 +737,53 @@ def detrend(freq: str, method: Literal["linear", "mean"] = "linear"): def transform(X: pl.LazyFrame) -> pl.LazyFrame: entity_col, time_col = X.columns[:2] if method == "linear": - x = pl.col(time_col).arg_sort() - betas = [ - (pl.cov(col, x) / x.var()).over(entity_col).alias(f"{col}__beta") - for col in X.columns[2:] - ] - alphas = [ - ( - pl.col(col).mean().over(entity_col) - - pl.col(f"{col}__beta") * x.mean() - ).alias(f"{col}__alpha") - for col in X.columns[2:] - ] - residuals = [ - ( - pl.col(col) - pl.col(f"{col}__alpha") - pl.col(f"{col}__beta") * x - ).alias(col) - for col in X.columns[2:] - ] - X_new = ( - X.with_columns(betas) - .with_columns(alphas) - .with_columns(residuals) - .cache() + cols = X.columns + X_new_temp = ( + X.with_columns( + pl.col(time_col).arg_sort().over(entity_col).alias("__x") + ) + .with_columns( + *[ + (pl.cov(pl.col(c), pl.col("__x")) / pl.col("__x").var()) + .over(entity_col) + .name.suffix("__beta") + for c in X.columns[2:] + ], + *[ + pl.col(c).mean().over(entity_col).name.suffix("__mean") + for c in X.columns[2:] + ], + ) + .with_columns( + (pl.col(c + "__mean") - pl.col(c + "__beta") * (pl.count() - 1) / 2) + .over(entity_col) + .alias(c + "__alpha") + for c in X.columns[2:] + ) + .with_columns( + ( + pl.col(c) + - pl.col(c + "__beta") * pl.col("__x") + - pl.col(c + "__alpha") + ).alias(c) + for c in X.columns[2:] + ) + .collect() ) + + X_new = X_new_temp.select(cols) + artifacts = { - "_beta": X_new.select([entity_col, cs.ends_with("__beta")]) - .unique() - .collect(streaming=True), - "_alpha": X_new.select([entity_col, cs.ends_with("__alpha")]) - .unique() - .collect(streaming=True), - "X_new": X_new.select(X.columns), - "_firsts": X_new.group_by(entity_col) - .agg(pl.col(time_col).first().alias("first")) - .collect(streaming=True), + "_beta": X_new_temp.select(entity_col, cs.ends_with("__beta")).unique( + subset=[entity_col] + ), + "_alpha": X_new_temp.select(entity_col, cs.ends_with("__alpha")).unique( + subset=[entity_col] + ), + "X_new": X_new.lazy(), + "_firsts": X_new_temp.group_by(entity_col).agg( + pl.col(time_col).min().alias("first_fitted") + ), } elif method == "mean": _mean = X.group_by(entity_col).agg( @@ -787,48 +802,76 @@ def transform(X: pl.LazyFrame) -> pl.LazyFrame: def invert(state: ModelState, X: pl.LazyFrame) -> pl.LazyFrame: entity_col, time_col = X.columns[:2] + offset_n, offset_alias = _strip_freq_alias(freq) if method == "linear": _beta = state.artifacts["_beta"] _alpha = state.artifacts["_alpha"] firsts = state.artifacts["_firsts"] - if freq.endswith("i"): + if offset_alias == "i": offset_expr = ( - pl.int_ranges( - start=pl.col("first"), end=pl.col("last"), step=int(freq[:-1]) - ) - .len() - .alias("offset") - ) - else: + (pl.col("first") - pl.col("first_fitted")) // offset_n + ).alias("offset") + elif offset_alias == "d": offset_expr = ( - pl.date_ranges( - start=pl.col("first"), end=pl.col("last"), interval=freq - ) - .len() - .alias("offset") + (pl.col("first") - pl.col("first_fitted")).dt.total_days() + // offset_n + ).alias("offset") + elif offset_alias == "ms": + offset_expr = ( + (pl.col("first") - pl.col("first_fitted")).dt.total_milliseconds() + // offset_n + ).alias("offset") + elif offset_alias == "us": + offset_expr = ( + (pl.col("first") - pl.col("first_fitted")).dt.total_microseconds() + // offset_n + ).alias("offset") + elif offset_alias == "m": + offset_expr = ( + (pl.col("first") - pl.col("first_fitted")).dt.total_minutes() + // offset_n + ).alias("offset") + elif offset_alias == "s": + offset_expr = ( + (pl.col("first") - pl.col("first_fitted")).dt.total_seconds() + // offset_n + ).alias("offset") + elif offset_alias == "ns": + offset_expr = ( + (pl.col("first") - pl.col("first_fitted")).dt.total_nanoseconds() + // offset_n + ).alias("offset") + else: + raise ValueError( + f"Currently, the offset alias {offset_alias} is not supported in .invert()." ) - x = pl.col(time_col).arg_sort() + pl.col("offset") - offsets = ( + + grouped = ( X.group_by(entity_col) - .agg(pl.col(time_col).last().alias("last")) - .join(firsts.lazy(), on=entity_col) - .with_columns(offset_expr) - .collect(streaming=True) + .agg(pl.col(time_col).min().alias("first")) + .collect() ) + offsets = grouped.join(firsts, on=entity_col).with_columns(offset_expr) + # Note : pl.col(offset) here is generated above, then left joined to X. + # So there is no need to do over. + # In the code below, alpha, beta are all constant over entity because + # of the left join + x = pl.col(time_col).arg_sort().over(entity_col) + pl.col("offset") X_new = ( X.join(offsets.lazy(), on=entity_col, how="left") .join(_beta.lazy(), on=entity_col, how="left") .join(_alpha.lazy(), on=entity_col, how="left") .with_columns( [ - pl.col(col) - + pl.col(f"{col}__alpha") - + pl.col(f"{col}__beta") * x + ( + pl.col(col) + + pl.col(f"{col}__alpha") + + pl.col(f"{col}__beta") * x + ) for col in X.columns[2:] ] ) .select(X.columns) - .lazy() ) else: X_new = ( diff --git a/functime/type_alias.py b/functime/type_alias.py new file mode 100644 index 00000000..542bb08a --- /dev/null +++ b/functime/type_alias.py @@ -0,0 +1,10 @@ +import sys +from typing import Literal + +if sys.version_info >= (3, 10): + from typing import TypeAlias +else: # 3.9 + from typing_extensions import TypeAlias + + +DetrendMethod: TypeAlias = Literal["linear", "mean"] diff --git a/requirements-test.txt b/requirements-test.txt new file mode 100644 index 00000000..9530847e --- /dev/null +++ b/requirements-test.txt @@ -0,0 +1,4 @@ +pytest +pytest-benchmark +pandas +pyarrow diff --git a/tests/conftest.py b/tests/conftest.py index 27b68f42..097f206e 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -12,12 +12,12 @@ from functime.offsets import freq_to_sp -@pytest.fixture(params=[250, 1000], ids=lambda x: f"n_periods({x})") +@pytest.fixture(params=[50], ids=lambda x: f"n_periods({x})") def n_periods(request): return request.param -@pytest.fixture(params=[50, 500], ids=lambda x: f"n_entities({x})") +@pytest.fixture(params=[10], ids=lambda x: f"n_entities({x})") def n_entities(request): return request.param diff --git a/tests/test_preprocessing.py b/tests/test_preprocessing.py index 6e522f91..ac4393c7 100644 --- a/tests/test_preprocessing.py +++ b/tests/test_preprocessing.py @@ -267,6 +267,7 @@ def test_yeojohnson(pd_X): @pytest.mark.parametrize("method", ["linear", "mean"]) def test_detrend(method, pd_X): + X_original = pl.DataFrame(pd_X.reset_index()) entity_col = pd_X.index.names[0] expected = pd_X.groupby(entity_col).transform( signal.detrend, type=method if method == "linear" else "constant" @@ -274,9 +275,10 @@ def test_detrend(method, pd_X): X = pl.from_pandas(pd_X.reset_index()).lazy() transformer = detrend(method=method, freq="1d") X_new = X.pipe(transformer).collect() - assert_frame_equal(X_new, pl.DataFrame(expected.reset_index())) - X_original = X_new.pipe(transformer.invert) - assert_frame_equal(X_original, X, check_dtype=False) + assert_frame_equal(X_new, pl.DataFrame(expected.reset_index()), rtol=1e-10) + + X_inverted = X_new.pipe(transformer.invert).collect() + assert_frame_equal(X_original, X_inverted, check_dtype=False, rtol=1e-10) def pd_fractional_diff(df, d, thres): @@ -338,7 +340,7 @@ def test_fractional_diff(pd_X): ### Temporarily commented out. Uncomment when benchmarking is ready. ### -# @pytest.mark.benchmark(group="fractional_diff") +# #@pytest.mark.benchmark(group="fractional_diff") # def test_fractional_diff_benchmark_functime(pd_X, benchmark): # X = pl.from_pandas(pd_X.reset_index()).lazy() # entity_col = pd_X.index.names[0] @@ -348,6 +350,6 @@ def test_fractional_diff(pd_X): # benchmark(X_new.collect) -# @pytest.mark.benchmark(group="fractional_diff") +# #@pytest.mark.benchmark(group="fractional_diff") # def test_fractional_diff_benchmark_pd(pd_X, benchmark): # benchmark(pd_fractional_diff, pd_X, d=0.5, thres=1e-3) diff --git a/tests/test_tsfresh.py b/tests/test_tsfresh.py index 560ad611..e9b5a5e9 100644 --- a/tests/test_tsfresh.py +++ b/tests/test_tsfresh.py @@ -139,7 +139,7 @@ def test_streak_length_stats(S, params, res, k): # For no streaks, this returns an empty DataFrame. # But, will return 0s and Nulls for Series. # Hence, ad hoc way of making the expected result empty. - res = res.filter(pl.col("max").is_not_null()) + # res = res.filter(pl.col("max").is_not_null()) assert_frame_equal( pl.DataFrame({"a": S}) @@ -205,15 +205,13 @@ def test_mean_abs_change(S, res, k): ([-1, 1, 2, float("inf")], [float("inf")], {}), ([-1, 1, 2, -float("inf")], [-float("inf")], {}), ([1], [0], {"check_dtype": False}), - ([], [], {"check_dtype": False}), + ([], [0], {"check_dtype": False}), ], ) def test_mean_change(S, res, k): - # if len(res) == 0: - if len(res) == 0: - assert mean_change(pl.Series(S)) is None - else: - assert mean_change(pl.Series(S)) == res[0] + # If input is an empty or len 1 Series, the mean change is 0 because there is no change. + + assert mean_change(pl.Series(S)) == res[0] assert_frame_equal( pl.DataFrame({"a": S}).select(mean_change(pl.col("a"))),