From 204321d7319fee63254433f1d9e7aa8fbead887c Mon Sep 17 00:00:00 2001 From: mahdi Date: Wed, 31 Jan 2024 16:38:04 -0500 Subject: [PATCH] Resolve #18: Levinson (chapter 5) implementation coverage testing - removed prints from implementations and tests, added proper logging instead --- .gitignore | 5 +++ pyssp/lattice.py | 22 +++++------ pyssp/levinson.py | 73 ++++++++++++++++++++++-------------- pyssp/modeling.py | 2 +- pyssp/optimal.py | 11 ++++-- tests/test_levinson.py | 85 ++++++++++++++++++++++++++++++++++++++---- tests/test_modeling.py | 38 ++++++++++--------- 7 files changed, 167 insertions(+), 69 deletions(-) diff --git a/.gitignore b/.gitignore index 9c1e869..d66090a 100644 --- a/.gitignore +++ b/.gitignore @@ -3,8 +3,13 @@ __pycache__ *.egg-info env .pyenv* +.env* .coverage .vscode .ipynb_checkpoints +dsp + +# images +*.svg diff --git a/pyssp/lattice.py b/pyssp/lattice.py index 25c270b..6b6e1aa 100644 --- a/pyssp/lattice.py +++ b/pyssp/lattice.py @@ -1,11 +1,14 @@ """Implementation of algorithm from Chapter 6.""" +import logging from typing import NoReturn import numpy as np import scipy as sp from numpy.typing import ArrayLike +logger = logging.getLogger(__name__) + def fcov(x: ArrayLike, p: int) -> tuple[ArrayLike, ArrayLike]: """Figure 6.15, Page 310. @@ -26,19 +29,18 @@ def fcov(x: ArrayLike, p: int) -> tuple[ArrayLike, ArrayLike]: err = np.empty((p, 1)) for j in range(p): - print(j) + logger.info(j) N = N - 1 - # print(f"{eplus=}, {eplus.shape=}") - # print(f"{eminus=}, {eminus.shape=}") + logger.info(f"{eplus=}, {eplus.shape=}") + logger.info(f"{eminus=}, {eminus.shape=}") gamma[j] = (np.transpose(-eminus) @ eplus) / (np.transpose(eminus) @ eminus) temp1 = eplus + gamma[j] * eminus temp2 = eminus + np.conjugate(gamma[j]) * eplus err[j] = np.transpose(temp1) @ temp1 eplus = temp1[1:N] eminus = temp2[:N - 1] - print(gamma) - print(err) - print() + logger.info(gamma) + logger.info(err) return gamma, err @@ -60,10 +62,10 @@ def burg(x: ArrayLike, p: int) -> tuple[ArrayLike, ArrayLike]: err = np.empty((p, 1)) for j in range(p): - print(j) + logger.info(j) N = N - 1 - # print(f"{eplus=}, {eplus.shape=}") - # print(f"{eminus=}, {eminus.shape=}") + logger.info(f"{eplus=}, {eplus.shape=}") + logger.info(f"{eminus=}, {eminus.shape=}") eplusmag = np.transpose(eplus) @ eplus eminusmag = np.transpose(eplus) @ eplus gamma[j] = (np.transpose(-2 * eminus) @ eplus) / (eplusmag + eminusmag) @@ -72,7 +74,6 @@ def burg(x: ArrayLike, p: int) -> tuple[ArrayLike, ArrayLike]: err[j] = np.transpose(temp1) @ temp1 + np.transpose(temp2) @ temp2 eplus = temp1[1:N] eminus = temp2[:N - 1] - print() return gamma, err @@ -105,7 +106,6 @@ def mcov(x: ArrayLike, p: int) -> tuple[ArrayLike, ArrayLike]: b = b1 + b2 a = sp.linalg.solve_toeplitz(Rx[:, 1], b) a = np.concatenate(([1], a)) - # print(a.shape) err = np.dot(R[0], a) + np.dot(np.flip(R[p]), a) return a, err diff --git a/pyssp/levinson.py b/pyssp/levinson.py index 4f1324a..e7ae8a0 100644 --- a/pyssp/levinson.py +++ b/pyssp/levinson.py @@ -1,9 +1,13 @@ """Chapter 5 algorithm implementations.""" +import logging +from typing import NoReturn import numpy as np from numpy.typing import ArrayLike +logger = logging.getLogger() + def rtoa(r: ArrayLike) -> tuple[np.ndarray, float]: """Recursively map a set of autocorrelations to a set of model parameters. @@ -22,8 +26,8 @@ def rtoa(r: ArrayLike) -> tuple[np.ndarray, float]: anT = np.conjugate(np.flipud(a)) a = an + gamma * np.concatenate((np.zeros(1), anT.ravel())).reshape(-1, 1) epsilon = epsilon * (1 - np.abs(gamma)**2) - print(f"{gamma=},\n{a=},\n{epsilon=}\n") - return a, epsilon + logger.info(f"{gamma=},\n{a=},\n{epsilon=}\n") + return a.ravel(), epsilon.ravel()[0] def gtoa(gamma: ArrayLike) -> ArrayLike: @@ -40,13 +44,13 @@ def gtoa(gamma: ArrayLike) -> ArrayLike: a = np.ones((1, 1)) _g = np.array(gamma) p = len(_g) - for j in range(1, p): + for j in range(1, p + 1): a = np.concatenate((a, np.zeros((1, 1)))) _a = a.copy() af = np.conjugate(np.flipud(_a)) - a = a + _g[j] * af + a = a + _g[j - 1] * af - return a + return a.ravel() def atog(a: ArrayLike) -> ArrayLike: @@ -70,23 +74,25 @@ def atog(a: ArrayLike) -> ArrayLike: gamma[p - 2] = _a[p - 2] for j in range(p - 2, 0, -1): - # print(f"{gamma=}, {_a=}") + # logger.info(f"{gamma=}, {_a=}") ai1 = _a[:j].copy() ai2 = _a[:j].copy() af = np.flipud(np.conjugate(ai1)) - # print(f"{ai1=}, {ai2=}, {af=}") + # logger.info(f"{ai1=}, {ai2=}, {af=}") s1 = ai2 - gamma[j] * af s2 = 1 - np.abs(gamma[j])**2 _a = np.divide(s1, s2) - # print(f"{s1=}, {s2=}, {_a=}") + # logger.info(f"{s1=}, {s2=}, {_a=}") gamma[j - 1] = _a[j - 1] - return gamma + return gamma.ravel() def gtor(gamma: ArrayLike, epsilon: None | float = None) -> ArrayLike: """Find the autocorrelation sequence from the reflection coefficients and the modeling error. + Also called the Inverse Levinson-Durbin Recursion. + Page 241, Figure 5.9. """ _g = np.array(gamma) @@ -99,19 +105,18 @@ def gtor(gamma: ArrayLike, epsilon: None | float = None) -> ArrayLike: aa0 = np.concatenate((aa, np.zeros((1, 1)))).reshape(-1, 1) aaf = np.conjugate(np.flipud(aa1)) aa = aa0 + _g[j] * aaf - print(aa) + logger.info(aa) rf = -np.fliplr(r) @ aa - print(rf) - print(rf.shape) - print(r) - print(r.shape) - print() + logger.info(rf) + logger.info(rf.shape) + logger.info(r) + logger.info(r.shape) r = np.concatenate((r[0], rf[0])).reshape(1, -1) if epsilon is not None: r = r * epsilon / np.prod(1 - np.abs(gamma)**2) - return r + return r.ravel() def ator(a: ArrayLike, b: float) -> ArrayLike: @@ -147,27 +152,39 @@ def glev(r: ArrayLike, b: ArrayLike) -> ArrayLike: x = np.array([_b[0] / _r[0]]).reshape(-1, 1) epsilon = _r[0] for j in range(1, p): - print(j) - print(f"{_r=}, {_r.shape=}") + logger.info(j) + logger.info(f"{_r=}, {_r.shape=}") _r1 = np.transpose(np.array(_r[1:j + 1])) - print(f"{_r1=}, {_r1.shape=}") - print(f"{x=}, {x.shape=}") - print(f"{a=}, {a.shape=}") + logger.info(f"{_r1=}, {_r1.shape=}") + logger.info(f"{x=}, {x.shape=}") + logger.info(f"{a=}, {a.shape=}") g = _r1 @ np.flipud(a) - print(f"{g=}, {g.shape=}") + logger.info(f"{g=}, {g.shape=}") gamma = -g / epsilon - print(f"{gamma=}, {gamma.shape=}") + logger.info(f"{gamma=}, {gamma.shape=}") _a0 = np.concatenate([a, [[0]]]) _af = np.conjugate(np.flipud(_a0)) - print(f"{_a0=}, {_a0.shape=}") - print(f"{_af=}, {_af.shape=}") + logger.info(f"{_a0=}, {_a0.shape=}") + logger.info(f"{_af=}, {_af.shape=}") a = _a0 + gamma * _af epsilon = epsilon * (1 - np.abs(gamma)**2) - print(f"{epsilon=}") + logger.info(f"{epsilon=}") delta = _r1 @ np.flipud(x) q = (_b[j] - delta[0, 0]) / epsilon x = np.concatenate([x, [[0]]]) x = x + q * np.conjugate(np.flipud(a)) - print() - return x + return x.ravel() + + +def shur_cohn_test() -> NoReturn: + """Check stability of any rational filter.""" + raise NotImplementedError() + + +def splitlev(r: ArrayLike, b: ArrayLike) -> ArrayLike: + """Implement the split levinson recursion. + + Table 5.7, Page 274. + """ + raise NotImplementedError() diff --git a/pyssp/modeling.py b/pyssp/modeling.py index 7f4eda8..09e5ddb 100644 --- a/pyssp/modeling.py +++ b/pyssp/modeling.py @@ -142,7 +142,7 @@ def covm(x: ArrayLike, p: int)-> tuple[ArrayLike, float]: X = convm(_x, p + 1) Xq = X[p - 1:N - 1, :p].copy() Xsol = np.linalg.lstsq(-Xq, X[p:N, 0], rcond=None)[0] - logger.warning(f"{Xsol=}") + logger.info(f"{Xsol=}") a = np.hstack(([1], Xsol)) err = np.abs(X[p:N,0] @ X[p:N,] @ a) return a, err.ravel()[0] diff --git a/pyssp/optimal.py b/pyssp/optimal.py index e4566a6..784b11c 100644 --- a/pyssp/optimal.py +++ b/pyssp/optimal.py @@ -1,10 +1,13 @@ """Optimal Filters, Chapter 7.""" +import logging from typing import NoReturn import numpy as np from numpy.typing import ArrayLike +logger = logging.getLogger(__name__) + def kalman(y: list[float], A: ArrayLike, C: ArrayLike, sigmaw: list[float], sigmav: list[float]) -> tuple[np.ndarray, ...]: @@ -22,11 +25,11 @@ def kalman(y: list[float], A: ArrayLike, C: ArrayLike, sigmaw: list[float], Qw = np.diag(np.array(sigmaw, ndmin=1)) Qv = np.diag(np.array(sigmav, ndmin=1)) N = np.shape(_y)[1] - print(_y) + logger.info(_y) p = np.shape(_A)[0] q = np.shape(_C)[0] - print(f"{p}, {q}, {N=}") + logger.info(f"{p}, {q}, {N=}") # The a priori error covariance matrix P0 = np.zeros((N + 1, p, p)) @@ -66,12 +69,12 @@ def kalman(y: list[float], A: ArrayLike, C: ArrayLike, sigmaw: list[float], return P0, P1, K, xhat0, xhat1 -def wiener_denoise() -> None: +def wiener_denoise() -> NoReturn: """Denoising based on IIR wiener filters.""" raise NotImplementedError() -def wiener_systemid() -> None: +def wiener_systemid() -> NoReturn: """Systemid based on FIR wiener filters.""" raise NotImplementedError() diff --git a/tests/test_levinson.py b/tests/test_levinson.py index 968d009..d2943b4 100644 --- a/tests/test_levinson.py +++ b/tests/test_levinson.py @@ -1,8 +1,12 @@ -"""""" +"""Test validation of levinson based recursion routines.""" +import logging import numpy as np -from pyssp.levinson import glev, gtor +from pyssp import levinson + + +logger = logging.getLogger(__name__) def test_glev(): @@ -10,15 +14,80 @@ def test_glev(): r = [4, 2, 1] b = [9, 6, 12] - res = glev(r, b) - print(res) + expected = [2, -1, 3] + + sol = levinson.glev(r, b) + logger.info(sol) + + assert np.allclose(sol, expected) -def test_gtor(): +def test_gtor() -> None: '''Based on example 5.2.6''' + expected_rx = np.array([2, -1, -1/4, 1/8]) gamma = [1/2, 1/2, 1/2] epsilon = 2 * (3 / 4)**3 - res = gtor(gamma, epsilon) - true_results = np.array([2, -1, -1/4, 1/8]) - print(res) \ No newline at end of file + rx = levinson.gtor(gamma, epsilon) + assert np.allclose(rx, expected_rx) + + +def test_atog() -> None: + + a = [1, 0.5, -0.1, -0.5] + expected_g = np.array([0.5, 0.2, -0.5]) + + gamma = levinson.atog(a) + logger.info(gamma) + logger.info(expected_g) + + assert np.allclose(gamma, expected_g) + + +def test_rtog() -> None: + rx = [2, -1, -1/4, 1/8] + expected_g = np.array([0.5, 0.5, 0.5]) + + gamma = levinson.rtog(rx) + logger.info(gamma) + logger.info(expected_g) + + assert np.allclose(gamma, expected_g) + + +def test_ator() -> None: + a = [1, 1, 7/8, 1/2] + epsilon = 2 * (3 / 4)**3 + b = epsilon**2 + expected_rx = np.array([2, -1, -1/4, 1/8]) + + rx = levinson.ator(a, b = b) + logger.info(rx) + logger.info(expected_rx) + + assert np.array_equal(rx, expected_rx) + + +def test_gtoa() -> None: + gamma = [0.5, 0.2, -0.5] + expected_a = np.array([1, 0.5, -0.1, -0.5]) + + a = levinson.gtoa(gamma) + logger.info(a) + logger.info(expected_a) + + assert np.allclose(a, expected_a) + +def test_rtoa() -> None: + rx = np.array([2, -1, -1/4, 1/8]) + expected_a = [1, 1, 7/8, 1/2] + expected_eps = 2 * (3 / 4)**3 + + a, eps = levinson.rtoa(rx) + + logger.info(f"{a=}") + logger.info(f"{expected_a=}") + logger.info(f"{eps=}") + logger.info(f"{expected_eps=}") + + assert np.allclose(a, expected_a) and np.allclose(eps, expected_eps) diff --git a/tests/test_modeling.py b/tests/test_modeling.py index de1b349..2e5cd1d 100644 --- a/tests/test_modeling.py +++ b/tests/test_modeling.py @@ -23,8 +23,8 @@ def test_pade() -> None: expected_b = [1, 1.5, 0.75] a, b = modeling.pade(x, p = 0, q = 2) - logger.warning(a) - logger.warning(b) + logger.info(a) + logger.info(b) assert np.array_equal(a, expected_a) and np.array_equal(b, expected_b) @@ -32,8 +32,8 @@ def test_pade() -> None: expected_b = [1, 1] a, b = modeling.pade(x, p = 1, q = 1) - logger.warning(a) - logger.warning(b) + logger.info(a) + logger.info(b) assert np.array_equal(a, expected_a) and np.array_equal(b, expected_b) @@ -45,7 +45,7 @@ def test_prony(): xn[N:] = 0 res = modeling.prony(xn, p = 1, q = 1) - logger.warning(res) + logger.info(res) def test_shanks(): @@ -53,7 +53,7 @@ def test_shanks(): T = 10 * (N - 1) + 1 xn = np.ones(T) xn[N:] = 0 - + expected_a = [1, -0.95] expected_b = [1, 0.301] expected_err = 3.95 @@ -64,33 +64,37 @@ def test_shanks(): assert np.array_equal(np.around(b, decimals=3), expected_b) assert round(err, 3) == expected_err + def test_spike(): gn = np.array([-0.2, 0, 1]) h, err = modeling.spike(gn, 4, 11) d = np.convolve(h, gn) - logger.warning(f"{h=}") - logger.warning(f"{err=}") - logger.warning(f"{d=}, {np.argmax(d)=}") + logger.info(f"{h=}") + logger.info(f"{err=}") + logger.info(f"{d=}, {np.argmax(d)=}") + def test_ipf(): ... + def test_acm(): x = np.ones(20) x[1::2] = x[1::2] * -1 - logger.warning(x) + logger.info(x) a, err = modeling.acm(x, 2) - logging.warning(f"{a=}") - logging.warning(f"{err=}") + logger.info(f"{a=}") + logger.info(f"{err=}") + def test_covm(): x = np.ones(20) x[1::2] = x[1::2] * -1 - logger.warning(x) + logger.info(x) a, err = modeling.covm(x, 1) - logging.warning(f"{a=}") - logging.warning(f"{err=}") + logger.info(f"{a=}") + logger.info(f"{err=}") def test_durbin(): @@ -99,7 +103,7 @@ def test_durbin(): zeros, poles, _ = signal.tf2zpk([1], ap) px = system.ARMA(p=poles, q=[1], N=64) - logger.warning(f"{px=}") + logger.info(f"{px=}") rx = np.correlate(px, px, "same") - logger.warning(modeling.durbin(rx, p=4, q=0)) \ No newline at end of file + logger.info(modeling.durbin(rx, p=4, q=0))