diff --git a/mne/beamformer/tests/test_dics.py b/mne/beamformer/tests/test_dics.py index 38127a8be65..ff576a491f7 100644 --- a/mne/beamformer/tests/test_dics.py +++ b/mne/beamformer/tests/test_dics.py @@ -555,12 +555,12 @@ def test_apply_dics_timeseries(_load_forward, idx): @pytest.mark.slowtest @testing.requires_testing_data -@idx_param -def test_tf_dics(_load_forward, idx): +def test_tf_dics(_load_forward): """Test 5D time-frequency beamforming based on DICS.""" fwd_free, fwd_surf, fwd_fixed, _ = _load_forward + # idx isn't really used so let's just simulate one epochs, _, _, source_vertno, label, vertices, source_ind = \ - _simulate_data(fwd_fixed, idx) + _simulate_data(fwd_fixed, idx=0) reg = 1 # Lots of regularization for our toy dataset tmin = 0 diff --git a/mne/beamformer/tests/test_lcmv.py b/mne/beamformer/tests/test_lcmv.py index 9799b52ea87..774bcda04e5 100644 --- a/mne/beamformer/tests/test_lcmv.py +++ b/mne/beamformer/tests/test_lcmv.py @@ -6,8 +6,7 @@ from scipy import linalg from scipy.spatial.distance import cdist from numpy.testing import (assert_array_almost_equal, assert_array_equal, - assert_almost_equal, assert_allclose, - assert_array_less) + assert_allclose, assert_array_less) import mne from mne.transforms import apply_trans, invert_transform @@ -130,9 +129,9 @@ def test_lcmv_vector(): forward = mne.read_forward_solution(fname_fwd) forward = mne.pick_channels_forward(forward, info['ch_names']) - vertices = [s['vertno'][::100] for s in forward['src']] + vertices = [s['vertno'][::200] for s in forward['src']] n_vertices = sum(len(v) for v in vertices) - assert 5 < n_vertices < 20 + assert n_vertices == 4 amplitude = 100e-9 stc = mne.SourceEstimate(amplitude * np.eye(n_vertices), vertices, @@ -207,92 +206,89 @@ def test_lcmv_vector(): @pytest.mark.slowtest @requires_h5py @testing.requires_testing_data -@pytest.mark.parametrize('reg', (0.01, 0.)) -@pytest.mark.parametrize('proj', (True, False)) -def test_make_lcmv(tmpdir, reg, proj): +@pytest.mark.parametrize('reg, proj, kind', [ + (0.01, True, 'volume'), + (0., False, 'volume'), + (0.01, False, 'surface'), + (0., True, 'surface'), +]) +def test_make_lcmv_bem(tmpdir, reg, proj, kind): """Test LCMV with evoked data and single trials.""" raw, epochs, evoked, data_cov, noise_cov, label, forward,\ forward_surf_ori, forward_fixed, forward_vol = _get_data(proj=proj) - for fwd in [forward, forward_vol]: - filters = make_lcmv(evoked.info, fwd, data_cov, reg=reg, - noise_cov=noise_cov) - stc = apply_lcmv(evoked, filters, max_ori_out='signed') - stc.crop(0.02, None) + if kind == 'surface': + fwd = forward + else: + fwd = forward_vol + assert kind == 'volume' - stc_pow = np.sum(np.abs(stc.data), axis=1) - idx = np.argmax(stc_pow) - max_stc = stc.data[idx] - tmax = stc.times[np.argmax(max_stc)] - - assert 0.08 < tmax < 0.15, tmax - assert 0.9 < np.max(max_stc) < 3.5, np.max(max_stc) - - if fwd is forward: - # Test picking normal orientation (surface source space only). 
- filters = make_lcmv(evoked.info, forward_surf_ori, data_cov, - reg=reg, noise_cov=noise_cov, - pick_ori='normal', weight_norm=None) - stc_normal = apply_lcmv(evoked, filters, max_ori_out='signed') - stc_normal.crop(0.02, None) - - stc_pow = np.sum(np.abs(stc_normal.data), axis=1) - idx = np.argmax(stc_pow) - max_stc = stc_normal.data[idx] - tmax = stc_normal.times[np.argmax(max_stc)] - - lower = 0.04 if proj else 0.025 - assert lower < tmax < 0.14, tmax - lower = 3e-7 if proj else 2e-7 - assert lower < np.max(max_stc) < 3e-6, np.max(max_stc) - - # No weight normalization was applied, so the amplitude of normal - # orientation results should always be smaller than free - # orientation results. - assert (np.abs(stc_normal.data) <= stc.data).all() - - # Test picking source orientation maximizing output source power - filters = make_lcmv(evoked.info, fwd, data_cov, reg=reg, - noise_cov=noise_cov, pick_ori='max-power') - stc_max_power = apply_lcmv(evoked, filters, max_ori_out='signed') - stc_max_power.crop(0.02, None) - stc_pow = np.sum(np.abs(stc_max_power.data), axis=1) + filters = make_lcmv(evoked.info, fwd, data_cov, reg=reg, + noise_cov=noise_cov) + stc = apply_lcmv(evoked, filters, max_ori_out='signed') + stc.crop(0.02, None) + + stc_pow = np.sum(np.abs(stc.data), axis=1) + idx = np.argmax(stc_pow) + max_stc = stc.data[idx] + tmax = stc.times[np.argmax(max_stc)] + + assert 0.08 < tmax < 0.15, tmax + assert 0.9 < np.max(max_stc) < 3.5, np.max(max_stc) + + if kind == 'surface': + # Test picking normal orientation (surface source space only). + filters = make_lcmv(evoked.info, forward_surf_ori, data_cov, + reg=reg, noise_cov=noise_cov, + pick_ori='normal', weight_norm=None) + stc_normal = apply_lcmv(evoked, filters, max_ori_out='signed') + stc_normal.crop(0.02, None) + + stc_pow = np.sum(np.abs(stc_normal.data), axis=1) idx = np.argmax(stc_pow) - max_stc = np.abs(stc_max_power.data[idx]) - tmax = stc.times[np.argmax(max_stc)] - - lower = 0.08 if proj else 0.04 - assert lower < tmax < 0.15, tmax - assert 0.8 < np.max(max_stc) < 3., np.max(max_stc) - - stc_max_power.data[:, :] = np.abs(stc_max_power.data) - - if fwd is forward: - # Maximum output source power orientation results should be - # similar to free orientation results in areas with channel - # coverage - label = mne.read_label(fname_label) - mean_stc = stc.extract_label_time_course(label, fwd['src'], - mode='mean') - mean_stc_max_pow = \ - stc_max_power.extract_label_time_course(label, fwd['src'], - mode='mean') - assert_array_less(np.abs(mean_stc - mean_stc_max_pow), 1.0) - - # Test NAI weight normalization: - filters = make_lcmv(evoked.info, fwd, data_cov, reg=reg, - noise_cov=noise_cov, pick_ori='max-power', - weight_norm='nai') - stc_nai = apply_lcmv(evoked, filters, max_ori_out='signed') - stc_nai.crop(0.02, None) - - # Test whether unit-noise-gain solution is a scaled version of NAI - pearsoncorr = np.corrcoef(np.concatenate(np.abs(stc_nai.data)), - np.concatenate(stc_max_power.data)) - assert_almost_equal(pearsoncorr[0, 1], 1.) + max_stc = stc_normal.data[idx] + tmax = stc_normal.times[np.argmax(max_stc)] + + lower = 0.04 if proj else 0.025 + assert lower < tmax < 0.14, tmax + lower = 3e-7 if proj else 2e-7 + assert lower < np.max(max_stc) < 3e-6, np.max(max_stc) + + # No weight normalization was applied, so the amplitude of normal + # orientation results should always be smaller than free + # orientation results. 
+ assert (np.abs(stc_normal.data) <= stc.data).all() + + # Test picking source orientation maximizing output source power + filters = make_lcmv(evoked.info, fwd, data_cov, reg=reg, + noise_cov=noise_cov, pick_ori='max-power') + stc_max_power = apply_lcmv(evoked, filters, max_ori_out='signed') + stc_max_power.crop(0.02, None) + stc_pow = np.sum(np.abs(stc_max_power.data), axis=1) + idx = np.argmax(stc_pow) + max_stc = np.abs(stc_max_power.data[idx]) + tmax = stc.times[np.argmax(max_stc)] + + lower = 0.08 if proj else 0.04 + assert lower < tmax < 0.15, tmax + assert 0.8 < np.max(max_stc) < 3., np.max(max_stc) + + stc_max_power.data[:, :] = np.abs(stc_max_power.data) + + if kind == 'surface': + # Maximum output source power orientation results should be + # similar to free orientation results in areas with channel + # coverage + label = mne.read_label(fname_label) + mean_stc = stc.extract_label_time_course( + label, fwd['src'], mode='mean') + mean_stc_max_pow = \ + stc_max_power.extract_label_time_course( + label, fwd['src'], mode='mean') + assert_array_less(np.abs(mean_stc - mean_stc_max_pow), 1.0) # Test if spatial filter contains src_type - assert 'src_type' in filters + assert filters['src_type'] == kind # __repr__ assert len(evoked.ch_names) == 22 @@ -301,7 +297,7 @@ def test_make_lcmv(tmpdir, reg, proj): rank = 17 if proj else 20 assert 'LCMV' in repr(filters) assert 'unknown subject' not in repr(filters) - assert '4157 vert' in repr(filters) + assert f'{fwd["nsource"]} vert' in repr(filters) assert '20 ch' in repr(filters) assert 'rank %s' % rank in repr(filters) @@ -317,6 +313,9 @@ def test_make_lcmv(tmpdir, reg, proj): filters['rank'] = int(filters['rank']) assert object_diff(filters, filters_read) == '' + if kind != 'surface': + return + # Test if fixed forward operator is detected when picking normal or # max-power orientation pytest.raises(ValueError, make_lcmv, evoked.info, forward_fixed, data_cov, @@ -357,9 +356,8 @@ def test_make_lcmv(tmpdir, reg, proj): # this channel from the data # also test here that no warnings are thrown - implemented to check whether # src should not be None warning occurs - with pytest.warns(None) as w: - stc = apply_lcmv(evoked, filters, max_ori_out='signed') - assert len(w) == 0 + stc = apply_lcmv(evoked, filters, max_ori_out='signed') + # the result should be equal to applying this filter to a dataset without # this channel: stc_ch = apply_lcmv(evoked_ch, filters, max_ori_out='signed') @@ -367,11 +365,16 @@ def test_make_lcmv(tmpdir, reg, proj): # Test if non-matching SSP projection is detected in application of filter if proj: - raw_proj = deepcopy(raw) - raw_proj.del_proj() + raw_proj = raw.copy().del_proj() with pytest.raises(ValueError, match='do not match the projections'): apply_lcmv_raw(raw_proj, filters, max_ori_out='signed') + # Test apply_lcmv_raw + use_raw = raw.copy().crop(0, 1) + stc = apply_lcmv_raw(use_raw, filters) + assert_allclose(stc.times, use_raw.times) + assert_array_equal(stc.vertices[0], forward_vol['src'][0]['vertno']) + # Test if spatial filter contains src_type assert 'src_type' in filters @@ -433,8 +436,13 @@ def test_make_lcmv(tmpdir, reg, proj): @testing.requires_testing_data @pytest.mark.slowtest -@pytest.mark.parametrize('weight_norm', (None, 'unit-noise-gain', 'nai')) -@pytest.mark.parametrize('pick_ori', (None, 'max-power', 'vector')) +@pytest.mark.parametrize('weight_norm, pick_ori', [ + ('unit-noise-gain', 'max-power'), + ('unit-noise-gain', 'vector'), + ('unit-noise-gain', None), + ('nai', 'vector'), + (None, 
'max-power'), +]) def test_make_lcmv_sphere(pick_ori, weight_norm): """Test LCMV with sphere head model.""" # unit-noise gain beamformer and orientation @@ -479,33 +487,6 @@ def test_make_lcmv_sphere(pick_ori, weight_norm): assert min_ < np.max(max_stc) < max_, (min_, np.max(max_stc), max_) -@testing.requires_testing_data -def test_lcmv_raw(): - """Test LCMV with raw data.""" - raw, _, _, _, noise_cov, label, forward, _, _, _ =\ - _get_data(all_forward=False, epochs=False, data_cov=False) - - tmin, tmax = 0, 20 - start, stop = raw.time_as_index([tmin, tmax]) - - # use only the left-temporal MEG channels for LCMV - data_cov = mne.compute_raw_covariance(raw, tmin=tmin, tmax=tmax) - filters = make_lcmv(raw.info, forward, data_cov, reg=0.01, - noise_cov=noise_cov, label=label) - stc = apply_lcmv_raw(raw, filters, start=start, stop=stop, - max_ori_out='signed') - - assert_array_almost_equal(np.array([tmin, tmax]), - np.array([stc.times[0], stc.times[-1]]), - decimal=2) - - # make sure we get an stc with vertices only in the lh - vertno = [forward['src'][0]['vertno'], forward['src'][1]['vertno']] - assert len(stc.vertices[0]) == len(np.intersect1d(vertno[0], - label.vertices)) - assert len(stc.vertices[1]) == 0 - - @testing.requires_testing_data @pytest.mark.parametrize('weight_norm', (None, 'unit-noise-gain')) @pytest.mark.parametrize('pick_ori', ('max-power', 'normal')) @@ -542,16 +523,15 @@ def test_lcmv_ctf_comp(): ctf_dir = op.join(testing.data_path(download=False), 'CTF') raw_fname = op.join(ctf_dir, 'somMDYO-18av.ds') raw = mne.io.read_raw_ctf(raw_fname, preload=True) + raw.pick(raw.ch_names[:70]) events = mne.make_fixed_length_events(raw, duration=0.2)[:2] epochs = mne.Epochs(raw, events, tmin=-0.1, tmax=0.2) evoked = epochs.average() - with pytest.warns(RuntimeWarning, - match='Too few samples .* estimate may be unreliable'): - data_cov = mne.compute_covariance(epochs) + data_cov = mne.compute_covariance(epochs) fwd = mne.make_forward_solution(evoked.info, None, - mne.setup_volume_source_space(pos=15.0), + mne.setup_volume_source_space(pos=30.0), mne.make_sphere_model()) with pytest.raises(ValueError, match='reduce_rank'): make_lcmv(evoked.info, fwd, data_cov) @@ -566,8 +546,12 @@ def test_lcmv_ctf_comp(): @testing.requires_testing_data -@pytest.mark.parametrize('proj', [False, True]) -@pytest.mark.parametrize('weight_norm', (None, 'nai', 'unit-noise-gain')) +@pytest.mark.parametrize('proj, weight_norm', [ + (True, 'unit-noise-gain'), + (False, 'unit-noise-gain'), + (True, None), + (True, 'nai'), +]) def test_lcmv_reg_proj(proj, weight_norm): """Test LCMV with and without proj.""" raw = mne.io.read_raw_fif(fname_raw, preload=True) @@ -777,8 +761,12 @@ def test_orientation_max_power(bias_params_fixed, bias_params_free, assert lower_ori < got < upper_ori -@pytest.mark.parametrize('weight_norm', ('nai', 'unit-noise-gain')) -@pytest.mark.parametrize('pick_ori', ('vector', 'max-power', None)) +@pytest.mark.parametrize('weight_norm, pick_ori', [ + ('nai', 'max-power'), + ('unit-noise-gain', 'vector'), + ('unit-noise-gain', 'max-power'), + ('unit-noise-gain', None), +]) def test_depth_does_not_matter(bias_params_free, weight_norm, pick_ori): """Test that depth weighting does not matter for normalized filters.""" evoked, fwd, noise_cov, data_cov, _ = bias_params_free @@ -814,12 +802,24 @@ def test_lcmv_maxfiltered(): make_lcmv(epochs.info, fwd, data_cov, rank=use_rank) +# To reduce test time, only test combinations that should matter rather than +# all of them @testing.requires_testing_data 
-@pytest.mark.parametrize('pick_ori', ['vector', 'max-power', 'normal']) -@pytest.mark.parametrize( - 'weight_norm', ['unit-noise-gain', 'nai', 'unit-noise-gain-invariant']) -@pytest.mark.parametrize('reg', (0.05, 0.)) -@pytest.mark.parametrize('inversion', ['matrix', 'single']) +@pytest.mark.parametrize('pick_ori, weight_norm, reg, inversion', [ + ('vector', 'unit-noise-gain-invariant', 0.05, 'matrix'), + ('vector', 'unit-noise-gain-invariant', 0.05, 'single'), + ('vector', 'unit-noise-gain', 0.05, 'matrix'), + ('vector', 'unit-noise-gain', 0.05, 'single'), + ('vector', 'unit-noise-gain', 0.0, 'matrix'), + ('vector', 'unit-noise-gain', 0.0, 'single'), + ('vector', 'nai', 0.05, 'matrix'), + ('max-power', 'unit-noise-gain', 0.05, 'matrix'), + ('max-power', 'unit-noise-gain', 0.0, 'single'), + ('max-power', 'unit-noise-gain', 0.05, 'single'), + ('max-power', 'unit-noise-gain-invariant', 0.05, 'matrix'), + ('normal', 'unit-noise-gain', 0.05, 'matrix'), + ('normal', 'nai', 0.0, 'matrix'), +]) def test_unit_noise_gain_formula(pick_ori, weight_norm, reg, inversion): """Test unit-noise-gain filter against formula.""" raw = mne.io.read_raw_fif(fname_raw, preload=True) diff --git a/mne/io/ctf/tests/test_ctf.py b/mne/io/ctf/tests/test_ctf.py index ee2630f8bc8..b7dd0eb6067 100644 --- a/mne/io/ctf/tests/test_ctf.py +++ b/mne/io/ctf/tests/test_ctf.py @@ -212,8 +212,7 @@ def test_read_ctf(tmpdir): assert_allclose(raw_read[pick_ch, sl_time][0], raw_c[pick_ch, sl_time][0]) # all data / preload - with pytest.warns(None): # sometimes MISC - raw = read_raw_ctf(fname, preload=True) + raw.load_data() assert_allclose(raw[:][0], raw_c[:][0], atol=1e-15) # test bad segment annotations if 'testdata_ctf_short.ds' in fname: diff --git a/mne/io/eeglab/tests/test_eeglab.py b/mne/io/eeglab/tests/test_eeglab.py index 3ed15af5ec2..c0c9a22846d 100644 --- a/mne/io/eeglab/tests/test_eeglab.py +++ b/mne/io/eeglab/tests/test_eeglab.py @@ -20,7 +20,7 @@ from mne.io import read_raw_eeglab from mne.io.tests.test_raw import _test_raw_reader from mne.datasets import testing -from mne.utils import requires_h5py, run_tests_if_main +from mne.utils import check_version from mne.annotations import events_from_annotations, read_annotations from mne.io.eeglab.tests._utils import _read_eeglab_montage @@ -42,25 +42,18 @@ raw_h5_fnames = [raw_fname_h5, raw_fname_onefile_h5] epochs_h5_fnames = [epochs_fname_h5, epochs_fname_onefile_h5] -raw_fnames = [raw_fname_mat, raw_fname_onefile_mat, - raw_fname_h5, raw_fname_onefile_h5] montage_path = op.join(base_dir, 'test_chans.locs') -def _check_h5(fname): - if fname.endswith('_h5.set'): - try: - import h5py # noqa, analysis:ignore - except Exception: - raise SkipTest('h5py module required') +needs_h5 = pytest.mark.skipif(not check_version('h5py'), reason='Needs h5py') -@requires_h5py @testing.requires_testing_data -@pytest.mark.slowtest -@pytest.mark.parametrize( - 'fname', [raw_fname_mat, raw_fname_h5, raw_fname_chanloc], ids=op.basename -) +@pytest.mark.parametrize('fname', [ + raw_fname_mat, + pytest.param(raw_fname_h5, marks=needs_h5), + raw_fname_chanloc, +], ids=op.basename) def test_io_set_raw(fname): """Test importing EEGLAB .set files.""" montage = _read_eeglab_montage(montage_path) @@ -72,22 +65,19 @@ def test_io_set_raw(fname): if fname.endswith('test_raw_chanloc.set'): with pytest.warns(RuntimeWarning, match="The data contains 'boundary' events"): - _test_raw_reader(**kws) + raw0 = _test_raw_reader(**kws) + elif '_h5' in fname: # should be safe enough, and much faster + raw0 = 
read_raw_eeglab(fname, preload=True)
     else:
