Skip to content

Commit

Permalink
MRG, ENH: Add method to project onto max power ori (mne-tools#7883)
Browse files Browse the repository at this point in the history
* ENH: Add method to project onto max power ori

* FIX: Flake

* FIX: Naming

* FIX: dtypes and reprs

* API: Rename and deprecate

* FIX: Doc [skip travis]
  • Loading branch information
larsoner authored Jun 23, 2020
1 parent e4ace23 commit 181a572
Show file tree
Hide file tree
Showing 6 changed files with 202 additions and 26 deletions.
4 changes: 4 additions & 0 deletions doc/changes/latest.inc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ Changelog

- Add :meth:`mne.MixedSourceEstimate.surface` and :meth:`mne.MixedSourceEstimate.volume` methods to allow surface and volume extraction by `Eric Larson`_

- Add :meth:`mne.VectorSourceEstimate.project` to project vector source estimates onto the direction of maximum source power by `Eric Larson`_

- Add support to :func:`mne.extract_label_time_course` for vector-valued and volumetric source estimates by `Eric Larson`_

- Add method :meth:`mne.VolSourceEstimate.in_label` by `Eric Larson`_
Expand Down Expand Up @@ -233,6 +235,8 @@ API

- The method ``stc_mixed.plot_surface`` for a :class:`mne.MixedSourceEstimate` has been deprecated in favor of :meth:`stc.surface().plot(...) <mne.MixedSourceEstimate.surface>` by `Eric Larson`_

- The method ``stc.normal`` for :class:`mne.VectorSourceEstimate` has been deprecated in favor of :meth:`stc.project('nn', src) <mne.VectorSourceEstimate.project>` by `Eric Larson`_

- Add ``use_dev_head_trans`` parameter to :func:`mne.preprocessing.annotate_movement` to allow choosing the device to head transform is used to define the fixed cHPI coordinates by `Luke Bloy`_

- The function ``mne.channels.read_dig_captrack`` will be deprecated in version 0.22 in favor of :func:`mne.channels.read_dig_captrak` to correct the spelling error: "captraCK" -> "captraK", by `Stefan Appelhoff`_
Expand Down
16 changes: 16 additions & 0 deletions examples/inverse/plot_vector_mne_solution.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#
# License: BSD (3-clause)

import numpy as np
import mne
from mne.datasets import sample
from mne.minimum_norm import read_inverse_operator, apply_inverse
Expand Down Expand Up @@ -52,6 +53,21 @@
brain = stc.plot(
initial_time=peak_time, hemi='lh', subjects_dir=subjects_dir)

###############################################################################
# Plot the activation in the direction of maximal power for this data:

stc_max, directions = stc.project('pca', src=inv['src'])
# These directions must by design be close to the normals because this
# inverse was computed with loose=0.2:
print('Absolute cosine similarity between source normals and directions: '
f'{np.abs(np.sum(directions * inv["source_nn"][2::3], axis=-1)).mean()}')
brain_max = stc_max.plot(
initial_time=peak_time, hemi='lh', subjects_dir=subjects_dir,
time_label='Max power')
brain_normal = stc.project('normal', inv['src'])[0].plot(
initial_time=peak_time, hemi='lh', subjects_dir=subjects_dir,
time_label='Normal')

###############################################################################
# You can also do this with a fixed-orientation inverse. It looks a lot like
# the result above because the ``loose=0.2`` orientation constraint keeps
Expand Down
24 changes: 19 additions & 5 deletions mne/minimum_norm/tests/test_inverse.py
Original file line number Diff line number Diff line change
Expand Up @@ -335,15 +335,28 @@ def test_localization_bias_fixed(bias_params_fixed, method, lower, upper,
('eLORETA', 99, 100, 0.8, 0.2),
('eLORETA', 99, 100, 0.8, 0.001),
])
@pytest.mark.parametrize('pick_ori', (None, 'vector'))
def test_localization_bias_loose(bias_params_fixed, method, lower, upper,
depth, loose):
depth, loose, pick_ori):
"""Test inverse localization bias for loose minimum-norm solvers."""
if pick_ori == 'vector' and method == 'eLORETA': # works, but save cycles
return
evoked, fwd, noise_cov, _, want = bias_params_fixed
fwd = convert_forward_solution(fwd, surf_ori=False, force_fixed=False)
assert not is_fixed_orient(fwd)
inv_loose = make_inverse_operator(evoked.info, fwd, noise_cov, loose=loose,
depth=depth)
loc = apply_inverse(evoked, inv_loose, lambda2, method).data
loc = apply_inverse(
evoked, inv_loose, lambda2, method, pick_ori=pick_ori)
if pick_ori is not None:
assert loc.data.ndim == 3
loc, directions = loc.project('pca', src=fwd['src'])
abs_cos_sim = np.abs(np.sum(
directions * inv_loose['source_nn'][2::3], axis=1))
assert np.percentile(abs_cos_sim, 10) > 0.9 # most very aligned
loc = abs(loc).data
else:
loc = loc.data
assert (loc >= 0).all()
# Compute the percentage of sources for which there is no loc bias:
perc = (want == np.argmax(loc, axis=0)).mean() * 100
Expand Down Expand Up @@ -535,7 +548,7 @@ def test_orientation_prior(bias_params_free, method, looses, vmin, vmax,
[_get_src_nn(s) for s in inv['src']]))
vec_stc_surf = np.matmul(rot, vec_stc.data)
if 0. in looses:
vec_stc_normal = vec_stc.normal(inv['src'])
vec_stc_normal, _ = vec_stc.project('normal', inv['src'])
assert_allclose(stcs[1].data, vec_stc_normal.data)
del vec_stc
assert_allclose(vec_stc_normal.data, vec_stc_surf[:, 2])
Expand Down Expand Up @@ -1065,8 +1078,9 @@ def test_inverse_mixed(all_src_types_inv_evoked):
stcs[kind].magnitude().data)
assert_allclose(getattr(stcs['mixed'], kind)().magnitude().data,
stcs[kind].magnitude().data)
assert_allclose(stcs['mixed'].surface().normal(surf_src).data,
stcs['surface'].normal(surf_src).data)
with pytest.deprecated_call():
assert_allclose(stcs['mixed'].surface().normal(surf_src).data,
stcs['surface'].normal(surf_src).data)


run_tests_if_main()
95 changes: 90 additions & 5 deletions mne/source_estimate.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
from .cov import Covariance
from .evoked import _get_peak
from .filter import resample
from .fixes import einsum
from .io.constants import FIFF
from .surface import read_surface, _get_ico_surface, mesh_edges
from .source_space import (_ensure_src, _get_morph_src_reordering,
Expand Down Expand Up @@ -1786,6 +1785,8 @@ def magnitude(self):
data_mag, self.vertices, self.tmin, self.tstep, self.subject,
self.verbose)

@deprecated('stc.normal(src) is deprecated and will be removed in 0.22, '
'use stc.project("normal", src)[0] instead')
@fill_doc
def normal(self, src, use_cps=True):
"""Compute activity orthogonal to the cortex.
Expand All @@ -1806,13 +1807,97 @@ def normal(self, src, use_cps=True):
The source estimate only retaining the activity orthogonal to the
cortex.
"""
_check_src_normal('normal', src)
return self.project('normal', src, use_cps)[0]

def _get_src_normals(self, src, use_cps):
normals = np.vstack([_get_src_nn(s, use_cps, v) for s, v in
zip(src, self.vertices)])
data_norm = einsum('ijk,ij->ik', self.data, normals)
return self._scalar_class(
zip(src, self.vertices)])
return normals

@fill_doc
def project(self, directions, src=None, use_cps=True):
"""Project the data for each vertex in a given direction.
Parameters
----------
directions : ndarray, shape (n_vertices, 3) | str
Can be:
- ``'normal'``
Project onto the source space normals.
- ``'pca'``
SVD will be used to project onto the direction of maximal
power for each source.
- :class:`~numpy.ndarray`, shape (n_vertices, 3)
Projection directions for each source.
src : instance of SourceSpaces | None
The source spaces corresponding to the source estimate.
Not used when ``directions`` is an array, optional when
``directions='pca'``.
%(use_cps)s
Should be the same value that was used when the forward model
was computed (typically True).
Returns
-------
stc : instance of SourceEstimate
The projected source estimate.
directions : ndarray, shape (n_vertices, 3)
The directions that were computed (or just used).
Notes
-----
When using SVD, there is a sign ambiguity for the direction of maximal
power. When ``src is None``, the direction is chosen that makes the
resulting time waveform sum positive (i.e., have positive amplitudes).
When ``src`` is provided, the directions are flipped in the direction
of the source normals, i.e., outward from cortex for surface source
spaces and in the +Z / superior direction for volume source spaces.
.. versionadded:: 0.21
"""
_validate_type(directions, (str, np.ndarray), 'directions')
_validate_type(src, (None, SourceSpaces), 'src')
if isinstance(directions, str):
_check_option('directions', directions, ('normal', 'pca'),
extra='when str')

if directions == 'normal':
if src is None:
raise ValueError(
'If directions="normal", src cannot be None')
_check_src_normal('normal', src)
directions = self._get_src_normals(src, use_cps)
else:
assert directions == 'pca'
x = self.data
if not np.isrealobj(self.data):
_check_option('stc.data.dtype', self.data.dtype,
(np.complex64, np.complex128))
dtype = \
np.float32 if x.dtype == np.complex64 else np.float64
x = x.view(dtype)
assert x.shape[-1] == 2 * self.data.shape[-1]
u, _, v = np.linalg.svd(x, full_matrices=False)
directions = u[:, :, 0]
# The sign is arbitrary, so let's flip it in the direction that
# makes the resulting time series the most positive:
if src is None:
signs = np.sum(v[:, 0].real, axis=1, keepdims=True)
else:
normals = self._get_src_normals(src, use_cps)
signs = np.sum(directions * normals, axis=1, keepdims=True)
assert signs.shape == (self.data.shape[0], 1)
signs = np.sign(signs)
signs[signs == 0] = 1.
directions *= signs
_check_option(
'directions.shape', directions.shape, [(self.data.shape[0], 3)])
data_norm = np.matmul(directions[:, np.newaxis], self.data)[:, 0]
stc = self._scalar_class(
data_norm, self.vertices, self.tmin, self.tstep, self.subject,
self.verbose)
return stc, directions


class _BaseVolSourceEstimate(_BaseSourceEstimate):
Expand Down
83 changes: 70 additions & 13 deletions mne/tests/test_source_estimate.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
assert_allclose, assert_equal)
import pytest
from scipy import sparse
from scipy.optimize import fmin_cobyla

from mne import (stats, SourceEstimate, VectorSourceEstimate,
VolSourceEstimate, Label, read_source_spaces,
Expand Down Expand Up @@ -1119,23 +1120,32 @@ def test_mixed_stc(tmpdir):
(VolVectorSourceEstimate, 'discrete'),
(MixedVectorSourceEstimate, 'mixed'),
])
def test_vec_stc_basic(tmpdir, klass, kind):
@pytest.mark.parametrize('dtype', [
np.float32, np.float64, np.complex64, np.complex128])
def test_vec_stc_basic(tmpdir, klass, kind, dtype):
"""Test (vol)vector source estimate."""
nn = np.array([
[1, 0, 0],
[0, 1, 0],
[0, 0, 1],
[np.sqrt(1. / 2.), 0, np.sqrt(1. / 2.)],
[np.sqrt(1 / 3.)] * 3
])
], np.float32)

data = np.array([
[1, 0, 0],
[0, 2, 0],
[3, 0, 0],
[-3, 0, 0],
[1, 1, 1],
])[:, :, np.newaxis]
magnitudes = np.linalg.norm(data, axis=1)[:, 0]
normals = np.array([1, 2, 0, np.sqrt(3)])
], dtype)[:, :, np.newaxis]
amplitudes = np.array([1, 2, 3, np.sqrt(3)], dtype)
magnitudes = amplitudes.copy()
normals = np.array([1, 2, -3. / np.sqrt(2), np.sqrt(3)], dtype)
if dtype in (np.complex64, np.complex128):
data *= 1j
amplitudes *= 1j
normals *= 1j
directions = np.array(
[[1, 0, 0], [0, 1, 0], [-1, 0, 0], [1. / np.sqrt(3)] * 3])
vol_kind = kind if kind in ('discrete', 'vol') else 'vol'
vol_src = SourceSpaces([dict(nn=nn, type=vol_kind)])
assert vol_src.kind == dict(vol='volume').get(vol_kind, vol_kind)
Expand All @@ -1155,20 +1165,35 @@ def test_vec_stc_basic(tmpdir, klass, kind):
verts = surf_verts + vol_verts
assert src.kind == 'mixed'
data = np.tile(data, (2, 1, 1))
amplitudes = np.tile(amplitudes, 2)
magnitudes = np.tile(magnitudes, 2)
normals = np.tile(normals, 2)
directions = np.tile(directions, (2, 1))
stc = klass(data, verts, 0, 1, 'foo')
amplitudes = amplitudes[:, np.newaxis]
magnitudes = magnitudes[:, np.newaxis]

# Magnitude of the vectors
assert_array_equal(stc.magnitude().data[:, 0], magnitudes)
assert_array_equal(stc.magnitude().data, magnitudes)

