Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: fractional differencing function and test #104

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 93 additions & 2 deletions functime/preprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -877,7 +877,6 @@ def deseasonalize_fourier(sp: int, K: int, robust: bool = False):
regressor_cls = TheilSenRegressor

def transform(X: pl.LazyFrame) -> pl.LazyFrame:

X = X.collect() # Not lazy
if X.shape[1] > 3:
raise ValueError(
Expand Down Expand Up @@ -945,7 +944,6 @@ def _deseasonalize(inputs: pl.Series):
return artifacts

def invert(state: ModelState, X: pl.LazyFrame) -> pl.LazyFrame:

X = X.collect()
entity_col, time_col, target_col = X.columns[:3]

Expand Down Expand Up @@ -996,3 +994,96 @@ def _reseasonalize(inputs: Mapping[str, Any]):
return y_new.lazy()

return transform, invert


@transformer
def fractional_diff(
    d: float, min_weight: float | None = None, window_size: int | None = None
):
    """Compute the fractional differential of a time series.

    This particular functionality is referenced in Advances in Financial Machine
    Learning by Marcos Lopez de Prado (2018).

    For feature creation purposes, it is suggested to use the smallest value of
    d that makes the time series stationary. This can be found by running the
    augmented Dickey-Fuller test on the differenced series for different values
    of d and selecting the minimum value for which the series is stationary.

    The differencing weights follow the recurrence ``w_0 = 1`` and
    ``w_k = -w_{k-1} * (d - k + 1) / k``. They are computed once, when the
    transformer is built, and applied to every numeric column per entity.
    Rows that do not have ``len(weights) - 1`` earlier rows within their
    entity are set to null.

    Parameters
    ----------
    d : float
        The fractional order of the differencing operator. Must be finite.
    min_weight : float, optional
        Weights are generated until the absolute value of the next weight falls
        below this threshold; the window size is therefore derived from it.
        Must be strictly positive and requires ``d > -1`` (otherwise the
        weights never decay below the threshold). Mutually exclusive with
        `window_size`.
    window_size : int, optional
        The number of weights (current observation plus lags) of the
        fractional differencing operator. Must be at least 1. Mutually
        exclusive with `min_weight`.

    Raises
    ------
    ValueError
        If neither or both of `min_weight` and `window_size` are given, if `d`
        is not finite, if `min_weight` is not strictly positive, if `d <= -1`
        is combined with `min_weight`, or if `window_size` is smaller than 1.
    """
    import math  # function-scope import: only needed for argument validation

    if min_weight is None and window_size is None:
        raise ValueError("Either `min_weight` or `window_size` must be specified.")

    if min_weight is not None and window_size is not None:
        raise ValueError("Only one of `min_weight` or `window_size` must be specified.")

    if not math.isfinite(d):
        raise ValueError("`d` must be a finite number.")

    if min_weight is not None:
        # `not x > 0` also rejects NaN, for which every comparison is False and
        # the weight loop below would never hit its stopping condition.
        if not min_weight > 0:
            raise ValueError("`min_weight` must be strictly positive.")
        # For d <= -1 the ratio |w_k / w_{k-1}| = (k - 1 - d) / k is >= 1, so the
        # weights never shrink and a threshold-based cut-off cannot terminate.
        if d <= -1:
            raise ValueError(
                "`d` must be greater than -1 when `min_weight` is used; "
                "specify `window_size` instead."
            )
    elif window_size < 1:
        raise ValueError("`window_size` must be at least 1.")

    # The weights depend only on the factory arguments, so build them once
    # here instead of on every `transform` call.
    weights = [1.0]
    k = 1
    while True:
        next_weight = -weights[-1] / k * (d - k + 1)
        if min_weight is not None and abs(next_weight) < min_weight:
            break
        if window_size is not None and k >= window_size:
            break
        weights.append(next_weight)
        k += 1

    def transform(X: pl.LazyFrame) -> pl.LazyFrame:
        # The first two columns are taken to be the entity and time columns.
        entity_col, time_col = X.columns[:2]
        num_cols = X.select(PL_NUMERIC_COLS(entity_col, time_col)).columns
        n_lags = len(weights) - 1

        X_new = (
            X.sort(time_col)
            .with_columns(
                # Per-entity running row counter, used below to blank out rows
                # that lack a full window of history.
                pl.col(time_col).cumcount().over(entity_col).alias("__FT_time_ind"),
            )
            .with_columns(
                # One temporary column per (lag, numeric column) pair.
                *[
                    pl.col(col).shift(i).over(entity_col).alias(f"__FT_{col}_t-{i}")
                    for i in range(len(weights))
                    for col in num_cols
                ]
            )
            .with_columns(
                # Weighted sum of the lagged copies overwrites the original column.
                *[
                    pl.sum_horizontal(
                        [pl.col(f"__FT_{col}_t-{i}") * w for i, w in enumerate(weights)]
                    ).alias(col)
                    for col in num_cols
                ]
            )
            .with_columns(
                *[
                    pl.when(pl.col("__FT_time_ind") < n_lags)
                    .then(None)
                    .otherwise(pl.col(col))
                    .alias(col)
                    for col in num_cols
                ],
            )
            # Drop every temporary helper column created above.
            .select(~cs.contains("__FT_"))
        )
        return {"X_new": X_new}

    return transform
85 changes: 84 additions & 1 deletion tests/test_preprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,16 @@
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import PowerTransformer

from functime.preprocessing import boxcox, detrend, diff, lag, roll, scale, yeojohnson
from functime.preprocessing import (
boxcox,
detrend,
diff,
fractional_diff,
lag,
roll,
scale,
yeojohnson,
)


@pytest.fixture
Expand Down Expand Up @@ -268,3 +277,77 @@ def test_detrend(method, pd_X):
assert_frame_equal(X_new, pl.DataFrame(expected.reset_index()))
X_original = X_new.pipe(transformer.invert)
assert_frame_equal(X_original, X, check_dtype=False)


def pd_fractional_diff(df, d, thres):
    """Pandas implementation of fracdiff from Marcos Lopez de Prado.

    Reference implementation against which `fractional_diff` is compared in
    `test_fractional_diff`.

    Parameters
    ----------
    df : pandas DataFrame
        Frame whose index has (at least) two named levels; level 0 is used as
        the group key and level 1 is carried along as the first column of each
        group. Only columns selected by ``select_dtypes(include=["float"])``
        are differenced.
    d : float
        The fractional order of the differencing operator.
    thres : float
        Weights whose absolute value falls below this threshold are dropped.

    Returns
    -------
    The result of applying `fracDiff_FFD` to every level-0 group of
    ``df.reset_index()``.
    """

    def getWeights_FFD(d, thres):
        """Return the differencing weights as a column vector, oldest lag first."""
        # thres>0 drops insignificant weights
        w, k = [1.0], 1
        while True:
            w_ = -w[-1] / k * (d - k + 1)
            if abs(w_) < thres:
                break
            w.append(w_)
            k += 1
        # Reverse so that the weight for the current observation comes last,
        # matching the chronological order of the window it is multiplied with.
        w = np.array(w[::-1]).reshape(-1, 1)
        return w

    def fracDiff_FFD(series, d, thres=1e-5):
        """Apply the fixed-width window weights to every column after the first.

        `series` is one group: its first column is stored under the key
        ``"time"`` and every remaining column is fractionally differenced.
        """
        # 1) Compute weights for the longest series
        w = getWeights_FFD(d, thres)
        width = len(w) - 1
        # 2) Apply weights to values
        df = {"time": series[series.columns[0]]}
        for name in series.columns[1:]:
            seriesF = series[[name]].dropna()
            df_ = pd.Series()
            # Start at `width` so that every window holds len(w) observations;
            # earlier rows get no entry in `df_`.
            for iloc1 in range(width, seriesF.shape[0]):
                loc0, loc1 = seriesF.index[iloc1 - width], seriesF.index[iloc1]
                if not np.isfinite(series.loc[loc1, name]):
                    continue  # exclude NAs
                # Label-based slice loc0:loc1 includes both end points.
                df_[loc1] = np.dot(w.T, seriesF.loc[loc0:loc1])[0, 0]
            df[name] = df_.copy(deep=True)
        # NOTE(review): rows without an entry in `df_` presumably come out as
        # missing values after the column-wise concat — confirm.
        df = pd.concat(df, axis=1)
        return df

    numeric_cols = df.select_dtypes(include=["float"]).columns
    # Second index level first, followed by the float columns to difference.
    cols = [df.index.names[1]]
    cols.extend(numeric_cols)
    return (
        df.reset_index().groupby(df.index.names[0])[cols].apply(fracDiff_FFD, d, thres)
    )


def test_fractional_diff(pd_X):
    """Check `fractional_diff` against the pandas reference implementation.

    Both results are stripped of null rows and sorted by entity and time
    before being compared.
    """
    index_names = pd_X.index.names
    entity_col = index_names[0]
    time_col = index_names[1]

    # functime result, computed lazily with polars and then materialised.
    lazy_input = pl.from_pandas(pd_X.reset_index()).lazy()
    actual = lazy_input.pipe(fractional_diff(d=0.5, min_weight=1e-3)).collect()

    # Reference result; the groupby-apply leaves an extra "level_1" index level.
    reference = pd_fractional_diff(pd_X, d=0.5, thres=1e-3).reset_index()
    reference = reference.drop(columns="level_1")

    actual_sorted = actual.drop_nulls().sort(entity_col, time_col)
    reference_sorted = pl.DataFrame(reference).drop_nulls().sort(entity_col, time_col)
    assert_frame_equal(actual_sorted, reference_sorted)


### Temporarily commented out. Uncomment when benchmarking is ready. ###
# @pytest.mark.benchmark(group="fractional_diff")
# def test_fractional_diff_benchmark_functime(pd_X, benchmark):
# X = pl.from_pandas(pd_X.reset_index()).lazy()
# entity_col = pd_X.index.names[0]
# time_col = pd_X.index.names[1]
# transformer = fractional_diff(d=0.5, min_weight=1e-3)
# X_new = X.pipe(transformer)
# benchmark(X_new.collect)


# @pytest.mark.benchmark(group="fractional_diff")
# def test_fractional_diff_benchmark_pd(pd_X, benchmark):
# benchmark(pd_fractional_diff, pd_X, d=0.5, thres=1e-3)