-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathparse_example_scos_message.py
136 lines (116 loc) · 6.55 KB
/
parse_example_scos_message.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
import xml.etree.ElementTree as ET
APOLLO_TYPES_NAMESPACE = 'http://types.apollo.pitt.edu/v4_0/'
XSI_TYPE = '{http://www.w3.org/2001/XMLSchema-instance}type'
def process_other_variables(query, base):
    """Add the ranges described by an ``otherVariables`` element to *query*.

    The ``variable`` child of *base* selects the axis name:
    ``ageRange`` becomes ``age_range`` and ``householdMedianIncome``
    becomes ``household_median_income``.  That axis name is appended to
    ``query["output_options"]["axes"]`` and a dict of ranges is stored under
    ``query["simulator_count_variables"][<axis name>]``.

    Every ``categories`` child whose ``xsi:type`` attribute equals
    ``apollo:MeasuredQuantityRangeCategoryDefinition`` contributes one entry,
    keyed by ``"<lower> - <upper>"``, holding:

    * ``range_units`` -- text of the ``unitOfMeasure`` child
    * ``range`` -- ``[lower, upper]`` as returned by ``process_boundary``

    Categories with any other ``xsi:type`` are ignored.

    If the ``variable`` child is missing or names an unsupported variable,
    an error is printed and *query* is left untouched (instead of
    registering an axis with an empty name).

    Args:
        query: Query dict; must already contain
            ``"simulator_count_variables"`` (dict) and
            ``"output_options"`` with an ``"axes"`` list.
        base: The ``otherVariables`` XML element.

    Returns:
        None. *query* is modified in place.
    """
    ns = '{' + APOLLO_TYPES_NAMESPACE + '}'
    axis_names = {
        'ageRange': 'age_range',
        'householdMedianIncome': 'household_median_income',
    }

    variable = base.find(ns + 'variable')
    variable_name = variable.text if variable is not None else None
    range_type = axis_names.get(variable_name)
    if range_type is None:
        # Same print-based reporting the rest of this module uses.
        print("Error: unsupported otherVariables variable: " + str(variable_name))
        return

    ranges = {}
    query["simulator_count_variables"][range_type] = ranges
    query["output_options"]['axes'].append(range_type)

    for category in base.findall(ns + 'categories'):
        category_type = category.get(XSI_TYPE)
        if category_type != 'apollo:MeasuredQuantityRangeCategoryDefinition':
            continue
        unit_of_measure = category.find(ns + 'unitOfMeasure')
        query_lb = process_boundary(category.find(ns + 'lowerBound'))
        query_ub = process_boundary(category.find(ns + 'upperBound'))
        bound_text = str(query_lb) + ' - ' + str(query_ub)
        ranges[bound_text] = {
            'range_units': unit_of_measure.text,
            'range': [query_lb, query_ub],
        }
def process_boundary(boundary_element):
    """Convert a range boundary element into a number.

    The element is expected to contain either a ``finiteBoundary`` child,
    whose text is parsed with ``int``, or an ``infiniteBoundary`` child whose
    text is ``negativeInfinity`` or ``positiveInfinity``.  When both children
    are present the finite one wins.

    Args:
        boundary_element: XML element holding the boundary definition.

    Returns:
        An ``int`` for a finite boundary, or ``-inf`` / ``+inf`` (float) for
        an infinite one.

    Raises:
        ValueError: If *boundary_element* is ``None``, contains neither
            boundary child, or has an unrecognised ``infiniteBoundary``
            value.  ``int`` also raises ``ValueError`` when the
            ``finiteBoundary`` text is not an integer literal.
        TypeError: Raised by ``int`` when the ``finiteBoundary`` element has
            no text.
    """
    if boundary_element is None:
        raise ValueError("Error: range boundary element is missing")

    ns = '{' + APOLLO_TYPES_NAMESPACE + '}'

    finite_boundary = boundary_element.find(ns + 'finiteBoundary')
    if finite_boundary is not None:
        return int(finite_boundary.text)

    infinite_boundary = boundary_element.find(ns + 'infiniteBoundary')
    if infinite_boundary is None:
        raise ValueError(
            "Error: boundary has neither finiteBoundary nor infiniteBoundary")

    inf_type = infinite_boundary.text
    if inf_type == 'negativeInfinity':
        return -float("inf")
    if inf_type == 'positiveInfinity':
        return float("inf")
    # Fail loudly rather than fall through with no value to return.
    raise ValueError(
        "Error: unrecognized infiniteBoundary value: " + str(inf_type))
