Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stats: handle absence of global maxima in signal #127

Merged
merged 3 commits into from
Aug 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions qats/app/funcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ def calculate_psd(container, twin, fargs, nperseg, normalize):
fargs : tuple
Filter arguments. Time series are filtered before estimating psd.
nperseg : int
Size of segments used for ustimating PSD using Welch's method.
Size of segments used for estimating PSD using Welch's method.
NOTE: the minimum of specified `nperseg` and signal length is used.
normalize : bool
Normalize power spectral density on maximum density.

Expand All @@ -33,8 +34,14 @@ def calculate_psd(container, twin, fargs, nperseg, normalize):
container_out = dict()

for name, ts in container.items():
# resampling to average time step for robustness (necessary for series with varying time step)
f, s = ts.psd(twin=twin, filterargs=fargs, resample=ts.dt, taperfrac=0.1, nperseg=nperseg, normalize=normalize)
# NOTE: nperseg passed on as minimum of specified value and signal input length
# this is done in scipy anyway, which also issues a UserWarning stating:
# "nperseg = {nperseg} is greater than input length = {input_length:d}, using nperseg = {input_length:d}"
t, _ = ts.get(twin=twin, resample=ts.dt)
nperseg_ = np.min((nperseg, t.size))
# calculate power spectral density
# (resampling to average time step for robustness, necessary for series with varying time step)
f, s = ts.psd(twin=twin, filterargs=fargs, resample=ts.dt, taperfrac=0.1, nperseg=nperseg_, normalize=normalize)
container_out[name] = tuple([f, s])

