Skip to content

Commit

Permalink
fix: Make detrend expr and off by one roll error (#155)
Browse files Browse the repository at this point in the history
  • Loading branch information
abstractqqq authored Dec 29, 2023
1 parent c6d5982 commit 9355ed8
Show file tree
Hide file tree
Showing 7 changed files with 167 additions and 83 deletions.
37 changes: 32 additions & 5 deletions functime/feature_extractors.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
from functime._functime_rust import rs_faer_lstsq1
from functime._utils import UseAtOwnRisk

from .type_alias import DetrendMethod

# from functime.feature_extractor import FeatureExtractor # noqa: F401

logger = logging.getLogger(__name__)
Expand All @@ -29,6 +31,7 @@
MAP_EXPR = Union[Mapping[str, float], pl.Expr]
MAP_LIST_EXPR = Union[Mapping[str, List[float]], pl.Expr]


# from polars.type_aliases import IntoExpr

lib = _get_shared_lib_location(__file__)
Expand Down Expand Up @@ -1076,14 +1079,12 @@ def mean_change(x: TIME_SERIES_T) -> FLOAT_EXPR:
float | Expr
"""
if isinstance(x, pl.Series):
if len(x) < 1:
return None
elif len(x) == 1:
if len(x) <= 1:
return 0
return (x[-1] - x[0]) / (x.len() - 1)
else:
return (
pl.when(x.len() - 1 > 0)
pl.when(x.len() > 1)
.then((x.last() - x.first()) / (x.len() - 1))
.otherwise(0)
)
Expand Down Expand Up @@ -1765,7 +1766,7 @@ def streak_length_stats(x: TIME_SERIES_T, above: bool, threshold: float) -> MAP_
}
else:
return pl.struct(
y.min().clip_min(0).alias("min"),
pl.max_horizontal(y.min(), 0).alias("min"),
y.max().alias("max"),
y.mean().alias("mean"),
y.std().alias("std"),
Expand Down Expand Up @@ -2274,6 +2275,32 @@ def linear_trend(self) -> pl.Expr:
"""
return linear_trend(self._expr)

def detrend(self, method: DetrendMethod = "linear") -> pl.Expr:
"""
Detrends the time series by either removing a fitted linear regression or by
removing the mean. This assumes that data is in order.
Parameters
----------
method
Either `linear` or `mean`
Returns
-------
An expression representing detrend-ed column
"""

if method == "linear":
N = self._expr.count()
x = pl.int_range(0, N, dtype=pl.Float64, eager=False)
coeff = pl.cov(self._expr, x) / x.var()
const = self._expr.mean() - coeff * (N - 1) / 2
return self._expr - x * coeff - const
elif method == "mean":
return self._expr - self._expr.mean()
else:
raise ValueError(f"Unknown detrend method: {method}")

def longest_streak_above_mean(self) -> pl.Expr:
"""
Returns the length of the longest consecutive subsequence in x that is greater than the mean of x.
Expand Down
171 changes: 107 additions & 64 deletions functime/preprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,26 +300,29 @@ def transform(X: pl.LazyFrame) -> pl.LazyFrame:
.group_by_dynamic(
index_column=time_col,
by=entity_col,
every=freq,
offset=f"{w}{offset_alias}",
every=f"1{offset_alias}",
period=f"{offset_n * w}{offset_alias}",
start_by="datapoint",
)
.agg([stat_exprs[stat](w) for stat in stats])
# NOTE: Must lag by 1 to avoid data leakage
# NOTE: Must lag by 1 to avoid data leakage.
# But given the current configuration, shift by w is what we want to do
.select([entity_col, time_col, values.shift(w).over(entity_col)])
)
for w in window_sizes
]
# Join all window lazy operations
X_rolling = X_all[0]
for X_window in X_all[1:]:
X_rolling = X_rolling.join(X_window, on=[entity_col, time_col], how="outer")
X_rolling = X_rolling.join(X_window, on=[entity_col, time_col], how="inner")
# Defensive join to match original X index
X_new = X.join(X_rolling, on=[entity_col, time_col], how="left").select(
X_rolling.columns
)
# X_new = X.join(X_rolling, on=[entity_col, time_col], how="left").select(
# X_rolling.columns
# )
if fill_strategy:
X_new = X_new.fill_null(strategy=fill_strategy)
artifacts = {"X_new": X_new}
X_rolling = X_X_rolling.fill_null(strategy=fill_strategy)
artifacts = {"X_new": X_rolling}
return artifacts

return transform
Expand Down Expand Up @@ -734,41 +737,53 @@ def detrend(freq: str, method: Literal["linear", "mean"] = "linear"):
def transform(X: pl.LazyFrame) -> pl.LazyFrame:
entity_col, time_col = X.columns[:2]
if method == "linear":
x = pl.col(time_col).arg_sort()
betas = [
(pl.cov(col, x) / x.var()).over(entity_col).alias(f"{col}__beta")
for col in X.columns[2:]
]
alphas = [
(
pl.col(col).mean().over(entity_col)
- pl.col(f"{col}__beta") * x.mean()
).alias(f"{col}__alpha")
for col in X.columns[2:]
]
residuals = [
(
pl.col(col) - pl.col(f"{col}__alpha") - pl.col(f"{col}__beta") * x
).alias(col)
for col in X.columns[2:]
]
X_new = (
X.with_columns(betas)
.with_columns(alphas)
.with_columns(residuals)
.cache()
cols = X.columns
X_new_temp = (
X.with_columns(
pl.col(time_col).arg_sort().over(entity_col).alias("__x")
)
.with_columns(
*[
(pl.cov(pl.col(c), pl.col("__x")) / pl.col("__x").var())
.over(entity_col)
.name.suffix("__beta")
for c in X.columns[2:]
],
*[
pl.col(c).mean().over(entity_col).name.suffix("__mean")
for c in X.columns[2:]
],
)
.with_columns(
(pl.col(c + "__mean") - pl.col(c + "__beta") * (pl.count() - 1) / 2)
.over(entity_col)
.alias(c + "__alpha")
for c in X.columns[2:]
)
.with_columns(
(
pl.col(c)
- pl.col(c + "__beta") * pl.col("__x")
- pl.col(c + "__alpha")
).alias(c)
for c in X.columns[2:]
)
.collect()
)

X_new = X_new_temp.select(cols)

artifacts = {
"_beta": X_new.select([entity_col, cs.ends_with("__beta")])
.unique()
.collect(streaming=True),
"_alpha": X_new.select([entity_col, cs.ends_with("__alpha")])
.unique()
.collect(streaming=True),
"X_new": X_new.select(X.columns),
"_firsts": X_new.group_by(entity_col)
.agg(pl.col(time_col).first().alias("first"))
.collect(streaming=True),
"_beta": X_new_temp.select(entity_col, cs.ends_with("__beta")).unique(
subset=[entity_col]
),
"_alpha": X_new_temp.select(entity_col, cs.ends_with("__alpha")).unique(
subset=[entity_col]
),
"X_new": X_new.lazy(),
"_firsts": X_new_temp.group_by(entity_col).agg(
pl.col(time_col).min().alias("first_fitted")
),
}
elif method == "mean":
_mean = X.group_by(entity_col).agg(
Expand All @@ -787,48 +802,76 @@ def transform(X: pl.LazyFrame) -> pl.LazyFrame:

def invert(state: ModelState, X: pl.LazyFrame) -> pl.LazyFrame:
entity_col, time_col = X.columns[:2]
offset_n, offset_alias = _strip_freq_alias(freq)
if method == "linear":
_beta = state.artifacts["_beta"]
_alpha = state.artifacts["_alpha"]
firsts = state.artifacts["_firsts"]
if freq.endswith("i"):
if offset_alias == "i":
offset_expr = (
pl.int_ranges(
start=pl.col("first"), end=pl.col("last"), step=int(freq[:-1])
)
.len()
.alias("offset")
)
else:
(pl.col("first") - pl.col("first_fitted")) // offset_n
).alias("offset")
elif offset_alias == "d":
offset_expr = (
pl.date_ranges(
start=pl.col("first"), end=pl.col("last"), interval=freq
)
.len()
.alias("offset")
(pl.col("first") - pl.col("first_fitted")).dt.total_days()
// offset_n
).alias("offset")
elif offset_alias == "ms":
offset_expr = (
(pl.col("first") - pl.col("first_fitted")).dt.total_milliseconds()
// offset_n
).alias("offset")
elif offset_alias == "us":
offset_expr = (
(pl.col("first") - pl.col("first_fitted")).dt.total_microseconds()
// offset_n
).alias("offset")
elif offset_alias == "m":
offset_expr = (
(pl.col("first") - pl.col("first_fitted")).dt.total_minutes()
// offset_n
).alias("offset")
elif offset_alias == "s":
offset_expr = (
(pl.col("first") - pl.col("first_fitted")).dt.total_seconds()
// offset_n
).alias("offset")
elif offset_alias == "ns":
offset_expr = (
(pl.col("first") - pl.col("first_fitted")).dt.total_nanoseconds()
// offset_n
).alias("offset")
else:
raise ValueError(
f"Currently, the offset alias {offset_alias} is not supported in .invert()."
)
x = pl.col(time_col).arg_sort() + pl.col("offset")
offsets = (

grouped = (
X.group_by(entity_col)
.agg(pl.col(time_col).last().alias("last"))
.join(firsts.lazy(), on=entity_col)
.with_columns(offset_expr)
.collect(streaming=True)
.agg(pl.col(time_col).min().alias("first"))
.collect()
)
offsets = grouped.join(firsts, on=entity_col).with_columns(offset_expr)
# Note : pl.col(offset) here is generated above, then left joined to X.
# So there is no need to do over.
# In the code below, alpha, beta are all constant over entity because
# of the left join
x = pl.col(time_col).arg_sort().over(entity_col) + pl.col("offset")
X_new = (
X.join(offsets.lazy(), on=entity_col, how="left")
.join(_beta.lazy(), on=entity_col, how="left")
.join(_alpha.lazy(), on=entity_col, how="left")
.with_columns(
[
pl.col(col)
+ pl.col(f"{col}__alpha")
+ pl.col(f"{col}__beta") * x
(
pl.col(col)
+ pl.col(f"{col}__alpha")
+ pl.col(f"{col}__beta") * x
)
for col in X.columns[2:]
]
)
.select(X.columns)
.lazy()
)
else:
X_new = (
Expand Down
10 changes: 10 additions & 0 deletions functime/type_alias.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import sys
from typing import Literal

if sys.version_info >= (3, 10):
from typing import TypeAlias
else: # 3.9
from typing_extensions import TypeAlias


DetrendMethod: TypeAlias = Literal["linear", "mean"]
4 changes: 4 additions & 0 deletions requirements-test.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
pytest
pytest-benchmark
pandas
pyarrow
4 changes: 2 additions & 2 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@
from functime.offsets import freq_to_sp


@pytest.fixture(params=[250, 1000], ids=lambda x: f"n_periods({x})")
@pytest.fixture(params=[50], ids=lambda x: f"n_periods({x})")
def n_periods(request):
return request.param


@pytest.fixture(params=[50, 500], ids=lambda x: f"n_entities({x})")
@pytest.fixture(params=[10], ids=lambda x: f"n_entities({x})")
def n_entities(request):
return request.param

Expand Down
12 changes: 7 additions & 5 deletions tests/test_preprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,16 +267,18 @@ def test_yeojohnson(pd_X):

@pytest.mark.parametrize("method", ["linear", "mean"])
def test_detrend(method, pd_X):
X_original = pl.DataFrame(pd_X.reset_index())
entity_col = pd_X.index.names[0]
expected = pd_X.groupby(entity_col).transform(
signal.detrend, type=method if method == "linear" else "constant"
)
X = pl.from_pandas(pd_X.reset_index()).lazy()
transformer = detrend(method=method, freq="1d")
X_new = X.pipe(transformer).collect()
assert_frame_equal(X_new, pl.DataFrame(expected.reset_index()))
X_original = X_new.pipe(transformer.invert)
assert_frame_equal(X_original, X, check_dtype=False)
assert_frame_equal(X_new, pl.DataFrame(expected.reset_index()), rtol=1e-10)

X_inverted = X_new.pipe(transformer.invert).collect()
assert_frame_equal(X_original, X_inverted, check_dtype=False, rtol=1e-10)


def pd_fractional_diff(df, d, thres):
Expand Down Expand Up @@ -338,7 +340,7 @@ def test_fractional_diff(pd_X):


### Temporarily commented out. Uncomment when benchmarking is ready. ###
# @pytest.mark.benchmark(group="fractional_diff")
# #@pytest.mark.benchmark(group="fractional_diff")
# def test_fractional_diff_benchmark_functime(pd_X, benchmark):
# X = pl.from_pandas(pd_X.reset_index()).lazy()
# entity_col = pd_X.index.names[0]
Expand All @@ -348,6 +350,6 @@ def test_fractional_diff(pd_X):
# benchmark(X_new.collect)


# @pytest.mark.benchmark(group="fractional_diff")
# #@pytest.mark.benchmark(group="fractional_diff")
# def test_fractional_diff_benchmark_pd(pd_X, benchmark):
# benchmark(pd_fractional_diff, pd_X, d=0.5, thres=1e-3)
12 changes: 5 additions & 7 deletions tests/test_tsfresh.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ def test_streak_length_stats(S, params, res, k):
# For no streaks, this returns an empty DataFrame.
# But, will return 0s and Nulls for Series.
# Hence, ad hoc way of making the expected result empty.
res = res.filter(pl.col("max").is_not_null())
# res = res.filter(pl.col("max").is_not_null())

assert_frame_equal(
pl.DataFrame({"a": S})
Expand Down Expand Up @@ -205,15 +205,13 @@ def test_mean_abs_change(S, res, k):
([-1, 1, 2, float("inf")], [float("inf")], {}),
([-1, 1, 2, -float("inf")], [-float("inf")], {}),
([1], [0], {"check_dtype": False}),
([], [], {"check_dtype": False}),
([], [0], {"check_dtype": False}),
],
)
def test_mean_change(S, res, k):
# if len(res) == 0:
if len(res) == 0:
assert mean_change(pl.Series(S)) is None
else:
assert mean_change(pl.Series(S)) == res[0]
# If input is an empty or len 1 Series, the mean change is 0 because there is no change.

assert mean_change(pl.Series(S)) == res[0]

assert_frame_equal(
pl.DataFrame({"a": S}).select(mean_change(pl.col("a"))),
Expand Down

0 comments on commit 9355ed8

Please sign in to comment.