Skip to content

Commit

Permalink
Resolve #18: Levinson (chapter 5) implementation coverage testing
Browse files Browse the repository at this point in the history
- removed prints from implementations and tests, added proper logging instead
  • Loading branch information
mahdi authored and mahdiolfat committed Jan 31, 2024
1 parent 0d3dc09 commit 204321d
Show file tree
Hide file tree
Showing 7 changed files with 167 additions and 69 deletions.
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,13 @@ __pycache__
*.egg-info
env
.pyenv*
.env*
.coverage
.vscode

.ipynb_checkpoints

dsp

# images
*.svg
22 changes: 11 additions & 11 deletions pyssp/lattice.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
"""Implementation of algorithm from Chapter 6."""

import logging
from typing import NoReturn

import numpy as np
import scipy as sp
from numpy.typing import ArrayLike

logger = logging.getLogger(__name__)


def fcov(x: ArrayLike, p: int) -> tuple[ArrayLike, ArrayLike]:
"""Figure 6.15, Page 310.
Expand All @@ -26,19 +29,18 @@ def fcov(x: ArrayLike, p: int) -> tuple[ArrayLike, ArrayLike]:
err = np.empty((p, 1))

for j in range(p):
print(j)
logger.info(j)
N = N - 1
# print(f"{eplus=}, {eplus.shape=}")
# print(f"{eminus=}, {eminus.shape=}")
logger.info(f"{eplus=}, {eplus.shape=}")
logger.info(f"{eminus=}, {eminus.shape=}")
gamma[j] = (np.transpose(-eminus) @ eplus) / (np.transpose(eminus) @ eminus)
temp1 = eplus + gamma[j] * eminus
temp2 = eminus + np.conjugate(gamma[j]) * eplus
err[j] = np.transpose(temp1) @ temp1
eplus = temp1[1:N]
eminus = temp2[:N - 1]
print(gamma)
print(err)
print()
logger.info(gamma)
logger.info(err)

return gamma, err

Expand All @@ -60,10 +62,10 @@ def burg(x: ArrayLike, p: int) -> tuple[ArrayLike, ArrayLike]:
err = np.empty((p, 1))

for j in range(p):
print(j)
logger.info(j)
N = N - 1
# print(f"{eplus=}, {eplus.shape=}")
# print(f"{eminus=}, {eminus.shape=}")
logger.info(f"{eplus=}, {eplus.shape=}")
logger.info(f"{eminus=}, {eminus.shape=}")
eplusmag = np.transpose(eplus) @ eplus
eminusmag = np.transpose(eplus) @ eplus
gamma[j] = (np.transpose(-2 * eminus) @ eplus) / (eplusmag + eminusmag)
Expand All @@ -72,7 +74,6 @@ def burg(x: ArrayLike, p: int) -> tuple[ArrayLike, ArrayLike]:
err[j] = np.transpose(temp1) @ temp1 + np.transpose(temp2) @ temp2
eplus = temp1[1:N]
eminus = temp2[:N - 1]
print()

return gamma, err

Expand Down Expand Up @@ -105,7 +106,6 @@ def mcov(x: ArrayLike, p: int) -> tuple[ArrayLike, ArrayLike]:
b = b1 + b2
a = sp.linalg.solve_toeplitz(Rx[:, 1], b)
a = np.concatenate(([1], a))
# print(a.shape)
err = np.dot(R[0], a) + np.dot(np.flip(R[p]), a)

return a, err
73 changes: 45 additions & 28 deletions pyssp/levinson.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
"""Chapter 5 algorithm implementations."""

import logging
from typing import NoReturn

import numpy as np
from numpy.typing import ArrayLike

logger = logging.getLogger()


def rtoa(r: ArrayLike) -> tuple[np.ndarray, float]:
"""Recursively map a set of autocorrelations to a set of model parameters.
Expand All @@ -22,8 +26,8 @@ def rtoa(r: ArrayLike) -> tuple[np.ndarray, float]:
anT = np.conjugate(np.flipud(a))
a = an + gamma * np.concatenate((np.zeros(1), anT.ravel())).reshape(-1, 1)
epsilon = epsilon * (1 - np.abs(gamma)**2)
print(f"{gamma=},\n{a=},\n{epsilon=}\n")
return a, epsilon
logger.info(f"{gamma=},\n{a=},\n{epsilon=}\n")
return a.ravel(), epsilon.ravel()[0]


def gtoa(gamma: ArrayLike) -> ArrayLike:
Expand All @@ -40,13 +44,13 @@ def gtoa(gamma: ArrayLike) -> ArrayLike:
a = np.ones((1, 1))
_g = np.array(gamma)
p = len(_g)
for j in range(1, p):
for j in range(1, p + 1):
a = np.concatenate((a, np.zeros((1, 1))))
_a = a.copy()
af = np.conjugate(np.flipud(_a))
a = a + _g[j] * af
a = a + _g[j - 1] * af