return container_out
Expand Down
11 changes: 6 additions & 5 deletions qats/app/gui.py
Original file line number Diff line number Diff line change
Expand Up @@ -1462,12 +1462,12 @@ def __init__(self, psdnorm, nperseg, nbins, twindec, parent=None):
self.psdnpersegspinbox.setSingleStep(10)
self.psdnpersegspinbox.setEnabled(True)
self.psdnpersegspinbox.setValue(nperseg)
self.psdnpersegspinbox.setToolTip("When esimtating power spectral density using Welch's method the signal\n"
"is dived into overlapping segments and psd is estimated for each segment\n"
self.psdnpersegspinbox.setToolTip("When estimating power spectral density using Welch's method the signal\n"
"is divided into overlapping segments and psd is estimated for each segment\n"
"and then averaged. The overlap is half of the segment length. The \n"
"psd-estimate is smoother with shorter segments.")
psdlayout = QHBoxLayout()
psdlayout.addWidget(QLabel("Length of segment used when estimating power spectral density"))
psdlayout.addWidget(QLabel("Length of segment used when estimating power spectral density *"))
psdlayout.addStretch(1)
psdlayout.addWidget(self.psdnpersegspinbox)
layout.addLayout(psdlayout)
Expand All @@ -1494,14 +1494,15 @@ def __init__(self, psdnorm, nperseg, nbins, twindec, parent=None):
self.twindecspinbox.setValue(twindec)
self.twindecspinbox.setToolTip("Number of decimals in the data processing time window from/to boxes.")
twindeclayout = QHBoxLayout()
twindeclayout.addWidget(QLabel("Number of decimals in data processing time window *"))
twindeclayout.addWidget(QLabel("Number of decimals in data processing time window **"))
twindeclayout.addStretch(1)
twindeclayout.addWidget(self.twindecspinbox)
layout.addLayout(twindeclayout)

# help text
helptext = QHBoxLayout()
helptext.addWidget(QLabel("* Close and re-open application for this setting to have effect"))
helptext.addWidget(QLabel("* Parameter 'nperseg' in scipy.signal.welch \n (signal length is used if smaller than specified value)\n"
"** Close and re-open application for this setting to have effect"))
helptext.addStretch(1)
layout.addLayout(helptext)

Expand Down
127 changes: 75 additions & 52 deletions qats/signal.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from scipy.signal import coherence as spcoherence
from scipy.signal import csd as spcsd
from scipy.signal import filtfilt, sosfiltfilt, welch
from typing import Tuple


def extend_signal_ends(x: np.ndarray, n: int) -> np.ndarray:
Expand Down Expand Up @@ -122,7 +123,7 @@ def smooth(x: np.ndarray, window_len: int = 11, window: str = 'rectangular', mod
return y


def taper(x: np.ndarray, window: str = 'tukey', alpha: float = 0.001) -> (np.ndarray, float):
def taper(x: np.ndarray, window: str = 'tukey', alpha: float = 0.001) -> Tuple[np.ndarray, float]:
"""
Taper the input time series using a window function

Expand Down Expand Up @@ -440,8 +441,8 @@ def average_frequency(t: np.ndarray, x: np.ndarray, up: bool = True) -> float:
x : array_like
Signal.
up : bool, optional
- True: Period based on average time between up-crossings
- False: Period based on average time between down-crossings
- True: Frequency based on average time between up-crossings
- False: Frequency based on average time between down-crossings


Returns
Expand All @@ -462,12 +463,23 @@ def average_frequency(t: np.ndarray, x: np.ndarray, up: bool = True) -> float:

crossings = np.diff(crossings) # array with value=1 at position of each up-crossing and -1 at each down-crossing
crossings[crossings != indicator] = 0 # remove crossings with opposite direction
i = np.where(crossings == indicator)[0] + 1 # indices for crossings
d = (t[i[-1]] - t[i[0]]) / (np.abs(np.sum(crossings)) - 1) # duration between first and last crossing
return 1./d
ind = np.where(crossings == indicator)[0] + 1 # indices for crossings

if ind.size > 1:
# more than one crossing -> calculate frequency
# duration between first and last crossing
dt = t[ind[-1]] - t[ind[0]]
n_cycles = ind.size - 1 # no. of cycles = no. of crossings minus 1
# average frequency of up- or downcrossings
freq = n_cycles / dt
else:
# one crossing at most -> frequency is n/a
freq = np.nan

return freq


def find_maxima(x, local: bool = False, threshold: float = None, up: bool = True) -> (np.ndarray, np.ndarray):
def find_maxima(x, local: bool = False, threshold: float = None, up: bool = True) -> Tuple[np.ndarray, np.ndarray]:
"""
Return sorted maxima

Expand Down Expand Up @@ -544,32 +556,46 @@ def find_maxima(x, local: bool = False, threshold: float = None, up: bool = True
crossing_indices_up = np.where(crossings == 1)[0] + 1 # up-crossings
crossing_indices_do = np.where(crossings == -1)[0] + 1 # down-crossings

# use up-crossings as the basis
crossing_indices = crossing_indices_up

# if there is a down-crossing after the last up-crossing, add that crossing as well
# (this is to avoid losing the last peak, and is particularly important for problems with few crossings,
# e.g., due to low-frequent oscillations)
if crossing_indices_up[-1] < crossing_indices_do[-1]:
crossing_indices = np.append(crossing_indices, crossing_indices_do[-1])

# number of crossings and number of peaks
n_crossings = crossing_indices.size
n_peaks = n_crossings - 1

# number of up-/downcrossings
n_up = crossing_indices_up.size
n_do = crossing_indices_do.size

# no global maxima if the signal crosses mean only once
if n_crossings < 2:
return np.array([]), np.array([], dtype=int)

# initiate arrays to be populated subsequently
maxima = np.zeros(n_peaks)
maxima_indices = np.zeros(n_peaks, dtype=int)

# loop to find max. between each up-crossing:
for j, start in enumerate(crossing_indices[:-1]):
stop = crossing_indices[j + 1]
maxima[j] = x[start:stop].max()
maxima_indices[j] = start + np.argmax(x[start:stop])
if (n_up == 0) or (n_do == 0):
maxima = np.array([])
maxima_indices = np.array([], dtype=int)
else:
# signal crosses mean at least two times
# use up-crossings as the basis
crossing_indices = crossing_indices_up

# if there is a down-crossing after the last up-crossing, add that crossing as well
# (this is to avoid losing the last peak, and is particularly important for problems with few crossings,
# e.g., due to low-frequent oscillations)
if crossing_indices_up[-1] < crossing_indices_do[-1]:
crossing_indices = np.append(crossing_indices, crossing_indices_do[-1])

# number of crossings and number of peaks
n_crossings = crossing_indices.size
n_peaks = n_crossings - 1

# no global maxima if `n_crossings` is now 1
# (e.g., because there is one upcrossing and one downcrossing, but
# the downcrossing occurs first)
if n_crossings < 2:
maxima = np.array([])
maxima_indices = np.array([], dtype=int)
else:
# there is at least one global maximum
# initiate arrays to be populated subsequently
maxima = np.zeros(n_crossings - 1)
maxima_indices = np.zeros(n_crossings - 1, dtype=int)

# loop to find max. between each up-crossing:
for j, start in enumerate(crossing_indices[:-1]):
stop = crossing_indices[j + 1]
maxima[j] = x[start:stop].max()
maxima_indices[j] = start + np.argmax(x[start:stop])

else:
# local maxima (all peaks)
Expand All @@ -581,29 +607,26 @@ def find_maxima(x, local: bool = False, threshold: float = None, up: bool = True
maxima_indices = np.where(d2s == 1)[0] # unpack tuple returned from np.where
maxima = x[maxima_indices]

n_peaks = maxima.size
n_peaks = maxima.size

# return quickly if no peaks/local maxima were found (e.g., if time series is monotonically increasing)
if n_peaks == 0:
return np.array([]), np.array([], dtype=int)

# discard maxima lower than specified threshold
if threshold is not None:
above_threshold = (maxima >= threshold)
maxima = maxima[above_threshold]
maxima_indices = maxima_indices[above_threshold]
else:
pass
if n_peaks > 0:
# discard maxima lower than specified threshold
if threshold is not None:
above_threshold = (maxima >= threshold)
maxima = maxima[above_threshold]
maxima_indices = maxima_indices[above_threshold]
else:
pass

# sort ascending
ascending = np.argsort(maxima)
maxima = maxima[ascending]
maxima_indices = maxima_indices[ascending]
# sort ascending
ascending = np.argsort(maxima)
maxima = maxima[ascending]
maxima_indices = maxima_indices[ascending]

return maxima, maxima_indices


def psd(x: np.ndarray, dt: float, **kwargs) -> (np.ndarray, np.ndarray):
def psd(x: np.ndarray, dt: float, **kwargs) -> Tuple[np.ndarray, np.ndarray]:
"""
Estimate power spectral density of discrete time signal X using Welch’s method.

Expand Down Expand Up @@ -640,7 +663,7 @@ def psd(x: np.ndarray, dt: float, **kwargs) -> (np.ndarray, np.ndarray):
return f, p


def csd(x: np.ndarray, y: np.ndarray, dt: float, **kwargs) -> (np.ndarray, np.ndarray):
def csd(x: np.ndarray, y: np.ndarray, dt: float, **kwargs) -> Tuple[np.ndarray, np.ndarray]:
"""
Estimate cross power spectral density of discrete-time signals X and Y using Welch’s method.

Expand Down Expand Up @@ -680,7 +703,7 @@ def csd(x: np.ndarray, y: np.ndarray, dt: float, **kwargs) -> (np.ndarray, np.nd
return f, p


def coherence(x: np.ndarray, y: np.ndarray, dt: float, **kwargs) -> (np.ndarray, np.ndarray):
def coherence(x: np.ndarray, y: np.ndarray, dt: float, **kwargs) -> Tuple[np.ndarray, np.ndarray]:
"""
Estimate the magnitude squared coherence estimate of discrete-time signals X and Y using Welch’s method.

Expand Down Expand Up @@ -720,7 +743,7 @@ def coherence(x: np.ndarray, y: np.ndarray, dt: float, **kwargs) -> (np.ndarray,
return f, c


def tfe(x: np.ndarray, y: np.ndarray, dt: float, clim: float = None, **kwargs) -> (np.ndarray, np.ndarray):
def tfe(x: np.ndarray, y: np.ndarray, dt: float, clim: float = None, **kwargs) -> Tuple[np.ndarray, np.ndarray]:
"""
Estimate the transfer function between two discrete-time signals X and Y using Welch’s method.

Expand Down
Loading