Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 79 additions & 49 deletions _delphi_utils_python/delphi_utils/smooth.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,11 +138,15 @@ def __init__(
raise ValueError("Invalid impute_method given.")
if self.boundary_method not in valid_boundary_methods:
raise ValueError("Invalid boundary_method given.")
if self.window_length <= 1:
raise ValueError("Window length is too short.")

if smoother_name == "savgol":
# The polynomial fitting is done on a past window of size window_length
# including the current day value.
self.coeffs = self.savgol_coeffs(-self.window_length + 1, 0)
self.coeffs = self.savgol_coeffs(
-self.window_length + 1, 0, self.poly_fit_degree
)
else:
self.coeffs = None

Expand All @@ -163,21 +167,41 @@ def smooth(self, signal: Union[np.ndarray, pd.Series]) -> Union[np.ndarray, pd.S
A smoothed 1D signal. Returns an array of the same type and length as
the input.
"""
# If all nans, pass through
if np.all(np.isnan(signal)):
return signal

is_pandas_series = isinstance(signal, pd.Series)
pandas_index = signal.index if is_pandas_series else None
signal = signal.to_numpy() if is_pandas_series else signal

signal = self.impute(signal)
# Find where the first non-nan value is located and truncate the initial nans
ix = np.where(~np.isnan(signal))[0][0]
signal = signal[ix:]

if self.smoother_name == "savgol":
signal_smoothed = self.savgol_smoother(signal)
elif self.smoother_name == "left_gauss_linear":
signal_smoothed = self.left_gauss_linear_smoother(signal)
elif self.smoother_name == "moving_average":
signal_smoothed = self.moving_average_smoother(signal)
else:
# Don't smooth in certain edge cases
if len(signal) < self.poly_fit_degree or len(signal) == 1:
signal_smoothed = signal.copy()

signal_smoothed = signal_smoothed if not is_pandas_series else pd.Series(signal_smoothed)
else:
# Impute
signal = self.impute(signal)

# Smooth
if self.smoother_name == "savgol":
signal_smoothed = self.savgol_smoother(signal)
elif self.smoother_name == "left_gauss_linear":
signal_smoothed = self.left_gauss_linear_smoother(signal)
elif self.smoother_name == "moving_average":
signal_smoothed = self.moving_average_smoother(signal)
elif self.smoother_name == "identity":
signal_smoothed = signal

# Append the nans back, since we want to preserve length
signal_smoothed = np.hstack([np.nan*np.ones(ix), signal_smoothed])
# Convert back to pandas if necessary
if is_pandas_series:
signal_smoothed = pd.Series(signal_smoothed)
signal_smoothed.index = pandas_index
return signal_smoothed

def impute(self, signal):
Expand Down Expand Up @@ -279,33 +303,39 @@ def left_gauss_linear_smoother(self, signal):
signal_smoothed[signal_smoothed <= self.minval] = self.minval
return signal_smoothed

def savgol_predict(self, signal):
def savgol_predict(self, signal, poly_fit_degree, nr):
"""Predict a single value using the savgol method.

Fits a polynomial through the values given by the signal and returns the value
of the polynomial at the right-most signal-value. More precisely, fits a polynomial
f(t) of degree poly_fit_degree through the points signal[-n], signal[-n+1] ..., signal[-1],
and returns the evaluation of the polynomial at the location of signal[0].
of the polynomial at the right-most signal-value. More precisely, for a signal of length
n, fits a poly_fit_degree polynomial through the points signal[-n+1+nr], signal[-n+2+nr],
..., signal[nr], and returns the evaluation of the polynomial at signal[0]. Hence, if
nr=0, then the last value of the signal is smoothed, and if nr=-1, then the value after
the last signal value is anticipated.

Parameters
----------
signal: np.ndarray
A 1D signal to smooth.
poly_fit_degree: int
The degree of the polynomial fit.
nr: int
An integer that determines the position of the predicted value relative to the signal.

Returns
----------
predicted_value: float
The anticipated value that comes after the end of the signal based on a polynomial fit.
"""
# Add one
coeffs = self.savgol_coeffs(-len(signal) + 1, 0)
coeffs = self.savgol_coeffs(-len(signal) + 1 + nr, nr, poly_fit_degree)
predicted_value = signal @ coeffs
return predicted_value

def savgol_coeffs(self, nl, nr):
def savgol_coeffs(self, nl, nr, poly_fit_degree):
"""Solve for the Savitzky-Golay coefficients.

The coefficients c_i give a filter so that
Solves for the Savitzky-Golay coefficients. The coefficients c_i
give a filter so that
y = sum_{i=-{n_l}}^{n_r} c_i x_i
is the value at 0 (thus the constant term) of the polynomial fit
through the points {x_i}. The coefficients are c_i are calculated as
Expand All @@ -322,23 +352,20 @@ def savgol_coeffs(self, nl, nr):
The right window bound for the polynomial fit, inclusive.
poly_fit_degree: int
The degree of the polynomial to be fit.
gaussian_bandwidth: float or None
If float, performs regression with Gaussian weights whose variance is
the gaussian_bandwidth. If None, performs unweighted regression.

Returns
----------
coeffs: np.ndarray
A vector of coefficients of length nl that determines the savgol
A vector of coefficients of length nr - nl + 1 that determines the savgol
convolution filter.
"""
if nl >= nr:
raise ValueError("The left window bound should be less than the right.")
if nr > 0:
raise warnings.warn("The filter is no longer causal.")
warnings.warn("The filter is no longer causal.")

A = np.vstack( # pylint: disable=invalid-name
[np.arange(nl, nr + 1) ** j for j in range(self.poly_fit_degree + 1)]
A = np.vstack(
[np.arange(nl, nr + 1) ** j for j in range(poly_fit_degree + 1)]
).T

if self.gaussian_bandwidth is None:
Expand Down Expand Up @@ -382,15 +409,17 @@ def savgol_smoother(self, signal):
# - identity keeps the original signal (doesn't smooth)
# - nan writes nans
if self.boundary_method == "shortened_window": # pylint: disable=no-else-return
for ix in range(len(self.coeffs)):
for ix in range(min(len(self.coeffs), len(signal))):
if ix == 0:
signal_smoothed[ix] = signal[ix]
else:
# At the very edge, the design matrix is often singular, in which case
# we just fall back to the raw signal
try:
signal_smoothed[ix] = self.savgol_predict(signal[:ix+1])
except np.linalg.LinAlgError:
signal_smoothed[ix] = self.savgol_predict(
signal[: ix + 1], self.poly_fit_degree, 0
)
except np.linalg.LinAlgError: # for small ix, the design matrix is singular
signal_smoothed[ix] = signal[ix]
return signal_smoothed
elif self.boundary_method == "identity":
Expand All @@ -403,19 +432,13 @@ def savgol_smoother(self, signal):
def savgol_impute(self, signal):
"""Impute the nan values in signal using savgol.

This method fills the nan values in the signal with an M-degree polynomial fit
This method fills the nan values in the signal with a quadratic polynomial fit
on a rolling window of the immediate past up to window_length data points.

In the case of a single data point in the past, the single data point is
continued. In the case of no data points in the past (i.e. the signal starts
with nan), an error is raised.
A number of boundary cases are handled involving nan filling close to the boundary.

Note that in the case of many adjacent nans, the method will use previously
imputed values to do the fitting for later values. E.g. for
>>> x = np.array([1.0, 2.0, np.nan, 1.0, np.nan])
the last np.nan will be fit on np.array([1.0, 2.0, *, 1.0]), where * is the
result of imputing based on np.array([1.0, 2.0]) (depends on the savgol
settings).
imputed values to do the fitting for later values.

Parameters
----------
Expand All @@ -428,20 +451,27 @@ def savgol_impute(self, signal):
An imputed 1D signal.
"""
signal_imputed = np.copy(signal)
for ix in np.where(np.isnan(signal))[0]:
for ix in np.where(np.isnan(signal_imputed))[0]:
# Boundary cases
if ix < self.window_length:
# A nan following a single value should just be extended
# At the boundary, a single value should just be extended
if ix == 1:
signal_imputed[ix] = signal_imputed[0]
# Otherwise, use savgol fitting
signal_imputed[ix] = signal_imputed[ix - 1]
# Reduce the polynomial degree if needed
elif ix == 2:
signal_imputed[ix] = self.savgol_predict(
signal_imputed[:ix], 1, -1
)
# Otherwise, use savgol fitting on the largest window prior
else:
coeffs = self.savgol_coeffs(-ix, -1)
signal_imputed[ix] = signal_imputed[:ix] @ coeffs
# Use a polynomial fit on the past window length to impute
signal_imputed[ix] = self.savgol_predict(
signal_imputed[:ix], self.poly_fit_degree, -1
)
# Away from the boundary, use savgol fitting on a fixed window
else:
coeffs = self.savgol_coeffs(-self.window_length, -1)
signal_imputed[ix] = (
signal_imputed[ix - self.window_length : ix] @ coeffs
signal_imputed[ix] = self.savgol_predict(
signal_imputed[ix - self.window_length : ix],
self.poly_fit_degree,
-1,
)
return signal_imputed
68 changes: 67 additions & 1 deletion _delphi_utils_python/tests/test_smooth.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
Tests for the smoothing utility.
Authors: Dmitry Shemetov, Addison Hu, Maria Jahja
"""
from numpy.lib.polynomial import poly
import pytest

import numpy as np
Expand All @@ -10,11 +11,26 @@


class TestSmoothers:
def test_bad_inputs(self):
with pytest.raises(ValueError):
Smoother(smoother_name="hamburger")
with pytest.raises(ValueError):
Smoother(impute_method="hamburger")
with pytest.raises(ValueError):
Smoother(boundary_method="hamburger")
with pytest.raises(ValueError):
Smoother(window_length=1)

def test_identity_smoother(self):
signal = np.arange(30) + np.random.rand(30)
assert np.allclose(signal, Smoother(smoother_name="identity").smooth(signal))

def test_moving_average_smoother(self):
# Test non-integer window-length
with pytest.raises(ValueError):
signal = np.array([1, 1, 1])
Smoother(smoother_name="window_average", window_length=5.5).smooth(signal)

# The raw and smoothed lengths should match
signal = np.ones(30)
smoother = Smoother(smoother_name="moving_average")
Expand Down Expand Up @@ -109,10 +125,44 @@ def test_causal_savgol_smoother(self):
smoother_name="savgol", window_length=window_length, poly_fit_degree=1,
)
smoothed_signal2 = smoother.smooth(signal)

assert np.allclose(smoothed_signal1, smoothed_signal2)

# Test the all nans case
signal = np.nan * np.ones(10)
smoother = Smoother(window_length=9)
smoothed_signal = smoother.smooth(signal)
assert np.all(np.isnan(smoothed_signal))

# Test the case where the signal is length 1
signal = np.ones(1)
smoother = Smoother()
smoothed_signal = smoother.smooth(signal)
assert np.allclose(smoothed_signal, signal)

# Test the case where the signal length is less than polynomial_fit_degree
signal = np.ones(2)
smoother = Smoother(poly_fit_degree=3)
smoothed_signal = smoother.smooth(signal)
assert np.allclose(smoothed_signal, signal)

# Test an edge fitting case
signal = np.array([np.nan, 1, np.nan])
smoother = Smoother(poly_fit_degree=1, window_length=2)
smoothed_signal = smoother.smooth(signal)
assert np.allclose(smoothed_signal, np.array([np.nan, 1, 1]), equal_nan=True)

# Test a range of cases where the signal size following a sequence of nans is returned
for i in range(10):
signal = np.hstack([[np.nan, np.nan, np.nan], np.ones(i)])
smoother = Smoother(poly_fit_degree=0, window_length=5)
smoothed_signal = smoother.smooth(signal)
assert np.allclose(smoothed_signal, signal, equal_nan=True)

def test_impute(self):
# test front nan error
with pytest.raises(ValueError):
Smoother().impute(signal=np.array([np.nan, 1, 1]))

# test the nan imputer
signal = np.array([i if i % 3 else np.nan for i in range(1, 40)])
assert np.allclose(Smoother(impute_method=None).impute(signal), signal, equal_nan=True)
Expand Down Expand Up @@ -180,6 +230,14 @@ def test_impute(self):
smoothed_signal = smoother.savgol_impute(signal)
assert np.allclose(smoothed_signal, signal)

# test that we don't hit a matrix inversion error when there are
# nans on less than window_length away from the boundary
signal = np.hstack([[1], np.nan*np.ones(12), np.arange(5)])
smoother = Smoother(smoother_name="savgol", poly_fit_degree=2,
boundary_method="identity", window_length=10)
smoothed_signal = smoother.impute(signal)
assert np.allclose(smoothed_signal, np.hstack([[1], np.ones(12), np.arange(5)]))

def test_pandas_series_input(self):
# The savgol method should match the linear regression method on the first
# window_length-many values of the signal, if the savgol_weighting is set to true,
Expand Down Expand Up @@ -223,3 +281,11 @@ def test_pandas_series_input(self):
assert np.allclose(
signal[window_length - 1 :], smoothed_signal[window_length - 1 :]
)

# Test that the index of the series gets preserved
signal = pd.Series(np.ones(30), index=np.arange(50, 80))
smoother = Smoother(smoother_name="moving_average", window_length=10)
smoothed_signal = signal.transform(smoother.smooth)
ix1 = signal.index
ix2 = smoothed_signal.index
assert ix1.equals(ix2)