Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add lane error penalty #2030

Merged
merged 4 commits into from
May 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ Copy and pasting the git commit messages is __NOT__ enough.
- `EntryTactic` derived classes now contain `start_time`.
- `info` returned by `hiway-v1` in `reset()` and `step()` methods are unified.
- Changed instances of `hiway-v0` and `gym` to use `hiway-v1` and `gymnasium`, respectively.
- `RoadMap.Route` now optionally stores the start and end lanes of the route.
- `DistToDestination` metric now adds lane error penalty when agent terminates in different lane but same road as the goal position.
### Deprecated
- `visdom` is set to be removed from the SMARTS object parameters.
- Deprecated `start_time` on missions.
Expand Down
42 changes: 24 additions & 18 deletions smarts/core/argoverse_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -771,10 +771,16 @@ def road_by_id(self, road_id: str) -> RoadMap.Road:
class Route(RouteWithCache):
"""Describes a route between `Argoverse` roads."""

def __init__(self, road_map):
super().__init__(road_map)
self._roads = []
self._length = 0
def __init__(
self,
road_map: RoadMap,
roads: List[RoadMap.Road] = [],
start_lane: Optional[RoadMap.Lane] = None,
end_lane: Optional[RoadMap.Lane] = None,
):
super().__init__(road_map, start_lane, end_lane)
self._roads = roads
self._length = sum([road.length for road in roads])

@property
def roads(self) -> List[RoadMap.Road]:
Expand All @@ -784,11 +790,6 @@ def roads(self) -> List[RoadMap.Road]:
def road_length(self) -> float:
return self._length

def _add_road(self, road: RoadMap.Road):
"""Add a road to this route."""
self._length += road.length
self._roads.append(road)

