Hosvd #67

Merged (5 commits) on Mar 16, 2023
1 change: 1 addition & 0 deletions pyttb/__init__.py
@@ -9,6 +9,7 @@
from pyttb.cp_als import cp_als
from pyttb.cp_apr import *
from pyttb.export_data import export_data
+from pyttb.hosvd import hosvd
from pyttb.import_data import import_data
from pyttb.khatrirao import khatrirao
from pyttb.ktensor import ktensor
138 changes: 138 additions & 0 deletions pyttb/hosvd.py
@@ -0,0 +1,138 @@
"""Higher Order SVD Implementation"""
import warnings
from typing import List, Optional

import numpy as np
import scipy

import pyttb as ttb


def hosvd(
tensor,
tol: float,
verbosity: float = 1,
dimorder: Optional[List[int]] = None,
sequential: bool = True,
ranks: Optional[List[int]] = None,
):
"""Compute sequentially-truncated higher-order SVD (Tucker).

Computes a Tucker decomposition with relative error
specified by tol, i.e., it computes a ttensor T such that
||X-T||/||X|| <= tol.

Parameters
----------
tensor: Tensor to factor
tol: Relative error to stop at
verbosity: Print level
dimorder: Order to loop through dimensions
sequential: Use sequentially-truncated version
ranks: Specify ranks to consider rather than computing

Example
-------
>>> data = np.array([[29, 39.], [63., 85.]])
>>> tol = 1e-4
>>> disable_printing = -1
>>> tensorInstance = ttb.tensor().from_data(data)
>>> result = hosvd(tensorInstance, tol, verbosity=disable_printing)
>>> ((result.full() - tensorInstance).norm() / tensorInstance.norm()) < tol
True
"""
    # In tucker_als this is N
d = tensor.ndims

if ranks is not None:
if len(ranks) != d:
raise ValueError(
f"Ranks must be a list of length tensor ndims. Ndims: {d} but got "
f"ranks: {ranks}."
)
else:
ranks = [0] * d

    # Set up dimorder if not specified (copied from tucker_als)
if not dimorder:
dimorder = list(range(d))
else:
if not isinstance(dimorder, list):
raise ValueError("Dimorder must be a list")
elif tuple(range(d)) != tuple(sorted(dimorder)):
raise ValueError(
"Dimorder must be a list or permutation of range(tensor.ndims)"
)

# TODO should unify printing throughout. Probably easier to use python logging levels
if verbosity > 0:
print("Computing HOSVD...\n")

normxsqr = (tensor**2).collapse()
eigsumthresh = ((tol**2) * normxsqr) / d
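    # Error budget: the squared-error allowance tol^2 * ||X||^2 is split evenly
    # across the d modes; each mode may discard Gram-matrix eigenvalues summing
    # to at most eigsumthresh while keeping the overall relative error within tol.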

if verbosity > 2:
print(
f"||X||^2 = {normxsqr: g}\n"
f"tol = {tol: g}\n"
f"eigenvalue sum threshold = tol^2 ||X||^2 / d = {eigsumthresh: g}"
)

# Main Loop
factor_matrices = [np.empty(1)] * d
# Copy input tensor, shrinks every step for sequential
Y = ttb.tensor.from_tensor_type(tensor)

for k in dimorder:
# Compute Gram matrix
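        # The eigenvectors of Z = Yk @ Yk.T are the left singular vectors of the
        # mode-k unfolding, so the symmetric eigendecomposition below stands in
        # for an explicit SVD of Yk.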
Yk = ttb.tenmat.from_tensor_type(Y, np.array([k])).double()
Z = np.dot(Yk, Yk.transpose())

# Compute eigenvalue decomposition
D, V = scipy.linalg.eigh(Z)
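        # eigh returns eigenvalues in ascending order; sort the indices
        # descending so the leading eigenpairs come first.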
pi = np.argsort(-D, kind="quicksort")
eigvec = D[pi]

# If rank not provided compute it.
if ranks[k] == 0:
eigsum = np.cumsum(eigvec[::-1])
eigsum = eigsum[::-1]
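            # eigsum[i] is now the sum of the i-th and all smaller eigenvalues,
            # i.e. the energy lost if everything from index i onward is dropped.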
ranks[k] = np.where(eigsum > eigsumthresh)[0][-1]

if verbosity > 5:
                print("Reverse cumulative sum of evals of Gram matrix:")
for i in range(len(eigsum)):
print_msg = f"{i: d}: {eigsum[i]: 6.4f}"
if i == ranks[k]:
print_msg += " <-- Cutoff"
print(print_msg)

        # Extract factor matrix by picking leading eigenvectors of V
# NOTE: Plus 1 in pi slice for inclusive range to match MATLAB
factor_matrices[k] = V[:, pi[0 : ranks[k] + 1]]

# Shrink!
if sequential:
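            # Sequentially truncated: project Y onto the kept eigenvectors so
            # later modes factor a smaller core rather than the full tensor.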
Y = Y.ttm(factor_matrices[k].transpose(), k)
# Extract final core
if sequential:
G = Y
else:
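        # Non-sequential: Y is still the original tensor, so the core comes from
        # one multi-mode product with all the (transposed) factor matrices.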
G = Y.ttm(factor_matrices, transpose=True)

result = ttb.ttensor.from_data(G, factor_matrices)

if verbosity > 0:
diffnormsqr = ((tensor - result.full()) ** 2).collapse()
relnorm = np.sqrt(diffnormsqr / normxsqr)
print(f" Size of core: {G.shape}")
if relnorm <= tol:
print(f"||X-T||/||X|| = {relnorm: g} <=" f"{tol: f} (tol)")
else:
print(
"Tolerance not satisfied!! "
f"||X-T||/||X|| = {relnorm: g} >="
f"{tol: f} (tol)"
)
warnings.warn("Specified tolerance was not achieved")
return result
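
Not part of the diff, but for orientation, a minimal sketch of driving the new function on a dense 3-way tensor; the shapes and tolerance are illustrative, and from_data is the same constructor the PR's tests use.

import numpy as np
import pyttb as ttb

# Illustrative data; any dense ndarray works here.
data = np.random.rand(4, 3, 2)
T = ttb.tensor.from_data(data)

# Tolerance-driven truncation; verbosity=0 suppresses hosvd's printing.
result = ttb.hosvd(T, tol=1e-4, verbosity=0)

# The reconstruction error should come in at or below the requested tolerance.
rel_err = (result.full() - T).norm() / T.norm()
print(f"relative error: {rel_err:.2e}")
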
21 changes: 10 additions & 11 deletions pyttb/tensor.py
@@ -7,7 +7,6 @@
import warnings
from itertools import permutations
from math import factorial
-from numbers import Real
from typing import Any, Callable, List, Literal, Optional, Tuple, Union

import numpy as np
@@ -181,9 +180,9 @@ def collapse(
self,
dims: Optional[np.ndarray] = None,
fun: Union[
-            Literal["sum"], Callable[[np.ndarray], Union[Real, np.ndarray]]
+            Literal["sum"], Callable[[np.ndarray], Union[float, np.ndarray]]
] = "sum",
-    ) -> Union[Real, np.ndarray, tensor]:
+    ) -> Union[float, np.ndarray, tensor]:
"""
Collapse tensor along specified dimensions.

@@ -389,7 +388,7 @@ def full(self) -> tensor:
"""
return ttb.tensor.from_data(self.data)

-    def innerprod(self, other: Union[tensor, ttb.sptensor, ttb.ktensor]) -> Real:
+    def innerprod(self, other: Union[tensor, ttb.sptensor, ttb.ktensor]) -> float:
"""
Efficient inner product with a tensor

@@ -542,7 +541,7 @@ def issymmetric(
return bool((all_diffs == 0).all())
return bool((all_diffs == 0).all()), all_diffs, all_perms

-    def logical_and(self, B: Union[Real, tensor]) -> tensor:
+    def logical_and(self, B: Union[float, tensor]) -> tensor:
"""
Logical and for tensors

@@ -578,7 +577,7 @@ def logical_not(self) -> tensor:
"""
return ttb.tensor.from_data(np.logical_not(self.data))

-    def logical_or(self, other: Union[Real, tensor]) -> tensor:
+    def logical_or(self, other: Union[float, tensor]) -> tensor:
"""
Logical or for tensors

@@ -598,7 +597,7 @@ def tensor_or(x, y):

return ttb.tt_tenfun(tensor_or, self, other)

-    def logical_xor(self, other: Union[Real, tensor]) -> tensor:
+    def logical_xor(self, other: Union[float, tensor]) -> tensor:
"""
Logical xor for tensors

@@ -867,7 +866,7 @@ def reshape(self, shape: Tuple[int, ...]) -> tensor:

return ttb.tensor.from_data(np.reshape(self.data, shape, order="F"), shape)

-    def squeeze(self) -> Union[tensor, np.ndarray, Real]:
+    def squeeze(self) -> Union[tensor, np.ndarray, float]:
"""
Removes singleton dimensions from a tensor

@@ -1029,7 +1028,7 @@ def symmetrize(
def ttm(
self,
matrix: Union[np.ndarray, List[np.ndarray]],
-        dims: Optional[Union[Real, np.ndarray]] = None,
+        dims: Optional[Union[float, np.ndarray]] = None,
transpose: bool = False,
) -> tensor:
"""
@@ -1046,7 +1045,7 @@ def ttm(
dims = np.arange(self.ndims)
elif isinstance(dims, list):
dims = np.array(dims)
-        elif isinstance(dims, Real):
+        elif isinstance(dims, (float, int, np.generic)):
dims = np.array([dims])

if isinstance(matrix, list):
@@ -1060,7 +1059,7 @@ def ttm(
return Y

if not isinstance(matrix, np.ndarray):
-            assert False, "matrix must be of type numpy.ndarray"
+            assert False, f"matrix must be of type numpy.ndarray but got:\n{matrix}"

if not (dims.size == 1 and np.isin(dims, np.arange(self.ndims))):
assert False, "dims must contain values in [0,self.dims)"
67 changes: 67 additions & 0 deletions tests/test_hosvd.py
@@ -0,0 +1,67 @@
import numpy as np
import pytest

import pyttb as ttb


@pytest.fixture()
def sample_tensor():
data = np.array([[29, 39.0], [63.0, 85.0]])
shape = (2, 2)
params = {"data": data, "shape": shape}
tensorInstance = ttb.tensor().from_data(data, shape)
return params, tensorInstance


@pytest.mark.indevelopment
def test_hosvd_simple_convergence(capsys, sample_tensor):
(data, T) = sample_tensor
tol = 1e-4
result = ttb.hosvd(T, tol)
    assert (result.full() - T).norm() / T.norm() < tol, "Failed to converge"

tol = 1e-4
result = ttb.hosvd(T, tol, sequential=False)
assert (
result.full() - T
    ).norm() / T.norm() < tol, "Failed to converge for non-sequential option"

impossible_tol = 1e-20
with pytest.warns(UserWarning):
result = ttb.hosvd(T, impossible_tol)
assert (
result.full() - T
    ).norm() / T.norm() > impossible_tol, "Converged beyond provided precision"


@pytest.mark.indevelopment
def test_hosvd_default_init(capsys, sample_tensor):
(data, T) = sample_tensor
_ = ttb.hosvd(T, 1)


@pytest.mark.indevelopment
def test_hosvd_smoke_test_verbosity(capsys, sample_tensor):
"""For now just make sure verbosity calcs don't crash"""
(data, T) = sample_tensor
ttb.hosvd(T, 1, verbosity=10)


@pytest.mark.indevelopment
def test_hosvd_incorrect_ranks(capsys, sample_tensor):
(data, T) = sample_tensor
ranks = list(range(T.ndims - 1))
with pytest.raises(ValueError):
_ = ttb.hosvd(T, 1, ranks=ranks)


@pytest.mark.indevelopment
def test_hosvd_incorrect_dimorder(capsys, sample_tensor):
(data, T) = sample_tensor
dimorder = list(range(T.ndims - 1))
with pytest.raises(ValueError):
_ = ttb.hosvd(T, 1, dimorder=dimorder)

dimorder = 1
with pytest.raises(ValueError):
_ = ttb.hosvd(T, 1, dimorder=dimorder)