-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathwfdbtools.py
575 lines (484 loc) · 19.1 KB
/
wfdbtools.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
#!/usr/bin/env python
##########################################################
# A python module to access data from WFDB
# Copyright (C) 2009, 2010 Raja Selvaraj <rajajs@gmail.com>
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
##########################################################
"""
A pure python module for accessing and using the waveform data in `Physiobank <http://www.physionet.org/physiobank/>`_.
Provides `rdsamp` and `rdann` which are the python equivalents of the wfdb applications
of similar names.
A deliberate attempt is made to try to keep names and usage similar to the
original wfdb applications for simplicity of use.
The only dependency that will need to be installed is numpy. However, to use the function
`plot_data`, which is an utility function for interactive use, you need to have matplotlib
also installed.
Example Usage::
>> from wfdbtools import rdsamp, rdann, plot_data
>> from pprint import pprint
# Record is a format 212 record from physiobank.
# Note that name of record does not include extension.
>> record = 'samples/format212/100'
# Read in the data from 0 to 10 seconds
>> data, info = rdsamp(record, 0, 10)
# returned data is an array. The columns are time(samples),
# time(seconds), signal1, signal2
>> print data.shape
(3600, 4)
# info is a dictionary containing header information
>> pprint info
{'first_values': [995, 1011],
'gains': [200, 200],
'samp_count': 650000,
'samp_freq': 360,
'signal_names': ['MLII', 'V5'],
'zero_values': [1024, 1024]}
# And now read the annotation
>> ann = rdann(record, 'atr', 0, 10)
# ann has 3 columns - sample number, time (sec) and annotation code
>> print(ann[:4,:])
array([[ 18. , 0.05 , 28. ],
[ 77. , 0.214, 1. ],
[ 370. , 1.028, 1. ],
[ 662. , 1.839, 1. ]])
# Plot the data and the mark the annotations
>> plot_data(data, info, ann)
"""
# rdsamp based on rddata.m for matlab written by Robert Tratnig
# available at http://physionet.org/physiotools/matlab/rddata.m
from __future__ import division
import re
import warnings
import numpy
import datetime as dt
#import pylab
#from pprint import pprint
__author__ = 'Raja Selvaraj <rajajs@gmail.com>'
__version__ = '0.2'
## Annotation codes
CODEDICT = {
0 : 'NOTQRS', # not-QRS (not a getann/putann codedict) */
1 : 'NORMAL', # normal beat */
2 : 'LBBB', # left bundle branch block beat */
3 : 'RBBB', # right bundle branch block beat */
4 : 'ABERR', # aberrated atrial premature beat */
5 : 'PVC', # premature ventricular contraction */
6 : 'FUSION', # fusion of ventricular and normal beat */
7 : 'NPC', # nodal (junctional) premature beat */
8 : 'APC', # atrial premature contraction */
9 : 'SVPB', # premature or ectopic supraventricular beat */
10 : 'VESC', # ventricular escape beat */
11 : 'NESC', # nodal (junctional) escape beat */
12 : 'PACE', # paced beat */
13 : 'UNKNOWN', # unclassifiable beat */
14 : 'NOISE', # signal quality change */
16 : 'ARFCT', # isolated QRS-like artifact */
18 : 'STCH', # ST change */
19 : 'TCH', # T-wave change */
20 : 'SYSTOLE', # systole */
21 : 'DIASTOLE', # diastole */
22 : 'NOTE', # comment annotation */
23 : 'MEASURE', # measurement annotation */
24 : 'PWAVE', # P-wave peak */
25 : 'BBB', # left or right bundle branch block */
26 : 'PACESP', # non-conducted pacer spike */
27 : 'TWAVE', # T-wave peak */
28 : 'RHYTHM', # rhythm change */
29 : 'UWAVE', # U-wave peak */
30 : 'LEARN', # learning */
31 : 'FLWAV', # ventricular flutter wave */
32 : 'VFON', # start of ventricular flutter/fibrillation */
33 : 'VFOFF', # end of ventricular flutter/fibrillation */
34 : 'AESC', # atrial escape beat */
35 : 'SVESC', # supraventricular escape beat */
36 : 'LINK', # link to external data (aux contains URL) */
37 : 'NAPC', # non-conducted P-wave (blocked APB) */
38 : 'PFUS', # fusion of paced and normal beat */
39 : 'WFON', # waveform onset */
#WFON : 'PQ', # PQ junction (beginning of QRS) */
40 : 'WFOFF', # waveform end */
#WFOFF : 'JPT', # J point (end of QRS) */
41 : 'RONT' # R-on-T premature ventricular contraction */
}
def rdsamp(record, start=0, end=-1, interval=-1):
"""
Read signals from a format 212 record from Physionet database.
Only 2 channel records in format 212 are supported.
This is the most common record in the
Physionet database(http://www.physionet.org/physiobank/).
Parameters
----------
record : str
Full path to record. No extension to be used for record name.
start : int, optional
time to begin in seconds, default 0
end : int, optional
time to end in seconds, defaults to end of record
interval : int, optional
interval of data to be read in seconds
If both interval and end are given, earlier limit is used.
Returns
-------
data : (N, 4) ndarray
numpy array with 4 columns
col 1 - Elapsed time in samples
col 2 - Elapsed time in milliseconds
col 3,4 - The two signals
Signal amplitude is in physical units (mV)
info : dict
Dictionary containing header information
keys :
'signal_names' - Names of each signal
'samp_freq' - Sampling freq (samples / second)
'samp_count' - Total samples in record
'first_values' - First value of each signal
'gains' - Gain for each signal
'zero_values' - Zero value for each signal
"""
# read the header file - output is a dict
info = rdhdr(record)
# establish start and end in samples
start, end = _get_read_limits(start, end, interval, info)
# read the data
data = _read_data(record, start, end, info)
return data, info
def rdann(record, annotator, start=0, end=-1, types=[]):
"""
Reads annotations for given record by specified annotator.
Parameters
----------
record : str
Full path to record. Record name has no extension.
annotator : str
Name of annotator, eg. 'atr'.
This is the extension for the annotation file.
start : int, optional
time to begin in seconds, default 0
end : int, optional
time to end in seconds, defaults to end of record
types : list, optional
list of annotation types that will be returned.
Types are input as annotation code (integer from 0 to 49)
Annotation types not in list will be ignored.
Default is empty list, which results in all types being read.
Returns
-------
data : (N, 3) ndarray
numpy array with 3 columns
col 1 - Elapsed time in samples for each annotation.
col 2 - Elapsed time in seconds for each annotation.
col 3 - The annotation code.
"""
# get header data
info = rdhdr(record)
annfile = ''.join((record, '.', annotator))
with open(annfile, 'rb') as f:
arr = numpy.fromstring(f.read(), dtype = numpy.uint8).reshape((-1, 2))
rows = arr.shape[0]
annot = []
annot_time = []
i = 0
while i < rows:
anntype = arr[i, 1] >> 2
if anntype == 59:
annot.append(arr[i+3, 1] >> 2)
annot_time.append(arr[i+2, 0] + (arr[i+2, 1] << 8) +
(arr[i+1, 0] << 16) + arr[i+1, 1] << 24)
i += 3
elif anntype in [60, 61, 62]:
pass
elif anntype == 63:
hilfe = arr[i, 0] + ((arr[i, 1] & 3) << 8)
hilfe += hilfe % 2
i += hilfe / 2
else:
annot_time.append(arr[i, 0] + ((arr[i, 1] & 3) << 8))
annot.append(arr[i, 1] >> 2)
i += 1
# last values are EOF indicator
annot_time = annot_time[:-1]
annot = annot[:-1]
# annot_time should be total elapsed samples
annot_time = numpy.cumsum(annot_time)
annot_time_ms = annot_time / info['samp_freq'] # in seconds
# limit to requested interval
start, end = _get_read_limits(start, end, -1, info)
ann = numpy.array([annot_time, annot_time_ms, annot]).transpose()
# filter by annot_time in interval
ann = ann[start <= ann[:, 0]]
if end:
ann = ann[ann[:, 0] <= end]
# filter by type
if types != []:
ann = ann[numpy.logical_or.reduce([ann[:, 2] == x for x in types])]
#ann = ann[numpy.array([ann[x, 2] in types for x in range(len(ann))])]
return ann
def plot_data(data, info, ann=None):
"""
Plot the signal with annotations if available.
Parameters
----------
data : (N, 4) ndarray
Output array from rdsamp.
info : dict
Header information as a dictionary.
Output from rdsamp
ann : (N, 2) ndarray, optional
Output from rdann
Returns
-------
None
Matplotlib figure is plotted with the signals and annotations.
"""
try:
import pylab
except ImportError:
warnings.warn("Could not import pylab. Abandoning plotting")
return
time = data[:, 1] #in seconds. use data[:, 0] to use sample no.
sig1 = data[:, 2]
sig2 = data[:, 3]
pylab.subplot(211)
pylab.plot(time, sig1, '-')
pylab.xticks([])
pylab.ylabel('%s (mV)' %(info['signal_names'][0]))
pylab.subplot(212)
pylab.plot(time, sig2, 'k')
pylab.ylabel('%s (mV)' %(info['signal_names'][1]))
pylab.xlabel('Time (seconds)')
if ann != None:
# annotation time in samples from start
ann_x = (ann[:, 0] - data[0, 0]).astype('int')
pylab.plot(ann[:, 1], data[ann_x, 3], 'xr')
pylab.subplot(211)
pylab.plot(ann[:, 1], data[ann_x, 2], 'xr')
pylab.show()
def rdhdr(record):
"""
Returns the information read from the header file
Header file for each record has suffix '.hea' and
contains information about the record and each signal.
Parameters
----------
record : str
Full path to record. Record name has no extension.
Returns
-------
info : dict
Information read from the header as a dictionary.
keys :
'signal_names' - Names of each signal
'samp_freq' - Sampling freq (samples / second)
'samp_count' - Total samples in record
'first_values' - First value of each signal
'gains' - Gain for each signal
'zero_values' - Zero value for each signal
'signal_names' - Name/Descr for each signal
"""
info = {'signal_names':[], 'gains':[], 'units':[],
'first_values':[], 'zero_values':[]}
RECORD_REGEX = re.compile(r''.join([
"(?P<record>\d+)\/*(?P<seg_ct>\d*)\s",
"(?P<sig_ct>\d+)\s*",
"(?P<samp_freq>\d*)\/?(?P<counter_freq>\d*)\(?(?P<base_counter>\d*)\)?\s*",
"(?P<samp_count>\d*)\s*",
"(?P<base_time>\d{,2}:*\d{,2}:*\d{,2})\s*",
"(?P<base_date>\d{,2}\/*\d{,2}\/*\d{,4})"]))
SIGNAL_REGEX = re.compile(r''.join([
"(?P<file_name>[0-9a-zA-Z\._/-]+)\s+",
"(?P<format>\d+)x{,1}(?P<samp_per_frame>\d*):*",
"(?P<skew>\d*)\+*(?P<byte_offset>\d*)\s*",
"(?P<adc_gain>\d*)\(?(?P<baseline>\d*)\)?\/?",
"(?P<units>\w*)\s*(?P<resolution>\d*)\s*",
"(?P<adc_zero>\d*)\s*(?P<init_value>[\d-]*)\s*",
"(?P<checksum>[0-9-]*)\s*(?P<block_size>\d*)\s*",
"(?P<description>[a-zA-Z0-9\s]*)"]))
DESC_REGEX = re.compile(r''.join([
"(?P<age>[\d?]{1,2})\s*",
"[^MF]*(?P<gender>[MF?]{1})\s*",
"(?P<diagnosis>.*)"]))
header_lines, comment_lines = _getheaderlines(record)
(record_name, seg_count, signal_count, samp_freq,
counter_freq, base_counter, samp_count,
base_time, base_date) = RECORD_REGEX.findall(header_lines[0])[0]
# use 250 if missing
if samp_freq == '':
samp_freq = 250
if samp_count == '':
samp_count = 0
# if base_time == '':
# base_time = '00:00:00'
# if base_date == '':
# base_date = '01/01/1900'
info['samp_freq'] = float(samp_freq)
info['samp_count'] = int(samp_count)
if base_time != '':
info['base_time'] = dt.datetime(1900,01,01,0,0,0,tzinfo=None).strptime(base_time, '%H:%M:%S')
else:
info['base_time'] = None
if base_date != '':
info['base_date'] = dt.datetime(1900,01,01,0,0,0,tzinfo=None).strptime(base_date, '%d/%m/%Y')
else:
info['base_date'] = None
if len(header_lines) > 1:
for sig in range(2):
(file_name, file_format, samp_per_frame, skew,
byte_offset, gain, baseline, units,
resolution, zero_value, first_value,
checksum, blocksize, signal_name) = SIGNAL_REGEX.findall(
header_lines[sig+1])[0]
# replace with defaults for missing values
if gain == '' or gain == 0:
gain = 200
if units == '':
units = 'mV'
if zero_value == '':
zero_value = 0
if first_value == '':
first_value = 0 # do not use to check
info['gains'].append(float(gain))
info['units'].append(units)
info['zero_values'].append(float(zero_value))
info['first_values'].append(float(first_value))
info['signal_names'].append(signal_name)
(age, gender, diagnosis) = DESC_REGEX.findall(comment_lines[0])[0]
if age == '':
age = '?'
if gender == '':
gender = '?'
if diagnosis == '':
diagnosis = '?'
#if age != '':
info['age'] = age
#if gender != '':
info['gender'] = gender
#if diagnosis != '':
info['diagnosis'] = diagnosis
return info
def _getheaderlines(record):
"""Read the header file and separate comment lines
and header lines"""
hfile = record + '.hea'
all_lines = open(hfile, 'r').readlines()
comment_lines = []
header_lines = []
# strip newlines
all_lines = [l.rstrip('\n').rstrip('\r') for l in all_lines]
# comments
for l in all_lines:
if l.startswith('#'):
comment_lines.append(l)
elif l.strip() != '':
header_lines.append(l)
return header_lines, comment_lines
def _get_read_limits(start, end, interval, info):
"""
Given start time, end time and interval
for reading data, determines limits to use.
info is the dict returned by rdhdr
end of -1 means end of record.
If both end and interval are given, choose
earlier limit of two.
start and end are returned as samples.
Example:
>>> _get_read_limits(0, 2, -1, {'samp_count':100, 'samp_freq':10})
(0, 20)
>>> _get_read_limits(0, 2, 3, {'samp_count':100, 'samp_freq':10})
(0, 20)
>>> _get_read_limits(0, 4, 2, {'samp_count':100, 'samp_freq':10})
(0, 20)
>>> _get_read_limits(0, 105, -1, {'samp_count':100, 'samp_freq':10})
(0, 100)
>>> _get_read_limits(-1, -1, -1, {'samp_count':100, 'samp_freq':10})
(0, 100)
"""
start *= info['samp_freq']
end *= info['samp_freq']
if start < 0: # If start is negative, start at 0
start = 0
if end < 0: # if end is negative, use end of record
end = info['samp_count']
if end < start: # if end is before start, swap them
start, end = end, start
interval_end = start + interval * info['samp_freq'] # end det by interval
if interval_end < start:
interval_end = info['samp_count']
end = min(end, interval_end, info['samp_count']) # use earlier end
return int(start), int(end)
def _read_data(record, start, end, info):
"""Read the binary data for each signal"""
datfile = record + '.dat'
samp_to_read = end - start
# verify against first value in header
with open(datfile, 'rb') as f:
data = _arr_to_data(numpy.fromstring(f.read(3),
dtype=numpy.uint8).reshape(1,3))
#if [data[0, 2], data[0, 3]] != info['first_values']:
# warnings.warn(
# 'First value from dat file does not match value in header')
# read into an array with 3 bytes in each row
with open(datfile, 'rb') as f:
f.seek(start*3)
arr = numpy.fromstring(f.read(3*samp_to_read),
dtype=numpy.uint8).reshape((samp_to_read, 3))
data = _arr_to_data(arr)
# adjust zerovalue and gain
if info['gains'][0] > 0:
data[:, 2] = (data[:, 2] - info['zero_values'][0]) / info['gains'][0]
else:
data[:, 2] = data[:, 2] - info['zero_values'][0]
if info['gains'][1] > 0:
data[:, 3] = (data[:, 3] - info['zero_values'][1]) / info['gains'][1]
else:
data[:, 3] = data[:, 3] - info['zero_values'][1]
# time columns
data[:, 0] = numpy.arange(start, end) # elapsed time in samples
data[:, 1] = (numpy.arange(samp_to_read) + start
) / info['samp_freq'] # in sec
return data
def _arr_to_data(arr):
"""From the numpy array read from the dat file
using bit level operations, extract the 12-bit data"""
second_col = arr[:, 1].astype('int')
bytes1 = second_col & 15 # bytes belonging to first sample
bytes2 = second_col >> 4 # belongs to second sample
sign1 = (second_col & 8) << 9 # sign bit for first sample
sign2 = (second_col & 128) << 5 # sign bit for second sample
# data has columns - samples, time(ms), signal1 and signal2
data = numpy.zeros((arr.shape[0], 4), dtype='float')
data[:, 2] = (bytes1 << 8) + arr[:, 0] - sign1
data[:, 3] = (bytes2 << 8) + arr[:, 2] - sign2
return data
def get_annotation_code(code=None):
"""Returns the symbolic definition for the wfdb annotation code.
See http://www.physionet.org/physiotools/wpg/wpg_31.htm for details.
Based on ecgcodes.h from wfdb.
Parameters
----------
code : int
Integer from 0 to 49 (ACMAX).
Returns
-------
Definition : str
The definition for the code.
"""
return CODEDICT[code]
def main():
"""Run tests when called directly"""
import nose
print "-----------------"
print "Running the tests"
print "-----------------"
nose.main()
if __name__ == '__main__':
main()