return a
return a.ravel()


def atog(a: ArrayLike) -> ArrayLike:
Expand All @@ -70,23 +74,25 @@ def atog(a: ArrayLike) -> ArrayLike:
gamma[p - 2] = _a[p - 2]

for j in range(p - 2, 0, -1):
# print(f"{gamma=}, {_a=}")
# logger.info(f"{gamma=}, {_a=}")
ai1 = _a[:j].copy()
ai2 = _a[:j].copy()
af = np.flipud(np.conjugate(ai1))
# print(f"{ai1=}, {ai2=}, {af=}")
# logger.info(f"{ai1=}, {ai2=}, {af=}")
s1 = ai2 - gamma[j] * af
s2 = 1 - np.abs(gamma[j])**2
_a = np.divide(s1, s2)
# print(f"{s1=}, {s2=}, {_a=}")
# logger.info(f"{s1=}, {s2=}, {_a=}")
gamma[j - 1] = _a[j - 1]

return gamma
return gamma.ravel()


def gtor(gamma: ArrayLike, epsilon: None | float = None) -> ArrayLike:
"""Find the autocorrelation sequence from the reflection coefficients and the modeling error.
Also called the Inverse Levinson-Durbin Recursion.
Page 241, Figure 5.9.
"""
_g = np.array(gamma)
Expand All @@ -99,19 +105,18 @@ def gtor(gamma: ArrayLike, epsilon: None | float = None) -> ArrayLike:
aa0 = np.concatenate((aa, np.zeros((1, 1)))).reshape(-1, 1)
aaf = np.conjugate(np.flipud(aa1))
aa = aa0 + _g[j] * aaf
print(aa)
logger.info(aa)
rf = -np.fliplr(r) @ aa
print(rf)
print(rf.shape)
print(r)
print(r.shape)
print()
logger.info(rf)
logger.info(rf.shape)
logger.info(r)
logger.info(r.shape)
r = np.concatenate((r[0], rf[0])).reshape(1, -1)

if epsilon is not None:
r = r * epsilon / np.prod(1 - np.abs(gamma)**2)

return r
return r.ravel()


def ator(a: ArrayLike, b: float) -> ArrayLike:
Expand Down Expand Up @@ -147,27 +152,39 @@ def glev(r: ArrayLike, b: ArrayLike) -> ArrayLike:
x = np.array([_b[0] / _r[0]]).reshape(-1, 1)
epsilon = _r[0]
for j in range(1, p):
print(j)
print(f"{_r=}, {_r.shape=}")
logger.info(j)
logger.info(f"{_r=}, {_r.shape=}")
_r1 = np.transpose(np.array(_r[1:j + 1]))
print(f"{_r1=}, {_r1.shape=}")
print(f"{x=}, {x.shape=}")
print(f"{a=}, {a.shape=}")
logger.info(f"{_r1=}, {_r1.shape=}")
logger.info(f"{x=}, {x.shape=}")
logger.info(f"{a=}, {a.shape=}")
g = _r1 @ np.flipud(a)
print(f"{g=}, {g.shape=}")
logger.info(f"{g=}, {g.shape=}")
gamma = -g / epsilon
print(f"{gamma=}, {gamma.shape=}")
logger.info(f"{gamma=}, {gamma.shape=}")
_a0 = np.concatenate([a, [[0]]])
_af = np.conjugate(np.flipud(_a0))
print(f"{_a0=}, {_a0.shape=}")
print(f"{_af=}, {_af.shape=}")
logger.info(f"{_a0=}, {_a0.shape=}")
logger.info(f"{_af=}, {_af.shape=}")
a = _a0 + gamma * _af
epsilon = epsilon * (1 - np.abs(gamma)**2)
print(f"{epsilon=}")
logger.info(f"{epsilon=}")
delta = _r1 @ np.flipud(x)
q = (_b[j] - delta[0, 0]) / epsilon
x = np.concatenate([x, [[0]]])
x = x + q * np.conjugate(np.flipud(a))
print()

return x
return x.ravel()


def shur_cohn_test() -> NoReturn:
"""Check stability of any rational filter."""
raise NotImplementedError()


def splitlev(r: ArrayLike, b: ArrayLike) -> ArrayLike:
"""Implement the split levinson recursion.
Table 5.7, Page 274.
"""
raise NotImplementedError()
2 changes: 1 addition & 1 deletion pyssp/modeling.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ def covm(x: ArrayLike, p: int)-> tuple[ArrayLike, float]:
X = convm(_x, p + 1)
Xq = X[p - 1:N - 1, :p].copy()
Xsol = np.linalg.lstsq(-Xq, X[p:N, 0], rcond=None)[0]
logger.warning(f"{Xsol=}")
logger.info(f"{Xsol=}")
a = np.hstack(([1], Xsol))
err = np.abs(X[p:N,0] @ X[p:N,] @ a)
return a, err.ravel()[0]
Expand Down
11 changes: 7 additions & 4 deletions pyssp/optimal.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
"""Optimal Filters, Chapter 7."""

import logging
from typing import NoReturn

import numpy as np
from numpy.typing import ArrayLike

logger = logging.getLogger(__name__)


def kalman(y: list[float], A: ArrayLike, C: ArrayLike, sigmaw: list[float],
sigmav: list[float]) -> tuple[np.ndarray, ...]:
Expand All @@ -22,11 +25,11 @@ def kalman(y: list[float], A: ArrayLike, C: ArrayLike, sigmaw: list[float],
Qw = np.diag(np.array(sigmaw, ndmin=1))
Qv = np.diag(np.array(sigmav, ndmin=1))
N = np.shape(_y)[1]
print(_y)
logger.info(_y)

p = np.shape(_A)[0]
q = np.shape(_C)[0]
print(f"{p}, {q}, {N=}")
logger.info(f"{p}, {q}, {N=}")

# The a priori error covariance matrix
P0 = np.zeros((N + 1, p, p))
Expand Down Expand Up @@ -66,12 +69,12 @@ def kalman(y: list[float], A: ArrayLike, C: ArrayLike, sigmaw: list[float],
return P0, P1, K, xhat0, xhat1


def wiener_denoise() -> None:
def wiener_denoise() -> NoReturn:
"""Denoising based on IIR wiener filters."""
raise NotImplementedError()


def wiener_systemid() -> None:
def wiener_systemid() -> NoReturn:
"""Systemid based on FIR wiener filters."""
raise NotImplementedError()

Expand Down
85 changes: 77 additions & 8 deletions tests/test_levinson.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,93 @@
""""""
"""Test validation of levinson based recursion routines."""

import logging

import numpy as np
from pyssp.levinson import glev, gtor
from pyssp import levinson


logger = logging.getLogger(__name__)


def test_glev():
'''Example 5.3.1, Page 266'''
r = [4, 2, 1]
b = [9, 6, 12]

res = glev(r, b)
print(res)
expected = [2, -1, 3]

sol = levinson.glev(r, b)
logger.info(sol)

assert np.allclose(sol, expected)


def test_gtor():
def test_gtor() -> None:
'''Based on example 5.2.6'''
expected_rx = np.array([2, -1, -1/4, 1/8])

gamma = [1/2, 1/2, 1/2]
epsilon = 2 * (3 / 4)**3
res = gtor(gamma, epsilon)
true_results = np.array([2, -1, -1/4, 1/8])
print(res)
rx = levinson.gtor(gamma, epsilon)
assert np.allclose(rx, expected_rx)


def test_atog() -> None:

a = [1, 0.5, -0.1, -0.5]
expected_g = np.array([0.5, 0.2, -0.5])

gamma = levinson.atog(a)
logger.info(gamma)
logger.info(expected_g)

assert np.allclose(gamma, expected_g)


def test_rtog() -> None:
rx = [2, -1, -1/4, 1/8]
expected_g = np.array([0.5, 0.5, 0.5])

gamma = levinson.rtog(rx)
logger.info(gamma)
logger.info(expected_g)

assert np.allclose(gamma, expected_g)


def test_ator() -> None:
a = [1, 1, 7/8, 1/2]
epsilon = 2 * (3 / 4)**3
b = epsilon**2
expected_rx = np.array([2, -1, -1/4, 1/8])

rx = levinson.ator(a, b = b)
logger.info(rx)
logger.info(expected_rx)

assert np.array_equal(rx, expected_rx)


def test_gtoa() -> None:
gamma = [0.5, 0.2, -0.5]
expected_a = np.array([1, 0.5, -0.1, -0.5])

a = levinson.gtoa(gamma)
logger.info(a)
logger.info(expected_a)

assert np.allclose(a, expected_a)

def test_rtoa() -> None:
rx = np.array([2, -1, -1/4, 1/8])
expected_a = [1, 1, 7/8, 1/2]
expected_eps = 2 * (3 / 4)**3

a, eps = levinson.rtoa(rx)

logger.info(f"{a=}")
logger.info(f"{expected_a=}")
logger.info(f"{eps=}")
logger.info(f"{expected_eps=}")

assert np.allclose(a, expected_a) and np.allclose(eps, expected_eps)
Loading

0 comments on commit 204321d

Please sign in to comment.