test: Streak lengths #100

Merged · merged 5 commits · Oct 27, 2023 · changes shown from all commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 72 additions & 42 deletions functime/feature_extraction/tsfresh.py
@@ -41,8 +41,8 @@ def absolute_energy(x: TIME_SERIES_T) -> FLOAT_INT_EXPR:
    float | Expr
    """
    if isinstance(x, pl.Series):
-        if x.len() > 300_000: # Would be different for different machines
-            return np.dot(x,x)
+        if x.len() > 300_000:  # Would be different for different machines
+            return np.dot(x, x)
    return x.dot(x)

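For context on the cutoff above: for long series np.dot (which typically dispatches to a BLAS kernel) is used instead of Polars' native Series.dot, and the break-even point is machine-dependent, as the inline comment says. A minimal timing sketch, illustrative only:

    import time

    import numpy as np
    import polars as pl

    s = pl.Series(np.random.default_rng(0).random(1_000_000))

    t0 = time.perf_counter()
    np.dot(s, s)   # NumPy path, taken for long series
    t1 = time.perf_counter()
    s.dot(s)       # Polars' native dot, used otherwise
    t2 = time.perf_counter()
    print(f"np.dot: {t1 - t0:.4f}s  Series.dot: {t2 - t1:.4f}s")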

@@ -244,20 +244,20 @@ def autoregressive_coefficients(x: TIME_SERIES_T, n_lags: int) -> List[float]:
    """
    if isinstance(x, pl.Series):
        length = len(x) - n_lags
-        y = x.fill_null(0.)
+        y = x.fill_null(0.0)
        data_x = (
            y.to_frame()
            .select(
                *(
                    pl.col(y.name).slice(n_lags - i, length=length).alias(str(i))
                    for i in range(1, n_lags + 1)
                ),
-                pl.lit(1)
+                pl.lit(1),
            )
            .to_numpy()
        )
-        y_ = y.tail(length).to_numpy(zero_copy_only=True).reshape((-1,1))
-        out:np.ndarray = rs_faer_lstsq(data_x, y_)
+        y_ = y.tail(length).to_numpy(zero_copy_only=True).reshape((-1, 1))
+        out: np.ndarray = rs_faer_lstsq(data_x, y_)
        return out.ravel()
    else:
        logger.info("Expression version of autoregressive_coefficients is not yet implemented due to "
@@ -445,6 +445,7 @@ def change_quantiles(

    return expr.implode()

+
# Revisit later.
def cid_ce(x: TIME_SERIES_T, normalize: bool = False) -> FLOAT_EXPR:
    """
@@ -495,6 +496,7 @@ def count_above(x: TIME_SERIES_T, threshold: float = 0.0) -> FLOAT_EXPR:
    """
    return 100 * (x >= threshold).sum() / x.len()

+
# Should this be percentage?
def count_above_mean(x: TIME_SERIES_T) -> INT_EXPR:
    """
@@ -787,15 +789,15 @@ def index_mass_quantile(x: TIME_SERIES_T, q: float) -> FLOAT_EXPR:
    x_cumsum = x.abs().cumsum().set_sorted()
    if isinstance(x, pl.Series):
        if x.is_integer():
-            idx = x_cumsum.search_sorted(int(q*x_cumsum[-1]) + 1, "left")
-        else: # Search sorted is sensitive to dtype.
-            idx = x_cumsum.search_sorted(q*x_cumsum[-1], "left")
+            idx = x_cumsum.search_sorted(int(q * x_cumsum[-1]) + 1, "left")
+        else:  # Search sorted is sensitive to dtype.
+            idx = x_cumsum.search_sorted(q * x_cumsum[-1], "left")
        return (idx + 1) / x.len()
    else:
        # In the lazy case we have to cast, which kind of wastes some time, but this is
        # on par with the old implementation. Use max here because... believe it or not,
        # max (with fast track because I did set_sorted()) is faster than .last() ???
-        idx = x_cumsum.cast(pl.Float64).search_sorted(q*x_cumsum.max(), "left")
+        idx = x_cumsum.cast(pl.Float64).search_sorted(q * x_cumsum.max(), "left")
        return (idx + 1) / x.len()

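A small sketch of the dtype sensitivity the comment above guards against: search_sorted on an integer-typed cumulative sum expects an integer needle, hence the int(q * total) + 1 needle on the integer path and the explicit Float64 cast in the lazy branch (assumed behavior, illustrative values):

    import polars as pl

    cs = pl.Series([1, 3, 6, 10])  # integer cumulative sum
    q = 0.5
    idx_int = cs.search_sorted(int(q * cs[-1]) + 1, "left")            # integer needle: 6 -> index 2
    idx_float = cs.cast(pl.Float64).search_sorted(q * cs[-1], "left")  # float needle on float dtype: 5.0 -> index 2
    print(idx_int, idx_float)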

@@ -864,7 +866,7 @@ def last_location_of_minimum(x: TIME_SERIES_T) -> FLOAT_EXPR:


def lempel_ziv_complexity(
-    x: TIME_SERIES_T, threshold: Union[float, pl.Expr], as_ratio:bool=True
+    x: TIME_SERIES_T, threshold: Union[float, pl.Expr], as_ratio: bool = True
) -> FLOAT_EXPR:
    """
    Calculate a complexity estimate based on the Lempel-Ziv compression algorithm. The
@@ -880,7 +882,7 @@ def lempel_ziv_complexity(
        Either a number, or an expression representing a comparable quantity. If x > value,
        then it will be binarized as 1 and 0 otherwise. If x is eager, then value must
        be eager as well.
-    as_ratio: bool 
+    as_ratio: bool
        If True, return the complexity divided by the length of the sequence.

    Returns
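The docstring describes binarizing x by the threshold and estimating complexity from the Lempel-Ziv parsing of the resulting bit sequence. For readers unfamiliar with the measure, a pure-Python sketch of the classic Kaspar-Schuster phrase count (a reference formulation; the implementation behind this function may differ in details):

    def lz76_phrase_count(s: str) -> int:
        # Count phrases in the Lempel-Ziv (1976) parsing of s.
        n = len(s)
        c, l, i, k, k_max = 1, 1, 0, 1, 1
        while True:
            if s[i + k - 1] == s[l + k - 1]:
                k += 1
                if l + k > n:
                    c += 1
                    break
            else:
                k_max = max(k, k_max)
                i += 1
                if i == l:          # no earlier match: close the phrase
                    c += 1
                    l += k_max
                    if l + 1 > n:
                        break
                    i, k, k_max = 0, 1, 1
                else:
                    k = 1
        return c

    print(lz76_phrase_count("ababab"))  # parses as a | b | abab -> 3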
@@ -920,13 +922,15 @@ def linear_trend(x: TIME_SERIES_T) -> MAP_EXPR:
    if isinstance(x, pl.Series):
        n = x.len()
        if n == 1:
-            return {"slope": np.nan, "intercept":np.nan, "rss": np.nan}
+            return {"slope": np.nan, "intercept": np.nan, "rss": np.nan}

        m = n - 1
        x_range_mean = m / 2
        x_range = np.arange(start=0, stop=n)
        x_mean = x.mean()
-        beta = np.dot(x - x_mean, x_range - x_range_mean) / (m * np.var(x_range, ddof=1))
+        beta = np.dot(x - x_mean, x_range - x_range_mean) / (
+            m * np.var(x_range, ddof=1)
+        )
        alpha = x_mean - beta * x_range_mean
        resid = x - (beta * x_range + alpha)
        return {"slope": beta, "intercept": alpha, "rss": np.dot(resid, resid)}
@@ -964,7 +968,12 @@ def longest_streak_above_mean(x: TIME_SERIES_T) -> INT_EXPR:
        result = y.filter(y.struct.field("values")).struct.field("lengths").max()
        return 0 if result is None else result
    else:
-        return y.filter(y.struct.field("values")).struct.field("lengths").max().fill_null(0)
+        return (
+            y.filter(y.struct.field("values"))
+            .struct.field("lengths")
+            .max()
+            .fill_null(0)
+        )


def longest_streak_below_mean(x: TIME_SERIES_T) -> INT_EXPR:
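The streak helpers touched by this PR all share one pattern: binarize the series, run-length encode the mask, and take the max run length where the flag holds; the eager and lazy branches differ only in how a missing max (no qualifying run) becomes 0. A standalone sketch of the pattern, assuming y comes from Polars' rle(), which yields a struct with "lengths" and "values" fields as used throughout this module:

    import polars as pl

    x = pl.Series([1.0, 5.0, 6.0, 2.0, 7.0, 8.0, 9.0])
    y = (x > x.mean()).rle()  # runs of the above-mean mask
    runs = y.filter(y.struct.field("values")).struct.field("lengths")
    print(runs.max())         # longest streak above the mean -> 3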
@@ -988,7 +997,13 @@ def longest_streak_below_mean(x: TIME_SERIES_T) -> INT_EXPR:
        result = y.filter(y.struct.field("values")).struct.field("lengths").max()
        return 0 if result is None else result
    else:
-        return y.filter(y.struct.field("values")).struct.field("lengths").max().fill_null(0)
+        return (
+            y.filter(y.struct.field("values"))
+            .struct.field("lengths")
+            .max()
+            .fill_null(0)
+        )


def mean_abs_change(x: TIME_SERIES_T) -> FLOAT_EXPR:
    """
@@ -1042,7 +1057,11 @@ def mean_change(x: TIME_SERIES_T) -> FLOAT_EXPR:
            return 0
        return (x[-1] - x[0]) / (x.len() - 1)
    else:
-        return pl.when(x.len() - 1 > 0).then((x.last() - x.first()) / (x.len() - 1)).otherwise(0)
+        return (
+            pl.when(x.len() - 1 > 0)
+            .then((x.last() - x.first()) / (x.len() - 1))
+            .otherwise(0)
+        )


def mean_n_absolute_max(x: TIME_SERIES_T, n_maxima: int) -> FLOAT_EXPR:
@@ -1083,10 +1102,10 @@ def mean_second_derivative_central(x: TIME_SERIES_T) -> FLOAT_EXPR:
            return np.nan
        return (x[-1] - x[-2] - x[1] + x[0]) / (2 * (x.len() - 2))
    else:
-        return pl.when(x.len() < 3).then(
-            np.nan
-        ).otherwise(
-            x.tail(2).sub(x.head(2)).diff().last() / (2 * (x.len() - 2))
+        return (
+            pl.when(x.len() < 3)
+            .then(np.nan)
+            .otherwise(x.tail(2).sub(x.head(2)).diff().last() / (2 * (x.len() - 2)))
        )


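The closed form above works because the central second differences telescope: summing x[i+1] - 2x[i] + x[i-1] over the interior points leaves only the boundary terms. A numeric check:

    import numpy as np

    x = np.array([1.0, 4.0, 9.0, 16.0, 25.0])
    direct = np.mean((x[2:] - 2 * x[1:-1] + x[:-2]) / 2)
    closed = (x[-1] - x[-2] - x[1] + x[0]) / (2 * (len(x) - 2))
    assert np.isclose(direct, closed)  # both are 1.0 for this series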
@@ -1112,6 +1131,7 @@ def number_crossings(x: TIME_SERIES_T, crossing_value: float = 0.0) -> FLOAT_EXPR:
    return y.eq(y.shift(1)).not_().sum()


+
def number_cwt_peaks(x: pl.Series, max_width: int = 5) -> float:
    """
    Number of different peaks in x.
@@ -1266,28 +1286,28 @@ def permutation_entropy(
        expr = permutation_entropy(pl.col(x.name), tau=tau, n_dims=n_dims, base=base)
        return frame.select(expr).item(0, 0)
    else:
-        if tau == 1: # Fast track the most common use case
+        if tau == 1:  # Fast track the most common use case
            return (
-                pl.concat_list(
-                    x,
-                    *(x.shift(-i) for i in range(1, n_dims))
-                ).head(x.len() - n_dims + 1)
-                .list.eval(
-                    pl.element().arg_sort()
-                ).value_counts() # groupby and count, but returns a struct
-                .struct.field("counts") # extract the field named "counts"
+                pl.concat_list(x, *(x.shift(-i) for i in range(1, n_dims)))
+                .head(x.len() - n_dims + 1)
+                .list.eval(pl.element().arg_sort())
+                .value_counts()  # groupby and count, but returns a struct
+                .struct.field("counts")  # extract the field named "counts"
                .entropy(base=base, normalize=True)
            )
        else:
            max_shift = -n_dims + 1
            return (
                pl.concat_list(
                    x.take_every(tau),
-                    *(x.shift(-i).take_every(tau) for i in range(1, n_dims))
-                ).filter( # This is the correct way to filter (with respect to tau) in this case.
-                    pl.repeat(True, x.len()).shift(max_shift, fill_value=False)
+                    *(x.shift(-i).take_every(tau) for i in range(1, n_dims)),
+                )
+                .filter(  # This is the correct way to filter (with respect to tau) in this case.
+                    pl.repeat(True, x.len())
+                    .shift(max_shift, fill_value=False)
                    .take_every(tau)
-                ).list.eval(
+                )
+                .list.eval(
                    pl.element().arg_sort()
                )  # for each inner list, do an argsort
                .value_counts()  # groupby and count, but returns a struct
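The fast path above embeds each point with its n_dims - 1 successors, argsorts each window into its ordinal pattern, counts the patterns, and takes the normalized entropy of the counts. A usage sketch on the classic Bandt-Pompe example series (output value not asserted here; import path taken from the diffed file):

    import polars as pl
    from functime.feature_extraction.tsfresh import permutation_entropy

    s = pl.Series("x", [4.0, 7.0, 9.0, 10.0, 6.0, 11.0, 3.0])
    print(permutation_entropy(s, tau=1, n_dims=3))  # eager input delegates to the expression path above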
@@ -1398,7 +1418,7 @@ def _into_sequential_chunks(x: pl.Series, m: int) -> np.ndarray:


# Only works on series
-def sample_entropy(x: TIME_SERIES_T, ratio: float = 0.2, m:int = 2) -> FLOAT_EXPR:
+def sample_entropy(x: TIME_SERIES_T, ratio: float = 0.2, m: int = 2) -> FLOAT_EXPR:
    """
    Calculate the sample entropy of a time series.
    This only works for Series input right now.
@@ -1633,7 +1653,7 @@ def harmonic_mean(x: TIME_SERIES_T) -> FLOAT_EXPR:
    -------
    float | Expr
    """
-    return x.len() / (1.0/x).sum()
+    return x.len() / (1.0 / x).sum()


def range_over_mean(x: TIME_SERIES_T) -> FLOAT_EXPR:
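Worked example for the harmonic mean expression above: for [1, 2, 4] it is 3 / (1/1 + 1/2 + 1/4) = 12/7 ≈ 1.714.

    import polars as pl

    x = pl.Series([1.0, 2.0, 4.0])
    print(x.len() / (1.0 / x).sum())  # 1.7142857...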
@@ -1705,25 +1725,25 @@ def streak_length_stats(x: TIME_SERIES_T, above: bool, threshold: float) -> MAP_EXPR:
    y = y.filter(y.struct.field("values")).struct.field("lengths")
    if isinstance(x, pl.Series):
        return {
-            "min": y.min(),
+            "min": y.min() or 0,
            "max": y.max(),
            "mean": y.mean(),
            "std": y.std(),
            "10-percentile": y.quantile(0.1),
            "median": y.median(),
            "90-percentile": y.quantile(0.9),
-            "mode": y.mode()[0],
+            "mode": y.mode().sort()[0] if not y.is_empty() else None,
        }
    else:
        return pl.struct(
-            y.min().alias("min"),
+            y.min().clip_min(0).alias("min"),
            y.max().alias("max"),
            y.mean().alias("mean"),
            y.std().alias("std"),
            y.quantile(0.1).alias("10-percentile"),
            y.median().alias("median"),
            y.quantile(0.9).alias("90-percentile"),
-            y.mode().first().alias("mode"),
+            y.mode().sort(nulls_last=True).first().alias("mode"),
        )

Collaborator, in a review comment on the "min": y.min() or 0 line: I didn't know Python supports this lol 👍
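On that reviewed line: Python's or returns its right operand when the left is falsy, and when no run satisfies the condition the filtered y is empty, so y.min() is None and None or 0 evaluates to 0. This is safe here because genuine streak lengths are at least 1 and therefore never falsy themselves. A sketch:

    import polars as pl

    empty = pl.Series([], dtype=pl.UInt32)
    print(empty.min() or 0)              # None or 0 -> 0
    print(pl.Series([2, 5]).min() or 0)  # 2, truthy and returned unchanged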


@@ -1750,7 +1770,12 @@ def longest_streak_above(x: TIME_SERIES_T, threshold: float) -> TIME_SERIES_T:
        streak_max = y.filter(y.struct.field("values")).struct.field("lengths").max()
        return 0 if streak_max is None else streak_max
    else:
-        return y.filter(y.struct.field("values")).struct.field("lengths").max().fill_null(0)
+        return (
+            y.filter(y.struct.field("values"))
+            .struct.field("lengths")
+            .max()
+            .fill_null(0)
+        )


def longest_streak_below(x: TIME_SERIES_T, threshold: float) -> TIME_SERIES_T:
@@ -1775,7 +1800,12 @@ def longest_streak_below(x: TIME_SERIES_T, threshold: float) -> TIME_SERIES_T:
        streak_max = y.filter(y.struct.field("values")).struct.field("lengths").max()
        return 0 if streak_max is None else streak_max
    else:
-        return y.filter(y.struct.field("values")).struct.field("lengths").max().fill_null(0)
+        return (
+            y.filter(y.struct.field("values"))
+            .struct.field("lengths")
+            .max()
+            .fill_null(0)
+        )


def longest_winning_streak(x: TIME_SERIES_T) -> TIME_SERIES_T: