diff --git a/mne/channels/interpolation.py b/mne/channels/interpolation.py index 35f3f1f9e36..3976cb9714a 100644 --- a/mne/channels/interpolation.py +++ b/mne/channels/interpolation.py @@ -207,11 +207,11 @@ def _interpolate_bads_nirs(inst, method='nearest', exclude=(), verbose=None): from scipy.spatial.distance import pdist, squareform from mne.preprocessing.nirs import _validate_nirs_info - # Returns pick of all nirs and ensures channels are correctly ordered - picks_nirs = _validate_nirs_info(inst.info) - if len(picks_nirs) == 0: + if len(pick_types(inst.info, fnirs=True, exclude=())) == 0: return + # Returns pick of all nirs and ensures channels are correctly ordered + picks_nirs = _validate_nirs_info(inst.info) nirs_ch_names = [inst.info['ch_names'][p] for p in picks_nirs] nirs_ch_names = [ch for ch in nirs_ch_names if ch not in exclude] bads_nirs = [ch for ch in inst.info['bads'] if ch in nirs_ch_names] diff --git a/mne/conftest.py b/mne/conftest.py index 34cad272dca..d6b0d76a1db 100644 --- a/mne/conftest.py +++ b/mne/conftest.py @@ -24,7 +24,7 @@ from mne.coreg import create_default_subject from mne.datasets import testing from mne.fixes import has_numba, _compare_version -from mne.io import read_raw_fif, read_raw_ctf +from mne.io import read_raw_fif, read_raw_ctf, read_raw_nirx, read_raw_snirf from mne.stats import cluster_level from mne.utils import (_pl, _assert_no_instances, numerics, Bunch, _check_qt_version, _TempDir) @@ -48,6 +48,17 @@ ctf_dir = op.join(test_path, 'CTF') fname_ctf_continuous = op.join(ctf_dir, 'testdata_ctf.ds') +nirx_path = test_path / 'NIRx' +snirf_path = test_path / 'SNIRF' +nirsport2 = nirx_path / 'nirsport_v2' / 'aurora_recording _w_short_and_acc' +nirsport2_snirf = ( + snirf_path / 'NIRx' / 'NIRSport2' / '1.0.3' / + '2021-05-05_001.snirf') +nirsport2_2021_9 = nirx_path / 'nirsport_v2' / 'aurora_2021_9' +nirsport2_20219_snirf = ( + snirf_path / 'NIRx' / 'NIRSport2' / '2021.9' / + '2021-10-01_002.snirf') + # data from mne.io.tests.data base_dir = op.join(op.dirname(__file__), 'io', 'tests', 'data') fname_raw_io = op.join(base_dir, 'test_raw.fif') @@ -925,3 +936,15 @@ def run(nbexec=nbexec, code=code): item.runtest = run return + + +@pytest.mark.filterwarnings('ignore:.*Extraction of measurement.*:') +@pytest.fixture(params=( + [nirsport2, nirsport2_snirf, testing._pytest_param()], + [nirsport2_2021_9, nirsport2_20219_snirf, testing._pytest_param()], +)) +def nirx_snirf(request): + """Return a (raw_nirx, raw_snirf) matched pair.""" + pytest.importorskip('h5py') + return (read_raw_nirx(request.param[0], preload=True), + read_raw_snirf(request.param[1], preload=True)) diff --git a/mne/fixes.py b/mne/fixes.py index 0cb743f5ede..4f37c826841 100644 --- a/mne/fixes.py +++ b/mne/fixes.py @@ -12,11 +12,9 @@ # Lars Buitinck # License: BSD -import functools import inspect from math import log import os -from pathlib import Path import warnings import numpy as np @@ -72,10 +70,15 @@ def _median_complex(data, axis): # helpers to get function arguments -def _get_args(function, varargs=False): +def _get_args(function, varargs=False, *, + exclude=('var_positional', 'var_keyword')): params = inspect.signature(function).parameters - args = [key for key, param in params.items() - if param.kind not in (param.VAR_POSITIONAL, param.VAR_KEYWORD)] + # As of Python 3.10: + # https://docs.python.org/3/library/inspect.html#inspect.Parameter.kind + # POSITIONAL_ONLY, POSITIONAL_OR_KEYWORD, VAR_POSITIONAL, KEYWORD_ONLY, + # VAR_KEYWORD + exclude = set(getattr(inspect.Parameter, ex.upper()) for ex in exclude) + args = [key for key, param in params.items() if param.kind not in exclude] if varargs: varargs = [param.name for param in params.values() if param.kind == param.VAR_POSITIONAL] diff --git a/mne/io/nirx/nirx.py b/mne/io/nirx/nirx.py index b03e8beb999..8b1a45f3312 100644 --- a/mne/io/nirx/nirx.py +++ b/mne/io/nirx/nirx.py @@ -16,7 +16,6 @@ from ..utils import _mult_cal_one from ..constants import FIFF from ..meas_info import create_info, _format_dig_points -from ..pick import pick_types from ...annotations import Annotations from ..._freesurfer import get_mni_fiducials from ...transforms import apply_trans, _get_trans @@ -459,7 +458,6 @@ def __init__(self, fname, saturated, preload=False, verbose=None): ch_names.append(list()) annot = Annotations(onset, duration, description, ch_names=ch_names) self.set_annotations(annot) - self.pick(picks=_nirs_sort_idx(self.info)) def _read_segment_file(self, data, idx, fi, start, stop, cals, mult): """Read a segment of data from a file. @@ -512,21 +510,3 @@ def _convert_fnirs_to_head(trans, fro, to, src_locs, det_locs, ch_locs): det_locs = apply_trans(mri_head_t, det_locs) ch_locs = apply_trans(mri_head_t, ch_locs) return src_locs, det_locs, ch_locs, mri_head_t - - -def _nirs_sort_idx(info): - # TODO: Remove any actual reordering that is done and just use this - # function to get picks to operate on in an ordered way. This should be - # done by refactoring mne.preprocessing.nirs.nirs._check_channels_ordered - # and this function to make sure the picks we obtain here are in the - # correct order. - nirs_picks = pick_types(info, fnirs=True, exclude=()) - other_picks = np.setdiff1d(np.arange(info['nchan']), nirs_picks) - prefixes = [info['ch_names'][pick].split()[0] for pick in nirs_picks] - nirs_names = [info['ch_names'][pick] for pick in nirs_picks] - nirs_sorted = sorted(nirs_names, - key=lambda name: (prefixes.index(name.split()[0]), - name.split(maxsplit=1)[1])) - nirs_picks = nirs_picks[ - [nirs_names.index(name) for name in nirs_sorted]] - return np.concatenate((nirs_picks, other_picks)) diff --git a/mne/io/nirx/tests/test_nirx.py b/mne/io/nirx/tests/test_nirx.py index 263c2e3dc32..fece76fffcb 100644 --- a/mne/io/nirx/tests/test_nirx.py +++ b/mne/io/nirx/tests/test_nirx.py @@ -14,13 +14,12 @@ from mne import pick_types from mne.datasets.testing import data_path, requires_testing_data -from mne.io import read_raw_nirx, read_raw_snirf -from mne.utils import requires_h5py +from mne.io import read_raw_nirx from mne.io.tests.test_raw import _test_raw_reader from mne.preprocessing import annotate_nan from mne.transforms import apply_trans, _get_trans from mne.preprocessing.nirs import source_detector_distances,\ - short_channels + short_channels, _reorder_nirx from mne.io.constants import FIFF testing_path = data_path(download=False) @@ -46,31 +45,17 @@ testing_path, 'NIRx', 'nirsport_v1', 'nirx_15_3_recording_w_' 'saturation_on_montage_channels') -# NIRSport2 device using Aurora software and matching snirf file +# NIRSport2 device using Aurora software nirsport2 = op.join( testing_path, 'NIRx', 'nirsport_v2', 'aurora_recording _w_short_and_acc') -nirsport2_snirf = op.join( - testing_path, 'SNIRF', 'NIRx', 'NIRSport2', '1.0.3', - '2021-05-05_001.snirf') - nirsport2_2021_9 = op.join( testing_path, 'NIRx', 'nirsport_v2', 'aurora_2021_9') -snirf_nirsport2_20219 = op.join( - testing_path, 'SNIRF', 'NIRx', 'NIRSport2', '2021.9', - '2021-10-01_002.snirf') -@requires_h5py -@requires_testing_data -@pytest.mark.filterwarnings('ignore:.*Extraction of measurement.*:') -@pytest.mark.parametrize('fname_nirx, fname_snirf', ( - [nirsport2, nirsport2_snirf], - [nirsport2_2021_9, snirf_nirsport2_20219], -)) -def test_nirsport_v2_matches_snirf(fname_nirx, fname_snirf): +def test_nirsport_v2_matches_snirf(nirx_snirf): """Test NIRSport2 raw files return same data as snirf.""" - raw = read_raw_nirx(fname_nirx, preload=True) - raw_snirf = read_raw_snirf(fname_snirf, preload=True) + raw, raw_snirf = nirx_snirf + _reorder_nirx(raw_snirf) assert raw.ch_names == raw_snirf.ch_names assert_allclose(raw._data, raw_snirf._data) diff --git a/mne/io/snirf/_snirf.py b/mne/io/snirf/_snirf.py index c6322b26ebb..134497c679f 100644 --- a/mne/io/snirf/_snirf.py +++ b/mne/io/snirf/_snirf.py @@ -15,7 +15,7 @@ from ..constants import FIFF from .._digitization import _make_dig_points from ...transforms import _frame_to_str, apply_trans -from ..nirx.nirx import _convert_fnirs_to_head, _nirs_sort_idx +from ..nirx.nirx import _convert_fnirs_to_head from ..._freesurfer import get_mni_fiducials @@ -409,10 +409,6 @@ def natural_keys(text): annot.append(data[:, 0], 1.0, desc.decode('UTF-8')) self.set_annotations(annot, emit_warning=False) - # MNE requires channels are paired as alternating wavelengths - if len(_validate_nirs_info(self.info, throw_errors=False)) == 0: - self.pick(picks=_nirs_sort_idx(self.info)) - # Validate that the fNIRS info is correctly formatted _validate_nirs_info(self.info) diff --git a/mne/io/snirf/tests/test_snirf.py b/mne/io/snirf/tests/test_snirf.py index 5fb85e710f5..57d7ee88f20 100644 --- a/mne/io/snirf/tests/test_snirf.py +++ b/mne/io/snirf/tests/test_snirf.py @@ -12,7 +12,8 @@ from mne.io import read_raw_snirf, read_raw_nirx from mne.io.tests.test_raw import _test_raw_reader from mne.preprocessing.nirs import (optical_density, beer_lambert_law, - short_channels, source_detector_distances) + short_channels, source_detector_distances, + _reorder_nirx) from mne.transforms import apply_trans, _get_trans from mne.io.constants import FIFF @@ -93,23 +94,18 @@ def test_snirf_gowerlabs(): assert len(raw.ch_names) == 216 assert_allclose(raw.info['sfreq'], 10.0) # we don't force them to be sorted according to a naive split - # (but we do force them to be interleaved, which is tested by beer_lambert - # above) assert raw.ch_names != sorted(raw.ch_names) - # ... and this file does have a nice logical ordering already + # ... but this file does have a nice logical ordering already + print(raw.ch_names) assert raw.ch_names == sorted( - raw.ch_names, # use a key which is (source int, detector int) - key=lambda name: (int(name.split()[0].split('_')[0][1:]), - int(name.split()[0].split('_')[1][1:]))) - prefixes = [name.split()[0] for name in raw.ch_names] - # TODO: This is actually not the order on disk -- we reorder to ravel as - # S-D then freq, but gowerlabs order is freq then S-D. So hopefully soon - # we can change these lines to check that the first half of prefixes - # matches the second half of prefixes, rather than every-other matching the - # other every-other - assert prefixes[::2] == prefixes[1::2] - prefixes = prefixes[::2] - assert prefixes == ['S1_D1', 'S1_D2', 'S1_D3', 'S1_D4', 'S1_D5', 'S1_D6', 'S1_D7', 'S1_D8', 'S1_D9', 'S1_D10', 'S1_D11', 'S1_D12', 'S2_D1', 'S2_D2', 'S2_D3', 'S2_D4', 'S2_D5', 'S2_D6', 'S2_D7', 'S2_D8', 'S2_D9', 'S2_D10', 'S2_D11', 'S2_D12', 'S3_D1', 'S3_D2', 'S3_D3', 'S3_D4', 'S3_D5', 'S3_D6', 'S3_D7', 'S3_D8', 'S3_D9', 'S3_D10', 'S3_D11', 'S3_D12', 'S4_D1', 'S4_D2', 'S4_D3', 'S4_D4', 'S4_D5', 'S4_D6', 'S4_D7', 'S4_D8', 'S4_D9', 'S4_D10', 'S4_D11', 'S4_D12', 'S5_D1', 'S5_D2', 'S5_D3', 'S5_D4', 'S5_D5', 'S5_D6', 'S5_D7', 'S5_D8', 'S5_D9', 'S5_D10', 'S5_D11', 'S5_D12', 'S6_D1', 'S6_D2', 'S6_D3', 'S6_D4', 'S6_D5', 'S6_D6', 'S6_D7', 'S6_D8', 'S6_D9', 'S6_D10', 'S6_D11', 'S6_D12', 'S7_D1', 'S7_D2', 'S7_D3', 'S7_D4', 'S7_D5', 'S7_D6', 'S7_D7', 'S7_D8', 'S7_D9', 'S7_D10', 'S7_D11', 'S7_D12', 'S8_D1', 'S8_D2', 'S8_D3', 'S8_D4', 'S8_D5', 'S8_D6', 'S8_D7', 'S8_D8', 'S8_D9', 'S8_D10', 'S8_D11', 'S8_D12', 'S9_D1', 'S9_D2', 'S9_D3', 'S9_D4', 'S9_D5', 'S9_D6', 'S9_D7', 'S9_D8', 'S9_D9', 'S9_D10', 'S9_D11', 'S9_D12'] # noqa: E501 + raw.ch_names, + # use a key which is (src triplet, freq, src, freq, det) + key=lambda name: ( + (int(name.split()[0].split('_')[0][1:]) - 1) // 3, + int(name.split()[1]), + int(name.split()[0].split('_')[0][1:]), + int(name.split()[0].split('_')[1][1:]) + )) @requires_testing_data @@ -122,13 +118,13 @@ def test_snirf_basic(): assert raw.info['sfreq'] == 12.5 # Test channel naming - assert raw.info['ch_names'][:4] == ["S1_D1 760", "S1_D1 850", - "S1_D9 760", "S1_D9 850"] - assert raw.info['ch_names'][24:26] == ["S5_D13 760", "S5_D13 850"] + assert raw.info['ch_names'][:4] == ["S1_D1 760", "S1_D9 760", + "S2_D3 760", "S2_D10 760"] + assert raw.info['ch_names'][24:26] == ['S5_D8 850', 'S5_D13 850'] # Test frequency encoding assert raw.info['chs'][0]['loc'][9] == 760 - assert raw.info['chs'][1]['loc'][9] == 850 + assert raw.info['chs'][24]['loc'][9] == 850 # Test source locations assert_allclose([-8.6765 * 1e-2, 0.0049 * 1e-2, -2.6167 * 1e-2], @@ -159,6 +155,7 @@ def test_snirf_basic(): def test_snirf_against_nirx(): """Test against file snirf was created from.""" raw = read_raw_snirf(sfnirs_homer_103_wShort, preload=True) + _reorder_nirx(raw) raw_orig = read_raw_nirx(sfnirs_homer_103_wShort_original, preload=True) # Check annotations are the same @@ -225,13 +222,13 @@ def test_snirf_nirsport2(): assert_almost_equal(raw.info['sfreq'], 7.6, decimal=1) # Test channel naming - assert raw.info['ch_names'][:4] == ['S1_D1 760', 'S1_D1 850', - 'S1_D3 760', 'S1_D3 850'] - assert raw.info['ch_names'][24:26] == ['S6_D4 760', 'S6_D4 850'] + assert raw.info['ch_names'][:4] == ['S1_D1 760', 'S1_D3 760', + 'S1_D9 760', 'S1_D16 760'] + assert raw.info['ch_names'][24:26] == ['S8_D15 760', 'S8_D20 760'] # Test frequency encoding assert raw.info['chs'][0]['loc'][9] == 760 - assert raw.info['chs'][1]['loc'][9] == 850 + assert raw.info['chs'][-1]['loc'][9] == 850 assert sum(short_channels(raw.info)) == 16 @@ -257,6 +254,7 @@ def test_snirf_nirsport2_w_positions(): """Test reading SNIRF files with known positions.""" raw = read_raw_snirf(nirx_nirsport2_103_2, preload=True, optode_frame="mri") + _reorder_nirx(raw) # Test data import assert raw._data.shape == (40, 128) diff --git a/mne/preprocessing/ica.py b/mne/preprocessing/ica.py index a2f308b36ce..71954d5d307 100644 --- a/mne/preprocessing/ica.py +++ b/mne/preprocessing/ica.py @@ -110,9 +110,13 @@ def get_score_funcs(): score_funcs.update({n: _make_xy_sfunc(f) for n, f in xy_arg_dist_funcs if _get_args(f) == ['u', 'v']}) + # In SciPy 1.9+, pearsonr has (u, v, *, alternative='two-sided'), so we + # should just look at the positional_only and positional_or_keyword entries + exclude = ('var_positional', 'var_keyword', 'keyword_only') score_funcs.update({n: _make_xy_sfunc(f, ndim_output=True) for n, f in xy_arg_stats_funcs - if _get_args(f) == ['x', 'y']}) + if _get_args(f, exclude=exclude) == ['x', 'y']}) + assert 'pearsonr' in score_funcs return score_funcs diff --git a/mne/preprocessing/nirs/__init__.py b/mne/preprocessing/nirs/__init__.py index c9cc6d11374..3dbbb28249c 100644 --- a/mne/preprocessing/nirs/__init__.py +++ b/mne/preprocessing/nirs/__init__.py @@ -8,8 +8,9 @@ from .nirs import (short_channels, source_detector_distances, _check_channels_ordered, _channel_frequencies, - _fnirs_check_bads, _fnirs_spread_bads, _channel_chromophore, - _validate_nirs_info, _fnirs_optode_names, _optode_position) + _fnirs_spread_bads, _channel_chromophore, + _validate_nirs_info, _fnirs_optode_names, _optode_position, + _reorder_nirx) from ._optical_density import optical_density from ._beer_lambert_law import beer_lambert_law from ._scalp_coupling_index import scalp_coupling_index diff --git a/mne/preprocessing/nirs/_beer_lambert_law.py b/mne/preprocessing/nirs/_beer_lambert_law.py index 329d3dc50ba..ed1b25ebca3 100644 --- a/mne/preprocessing/nirs/_beer_lambert_law.py +++ b/mne/preprocessing/nirs/_beer_lambert_law.py @@ -11,8 +11,7 @@ from ...io import BaseRaw from ...io.constants import FIFF from ...utils import _validate_type, warn -from ..nirs import source_detector_distances, _channel_frequencies,\ - _check_channels_ordered, _channel_chromophore +from ..nirs import source_detector_distances, _validate_nirs_info def beer_lambert_law(raw, ppf=6.): @@ -35,8 +34,10 @@ def beer_lambert_law(raw, ppf=6.): _validate_type(raw, BaseRaw, 'raw') _validate_type(ppf, 'numeric', 'ppf') ppf = float(ppf) - freqs = np.unique(_channel_frequencies(raw.info, nominal=True)) - picks = _check_channels_ordered(raw.info, freqs) + picks = _validate_nirs_info(raw.info, fnirs='od', which='Beer-lambert') + # This is the one place we *really* need the actual/accurate frequencies + freqs = np.array( + [raw.info['chs'][pick]['loc'][9] for pick in picks], float) abs_coef = _load_absorption(freqs) distances = source_detector_distances(raw.info) if (distances == 0).any(): @@ -49,25 +50,24 @@ def beer_lambert_law(raw, ppf=6.): 'likely due to optode locations being stored in a ' ' unit other than meters.') rename = dict() - for ii in picks[::2]: + for ii, jj in zip(picks[::2], picks[1::2]): EL = abs_coef * distances[ii] * ppf iEL = linalg.pinv(EL) - raw._data[[ii, ii + 1]] = iEL @ raw._data[[ii, ii + 1]] * 1e-3 + raw._data[[ii, jj]] = iEL @ raw._data[[ii, jj]] * 1e-3 # Update channel information coil_dict = dict(hbo=FIFF.FIFFV_COIL_FNIRS_HBO, hbr=FIFF.FIFFV_COIL_FNIRS_HBR) - for ki, kind in enumerate(('hbo', 'hbr')): - ch = raw.info['chs'][ii + ki] + for ki, kind in zip((ii, jj), ('hbo', 'hbr')): + ch = raw.info['chs'][ki] ch.update(coil_type=coil_dict[kind], unit=FIFF.FIFF_UNIT_MOL) new_name = f'{ch["ch_name"].split(" ")[0]} {kind}' rename[ch['ch_name']] = new_name raw.rename_channels(rename) # Validate the format of data after transformation is valid - chroma = np.unique(_channel_chromophore(raw.info)) - _check_channels_ordered(raw.info, chroma) + _validate_nirs_info(raw.info, fnirs='hb') return raw diff --git a/mne/preprocessing/nirs/_optical_density.py b/mne/preprocessing/nirs/_optical_density.py index 318ec8210fa..9955f05df04 100644 --- a/mne/preprocessing/nirs/_optical_density.py +++ b/mne/preprocessing/nirs/_optical_density.py @@ -9,8 +9,7 @@ from ...io import BaseRaw from ...io.constants import FIFF from ...utils import _validate_type, warn, verbose -from ...io.pick import _picks_to_idx -from ..nirs import _channel_frequencies, _check_channels_ordered +from ..nirs import _validate_nirs_info @verbose @@ -30,10 +29,7 @@ def optical_density(raw, *, verbose=None): """ raw = raw.copy().load_data() _validate_type(raw, BaseRaw, 'raw') - _check_channels_ordered( - raw.info, np.unique(_channel_frequencies(raw.info, nominal=True))) - - picks = _picks_to_idx(raw.info, 'fnirs_cw_amplitude') + picks = _validate_nirs_info(raw.info, fnirs='cw_amplitude') # The devices measure light intensity. Negative light intensities should # not occur. If they do it is likely due to hardware or movement issues. @@ -49,9 +45,9 @@ def optical_density(raw, *, verbose=None): for pi in picks: np.maximum(raw._data[pi], min_, out=raw._data[pi]) - data_means = np.mean(raw.get_data(picks=picks), axis=1) - for ii, pi in enumerate(picks): - raw._data[pi] /= data_means[ii] + for pi in picks: + data_mean = np.mean(raw._data[pi]) + raw._data[pi] /= data_mean np.log(raw._data[pi], out=raw._data[pi]) raw._data[pi] *= -1 raw.info['chs'][pi]['coil_type'] = FIFF.FIFFV_COIL_FNIRS_OD diff --git a/mne/preprocessing/nirs/_scalp_coupling_index.py b/mne/preprocessing/nirs/_scalp_coupling_index.py index d9fd3b3c7e5..4c2e4f613f1 100644 --- a/mne/preprocessing/nirs/_scalp_coupling_index.py +++ b/mne/preprocessing/nirs/_scalp_coupling_index.py @@ -8,7 +8,7 @@ from ...io import BaseRaw from ...utils import _validate_type, verbose -from ..nirs import _channel_frequencies, _check_channels_ordered +from ..nirs import _validate_nirs_info @verbose @@ -41,13 +41,8 @@ def scalp_coupling_index(raw, l_freq=0.7, h_freq=1.5, .. footbibliography:: """ _validate_type(raw, BaseRaw, 'raw') - - if 'fnirs_od' not in raw: - raise RuntimeError( - 'Scalp coupling index should be run on optical density data.') - - freqs = np.unique(_channel_frequencies(raw.info, nominal=True)) - picks = _check_channels_ordered(raw.info, freqs) + picks = _validate_nirs_info( + raw.info, fnirs='od', which='Scalp coupling index') raw = raw.copy().pick(picks).load_data() zero_mask = np.std(raw._data, axis=-1) == 0 @@ -64,4 +59,5 @@ def scalp_coupling_index(raw, l_freq=0.7, h_freq=1.5, sci[ii] = c sci[ii + 1] = c sci[zero_mask] = 0 + sci = sci[np.argsort(picks)] # restore original order return sci diff --git a/mne/preprocessing/nirs/_tddr.py b/mne/preprocessing/nirs/_tddr.py index f29883ab2a7..9a6022e589d 100644 --- a/mne/preprocessing/nirs/_tddr.py +++ b/mne/preprocessing/nirs/_tddr.py @@ -8,7 +8,6 @@ from ...io import BaseRaw from ...utils import _validate_type, verbose -from ...io.pick import _picks_to_idx from ..nirs import _validate_nirs_info @@ -47,12 +46,11 @@ def temporal_derivative_distribution_repair(raw, *, verbose=None): """ raw = raw.copy().load_data() _validate_type(raw, BaseRaw, 'raw') - _validate_nirs_info(raw.info) + picks = _validate_nirs_info(raw.info) - picks = _picks_to_idx(raw.info, ['fnirs_od', 'hbo', 'hbr'], exclude=[]) if not len(picks): - raise RuntimeError('TDDR should be run on optical density or \ - hemoglobin data.') + raise RuntimeError('TDDR should be run on optical density or ' + 'hemoglobin data.') for pick in picks: raw._data[pick] = _TDDR(raw._data[pick], raw.info['sfreq']) diff --git a/mne/preprocessing/nirs/nirs.py b/mne/preprocessing/nirs/nirs.py index 982bf41347f..53bdfd8d029 100644 --- a/mne/preprocessing/nirs/nirs.py +++ b/mne/preprocessing/nirs/nirs.py @@ -7,8 +7,8 @@ import re import numpy as np -from ...io.pick import _picks_to_idx -from ...utils import fill_doc +from ...io.pick import _picks_to_idx, pick_types +from ...utils import fill_doc, _check_option, _validate_type # Standardized fNIRS channel name regexs @@ -31,10 +31,10 @@ def source_detector_distances(info, picks=None): Array containing distances in meters. Of shape equal to number of channels, or shape of picks if supplied. """ - dist = [np.linalg.norm(ch['loc'][3:6] - ch['loc'][6:9]) - for ch in info['chs']] - picks = _picks_to_idx(info, picks, exclude=[]) - return np.array(dist, float)[picks] + return np.array([ + np.linalg.norm(np.diff( + info['chs'][pick]['loc'][3:9].reshape(2, 3), axis=0)[0]) + for pick in _picks_to_idx(info, picks, exclude=[])], float) @fill_doc @@ -59,19 +59,16 @@ def short_channels(info, threshold=0.01): return source_detector_distances(info) < threshold -def _channel_frequencies(info, nominal=False): +def _channel_frequencies(info): """Return the light frequency for each channel.""" # Only valid for fNIRS data before conversion to haemoglobin picks = _picks_to_idx(info, ['fnirs_cw_amplitude', 'fnirs_od'], exclude=[], allow_empty=True) - freqs = np.empty(picks.size, int) - for ii in picks: - if nominal: - freq = float(_S_D_F_RE.match(info['ch_names'][ii]).groups()[2]) - else: - freq = info['chs'][ii]['loc'][9] - freqs[ii] = freq - return freqs + freqs = list() + for pick in picks: + freqs.append(round(float( + _S_D_F_RE.match(info['ch_names'][pick]).groups()[2]))) + return np.array(freqs, int) def _channel_chromophore(info): @@ -84,7 +81,8 @@ def _channel_chromophore(info): return chroma -def _check_channels_ordered(info, pair_vals, throw_errors=True): +def _check_channels_ordered(info, pair_vals, *, throw_errors=True, + check_bads=True): """Check channels follow expected fNIRS format. If the channels are correctly ordered then an array of valid picks @@ -103,8 +101,6 @@ def _check_channels_ordered(info, pair_vals, throw_errors=True): # All chromophore fNIRS data picks_chroma = _picks_to_idx(info, ['hbo', 'hbr'], exclude=[], allow_empty=True) - # All continuous wave fNIRS data - picks = np.hstack([picks_chroma, picks_wave]) if (len(picks_wave) > 0) & (len(picks_chroma) > 0): picks = _throw_or_return_empty( @@ -112,11 +108,43 @@ def _check_channels_ordered(info, pair_vals, throw_errors=True): 'density, and haemoglobin data in the same raw structure.', throw_errors) + # All continuous wave fNIRS data + if len(picks_wave): + error_word = "frequencies" + use_RE = _S_D_F_RE + picks = picks_wave + else: + error_word = "chromophore" + use_RE = _S_D_H_RE + picks = picks_chroma + + pair_vals = np.array(pair_vals) + if pair_vals.shape != (2,): + raise ValueError( + f'Exactly two {error_word} must exist in info, got ' + f'{list(pair_vals)}') + # In principle we do not need to require that these be sorted -- + # all we need to do is change our sorted() below to make use of a + # pair_vals.index(...) in a sort key -- but in practice we always want + # (hbo, hbr) or (lower_freq, upper_freq) pairings, both of which will + # work with a naive string sort, so let's just enforce sorted-ness here + is_str = pair_vals.dtype.kind == 'U' + pair_vals = list(pair_vals) + if is_str: + if pair_vals != ['hbo', 'hbr']: + raise ValueError( + f'The {error_word} in info must be ["hbo", "hbr"], but got ' + f'{pair_vals} instead') + elif not np.array_equal(np.unique(pair_vals), pair_vals): + raise ValueError( + f'The {error_word} in info must be unique and sorted, but got ' + f'got {pair_vals} instead') + if len(picks) % 2 != 0: picks = _throw_or_return_empty( 'NIRS channels not ordered correctly. An even number of NIRS ' - f'channels is required. {len(info.ch_names)} channels were' - f'provided: {info.ch_names}', throw_errors) + f'channels is required. {len(info.ch_names)} channels were' + f'provided', throw_errors) # Ensure wavelength info exists for waveform data all_freqs = [info["chs"][ii]["loc"][9] for ii in picks_wave] @@ -126,51 +154,59 @@ def _check_channels_ordered(info, pair_vals, throw_errors=True): f'info["chs"] structure. The encoded wavelengths are {all_freqs}.', throw_errors) - for ii in picks[::2]: - ch1_name_info = _S_D_F_RE.match(info['chs'][ii]['ch_name']) - ch2_name_info = _S_D_F_RE.match(info['chs'][ii + 1]['ch_name']) - - if bool(ch2_name_info) & bool(ch1_name_info): - - first_value = float(ch1_name_info.groups()[2]) - second_value = float(ch2_name_info.groups()[2]) - error_word = "frequencies" - - else: - ch1_name_info = _S_D_H_RE.match(info['chs'][ii]['ch_name']) - ch2_name_info = _S_D_H_RE.match(info['chs'][ii + 1]['ch_name']) - - if bool(ch2_name_info) & bool(ch1_name_info): - - first_value = ch1_name_info.groups()[2] - second_value = ch2_name_info.groups()[2] - error_word = "chromophore" - - if (first_value not in ["hbo", "hbr"] or - second_value not in ["hbo", "hbr"]): - picks = _throw_or_return_empty( - "NIRS channels have specified naming conventions." - "Chromophore data must be labeled either hbo or hbr. " - f"Failing channels are {info['chs'][ii]['ch_name']}, " - f"{info['chs'][ii + 1]['ch_name']}", throw_errors) - - else: + # Validate the channel naming scheme + for pick in picks: + ch_name_info = use_RE.match(info['chs'][pick]['ch_name']) + if not bool(ch_name_info): + picks = _throw_or_return_empty( + 'NIRS channels have specified naming conventions. ' + 'The provided channel name can not be parsed: ' + f'{repr(info.ch_names[pick])}', throw_errors) + break + value = ch_name_info.groups()[2] + if len(picks_wave): + value = value + else: # picks_chroma + if value not in ["hbo", "hbr"]: picks = _throw_or_return_empty( - 'NIRS channels have specified naming conventions. ' - 'The provided channel names can not be parsed.' - f'Channels are {info.ch_names}', throw_errors) - - if (ch1_name_info.groups()[0] != ch2_name_info.groups()[0]) or \ - (ch1_name_info.groups()[1] != ch2_name_info.groups()[1]) or \ - (first_value != pair_vals[0]) or \ - (second_value != pair_vals[1]): + "NIRS channels have specified naming conventions." + "Chromophore data must be labeled either hbo or hbr. " + f"The failing channel is {info['chs'][pick]['ch_name']}", + throw_errors) + break + + # Reorder to be paired (naive sort okay here given validation above) + picks = picks[np.argsort([info['ch_names'][pick] for pick in picks])] + + # Validate our paired ordering + for ii, jj in zip(picks[::2], picks[1::2]): + ch1_name = info['chs'][ii]['ch_name'] + ch2_name = info['chs'][jj]['ch_name'] + ch1_re = use_RE.match(ch1_name) + ch2_re = use_RE.match(ch2_name) + ch1_S, ch1_D, ch1_value = ch1_re.groups()[:3] + ch2_S, ch2_D, ch2_value = ch2_re.groups()[:3] + if len(picks_wave): + ch1_value, ch2_value = float(ch1_value), float(ch2_value) + if (ch1_S != ch2_S) or (ch1_D != ch2_D) or \ + (ch1_value != pair_vals[0]) or (ch2_value != pair_vals[1]): picks = _throw_or_return_empty( - 'NIRS channels not ordered correctly. Channels must be' + 'NIRS channels not ordered correctly. Channels must be ' 'ordered as source detector pairs with alternating' - f' {error_word}: {pair_vals[0]} & {pair_vals[1]}', + f' {error_word} {pair_vals[0]} & {pair_vals[1]}, but got ' + f'S{ch1_S}_D{ch1_D} pair ' + f'{repr(ch1_name)} and {repr(ch2_name)}', throw_errors) - - _fnirs_check_bads(info) + break + + if check_bads: + for ii, jj in zip(picks[::2], picks[1::2]): + want = [info.ch_names[ii], info.ch_names[jj]] + got = list(set(info['bads']).intersection(want)) + if len(got) == 1: + raise RuntimeError( + f'NIRS bad labelling is not consistent, found {got} but ' + f'needed {want}') return picks @@ -181,31 +217,31 @@ def _throw_or_return_empty(msg, throw_errors): return [] -def _validate_nirs_info(info, throw_errors=True): +def _validate_nirs_info(info, *, throw_errors=True, fnirs=None, which=None, + check_bads=True, allow_empty=True): """Apply all checks to fNIRS info. Works on all continuous wave types.""" - freqs = np.unique(_channel_frequencies(info, nominal=True)) + _validate_type(fnirs, (None, str), 'fnirs') + kinds = dict( + od='optical density', + cw_amplitude='continuous wave', + hb='chromophore', + ) + _check_option('fnirs', fnirs, (None,) + tuple(kinds)) + if fnirs is not None: + kind = kinds[fnirs] + fnirs = ['hbo', 'hbr'] if fnirs == 'hb' else f'fnirs_{fnirs}' + if not len(pick_types(info, fnirs=fnirs)): + raise RuntimeError( + f'{which} must operate on {kind} data, but none was found.') + freqs = np.unique(_channel_frequencies(info)) if freqs.size > 0: - picks = _check_channels_ordered(info, freqs, throw_errors=throw_errors) + pair_vals = freqs else: - picks = _check_channels_ordered(info, - np.unique(_channel_chromophore(info)), - throw_errors=throw_errors) - return picks - - -def _fnirs_check_bads(info): - """Check consistent labeling of bads across fnirs optodes.""" - # For an optode pair, if one component (light frequency or chroma) is - # marked as bad then they all should be. This function checks that all - # optodes are marked bad consistently. - picks = _picks_to_idx(info, 'fnirs', exclude=[], allow_empty=True) - for ii in picks[::2]: - want = info.ch_names[ii:ii + 2] - got = list(set(info['bads']).intersection(want)) - if len(got) == 1: - raise RuntimeError( - f'NIRS bad labelling is not consistent, found {got} but ' - f'needed {want}') + pair_vals = np.unique(_channel_chromophore(info)) + out = _check_channels_ordered( + info, pair_vals, throw_errors=throw_errors, + check_bads=check_bads) + return out def _fnirs_spread_bads(info): @@ -213,13 +249,15 @@ def _fnirs_spread_bads(info): # For an optode pair if any component (light frequency or chroma) is marked # as bad, then they all should be. This function will find any pairs marked # as bad and spread the bad marking to all components of the optode pair. - picks = _picks_to_idx(info, 'fnirs', exclude=[], allow_empty=True) - new_bads = list() - for ii in picks[::2]: - bad_opto = set(info['bads']).intersection(info.ch_names[ii:ii + 2]) - if len(bad_opto) > 0: - new_bads.extend(info.ch_names[ii:ii + 2]) - info['bads'] = new_bads + picks = _validate_nirs_info(info, check_bads=False) + new_bads = set(info['bads']) + for ii, jj in zip(picks[::2], picks[1::2]): + ch1_name, ch2_name = info.ch_names[ii], info.ch_names[jj] + if ch1_name in new_bads: + new_bads.add(ch2_name) + elif ch2_name in new_bads: + new_bads.add(ch1_name) + info['bads'] = sorted(new_bads) return info @@ -259,3 +297,16 @@ def _optode_position(info, optode): loc_idx = range(6, 9) return info["chs"][idx]["loc"][loc_idx] + + +def _reorder_nirx(raw): + # Maybe someday we should make this public like + # mne.preprocessing.nirs.reorder_standard(raw, order='nirx') + info = raw.info + picks = pick_types(info, fnirs=True, exclude=[]) + prefixes = [info['ch_names'][pick].split()[0] for pick in picks] + nirs_names = [info['ch_names'][pick] for pick in picks] + nirs_sorted = sorted(nirs_names, + key=lambda name: (prefixes.index(name.split()[0]), + name.split(maxsplit=1)[1])) + raw.reorder_channels(nirs_sorted) diff --git a/mne/preprocessing/nirs/tests/test_beer_lambert_law.py b/mne/preprocessing/nirs/tests/test_beer_lambert_law.py index c6fde250e77..175c135e709 100644 --- a/mne/preprocessing/nirs/tests/test_beer_lambert_law.py +++ b/mne/preprocessing/nirs/tests/test_beer_lambert_law.py @@ -63,13 +63,14 @@ def test_beer_lambert_unordered_errors(): # Test that an error is thrown if channel naming frequency doesn't match # what is stored in loc[9], which should hold the light frequency too. raw_od = optical_density(raw) - raw_od.rename_channels({'S2_D2 760': 'S2_D2 770'}) - with pytest.raises(ValueError, match='not ordered'): - beer_lambert_law(raw_od) - - # Test that an error is thrown if inconsistent frequencies used in data - raw_od.info['chs'][2]['loc'][9] = 770.0 - with pytest.raises(ValueError, match='with alternating frequencies'): + ch_name = raw.ch_names[0] + assert ch_name == 'S1_D1 760' + idx = raw_od.ch_names.index(ch_name) + assert idx == 0 + raw_od.info['chs'][idx]['loc'][9] = 770 + raw_od.rename_channels({ch_name: ch_name.replace('760', '770')}) + assert raw_od.ch_names[0] == 'S1_D1 770' + with pytest.raises(ValueError, match='Exactly two frequencies'): beer_lambert_law(raw_od) diff --git a/mne/preprocessing/nirs/tests/test_nirs.py b/mne/preprocessing/nirs/tests/test_nirs.py index 9646fe13065..47d25dc5ddf 100644 --- a/mne/preprocessing/nirs/tests/test_nirs.py +++ b/mne/preprocessing/nirs/tests/test_nirs.py @@ -8,16 +8,18 @@ import pytest import numpy as np -from numpy.testing import assert_array_equal, assert_array_almost_equal +from numpy.testing import (assert_array_equal, assert_array_almost_equal, + assert_allclose) from mne import create_info from mne.datasets.testing import data_path from mne.io import read_raw_nirx, RawArray from mne.preprocessing.nirs import (optical_density, beer_lambert_law, - _fnirs_check_bads, _fnirs_spread_bads, - _check_channels_ordered, + _fnirs_spread_bads, _validate_nirs_info, + _check_channels_ordered, tddr, _channel_frequencies, _channel_chromophore, - _fnirs_optode_names, _optode_position) + _fnirs_optode_names, _optode_position, + scalp_coupling_index) from mne.io.pick import _picks_to_idx from mne.datasets import testing @@ -94,6 +96,11 @@ def test_fnirs_picks(): pytest.raises(ValueError, _picks_to_idx, raw.info, 'fnirs_fd_phase') +# Backward compat wrapper for simplicity below +def _fnirs_check_bads(info): + _validate_nirs_info(info) + + @testing.requires_testing_data @pytest.mark.parametrize('fname', ([fname_nirx_15_2_short, fname_nirx_15_2, fname_nirx_15_0])) @@ -122,6 +129,9 @@ def test_fnirs_check_bads(fname): pytest.raises(RuntimeError, _fnirs_check_bads, raw.info) with pytest.raises(RuntimeError, match='bad labelling'): raw = optical_density(raw) + raw.info['bads'] = [] + raw = optical_density(raw) + raw.info['bads'] = raw.ch_names[0:1] pytest.raises(RuntimeError, _fnirs_check_bads, raw.info) with pytest.raises(RuntimeError, match='bad labelling'): raw = beer_lambert_law(raw) @@ -176,14 +186,20 @@ def test_fnirs_channel_naming_and_order_readers(fname): with pytest.raises(ValueError, match='not ordered correctly'): _check_channels_ordered(raw_dropped.info, freqs) - # The ordering must match the passed in argument + # The ordering must be increasing for the pairs, if provided raw_names_reversed = raw.copy().ch_names raw_names_reversed.reverse() raw_reversed = raw.copy().pick_channels(raw_names_reversed, ordered=True) - with pytest.raises(ValueError, match='not ordered .* frequencies'): - _check_channels_ordered(raw_reversed.info, freqs) + with pytest.raises(ValueError, match='The frequencies.*sorted.*'): + _check_channels_ordered(raw_reversed.info, [850, 760]) # So if we flip the second argument it should pass again - _check_channels_ordered(raw_reversed.info, [850, 760]) + picks = _check_channels_ordered(raw_reversed.info, freqs) + got_first = set( + raw_reversed.ch_names[pick].split()[1] for pick in picks[::2]) + assert got_first == {'760'} + got_second = set( + raw_reversed.ch_names[pick].split()[1] for pick in picks[1::2]) + assert got_second == {'850'} # Check on OD data raw = optical_density(raw) @@ -203,8 +219,8 @@ def test_fnirs_channel_naming_and_order_readers(fname): assert_array_equal(chroma, ["hbo", "hbr"]) picks = _check_channels_ordered(raw.info, chroma) assert len(picks) == len(raw.ch_names) - with pytest.raises(ValueError, match='not ordered .* chromophore'): - _check_channels_ordered(raw.info, ["hbx", "hbr"]) + with pytest.raises(ValueError, match='chromophore in info'): + _check_channels_ordered(raw.info, ["hbr", "hbo"]) def test_fnirs_channel_naming_and_order_custom_raw(): @@ -237,7 +253,7 @@ def test_fnirs_channel_naming_and_order_custom_raw(): for idx, f in enumerate(freqs): raw.info["chs"][idx]["loc"][9] = f - picks = _check_channels_ordered(raw.info, [920, 850]) + picks = _check_channels_ordered(raw.info, [850, 920]) assert len(picks) == len(raw.ch_names) assert len(picks) == 6 @@ -253,7 +269,7 @@ def test_fnirs_channel_naming_and_order_custom_raw(): for idx, f in enumerate(freqs): raw.info["chs"][idx]["loc"][9] = f with pytest.raises(ValueError, match='not ordered'): - _check_channels_ordered(raw.info, [920, 850]) + _check_channels_ordered(raw.info, [850, 920]) # Catch if someone doesn't set the info field ch_names = ['S1_D1 760', 'S1_D1 850', 'S2_D1 760', 'S2_D1 850', @@ -262,7 +278,7 @@ def test_fnirs_channel_naming_and_order_custom_raw(): info = create_info(ch_names=ch_names, ch_types=ch_types, sfreq=1.0) raw = RawArray(data, info, verbose=True) with pytest.raises(ValueError, match='missing wavelength information'): - _check_channels_ordered(raw.info, [920, 850]) + _check_channels_ordered(raw.info, [850, 920]) # I have seen data encoded not in alternating frequency, but blocked. ch_names = ['S1_D1 760', 'S2_D1 760', 'S3_D1 760', @@ -273,10 +289,6 @@ def test_fnirs_channel_naming_and_order_custom_raw(): freqs = np.repeat([760, 850], 3) for idx, f in enumerate(freqs): raw.info["chs"][idx]["loc"][9] = f - with pytest.raises(ValueError, match='channels not ordered correctly'): - _check_channels_ordered(raw.info, [760, 850]) - # and this is how you would fix the ordering, then it should pass - raw.pick(picks=[0, 3, 1, 4, 2, 5]) _check_channels_ordered(raw.info, [760, 850]) @@ -309,9 +321,9 @@ def test_fnirs_channel_naming_and_order_custom_optical_density(): freqs = np.repeat([760, 850], 3) for idx, f in enumerate(freqs): raw.info["chs"][idx]["loc"][9] = f - with pytest.raises(ValueError, match='channels not ordered correctly'): - _check_channels_ordered(raw.info, [760, 850]) - # and this is how you would fix the ordering, then it should pass + # no problems here + _check_channels_ordered(raw.info, [760, 850]) + # or with this (nirx) reordering raw.pick(picks=[0, 3, 1, 4, 2, 5]) _check_channels_ordered(raw.info, [760, 850]) @@ -349,13 +361,13 @@ def test_fnirs_channel_naming_and_order_custom_chroma(): ch_types = np.repeat(["hbo", "hbr"], 3) info = create_info(ch_names=ch_names, ch_types=ch_types, sfreq=1.0) raw = RawArray(data, info, verbose=True) - with pytest.raises(ValueError, match='not ordered .* chromophore'): - _check_channels_ordered(raw.info, ["hbo", "hbr"]) - # Reordering should fix + # no issue here + _check_channels_ordered(raw.info, ["hbo", "hbr"]) + # reordering okay, too raw.pick(picks=[0, 3, 1, 4, 2, 5]) _check_channels_ordered(raw.info, ["hbo", "hbr"]) # Wrong names should fail - with pytest.raises(ValueError, match='not ordered .* chromophore'): + with pytest.raises(ValueError, match='chromophore in info'): _check_channels_ordered(raw.info, ["hbb", "hbr"]) # Test weird naming @@ -365,7 +377,7 @@ def test_fnirs_channel_naming_and_order_custom_chroma(): info = create_info(ch_names=ch_names, ch_types=ch_types, sfreq=1.0) raw = RawArray(data, info, verbose=True) with pytest.raises(ValueError, match='naming conventions'): - _check_channels_ordered(raw.info, ["hbb", "hbr"]) + _check_channels_ordered(raw.info, ["hbo", "hbr"]) # Check more weird naming ch_names = ['S1_DX hbo', 'S1_DX hbr', 'S2_D1 hbo', 'S2_D1 hbr', @@ -402,3 +414,57 @@ def test_optode_loc(): raw = read_raw_nirx(fname_nirx_15_2_short) loc = _optode_position(raw.info, "D3") assert_array_almost_equal(loc, [0.082804, 0.01573, 0.024852]) + + +def test_order_agnostic(nirx_snirf): + """Test that order does not matter to (pre)processing results.""" + raw_nirx, raw_snirf = nirx_snirf + raw_random = raw_nirx.copy().pick( + np.random.RandomState(0).permutation(len(raw_nirx.ch_names))) + raws = dict(nirx=raw_nirx, snirf=raw_snirf, random=raw_random) + del raw_nirx, raw_snirf, raw_random + orders = dict() + # continuous wave + for key, r in raws.items(): + assert set(r.get_channel_types()) == {'fnirs_cw_amplitude'} + orders[key] = [ + r.ch_names.index(name) for name in raws['nirx'].ch_names] + assert_array_equal( + raws['nirx'].ch_names, np.array(r.ch_names)[orders[key]]) + assert_allclose( + raws['nirx'].get_data(), r.get_data(orders[key]), err_msg=key) + assert_array_equal(orders['nirx'], np.arange(len(raws['nirx'].ch_names))) + # optical density + for key, r in raws.items(): + raws[key] = r = optical_density(r) + assert_allclose( + raws['nirx'].get_data(), r.get_data(orders[key]), err_msg=key) + assert set(r.get_channel_types()) == {'fnirs_od'} + # scalp-coupling index + sci = dict() + for key, r in raws.items(): + sci[key] = r = scalp_coupling_index(r) + assert_allclose(sci['nirx'], r[orders[key]], err_msg=key, rtol=0.01) + # TDDR (on optical) + tddrs = dict() + for key, r in raws.items(): + tddrs[key] = r = tddr(r) + assert_allclose( + tddrs['nirx'].get_data(), r.get_data(orders[key]), err_msg=key, + atol=1e-4) + assert set(r.get_channel_types()) == {'fnirs_od'} + # beer-lambert + for key, r in raws.items(): + raws[key] = r = beer_lambert_law(r) + assert_allclose( + raws['nirx'].get_data(), r.get_data(orders[key]), err_msg=key, + rtol=2e-7) + assert set(r.get_channel_types()) == {'hbo', 'hbr'} + # TDDR (on haemo) + tddrs = dict() + for key, r in raws.items(): + tddrs[key] = r = tddr(r) + assert_allclose( + tddrs['nirx'].get_data(), r.get_data(orders[key]), err_msg=key, + atol=1e-9) + assert set(r.get_channel_types()) == {'hbo', 'hbr'} diff --git a/mne/preprocessing/nirs/tests/test_optical_density.py b/mne/preprocessing/nirs/tests/test_optical_density.py index c4c83acf2cf..084792cf353 100644 --- a/mne/preprocessing/nirs/tests/test_optical_density.py +++ b/mne/preprocessing/nirs/tests/test_optical_density.py @@ -30,6 +30,8 @@ def test_optical_density(): _validate_type(raw, BaseRaw, 'raw') assert 'fnirs_cw_amplitude' not in raw assert 'fnirs_od' in raw + with pytest.raises(RuntimeError, match='on continuous wave'): + optical_density(raw) @testing.requires_testing_data diff --git a/tutorials/io/30_reading_fnirs_data.py b/tutorials/io/30_reading_fnirs_data.py index a32736aa725..ca197de9fe4 100644 --- a/tutorials/io/30_reading_fnirs_data.py +++ b/tutorials/io/30_reading_fnirs_data.py @@ -25,15 +25,6 @@ Manual modification of channel names and metadata is not recommended. -.. note:: To provide a consistent interface across different measurement - devices and file types, MNE-Python uses a standard ordering of - channels. MNE-Python internally orders channels by ascending source - number. When there are multiple channels with the same source number, - they are ordered by ascending detector number. The ordering of - channels is done automatically when data is imported. Therefore, - the ordering of channels within MNE-Python may be different to what - was provided by the hardware vendor. - .. _import-snirf: ***************** @@ -185,17 +176,6 @@ # %% -# -# .. warning:: The channels must be ordered in haemoglobin pairs, such that for -# a single channel all the types are in subsequent indices. The -# type order must be 'hbo' then 'hbr'. -# The data below is already in the correct order and may be -# used as a template for how data must be stored. -# If the order that your data is stored is different to the -# mandatory formatting, then you must first read the data with -# channel naming according to the data structure, then reorder -# the channels to match the required format. -# # Next, we will load the example CSV file. data = pd.read_csv('fnirs.csv')