-        _test_raw_reader(**kws)
+        raw0 = _test_raw_reader(**kws)
 
     # test that preloading works
-    read_raw_kws = dict(input_fname=fname, preload=True)
     if fname.endswith('test_raw_chanloc.set'):
-        with pytest.warns(RuntimeWarning,
-                          match="The data contains 'boundary' events"):
-            raw0 = read_raw_eeglab(**read_raw_kws)
-            raw0.set_montage(montage, on_missing='ignore')
-            # crop to check if the data has been properly preloaded; we cannot
-            # filter as the snippet of raw data is very short
-            raw0.crop(0, 1)
+        raw0.set_montage(montage, on_missing='ignore')
+        # crop to check if the data has been properly preloaded; we cannot
+        # filter as the snippet of raw data is very short
+        raw0.crop(0, 1)
     else:
-        raw0 = read_raw_eeglab(**read_raw_kws)
         raw0.set_montage(montage)
         raw0.filter(1, None, l_trans_bandwidth='auto', filter_length='auto',
                     phase='zero')
@@ -103,6 +93,12 @@ def test_io_set_raw(fname):
         raw0 = read_raw_eeglab(**read_raw_kws)
         raw0.set_montage(montage)
 
+    # Annotations
+    if fname != raw_fname_chanloc:
+        assert len(raw0.annotations) == 154
+        assert set(raw0.annotations.description) == {'rt', 'square'}
+        assert_array_equal(raw0.annotations.duration, 0.)
+
 
 @testing.requires_testing_data
 def test_io_set_raw_more(tmpdir):
