-
Notifications
You must be signed in to change notification settings - Fork 3
/
process_addresses.py
170 lines (128 loc) · 4.49 KB
/
process_addresses.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
from __future__ import print_function
import re
import xml.etree.cElementTree as ET
import sys
import expansions
"""Script to process Kokomo address data in OSM format, and generate
a cleaned-up, properly tagged OSM file."""
# Seed for the generated node IDs.  newid() decrements this counter before
# returning it, so the first ID handed out is one below this value.
INITIAL_ID = -747
def _tag_value(node, key):
    """Return the ``v`` attribute of ``node``'s ``<tag k=key>`` child.

    Returns None when the node has no such tag, or the tag has no ``v``.
    """
    tag = node.find("tag[@k='%s']" % key)
    if tag is None:
        return None
    return tag.attrib.get('v')


def main(infile):
    """Convert the address nodes parsed from ``infile`` into tagged OSM nodes.

    Every child of the document root that carries a HOUSE_NUMB tag, is not
    marked ``STATUS=RETIRED`` and has a non-empty STREET tag is turned into
    a new ``<node>`` with ``addr:housenumber`` and ``addr:street`` tags.
    Directions (DIR) and road types (TYPE) are expanded through the
    ``expansions`` tables when a mapping exists.

    Parameters:
        infile: a file name or file object accepted by ``ET.parse``.

    Side effects:
        Writes the resulting OSM document to standard output as UTF-8 and
        writes progress / diagnostic messages to standard error via log().
    """
    osm = ET.parse(infile)
    root = osm.getroot()
    count_skipped = 0

    # tree for storing good nodes
    processed_root = newroot()

    for node in root:
        house_numb_text = _tag_value(node, 'HOUSE_NUMB')
        if house_numb_text is None:
            log("Skipping node without HOUSE_NUMB")
            count_skipped += 1
            continue

        # ADDRESS_LA is only used to label log messages, so a missing tag
        # must not abort the run; fall back to the house number.
        address_la_text = _tag_value(node, 'ADDRESS_LA')
        if address_la_text is None:
            address_la_text = "house number " + house_numb_text

        # Discard retired addresses before validating anything else so they
        # neither crash the run nor clutter the log.
        if _tag_value(node, 'STATUS') == "RETIRED":
            log("Skipping retired addr: " + address_la_text)
            count_skipped += 1
            continue

        # A street name is mandatory for addr:street; without one there is
        # nothing useful to emit.
        street_text = _tag_value(node, 'STREET')
        if not street_text:
            log("Skipping addr without STREET: " + address_la_text)
            count_skipped += 1
            continue
        street_text = capitalize(street_text)

        type_text = _tag_value(node, 'TYPE')
        if not type_text:
            log("%s does not have a street type" % address_la_text)
            type_text = None
        elif type_text in expansions.road_types:
            type_text = expansions.road_types[type_text]
        else:
            log("Could not expand street type " + type_text)
            type_text = capitalize(type_text)

        # No direction is perfectly normal, so only known/unknown
        # abbreviations need handling here.
        dir_text = _tag_value(node, 'DIR')
        if dir_text:
            if dir_text in expansions.directions:
                dir_text = expansions.directions[dir_text]
            else:
                log("Could not expand direction " + dir_text)

        # filter(None, ...) drops the parts that are missing or empty.
        street = " ".join(filter(None, [dir_text, street_text, type_text]))
        newnode(processed_root, node.attrib['lat'], node.attrib['lon'], {
            "addr:housenumber": house_numb_text,
            "addr:street": street,
        })

    log("----")
    log("%d nodes skipped" % count_skipped)
    log("%d nodes total" % len(root))

    processed_doc = ET.ElementTree(processed_root)
    # With a byte encoding ElementTree emits bytes.  On Python 3 those have
    # to go to the binary buffer behind sys.stdout; Python 2's sys.stdout
    # has no ``buffer`` attribute and accepts the bytes directly.
    out = getattr(sys.stdout, "buffer", sys.stdout)
    processed_doc.write(out, encoding="UTF-8")