# Vector components projected onto the vertex normals
if kind in ('vol', 'mixed'):
with pytest.raises(RuntimeError, match='surface or discrete'):
stc.normal(src)
stc.project('normal', src)[0]
else:
normal = stc.normal(src)
assert_array_equal(normal.data[:, 0], normals)
normal = stc.project('normal', src)[0]
assert_allclose(normal.data[:, 0], normals)

# Maximal-variance component, either to keep amps pos or to align to src-nn
projected, got_directions = stc.project('pca')
assert_allclose(got_directions, directions)
assert_allclose(projected.data, amplitudes)
projected, got_directions = stc.project('pca', src)
flips = np.array([[1], [1], [-1.], [1]])
if klass is MixedVectorSourceEstimate:
flips = np.tile(flips, (2, 1))
assert_allclose(got_directions, directions * flips)
assert_allclose(projected.data, amplitudes * flips)

out_name = tmpdir.join('temp.h5')
stc.save(out_name)
Expand All @@ -1188,6 +1213,37 @@ def test_vec_stc_basic(tmpdir, klass, kind):
klass(data, verts, 0, 1)


@pytest.mark.parametrize('real', (True, False))
def test_source_estime_project(real):
"""Test projecting a source estimate onto direction of max power."""
n_src, n_times = 4, 100
rng = np.random.RandomState(0)
data = rng.randn(n_src, 3, n_times)
if not real:
data = data + 1j * rng.randn(n_src, 3, n_times)
assert data.dtype == np.complex128
else:
assert data.dtype == np.float64

# Make sure that the normal we get maximizes the power
# (i.e., minimizes the negative power)
want_nn = np.empty((n_src, 3))
for ii in range(n_src):
x0 = np.ones(3)

def objective(x):
x = x / np.linalg.norm(x)
return -np.linalg.norm(np.dot(x, data[ii]))
want_nn[ii] = fmin_cobyla(objective, x0, (), rhobeg=0.1, rhoend=1e-6)
want_nn /= np.linalg.norm(want_nn, axis=1, keepdims=True)

stc = VolVectorSourceEstimate(data, [np.arange(n_src)], 0, 1)
stc_max, directions = stc.project('pca')
flips = np.sign(np.sum(directions * want_nn, axis=1, keepdims=True))
directions *= flips
assert_allclose(directions, want_nn, atol=1e-6)


@pytest.fixture(scope='module', params=[testing._pytest_param()])
def invs():
"""Inverses of various amounts of loose."""
Expand Down Expand Up @@ -1251,15 +1307,16 @@ def test_vec_stc_inv_fixed(invs, pick_ori):
evoked, _, _, _, fixed, fixedish = invs
stc_fixed = apply_inverse(evoked, fixed)
stc_fixed_vector = apply_inverse(evoked, fixed, pick_ori='vector')
assert_allclose(stc_fixed.data, stc_fixed_vector.normal(fixed['src']).data)
assert_allclose(stc_fixed.data,
stc_fixed_vector.project('normal', fixed['src'])[0].data)
stc_fixedish = apply_inverse(evoked, fixedish, pick_ori=pick_ori)
if pick_ori == 'vector':
assert_allclose(stc_fixed_vector.data, stc_fixedish.data, atol=1e-2)
# two ways here: with magnitude...
assert_allclose(
abs(stc_fixed).data, stc_fixedish.magnitude().data, atol=1e-2)
# ... and when picking the normal (signed)
stc_fixedish = stc_fixedish.normal(fixedish['src'])
stc_fixedish = stc_fixedish.project('normal', fixedish['src'])[0]
elif pick_ori is None:
stc_fixed = abs(stc_fixed)
else:
Expand Down
6 changes: 3 additions & 3 deletions mne/utils/check.py
Original file line number Diff line number Diff line change
Expand Up @@ -557,11 +557,11 @@ def _check_option(parameter, value, allowed_values, extra=''):
'{options}, but got {value!r} instead.')
allowed_values = list(allowed_values) # e.g., if a dict was given
if len(allowed_values) == 1:
options = 'The only allowed value is %r' % allowed_values[0]
options = f'The only allowed value is {repr(allowed_values[0])}'
else:
options = 'Allowed values are '
options += ', '.join(['%r' % v for v in allowed_values[:-1]])
options += ' and %r' % allowed_values[-1]
options += ', '.join([f'{repr(v)}' for v in allowed_values[:-1]])
options += f' and {repr(allowed_values[-1])}'
raise ValueError(msg.format(parameter=parameter, options=options,
value=value, extra=extra))

Expand Down

0 comments on commit 181a572

Please sign in to comment.