-
-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
local_planner.py
352 lines (292 loc) · 13.6 KB
/
local_planner.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
# Copyright (c) # Copyright (c) 2018-2020 CVC.
#
# This work is licensed under the terms of the MIT license.
# For a copy, see <https://opensource.org/licenses/MIT>.
""" This module contains a local planner to perform low-level waypoint following based on PID controllers. """
from enum import IntEnum
from collections import deque
import random
import carla
from agents.navigation.controller import VehiclePIDController
from agents.tools.misc import draw_waypoints, get_speed
class RoadOption(IntEnum):
"""
RoadOption represents the possible topological configurations when moving from a segment of lane to other.
"""
VOID = -1
LEFT = 1
RIGHT = 2
STRAIGHT = 3
LANEFOLLOW = 4
CHANGELANELEFT = 5
CHANGELANERIGHT = 6
class LocalPlanner(object):
"""
LocalPlanner implements the basic behavior of following a
trajectory of waypoints that is generated on-the-fly.
The low-level motion of the vehicle is computed by using two PID controllers,
one is used for the lateral control and the other for the longitudinal control (cruise speed).
When multiple paths are available (intersections) this local planner makes a random choice,
unless a given global plan has already been specified.
"""
def __init__(self, vehicle, opt_dict={}, map_inst=None):
"""
:param vehicle: actor to apply to local planner logic onto
:param opt_dict: dictionary of arguments with different parameters:
dt: time between simulation steps
target_speed: desired cruise speed in Km/h
sampling_radius: distance between the waypoints part of the plan
lateral_control_dict: values of the lateral PID controller
longitudinal_control_dict: values of the longitudinal PID controller
max_throttle: maximum throttle applied to the vehicle
max_brake: maximum brake applied to the vehicle
max_steering: maximum steering applied to the vehicle
offset: distance between the route waypoints and the center of the lane
:param map_inst: carla.Map instance to avoid the expensive call of getting it.
"""
self._vehicle = vehicle
self._world = self._vehicle.get_world()
if map_inst:
if isinstance(map_inst, carla.Map):
self._map = map_inst
else:
print("Warning: Ignoring the given map as it is not a 'carla.Map'")
self._map = self._world.get_map()
else:
self._map = self._world.get_map()
self._vehicle_controller = None
self.target_waypoint = None
self.target_road_option = None
self._waypoints_queue = deque(maxlen=10000)
self._min_waypoint_queue_length = 100
self._stop_waypoint_creation = False
# Base parameters
self._dt = 1.0 / 20.0
self._target_speed = 20.0 # Km/h
self._sampling_radius = 2.0
self._args_lateral_dict = {'K_P': 1.95, 'K_I': 0.05, 'K_D': 0.2, 'dt': self._dt}
self._args_longitudinal_dict = {'K_P': 1.0, 'K_I': 0.05, 'K_D': 0, 'dt': self._dt}
self._max_throt = 0.75
self._max_brake = 0.3
self._max_steer = 0.8
self._offset = 0
self._base_min_distance = 3.0
self._distance_ratio = 0.5
self._follow_speed_limits = False
# Overload parameters
if opt_dict:
if 'dt' in opt_dict:
self._dt = opt_dict['dt']
if 'target_speed' in opt_dict:
self._target_speed = opt_dict['target_speed']
if 'sampling_radius' in opt_dict:
self._sampling_radius = opt_dict['sampling_radius']
if 'lateral_control_dict' in opt_dict:
self._args_lateral_dict = opt_dict['lateral_control_dict']
if 'longitudinal_control_dict' in opt_dict:
self._args_longitudinal_dict = opt_dict['longitudinal_control_dict']
if 'max_throttle' in opt_dict:
self._max_throt = opt_dict['max_throttle']
if 'max_brake' in opt_dict:
self._max_brake = opt_dict['max_brake']
if 'max_steering' in opt_dict:
self._max_steer = opt_dict['max_steering']
if 'offset' in opt_dict:
self._offset = opt_dict['offset']
if 'base_min_distance' in opt_dict:
self._base_min_distance = opt_dict['base_min_distance']
if 'distance_ratio' in opt_dict:
self._distance_ratio = opt_dict['distance_ratio']
if 'follow_speed_limits' in opt_dict:
self._follow_speed_limits = opt_dict['follow_speed_limits']
# initializing controller
self._init_controller()
def reset_vehicle(self):
"""Reset the ego-vehicle"""
self._vehicle = None
def _init_controller(self):
"""Controller initialization"""
self._vehicle_controller = VehiclePIDController(self._vehicle,
args_lateral=self._args_lateral_dict,
args_longitudinal=self._args_longitudinal_dict,
offset=self._offset,
max_throttle=self._max_throt,
max_brake=self._max_brake,
max_steering=self._max_steer)
# Compute the current vehicle waypoint
current_waypoint = self._map.get_waypoint(self._vehicle.get_location())
self.target_waypoint, self.target_road_option = (current_waypoint, RoadOption.LANEFOLLOW)
self._waypoints_queue.append((self.target_waypoint, self.target_road_option))
def set_speed(self, speed):
"""
Changes the target speed
:param speed: new target speed in Km/h
:return:
"""
if self._follow_speed_limits:
print("WARNING: The max speed is currently set to follow the speed limits. "
"Use 'follow_speed_limits' to deactivate this")
self._target_speed = speed
def follow_speed_limits(self, value=True):
"""
Activates a flag that makes the max speed dynamically vary according to the spped limits
:param value: bool
:return:
"""
self._follow_speed_limits = value
def _compute_next_waypoints(self, k=1):
"""
Add new waypoints to the trajectory queue.
:param k: how many waypoints to compute
:return:
"""
# check we do not overflow the queue
available_entries = self._waypoints_queue.maxlen - len(self._waypoints_queue)
k = min(available_entries, k)
for _ in range(k):
last_waypoint = self._waypoints_queue[-1][0]
next_waypoints = list(last_waypoint.next(self._sampling_radius))
if len(next_waypoints) == 0:
break
elif len(next_waypoints) == 1:
# only one option available ==> lanefollowing
next_waypoint = next_waypoints[0]
road_option = RoadOption.LANEFOLLOW
else:
# random choice between the possible options
road_options_list = _retrieve_options(
next_waypoints, last_waypoint)
road_option = random.choice(road_options_list)
next_waypoint = next_waypoints[road_options_list.index(
road_option)]
self._waypoints_queue.append((next_waypoint, road_option))
def set_global_plan(self, current_plan, stop_waypoint_creation=True, clean_queue=True):
"""
Adds a new plan to the local planner. A plan must be a list of [carla.Waypoint, RoadOption] pairs
The 'clean_queue` parameter erases the previous plan if True, otherwise, it adds it to the old one
The 'stop_waypoint_creation' flag stops the automatic creation of random waypoints
:param current_plan: list of (carla.Waypoint, RoadOption)
:param stop_waypoint_creation: bool
:param clean_queue: bool
:return:
"""
if clean_queue:
self._waypoints_queue.clear()
# Remake the waypoints queue if the new plan has a higher length than the queue
new_plan_length = len(current_plan) + len(self._waypoints_queue)
if new_plan_length > self._waypoints_queue.maxlen:
new_waypoint_queue = deque(maxlen=new_plan_length)
for wp in self._waypoints_queue:
new_waypoint_queue.append(wp)
self._waypoints_queue = new_waypoint_queue
for elem in current_plan:
self._waypoints_queue.append(elem)
self._stop_waypoint_creation = stop_waypoint_creation
def set_offset(self, offset):
"""Sets an offset for the vehicle"""
self._vehicle_controller.set_offset(offset)
def run_step(self, debug=False):
"""
Execute one step of local planning which involves running the longitudinal and lateral PID controllers to
follow the waypoints trajectory.
:param debug: boolean flag to activate waypoints debugging
:return: control to be applied
"""
if self._follow_speed_limits:
self._target_speed = self._vehicle.get_speed_limit()
# Add more waypoints too few in the horizon
if not self._stop_waypoint_creation and len(self._waypoints_queue) < self._min_waypoint_queue_length:
self._compute_next_waypoints(k=self._min_waypoint_queue_length)
# Purge the queue of obsolete waypoints
veh_location = self._vehicle.get_location()
vehicle_speed = get_speed(self._vehicle) / 3.6
self._min_distance = self._base_min_distance + self._distance_ratio * vehicle_speed
num_waypoint_removed = 0
for waypoint, _ in self._waypoints_queue:
if len(self._waypoints_queue) - num_waypoint_removed == 1:
min_distance = 1 # Don't remove the last waypoint until very close by
else:
min_distance = self._min_distance
if veh_location.distance(waypoint.transform.location) < min_distance:
num_waypoint_removed += 1
else:
break
if num_waypoint_removed > 0:
for _ in range(num_waypoint_removed):
self._waypoints_queue.popleft()
# Get the target waypoint and move using the PID controllers. Stop if no target waypoint
if len(self._waypoints_queue) == 0:
control = carla.VehicleControl()
control.steer = 0.0
control.throttle = 0.0
control.brake = 1.0
control.hand_brake = False
control.manual_gear_shift = False
else:
self.target_waypoint, self.target_road_option = self._waypoints_queue[0]
control = self._vehicle_controller.run_step(self._target_speed, self.target_waypoint)
if debug:
draw_waypoints(self._vehicle.get_world(), [self.target_waypoint], 1.0)
return control
def get_incoming_waypoint_and_direction(self, steps=3):
"""
Returns direction and waypoint at a distance ahead defined by the user.
:param steps: number of steps to get the incoming waypoint.
"""
if len(self._waypoints_queue) > steps:
return self._waypoints_queue[steps]
else:
try:
wpt, direction = self._waypoints_queue[-1]
return wpt, direction
except IndexError as i:
return None, RoadOption.VOID
def get_plan(self):
"""Returns the current plan of the local planner"""
return self._waypoints_queue
def done(self):
"""
Returns whether or not the planner has finished
:return: boolean
"""
return len(self._waypoints_queue) == 0
def _retrieve_options(list_waypoints, current_waypoint):
"""
Compute the type of connection between the current active waypoint and the multiple waypoints present in
list_waypoints. The result is encoded as a list of RoadOption enums.
:param list_waypoints: list with the possible target waypoints in case of multiple options
:param current_waypoint: current active waypoint
:return: list of RoadOption enums representing the type of connection from the active waypoint to each
candidate in list_waypoints
"""
options = []
for next_waypoint in list_waypoints:
# this is needed because something we are linking to
# the beggining of an intersection, therefore the
# variation in angle is small
next_next_waypoint = next_waypoint.next(3.0)[0]
link = _compute_connection(current_waypoint, next_next_waypoint)
options.append(link)
return options
def _compute_connection(current_waypoint, next_waypoint, threshold=35):
"""
Compute the type of topological connection between an active waypoint (current_waypoint) and a target waypoint
(next_waypoint).
:param current_waypoint: active waypoint
:param next_waypoint: target waypoint
:return: the type of topological connection encoded as a RoadOption enum:
RoadOption.STRAIGHT
RoadOption.LEFT
RoadOption.RIGHT
"""
n = next_waypoint.transform.rotation.yaw
n = n % 360.0
c = current_waypoint.transform.rotation.yaw
c = c % 360.0
diff_angle = (n - c) % 180.0
if diff_angle < threshold or diff_angle > (180 - threshold):
return RoadOption.STRAIGHT
elif diff_angle > 90.0:
return RoadOption.LEFT
else:
return RoadOption.RIGHT