@cached_property
def geometry(self) -> Sequence[Sequence[Tuple[float, float]]]:
return [list(road.shape(0.0).exterior.coords) for road in self.roads]
Expand Down Expand Up @@ -829,18 +830,18 @@ def _shortest_route(start: RoadMap.Road, end: RoadMap.Road) -> List[RoadMap.Road
path.reverse()
return path

def generate_routes(
def _generate_routes(
self,
start_road: RoadMap.Road,
start_lane: RoadMap.Lane,
end_road: RoadMap.Road,
via: Optional[Sequence[RoadMap.Road]] = None,
max_to_gen: int = 1,
end_lane: RoadMap.Lane,
via: Optional[Sequence[RoadMap.Road]],
max_to_gen: int,
) -> List[RoadMap.Route]:
assert (
max_to_gen == 1
), "multiple route generation not yet supported for Argoverse"
new_route = ArgoverseMap.Route(self)
result = [new_route]

roads = [start_road]
if via:
Expand All @@ -858,14 +859,19 @@ def generate_routes(
self._log.warning(
f"Unable to find valid path between {(cur_road.road_id, next_road.road_id)}."
)
return result
return [ArgoverseMap.Route(road_map=self)]
# The sub route includes the boundary roads (cur_road, next_road).
# We clip the latter to prevent duplicates
route_roads.extend(sub_route[:-1])

for road in route_roads:
new_route._add_road(road)
return result
return [
ArgoverseMap.Route(
road_map=self,
roads=route_roads,
start_lane=start_lane,
end_lane=end_lane,
)
]

def random_route(
self,
Expand Down
49 changes: 28 additions & 21 deletions smarts/core/opendrive_road_network.py
Original file line number Diff line number Diff line change
Expand Up @@ -1473,10 +1473,16 @@ def road_with_point(self, point: Point) -> Optional[RoadMap.Road]:
class Route(RouteWithCache):
"""Describes a route between two OpenDRIVE roads."""

def __init__(self, road_map):
super().__init__(road_map)
self._roads = []
self._length = 0
def __init__(
self,
road_map: RoadMap,
roads: List[RoadMap.Road] = [],
start_lane: Optional[RoadMap.Lane] = None,
end_lane: Optional[RoadMap.Lane] = None,
):
super().__init__(road_map, start_lane, end_lane)
self._roads = roads
self._length = sum([road.length for road in roads])

@property
def roads(self) -> List[RoadMap.Road]:
Expand All @@ -1486,10 +1492,6 @@ def roads(self) -> List[RoadMap.Road]:
def road_length(self) -> float:
return self._length

def _add_road(self, road: RoadMap.Road):
self._length += road.length
self._roads.append(road)

@cached_property
def geometry(self) -> Sequence[Sequence[Tuple[float, float]]]:
return [list(road.shape(1.0).exterior.coords) for road in self.roads]
Expand Down Expand Up @@ -1529,18 +1531,18 @@ def _shortest_route(start: RoadMap.Road, end: RoadMap.Road) -> List[RoadMap.Road
path.reverse()
return path

def generate_routes(
def _generate_routes(
self,
start_road: RoadMap.Road,
start_lane: RoadMap.Lane,
end_road: RoadMap.Road,
via: Optional[Sequence[RoadMap.Road]] = None,
max_to_gen: int = 1,
end_lane: RoadMap.Lane,
via: Optional[Sequence[RoadMap.Road]],
max_to_gen: int,
) -> List[RoadMap.Route]:
assert (
max_to_gen == 1
), "multiple route generation not yet supported for OpenDRIVE"
newroute = OpenDriveRoadNetwork.Route(self)
result = [newroute]

roads = [start_road]
if via:
Expand All @@ -1558,14 +1560,19 @@ def generate_routes(
self._log.warning(
f"Unable to find valid path between {(cur_road.road_id, next_road.road_id)}."
)
return result
return [OpenDriveRoadNetwork.Route(road_map=self)]
# The sub route includes the boundary roads (cur_road, next_road).
# We clip the latter to prevent duplicates
route_roads.extend(sub_route[:-1])

for road in route_roads:
newroute._add_road(road)
return result
return [
OpenDriveRoadNetwork.Route(
road_map=self,
roads=route_roads,
start_lane=start_lane,
end_lane=end_lane,
)
]

def random_route(
self,
Expand All @@ -1575,15 +1582,15 @@ def random_route(
) -> RoadMap.Route:
""" """
assert not starting_road or not only_drivable or starting_road.is_drivable
route = OpenDriveRoadNetwork.Route(self)
next_roads = [starting_road] if starting_road else list(self._roads.values())
if only_drivable:
next_roads = [r for r in next_roads if r.is_drivable]
while next_roads and len(route.roads) < max_route_len:
route_roads = []
while next_roads and len(route_roads) < max_route_len:
cur_road = random.choice(next_roads)
route._add_road(cur_road)
route_roads.append(cur_road)
next_roads = list(cur_road.outgoing_roads)
return route
return OpenDriveRoadNetwork.Route(road_map=self,roads=route_roads)

def empty_route(self) -> RoadMap.Route:
return OpenDriveRoadNetwork.Route(self)
Expand Down
2 changes: 1 addition & 1 deletion smarts/core/plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ def create_route(self, mission: Mission, radius: Optional[float] = None):
# all nearby starting lane options are exhausted.
for start_lane, _ in start_lanes:
self._route = self._road_map.generate_routes(
start_lane.road, end_lane.road, via_roads, 1
start_lane, end_lane, via_roads, 1
)[0]
if self._route.road_length > 0:
break
Expand Down
58 changes: 48 additions & 10 deletions smarts/core/road_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from dataclasses import dataclass
from enum import IntEnum
from functools import cached_property, lru_cache
from typing import Any, List, Optional, Sequence, Set, Tuple
from typing import Any, List, Optional, Sequence, Set, Tuple, Union

import numpy as np
from shapely.geometry import LineString
Expand Down Expand Up @@ -141,25 +141,56 @@ def road_with_point(self, point: Point) -> Optional[RoadMap.Road]:

def generate_routes(
self,
start_road: RoadMap.Road,
end_road: RoadMap.Road,
start: Union["RoadMap.Road", "RoadMap.Lane"],
end: Union["RoadMap.Road", "RoadMap.Lane"],
via: Optional[Sequence[RoadMap.Road]] = None,
max_to_gen: int = 1,
) -> List[RoadMap.Route]:
"""Generates routes between two roads.
Args:
start_road:
The beginning road of the generated routes.
end_road:
The end road of the generated routes.
start:
The beginning road or lane of the generated routes.
end:
The end road or lane of the generated routes.
via:
All edges that the generated routes must pass through.
max_to_gen:
The maximum number of routes to generate.
Returns:
A list of generated routes that satisfy the given restrictions. Routes will be
returned in order of increasing length.
returned in order of increasing length.
"""
if isinstance(start, RoadMap.Lane):
start_lane = start
start_road = start.road
else:
start_lane = None
start_road = start
if isinstance(end, RoadMap.Lane):
end_lane = end
end_road = end.road
else:
end_lane = None
end_road = end

return self._generate_routes(
start_road=start_road,
start_lane=start_lane,
end_road=end_road,
end_lane=end_lane,
via=via,
max_to_gen=max_to_gen,
)

def _generate_routes(
self,
start_road: RoadMap.Road,
start_lane: RoadMap.Lane,
end_road: RoadMap.Road,
end_lane: RoadMap.Lane,
via: Optional[Sequence[RoadMap.Road]],
max_to_gen: int,
) -> List[RoadMap.Route]:
raise NotImplementedError()

def random_route(
Expand Down Expand Up @@ -647,8 +678,15 @@ def __eq__(self, other) -> bool:
"""Required for set usage; derived classes may override this."""
return self.__class__ == other.__class__ and hash(self) == hash(other)

def _add_road(self, road: RoadMap.Road):
raise NotImplementedError()
@property
def start_lane(self) -> Optional[RoadMap.Lane]:
"Route's start lane."
return None

@property
def end_lane(self) -> Optional[RoadMap.Lane]:
"Route's end lane."
return None

@property
def roads(self) -> List[RoadMap.Road]:
Expand Down
45 changes: 32 additions & 13 deletions smarts/core/route_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,16 @@ class _LaneContinuation:
class RouteWithCache(RoadMap.Route):
"""A cache for commonly-needed but expensive-to-compute information about RoadMap.Routes."""

def __init__(self, road_map: RoadMap):
def __init__(
self,
road_map: RoadMap,
start_lane: Optional[RoadMap.Lane] = None,
end_lane: Optional[RoadMap.Lane] = None,
):
self._map = road_map
self._logger = logging.getLogger(self.__class__.__name__)
self._start_lane: Optional[RoadMap.Lane] = start_lane
self._end_lane: Optional[RoadMap.Lane] = end_lane

def __hash__(self) -> int:
key: int = self._cache_key # pytype: disable=annotation-type-mismatch
Expand All @@ -59,46 +66,58 @@ def __hash__(self) -> int:
def __eq__(self, other) -> bool:
return self.__class__ == other.__class__ and hash(self) == hash(other)

def _add_road(self, road: RoadMap.Road):
raise NotImplementedError()
@property
def start_lane(self) -> Optional[RoadMap.Lane]:
"Route's start lane."
return self._start_lane

@property
def end_lane(self) -> Optional[RoadMap.Lane]:
"Route's end lane."
return self._end_lane

@cached_property
def road_ids(self) -> List[str]:
"""Get the road IDs for this route.

Returns:
(List[str]): A list of the road IDs for the Roads in this Route."""
return [road.road_id for road in self.roads]

@classmethod
@staticmethod
def from_road_ids(
cls,
road_map: RoadMap,
road_map,
road_ids: Sequence[str],
resolve_intermediaries: bool = False,
) -> "RouteWithCache":
) -> RoadMap.Route:
"""Factory to generate a new RouteWithCache from a sequence of road ids."""

if len(road_ids) > 0 and resolve_intermediaries:
via_roads = [road_map.road_by_id(r) for r in road_ids[1:-1]]
routes = road_map.generate_routes(
start_road=road_map.road_by_id(road_ids[0]),
end_road=road_map.road_by_id(road_ids[-1]),
start=road_map.road_by_id(road_ids[0]),
end=road_map.road_by_id(road_ids[-1]),
via=via_roads,
max_to_gen=1,
)
if len(routes) > 0:
return routes[0]

route = cls(road_map)
route_roads = []
for road_id in road_ids:
road = road_map.road_by_id(road_id)
assert road, f"cannot add unknown road {road_id} to route"
route._add_road(road)
return route
route_roads.append(road)

return road_map.Route(road_map=road_map, roads=route_roads)

@cached_property
def _cache_key(self) -> _RouteKey:
return hash(tuple(road.road_id for road in self.roads)) ^ hash(self._map)
return (
hash(tuple(road.road_id for road in self.roads))
^ hash(self._map)
^ hash((self.start_lane, self.end_lane))
)

@property
def is_cached(self) -> bool:
Expand Down
Loading