-
Notifications
You must be signed in to change notification settings - Fork 51
/
Copy pathmri.py
456 lines (384 loc) · 18.8 KB
/
mri.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
"""Deals with MRI BIDS datasets and register them into the database."""
import getpass
import json
import os
import re
import sys
import lib.exitcode
import lib.utilities as utilities
from lib.candidate import Candidate
from lib.imaging import Imaging
from lib.scanstsv import ScansTSV
from lib.session import Session
__license__ = "GPLv3"
class Mri:
"""
This class reads the BIDS MRI data structure and registers the MRI datasets into the
database by calling lib.imaging class.
:Example:
from lib.bidsreader import BidsReader
from lib.mri import Mri
from lib.database import Database
# database connection
db = Database(config_file.mysql, verbose)
db.connect()
# grep config settings from the Config module
config_obj = Config(db, verbose)
default_bids_vl = config_obj.get_config('default_bids_vl')
data_dir = config_obj.get_config('dataDirBasepath')
# load the BIDS directory
bids_reader = BidsReader(bids_dir)
# create the LORIS_BIDS directory in data_dir based on Name and BIDS version
loris_bids_root_dir = create_loris_bids_directory(
bids_reader, data_dir, verbose
)
for row in bids_reader.cand_session_modalities_list:
for modality in row['modalities']:
if modality in ['anat', 'dwi', 'fmap', 'func']:
bids_session = row['bids_ses_id']
visit_label = bids_session if bids_session else default_bids_vl
loris_bids_mri_rel_dir = "sub-" + row['bids_sub_id'] + "/" + \
"ses-" + visit_label + "/mri/"
lib.utilities.create_dir(
loris_bids_root_dir + loris_bids_mri_rel_dir, verbose
)
Eeg(
bids_reader = bids_reader,
bids_sub_id = row['bids_sub_id'],
bids_ses_id = row['bids_ses_id'],
bids_modality = modality,
db = db,
verbose = verbose,
data_dir = data_dir,
default_visit_label = default_bids_vl,
loris_bids_eeg_rel_dir = loris_bids_mri_rel_dir,
loris_bids_root_dir = loris_bids_root_dir
)
# disconnect from the database
db.disconnect()
"""
def __init__(self, bids_reader, bids_sub_id, bids_ses_id, bids_modality, db,
verbose, data_dir, default_visit_label,
loris_bids_mri_rel_dir, loris_bids_root_dir):
# enumerate the different suffixes supported by BIDS per modality type
self.possible_suffix_per_modality = {
'anat' : [
'T1w', 'T2w', 'T1rho', 'T1map', 'T2map', 'T2star', 'FLAIR',
'FLASH', 'PD', 'PDmap', 'PDT2', 'inplaneT1', 'inplaneT2', 'angio'
],
'func' : [
'bold', 'cbv', 'phase'
],
'dwi' : [
'dwi', 'sbref'
],
'fmap' : [
'phasediff', 'magnitude1', 'magnitude2', 'phase1', 'phase2',
'fieldmap', 'epi'
]
}
# load bids objects
self.bids_reader = bids_reader
self.bids_layout = bids_reader.bids_layout
# load the LORIS BIDS import root directory where the files will be copied
self.loris_bids_mri_rel_dir = loris_bids_mri_rel_dir
self.loris_bids_root_dir = loris_bids_root_dir
self.data_dir = data_dir
# load BIDS subject, visit and modality
self.bids_sub_id = bids_sub_id
self.bids_ses_id = bids_ses_id
self.bids_modality = bids_modality
# load database handler object and verbose bool
self.db = db
self.verbose = verbose
# find corresponding CandID and SessionID in LORIS
self.loris_cand_info = self.get_loris_cand_info()
self.default_vl = default_visit_label
self.psc_id = self.loris_cand_info['PSCID']
self.cand_id = self.loris_cand_info['CandID']
self.center_id = self.loris_cand_info['RegistrationCenterID']
self.project_id = self.loris_cand_info['RegistrationProjectID']
self.cohort_id = None
for row in bids_reader.participants_info:
if not row['participant_id'] == self.psc_id:
continue
if 'cohort' in row:
cohort_info = db.pselect(
"SELECT CohortID FROM cohort WHERE title = %s",
[row['cohort'], ]
)
if len(cohort_info) > 0:
self.cohort_id = cohort_info[0]['CohortID']
break
self.session_id = self.get_loris_session_id()
# grep all the NIfTI files for the modality
self.nifti_files = self.grep_nifti_files()
# check if a tsv with acquisition dates or age is available for the subject
self.scans_file = None
if self.bids_layout.get(suffix='scans', subject=self.psc_id, return_type='filename'):
self.scans_file = self.bids_layout.get(suffix='scans', subject=self.psc_id,
return_type='filename', extension='tsv')[0]
# loop through NIfTI files and register them in the DB
for nifti_file in self.nifti_files:
self.register_raw_file(nifti_file)
def get_loris_cand_info(self):
"""
Gets the LORIS Candidate info for the BIDS subject.
:return: Candidate info of the subject found in the database
:rtype: list
"""
candidate = Candidate(verbose=self.verbose, psc_id=self.bids_sub_id)
loris_cand_info = candidate.get_candidate_info_from_loris(self.db)
return loris_cand_info
def get_loris_session_id(self):
"""
Greps the LORIS session.ID corresponding to the BIDS visit. Note,
if no BIDS visit are set, will use the default visit label value set
in the config module
:return: the session's ID in LORIS
:rtype: int
"""
# check if there are any visit label in BIDS structure, if not,
# will use the default visit label set in the config module
visit_label = self.bids_ses_id if self.bids_ses_id else self.default_vl
session = Session(
self.db, self.verbose, self.cand_id, visit_label,
self.center_id, self.project_id, self.cohort_id
)
loris_vl_info = session.get_session_info_from_loris()
if not loris_vl_info:
message = "ERROR: visit label " + visit_label + "does not exist in " + \
"the session table for candidate " + self.cand_id + \
"\nPlease make sure the visit label is created in the " + \
"database or run bids_import.py with the -s option -s if " + \
"you wish that the insertion pipeline creates the visit " + \
"label in the session table."
print(message)
exit(lib.exitcode.SELECT_FAILURE)
return loris_vl_info['ID']
def grep_nifti_files(self):
"""
Returns the list of NIfTI files found for the modality.
:return: list of NIfTI files found for the modality
:rtype: list
"""
# grep all the possible suffixes for the modality
modality_possible_suffix = self.possible_suffix_per_modality[self.bids_modality]
# loop through the possible suffixes and grep the NIfTI files
nii_files_list = []
for suffix in modality_possible_suffix:
nii_files_list.extend(self.grep_bids_files(suffix, 'nii.gz'))
# return the list of found NIfTI files
return nii_files_list
def grep_bids_files(self, bids_type, extension):
"""
Greps the BIDS files and their layout information from the BIDSLayout
and return that list.
:param bids_type: the BIDS type to use to grep files (T1w, T2w, bold, dwi...)
:type bids_type: str
:param extension: extension of the file to look for (nii.gz, json...)
:type extension: str
:return: list of files from the BIDS layout
:rtype: list
"""
if self.bids_ses_id:
return self.bids_layout.get(
subject = self.bids_sub_id,
session = self.bids_ses_id,
datatype = self.bids_modality,
extension = extension,
suffix = bids_type
)
else:
return self.bids_layout.get(
subject = self.bids_sub_id,
datatype = self.bids_modality,
extension = extension,
suffix = bids_type
)
def register_raw_file(self, nifti_file):
"""
Registers raw MRI files and related files into the files and parameter_file tables.
:param nifti_file: NIfTI file object
:type nifti_file: pybids NIfTI file object
"""
# insert the NIfTI file
self.fetch_and_insert_nifti_file(nifti_file)
def fetch_and_insert_nifti_file(self, nifti_file, derivatives=None):
"""
Gather NIfTI file information to insert into the files and parameter_file tables.
Once all the information has been gathered, it will call imaging.insert_imaging_file
that will perform the insertion into the files and parameter_file tables.
:param nifti_file : NIfTI file object
:type nifti_file : pybids NIfTI file object
:param derivatives: whether the file to be registered is a derivative file
:type derivatives: bool
:return: dictionary with the inserted file_id and file_path
:rtype: dict
"""
# load the Imaging object that will be used to insert the imaging data into the database
imaging = Imaging(self.db, self.verbose)
# load the list of associated files with the NIfTI file
associated_files = nifti_file.get_associations()
# load the entity information from the NIfTI file
entities = nifti_file.get_entities()
scan_type = entities['suffix']
# loop through the associated files to grep JSON, bval, bvec...
json_file = None
other_assoc_files = {}
for assoc_file in associated_files:
file_info = assoc_file.get_entities()
if re.search(r'json$', file_info['extension']):
json_file = assoc_file.path
elif re.search(r'bvec$', file_info['extension']):
other_assoc_files['bvec_file'] = assoc_file.path
elif re.search(r'bval$', file_info['extension']):
other_assoc_files['bval_file'] = assoc_file.path
elif re.search(r'tsv$', file_info['extension']) and file_info['suffix'] == 'events':
other_assoc_files['task_file'] = assoc_file.path
elif re.search(r'tsv$', file_info['extension']) and file_info['suffix'] == 'physio':
other_assoc_files['physio_file'] = assoc_file.path
# read the json file if it exists
file_parameters = {}
if json_file:
with open(json_file) as data_file:
file_parameters = json.load(data_file)
file_parameters = imaging.map_bids_param_to_loris_param(file_parameters)
# copy the JSON file to the LORIS BIDS import directory
json_path = self.copy_file_to_loris_bids_dir(json_file)
file_parameters['bids_json_file'] = json_path
json_blake2 = utilities.compute_blake2b_hash(json_file)
file_parameters['bids_json_file_blake2b_hash'] = json_blake2
# grep the file type from the ImagingFileTypes table
file_type = imaging.determine_file_type(nifti_file.filename)
if not file_type:
message = "\nERROR: File type for " + nifti_file.filename \
+ " does not exist in ImagingFileTypes database table\n"
print(message)
sys.exit(lib.exitcode.SELECT_FAILURE)
# determine the output type
output_type = 'derivatives' if derivatives else 'native'
if not derivatives:
coordinate_space = 'native'
# get the acquisition date of the MRI or the age at the time of acquisition
if self.scans_file:
scan_info = ScansTSV(self.scans_file, nifti_file.filename, self.verbose)
file_parameters['scan_acquisition_time'] = scan_info.get_acquisition_time()
file_parameters['age_at_scan'] = scan_info.get_age_at_scan()
# copy the scans.tsv file to the LORIS BIDS import directory
scans_path = scan_info.copy_scans_tsv_file_to_loris_bids_dir(
self.bids_sub_id, self.loris_bids_root_dir, self.data_dir
)
file_parameters['scans_tsv_file'] = scans_path
scans_blake2 = utilities.compute_blake2b_hash(self.scans_file)
file_parameters['scans_tsv_file_bake2hash'] = scans_blake2
# grep voxel step from the NIfTI file header
step_parameters = imaging.get_nifti_image_step_parameters(nifti_file.path)
file_parameters['xstep'] = step_parameters[0]
file_parameters['ystep'] = step_parameters[1]
file_parameters['zstep'] = step_parameters[2]
# grep the time length from the NIfTI file header
is_4d_dataset = False
length_parameters = imaging.get_nifti_image_length_parameters(nifti_file.path)
if len(length_parameters) == 4:
file_parameters['time'] = length_parameters[3]
is_4d_dataset = True
# add all other associated files to the file_parameters so they get inserted
# in parameter_file
for type in other_assoc_files:
original_file_path = other_assoc_files[type]
copied_path = self.copy_file_to_loris_bids_dir(original_file_path)
file_param_name = 'bids_' + type
file_parameters[file_param_name] = copied_path
file_blake2 = utilities.compute_blake2b_hash(original_file_path)
hash_param_name = file_param_name + '_blake2b_hash'
file_parameters[hash_param_name] = file_blake2
# append the blake2b to the MRI file parameters dictionary
blake2 = utilities.compute_blake2b_hash(nifti_file.path)
file_parameters['file_blake2b_hash'] = blake2
# check that the file is not already inserted before inserting it
result = imaging.grep_file_info_from_hash(blake2)
file_id = result['FileID'] if result else None
file_path = result['File'] if result else None
if not file_id:
# grep the scan type ID from the mri_scan_type table (if it is not already in
# the table, it will add a row to the mri_scan_type table)
scan_type_id = self.db.grep_id_from_lookup_table(
id_field_name = 'MriScanTypeID',
table_name = 'mri_scan_type',
where_field_name = 'MriScanTypeName',
where_value = scan_type,
insert_if_not_found = True
)
# copy the NIfTI file to the LORIS BIDS import directory
file_path = self.copy_file_to_loris_bids_dir(nifti_file.path)
# insert the file along with its information into files and parameter_file tables
echo_time = file_parameters['EchoTime'] if 'EchoTime' in file_parameters.keys() else None
echo_nb = file_parameters['EchoNumber'] if 'EchoNumber' in file_parameters.keys() else None
phase_enc_dir = file_parameters['PhaseEncodingDirection'] \
if 'PhaseEncodingDirection' in file_parameters.keys() else None
file_info = {
'FileType' : file_type,
'File' : file_path,
'SessionID' : self.session_id,
'InsertedByUserID': getpass.getuser(),
'CoordinateSpace' : coordinate_space,
'OutputType' : output_type,
'EchoTime' : echo_time,
'PhaseEncodingDirection': phase_enc_dir,
'EchoNumber' : echo_nb,
'SourceFileID' : None,
'MriScanTypeID' : scan_type_id
}
file_id = imaging.insert_imaging_file(file_info, file_parameters)
# create the pic associated with the file
pic_rel_path = imaging.create_imaging_pic(
{
'cand_id' : self.cand_id,
'data_dir_path': self.data_dir,
'file_rel_path': file_path,
'is_4D_dataset': is_4d_dataset,
'file_id' : file_id
}
)
if os.path.exists(os.path.join(self.data_dir, 'pic/', pic_rel_path)):
imaging.insert_parameter_file(file_id, 'check_pic_filename', pic_rel_path)
return {'file_id': file_id, 'file_path': file_path}
def copy_file_to_loris_bids_dir(self, file, derivatives_path=None):
"""
Wrapper around the utilities.copy_file function that copies the file
to the LORIS BIDS import directory and returns the relative path of the
file (without the data_dir part).
:param file: full path to the original file
:type file: str
:param derivatives_path: path to the derivative folder
:type derivatives_path: str
:return: relative path to the copied file
:rtype: str
"""
# determine the path of the copied file
copy_file = self.loris_bids_mri_rel_dir
if self.bids_ses_id:
copy_file += os.path.basename(file)
else:
# make sure the ses- is included in the new filename if using
# default visit label from the LORIS config
copy_file += str.replace(
os.path.basename(file),
"sub-" + self.bids_sub_id,
"sub-" + self.bids_sub_id + "_ses-" + self.default_vl
)
if derivatives_path:
# create derivative subject/vl/modality directory
lib.utilities.create_dir(
derivatives_path + self.loris_bids_mri_rel_dir,
self.verbose
)
copy_file = derivatives_path + copy_file
else:
copy_file = self.loris_bids_root_dir + copy_file
# copy the file
utilities.copy_file(file, copy_file, self.verbose)
# determine the relative path and return it
relative_path = copy_file.replace(self.data_dir, "")
return relative_path