feat: Rustify lempel_ziv_complexity (#107)
abstractqqq authored Nov 1, 2023
1 parent 14fa081 commit c5a87f6
Showing 8 changed files with 162 additions and 121 deletions.
6 changes: 5 additions & 1 deletion Cargo.toml
@@ -5,12 +5,16 @@ edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[lib]
name = "functime"
name = "_functime_rust"
crate-type = ["cdylib"]

[dependencies]
pyo3 = {version = "0.20.0", features = ["extension-module"]}
pyo3-polars = {version = "0.8.0", features = ["derive"]}
polars = {version = "*", features = ["fmt", "performant", "chunked_ids", "lazy", "zip_with", "random"]}
polars-core = "*"
polars-lazy = "*"
faer = {version = "0.14.1", features = ["ndarray"]}
ndarray = "0.15.6"
numpy = "0.20.0"
serde = {version = "1.0.190", features=["derive"]}
32 changes: 14 additions & 18 deletions functime/feature_extraction/tsfresh.py
@@ -12,8 +12,9 @@
from scipy.signal import find_peaks_cwt, ricker, welch
from scipy.spatial import KDTree

from functime._functime_rust import rs_faer_lstsq, rs_lempel_ziv_complexity
from functime._functime_rust import rs_faer_lstsq1
from functime._utils import UseAtOwnRisk
from functime.feature_extractor import FeatureExtractor # noqa: F401

logger = logging.getLogger(__name__)

@@ -262,8 +263,8 @@ def autoregressive_coefficients(x: TIME_SERIES_T, n_lags: int) -> List[float]:
)
.to_numpy()
)
y_ = y.tail(length).to_numpy(zero_copy_only=True).reshape((-1, 1))
out: np.ndarray = rs_faer_lstsq(data_x, y_)
y_ = y.tail(length).to_numpy(zero_copy_only=True).reshape((-1,1))
out: np.ndarray = rs_faer_lstsq1(data_x, y_)
return out.ravel()
else:
logger.info(
@@ -884,9 +885,9 @@ def lempel_ziv_complexity(
) -> FLOAT_EXPR:
"""
Calculate a complexity estimate based on the Lempel-Ziv compression algorithm. The
implementation here is currently taken from Lilian Besson. See the reference section
below. Instead of return the complexity value, we return a ratio w.r.t the length of
the input series.
implementation here is currently a Rust rewrite of Lilian Besson's code. See the reference
section below. Instead of returning the complexity value, we return a ratio w.r.t. the length
of the input series. If null is encountered, it will be interpreted as 0 in the bit sequence.
Parameters
----------
@@ -909,17 +910,12 @@
https://en.wikipedia.org/wiki/Lempel%E2%80%93Ziv_complexity
"""
if isinstance(x, pl.Series):
b = bytes(x > threshold)
c = rs_lempel_ziv_complexity(b)
if as_ratio:
return c / x.len()
return c
frame = x.to_frame()
return frame.select(
pl.col(x.name).ts.lempel_ziv_complexity(threshold, as_ratio)
).item(0,0)
else:
logger.info(
"Expression version of lempel_ziv_complexity is not yet implemented due to "
"technical difficulty regarding Polars Expression Plugins."
)
return NotImplemented
return x.ts.lempel_ziv_complexity(threshold, as_ratio)
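For readers following the algorithm, here is a minimal pure-Python sketch of the LZ76-style counting scheme the docstring describes (and that the Rust function further down in this diff implements). It is an illustration only, not code from this commit:

def lempel_ziv_complexity_ref(bits: list) -> int:
    # Scan left to right, growing a window until it has not been seen before,
    # then record it as a new phrase and restart the window after it.
    sub_strings = set()
    ind, inc = 0, 1
    while ind + inc <= len(bits):
        subseq = tuple(bits[ind : ind + inc])
        if subseq in sub_strings:
            inc += 1
        else:
            sub_strings.add(subseq)
            ind += inc
            inc = 1
    return len(sub_strings)

# With as_ratio=True, the returned value is lempel_ziv_complexity_ref(bits) / len(bits).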


def linear_trend(x: TIME_SERIES_T) -> MAP_EXPR:
@@ -1424,7 +1420,7 @@ def _into_sequential_chunks(x: pl.Series, m: int) -> np.ndarray:
df = (
x.to_frame()
.select(
pl.col(name), *(pl.col(name).shift(-i).suffix(str(i)) for i in range(1, m))
pl.col(name), *(pl.col(name).shift(-i).name.suffix(str(i)) for i in range(1, m))
)
.slice(0, n_rows)
)
@@ -1466,7 +1462,7 @@ def sample_entropy(x: TIME_SERIES_T, ratio: float = 0.2, m: int = 2) -> FLOAT_EX
)
- mat.shape[0]
)
mat = _into_sequential_chunks(x, m + 1) #
mat = _into_sequential_chunks(x, m + 1)
tree = KDTree(mat)
a = (
np.sum(
@@ -1,14 +1,15 @@
import math
from typing import Union

import polars as pl
from polars.type_aliases import ClosedInterval
from polars.utils.udfs import _get_shared_lib_location

import functime.feature_extraction.tsfresh as f

# from polars.type_aliases import IntoExpr
# from polars.utils.udfs import _get_shared_lib_location

# lib = _get_shared_lib_location(__file__)
lib = _get_shared_lib_location(__file__)


@pl.api.register_expr_namespace("ts")
@@ -83,18 +84,6 @@ def benford_correlation(self) -> pl.Expr:
"""
return f.benford_correlation(self._expr)

def benford_correlation2(self) -> pl.Expr:
"""
Returns the correlation between the first digit distribution of the input time series and
the Newcomb-Benford's Law distribution. This version may be numerically unstable due to float
point precision issue, but is faster for bigger data.
Returns
-------
An expression of the output
"""
return f.benford_correlation2(self._expr)

def binned_entropy(self, bin_count: int = 10) -> pl.Expr:
"""
Calculates the entropy of a binned histogram for a given time series. It is highly recommended
@@ -345,26 +334,41 @@ def last_location_of_minimum(self) -> pl.Expr:
"""
return f.last_location_of_minimum(self._expr)

# def lempel_ziv_complexity(self, threshold:float, as_ratio:bool=True) -> pl.Expr:
# """
# Returns the Lempel Ziv Complexity. This will binarize the function and if value > threshold,
# this the binarized value will be 1 and otherwise 0. Null will be treated as 0s in the binarization.

# Parameters
# ----------
# thrshold
# A python literal or a Polars Expression to compare with
# as_ratio
# If true, return the complexity divided length of the sequence
# """
# out = (self._expr > threshold)._register_plugin(
# lib=lib,
# symbol="pl_lempel_ziv_complexity",
# is_elementwise=True,
# )
# if as_ratio:
# return out / self._expr.len()
# return out
def lempel_ziv_complexity(self, threshold: Union[float, pl.Expr], as_ratio: bool = True) -> pl.Expr:
"""
Calculate a complexity estimate based on the Lempel-Ziv compression algorithm. The
implementation here is currently a Rust rewrite of Lilian Besson's code. Instead of returning
the complexity value, we return a ratio w.r.t. the length of the input series. If null is
encountered, it will be interpreted as 0 in the bit sequence.
Parameters
----------
threshold: float | pl.Expr
Either a number, or an expression representing a comparable quantity. Values strictly
greater than the threshold are binarized as 1, and 0 otherwise.
as_ratio: bool
If true, return the complexity divided by the length of the sequence.
Returns
-------
Expr
Reference
---------
https://github.com/Naereen/Lempel-Ziv_Complexity/tree/master
https://en.wikipedia.org/wiki/Lempel%E2%80%93Ziv_complexity
"""
out = (self._expr > threshold).register_plugin(
lib=lib,
symbol="pl_lempel_ziv_complexity",
is_elementwise=False,
returns_scalar=True
)
if as_ratio:
return out / self._expr.len()
return out
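A short usage sketch of the namespace method above, assuming the package is installed with the compiled plugin. The DataFrame contents and the "price" column name are purely illustrative, and the import follows the one used in tsfresh.py earlier in this diff:

import polars as pl
from functime.feature_extractor import FeatureExtractor  # noqa: F401  (registers the "ts" namespace)

df = pl.DataFrame({"price": [1.0, 3.0, None, 5.0, 4.0, 2.0]})

# Binarize on price > 3.0 (nulls become 0 bits) and return the complexity ratio.
out = df.select(pl.col("price").ts.lempel_ziv_complexity(3.0, as_ratio=True))
print(out.item())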

def linear_trend(self) -> pl.Expr:
"""
22 changes: 11 additions & 11 deletions pyproject.toml
@@ -29,17 +29,17 @@ classifiers = [
"Topic :: Software Development :: Libraries :: Python Modules",
]
dependencies = [
"bottleneck",
"dask",
"flaml<3,>=2.0.2",
"holidays",
"numpy",
"polars>=0.19.11",
"scikit-learn<2,>=1.2.2",
"scipy",
"tqdm",
"typing-extensions",
"zarr",
"dask",
"bottleneck",
"flaml>=2.0.2,<3",
"holidays",
"numpy",
"polars>=0.19.12",
"scikit-learn>=1.2.2,<2",
"scipy",
"tqdm",
"typing-extensions",
"zarr",
]
[project.optional-dependencies]
ann = [
83 changes: 64 additions & 19 deletions src/feature_extraction/feature_extractor.rs
@@ -1,18 +1,28 @@
use pyo3::prelude::*;
use polars_core::prelude::*;
//use polars_lazy::prelude::*;
use polars_core::prelude::*;
//use polars_lazy::prelude::*;
use pyo3_polars::{derive::polars_expr, export::polars_core::{series::Series, prelude::{*}}};
use std::collections::HashSet;

#[pyfunction]
pub fn rs_lempel_ziv_complexity(
s: &[u8]
) -> usize {

#[polars_expr(output_type=UInt32)]
fn pl_lempel_ziv_complexity(inputs: &[Series]) -> PolarsResult<Series> {

let input: &Series = &inputs[0];
let name = input.name();
let input = input.bool()?;
let bits: Vec<bool> = input.into_iter().map(
|op_b| op_b.unwrap_or(false)
).collect();

let mut ind:usize = 0;
let mut inc:usize = 1;

let mut sub_strings: HashSet<&[u8]> = HashSet::new();
while ind + inc <= s.len() {
let subseq: &[u8] = &s[ind..ind+inc];
let mut sub_strings: HashSet<&[bool]> = HashSet::new();
while ind + inc <= bits.len() {
let subseq: &[bool] = &bits[ind..ind+inc];
if sub_strings.contains(subseq) {
inc += 1;
} else {
@@ -21,19 +31,54 @@ pub fn rs_lempel_ziv_complexity(
inc = 1;
}
}
sub_strings.len()
let c = sub_strings.len();
Ok(Series::new(name, [c as u32]))

}

// #[polars_expr(output_type=UInt32)]
// fn pl_lempel_ziv_complexity(inputs: &[Series]) -> PolarsResult<Series> {



// // Test this when Faer updates its Polars interop

// let input: &Series = &inputs[0];
// let name = input.name();
// let input = input.bool()?;
// let bools: Vec<u8> = input.into_iter().map(
// |op_b| (op_b.unwrap_or(false) as u8)
// ).collect();
// let c:usize = rs_lempel_ziv_complexity(&bools);
// Ok(Series::new(name, [c as u32]))

// }
// let n_lag: &u32 = &inputs[1].u32()?.get(0).unwrap();
// let name: &str = input.name();

// let todf: Result<DataFrame, PolarsError> = df!(name => input);
// match todf {
// Ok(df) => {
// let length: u32 = inputs.len() as u32 - n_lag;
// let mut shifts:Vec<Expr> = (1..(*n_lag + 1)).map(|i| {
// col(name).slice(n_lag - i, length).alias(i.to_string().as_ref())
// }).collect();
// shifts.push(lit(1.0));
// // Construct X
// let df_lazy_x: LazyFrame = df.lazy().select(shifts);
// // Construct Y
// let df_lazy_y: LazyFrame = df.lazy().select(
// [col(name).tail(Some(length as usize)).alias("target")]
// );
// let mat_x = polars_to_faer_f64(df_lazy_x);
// let mat_y = polars_to_faer_f64(df_lazy_y);
// let coeffs = match (mat_x, mat_y) {
// // Use lstsq_solver1 because it is better for matrix that has nrows >>> ncols
// (Ok(x), Ok(y)) => {
// use super::lstsq_solver1;
// use ndarray::Array;
// lstsq_solver1(x, y)
// },
// _ => {
// return PolarsError::ComputeError("Cannot convert autoregressive matrix to Faer matrix.")
// }
// };
// // Coeffs is a 2d array because Faer returns a matrix (the vector as a matrix)
// // coeffs.into_iter() traverses every element in order.
// let output: Series = Series::from_iter(coeffs.into_iter());
// Ok(output)
// }
// , Err(e) => {
// return PolarsResult::Err(e)
// }
// }
// }
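One detail worth noting in the Rust function above: unwrap_or(false) turns missing values into 0 bits, matching the "null is interpreted as 0" promise in the Python docstrings. A tiny illustrative Python equivalent of that binarization step (not part of this commit):

import polars as pl

s = pl.Series("x", [0.2, None, 0.9])
mask = s > 0.5                      # [False, null, True]
bits = [bool(b) if b is not None else False for b in mask]
print(bits)                         # [False, False, True]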
32 changes: 1 addition & 31 deletions src/feature_extraction/mod.rs
@@ -1,31 +1 @@
pub mod feature_extractor;
use ndarray::{Array2, ArrayView2};
use faer::{IntoFaer, IntoNdarray, Side};
use faer::prelude::*;


pub fn faer_lstsq(
x: ArrayView2<f64>,
y: ArrayView2<f64>,
) -> Array2<f64> {

// Solving X * beta = rhs using Faer-rs
// This speeds things up significantly but is only suitable for m x k matrices
// where m >> k.

// Zero Copy
let x_ = x.into_faer();
let y_ = y.into_faer();
// Solver. Use closed form solution to solve the least square
// This is faster because xtx has small dimension. So we use the closed
// form solution approach.
let xt = x_.transpose();
let xtx = xt * x_;
let cholesky = xtx.cholesky(Side::Lower).unwrap(); // Can unwrap because xtx is positive semidefinite
let xtx_inv = cholesky.inverse();
// Solution
let beta = xtx_inv * xt * y_;
// To Ndarray (for NumPy Interop), generate output. Will Copy.
let out = beta.as_ref().into_ndarray();
out.to_owned()
}
pub mod feature_extractor;
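For context on the least-squares side of this commit: the faer_lstsq removed above, like the lstsq_solver1 that replaces it according to the comments earlier in the diff, relies on the closed-form normal-equations solution beta = (X^T X)^{-1} X^T y, with X^T X factored by Cholesky. A rough NumPy illustration of that approach (an assumption-laden sketch, not the shipped implementation):

import numpy as np

def normal_equations_lstsq(x: np.ndarray, y: np.ndarray) -> np.ndarray:
    # Solve (X^T X) beta = X^T y. X^T X is only k x k for an m x k design
    # matrix, so this is cheap when m >> k; it is symmetric positive definite
    # for full-rank X, which is why the Rust code can use a Cholesky factorization.
    xtx = x.T @ x
    return np.linalg.solve(xtx, x.T @ y)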
14 changes: 7 additions & 7 deletions src/lib.rs
@@ -1,26 +1,26 @@
use faer::IntoFaer;
use numpy::{PyReadonlyArray2, PyArray2, ToPyArray};
use pyo3::prelude::*;
mod feature_extraction;
use feature_extraction::{
faer_lstsq,
feature_extractor::rs_lempel_ziv_complexity
};
pub mod linalg;
use linalg::lstsq_solver1;

#[pymodule]
fn _functime_rust(_py: Python, m: &PyModule) -> PyResult<()> {

m.add_function(wrap_pyfunction!(rs_lempel_ziv_complexity, m)?)?;
// Normal Rust function interop
// m.add_function(wrap_pyfunction!(rs_lempel_ziv_complexity, m)?)?;

// Functions that Requires NumPy Interop
#[pyfn(m)]
fn rs_faer_lstsq<'py>(
fn rs_faer_lstsq1<'py>(
py: Python<'py>,
x: PyReadonlyArray2<'py, f64>,
y: PyReadonlyArray2<'py, f64>,
) -> &'py PyArray2<f64> {
let x_ = x.as_array();
let y_ = y.as_array();
let beta = faer_lstsq(x_, y_);
let beta = lstsq_solver1(x_.into_faer(), y_.into_faer());
beta.to_pyarray(py)
}
Ok(())
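A quick sanity-check sketch of the NumPy interop exposed above, assuming the extension module builds as functime._functime_rust (the lib name set in Cargo.toml at the top of this diff) and that both inputs are 2-D float64 arrays:

import numpy as np
from functime._functime_rust import rs_faer_lstsq1  # names taken from this diff

rng = np.random.default_rng(0)
x = rng.standard_normal((1_000, 3))       # tall-and-skinny design matrix
beta_true = np.array([[1.0], [-2.0], [0.5]])
y = x @ beta_true

beta = rs_faer_lstsq1(x, y)               # (3, 1) float64 array of coefficients
assert np.allclose(beta, beta_true)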