diff --git a/qats/app/funcs.py b/qats/app/funcs.py index 6bc8dd8..70e0067 100644 --- a/qats/app/funcs.py +++ b/qats/app/funcs.py @@ -21,7 +21,8 @@ def calculate_psd(container, twin, fargs, nperseg, normalize): fargs : tuple Filter arguments. Time series are filtered before estimating psd. nperseg : int - Size of segments used for ustimating PSD using Welch's method. + Size of segments used for estimating PSD using Welch's method. + NOTE: the minimum of specified `nperseg` and signal length is used. normalize : bool Normalize power spectral density on maximum density. @@ -33,8 +34,14 @@ def calculate_psd(container, twin, fargs, nperseg, normalize): container_out = dict() for name, ts in container.items(): - # resampling to average time step for robustness (necessary for series with varying time step) - f, s = ts.psd(twin=twin, filterargs=fargs, resample=ts.dt, taperfrac=0.1, nperseg=nperseg, normalize=normalize) + # NOTE: nperseg passed on as minimum of specified value and signal input length + # this is done in scipy anyway, which also issues a UserWArnings stating: + # "nperseg = {nperseg} is greater than input length = {input_length:d}, using nperseg = {input_length:d}" + t, _ = ts.get(twin=twin, resample=ts.dt) + nperseg_ = np.min((nperseg, t.size)) + # calculate power spectral density + # (resampling to average time step for robustness, necessary for series with varying time step) + f, s = ts.psd(twin=twin, filterargs=fargs, resample=ts.dt, taperfrac=0.1, nperseg=nperseg_, normalize=normalize) container_out[name] = tuple([f, s]) return container_out diff --git a/qats/app/gui.py b/qats/app/gui.py index 939c96b..93f091d 100644 --- a/qats/app/gui.py +++ b/qats/app/gui.py @@ -1462,12 +1462,12 @@ def __init__(self, psdnorm, nperseg, nbins, twindec, parent=None): self.psdnpersegspinbox.setSingleStep(10) self.psdnpersegspinbox.setEnabled(True) self.psdnpersegspinbox.setValue(nperseg) - self.psdnpersegspinbox.setToolTip("When esimtating power spectral density using Welch's method the signal\n" - "is dived into overlapping segments and psd is estimated for each segment\n" + self.psdnpersegspinbox.setToolTip("When estimating power spectral density using Welch's method the signal\n" + "is divided into overlapping segments and psd is estimated for each segment\n" "and then averaged. The overlap is half of the segment length. The \n" "psd-estimate is smoother with shorter segments.") psdlayout = QHBoxLayout() - psdlayout.addWidget(QLabel("Length of segment used when estimating power spectral density")) + psdlayout.addWidget(QLabel("Length of segment used when estimating power spectral density *")) psdlayout.addStretch(1) psdlayout.addWidget(self.psdnpersegspinbox) layout.addLayout(psdlayout) @@ -1494,14 +1494,15 @@ def __init__(self, psdnorm, nperseg, nbins, twindec, parent=None): self.twindecspinbox.setValue(twindec) self.twindecspinbox.setToolTip("Number of decimals in the data processing time window from/to boxes.") twindeclayout = QHBoxLayout() - twindeclayout.addWidget(QLabel("Number of decimals in data processing time window *")) + twindeclayout.addWidget(QLabel("Number of decimals in data processing time window **")) twindeclayout.addStretch(1) twindeclayout.addWidget(self.twindecspinbox) layout.addLayout(twindeclayout) # help text helptext = QHBoxLayout() - helptext.addWidget(QLabel("* Close and re-open application for this setting to have effect")) + helptext.addWidget(QLabel("* Parameter 'nperseg' in scipy.signal.welch \n (signal length is used if smaller than specified value)\n" + "** Close and re-open application for this setting to have effect")) helptext.addStretch(1) layout.addLayout(helptext) diff --git a/qats/signal.py b/qats/signal.py index 66c124e..56b05af 100644 --- a/qats/signal.py +++ b/qats/signal.py @@ -11,6 +11,7 @@ from scipy.signal import coherence as spcoherence from scipy.signal import csd as spcsd from scipy.signal import filtfilt, sosfiltfilt, welch +from typing import Tuple def extend_signal_ends(x: np.ndarray, n: int) -> np.ndarray: @@ -122,7 +123,7 @@ def smooth(x: np.ndarray, window_len: int = 11, window: str = 'rectangular', mod return y -def taper(x: np.ndarray, window: str = 'tukey', alpha: float = 0.001) -> (np.ndarray, float): +def taper(x: np.ndarray, window: str = 'tukey', alpha: float = 0.001) -> Tuple[np.ndarray, float]: """ Taper the input time serie using a window function @@ -440,8 +441,8 @@ def average_frequency(t: np.ndarray, x: np.ndarray, up: bool = True) -> float: x : array_like Signal. up : bool, optional - - True: Period based on average time between up-crossings - - False: Period based on average time between down-crossings + - True: Frequency based on average time between up-crossings + - False: Frequency based on average time between down-crossings Returns @@ -462,12 +463,23 @@ def average_frequency(t: np.ndarray, x: np.ndarray, up: bool = True) -> float: crossings = np.diff(crossings) # array with value=1 at position of each up-crossing and -1 at each down-crossing crossings[crossings != indicator] = 0 # remove crossings with opposite direction - i = np.where(crossings == indicator)[0] + 1 # indices for crossings - d = (t[i[-1]] - t[i[0]]) / (np.abs(np.sum(crossings)) - 1) # duration between first and last crossing - return 1./d + ind = np.where(crossings == indicator)[0] + 1 # indices for crossings + + if ind.size > 1: + # more than one crossing -> calculate frequency + # duration between first and last crossing + dt = t[ind[-1]] - t[ind[0]] + n_cycles = ind.size - 1 # no. of cycles = no. of crossings minus 1 + # average frequency of up- or downcrossings + freq = n_cycles / dt + else: + # one crossing at most -> frequency is n/a + freq = np.nan + + return freq -def find_maxima(x, local: bool = False, threshold: float = None, up: bool = True) -> (np.ndarray, np.ndarray): +def find_maxima(x, local: bool = False, threshold: float = None, up: bool = True) -> Tuple[np.ndarray, np.ndarray]: """ Return sorted maxima @@ -544,32 +556,46 @@ def find_maxima(x, local: bool = False, threshold: float = None, up: bool = True crossing_indices_up = np.where(crossings == 1)[0] + 1 # up-crossings crossing_indices_do = np.where(crossings == -1)[0] + 1 # down-crossings - # use up-crossings as the basis - crossing_indices = crossing_indices_up - - # if there is a down-crossing after the last up-crossing, add that crossing as well - # (this is to avoid losing the last peak, and is particularly important for problems with few crossings, - # e.g., due to low-frequent oscillations) - if crossing_indices_up[-1] < crossing_indices_do[-1]: - crossing_indices = np.append(crossing_indices, crossing_indices_do[-1]) - - # number of crossings and number of peaks - n_crossings = crossing_indices.size - n_peaks = n_crossings - 1 - + # number of up-/downcrossings + n_up = crossing_indices_up.size + n_do = crossing_indices_do.size + # no global maxima if the signal crosses mean only once - if n_crossings < 2: - return np.array([]), np.array([], dtype=int) - - # initiate arrays to be populated subsequently - maxima = np.zeros(n_peaks) - maxima_indices = np.zeros(n_peaks, dtype=int) - - # loop to find max. between each up-crossing: - for j, start in enumerate(crossing_indices[:-1]): - stop = crossing_indices[j + 1] - maxima[j] = x[start:stop].max() - maxima_indices[j] = start + np.argmax(x[start:stop]) + if (n_up == 0) or (n_do == 0): + maxima = np.array([]) + maxima_indices = np.array([], dtype=int) + else: + # signal crosses mean at least two times + # use up-crossings as the basis + crossing_indices = crossing_indices_up + + # if there is a down-crossing after the last up-crossing, add that crossing as well + # (this is to avoid losing the last peak, and is particularly important for problems with few crossings, + # e.g., due to low-frequent oscillations) + if crossing_indices_up[-1] < crossing_indices_do[-1]: + crossing_indices = np.append(crossing_indices, crossing_indices_do[-1]) + + # number of crossings and number of peaks + n_crossings = crossing_indices.size + n_peaks = n_crossings - 1 + + # no global maxima if `n_crossings` is now 1 + # (e.g., because there is one upcrossing and one downcrossing, but + # the downcrossing occurs first) + if n_crossings < 2: + maxima = np.array([]) + maxima_indices = np.array([], dtype=int) + else: + # there is at least one global maximum + # initiate arrays to be populated subsequently + maxima = np.zeros(n_crossings - 1) + maxima_indices = np.zeros(n_crossings - 1, dtype=int) + + # loop to find max. between each up-crossing: + for j, start in enumerate(crossing_indices[:-1]): + stop = crossing_indices[j + 1] + maxima[j] = x[start:stop].max() + maxima_indices[j] = start + np.argmax(x[start:stop]) else: # local maxima (all peaks) @@ -581,29 +607,26 @@ def find_maxima(x, local: bool = False, threshold: float = None, up: bool = True maxima_indices = np.where(d2s == 1)[0] # unpack tuple returned from np.where maxima = x[maxima_indices] - n_peaks = maxima.size + n_peaks = maxima.size - # return quickly if no peaks/local maxima were found (e.g., if time series is monotonically increasing) - if n_peaks == 0: - return np.array([]), np.array([], dtype=int) - - # discard maxima lower than specified threshold - if threshold is not None: - above_threshold = (maxima >= threshold) - maxima = maxima[above_threshold] - maxima_indices = maxima_indices[above_threshold] - else: - pass + if n_peaks > 0: + # discard maxima lower than specified threshold + if threshold is not None: + above_threshold = (maxima >= threshold) + maxima = maxima[above_threshold] + maxima_indices = maxima_indices[above_threshold] + else: + pass - # sort ascending - ascending = np.argsort(maxima) - maxima = maxima[ascending] - maxima_indices = maxima_indices[ascending] + # sort ascending + ascending = np.argsort(maxima) + maxima = maxima[ascending] + maxima_indices = maxima_indices[ascending] return maxima, maxima_indices -def psd(x: np.ndarray, dt: float, **kwargs) -> (np.ndarray, np.ndarray): +def psd(x: np.ndarray, dt: float, **kwargs) -> Tuple[np.ndarray, np.ndarray]: """ Estimate power spectral density of discrete time signal X using Welch’s method. @@ -640,7 +663,7 @@ def psd(x: np.ndarray, dt: float, **kwargs) -> (np.ndarray, np.ndarray): return f, p -def csd(x: np.ndarray, y: np.ndarray, dt: float, **kwargs) -> (np.ndarray, np.ndarray): +def csd(x: np.ndarray, y: np.ndarray, dt: float, **kwargs) -> Tuple[np.ndarray, np.ndarray]: """ Estimate cross power spectral density of discrete-time signals X and Y using Welch’s method. @@ -680,7 +703,7 @@ def csd(x: np.ndarray, y: np.ndarray, dt: float, **kwargs) -> (np.ndarray, np.nd return f, p -def coherence(x: np.ndarray, y: np.ndarray, dt: float, **kwargs) -> (np.ndarray, np.ndarray): +def coherence(x: np.ndarray, y: np.ndarray, dt: float, **kwargs) -> Tuple[np.ndarray, np.ndarray]: """ Estimate the magnitude squared coherence estimate of discrete-time signals X and Y using Welch’s method. @@ -720,7 +743,7 @@ def coherence(x: np.ndarray, y: np.ndarray, dt: float, **kwargs) -> (np.ndarray, return f, c -def tfe(x: np.ndarray, y: np.ndarray, dt: float, clim: float = None, **kwargs) -> (np.ndarray, np.ndarray): +def tfe(x: np.ndarray, y: np.ndarray, dt: float, clim: float = None, **kwargs) -> Tuple[np.ndarray, np.ndarray]: """ Estimate the transfer function between two discrete-time signals X and Y using Welch’s method.