@@ -247,11 +243,12 @@ def test_io_set_raw_more(tmpdir):
                                np.array([np.nan, np.nan, np.nan]))
 
 
-@pytest.mark.slowtest  # slow-ish on Travis OSX
 @pytest.mark.timeout(60)  # ~60 sec on Travis OSX
-@requires_h5py
 @testing.requires_testing_data
-@pytest.mark.parametrize('fnames', [epochs_mat_fnames, epochs_h5_fnames])
+@pytest.mark.parametrize('fnames', [
+    epochs_mat_fnames,
+    pytest.param(epochs_h5_fnames, marks=[needs_h5, pytest.mark.slowtest]),
+])
 def test_io_set_epochs(fnames):
     """Test importing EEGLAB .set epochs files."""
     epochs_fname, epochs_fname_onefile = fnames
@@ -306,12 +303,16 @@ def test_degenerate(tmpdir):
                  bad_epochs_fname)
 
 
-@pytest.mark.parametrize("fname", raw_fnames)
+@pytest.mark.parametrize("fname", [
+    raw_fname_mat,
+    raw_fname_onefile_mat,
+    # We don't test the h5 variants here because they are implicitly tested
+    # in test_io_set_raw
+])
 @pytest.mark.filterwarnings('ignore: Complex objects')
 @testing.requires_testing_data
 def test_eeglab_annotations(fname):
     """Test reading annotations in EEGLAB files."""
-    _check_h5(fname)
     annotations = read_annotations(fname)
     assert len(annotations) == 154
     assert set(annotations.description) == {'rt', 'square'}
@@ -421,6 +422,3 @@ def test_position_information(one_chanpos_fname):
 
     _assert_array_allclose_nan(np.array([ch['loc'] for ch in raw.info['chs']]),
                                EXPECTED_LOCATIONS_FROM_MONTAGE)
-
-
-run_tests_if_main()
diff --git a/mne/io/egi/tests/test_egi.py b/mne/io/egi/tests/test_egi.py
index 40e667cf006..01486ab98a4 100644
--- a/mne/io/egi/tests/test_egi.py
+++ b/mne/io/egi/tests/test_egi.py
@@ -59,11 +59,15 @@
 ])
 def test_egi_mff_pause(fname, skip_times, event_times):
     """Test EGI MFF with pauses."""
-    with pytest.warns(RuntimeWarning, match='Acquisition skips detected'):
-        raw = _test_raw_reader(read_raw_egi, input_fname=fname,
-                               test_scaling=False,  # XXX probably some bug
-                               test_rank='less',
-                               )
+    if fname == egi_pause_w1337_fname:
+        # too slow to _test_raw_reader
+        raw = read_raw_egi(fname).load_data()
+    else:
+        with pytest.warns(RuntimeWarning, match='Acquisition skips detected'):
+            raw = _test_raw_reader(read_raw_egi, input_fname=fname,
+                                   test_scaling=False,  # XXX probably some bug
+                                   test_rank='less',
+                                   )
     assert raw.info['sfreq'] == 250.