def capitalize(txt):
    """Capitalize each space-separated word of ``txt``.

    The text is split on single space characters, every piece is passed
    through capitalize_word(), and the pieces are joined back together
    with single spaces.
    """
    words = txt.split(" ")
    return " ".join([capitalize_word(word) for word in words])
def capitalize_word(txt):
    """Capitalize one word: first character upper-cased, the rest lower-cased.

    An empty string is returned unchanged.  Slicing (``txt[:1]``) is used
    instead of indexing (``txt[0]``) so that empty input does not raise
    IndexError.
    """
    return txt[:1].upper() + txt[1:].lower()
def newroot():
    """Build and return a fresh, empty ``<osm>`` root element.

    The element carries the ``version``, ``upload`` and ``generator``
    attributes; a new element is created on every call.
    """
    return ET.Element("osm", {
        'version': '0.6',
        'upload': 'true',
        'generator': 'doublemap/nola-addresses',
    })
def newnode(root, lat, lon, tags=None):
    """Create a new ``<node>`` element, append it to ``root`` and return it.

    Parameters:
        root: the element that receives the new node as a child.
        lat, lon: values stored verbatim in the ``lat`` / ``lon`` attributes.
        tags: optional mapping of tag key -> value; each entry becomes a
            ``<tag k=... v=...>`` child of the node.  ``None`` (the
            default) or an empty mapping adds no tags.

    Returns:
        The newly created node element.  Its ``id`` comes from newid() and
        ``visible`` is set to ``"true"``.
    """
    n = ET.Element("node")
    n.attrib['id'] = newid()
    n.attrib['lat'] = lat
    n.attrib['lon'] = lon
    n.attrib['visible'] = "true"
    root.append(n)

    # ``tags`` defaults to None rather than a shared mutable dict, and
    # items() (unlike iteritems()) exists on both Python 2 and Python 3.
    if tags:
        for k, v in tags.items():
            n.append(ET.Element("tag", {
                "k": k,
                "v": v,
            }))
    return n
def newid():
    """Return the next generated ID number as a string.

    Every call lowers the module-level INITIAL_ID counter by one and
    returns the updated value, so successive IDs keep decreasing.
    """
    global INITIAL_ID
    next_id = INITIAL_ID - 1
    INITIAL_ID = next_id
    return str(next_id)
def parse_addr(text):
    """Split an address line into a ``(housenumber, street)`` tuple.

    ``text`` must begin with a run of digits, followed by whitespace and at
    least one further character.  The digits become the house number and
    the remainder is passed through expand_street().  Returns None when
    ``text`` does not have that shape.
    """
    found = re.match('(\\d+)\\s+(.+)', text)
    if found is None:
        return None
    number, remainder = found.groups()
    return (number, expand_street(remainder))
def expand_street(text):
    """Expand abbreviated directions and the trailing road type in ``text``.

    First, every standalone run of at most two of the letters N, S, E, W is
    replaced by its entry in ``expansions.directions`` when one exists.
    Then the final word of at most five word characters is replaced by its
    entry in ``expansions.road_types`` when one exists.  Anything without
    an entry is left as it was.

    Note: both patterns use ``{,n}`` quantifiers, which also allow
    zero-length matches, so the callbacks can be invoked with an empty
    string.
    """
    def direction_for(found):
        # Swap a matched direction abbreviation for its expansion, if known.
        token = found.group()
        return (expansions.directions[token]
                if token in expansions.directions else token)

    def road_type_for(found):
        # Swap a matched road-type abbreviation for its expansion, if known.
        token = found.group()
        return (expansions.road_types[token]
                if token in expansions.road_types else token)

    with_directions = re.sub("\\b[NSEW]{,2}\\b", direction_for, text)
    return re.sub('\\b\\w{,5}$', road_type_for, with_directions)
def log(text):
    """Write ``text`` followed by a newline to standard error.

    ``str`` values are written unchanged; any other object is written as
    its ``repr()``.
    """
    message = text if isinstance(text, str) else repr(text)
    sys.stderr.write(message)
    sys.stderr.write("\n")
# Script entry point: when run directly (not imported), hand standard input
# to main() as the document to process.
if __name__ == "__main__":
    main(sys.stdin)