-
Notifications
You must be signed in to change notification settings - Fork 185
/
Copy pathprocessors.py
122 lines (97 loc) · 4.64 KB
/
processors.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import datetime
from fitparse.utils import scrub_method_name, is_iterable
# FIT datetimes (uint32) are stored as seconds elapsed since UTC_REFERENCE.
# UTC_REFERENCE itself is a Unix timestamp (seconds since 1970-01-01 UTC), so
# adding a FIT datetime value to it yields an ordinary Unix timestamp.
UTC_REFERENCE = 631065600 # Unix timestamp of 00:00:00 UTC on Dec 31 1989
class FitFileDataProcessor(object):
    """Post-process decoded FIT field data and data messages in place.

    Entry points that are called to do the processing:

        run_type_processor(field_data)
        run_field_processor(field_data)
        run_unit_processor(field_data)
        run_message_processor(data_message)

    By default each entry point calls the matching method below, if this
    object defines one:

        process_type_<type_name>(field_data)
        process_field_<field_name>(field_data)
            -- can be unknown_DD, but that is NOT recommended
        process_units_<unit_name>(field_data)
        process_message_<mesg_name / mesg_type_num>(data_message)

    The generated method name is passed through
    ``fitparse.utils.scrub_method_name`` before it is looked up. Processors
    return nothing; they modify the object they are given.
    """

    # Used to memoize scrubbed method names. The dict is a class attribute,
    # so the cache is shared by every instance.
    _scrubbed_method_names = {}

    # Naive datetime of the Unix epoch. Timestamps are converted by adding a
    # timedelta to it, which gives the same naive-UTC result as the
    # deprecated datetime.utcfromtimestamp().
    _UNIX_EPOCH = datetime.datetime(1970, 1, 1)

    def _scrub_method_name(self, method_name):
        """Scrubs a method name, returning result from local cache if available.

        This method wraps fitparse.utils.scrub_method_name and memoizes
        results, as scrubbing a method name is expensive.

        Args:
            method_name: Method name to scrub.

        Returns:
            Scrubbed method name.
        """
        cache = self._scrubbed_method_names
        if method_name not in cache:
            cache[method_name] = scrub_method_name(method_name)
        return cache[method_name]

    def run_type_processor(self, field_data):
        """Run ``process_type_<field_data.type.name>`` if it is defined."""
        self._run_processor(self._scrub_method_name(
            'process_type_%s' % field_data.type.name), field_data)

    def run_field_processor(self, field_data):
        """Run ``process_field_<field_data.name>`` if it is defined."""
        self._run_processor(self._scrub_method_name(
            'process_field_%s' % field_data.name), field_data)

    def run_unit_processor(self, field_data):
        """Run ``process_units_<field_data.units>`` if it is defined.

        Nothing is run when ``field_data.units`` is empty or None.
        """
        if field_data.units:
            self._run_processor(self._scrub_method_name(
                'process_units_%s' % field_data.units), field_data)

    def run_message_processor(self, data_message):
        """Run ``process_message_<data_message.def_mesg.name>`` if defined."""
        self._run_processor(self._scrub_method_name(
            'process_message_%s' % data_message.def_mesg.name), data_message)

    def _run_processor(self, processor_name, data):
        """Call ``self.<processor_name>(data)`` if that attribute exists.

        A missing processor is not an error and is skipped silently. Only the
        attribute lookup is guarded: an AttributeError raised *inside* a
        processor propagates to the caller instead of being mistaken for
        "no such processor" and hidden.

        Args:
            processor_name: Name of the method to look up on this object.
            data: Object handed to the processor as its only argument.
        """
        try:
            processor = getattr(self, processor_name)
        except AttributeError:
            return
        processor(data)

    def _to_utc_datetime(self, fit_seconds):
        """Return the naive UTC datetime for a FIT timestamp.

        Args:
            fit_seconds: Seconds elapsed since UTC_REFERENCE.

        Returns:
            A naive ``datetime.datetime`` expressed in UTC.
        """
        return self._UNIX_EPOCH + datetime.timedelta(
            seconds=UTC_REFERENCE + fit_seconds)

    def process_type_bool(self, field_data):
        """Coerce a non-None value to ``bool``."""
        if field_data.value is not None:
            field_data.value = bool(field_data.value)

    def process_type_date_time(self, field_data):
        """Convert a FIT timestamp into a naive UTC ``datetime``.

        Values that are None or below 0x10000000 are left untouched, units
        included.
        NOTE(review): the FIT SDK appears to define values below that
        threshold as seconds since device power-up rather than epoch-based
        timestamps -- confirm against the SDK documentation.
        """
        value = field_data.value
        if value is not None and value >= 0x10000000:
            field_data.value = self._to_utc_datetime(value)
            field_data.units = None  # Units were 's', set to None

    def process_type_local_date_time(self, field_data):
        """Convert a device-local FIT timestamp into a naive ``datetime``."""
        if field_data.value is not None:
            # NOTE: This value was created on the device using its local
            # timezone. Unless we know that timezone, this value won't be
            # correct. However, if we assume UTC, at least it'll be
            # consistent.
            field_data.value = self._to_utc_datetime(field_data.value)
            field_data.units = None

    def process_type_localtime_into_day(self, field_data):
        """Convert a count of seconds into the day to a ``datetime.time``."""
        if field_data.value is not None:
            minutes, seconds = divmod(field_data.value, 60)
            hours, minutes = divmod(minutes, 60)
            field_data.value = datetime.time(hours, minutes, seconds)
            field_data.units = None
class StandardUnitsDataProcessor(FitFileDataProcessor):
    """Data processor that also converts selected values to other units.

    Distances are divided by 1000 and labelled 'km', speeds are multiplied
    by 3.6 and labelled 'km/h', and semicircle values are converted to
    degrees ('deg'). Values may be scalars or iterables of scalars.
    """

    def run_field_processor(self, field_data):
        """
        Convert all '*_speed' fields using 'process_field_speed'
        All other units will use the default method.
        """
        if field_data.name.endswith("_speed"):
            self.process_field_speed(field_data)
        else:
            super(StandardUnitsDataProcessor, self).run_field_processor(field_data)

    @staticmethod
    def _convert(value, convert):
        """Apply ``convert`` to a scalar, or to each item of an iterable.

        Args:
            value: A non-None scalar, or an iterable of scalars.
            convert: One-argument callable performing the unit conversion.

        Returns:
            The converted scalar, or a tuple of converted items. None items
            inside an iterable are passed through unchanged.
        """
        if is_iterable(value):
            return tuple(
                None if item is None else convert(item) for item in value)
        return convert(value)

    def process_field_distance(self, field_data):
        """Divide the value by 1000 and set the units to 'km'."""
        if field_data.value is not None:
            field_data.value = self._convert(
                field_data.value, lambda item: item / 1000.0)
        field_data.units = 'km'

    def process_field_speed(self, field_data):
        """Multiply the value by 3.6 and set the units to 'km/h'."""
        if field_data.value is not None:
            factor = 60.0 * 60.0 / 1000.0
            # record.enhanced_speed field can be a tuple
            field_data.value = self._convert(
                field_data.value, lambda item: item * factor)
        field_data.units = 'km/h'

    def process_units_semicircles(self, field_data):
        """Convert semicircles to degrees and set the units to 'deg'."""
        if field_data.value is not None:
            factor = 180.0 / (2 ** 31)
            field_data.value = self._convert(
                field_data.value, lambda item: item * factor)
        field_data.units = 'deg'