# true for all of these files assert len(raw.annotations) == len(skip_times) diff --git a/mne/io/fieldtrip/tests/test_fieldtrip.py b/mne/io/fieldtrip/tests/test_fieldtrip.py index b4890d794a3..2e654d4608a 100644 --- a/mne/io/fieldtrip/tests/test_fieldtrip.py +++ b/mne/io/fieldtrip/tests/test_fieldtrip.py @@ -33,6 +33,11 @@ all_test_params_epochs = list(itertools.product(all_systems_epochs, all_versions, use_info)) +# just for speed we skip some slowest ones -- the coverage should still +# be sufficient +for key in [('CTF', 'v73', True), ('neuromag306', 'v73', False)]: + all_test_params_epochs.pop(all_test_params_epochs.index(key)) + all_test_params_raw.pop(all_test_params_raw.index(key)) no_info_warning = {'expected_warning': RuntimeWarning, 'match': NOINFO_WARNING} diff --git a/mne/io/fiff/tests/test_raw_fiff.py b/mne/io/fiff/tests/test_raw_fiff.py index 51c1137464b..e3dbfb99358 100644 --- a/mne/io/fiff/tests/test_raw_fiff.py +++ b/mne/io/fiff/tests/test_raw_fiff.py @@ -606,7 +606,7 @@ def test_io_raw(tmpdir): def test_io_raw_additional(fname_in, fname_out, tmpdir): """Test IO for raw data (Neuromag + CTF + gz).""" fname_out = tmpdir.join(fname_out) - raw = read_raw_fif(fname_in) + raw = read_raw_fif(fname_in).crop(0, 2) nchan = raw.info['nchan'] ch_names = raw.info['ch_names'] @@ -1067,16 +1067,21 @@ def test_resample_equiv(): @testing.requires_testing_data -@pytest.mark.parametrize('preload', (True, False)) -def test_resample(tmpdir, preload): +@pytest.mark.parametrize('preload, n, npad', [ + (True, 512, 'auto'), + (False, 512, 0), +]) +def test_resample(tmpdir, preload, n, npad): """Test resample (with I/O and multiple files).""" - raw = read_raw_fif(fif_fname).crop(0, 3) + raw = read_raw_fif(fif_fname) + raw.crop(0, raw.times[n - 1]) + assert len(raw.times) == n if preload: raw.load_data() raw_resamp = raw.copy() sfreq = raw.info['sfreq'] # test parallel on upsample - raw_resamp.resample(sfreq * 2, n_jobs=2, npad='auto') + raw_resamp.resample(sfreq * 2, n_jobs=2, npad=npad) assert raw_resamp.n_times == len(raw_resamp.times) raw_resamp.save(tmpdir.join('raw_resamp-raw.fif')) raw_resamp = read_raw_fif(tmpdir.join('raw_resamp-raw.fif'), @@ -1086,7 +1091,7 @@ def test_resample(tmpdir, preload): assert raw_resamp.get_data().shape[1] == raw_resamp.n_times assert raw.get_data().shape[0] == raw_resamp._data.shape[0] # test non-parallel on downsample - raw_resamp.resample(sfreq, n_jobs=1, npad='auto') + raw_resamp.resample(sfreq, n_jobs=1, npad=npad) assert raw_resamp.info['sfreq'] == sfreq assert raw.get_data().shape == raw_resamp._data.shape assert raw.first_samp == raw_resamp.first_samp @@ -1109,9 +1114,9 @@ def test_resample(tmpdir, preload): raw3 = raw.copy() raw4 = raw.copy() raw1 = concatenate_raws([raw1, raw2]) - raw1.resample(10., npad='auto') - raw3.resample(10., npad='auto') - raw4.resample(10., npad='auto') + raw1.resample(10., npad=npad) + raw3.resample(10., npad=npad) + raw4.resample(10., npad=npad) raw3 = concatenate_raws([raw3, raw4]) assert_array_equal(raw1._data, raw3._data) assert_array_equal(raw1._first_samps, raw3._first_samps) @@ -1129,12 +1134,12 @@ def test_resample(tmpdir, preload): # basic decimation stim = [1, 1, 1, 1, 0, 0, 0, 0, 1, 1, 1, 1, 0, 0, 0, 0] raw = RawArray([stim], create_info(1, len(stim), ['stim'])) - assert_allclose(raw.resample(8., npad='auto')._data, + assert_allclose(raw.resample(8., npad=npad)._data, [[1, 1, 0, 0, 1, 1, 0, 0]]) # decimation of multiple stim channels raw = RawArray(2 * [stim], create_info(2, len(stim), 2 * ['stim'])) - 
assert_allclose(raw.resample(8., npad='auto', verbose='error')._data, + assert_allclose(raw.resample(8., npad=npad, verbose='error')._data, [[1, 1, 0, 0, 1, 1, 0, 0], [1, 1, 0, 0, 1, 1, 0, 0]]) @@ -1142,20 +1147,20 @@ def test_resample(tmpdir, preload): # done naively stim = [0, 0, 0, 1, 1, 0, 0, 0] raw = RawArray([stim], create_info(1, len(stim), ['stim'])) - assert_allclose(raw.resample(4., npad='auto')._data, + assert_allclose(raw.resample(4., npad=npad)._data, [[0, 1, 1, 0]]) # two events are merged in this case (warning) stim = [0, 0, 1, 1, 1, 0, 0, 1, 1, 1, 1, 0, 0, 0, 0, 0] raw = RawArray([stim], create_info(1, len(stim), ['stim'])) with pytest.warns(RuntimeWarning, match='become unreliable'): - raw.resample(8., npad='auto') + raw.resample(8., npad=npad) # events are dropped in this case (warning) stim = [0, 1, 1, 0, 0, 1, 1, 0] raw = RawArray([stim], create_info(1, len(stim), ['stim'])) with pytest.warns(RuntimeWarning, match='become unreliable'): - raw.resample(4., npad='auto') + raw.resample(4., npad=npad) # test resampling events: this should no longer give a warning # we often have first_samp != 0, include it here too @@ -1167,7 +1172,7 @@ def test_resample(tmpdir, preload): raw = RawArray([stim], create_info(1, o_sfreq, ['stim']), first_samp=first_samp) events = find_events(raw) - raw, events = raw.resample(n_sfreq, events=events, npad='auto') + raw, events = raw.resample(n_sfreq, events=events, npad=npad) # Try index into raw.times with resampled events: raw.times[events[:, 0] - raw.first_samp] n_fsamp = int(first_samp * sfreq_ratio) # how it's calc'd in base.py @@ -1183,15 +1188,15 @@ def test_resample(tmpdir, preload): # test copy flag stim = [1, 1, 1, 1, 0, 0, 0, 0, 1, 1, 1, 1, 0, 0, 0, 0] raw = RawArray([stim], create_info(1, len(stim), ['stim'])) - raw_resampled = raw.copy().resample(4., npad='auto') + raw_resampled = raw.copy().resample(4., npad=npad) assert (raw_resampled is not raw) - raw_resampled = raw.resample(4., npad='auto') + raw_resampled = raw.resample(4., npad=npad) assert (raw_resampled is raw) # resample should still work even when no stim channel is present raw = RawArray(np.random.randn(1, 100), create_info(1, 100, ['eeg'])) raw.info['lowpass'] = 50. - raw.resample(10, npad='auto') + raw.resample(10, npad=npad) assert raw.info['lowpass'] == 5. assert len(raw) == 10 diff --git a/mne/preprocessing/tests/test_ecg.py b/mne/preprocessing/tests/test_ecg.py index 1f62a1522c1..16cfdfdbebb 100644 --- a/mne/preprocessing/tests/test_ecg.py +++ b/mne/preprocessing/tests/test_ecg.py @@ -4,7 +4,6 @@ from mne.io import read_raw_fif from mne import pick_types from mne.preprocessing import find_ecg_events, create_ecg_epochs -from mne.utils import run_tests_if_main data_path = op.join(op.dirname(__file__), '..', '..', 'io', 'tests', 'data') raw_fname = op.join(data_path, 'test_raw.fif') @@ -15,7 +14,9 @@ def test_find_ecg(): """Test find ECG peaks.""" # Test if ECG analysis will work on data that is not preloaded - raw = read_raw_fif(raw_fname, preload=False) + raw = read_raw_fif(raw_fname, preload=False).pick_types(meg=True) + raw.pick(raw.ch_names[::10] + ['MEG 2641']) + raw.info.normalize_proj() # once with mag-trick # once with characteristic channel @@ -24,14 +25,16 @@ def test_find_ecg(): raw_bad._data[ecg_idx, :1] = 1e6 # this will break the detector raw_bad.annotations.append(raw.first_samp / raw.info['sfreq'], 1. 
/ raw.info['sfreq'], 'BAD_values') + raw_noload = raw.copy() + raw.resample(100) - for ch_name, tstart in zip(['MEG 1531', None, None], - [raw.times[-1] / 2, raw.times[-1] / 2, 0]): + for ch_name, tstart in zip(['MEG 1531', None], + [raw.times[-1] / 2, 0]): events, ch_ECG, average_pulse, ecg = find_ecg_events( raw, event_id=999, ch_name=ch_name, tstart=tstart, return_ecg=True) assert raw.n_times == ecg.shape[-1] - assert 55 < average_pulse < 60 + assert 40 < average_pulse < 60 n_events = len(events) # with annotations @@ -44,11 +47,6 @@ def test_find_ecg(): reject_by_annotation=True)[2] assert 55 < average_pulse < 60 - average_pulse = find_ecg_events(raw_bad, ch_name='MEG 2641', - reject_by_annotation=False)[2] - assert 55 < average_pulse < 65 - del raw_bad - picks = pick_types( raw.info, meg='grad', eeg=False, stim=False, eog=False, ecg=True, emg=False, ref_meg=False, @@ -58,10 +56,11 @@ def test_find_ecg(): # tested assert 'ecg' not in raw - ecg_epochs = create_ecg_epochs(raw, picks=picks, keep_ecg=True) + ecg_epochs = create_ecg_epochs(raw_noload, picks=picks, keep_ecg=True) assert len(ecg_epochs.events) == n_events assert 'ECG-SYN' not in raw.ch_names assert 'ECG-SYN' in ecg_epochs.ch_names + assert len(ecg_epochs) == 23 picks = pick_types( ecg_epochs.info, meg=False, eeg=False, stim=False, @@ -74,15 +73,14 @@ def test_find_ecg(): # test with user provided ecg channel raw.info['projs'] = list() + assert 'MEG 2641' in raw.ch_names with pytest.warns(RuntimeWarning, match='unit for channel'): raw.set_channel_types({'MEG 2641': 'ecg'}) create_ecg_epochs(raw) - raw.load_data().pick_types(meg=True) # remove ECG + raw.pick_types(meg=True) # remove ECG + assert 'MEG 2641' not in raw.ch_names ecg_epochs = create_ecg_epochs(raw, keep_ecg=False) assert len(ecg_epochs.events) == n_events assert 'ECG-SYN' not in raw.ch_names assert 'ECG-SYN' not in ecg_epochs.ch_names - - -run_tests_if_main() diff --git a/mne/preprocessing/tests/test_ica.py b/mne/preprocessing/tests/test_ica.py index fc21a218d59..78a504a2684 100644 --- a/mne/preprocessing/tests/test_ica.py +++ b/mne/preprocessing/tests/test_ica.py @@ -515,27 +515,37 @@ def test_ica_core(method, n_components, noise_cov, n_pca_components): ica.fit(epochs, picks=[0, 1]) -@requires_sklearn -@pytest.mark.slowtest -@pytest.mark.parametrize("method", ["picard", "fastica"]) -def test_ica_additional(method, tmpdir): - """Test additional ICA functionality.""" - _skip_check_picard(method) - - stop2 = 500 - raw = read_raw_fif(raw_fname).crop(1.5, stop).load_data() +@pytest.fixture +def short_raw_epochs(): + """Get small data.""" + raw = read_raw_fif(raw_fname).crop(0, 5).load_data() + raw.pick_channels(set(raw.ch_names[::10]) | set( + ['EOG 061', 'MEG 1531', 'MEG 1441', 'MEG 0121'])) + assert 'eog' in raw raw.del_proj() # avoid warnings raw.set_annotations(Annotations([0.5], [0.5], ['BAD'])) + raw.resample(100) # XXX This breaks the tests :( # raw.info['bads'] = [raw.ch_names[1]] - test_cov = read_cov(test_cov_name) - events = read_events(event_name) - picks = pick_types(raw.info, meg=True, stim=False, ecg=False, - eog=False, exclude='bads')[1::4] + # Create epochs that have different channels from raw + events = make_fixed_length_events(raw) + picks = pick_types(raw.info, meg=True, eeg=True, eog=False)[:-1] epochs = Epochs(raw, events, None, tmin, tmax, picks=picks, baseline=(None, 0), preload=True, proj=False) - epochs.decimate(3, verbose='error') - assert len(epochs) == 4 + assert len(epochs) == 3 + epochs_eog = Epochs(raw, epochs.events, event_id, tmin, 
tmax, + picks=('meg', 'eog'), baseline=(None, 0), preload=True) + return raw, epochs, epochs_eog + + +@requires_sklearn +@pytest.mark.slowtest +@pytest.mark.parametrize("method", ["picard", "fastica"]) +def test_ica_additional(method, tmpdir, short_raw_epochs): + """Test additional ICA functionality.""" + _skip_check_picard(method) + raw, epochs, epochs_eog = short_raw_epochs + few_picks = np.arange(5) # test if n_components=None works ica = ICA(n_components=None, n_pca_components=None, method=method, @@ -543,17 +553,12 @@ def test_ica_additional(method, tmpdir): with pytest.warns(UserWarning, match='did not converge'): ica.fit(epochs) _assert_ica_attributes(ica, epochs.get_data('data'), limits=(0.05, 20)) - # for testing eog functionality - picks2 = np.concatenate([picks, pick_types(raw.info, False, eog=True)]) - epochs_eog = Epochs(raw, events[:4], event_id, tmin, tmax, picks=picks2, - baseline=(None, 0), preload=True) - del picks2 - - test_cov2 = test_cov.copy() - ica = ICA(noise_cov=test_cov2, n_components=3, method=method) + + test_cov = read_cov(test_cov_name) + ica = ICA(noise_cov=test_cov, n_components=3, method=method) assert (ica.info is None) with pytest.warns(RuntimeWarning, match='normalize_proj'): - ica.fit(raw, picks[:5]) + ica.fit(raw, picks=few_picks) _assert_ica_attributes(ica, raw.get_data(np.arange(5)), limits=(1, 90)) assert (isinstance(ica.info, Info)) assert (ica.n_components_ < 5) @@ -563,12 +568,12 @@ def test_ica_additional(method, tmpdir): ica.save('') with pytest.warns(Warning, match='converge'): - ica.fit(raw, picks=[1, 2, 3, 4, 5], start=start, stop=stop2) + ica.fit(raw, np.arange(1, 6)) _assert_ica_attributes( - ica, raw.get_data(np.arange(1, 6), start=start, stop=stop2)) + ica, raw.get_data(np.arange(1, 6))) # check Kuiper index threshold - assert_equal(ica._get_ctps_threshold(), 0.21) + assert_allclose(ica._get_ctps_threshold(), 0.5) with pytest.raises(TypeError, match='str or numeric'): ica.find_bads_ecg(raw, threshold=None) with pytest.warns(RuntimeWarning, match='is longer than the signal'): @@ -581,6 +586,7 @@ def test_ica_additional(method, tmpdir): # check passing a ch_name to find_bads_ecg with pytest.warns(RuntimeWarning, match='longer'): _, scores_1 = ica.find_bads_ecg(raw, threshold='auto') + with pytest.warns(RuntimeWarning, match='longer'): _, scores_2 = ica.find_bads_ecg(raw, raw.ch_names[1], threshold='auto') assert scores_1[0] != scores_2[0] @@ -616,7 +622,7 @@ def test_ica_additional(method, tmpdir): assert 'No maps selected' in log # make sure a single threshold in a list works - corrmap([ica, ica3], template, threshold=[0.5], label='blinks', plot=True, + corrmap([ica, ica3], template, threshold=[0.5], label='blinks', plot=False, ch_type="mag") ica_different_channels = ICA(n_components=2, max_iter=1) @@ -638,7 +644,7 @@ def test_ica_additional(method, tmpdir): raw_.append(raw_) n_samples = raw_._data.shape[1] with pytest.warns(UserWarning, match='did not converge'): - ica.fit(raw, picks=picks[:5], decim=3) + ica.fit(raw, picks=few_picks) _assert_ica_attributes(ica) assert raw_._data.shape[1] == n_samples @@ -649,7 +655,7 @@ def test_ica_additional(method, tmpdir): ICA(n_components=1, method=method) ica = ICA(n_components=4, method=method, max_iter=1) with pytest.warns(UserWarning, match='did not converge'): - ica.fit(raw, picks=None, decim=3) + ica.fit(raw) _assert_ica_attributes(ica) assert ica.n_components_ == 4 ica_var = _ica_explained_variance(ica, raw, normalize=True) @@ -664,92 +670,63 @@ def test_ica_additional(method, tmpdir): # 
epochs extraction from raw fit pytest.raises(RuntimeError, ica.get_sources, epochs) - # test reading and writing + + # test filtering + ica_raw = ica.get_sources(raw) + d1 = ica_raw._data[0].copy() + ica_raw.filter(4, 20, fir_design='firwin2') + assert_equal(ica_raw.info['lowpass'], 20.) + assert_equal(ica_raw.info['highpass'], 4.) + assert ((d1 != ica_raw._data[0]).any()) + d1 = ica_raw._data[0].copy() + ica_raw.notch_filter([10], trans_bandwidth=10, fir_design='firwin') + assert ((d1 != ica_raw._data[0]).any()) + test_ica_fname = tmpdir.join('test-ica.fif') - kwargs = dict(n_pca_components=4) - for cov in (None, test_cov): - ica = ICA(noise_cov=cov, n_components=2, method=method, max_iter=1) - with pytest.warns(None): # ICA does not converge - ica.fit(raw, picks=picks[:10], start=start, stop=stop2) - _assert_ica_attributes(ica) - sources = ica.get_sources(epochs).get_data() - assert (ica.mixing_matrix_.shape == (2, 2)) - assert (ica.unmixing_matrix_.shape == (2, 2)) - assert (ica.pca_components_.shape == (10, 10)) - assert (sources.shape[1] == ica.n_components_) - - for exclude in [[], [0], np.array([1, 2, 3])]: - ica.exclude = exclude - ica.labels_ = {'foo': [0]} - ica.save(test_ica_fname) - ica_read = read_ica(test_ica_fname) - assert (list(ica.exclude) == ica_read.exclude) - assert_equal(ica.labels_, ica_read.labels_) - ica.apply(raw.copy(), **kwargs) - ica.exclude = [] - ica.apply(raw.copy(), exclude=[1], **kwargs) - assert (ica.exclude == []) - - ica.exclude = [0, 1] - ica.apply(raw.copy(), exclude=[1], **kwargs) - assert (ica.exclude == [0, 1]) - - ica_raw = ica.get_sources(raw) - assert (ica.exclude == [ica_raw.ch_names.index(e) for e in - ica_raw.info['bads']]) - - # test filtering - d1 = ica_raw._data[0].copy() - ica_raw.filter(4, 20, fir_design='firwin2') - assert_equal(ica_raw.info['lowpass'], 20.) - assert_equal(ica_raw.info['highpass'], 4.) 
- assert ((d1 != ica_raw._data[0]).any()) - d1 = ica_raw._data[0].copy() - ica_raw.notch_filter([10], trans_bandwidth=10, fir_design='firwin') - assert ((d1 != ica_raw._data[0]).any()) - - ica.n_pca_components = 2 - ica.method = 'fake' - ica.save(test_ica_fname) - ica_read = read_ica(test_ica_fname) - assert (ica.n_pca_components == ica_read.n_pca_components) - assert_equal(ica.method, ica_read.method) - assert_equal(ica.labels_, ica_read.labels_) + ica.n_pca_components = 2 + ica.method = 'fake' + ica.save(test_ica_fname) + ica_read = read_ica(test_ica_fname) + assert (ica.n_pca_components == ica_read.n_pca_components) + assert_equal(ica.method, ica_read.method) + assert_equal(ica.labels_, ica_read.labels_) - # check type consistency - attrs = ('mixing_matrix_ unmixing_matrix_ pca_components_ ' - 'pca_explained_variance_ pre_whitener_') + # check type consistency + attrs = ('mixing_matrix_ unmixing_matrix_ pca_components_ ' + 'pca_explained_variance_ pre_whitener_') - def f(x, y): - return getattr(x, y).dtype + def f(x, y): + return getattr(x, y).dtype - for attr in attrs.split(): - assert_equal(f(ica_read, attr), f(ica, attr)) + for attr in attrs.split(): + assert_equal(f(ica_read, attr), f(ica, attr)) - ica.n_pca_components = 4 - ica_read.n_pca_components = 4 + ica.n_pca_components = 4 + ica_read.n_pca_components = 4 - ica.exclude = [] - ica.save(test_ica_fname) - ica_read = read_ica(test_ica_fname) - for attr in ['mixing_matrix_', 'unmixing_matrix_', 'pca_components_', - 'pca_mean_', 'pca_explained_variance_', - 'pre_whitener_']: - assert_array_almost_equal(getattr(ica, attr), - getattr(ica_read, attr)) + ica.exclude = [] + ica.save(test_ica_fname) + ica_read = read_ica(test_ica_fname) + for attr in ['mixing_matrix_', 'unmixing_matrix_', 'pca_components_', + 'pca_mean_', 'pca_explained_variance_', + 'pre_whitener_']: + assert_array_almost_equal(getattr(ica, attr), getattr(ica_read, attr)) - assert (ica.ch_names == ica_read.ch_names) - assert (isinstance(ica_read.info, Info)) + assert (ica.ch_names == ica_read.ch_names) + assert (isinstance(ica_read.info, Info)) - sources = ica.get_sources(raw)[:, :][0] - sources2 = ica_read.get_sources(raw)[:, :][0] - assert_array_almost_equal(sources, sources2) + sources = ica.get_sources(raw)[:, :][0] + sources2 = ica_read.get_sources(raw)[:, :][0] + assert_array_almost_equal(sources, sources2) - _raw1 = ica.apply(raw.copy(), exclude=[1]) - _raw2 = ica_read.apply(raw.copy(), exclude=[1]) - assert_array_almost_equal(_raw1[:, :][0], _raw2[:, :][0]) + _raw1 = ica.apply(raw.copy(), exclude=[1]) + _raw2 = ica_read.apply(raw.copy(), exclude=[1]) + assert_array_almost_equal(_raw1[:, :][0], _raw2[:, :][0]) + + ica = ICA(n_components=2, method=method, max_iter=1) + with pytest.warns(None): # ICA does not converge + ica.fit(raw, picks=few_picks) - os.remove(test_ica_fname) # check score funcs for name, func in get_score_funcs().items(): if name in score_funcs_unsuited: @@ -892,8 +869,8 @@ def f(x, y): ica = ICA(method=method) with pytest.warns(None): # sometimes does not converge - ica.fit(raw, picks=picks[:5]) - _assert_ica_attributes(ica, raw.get_data(picks[:5])) + ica.fit(raw, picks=few_picks) + _assert_ica_attributes(ica, raw.get_data(few_picks)) with pytest.warns(RuntimeWarning, match='longer'): ica.find_bads_ecg(raw, threshold='auto') ica.find_bads_eog(epochs, ch_name='MEG 0121') @@ -908,6 +885,55 @@ def f(x, y): ica.find_bads_ecg(raw, threshold='auto') +@requires_sklearn +@pytest.mark.slowtest +@pytest.mark.parametrize('method, cov', [ + ('picard', 
None), + ('picard', test_cov_name), + ('fastica', None), +]) +def test_ica_cov(method, cov, tmpdir, short_raw_epochs): + """Test ICA with cov.""" + _skip_check_picard(method) + raw, epochs, epochs_eog = short_raw_epochs + if cov is not None: + cov = read_cov(cov) + + # test reading and writing + test_ica_fname = tmpdir.join('test-ica.fif') + kwargs = dict(n_pca_components=4) + + ica = ICA(noise_cov=cov, n_components=2, method=method, max_iter=1) + with pytest.warns(None): # ICA does not converge + ica.fit(raw, picks=np.arange(10)) + _assert_ica_attributes(ica) + sources = ica.get_sources(epochs).get_data() + assert (ica.mixing_matrix_.shape == (2, 2)) + assert (ica.unmixing_matrix_.shape == (2, 2)) + assert (ica.pca_components_.shape == (10, 10)) + assert (sources.shape[1] == ica.n_components_) + + for exclude in [[], [0], np.array([1, 2, 3])]: + ica.exclude = exclude + ica.labels_ = {'foo': [0]} + ica.save(test_ica_fname) + ica_read = read_ica(test_ica_fname) + assert (list(ica.exclude) == ica_read.exclude) + assert_equal(ica.labels_, ica_read.labels_) + ica.apply(raw.copy(), **kwargs) + ica.exclude = [] + ica.apply(raw.copy(), exclude=[1], **kwargs) + assert (ica.exclude == []) + + ica.exclude = [0, 1] + ica.apply(raw.copy(), exclude=[1], **kwargs) + assert (ica.exclude == [0, 1]) + + ica_raw = ica.get_sources(raw) + assert (ica.exclude == [ica_raw.ch_names.index(e) for e in + ica_raw.info['bads']]) + + @requires_sklearn @pytest.mark.parametrize("method", ("fastica", "picard", "infomax")) @pytest.mark.parametrize("idx", (None, -1, slice(2), [0, 1])) @@ -949,15 +975,22 @@ def test_ica_twice(method): """Test running ICA twice.""" _skip_check_picard(method) raw = read_raw_fif(raw_fname).crop(1.5, stop).load_data() + raw.pick(raw.ch_names[::10]) picks = pick_types(raw.info, meg='grad', exclude='bads') - n_components = 0.9 + n_components = 0.99 n_pca_components = 0.9999 + if method == 'fastica': + ctx = pytest.warns(None) # convergence, sometimes + else: + ctx = nullcontext() ica1 = ICA(n_components=n_components, method=method) - ica1.fit(raw, picks=picks, decim=3) + with ctx: + ica1.fit(raw, picks=picks, decim=3) raw_new = ica1.apply(raw, n_pca_components=n_pca_components) ica2 = ICA(n_components=n_components, method=method) - ica2.fit(raw_new, picks=picks, decim=3) + with ctx: + ica2.fit(raw_new, picks=picks, decim=3) assert_equal(ica1.n_components_, ica2.n_components_) @@ -1167,6 +1200,9 @@ def test_ica_ctf(): """Test run ICA computation on ctf data with/without compensation.""" method = 'fastica' raw = read_raw_ctf(ctf_fname, preload=True) + picks = sorted(set(range(0, len(raw.ch_names), 10)) | + set(pick_types(raw.info, ref_meg=True))) + raw.pick(picks) events = make_fixed_length_events(raw, 99999) for comp in [0, 1]: raw.apply_gradient_compensation(comp) @@ -1208,9 +1244,13 @@ def test_ica_labels(): # The CTF data are uniquely well suited to testing the ICA.find_bads_ # methods raw = read_raw_ctf(ctf_fname, preload=True) + raw.pick_channels(raw.ch_names[:300:10] + raw.ch_names[300:]) # set the appropriate EEG channels to EOG and ECG - raw.set_channel_types({'EEG057': 'eog', 'EEG058': 'eog', 'EEG059': 'ecg'}) + rename = {'EEG057': 'eog', 'EEG058': 'eog', 'EEG059': 'ecg'} + for key in rename: + assert key in raw.ch_names + raw.set_channel_types(rename) ica = ICA(n_components=4, max_iter=2, method='fastica', allow_ref_meg=True) with pytest.warns(UserWarning, match='did not converge'): ica.fit(raw) @@ -1262,64 +1302,52 @@ def test_ica_labels(): @requires_sklearn 
@testing.requires_testing_data -def test_ica_eeg(): +@pytest.mark.parametrize('fname, grade', [ + (fif_fname, None), + (eeglab_fname, None), + (ctf_fname2, 0), + (ctf_fname2, 1), +]) +def test_ica_eeg(fname, grade): """Test ICA on EEG.""" method = 'fastica' - raw_fif = read_raw_fif(fif_fname, preload=True) - raw_eeglab = read_raw_eeglab(input_fname=eeglab_fname, - preload=True) - for raw in [raw_fif, raw_eeglab]: - events = make_fixed_length_events(raw, 99999, start=0, stop=0.3, - duration=0.1) - picks_meg = pick_types(raw.info, meg=True, eeg=False)[:2] - picks_eeg = pick_types(raw.info, meg=False, eeg=True)[:2] - picks_all = [] - picks_all.extend(picks_meg) - picks_all.extend(picks_eeg) - epochs = Epochs(raw, events, None, -0.1, 0.1, preload=True, proj=False) - evoked = epochs.average() + if fname.endswith('.fif'): + raw = read_raw_fif(fif_fname) + raw.pick(raw.ch_names[::5]).load_data() + raw.info.normalize_proj() + elif fname.endswith('.set'): + raw = read_raw_eeglab(input_fname=eeglab_fname, preload=True) + else: + with pytest.warns(RuntimeWarning, match='MISC channel'): + raw = read_raw_ctf(ctf_fname2) + raw.pick(raw.ch_names[:30] + raw.ch_names[30::10]).load_data() + if grade is not None: + raw.apply_gradient_compensation(grade) - for picks in [picks_meg, picks_eeg, picks_all]: - if len(picks) == 0: - continue - # test fit - for inst in [raw, epochs]: - ica = ICA(n_components=2, max_iter=2, method=method) - with pytest.warns(None): - ica.fit(inst, picks=picks, verbose=True) - _assert_ica_attributes(ica) - - # test apply and get_sources - for inst in [raw, epochs, evoked]: - ica.apply(inst) - ica.get_sources(inst) - - with pytest.warns(RuntimeWarning, match='MISC channel'): - raw = read_raw_ctf(ctf_fname2, preload=True) - events = make_fixed_length_events(raw, 99999, start=0, stop=0.2, + events = make_fixed_length_events(raw, 99999, start=0, stop=0.3, duration=0.1) - picks_meg = pick_types(raw.info, meg=True, eeg=False)[:2] + picks_meg = pick_types(raw.info, meg=True, eeg=False, ref_meg=False)[:2] picks_eeg = pick_types(raw.info, meg=False, eeg=True)[:2] - picks_all = picks_meg + picks_eeg - for comp in [0, 1]: - raw.apply_gradient_compensation(comp) - epochs = Epochs(raw, events, None, -0.1, 0.1, preload=True) - evoked = epochs.average() + picks_all = [] + picks_all.extend(picks_meg) + picks_all.extend(picks_eeg) + epochs = Epochs(raw, events, None, -0.1, 0.1, preload=True, proj=False) + evoked = epochs.average() - for picks in [picks_meg, picks_eeg, picks_all]: - if len(picks) == 0: - continue - # test fit - for inst in [raw, epochs]: - ica = ICA(n_components=2, max_iter=2, method=method) - with pytest.warns(None): - ica.fit(inst) - _assert_ica_attributes(ica) - - # test apply and get_sources - for inst in [raw, epochs, evoked]: - ica.apply(inst) - ica.get_sources(inst) + for picks in [picks_meg, picks_eeg, picks_all]: + if len(picks) == 0: + continue + # test fit + for inst in [raw, epochs]: + ica = ICA(n_components=2, max_iter=2, method=method) + with pytest.warns(None): + ica.fit(inst, picks=picks, verbose=True) + _assert_ica_attributes(ica) + + # test apply and get_sources + for inst in [raw, epochs, evoked]: + ica.apply(inst) + ica.get_sources(inst) @testing.requires_testing_data diff --git a/mne/preprocessing/tests/test_ssp.py b/mne/preprocessing/tests/test_ssp.py index 858e800e267..b16f5190a16 100644 --- a/mne/preprocessing/tests/test_ssp.py +++ b/mne/preprocessing/tests/test_ssp.py @@ -7,7 +7,6 @@ from mne.io import read_raw_fif, read_raw_ctf from mne.io.proj import 
make_projector, activate_proj from mne.preprocessing.ssp import compute_proj_ecg, compute_proj_eog -from mne.utils import run_tests_if_main from mne.datasets import testing from mne import pick_types @@ -20,98 +19,107 @@ 'testdata_ctf.ds') -def test_compute_proj_ecg(): +@pytest.fixture() +def short_raw(): + """Create a short, picked raw instance.""" + raw = read_raw_fif(raw_fname).crop(0, 7).pick_types( + meg=True, eeg=True, eog=True) + raw.pick(raw.ch_names[:306:10] + raw.ch_names[306:]).load_data() + raw.info.normalize_proj() + return raw + + +@pytest.mark.parametrize('average', (True, False)) +def test_compute_proj_ecg(short_raw, average): """Test computation of ECG SSP projectors.""" - raw = read_raw_fif(raw_fname).crop(0, 10) - raw.load_data() - for average in [False, True]: - # For speed, let's not filter here (must also not reject then) + raw = short_raw + + # For speed, let's not filter here (must also not reject then) + with pytest.warns(RuntimeWarning, match='Attenuation'): projs, events = compute_proj_ecg( raw, n_mag=2, n_grad=2, n_eeg=2, ch_name='MEG 1531', bads=['MEG 2443'], average=average, avg_ref=True, no_proj=True, l_freq=None, h_freq=None, reject=None, tmax=dur_use, - qrs_threshold=0.5, filter_length=6000) - assert len(projs) == 7 - # heart rate at least 0.5 Hz, but less than 3 Hz - assert (events.shape[0] > 0.5 * dur_use and - events.shape[0] < 3 * dur_use) - ssp_ecg = [proj for proj in projs if proj['desc'].startswith('ECG')] - # check that the first principal component have a certain minimum - ssp_ecg = [proj for proj in ssp_ecg if 'PCA-01' in proj['desc']] - thresh_eeg, thresh_axial, thresh_planar = .9, .3, .1 - for proj in ssp_ecg: - if 'planar' in proj['desc']: - assert proj['explained_var'] > thresh_planar - elif 'axial' in proj['desc']: - assert proj['explained_var'] > thresh_axial - elif 'eeg' in proj['desc']: - assert proj['explained_var'] > thresh_eeg - # XXX: better tests - - # without setting a bad channel, this should throw a warning - with pytest.warns(RuntimeWarning, match='No good epochs found'): - projs, events, drop_log = compute_proj_ecg( - raw, n_mag=2, n_grad=2, n_eeg=2, ch_name='MEG 1531', bads=[], - average=average, avg_ref=True, no_proj=True, l_freq=None, - h_freq=None, tmax=dur_use, return_drop_log=True) - assert projs is None - assert len(events) == len(drop_log) - - -def test_compute_proj_eog(): + qrs_threshold=0.5, filter_length=1000) + assert len(projs) == 7 + # heart rate at least 0.5 Hz, but less than 3 Hz + assert (events.shape[0] > 0.5 * dur_use and + events.shape[0] < 3 * dur_use) + ssp_ecg = [proj for proj in projs if proj['desc'].startswith('ECG')] + # check that the first principal component have a certain minimum + ssp_ecg = [proj for proj in ssp_ecg if 'PCA-01' in proj['desc']] + thresh_eeg, thresh_axial, thresh_planar = .9, .3, .1 + for proj in ssp_ecg: + if 'planar' in proj['desc']: + assert proj['explained_var'] > thresh_planar + elif 'axial' in proj['desc']: + assert proj['explained_var'] > thresh_axial + elif 'eeg' in proj['desc']: + assert proj['explained_var'] > thresh_eeg + # XXX: better tests + + # without setting a bad channel, this should throw a warning + with pytest.warns(RuntimeWarning, match='No good epochs found'): + projs, events, drop_log = compute_proj_ecg( + raw, n_mag=2, n_grad=2, n_eeg=2, ch_name='MEG 1531', bads=[], + average=average, avg_ref=True, no_proj=True, l_freq=None, + h_freq=None, tmax=dur_use, return_drop_log=True) + assert projs is None + assert len(events) == len(drop_log) + + 
+@pytest.mark.parametrize('average', [True, False]) +def test_compute_proj_eog(average, short_raw): """Test computation of EOG SSP projectors.""" - raw = read_raw_fif(raw_fname).crop(0, 10) - raw.load_data() - for average in [False, True]: - n_projs_init = len(raw.info['projs']) - projs, events = compute_proj_eog(raw, n_mag=2, n_grad=2, n_eeg=2, - bads=['MEG 2443'], average=average, - avg_ref=True, no_proj=False, - l_freq=None, h_freq=None, - reject=None, tmax=dur_use, - filter_length=6000) - assert (len(projs) == (7 + n_projs_init)) - assert (np.abs(events.shape[0] - - np.sum(np.less(eog_times, dur_use))) <= 1) - ssp_eog = [proj for proj in projs if proj['desc'].startswith('EOG')] - # check that the first principal component have a certain minimum - ssp_eog = [proj for proj in ssp_eog if 'PCA-01' in proj['desc']] - thresh_eeg, thresh_axial, thresh_planar = .9, .3, .1 - for proj in ssp_eog: - if 'planar' in proj['desc']: - assert (proj['explained_var'] > thresh_planar) - elif 'axial' in proj['desc']: - assert (proj['explained_var'] > thresh_axial) - elif 'eeg' in proj['desc']: - assert (proj['explained_var'] > thresh_eeg) - # XXX: better tests - - with pytest.warns(RuntimeWarning, match='longer'): - projs, events = compute_proj_eog(raw, n_mag=2, n_grad=2, n_eeg=2, - average=average, bads=[], - avg_ref=True, no_proj=False, - l_freq=None, h_freq=None, - tmax=dur_use) - assert projs is None + raw = short_raw + + n_projs_init = len(raw.info['projs']) + with pytest.warns(RuntimeWarning, match='Attenuation'): + projs, events = compute_proj_eog( + raw, n_mag=2, n_grad=2, n_eeg=2, bads=['MEG 2443'], + average=average, avg_ref=True, no_proj=False, l_freq=None, + h_freq=None, reject=None, tmax=dur_use, filter_length=1000) + assert (len(projs) == (7 + n_projs_init)) + assert (np.abs(events.shape[0] - + np.sum(np.less(eog_times, dur_use))) <= 1) + ssp_eog = [proj for proj in projs if proj['desc'].startswith('EOG')] + # check that the first principal component have a certain minimum + ssp_eog = [proj for proj in ssp_eog if 'PCA-01' in proj['desc']] + thresh_eeg, thresh_axial, thresh_planar = .9, .3, .1 + for proj in ssp_eog: + if 'planar' in proj['desc']: + assert (proj['explained_var'] > thresh_planar) + elif 'axial' in proj['desc']: + assert (proj['explained_var'] > thresh_axial) + elif 'eeg' in proj['desc']: + assert (proj['explained_var'] > thresh_eeg) + # XXX: better tests + + with pytest.warns(RuntimeWarning, match='longer'): + projs, events = compute_proj_eog( + raw, n_mag=2, n_grad=2, n_eeg=2, average=average, bads=[], + avg_ref=True, no_proj=False, l_freq=None, h_freq=None, + tmax=dur_use) + assert projs is None @pytest.mark.slowtest # can be slow on OSX -def test_compute_proj_parallel(): +def test_compute_proj_parallel(short_raw): """Test computation of ExG projectors using parallelization.""" - raw_0 = read_raw_fif(raw_fname).crop(0, 10) - raw_0.load_data() - raw = raw_0.copy() - projs, _ = compute_proj_eog(raw, n_mag=2, n_grad=2, n_eeg=2, - bads=['MEG 2443'], average=False, - avg_ref=True, no_proj=False, n_jobs=1, - l_freq=None, h_freq=None, reject=None, - tmax=dur_use, filter_length=6000) - raw_2 = raw_0.copy() - projs_2, _ = compute_proj_eog(raw_2, n_mag=2, n_grad=2, n_eeg=2, - bads=['MEG 2443'], average=False, - avg_ref=True, no_proj=False, n_jobs=2, - l_freq=None, h_freq=None, reject=None, - tmax=dur_use, filter_length=6000) + short_raw = short_raw.copy().pick(('eeg', 'eog')).resample(100) + raw = short_raw.copy() + with pytest.warns(RuntimeWarning, match='Attenuation'): + projs, _ = 
compute_proj_eog( + raw, n_eeg=2, bads=raw.ch_names[1:2], average=False, + avg_ref=True, no_proj=False, n_jobs=1, l_freq=None, h_freq=None, + reject=None, tmax=dur_use, filter_length=100) + raw_2 = short_raw.copy() + with pytest.warns(RuntimeWarning, match='Attenuation'): + projs_2, _ = compute_proj_eog( + raw_2, n_eeg=2, bads=raw.ch_names[1:2], + average=False, avg_ref=True, no_proj=False, n_jobs=2, + l_freq=None, h_freq=None, reject=None, tmax=dur_use, + filter_length=100) projs = activate_proj(projs) projs_2 = activate_proj(projs_2) projs, _, _ = make_projector(projs, raw_2.info['ch_names'], @@ -122,6 +130,7 @@ def test_compute_proj_parallel(): def _check_projs_for_expected_channels(projs, n_mags, n_grads, n_eegs): + assert projs is not None for p in projs: if 'planar' in p['desc']: assert len(p['data']['col_names']) == n_grads @@ -135,41 +144,39 @@ def _check_projs_for_expected_channels(projs, n_mags, n_grads, n_eegs): @testing.requires_testing_data def test_compute_proj_ctf(): """Test to show that projector code completes on CTF data.""" - raw = read_raw_ctf(ctf_fname) - raw.load_data() + raw = read_raw_ctf(ctf_fname, preload=True) # expected channels per projector type - n_mags = len(pick_types(raw.info, meg='mag', ref_meg=False, - exclude='bads')) - n_grads = len(pick_types(raw.info, meg='grad', ref_meg=False, - exclude='bads')) - n_eegs = len(pick_types(raw.info, meg=False, eeg=True, ref_meg=False, - exclude='bads')) + mag_picks = pick_types( + raw.info, meg='mag', ref_meg=False, exclude='bads')[::10] + n_mags = len(mag_picks) + grad_picks = pick_types(raw.info, meg='grad', ref_meg=False, + exclude='bads')[::10] + n_grads = len(grad_picks) + eeg_picks = pick_types(raw.info, meg=False, eeg=True, ref_meg=False, + exclude='bads')[2::3] + n_eegs = len(eeg_picks) + ref_picks = pick_types(raw.info, meg=False, ref_meg=True) + raw.pick(np.sort(np.concatenate( + [mag_picks, grad_picks, eeg_picks, ref_picks]))) + del mag_picks, grad_picks, eeg_picks, ref_picks # Test with and without gradient compensation - for c in [0, 1]: - raw.apply_gradient_compensation(c) - for average in [False, True]: - n_projs_init = len(raw.info['projs']) - projs, events = compute_proj_eog(raw, n_mag=2, n_grad=2, n_eeg=2, - average=average, - ch_name='EEG059', - avg_ref=True, no_proj=False, - l_freq=None, h_freq=None, - reject=None, tmax=dur_use, - filter_length=6000) - _check_projs_for_expected_channels(projs, n_mags, n_grads, n_eegs) - assert len(projs) == (5 + n_projs_init) - - projs, events = compute_proj_ecg(raw, n_mag=1, n_grad=1, n_eeg=2, - average=average, - ch_name='EEG059', - avg_ref=True, no_proj=False, - l_freq=None, h_freq=None, - reject=None, tmax=dur_use, - filter_length=6000) - _check_projs_for_expected_channels(projs, n_mags, n_grads, n_eegs) - assert len(projs) == (4 + n_projs_init) - - -run_tests_if_main() + raw.apply_gradient_compensation(0) + n_projs_init = len(raw.info['projs']) + with pytest.warns(RuntimeWarning, match='Attenuation'): + projs, _ = compute_proj_eog( + raw, n_mag=2, n_grad=2, n_eeg=2, average=True, ch_name='EEG059', + avg_ref=True, no_proj=False, l_freq=None, h_freq=None, + reject=None, tmax=dur_use, filter_length=1000) + _check_projs_for_expected_channels(projs, n_mags, n_grads, n_eegs) + assert len(projs) == (5 + n_projs_init) + + raw.apply_gradient_compensation(1) + with pytest.warns(RuntimeWarning, match='Attenuation'): + projs, _ = compute_proj_ecg( + raw, n_mag=1, n_grad=1, n_eeg=2, average=True, ch_name='EEG059', + avg_ref=True, no_proj=False, l_freq=None, h_freq=None, + 
reject=None, tmax=dur_use, filter_length=1000) + _check_projs_for_expected_channels(projs, n_mags, n_grads, n_eegs) + assert len(projs) == (4 + n_projs_init) diff --git a/mne/tests/test_epochs.py b/mne/tests/test_epochs.py index bece65c669d..c1686db5c25 100644 --- a/mne/tests/test_epochs.py +++ b/mne/tests/test_epochs.py @@ -310,9 +310,16 @@ def _assert_drop_log_types(drop_log): def test_reject(): """Test epochs rejection.""" - raw, events, picks = _get_data() + raw, events, _ = _get_data() + names = raw.ch_names[::5] + assert 'MEG 2443' in names + raw.pick(names).load_data() + assert 'eog' in raw + raw.info.normalize_proj() + picks = np.arange(len(raw.ch_names)) # cull the list just to contain the relevant event events = events[events[:, 2] == event_id, :] + assert len(events) == 7 selection = np.arange(3) drop_log = ((),) * 3 + (('MEG 2443',),) * 4 _assert_drop_log_types(drop_log)