def process_spatial_granularity(query, element):
    """Append the location axes implied by a ``spatialGranularity`` element.

    For a granularity of ``adminN`` (``admin0`` .. ``admin5``) the axes
    ``location_admin0`` through ``location_adminN`` are appended, in that
    order, to ``query["output_options"]["axes"]``.  ``latLong`` is not
    supported: an error is printed and no axis is added.  ``none`` and any
    other value leave *query* unchanged.

    Args:
        query: Query dict whose ``"output_options"`` entry contains an
            ``"axes"`` list.
        element: Object whose ``text`` attribute holds the granularity name.

    Returns:
        None. *query* is modified in place.
    """
    # Ordered coarsest to finest: a finer level implies every coarser one.
    admin_levels = ('admin0', 'admin1', 'admin2', 'admin3', 'admin4', 'admin5')

    granularity = element.text
    if granularity in admin_levels:
        depth = admin_levels.index(granularity) + 1
        query["output_options"]['axes'].extend(
            'location_' + level for level in admin_levels[:depth])
    elif granularity == 'latLong':
        print("Error: latLong coordinates are not currently supported")
def get_queries_from_scos(scos_xml_root_node):
    """Build one query dict per ``SimulatorCountOutputSpecification`` child.

    Each query has the shape::

        {"simulator_count_variables": {...}, "output_options": {"axes": [...]}}

    and is filled from the children of the specification element:

    * ``speciesToCount`` -- stores ``{text}`` under ``species`` and adds the
      ``species`` axis.
    * ``temporalGranularity`` -- ``eachSimulationTimestep`` adds the
      ``simulator_time`` axis; ``entireSimulation`` adds nothing; anything
      else is an error.
    * ``spatialGranularity`` -- delegated to ``process_spatial_granularity``.
    * ``infectionState`` / ``diseaseOutcome`` -- upper-cased text is collected
      in a set under ``infection_state`` / ``disease_state``; the matching
      axis is added the first time the field is seen.
    * ``otherVariables`` -- delegated to ``process_other_variables``.

    Args:
        scos_xml_root_node: Root XML element whose direct children are
            searched for specification elements.

    Returns:
        A list of query dicts, or ``None`` (after printing an error) when a
        child element is outside the Apollo types namespace or carries an
        unrecognised ``temporalGranularity``.
    """
    spec_tag = '{' + APOLLO_TYPES_NAMESPACE + '}SimulatorCountOutputSpecification'
    # XML field name -> key used for both the count variable and the axis.
    state_keys = {
        'infectionState': 'infection_state',
        'diseaseOutcome': 'disease_state',
    }

    results = []
    for spec in scos_xml_root_node.findall(spec_tag):
        count_variables = {}
        axes = []
        # The helpers below mutate these same objects through ``query``.
        query = {
            "simulator_count_variables": count_variables,
            "output_options": {'axes': axes},
        }

        for child in spec:
            tag = child.tag
            if '}' in tag:
                # Tags look like "{namespace}field"; drop the leading "{".
                namespace, field = tag.split('}', 1)
                namespace = namespace[1:]
            else:
                namespace = field = ''

            if namespace != APOLLO_TYPES_NAMESPACE:
                print("Error: unsupported Apollo type namespace was used in the XML")
                return None

            if field == 'speciesToCount':
                count_variables['species'] = {child.text}
                axes.append('species')
            elif field == 'temporalGranularity':
                # NOTE(review): source indentation was lost; the time axis is
                # taken to apply only to 'eachSimulationTimestep' -- confirm.
                if child.text == 'eachSimulationTimestep':
                    axes.append('simulator_time')
                elif child.text != 'entireSimulation':
                    print("Error: unrecognized temporalGranularity. The available options are 'entireSimulation' and 'eachSimulationTimestep'")
                    return None
            elif field == 'spatialGranularity':
                process_spatial_granularity(query, child)
            elif field in state_keys:
                key = state_keys[field]
                if key in count_variables:
                    count_variables[key].add(child.text.upper())
                else:
                    count_variables[key] = {child.text.upper()}
                    axes.append(key)
            elif field == 'otherVariables':
                process_other_variables(query, child)

        results.append(query)
    return results
if __name__ == '__main__':
    import sys

    # Parse the SCOS XML file given as the first command-line argument.
    # With no argument, fall back to the example file this script was
    # originally hard-wired to, so existing invocations keep working.
    default_path = '/Users/nem41/Documents/code_projects/apollo_projects/example-scos-messages/num_infected_by_location.xml'
    xml_path = sys.argv[1] if len(sys.argv) > 1 else default_path

    tree = ET.parse(xml_path)
    root = tree.getroot()
    queries = get_queries_from_scos(root)
    print(queries)