-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathwalkshed.py
executable file
·240 lines (205 loc) · 10.3 KB
/
walkshed.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
#!/usr/bin/env python
from imposm.parser import OSMParser
from pygraph.classes.graph import graph
from pygraph.algorithms.minmax import shortest_path
import math
EARTH_RADIUS = 6371e3
class Node(object):
def __init__(self, lon, lat):
self.lon = float(lon)
self.lat = float(lat)
self.walkable = False
self.walkDistance = float("inf")
def distanceTo(self, other):
lat1 = self.lat /180. * math.pi
lat2 = other.lat /180. * math.pi
lon1 = self.lon /180. * math.pi
lon2 = other.lon /180. * math.pi
val = math.acos(min(math.sin(lat1) * math.sin(lat2) + math.cos(lat1) *
math.cos(lat2) * math.cos(lon2-lon1), 1)) * EARTH_RADIUS
if math.isnan(val):
return 0
else:
return val
class Way(object):
def __init__(self, tags, refs):
self.tags = tags
self.refs = refs
# simple class that handles the parsed OSM data.
class Map(object):
nodes = {}
ways = {}
def nodes_callback(self, nodes):
for osmid, lon, lat in nodes:
self.nodes[osmid] = Node(lon, lat)
def ways_callback(self, ways):
for osmid, tags, refs in ways:
if 'highway' in tags and 'motor' not in tags['highway']:
self.ways[osmid] = Way(tags, refs)
for index in refs:
if index in self.nodes:
self.nodes[index].walkable = True
def _clearDistances(self, ):
inf = float("inf")
for node in self.nodes.values():
node.walkDistance = inf
def calculateWalkDistances(self, stops):
self._clearDistances()
fillerNodes = 0
gr = graph()
gr.add_nodes(self.nodes.keys())
# build the graph from our map
for way in self.ways.values():
lastNode = None
for node in way.refs:
if (lastNode and lastNode in self.nodes and
node in self.nodes and not gr.has_edge((lastNode, node))):
gr.add_edge((lastNode, node), self.nodes[lastNode].distanceTo(self.nodes[node]))
lastNode = node
def frange(x, y, jump):
while x < y:
yield x
x += jump
#Add nodes from stops
for stop in stops:
#add stop to graph
gr.add_node(stop)
#find nearest node to stop
nearest = -1
nearestDist = float('inf')
for key, node in self.nodes.iteritems():
if node.walkable:
dist = node.distanceTo(stop)
if dist <= 10: # here we add all nodes within 10 m (approx road width)
gr.add_edge((stop, key), dist)
if dist < nearestDist:
nearestDist = node.distanceTo(stop)
nearest = key
# ensure the nearest node is not missed
if nearestDist > 10:
#gr.add_edge((stop, nearest), nearestDist)
nearestNode = self.nodes[nearest]
for mrR in gr.neighbors(nearest):
if mrR not in self.nodes:
continue
mrRogers = self.nodes[mrR]
ndist = nearestNode.distanceTo(mrRogers)
lastNode = nearest
for factor in frange(5.0/ndist, 1, 5.0/ndist):
fillerNodes -= 1
newNode = Node(nearestNode.lon + factor *
(mrRogers.lon - nearestNode.lon),
nearestNode.lat + factor *
(mrRogers.lat - nearestNode.lat))
gr.add_node(fillerNodes)
self.nodes[fillerNodes] = newNode
if stop.distanceTo(newNode) <= 10:
gr.add_edge((stop, fillerNodes), stop.distanceTo(newNode))
gr.add_edge((lastNode, fillerNodes), 5)
lastNode = fillerNodes
if not gr.has_edge((lastNode, mrR)):
gr.add_edge((lastNode, mrR), ndist % 5)
# this graph libary may be overkill (i.e. we should probably stop Djikstra's algorithm
# after some distance) but it beats reinventing the wheel
st, distances = shortest_path(gr, stop)
# Set the walking distance to be whatever is the nearest stop
for key, dist in distances.iteritems():
if key in self.nodes and dist < self.nodes[key].walkDistance:
self.nodes[key].walkDistance = dist
def generateWalkShed(self, distThreshold, props=None):
# base geoJSON
feature = {'type': 'Feature',
'geometry' : {'type': 'MultiLineString', 'coordinates': []},
'properties': props or {}}
# add coordinates as LineStrings to geoJSON MultiLineString
def addSequence(feature, sequence):
coords = []
for node in sequence:
coords.append([node.lon, node.lat])
feature['geometry']['coordinates'].append(coords)
# for each way, determine which nodes are within walking threshold
for way in self.ways.values():
sequence = []
for i, noderef in enumerate(way.refs):
if noderef in self.nodes:
node = self.nodes[noderef]
distLeft = distThreshold - node.walkDistance
if distLeft >= 0: # node is within walking threshold
if len(sequence) == 0 and i > 0:
# need to back-track to find the point on way where walkability begins
lastNode = self.nodes[way.refs[i-1]]
lastDist = node.distanceTo(lastNode)
portion = distLeft / lastDist
sequence.append(Node(node.lon + portion * (lastNode.lon - node.lon),
node.lat + portion * (lastNode.lat - node.lat)))
sequence.append(node)
if i+1 < len(way.refs): # peek ahead to next node
nextNode = self.nodes[way.refs[i+1]]
nextDistLeft = distThreshold - nextNode.walkDistance
nextDist = node.distanceTo(nextNode)
if nextDistLeft < 0 or distLeft + nextDistLeft < nextDist:
# if the next node is not in the walkable area, or there is a portion
# of the way between this node and the next that too far from transit,
# find the point of the furthest possible walking distance and stop
# line there
portion = distLeft / nextDist
sequence.append(Node(node.lon + portion * (nextNode.lon - node.lon),
node.lat + portion * (nextNode.lat - node.lat)))
addSequence(feature, sequence)
sequence = []
# if the final node on the way was walkable, need to add this last LineString
if len(sequence) > 0:
addSequence(feature, sequence)
return feature
def processShed(pbfFile, outFile, distance, mode, concurrent, stopNodes):
# instantiate map and parser and start parsing
osmap = Map()
p = OSMParser(concurrency=concurrent, ways_callback=osmap.ways_callback, coords_callback=osmap.nodes_callback)
p.parse(pbfFile)
osmap.calculateWalkDistances(stopNodes)
walkshed = osmap.generateWalkShed(distance, {'distance': distance, 'mode': mode})
import json
with open(outFile, 'w') as f:
json.dump(walkshed, f)
def addRouteStops(route, stopNodes, stops):
# add
for trip in route['trips']:
for stop in trip['stops']:
if 'stop_id' in stop:
stopNodes.append(Node(stops[stop['stop_id']]['stop_lon'],
stops[stop['stop_id']]['stop_lat']))
else:
stopNodes.append(Node(stop['lon'], stop['lat']))
def main():
import argparse
parser = argparse.ArgumentParser(description='Generate geoJSON of selected transit route walkshed.')
parser.add_argument('-j', '--json', dest='jsonFile', required=True,
help='json file containing future transit service information')
parser.add_argument('-g', '--gtfs', dest='gtfsDir', required=True,
help='directory containing GTFS of existing service')
parser.add_argument('-p', '--pbf', dest='pbfFile', required=True,
help='OSM map file')
parser.add_argument('-o', '--output', dest='outFile', required=True,
help='file where geoJSON data is to be output')
parser.add_argument('-d', '--distance', dest='distance', default=400, type=float,
help='walkable distance from transit stations')
parser.add_argument('-m', '--mode', dest='mode', default='bus',
help='mode of service')
parser.add_argument('-c', dest='concurrent', default=2, type=int,
help='number of processors to use when importing map')
parser.add_argument('ids', nargs="*", default=[], metavar='ID',
help='id of route in json file to include')
args = parser.parse_args()
import os.path
from readCSV import readCSV
stops, headings = readCSV(os.path.join(args.gtfsDir, 'stops.txt'), 'stop_id')
import json
with open(args.jsonFile, 'r') as file:
routes = json.load(file)
stopNodes = []
for route in routes:
if len(args.ids) == 0 or route['id'] in args.ids:
addRouteStops(route, stopNodes, stops)
processShed(args.pbfFile, args.outFile, args.distance, args.mode, args.concurrent, stopNodes)
if __name__ == "__main__":
main()