diff --git a/pyttb/__init__.py b/pyttb/__init__.py
index 99fc2ed8..1cf0bebb 100644
--- a/pyttb/__init__.py
+++ b/pyttb/__init__.py
@@ -9,6 +9,7 @@ from pyttb.cp_als import cp_als
 from pyttb.cp_apr import *
 from pyttb.export_data import export_data
+from pyttb.hosvd import hosvd
 from pyttb.import_data import import_data
 from pyttb.khatrirao import khatrirao
 from pyttb.ktensor import ktensor
diff --git a/pyttb/hosvd.py b/pyttb/hosvd.py
new file mode 100644
index 00000000..4c9cb60b
--- /dev/null
+++ b/pyttb/hosvd.py
@@ -0,0 +1,138 @@
+"""Higher Order SVD Implementation"""
+import warnings
+from typing import List, Optional
+
+import numpy as np
+import scipy
+
+import pyttb as ttb
+
+
+def hosvd(
+    tensor,
+    tol: float,
+    verbosity: float = 1,
+    dimorder: Optional[List[int]] = None,
+    sequential: bool = True,
+    ranks: Optional[List[int]] = None,
+):
+    """Compute sequentially-truncated higher-order SVD (Tucker).
+
+    Computes a Tucker decomposition with relative error
+    specified by tol, i.e., it computes a ttensor T such that
+    ||X-T||/||X|| <= tol.
+
+    Parameters
+    ----------
+    tensor: Tensor to factor
+    tol: Relative error to stop at
+    verbosity: Print level
+    dimorder: Order to loop through dimensions
+    sequential: Use sequentially-truncated version
+    ranks: Specify ranks to consider rather than computing
+
+    Example
+    -------
+    >>> data = np.array([[29, 39.], [63., 85.]])
+    >>> tol = 1e-4
+    >>> disable_printing = -1
+    >>> tensorInstance = ttb.tensor.from_data(data)
+    >>> result = hosvd(tensorInstance, tol, verbosity=disable_printing)
+    >>> ((result.full() - tensorInstance).norm() / tensorInstance.norm()) < tol
+    True
+    """
+    # In tucker_als this is N
+    d = tensor.ndims
+
+    if ranks is not None:
+        if len(ranks) != d:
+            raise ValueError(
+                f"Ranks must be a list of length tensor ndims. Ndims: {d} but got "
+                f"ranks: {ranks}."
+            )
+    else:
+        ranks = [0] * d
+
+    # Set up dimorder if not specified (copied from tucker_als)
+    if dimorder is None:
+        dimorder = list(range(d))
+    elif not isinstance(dimorder, list):
+        raise ValueError("Dimorder must be a list")
+    elif tuple(range(d)) != tuple(sorted(dimorder)):
+        raise ValueError(
+            "Dimorder must be a list or permutation of range(tensor.ndims)"
+        )
+
+    # TODO: unify printing throughout; probably easiest with python logging levels
+    if verbosity > 0:
+        print("Computing HOSVD...\n")
+
+    normxsqr = (tensor**2).collapse()
+    eigsumthresh = ((tol**2) * normxsqr) / d
+
+    if verbosity > 2:
+        print(
+            f"||X||^2 = {normxsqr: g}\n"
+            f"tol = {tol: g}\n"
+            f"eigenvalue sum threshold = tol^2 ||X||^2 / d = {eigsumthresh: g}"
+        )
+
+    # Main Loop
+    factor_matrices = [np.empty(1)] * d
+    # Copy input tensor, shrinks every step for sequential
+    Y = ttb.tensor.from_tensor_type(tensor)
+
+    for k in dimorder:
+        # Compute Gram matrix of the mode-k unfolding
+        Yk = ttb.tenmat.from_tensor_type(Y, np.array([k])).double()
+        Z = np.dot(Yk, Yk.transpose())
+
+        # Compute eigenvalue decomposition, eigenvalues sorted descending
+        D, V = scipy.linalg.eigh(Z)
+        pi = np.argsort(-D, kind="quicksort")
+        eigval = D[pi]
+
+        # If rank not provided, compute it.
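+        # eigsum below is the reverse cumulative sum of the descending-sorted
+        # eigenvalues, so eigsum[i] is the energy discarded if only the first
+        # i leading eigenvectors were kept. Picking the last index whose tail
+        # sum still exceeds eigsumthresh = tol^2 ||X||^2 / d keeps ranks[k]+1
+        # components and caps the truncation error per mode, which is what
+        # yields the overall ||X-T||/||X|| <= tol guarantee across all d modes.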
+        if ranks[k] == 0:
+            eigsum = np.cumsum(eigval[::-1])
+            eigsum = eigsum[::-1]
+            ranks[k] = np.where(eigsum > eigsumthresh)[0][-1]
+
+            if verbosity > 5:
+                print("Reverse cumulative sum of eigenvalues of Gram matrix:")
+                for i in range(len(eigsum)):
+                    print_msg = f"{i: d}: {eigsum[i]: 6.4f}"
+                    if i == ranks[k]:
+                        print_msg += " <-- Cutoff"
+                    print(print_msg)
+
+        # Extract factor matrix by picking leading eigenvectors of V
+        # NOTE: Plus 1 in pi slice for inclusive range to match MATLAB
+        factor_matrices[k] = V[:, pi[0 : ranks[k] + 1]]
+
+        # Shrink!
+        if sequential:
+            Y = Y.ttm(factor_matrices[k].transpose(), k)
+    # Extract final core
+    if sequential:
+        G = Y
+    else:
+        G = Y.ttm(factor_matrices, transpose=True)
+
+    result = ttb.ttensor.from_data(G, factor_matrices)
+
+    if verbosity > 0:
+        diffnormsqr = ((tensor - result.full()) ** 2).collapse()
+        relnorm = np.sqrt(diffnormsqr / normxsqr)
+        print(f"Size of core: {G.shape}")
+        if relnorm <= tol:
+            print(f"||X-T||/||X|| = {relnorm: g} <= {tol: f} (tol)")
+        else:
+            print(
+                "Tolerance not satisfied!! "
+                f"||X-T||/||X|| = {relnorm: g} > {tol: f} (tol)"
+            )
+            warnings.warn("Specified tolerance was not achieved")
+    return result
diff --git a/pyttb/tensor.py b/pyttb/tensor.py
index 5438d8f6..59b772b7 100644
--- a/pyttb/tensor.py
+++ b/pyttb/tensor.py
@@ -7,7 +7,6 @@
 import warnings
 from itertools import permutations
 from math import factorial
-from numbers import Real
 from typing import Any, Callable, List, Literal, Optional, Tuple, Union
 
 import numpy as np
@@ -181,9 +180,9 @@ def collapse(
         self,
         dims: Optional[np.ndarray] = None,
         fun: Union[
-            Literal["sum"], Callable[[np.ndarray], Union[Real, np.ndarray]]
+            Literal["sum"], Callable[[np.ndarray], Union[float, np.ndarray]]
         ] = "sum",
-    ) -> Union[Real, np.ndarray, tensor]:
+    ) -> Union[float, np.ndarray, tensor]:
         """
         Collapse tensor along specified dimensions.
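The Real -> float swap in collapse matters to hosvd above, which relies on
collapse() reducing a tensor to a plain scalar: (tensor**2).collapse() is the
squared norm ||X||^2. A minimal sketch of that contract (example tensor and
values assumed here, not part of this diff):

    import numpy as np
    import pyttb as ttb

    # 2 x 2 x 2 tensor with entries 1..8; collapse() with the default
    # fun="sum" over all dims reduces to a single float.
    X = ttb.tensor.from_data(np.arange(1.0, 9.0).reshape(2, 2, 2))
    normxsqr = (X**2).collapse()
    assert np.isclose(normxsqr, np.linalg.norm(X.data) ** 2)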
@@ -389,7 +388,7 @@ def full(self) -> tensor:
         """
         return ttb.tensor.from_data(self.data)
 
-    def innerprod(self, other: Union[tensor, ttb.sptensor, ttb.ktensor]) -> Real:
+    def innerprod(self, other: Union[tensor, ttb.sptensor, ttb.ktensor]) -> float:
         """
         Efficient inner product with a tensor
 
@@ -542,7 +541,7 @@ def issymmetric(
             return bool((all_diffs == 0).all())
         return bool((all_diffs == 0).all()), all_diffs, all_perms
 
-    def logical_and(self, B: Union[Real, tensor]) -> tensor:
+    def logical_and(self, B: Union[float, tensor]) -> tensor:
         """
         Logical and for tensors
 
@@ -578,7 +577,7 @@ def logical_not(self) -> tensor:
         """
         return ttb.tensor.from_data(np.logical_not(self.data))
 
-    def logical_or(self, other: Union[Real, tensor]) -> tensor:
+    def logical_or(self, other: Union[float, tensor]) -> tensor:
         """
         Logical or for tensors
 
@@ -598,7 +597,7 @@ def tensor_or(x, y):
 
         return ttb.tt_tenfun(tensor_or, self, other)
 
-    def logical_xor(self, other: Union[Real, tensor]) -> tensor:
+    def logical_xor(self, other: Union[float, tensor]) -> tensor:
         """
         Logical xor for tensors
 
@@ -867,7 +866,7 @@ def reshape(self, shape: Tuple[int, ...]) -> tensor:
 
         return ttb.tensor.from_data(np.reshape(self.data, shape, order="F"), shape)
 
-    def squeeze(self) -> Union[tensor, np.ndarray, Real]:
+    def squeeze(self) -> Union[tensor, np.ndarray, float]:
         """
         Removes singleton dimensions from a tensor
 
@@ -1029,7 +1028,7 @@ def symmetrize(
     def ttm(
         self,
         matrix: Union[np.ndarray, List[np.ndarray]],
-        dims: Optional[Union[Real, np.ndarray]] = None,
+        dims: Optional[Union[float, np.ndarray]] = None,
         transpose: bool = False,
     ) -> tensor:
         """
@@ -1046,7 +1045,7 @@ def ttm(
             dims = np.arange(self.ndims)
         elif isinstance(dims, list):
             dims = np.array(dims)
-        elif isinstance(dims, Real):
+        elif isinstance(dims, (float, int, np.generic)):
             dims = np.array([dims])
 
         if isinstance(matrix, list):
@@ -1060,7 +1059,7 @@ def ttm(
             return Y
 
         if not isinstance(matrix, np.ndarray):
-            assert False, "matrix must be of type numpy.ndarray"
+            assert False, f"matrix must be of type numpy.ndarray but got:\n{matrix}"
 
         if not (dims.size == 1 and np.isin(dims, np.arange(self.ndims))):
             assert False, "dims must contain values in [0,self.dims)"
diff --git a/tests/test_hosvd.py b/tests/test_hosvd.py
new file mode 100644
index 00000000..c0038e02
--- /dev/null
+++ b/tests/test_hosvd.py
@@ -0,0 +1,67 @@
+import numpy as np
+import pytest
+
+import pyttb as ttb
+
+
+@pytest.fixture()
+def sample_tensor():
+    data = np.array([[29, 39.0], [63.0, 85.0]])
+    shape = (2, 2)
+    params = {"data": data, "shape": shape}
+    tensorInstance = ttb.tensor.from_data(data, shape)
+    return params, tensorInstance
+
+
+@pytest.mark.indevelopment
+def test_hosvd_simple_convergence(capsys, sample_tensor):
+    (data, T) = sample_tensor
+    tol = 1e-4
+    result = ttb.hosvd(T, tol)
+    assert (result.full() - T).norm() / T.norm() < tol, "Failed to converge"
+
+    result = ttb.hosvd(T, tol, sequential=False)
+    assert (
+        result.full() - T
+    ).norm() / T.norm() < tol, "Failed to converge for non-sequential option"
+
+    impossible_tol = 1e-20
+    with pytest.warns(UserWarning):
+        result = ttb.hosvd(T, impossible_tol)
+    assert (
+        result.full() - T
+    ).norm() / T.norm() > impossible_tol, "Converged beyond provided precision"
+
+
+@pytest.mark.indevelopment
+def test_hosvd_default_init(capsys, sample_tensor):
+    (data, T) = sample_tensor
+    _ = ttb.hosvd(T, 1)
+
+
+@pytest.mark.indevelopment
+def test_hosvd_smoke_test_verbosity(capsys, sample_tensor):
+    """For now, just make sure verbosity calcs don't
crash""" + (data, T) = sample_tensor + ttb.hosvd(T, 1, verbosity=10) + + +@pytest.mark.indevelopment +def test_hosvd_incorrect_ranks(capsys, sample_tensor): + (data, T) = sample_tensor + ranks = list(range(T.ndims - 1)) + with pytest.raises(ValueError): + _ = ttb.hosvd(T, 1, ranks=ranks) + + +@pytest.mark.indevelopment +def test_hosvd_incorrect_dimorder(capsys, sample_tensor): + (data, T) = sample_tensor + dimorder = list(range(T.ndims - 1)) + with pytest.raises(ValueError): + _ = ttb.hosvd(T, 1, dimorder=dimorder) + + dimorder = 1 + with pytest.raises(ValueError): + _ = ttb.hosvd(T, 1, dimorder=dimorder)