Skip to content

Commit

Permalink
Performance metrics - normalized, recomputed, overall score (#1772)
Browse files Browse the repository at this point in the history
  • Loading branch information
Adaickalavan authored Jan 5, 2023
1 parent 4d94867 commit 14149b5
Show file tree
Hide file tree
Showing 9 changed files with 432 additions and 146 deletions.
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ rsa==4.9
Rtree==1.0.1
scipy==1.7.3
sh==1.14.3
shapely==2.0.0
Shapely==1.8.5.post1
six==1.16.0
soupsieve==2.3.2.post1
tableprint==0.9.1
Expand Down
6 changes: 1 addition & 5 deletions smarts/core/lanepoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,11 @@
# to allow for typing to refer to class being defined (LinkedLanePoint)
from __future__ import annotations

import math
import queue
import warnings
from collections import defaultdict
from dataclasses import dataclass
from functools import lru_cache
from typing import List, NamedTuple, Optional, Sequence, Tuple
from typing import List, NamedTuple, Sequence, Tuple

import numpy as np
from scipy.spatial import KDTree
Expand All @@ -36,9 +34,7 @@
from smarts.core.road_map import RoadMap
from smarts.core.utils.math import (
fast_quaternion_from_angle,
lerp,
squared_dist,
vec_2d,
vec_to_radians,
)

Expand Down
47 changes: 24 additions & 23 deletions smarts/core/plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,11 +320,15 @@ def road_map(self) -> RoadMap:
"""The road map this plan is relative to."""
return self._road_map

def create_route(self, mission: Mission):
def create_route(self, mission: Mission, radius: Optional[float] = None):
"""Generates a route that conforms to a mission.
Args:
mission (Mission):
A mission the agent should follow. Defaults to endless if `None`.
radius (Optional[float]):
Radius (meter) to find the nearest starting lane for the given
mission. Defaults to `_default_lane_width` of the underlying
road_map.
"""
assert not self._route, "already called create_route()"
self._mission = mission or Mission.random_endless_mission(self._road_map)
Expand All @@ -335,45 +339,42 @@ def create_route(self, mission: Mission):

assert isinstance(self._mission.goal, PositionalGoal)

start_lane = self._road_map.nearest_lane(
start_lanes = self._road_map.nearest_lanes(
self._mission.start.point,
include_junctions=False,
include_junctions=True,
radius=radius,
)

if not start_lane:
# it's possible that the Mission's start point wasn't explicitly
# specified by a user, but rather determined during the scenario run
# from the current position of a vehicle, in which case it may be
# in a junction. But we only allow this if the previous query fails.
start_lane = self._road_map.nearest_lane(
self._mission.start.point,
include_junctions=True,
)
if start_lane is None:
if not start_lanes:
self._mission = Mission.endless_mission(Pose.origin())
raise PlanningError("Cannot find start lane. Route must start in a lane.")
start_road = start_lane.road
raise PlanningError("Starting lane not found. Route must start in a lane.")

via_roads = [self._road_map.road_by_id(via) for via in self._mission.route_vias]

end_lane = self._road_map.nearest_lane(
self._mission.goal.position,
include_junctions=False,
)
assert end_lane is not None, "route must end in a lane"
end_road = end_lane.road

via_roads = [self._road_map.road_by_id(via) for via in self._mission.route_vias]

self._route = self._road_map.generate_routes(
start_road, end_road, via_roads, 1
)[0]
# When an agent is in an intersection, the `nearest_lanes` method might
# not return the correct road as the first choice. Hence, nearest
# starting lanes are tried in sequence until a route is found or until
# all nearby starting lane options are exhausted.
for start_lane, _ in start_lanes:
self._route = self._road_map.generate_routes(
start_lane.road, end_lane.road, via_roads, 1
)[0]
if self._route.road_length > 0:
break

if len(self._route.roads) == 0:
self._mission = Mission.endless_mission(Pose.origin())
start_road_ids = [start_lane.road.road_id for start_lane, _ in start_lanes]
raise PlanningError(
"Unable to find a route between start={} and end={}. If either of "
"these are junctions (not well supported today) please switch to "
"roads and ensure there is a > 0 offset into the road if it is "
"after a junction.".format(start_road.road_id, end_road.road_id)
"after a junction.".format(start_road_ids, end_lane.road.road_id)
)

return
9 changes: 6 additions & 3 deletions smarts/core/scenario.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
Optional,
Sequence,
Tuple,
Union,
)

import cloudpickle
Expand Down Expand Up @@ -570,7 +571,7 @@ def get_vehicle_start_at_time(
Start(
np.array([pos_x + hhx, pos_y + hhy]),
Heading(heading),
from_front_bumper=False,
from_front_bumper=True,
),
speed,
)
Expand Down Expand Up @@ -717,7 +718,7 @@ def _extract_mission(mission, road_map):
the corresponding SMARTS mission types.
"""

def resolve_offset(offset, lane_length):
def resolve_offset(offset: Union[str, float], lane_length: float):
# epsilon to ensure we are within this edge and not the subsequent one
epsilon = 1e-6
lane_length -= epsilon
Expand All @@ -730,7 +731,9 @@ def resolve_offset(offset, lane_length):
else:
return float(offset)

def to_position_and_heading(road_id, lane_index, offset, road_map):
def to_position_and_heading(
road_id: str, lane_index: int, offset: Union[str, float], road_map: RoadMap
):
road = road_map.road_by_id(road_id)
lane = road.lane_at_index(lane_index)
offset = resolve_offset(offset, lane.length)
Expand Down
10 changes: 9 additions & 1 deletion smarts/env/hiway_env.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,15 @@ def scenario_log(self) -> Dict[str, Union[float, str]]:
"mission_hash": str(hash(frozenset(scenario.missions.items()))),
}

@property
def scenario(self) -> Scenario:
"""Returns underlying scenario.
Returns:
Scenario: Current simulated scenario.
"""
return self._smarts.scenario

def seed(self, seed: int) -> int:
"""Sets random number generator seed number.
Expand Down Expand Up @@ -273,4 +282,3 @@ def close(self):
"""Closes the environment and releases all resources."""
if self._smarts is not None:
self._smarts.destroy()
self._smarts = None
155 changes: 155 additions & 0 deletions smarts/env/wrappers/metric/completion.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
# Copyright (C) 2022. Huawei Technologies Co., Ltd. All rights reserved.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.

import logging
from dataclasses import dataclass
from typing import Callable

from smarts.core.coordinates import Heading, Point
from smarts.core.plan import Mission, Plan, PlanningError, PositionalGoal, Start
from smarts.core.road_map import RoadMap
from smarts.core.sensors import Observation
from smarts.core.utils.math import running_mean

logger = logging.getLogger(__file__)


@dataclass(frozen=True)
class Completion:
"""Parameters to compute the percentage of scenario tasks completed."""

dist_remainder: float = 0
"""Shortest road distance between current position and goal position.
"""
dist_tot: float = 0
"""Shortest route distance between start position and goal position.
"""


class CompletionError(Exception):
"""Raised when computation of `Completion` metric fails."""

pass


def get_dist(road_map: RoadMap, point_a: Point, point_b: Point) -> float:
"""
Computes the shortest route distance from point_a to point_b in the road
map. If no routes are available from point_a to point_b, then distance of
the shortest road length from point_b to point_a is instead computed. Both
points should lie on a road in the road map.
Args:
road_map (RoadMap): Scenario road map.
point_a (Point): A point, in world-map coordinates, which lies on a road.
point_b (Point): A point, in world-map coordinates, which lies on a road.
Returns:
float: Shortest road distance between two points in the road map.
"""

def _get_dist(start: Point, end: Point) -> float:
mission = Mission(
start=Start(
position=start.as_np_array,
heading=Heading(0),
from_front_bumper=False,
),
goal=PositionalGoal(
position=end,
radius=3,
),
)
plan = Plan(road_map=road_map, mission=mission, find_route=False)
plan.create_route(mission=mission, radius=5)
from_route_point = RoadMap.Route.RoutePoint(pt=start)
to_route_point = RoadMap.Route.RoutePoint(pt=end)

dist_tot = plan.route.distance_between(
start=from_route_point, end=to_route_point
)
if dist_tot == None:
raise CompletionError("Unable to find road on route near given points.")
elif dist_tot < 0:
raise CompletionError(
"Path from start point to end point flows in "
"the opposite direction of the generated route."
)

return dist_tot

try:
dist_tot = _get_dist(point_a, point_b)
except PlanningError as e:
if e.args[0].startswith("Unable to find a route"):
# Vehicle might end (i) in a dead-end, (ii) in a one-way road, or
# (iii) in a road without u-turn, causing the route planner to fail.
# When there is no legal route, the road distance in the reverse
# direction is returned as the distance between point_a and point_b.
# Thus, proceed to find a walkable route in the reverse direction.
dist_tot = _get_dist(point_b, point_a)
logger.info(
"completion.get dist(): Did not find a route from "
"%s to %s, instead found a reversed route from %s to %s.",
point_a,
point_b,
point_b,
point_a,
)

return dist_tot


def _dist_remainder():
mean: float = 0
step: int = 0

def func(
road_map: RoadMap, obs: Observation, initial_compl: Completion
) -> Completion:
nonlocal mean, step

if obs.events.reached_goal:
dist = 0
else:
cur_pos = Point(*obs.ego_vehicle_state.position)
# pytype: disable=attribute-error
goal_pos = obs.ego_vehicle_state.mission.goal.position
# pytype: enable=attribute-error
dist = get_dist(road_map=road_map, point_a=cur_pos, point_b=goal_pos)

# Cap remainder distance
c_dist = min(dist, initial_compl.dist_tot)

mean, step = running_mean(prev_mean=mean, prev_step=step, new_val=c_dist)
return Completion(dist_remainder=mean)

return func


@dataclass(frozen=True)
class CompletionFuncs:
"""Functions to compute scenario completion metrics. Each function computes
the running mean completion value over number of episodes, for a given
scenario."""

# fmt: off
dist_remainder: Callable[[RoadMap, Observation, Completion], Completion] = _dist_remainder()
# fmt: on
Loading

0 comments on commit 14149b5

Please sign in to comment.